# %% import dataclasses as dc import datetime import os import random from pprint import pprint import dotenv import psycopg import sqlalchemy as sql dotenv.load_dotenv() # %% DB_HOST = os.getenv("POSTGRES_HOST", None) DB_PORT = os.getenv("POSTGRES_PORT", None) DB_USER = os.getenv("POSTGRES_USER", None) DB_PASS = os.getenv("POSTGRESS_PASS", None) assert DB_HOST assert DB_PORT assert DB_USER assert DB_PASS RNG = random.Random(42) CONN_STRING = f"postgresql+psycopg://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/ls" # conn_info = "dbname=deine_db user=postgres password=dein_passwort host=localhost port=5432" # %% metadata = sql.MetaData() prod_order_t = sql.Table( "production_order", metadata, sql.Column("Suchschluessel", sql.Text, primary_key=True), sql.Column("Grund_der_Ausloesung_Beschreibung", sql.Text), sql.Column("Produktionsplan_Name", sql.Text), sql.Column("Produktionsplan_Suchschluessel", sql.Text), sql.Column("Status", sql.Text), sql.Column("Freigabe", sql.Text), sql.Column("Startdatum", sql.Text), sql.Column("Enddatum", sql.Text), sql.Column("Arbeitsgang_Arbeitsanweisung", sql.Text), sql.Column("Arbeitsgang_Artikelname", sql.Text), sql.Column("Arbeitsgang_Artikelnummer", sql.Text), sql.Column("Arbeitsgang_Menge", sql.Text), sql.Column("Arbeitsgang_Rohmaterial", sql.Text), sql.Column("ret_forms", sql.Text), sql.Column("ret_materials", sql.Text), sql.Column("ret_dim_1", sql.Integer), sql.Column("Bestelldatum", sql.Text), sql.Column("Kunden_ID", sql.Text), sql.Column("Kundenprioritaet", sql.Integer), sql.Column("order", sql.Integer), sql.Column("clusters", sql.Integer), sql.Column("chunk_id", sql.Integer), sql.Column("prio", sql.Float), sql.Column("erfassungspunkt", sql.Text), schema="ls", ) # %% @dc.dataclass(slots=True, kw_only=True) class DbEntry: Suchschluessel: str Grund_der_Ausloesung_Beschreibung: str Produktionsplan_Name: str Produktionsplan_Suchschluessel: str Status: str Freigabe: str Startdatum: str Enddatum: str Arbeitsgang_Arbeitsanweisung: str Arbeitsgang_Artikelname: str Arbeitsgang_Artikelnummer: str Arbeitsgang_Menge: str Arbeitsgang_Rohmaterial: str ret_forms: str ret_materials: str ret_dim_1: int Bestelldatum: str Kunden_ID: str Kundenprioritaet: int order: int clusters: int chunk_id: int prio: float erfassungspunkt: str def random_prod_no() -> str: num = RNG.randint(100000, 9999999) return f"PRO{num}WS" def random_entry() -> DbEntry: _prod_no = random_prod_no() prod_no = f"random__{_prod_no}" dummy = "DUMMY" datestr = datetime.datetime.now().date().isoformat() rand_prio = RNG.randint(1, 10) / 10 return DbEntry( Suchschluessel=prod_no, Grund_der_Ausloesung_Beschreibung=dummy, Produktionsplan_Name=prod_no, Produktionsplan_Suchschluessel=prod_no, Status="test", Freigabe="N", Startdatum=datestr, Enddatum=datestr, Arbeitsgang_Arbeitsanweisung="TEST", Arbeitsgang_Artikelname="TEST", Arbeitsgang_Artikelnummer="TEST_ARTIKEL", Arbeitsgang_Menge="TEST_MENGE", Arbeitsgang_Rohmaterial="ROHMATERIAL_XYZ", ret_forms="FORMS", ret_materials="MATERIALS", ret_dim_1=1200, Bestelldatum=datestr, Kunden_ID=f"KUNDE_{prod_no}", Kundenprioritaet=2, order=2, clusters=3, chunk_id=4, prio=rand_prio, erfassungspunkt="erfassungspunkt_xyz_dummy", ) # %% engine = sql.create_engine(CONN_STRING, echo=True) # %% stmt = sql.text("SELECT * FROM ls.production_order") with engine.connect() as conn: res = conn.execute(stmt) results = res.all() # %% pprint(list(res.keys())) # for entry in list(res.keys()): # print(f"{entry}: str") # %% NUM_DATAPOINTS = 100 _insert_data = [random_entry() for _ in range(NUM_DATAPOINTS)] insert_data = [dc.asdict(x) for x in _insert_data] stmt = sql.insert(prod_order_t) with engine.begin() as conn: res = conn.execute(stmt, insert_data) # %% stmt = sql.text("TRUNCATE ls.production_order;") with engine.begin() as conn: conn.execute(stmt) # %% engine.dispose() # %%