add basic functionality and corresponding tests
This commit is contained in:
25
src/dopt_basics/configs.py
Normal file
25
src/dopt_basics/configs.py
Normal file
@@ -0,0 +1,25 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import tomllib
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
|
||||
def load_toml(
|
||||
path_to_toml: str | Path,
|
||||
print_success: bool = False,
|
||||
) -> dict[str, Any]:
|
||||
if isinstance(path_to_toml, str):
|
||||
path_to_toml = Path(path_to_toml)
|
||||
|
||||
if not path_to_toml.exists():
|
||||
raise FileNotFoundError(f"Config file seems not to exist under: >{path_to_toml}<")
|
||||
path_to_toml = path_to_toml.with_suffix(".toml")
|
||||
|
||||
with open(path_to_toml, "rb") as f:
|
||||
data = tomllib.load(f)
|
||||
|
||||
if print_success: # pragma: no cover
|
||||
print("Loaded TOML config file successfully.", flush=True)
|
||||
|
||||
return data
|
||||
79
src/dopt_basics/datastructures.py
Normal file
79
src/dopt_basics/datastructures.py
Normal file
@@ -0,0 +1,79 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Iterator, MutableMapping
|
||||
from typing import Any, TypeAlias, TypeVar
|
||||
|
||||
FlattableObject: TypeAlias = (
|
||||
list["FlattableObject | Any"]
|
||||
| tuple["FlattableObject | Any", ...]
|
||||
| set["FlattableObject | Any"]
|
||||
)
|
||||
|
||||
K = TypeVar("K")
|
||||
V = TypeVar("V")
|
||||
|
||||
|
||||
def flatten(
|
||||
obj: FlattableObject,
|
||||
) -> Iterator[Any]:
|
||||
"""flattens an arbitrarily nested list or tuple
|
||||
|
||||
Parameters
|
||||
----------
|
||||
obj : FlattableObject
|
||||
arbitrarily nested list, tuple, set
|
||||
|
||||
Yields
|
||||
------
|
||||
Iterator[Any]
|
||||
elements of the non-nested list, tuple, set
|
||||
"""
|
||||
for x in obj:
|
||||
# only flatten lists and tuples
|
||||
if isinstance(x, (list, tuple, set)):
|
||||
yield from flatten(x)
|
||||
else:
|
||||
yield x
|
||||
|
||||
|
||||
class DualDict(MutableMapping[K, V]):
|
||||
def __init__(self, **kwargs: V):
|
||||
self._store: dict[K, V] = dict(**kwargs)
|
||||
self._inverted = self._calc_inverted()
|
||||
|
||||
def __str__(self) -> str:
|
||||
return str(self._store) + str(self._inverted)
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return self.__str__()
|
||||
|
||||
@property
|
||||
def inverted(self) -> dict[V, K]:
|
||||
return self._inverted
|
||||
|
||||
def _calc_inverted(self) -> dict[V, K]:
|
||||
invert = {val: key for key, val in self._store.items()}
|
||||
if len(invert) != len(self._store):
|
||||
raise ValueError("DualDict does not support identical values")
|
||||
return invert
|
||||
|
||||
def __setitem__(self, key: K, value: V) -> None:
|
||||
self._store[key] = value
|
||||
self._inverted = self._calc_inverted()
|
||||
|
||||
def __getitem__(self, key: K) -> V:
|
||||
return self._store[key]
|
||||
|
||||
def __delitem__(self, key: K) -> None:
|
||||
del self._store[key]
|
||||
self._inverted = self._calc_inverted()
|
||||
|
||||
def __iter__(self) -> Iterator[K]:
|
||||
return iter(self._store)
|
||||
|
||||
def __len__(self) -> int:
|
||||
return len(self._store)
|
||||
|
||||
def update(self, **kwargs: V) -> None:
|
||||
self._store.update(**kwargs)
|
||||
self._inverted = self._calc_inverted()
|
||||
234
src/dopt_basics/datetime.py
Normal file
234
src/dopt_basics/datetime.py
Normal file
@@ -0,0 +1,234 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import enum
|
||||
import zoneinfo as tz_info
|
||||
from datetime import datetime as Datetime
|
||||
from datetime import timedelta as Timedelta
|
||||
from datetime import timezone as Timezone
|
||||
from datetime import tzinfo as TZInfo
|
||||
from typing import Final
|
||||
|
||||
from dopt_basics.enums import enum_str_values_as_frzset
|
||||
|
||||
|
||||
class TimeUnitsDatetime(enum.StrEnum):
|
||||
YEAR = enum.auto()
|
||||
MONTH = enum.auto()
|
||||
DAY = enum.auto()
|
||||
HOUR = enum.auto()
|
||||
MINUTE = enum.auto()
|
||||
SECOND = enum.auto()
|
||||
MICROSECOND = enum.auto()
|
||||
|
||||
|
||||
class TimeUnitsTimedelta(enum.StrEnum):
|
||||
WEEKS = enum.auto()
|
||||
DAYS = enum.auto()
|
||||
HOURS = enum.auto()
|
||||
MINUTES = enum.auto()
|
||||
SECONDS = enum.auto()
|
||||
MILLISECONDS = enum.auto()
|
||||
MICROSECONDS = enum.auto()
|
||||
|
||||
|
||||
TIMEZONE_CEST: Final[tz_info.ZoneInfo] = tz_info.ZoneInfo("Europe/Berlin")
|
||||
TIMEZONE_UTC: Final[Timezone] = Timezone.utc
|
||||
|
||||
|
||||
def get_timestamp(
|
||||
tz: TZInfo = TIMEZONE_UTC,
|
||||
with_time: bool = False,
|
||||
) -> str:
|
||||
dt = current_time_tz(tz)
|
||||
if with_time:
|
||||
return dt.strftime(r"%Y-%m-%d--%H-%M-%S")
|
||||
return dt.strftime(r"%Y-%m-%d")
|
||||
|
||||
|
||||
def timedelta_from_val(
|
||||
val: float,
|
||||
time_unit: TimeUnitsTimedelta,
|
||||
) -> Timedelta:
|
||||
"""create Python timedelta object by choosing time value and time unit
|
||||
|
||||
Parameters
|
||||
----------
|
||||
val : float
|
||||
duration
|
||||
time_unit : str
|
||||
target time unit
|
||||
|
||||
Returns
|
||||
-------
|
||||
Timedelta
|
||||
timedelta object corresponding to the given values
|
||||
|
||||
Raises
|
||||
------
|
||||
ValueError
|
||||
if chosen time unit not implemented
|
||||
"""
|
||||
try:
|
||||
TimeUnitsTimedelta(time_unit)
|
||||
except ValueError:
|
||||
allowed_time_units = enum_str_values_as_frzset(TimeUnitsTimedelta)
|
||||
raise ValueError(
|
||||
f"Time unit >>{time_unit}<< not supported. Choose from {allowed_time_units}"
|
||||
)
|
||||
else:
|
||||
kwargs = {time_unit: val}
|
||||
return Timedelta(**kwargs)
|
||||
|
||||
|
||||
def dt_with_tz_UTC(
|
||||
*args,
|
||||
**kwargs,
|
||||
) -> Datetime:
|
||||
return Datetime(*args, **kwargs, tzinfo=TIMEZONE_UTC)
|
||||
|
||||
|
||||
def round_td_by_seconds(
|
||||
td: Timedelta,
|
||||
round_to_next_seconds: int = 1,
|
||||
) -> Timedelta:
|
||||
"""round timedelta object to the next full defined seconds
|
||||
|
||||
Parameters
|
||||
----------
|
||||
td : Timedelta
|
||||
timedelta object to be rounded
|
||||
round_to_next_seconds : int, optional
|
||||
number of seconds to round to, by default 1
|
||||
|
||||
Returns
|
||||
-------
|
||||
Timedelta
|
||||
rounded timedelta object
|
||||
"""
|
||||
total_seconds = td.total_seconds()
|
||||
rounded_seconds = round(total_seconds / round_to_next_seconds) * round_to_next_seconds
|
||||
return Timedelta(seconds=rounded_seconds)
|
||||
|
||||
|
||||
def current_time_tz(
|
||||
tz: TZInfo = TIMEZONE_UTC,
|
||||
cut_microseconds: bool = False,
|
||||
) -> Datetime:
|
||||
"""current time as datetime object with
|
||||
associated time zone information (UTC by default)
|
||||
|
||||
Parameters
|
||||
----------
|
||||
tz : TZInfo, optional
|
||||
time zone information, by default TIMEZONE_UTC
|
||||
|
||||
Returns
|
||||
-------
|
||||
Datetime
|
||||
datetime object with corresponding time zone
|
||||
"""
|
||||
if cut_microseconds:
|
||||
return Datetime.now(tz=tz).replace(microsecond=0)
|
||||
else:
|
||||
return Datetime.now(tz=tz)
|
||||
|
||||
|
||||
def add_timedelta_with_tz(
|
||||
starting_dt: Datetime,
|
||||
td: Timedelta,
|
||||
) -> Datetime:
|
||||
"""time-zone-aware calculation of an end point in time
|
||||
with a given timedelta
|
||||
|
||||
Parameters
|
||||
----------
|
||||
starting_dt : Datetime
|
||||
starting point in time
|
||||
td : Timedelta
|
||||
duration as timedelta object
|
||||
|
||||
Returns
|
||||
-------
|
||||
Datetime
|
||||
time-zone-aware end point
|
||||
"""
|
||||
|
||||
if starting_dt.tzinfo is None:
|
||||
# no time zone information
|
||||
raise ValueError("The provided starting date does not contain time zone information.")
|
||||
else:
|
||||
# obtain time zone information from starting datetime object
|
||||
tz_info = starting_dt.tzinfo
|
||||
|
||||
# transform starting point in time to utc
|
||||
dt_utc = starting_dt.astimezone(TIMEZONE_UTC)
|
||||
# all calculations are done in UTC
|
||||
# add duration
|
||||
ending_dt_utc = dt_utc + td
|
||||
# transform back to previous time zone
|
||||
ending_dt = ending_dt_utc.astimezone(tz=tz_info)
|
||||
|
||||
return ending_dt
|
||||
|
||||
|
||||
def validate_dt_UTC(
|
||||
dt: Datetime,
|
||||
) -> None:
|
||||
"""validates if datetime object is timezone-aware and references
|
||||
UTC time
|
||||
|
||||
Parameters
|
||||
----------
|
||||
dt : Datetime
|
||||
datetime object to be checked for available UTC time zone
|
||||
information
|
||||
|
||||
Raises
|
||||
------
|
||||
ValueError
|
||||
if no UTC time zone information is found
|
||||
"""
|
||||
|
||||
if dt.tzinfo != TIMEZONE_UTC:
|
||||
raise ValueError(
|
||||
f"Datetime object {dt} does not contain necessary UTC time zone information"
|
||||
)
|
||||
|
||||
|
||||
def dt_to_timezone(
|
||||
dt: Datetime,
|
||||
target_tz: TZInfo = TIMEZONE_CEST,
|
||||
) -> Datetime:
|
||||
"""convert a datetime object from one timezone to another
|
||||
|
||||
Parameters
|
||||
----------
|
||||
dt : Datetime
|
||||
datetime with time zone information
|
||||
target_tz : TZInfo, optional
|
||||
target time zone information, by default TIMEZONE_CEST
|
||||
|
||||
Returns
|
||||
-------
|
||||
Datetime
|
||||
datetime object adjusted to given local time zone
|
||||
|
||||
Raises
|
||||
------
|
||||
RuntimeError
|
||||
if datetime object does not contain time zone information
|
||||
"""
|
||||
|
||||
if dt.tzinfo is None:
|
||||
# no time zone information
|
||||
raise ValueError("The provided starting date does not contain time zone information.")
|
||||
# transform to given target time zone
|
||||
dt_local_tz = dt.astimezone(tz=target_tz)
|
||||
|
||||
return dt_local_tz
|
||||
|
||||
|
||||
def cut_dt_microseconds(
|
||||
dt: Datetime,
|
||||
) -> Datetime:
|
||||
return dt.replace(microsecond=0)
|
||||
22
src/dopt_basics/enums.py
Normal file
22
src/dopt_basics/enums.py
Normal file
@@ -0,0 +1,22 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from enum import StrEnum
|
||||
from typing import Type
|
||||
|
||||
|
||||
def enum_str_values_as_frzset(
|
||||
enum_class: Type[StrEnum],
|
||||
) -> frozenset[str]:
|
||||
"""returns the values of an StrEnum class as a frozenset
|
||||
|
||||
Parameters
|
||||
----------
|
||||
enum_cls : Any
|
||||
Enum class
|
||||
|
||||
Returns
|
||||
-------
|
||||
frozenset
|
||||
values of the Enum class
|
||||
"""
|
||||
return frozenset(val.value for val in enum_class)
|
||||
166
src/dopt_basics/paths.py
Normal file
166
src/dopt_basics/paths.py
Normal file
@@ -0,0 +1,166 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import shutil
|
||||
from collections.abc import Sequence
|
||||
from pathlib import Path
|
||||
|
||||
from dopt_basics.datetime import TIMEZONE_CEST, get_timestamp
|
||||
|
||||
|
||||
def create_folder(
|
||||
path: Path,
|
||||
delete_existing: bool = False,
|
||||
) -> None:
|
||||
if delete_existing and path.exists():
|
||||
shutil.rmtree(path)
|
||||
path.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
|
||||
def prepare_save_path(
|
||||
root_folder: Path,
|
||||
dirs: Sequence[str] | None,
|
||||
filename: str | None,
|
||||
suffix: str | None,
|
||||
include_timestamp: bool = False,
|
||||
create_folder: bool = False,
|
||||
) -> Path:
|
||||
if not any((dirs, filename, suffix)):
|
||||
raise ValueError("Dirs or filename must be provided")
|
||||
if not (
|
||||
all(x is None for x in (filename, suffix))
|
||||
or all(x is not None for x in (filename, suffix))
|
||||
):
|
||||
raise ValueError("Filename and suffix must be provided together")
|
||||
if include_timestamp and filename is None:
|
||||
raise ValueError("Timestamp only with filename")
|
||||
|
||||
folders: str = ""
|
||||
if dirs is not None:
|
||||
folders = "/".join(dirs)
|
||||
filename = "" if filename is None else filename
|
||||
if include_timestamp:
|
||||
timestamp = get_timestamp(tz=TIMEZONE_CEST, with_time=True)
|
||||
filename = f"{timestamp}_{filename}"
|
||||
|
||||
if suffix is None:
|
||||
suffix = ""
|
||||
elif suffix is not None and suffix == ".":
|
||||
raise ValueError("Suffix can not be just dot.")
|
||||
elif suffix is not None and not suffix.startswith("."):
|
||||
suffix = f".{suffix}"
|
||||
|
||||
pth_parent = (root_folder / folders).resolve()
|
||||
if create_folder and not pth_parent.exists():
|
||||
pth_parent.mkdir(parents=True)
|
||||
|
||||
return (pth_parent / filename).with_suffix(suffix)
|
||||
|
||||
|
||||
def search_cwd(
|
||||
glob_pattern: str,
|
||||
) -> Path | None:
|
||||
"""Searches the current working directory and looks for files
|
||||
matching the glob pattern.
|
||||
Returns the first match encountered.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
glob_pattern : str, optional
|
||||
pattern to look for, first match will be returned
|
||||
|
||||
Returns
|
||||
-------
|
||||
Path | None
|
||||
Path if corresponding object was found, None otherwise
|
||||
"""
|
||||
path_found: Path | None = None
|
||||
res = tuple(Path.cwd().glob(glob_pattern))
|
||||
if res:
|
||||
path_found = res[0]
|
||||
|
||||
return path_found
|
||||
|
||||
|
||||
def search_file_iterative(
|
||||
starting_path: Path,
|
||||
glob_pattern: str,
|
||||
stop_folder_name: str | None = None,
|
||||
) -> Path | None:
|
||||
"""Iteratively searches the parent directories of the starting path
|
||||
and look for files matching the glob pattern. The starting path is not
|
||||
searched, only its parents. Therefore the starting path can also point
|
||||
to a file. The folder in which it is placed in will be searched.
|
||||
Returns the first match encountered.
|
||||
The parent of the stop folder will be searched if it exists.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
starting_path : Path
|
||||
non-inclusive starting path
|
||||
glob_pattern : str, optional
|
||||
pattern to look for, first match will be returned
|
||||
stop_folder_name : str, optional
|
||||
name of the last folder in the directory tree where search should stop
|
||||
(parent is searched), by default None
|
||||
|
||||
Returns
|
||||
-------
|
||||
Path | None
|
||||
Path if corresponding object was found, None otherwise
|
||||
"""
|
||||
file_path: Path | None = None
|
||||
stop_folder_reached: bool = False
|
||||
for search_path in starting_path.parents:
|
||||
res = tuple(search_path.glob(glob_pattern))
|
||||
if res:
|
||||
file_path = res[0]
|
||||
break
|
||||
elif stop_folder_reached:
|
||||
break
|
||||
|
||||
if stop_folder_name is not None and search_path.name == stop_folder_name:
|
||||
# library is placed inside a whole python installation for deployment
|
||||
# if this folder is reached, only look up one parent above
|
||||
stop_folder_reached = True
|
||||
|
||||
return file_path
|
||||
|
||||
|
||||
def search_folder_path(
|
||||
starting_path: Path,
|
||||
stop_folder_name: str | None = None,
|
||||
) -> Path | None:
|
||||
"""Iteratively searches the parent directories of the starting path
|
||||
and look for folders matching the given name. If a match is encountered,
|
||||
the parent path will be returned.
|
||||
|
||||
Example:
|
||||
starting_path = path/to/start/folder
|
||||
stop_folder_name = 'to'
|
||||
returned path = 'path/'
|
||||
|
||||
Parameters
|
||||
----------
|
||||
starting_path : Path
|
||||
non-inclusive starting path
|
||||
stop_folder_name : str, optional
|
||||
name of the last folder in the directory tree to search, by default None
|
||||
|
||||
Returns
|
||||
-------
|
||||
Path | None
|
||||
Path if corresponding base path was found, None otherwise
|
||||
"""
|
||||
stop_folder_path: Path | None = None
|
||||
base_path: Path | None = None
|
||||
for search_path in starting_path.parents:
|
||||
if stop_folder_name is not None and search_path.name == stop_folder_name:
|
||||
# library is placed inside a whole python installation for deployment
|
||||
# only look up to this folder
|
||||
stop_folder_path = search_path
|
||||
break
|
||||
|
||||
if stop_folder_path is not None:
|
||||
base_path = stop_folder_path.parent
|
||||
|
||||
return base_path
|
||||
Reference in New Issue
Block a user