generated from dopt-python/py311
161 lines
4.2 KiB
Python
161 lines
4.2 KiB
Python
# %%
|
|
import dataclasses as dc
|
|
import datetime
|
|
import os
|
|
import random
|
|
from pprint import pprint
|
|
|
|
import dotenv
|
|
import psycopg
|
|
import sqlalchemy as sql
|
|
|
|
dotenv.load_dotenv()
|
|
|
|
# %%
|
|
DB_HOST = os.getenv("POSTGRES_HOST", None)
|
|
DB_PORT = os.getenv("POSTGRES_PORT", None)
|
|
DB_USER = os.getenv("POSTGRES_USER", None)
|
|
DB_PASS = os.getenv("POSTGRESS_PASS", None)
|
|
assert DB_HOST
|
|
assert DB_PORT
|
|
assert DB_USER
|
|
assert DB_PASS
|
|
|
|
RNG = random.Random(42)
|
|
CONN_STRING = f"postgresql+psycopg://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/ls"
|
|
# conn_info = "dbname=deine_db user=postgres password=dein_passwort host=localhost port=5432"
|
|
|
|
# %%
|
|
metadata = sql.MetaData()
|
|
prod_order_t = sql.Table(
|
|
"production_order",
|
|
metadata,
|
|
sql.Column("Suchschluessel", sql.Text, primary_key=True),
|
|
sql.Column("Grund_der_Ausloesung_Beschreibung", sql.Text),
|
|
sql.Column("Produktionsplan_Name", sql.Text),
|
|
sql.Column("Produktionsplan_Suchschluessel", sql.Text),
|
|
sql.Column("Status", sql.Text),
|
|
sql.Column("Freigabe", sql.Text),
|
|
sql.Column("Startdatum", sql.Text),
|
|
sql.Column("Enddatum", sql.Text),
|
|
sql.Column("Arbeitsgang_Arbeitsanweisung", sql.Text),
|
|
sql.Column("Arbeitsgang_Artikelname", sql.Text),
|
|
sql.Column("Arbeitsgang_Artikelnummer", sql.Text),
|
|
sql.Column("Arbeitsgang_Menge", sql.Text),
|
|
sql.Column("Arbeitsgang_Rohmaterial", sql.Text),
|
|
sql.Column("ret_forms", sql.Text),
|
|
sql.Column("ret_materials", sql.Text),
|
|
sql.Column("ret_dim_1", sql.Integer),
|
|
sql.Column("Bestelldatum", sql.Text),
|
|
sql.Column("Kunden_ID", sql.Text),
|
|
sql.Column("Kundenprioritaet", sql.Integer),
|
|
sql.Column("order", sql.Integer),
|
|
sql.Column("clusters", sql.Integer),
|
|
sql.Column("chunk_id", sql.Integer),
|
|
sql.Column("prio", sql.Float),
|
|
sql.Column("erfassungspunkt", sql.Text),
|
|
schema="ls",
|
|
)
|
|
|
|
|
|
# %%
|
|
@dc.dataclass(slots=True, kw_only=True)
|
|
class DbEntry:
|
|
Suchschluessel: str
|
|
Grund_der_Ausloesung_Beschreibung: str
|
|
Produktionsplan_Name: str
|
|
Produktionsplan_Suchschluessel: str
|
|
Status: str
|
|
Freigabe: str
|
|
Startdatum: str
|
|
Enddatum: str
|
|
Arbeitsgang_Arbeitsanweisung: str
|
|
Arbeitsgang_Artikelname: str
|
|
Arbeitsgang_Artikelnummer: str
|
|
Arbeitsgang_Menge: str
|
|
Arbeitsgang_Rohmaterial: str
|
|
ret_forms: str
|
|
ret_materials: str
|
|
ret_dim_1: int
|
|
Bestelldatum: str
|
|
Kunden_ID: str
|
|
Kundenprioritaet: int
|
|
order: int
|
|
clusters: int
|
|
chunk_id: int
|
|
prio: float
|
|
erfassungspunkt: str
|
|
|
|
|
|
def random_prod_no() -> str:
|
|
num = RNG.randint(100000, 9999999)
|
|
return f"PRO{num}WS"
|
|
|
|
|
|
def random_entry() -> DbEntry:
|
|
_prod_no = random_prod_no()
|
|
prod_no = f"random__{_prod_no}"
|
|
dummy = "DUMMY"
|
|
datestr = datetime.datetime.now().date().isoformat()
|
|
rand_prio = RNG.randint(1, 10) / 10
|
|
|
|
return DbEntry(
|
|
Suchschluessel=prod_no,
|
|
Grund_der_Ausloesung_Beschreibung=dummy,
|
|
Produktionsplan_Name=prod_no,
|
|
Produktionsplan_Suchschluessel=prod_no,
|
|
Status="test",
|
|
Freigabe="N",
|
|
Startdatum=datestr,
|
|
Enddatum=datestr,
|
|
Arbeitsgang_Arbeitsanweisung="TEST",
|
|
Arbeitsgang_Artikelname="TEST",
|
|
Arbeitsgang_Artikelnummer="TEST_ARTIKEL",
|
|
Arbeitsgang_Menge="TEST_MENGE",
|
|
Arbeitsgang_Rohmaterial="ROHMATERIAL_XYZ",
|
|
ret_forms="FORMS",
|
|
ret_materials="MATERIALS",
|
|
ret_dim_1=1200,
|
|
Bestelldatum=datestr,
|
|
Kunden_ID=f"KUNDE_{prod_no}",
|
|
Kundenprioritaet=2,
|
|
order=2,
|
|
clusters=3,
|
|
chunk_id=4,
|
|
prio=rand_prio,
|
|
erfassungspunkt="erfassungspunkt_xyz_dummy",
|
|
)
|
|
|
|
|
|
# %%
|
|
engine = sql.create_engine(CONN_STRING, echo=True)
|
|
# %%
|
|
stmt = sql.text("SELECT * FROM ls.production_order")
|
|
with engine.connect() as conn:
|
|
res = conn.execute(stmt)
|
|
|
|
results = res.all()
|
|
# %%
|
|
pprint(list(res.keys()))
|
|
# for entry in list(res.keys()):
|
|
# print(f"{entry}: str")
|
|
# %%
|
|
NUM_DATAPOINTS = 100
|
|
_insert_data = [random_entry() for _ in range(NUM_DATAPOINTS)]
|
|
insert_data = [dc.asdict(x) for x in _insert_data]
|
|
|
|
stmt = sql.insert(prod_order_t)
|
|
|
|
with engine.begin() as conn:
|
|
res = conn.execute(stmt, insert_data)
|
|
|
|
# %%
|
|
stmt = sql.text("TRUNCATE ls.production_order;")
|
|
|
|
with engine.begin() as conn:
|
|
conn.execute(stmt)
|
|
|
|
# %%
|
|
engine.dispose()
|
|
# %%
|