From 1f28683d2068e7384a25a62302f883eb63981770 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Florian=20F=C3=B6rster?= Date: Fri, 14 Mar 2025 18:08:07 +0100 Subject: [PATCH] begin refactoring --- src/pycage/download.py | 221 +++++++++++++++++++++++++++++++++++++++++ src/pycage/types.py | 15 +++ 2 files changed, 236 insertions(+) create mode 100644 src/pycage/download.py create mode 100644 src/pycage/types.py diff --git a/src/pycage/download.py b/src/pycage/download.py new file mode 100644 index 0000000..d780215 --- /dev/null +++ b/src/pycage/download.py @@ -0,0 +1,221 @@ +import os +import shutil +import stat +import tarfile +import warnings +from pathlib import Path +from typing import Any, Final + +import msgspec +import requests +from dopt_basics import configs +from dopt_basics.io import combine_route, prepare_path +from requests.exceptions import HTTPError + +from pycage.types import JsonMetadata, RetrievalInfo + + +class LazyConfigLoader: + def __init__( + self, + cfg_path: Path, + ) -> None: + self.cfg_path = cfg_path + self._cfg: dict[str, Any] | None = None + + def _load_config(self) -> None: + self._cfg = configs.load_toml(self.cfg_path) + + def __getitem__( + self, + key: str, + ) -> Any: + if self._cfg is None: + self._load_config() + + assert self._cfg is not None, "tried to access not loaded config" + return self._cfg[key] + + +# def load_cfg() -> dict[str, Any]: +# cfg_path = Path.cwd() / "config.toml" +# assert cfg_path.exists(), "config path not found" + +# with open(cfg_path, "rb") as f: +# cfg = tomllib.load(f) + +# return cfg +# cfg = load_cfg() + +cfg_path = Path.cwd() / "../config/config.toml" +CFG = LazyConfigLoader(cfg_path) + + +def make_request( + url: str, + stream: bool = False, +) -> requests.Response: + req = requests.get(url, stream=stream) + if req.status_code != 200: + raise HTTPError(f"Request to the following URL was not successful:\n{url}") + + return req + + +def _right_strip_url_components( + url: str, + number: int, +) -> str: + relevant_comps = url.rsplit("/", number) + + return relevant_comps[0] + + +# ** load metadata and construct asset URL +def get_metadata( + url_metadata: str, + py_version: str, + platform: str, + os_: str, + file_ext: str, + package_type: str, + release_tag: str | None = None, +) -> RetrievalInfo: + req = make_request(url=url_metadata) + metadata = msgspec.json.decode(req.content, type=JsonMetadata) + release_tag = metadata.tag if release_tag is None else release_tag + asset_url = metadata.asset_url_prefix + asset_url = _right_strip_url_components(asset_url, 1) + + target_build: str = ( + f"cpython-{py_version}+{release_tag}-{platform}-{os_}-{package_type}{file_ext}" + ) + + route = f"{release_tag}/{target_build}" + target_url = combine_route(asset_url, route) + + print(f"Target build:\t{target_build},\nTarget URL:\t{target_url}") + + return RetrievalInfo(url=target_url, file=target_build) + + +# ** load file +def load_file_from_url( + url: str, + file_save_path: Path, + overwrite: bool = False, +) -> None: + if file_save_path.exists() and not overwrite: + warnings.warn("File already exists and overwrite option not set. Operation aborted.") + return + + req = make_request(url, stream=True) + with open(file_save_path, "wb") as f: + for chunk in req.iter_content(chunk_size=128): + f.write(chunk) + + +# ** extract file +def extract_archive( + src_path: Path, + target_path: Path, +) -> None: + try: + file_archive = tarfile.open(src_path, mode="r") + file_archive.extractall(path=target_path, filter="data") + except Exception as err: + raise RuntimeError(f"An error occurred during TAR file extraction") from err + finally: + file_archive.close() + + +def path_verify_existence(path: Path) -> None: + if not path.exists(): + raise FileNotFoundError(f"Path does not exist: >{path}<") + + +def get( + url_metadata: str, + py_version: str, + platform: str, + os_: str, + file_ext: str, + package_type: str, + release_tag: str | None = None, + # + reextract: bool = False, + force: bool = False, + folder_dl: Path | None = None, # default CWD +) -> Path: + # DL_FOLDER: Final[str] = CFG["DL_FOLDER"] + # PY_VERSION: Final[str] = CFG["PY_VERSION"] + # ** get relevant metadata + retrieval_info = get_metadata( + url_metadata=url_metadata, + py_version=py_version, + platform=platform, + os_=os_, + file_ext=file_ext, + package_type=package_type, + release_tag=release_tag, + ) + filename = Path(retrieval_info.file) + + # destination folder + folder_dl = folder_dl if folder_dl is not None else Path.cwd() + path_verify_existence(folder_dl) + folder_extract = prepare_path(folder_dl, ("python",), None, None, create_folder=True) + + src_file = folder_dl / filename + + if not src_file.exists(): + print("File not yet available. Download...") + load_file_from_url(url=retrieval_info.url, file_save_path=src_file) + extract_archive(src_file, folder_extract) + print("Downloaded and extraction successfully.") + elif reextract and not force: + print( + "File already downloaded. No re-extraction. Use >force< option " + "if reextraction shall be performed." + ) + elif reextract and force: + print("File already downloaded. Re-extract file...") + extract_archive(src_file, folder_extract) + print("Re-extraction successfully.") + else: + print("File already downloaded. No action performed.") + + return folder_extract + + +def move_and_delete( + src_folder: Path, + overwrite: bool = False, +) -> None: + DIST_FOLDER: Final[str] = CFG["DIST_FOLDER"] + print("Move files to target directory...") + target_directory = Path.cwd() / DIST_FOLDER + if target_directory.exists() and overwrite: + shutil.rmtree(target_directory) + target_directory.mkdir() + + src_path = src_folder / "python" + dest_path = target_directory / "python" + if src_path.exists(): + shutil.copytree(src_path, dest_path) + if not os.access(src_path, os.W_OK): + os.chmod(src_path, stat.S_IWUSR) + shutil.rmtree(src_path) + # venv creation script + # src_script = Path.cwd() / "create_env.py" + # dest_script = dest_path / "create_env.py" + # assert src_script.exists(), "env creation script not found" + # print(src_script, dest_path) + # shutil.copy(src_script, dest_script) + print("Moved files successfully.") + + +def main() -> None: + ... + # target_folder = get(use_default_release_tag=USE_DEFAULT_RELEASE_TAG, reextract=True) + # move_and_delete(target_folder, overwrite=True) diff --git a/src/pycage/types.py b/src/pycage/types.py new file mode 100644 index 0000000..9b7a698 --- /dev/null +++ b/src/pycage/types.py @@ -0,0 +1,15 @@ +from dataclasses import dataclass + +import msgspec + + +class JsonMetadata(msgspec.Struct): + version: int + tag: str + release_url: str + asset_url_prefix: str + + +class RetrievalInfo(msgspec.Struct): + url: str + file: str