diff --git a/src/pycage/download.py b/src/pycage/download.py deleted file mode 100644 index d780215..0000000 --- a/src/pycage/download.py +++ /dev/null @@ -1,221 +0,0 @@ -import os -import shutil -import stat -import tarfile -import warnings -from pathlib import Path -from typing import Any, Final - -import msgspec -import requests -from dopt_basics import configs -from dopt_basics.io import combine_route, prepare_path -from requests.exceptions import HTTPError - -from pycage.types import JsonMetadata, RetrievalInfo - - -class LazyConfigLoader: - def __init__( - self, - cfg_path: Path, - ) -> None: - self.cfg_path = cfg_path - self._cfg: dict[str, Any] | None = None - - def _load_config(self) -> None: - self._cfg = configs.load_toml(self.cfg_path) - - def __getitem__( - self, - key: str, - ) -> Any: - if self._cfg is None: - self._load_config() - - assert self._cfg is not None, "tried to access not loaded config" - return self._cfg[key] - - -# def load_cfg() -> dict[str, Any]: -# cfg_path = Path.cwd() / "config.toml" -# assert cfg_path.exists(), "config path not found" - -# with open(cfg_path, "rb") as f: -# cfg = tomllib.load(f) - -# return cfg -# cfg = load_cfg() - -cfg_path = Path.cwd() / "../config/config.toml" -CFG = LazyConfigLoader(cfg_path) - - -def make_request( - url: str, - stream: bool = False, -) -> requests.Response: - req = requests.get(url, stream=stream) - if req.status_code != 200: - raise HTTPError(f"Request to the following URL was not successful:\n{url}") - - return req - - -def _right_strip_url_components( - url: str, - number: int, -) -> str: - relevant_comps = url.rsplit("/", number) - - return relevant_comps[0] - - -# ** load metadata and construct asset URL -def get_metadata( - url_metadata: str, - py_version: str, - platform: str, - os_: str, - file_ext: str, - package_type: str, - release_tag: str | None = None, -) -> RetrievalInfo: - req = make_request(url=url_metadata) - metadata = msgspec.json.decode(req.content, type=JsonMetadata) - release_tag = metadata.tag if release_tag is None else release_tag - asset_url = metadata.asset_url_prefix - asset_url = _right_strip_url_components(asset_url, 1) - - target_build: str = ( - f"cpython-{py_version}+{release_tag}-{platform}-{os_}-{package_type}{file_ext}" - ) - - route = f"{release_tag}/{target_build}" - target_url = combine_route(asset_url, route) - - print(f"Target build:\t{target_build},\nTarget URL:\t{target_url}") - - return RetrievalInfo(url=target_url, file=target_build) - - -# ** load file -def load_file_from_url( - url: str, - file_save_path: Path, - overwrite: bool = False, -) -> None: - if file_save_path.exists() and not overwrite: - warnings.warn("File already exists and overwrite option not set. Operation aborted.") - return - - req = make_request(url, stream=True) - with open(file_save_path, "wb") as f: - for chunk in req.iter_content(chunk_size=128): - f.write(chunk) - - -# ** extract file -def extract_archive( - src_path: Path, - target_path: Path, -) -> None: - try: - file_archive = tarfile.open(src_path, mode="r") - file_archive.extractall(path=target_path, filter="data") - except Exception as err: - raise RuntimeError(f"An error occurred during TAR file extraction") from err - finally: - file_archive.close() - - -def path_verify_existence(path: Path) -> None: - if not path.exists(): - raise FileNotFoundError(f"Path does not exist: >{path}<") - - -def get( - url_metadata: str, - py_version: str, - platform: str, - os_: str, - file_ext: str, - package_type: str, - release_tag: str | None = None, - # - reextract: bool = False, - force: bool = False, - folder_dl: Path | None = None, # default CWD -) -> Path: - # DL_FOLDER: Final[str] = CFG["DL_FOLDER"] - # PY_VERSION: Final[str] = CFG["PY_VERSION"] - # ** get relevant metadata - retrieval_info = get_metadata( - url_metadata=url_metadata, - py_version=py_version, - platform=platform, - os_=os_, - file_ext=file_ext, - package_type=package_type, - release_tag=release_tag, - ) - filename = Path(retrieval_info.file) - - # destination folder - folder_dl = folder_dl if folder_dl is not None else Path.cwd() - path_verify_existence(folder_dl) - folder_extract = prepare_path(folder_dl, ("python",), None, None, create_folder=True) - - src_file = folder_dl / filename - - if not src_file.exists(): - print("File not yet available. Download...") - load_file_from_url(url=retrieval_info.url, file_save_path=src_file) - extract_archive(src_file, folder_extract) - print("Downloaded and extraction successfully.") - elif reextract and not force: - print( - "File already downloaded. No re-extraction. Use >force< option " - "if reextraction shall be performed." - ) - elif reextract and force: - print("File already downloaded. Re-extract file...") - extract_archive(src_file, folder_extract) - print("Re-extraction successfully.") - else: - print("File already downloaded. No action performed.") - - return folder_extract - - -def move_and_delete( - src_folder: Path, - overwrite: bool = False, -) -> None: - DIST_FOLDER: Final[str] = CFG["DIST_FOLDER"] - print("Move files to target directory...") - target_directory = Path.cwd() / DIST_FOLDER - if target_directory.exists() and overwrite: - shutil.rmtree(target_directory) - target_directory.mkdir() - - src_path = src_folder / "python" - dest_path = target_directory / "python" - if src_path.exists(): - shutil.copytree(src_path, dest_path) - if not os.access(src_path, os.W_OK): - os.chmod(src_path, stat.S_IWUSR) - shutil.rmtree(src_path) - # venv creation script - # src_script = Path.cwd() / "create_env.py" - # dest_script = dest_path / "create_env.py" - # assert src_script.exists(), "env creation script not found" - # print(src_script, dest_path) - # shutil.copy(src_script, dest_script) - print("Moved files successfully.") - - -def main() -> None: - ... - # target_folder = get(use_default_release_tag=USE_DEFAULT_RELEASE_TAG, reextract=True) - # move_and_delete(target_folder, overwrite=True) diff --git a/src/pycage/get.py b/src/pycage/get.py new file mode 100644 index 0000000..94dea6f --- /dev/null +++ b/src/pycage/get.py @@ -0,0 +1,181 @@ +from __future__ import annotations + +import tarfile +from pathlib import Path +from typing import cast + +import click +import msgspec +import requests +from dopt_basics.io import combine_route +from requests.exceptions import HTTPError + +from pycage import config +from pycage.helpers import delete_folder_recursively, path_verify_existence, print_error +from pycage.types import JsonMetadata, RetrievalInfo + +# cfg_path = Path.cwd() / "../config/config.toml" + + +def make_request( + url: str, + stream: bool = False, +) -> requests.Response: + req = requests.get(url, stream=stream) + if req.status_code != 200: + raise HTTPError(f"Request to the following URL was not successful:\n{url}") + + return req + + +def _right_strip_url_components( + url: str, + number: int, +) -> str: + relevant_comps = url.rsplit("/", number) + + return relevant_comps[0] + + +# ** load metadata and construct asset URL +def get_metadata( + url_metadata: str, + py_version: str, + platform: str, + os_: str, + file_ext: str, + package_type: str, + release_tag: str | None = None, +) -> RetrievalInfo: + req = make_request(url=url_metadata) + metadata = msgspec.json.decode(req.content, type=JsonMetadata) + release_tag = metadata.tag if release_tag is None else release_tag + asset_url = metadata.asset_url_prefix + asset_url = _right_strip_url_components(asset_url, 1) + + target_build: str = ( + f"cpython-{py_version}+{release_tag}-{platform}-{os_}-{package_type}{file_ext}" + ) + + route = f"{release_tag}/{target_build}" + target_url = combine_route(asset_url, route) + + print(f"Target build:\t{target_build},\nTarget URL:\t{target_url}") + + return RetrievalInfo(url=target_url, file=target_build) + + +# ** load file +def load_file_from_url( + url: str, + file_save_path: Path, + overwrite: bool = False, +) -> None: + if file_save_path.exists() and not overwrite: + click.echo("File already exists and overwrite option not set. Operation aborted.") + return + + req = make_request(url, stream=True) + with open(file_save_path, "wb") as f: + for chunk in req.iter_content(chunk_size=128): + f.write(chunk) + + +# ** extract file +def extract_archive( + src_path: Path, + target_path: Path, +) -> None: + try: + file_archive = tarfile.open(src_path, mode="r") + file_archive.extractall(path=target_path, filter="data") + except Exception as err: + raise RuntimeError("An error occurred during TAR file extraction") from err + finally: + file_archive.close() + + +@click.command( + help=( + "download and extract Python standalone images to a specified folder, " + "distribution is done in folder with name >python<\n" + "uses full version specifier to download, scheme: major.minor.patch" + ) +) +@click.option( + "--dl-folder", + type=click.Path( + exists=True, + dir_okay=True, + writable=True, + path_type=Path, + ), + default=None, + help="specifiy a different target location, default: current working directory", +) +@click.option( + "-f", + "--force-reextract", + is_flag=True, + show_default=True, + default=False, + help="forces the re-extraction of an already downloaded archive", +) +@click.option( + "-rt", + "--release-tag", + default=None, + help="specific release tag from Python standalone repo", +) +@click.argument("version", nargs=1) +def get( + version: str, + release_tag: str | None, + force_reextract: bool, + dl_folder: Path | None, +) -> None: + url_metadata = cast(str, config.CFG["metadata"]["URL"]) + os_file_info = config.CFG.os_info + platform = os_file_info.PLATFORM + os_ = os_file_info.OS + file_ext = os_file_info.FILE_EXT + package_type = cast(str, config.CFG["package"]["PACKAGE_TYPE"]) + # ** get relevant metadata + retrieval_info = get_metadata( + url_metadata=url_metadata, + py_version=version, + platform=platform, + os_=os_, + file_ext=file_ext, + package_type=package_type, + release_tag=release_tag, + ) + filename = Path(retrieval_info.file) + + # destination folder + dl_folder = dl_folder if dl_folder is not None else Path.cwd() + path_verify_existence(dl_folder) + # folder_extract = prepare_path(dl_folder, None, None, None, create_folder=True) + target_folder = dl_folder / "python" + folder_extract = dl_folder + src_file = dl_folder / filename + + try: + if not src_file.exists(): + click.echo("File not yet available. Download...") + load_file_from_url(url=retrieval_info.url, file_save_path=src_file) + delete_folder_recursively(target_folder) + extract_archive(src_file, folder_extract) + print("Download and extraction successfully.") + elif force_reextract: + click.echo("File already downloaded. Re-extract file...") + delete_folder_recursively(target_folder) + extract_archive(src_file, folder_extract) + click.echo("Re-extraction successfully.") + else: + click.echo( + "File already downloaded. No action performed. If you wish to delete and " + "re-extract the archive, use the flag ``--force-reextract``" + ) + except Exception as err: + print_error(err)