203 lines
7.7 KiB
Python

import os
import re
import sys
import time
import traceback
from pathlib import Path
from KSG_anomaly_detection import config, config_for_test, delegator
from KSG_anomaly_detection.preparation import Preparation
from KSG_anomaly_detection.window_manager import WindowManager
# Identifikation aller Unterordner jeweils in Verifizierstation_1 und Verifizierstation_2, eindeutige Lose
def get_third_level_subfolders(path):
seen_basenames = set()
result = set()
pattern = re.compile(r"^(.*)_\d+$")
for level1 in os.listdir(path):
level1_path = os.path.join(path, level1)
if not os.path.isdir(level1_path):
continue
if not level1 >= config.START_DATE:
continue
for level2 in os.listdir(level1_path):
level2_path = os.path.join(level1_path, level2)
if not os.path.isdir(level2_path):
continue
for level3 in os.listdir(level2_path):
level3_path = os.path.join(level2_path, level3)
if not os.path.isdir(level3_path):
continue
match = pattern.match(level3)
if match:
base_name = level3
if base_name not in seen_basenames:
seen_basenames.add(base_name)
base_path = os.path.join(level2_path, base_name)
result.add(base_path)
return result
# zur Identifikation, ob wir uns in Verifizierstation_1 und Verifizierstation_2 befinden
def get_first_level_name(folder_path):
# returns the name of the immediate parent folder (e.g., V1 or V2)
return Path(folder_path).parts[-4]
def is_entire_folder_unchanged(folder_path, threshold_seconds=300):
now = time.time()
for root, dirs, files in os.walk(folder_path):
for entry in dirs + files:
try:
entry_path = os.path.join(root, entry)
mtime = os.path.getmtime(entry_path)
if now - mtime < threshold_seconds:
return False
except FileNotFoundError:
continue
return True
def are_all_matching_folders_unchanged(base_path, name_substring, threshold_seconds):
for root, dirs, _ in os.walk(base_path):
for d in dirs:
if name_substring in d:
folder_path = os.path.join(root, d)
if not is_entire_folder_unchanged(folder_path, threshold_seconds):
return False
return True
# Hauptfunktion: Check auf neue Ordner und ggf. Auslösen des KI-Algorithmus
def monitor_folder(manager: WindowManager):
print("starting thread...")
while True:
for folder in config_for_test.FOLDER_LIST:
try:
if are_all_matching_folders_unchanged(
os.path.dirname(folder),
os.path.basename(folder),
threshold_seconds=config.AGE_THRESHOLD,
):
# prüfen, ob Verifizierstation_1 oder Verifizierstation_2
first_level = get_first_level_name(folder)
# ein zu Verifizierstation_1 zugehöriger Ordner und KI-Algorithmus soll durchgeführt werden
if first_level == config.V_1 and manager.get_checkbox_state_v1():
# Vorbereitung
preparation = Preparation(folder)
# Aufgabe 2: NGT und check_img von Originalordner in KI kopieren
# Rückgabewert: Ordner Fileserver/KI auf dem Fileserver, wo dann die Heatmaps abgelegt werden
file_ki_folder, result = preparation.copy_ngt_and_checkimg()
if result: # d. h. Fehler ist aufgetreten
continue # zu nächstem neuen folder springen
# Aufgabe 3: check_img im Originalordner anpassen (d. h. gelbe Farbe: work in progress)
preparation.change_image_to_yellow()
# Aufgabe 4: AOI-Bilder in RGB überführen und zwischenspeichern
# wir erhalten hier den Speicherort sowie ggf. Fehlermeldungen zurück
current_folder, result = preparation.create_rgb_images_and_patches()
print("finished routine")
if result is not None:
print(result)
continue
except Exception as e:
tb = traceback.extract_tb(e.__traceback__)
no = tb[-1].lineno
print(e, no)
time.sleep(60)
def monitor_folder_simple(mp_pool: delegator.MPPool, use_new: bool, use_mp: bool):
print("starting procedure...")
for folder in config_for_test.FOLDER_LIST:
# try:
if are_all_matching_folders_unchanged(
os.path.dirname(folder),
os.path.basename(folder),
threshold_seconds=config.AGE_THRESHOLD,
):
# prüfen, ob Verifizierstation_1 oder Verifizierstation_2
first_level = get_first_level_name(folder)
# ein zu Verifizierstation_1 zugehöriger Ordner und KI-Algorithmus soll durchgeführt werden
if first_level == config.V_1:
# Vorbereitung
preparation = Preparation(folder)
# Aufgabe 2: NGT und check_img von Originalordner in KI kopieren
# Rückgabewert: Ordner Fileserver/KI auf dem Fileserver, wo dann die Heatmaps abgelegt werden
print("'copy_ngt_and_checkimg'...")
if use_new:
file_ki_folder, result = preparation.copy_ngt_and_checkimg_new()
else:
file_ki_folder, result = preparation.copy_ngt_and_checkimg()
if result: # d. h. Fehler ist aufgetreten
continue # zu nächstem neuen folder springen
# Aufgabe 3: check_img im Originalordner anpassen (d. h. gelbe Farbe: work in progress)
SKIP_NEXT = False
if not SKIP_NEXT:
print("'change_image_to_yellow'...")
if use_new:
preparation.change_image_to_yellow_new()
else:
preparation.change_image_to_yellow()
# sys.exit(0)
# Aufgabe 4: AOI-Bilder in RGB überführen und zwischenspeichern
# wir erhalten hier den Speicherort sowie ggf. Fehlermeldungen zurück
SKIP_NEXT = False
if not use_mp and not SKIP_NEXT:
print("'create_rgb_images_and_patches'...")
if use_new:
current_folder, result = (
preparation.create_rgb_images_and_patches_new()
)
else:
current_folder, result = preparation.create_rgb_images_and_patches()
SKIP_NEXT = False
if use_mp and not SKIP_NEXT:
print("'create_rgb_images_and_patches' multiprocessing...")
if use_new:
current_folder, result = (
preparation.create_rgb_images_and_patches_new2(mp_pool)
)
else:
current_folder, result = preparation.create_rgb_images_and_patches()
print("finished routine")
if result is not None:
print(result)
continue
# except Exception as e:
# tb = traceback.extract_tb(e.__traceback__)
# no = tb[-1].lineno
# print(e, no)
# time.sleep(60)