Files
kmi-ls/prototpyes/01_connect.py

161 lines
4.2 KiB
Python

# %%
import dataclasses as dc
import datetime
import os
import random
from pprint import pprint
import dotenv
import psycopg
import sqlalchemy as sql
dotenv.load_dotenv()
# %%
DB_HOST = os.getenv("POSTGRES_HOST", None)
DB_PORT = os.getenv("POSTGRES_PORT", None)
DB_USER = os.getenv("POSTGRES_USER", None)
DB_PASS = os.getenv("POSTGRESS_PASS", None)
assert DB_HOST
assert DB_PORT
assert DB_USER
assert DB_PASS
RNG = random.Random(42)
CONN_STRING = f"postgresql+psycopg://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/ls"
# conn_info = "dbname=deine_db user=postgres password=dein_passwort host=localhost port=5432"
# %%
metadata = sql.MetaData()
prod_order_t = sql.Table(
"production_order",
metadata,
sql.Column("Suchschluessel", sql.Text, primary_key=True),
sql.Column("Grund_der_Ausloesung_Beschreibung", sql.Text),
sql.Column("Produktionsplan_Name", sql.Text),
sql.Column("Produktionsplan_Suchschluessel", sql.Text),
sql.Column("Status", sql.Text),
sql.Column("Freigabe", sql.Text),
sql.Column("Startdatum", sql.Text),
sql.Column("Enddatum", sql.Text),
sql.Column("Arbeitsgang_Arbeitsanweisung", sql.Text),
sql.Column("Arbeitsgang_Artikelname", sql.Text),
sql.Column("Arbeitsgang_Artikelnummer", sql.Text),
sql.Column("Arbeitsgang_Menge", sql.Text),
sql.Column("Arbeitsgang_Rohmaterial", sql.Text),
sql.Column("ret_forms", sql.Text),
sql.Column("ret_materials", sql.Text),
sql.Column("ret_dim_1", sql.Integer),
sql.Column("Bestelldatum", sql.Text),
sql.Column("Kunden_ID", sql.Text),
sql.Column("Kundenprioritaet", sql.Integer),
sql.Column("order", sql.Integer),
sql.Column("clusters", sql.Integer),
sql.Column("chunk_id", sql.Integer),
sql.Column("prio", sql.Float),
sql.Column("erfassungspunkt", sql.Text),
schema="ls",
)
# %%
@dc.dataclass(slots=True, kw_only=True)
class DbEntry:
Suchschluessel: str
Grund_der_Ausloesung_Beschreibung: str
Produktionsplan_Name: str
Produktionsplan_Suchschluessel: str
Status: str
Freigabe: str
Startdatum: str
Enddatum: str
Arbeitsgang_Arbeitsanweisung: str
Arbeitsgang_Artikelname: str
Arbeitsgang_Artikelnummer: str
Arbeitsgang_Menge: str
Arbeitsgang_Rohmaterial: str
ret_forms: str
ret_materials: str
ret_dim_1: int
Bestelldatum: str
Kunden_ID: str
Kundenprioritaet: int
order: int
clusters: int
chunk_id: int
prio: float
erfassungspunkt: str
def random_prod_no() -> str:
num = RNG.randint(100000, 9999999)
return f"PRO{num}WS"
def random_entry() -> DbEntry:
_prod_no = random_prod_no()
prod_no = f"random__{_prod_no}"
dummy = "DUMMY"
datestr = datetime.datetime.now().date().isoformat()
rand_prio = RNG.randint(1, 10) / 10
return DbEntry(
Suchschluessel=prod_no,
Grund_der_Ausloesung_Beschreibung=dummy,
Produktionsplan_Name=prod_no,
Produktionsplan_Suchschluessel=prod_no,
Status="test",
Freigabe="N",
Startdatum=datestr,
Enddatum=datestr,
Arbeitsgang_Arbeitsanweisung="TEST",
Arbeitsgang_Artikelname="TEST",
Arbeitsgang_Artikelnummer="TEST_ARTIKEL",
Arbeitsgang_Menge="TEST_MENGE",
Arbeitsgang_Rohmaterial="ROHMATERIAL_XYZ",
ret_forms="FORMS",
ret_materials="MATERIALS",
ret_dim_1=1200,
Bestelldatum=datestr,
Kunden_ID=f"KUNDE_{prod_no}",
Kundenprioritaet=2,
order=2,
clusters=3,
chunk_id=4,
prio=rand_prio,
erfassungspunkt="erfassungspunkt_xyz_dummy",
)
# %%
engine = sql.create_engine(CONN_STRING, echo=True)
# %%
stmt = sql.text("SELECT * FROM ls.production_order")
with engine.connect() as conn:
res = conn.execute(stmt)
results = res.all()
# %%
pprint(list(res.keys()))
# for entry in list(res.keys()):
# print(f"{entry}: str")
# %%
NUM_DATAPOINTS = 100
_insert_data = [random_entry() for _ in range(NUM_DATAPOINTS)]
insert_data = [dc.asdict(x) for x in _insert_data]
stmt = sql.insert(prod_order_t)
with engine.begin() as conn:
res = conn.execute(stmt, insert_data)
# %%
stmt = sql.text("TRUNCATE ls.production_order;")
with engine.begin() as conn:
conn.execute(stmt)
# %%
engine.dispose()
# %%