add download function

This commit is contained in:
Florian Förster 2025-03-19 13:00:07 +01:00
parent 9695c975cc
commit 3950b6673a
2 changed files with 181 additions and 221 deletions

View File

@ -1,221 +0,0 @@
import os
import shutil
import stat
import tarfile
import warnings
from pathlib import Path
from typing import Any, Final
import msgspec
import requests
from dopt_basics import configs
from dopt_basics.io import combine_route, prepare_path
from requests.exceptions import HTTPError
from pycage.types import JsonMetadata, RetrievalInfo
class LazyConfigLoader:
    """Dictionary-style accessor for a TOML config that is read on first use.

    Constructing the loader does not touch the file system; the file at
    ``cfg_path`` is parsed with ``configs.load_toml`` the first time a key
    is looked up and the parsed result is kept on the instance afterwards.
    """

    def __init__(self, cfg_path: Path) -> None:
        self.cfg_path = cfg_path
        # stays ``None`` until the first key lookup triggers loading
        self._cfg: dict[str, Any] | None = None

    def _load_config(self) -> None:
        """Parse the config file and cache the result on the instance."""
        self._cfg = configs.load_toml(self.cfg_path)

    def __getitem__(self, key: str) -> Any:
        """Return the top-level entry ``key``, loading the file if necessary."""
        if self._cfg is None:
            self._load_config()
        loaded = self._cfg
        assert loaded is not None, "tried to access not loaded config"
        return loaded[key]
# def load_cfg() -> dict[str, Any]:
# cfg_path = Path.cwd() / "config.toml"
# assert cfg_path.exists(), "config path not found"
# with open(cfg_path, "rb") as f:
# cfg = tomllib.load(f)
# return cfg
# cfg = load_cfg()
# Module-wide config handle. The path is built relative to the working
# directory at import time; the file itself is only read on first key access.
cfg_path = Path.cwd().joinpath("../config/config.toml")
CFG = LazyConfigLoader(cfg_path)
def make_request(
    url: str,
    stream: bool = False,
    timeout: float | tuple[float, float] | None = 30.0,
) -> requests.Response:
    """Send a GET request and return the response only if it answered ``200``.

    Args:
        url: Address to fetch.
        stream: Forwarded to ``requests.get``; when true the body is not
            downloaded up front and must be consumed by the caller.
        timeout: Forwarded to ``requests.get``. Without a timeout a stalled
            server blocks the call indefinitely. Pass ``None`` to restore
            the unbounded wait.

    Returns:
        The response object of the successful request.

    Raises:
        HTTPError: If the status code is anything other than ``200``.
        requests.exceptions.Timeout: If the server does not answer within
            ``timeout``.
    """
    req = requests.get(url, stream=stream, timeout=timeout)
    if req.status_code != 200:
        # Release the connection before bailing out; a streamed response
        # would otherwise keep it checked out of the pool.
        req.close()
        raise HTTPError(
            f"Request to the following URL was not successful:\n{url}",
            response=req,
        )
    return req
def _right_strip_url_components(
url: str,
number: int,
) -> str:
relevant_comps = url.rsplit("/", number)
return relevant_comps[0]
# ** load metadata and construct asset URL
def get_metadata(
    url_metadata: str,
    py_version: str,
    platform: str,
    os_: str,
    file_ext: str,
    package_type: str,
    release_tag: str | None = None,
) -> RetrievalInfo:
    """Resolve the file name and download URL of the requested build.

    The JSON document at ``url_metadata`` is decoded as ``JsonMetadata``.
    Its ``tag`` is used when no ``release_tag`` is given, and the last path
    component of its ``asset_url_prefix`` is dropped to obtain the base URL.

    Returns:
        A ``RetrievalInfo`` holding the target URL and the archive file name.
    """
    response = make_request(url=url_metadata)
    metadata = msgspec.json.decode(response.content, type=JsonMetadata)
    if release_tag is None:
        release_tag = metadata.tag
    base_url = _right_strip_url_components(metadata.asset_url_prefix, 1)

    target_build: str = (
        f"cpython-{py_version}+{release_tag}-{platform}-{os_}-{package_type}{file_ext}"
    )
    target_url = combine_route(base_url, f"{release_tag}/{target_build}")
    print(f"Target build:\t{target_build},\nTarget URL:\t{target_url}")

    return RetrievalInfo(url=target_url, file=target_build)
# ** load file
def load_file_from_url(
    url: str,
    file_save_path: Path,
    overwrite: bool = False,
) -> None:
    """Download ``url`` to ``file_save_path``.

    Args:
        url: Address of the file to download.
        file_save_path: Destination file.
        overwrite: When false and the destination already exists, a warning
            is issued and nothing is downloaded.

    Raises:
        Whatever ``make_request`` or the file system raises. In that case no
        file is left behind at ``file_save_path`` by this call.
    """
    if file_save_path.exists() and not overwrite:
        warnings.warn("File already exists and overwrite option not set. Operation aborted.")
        return

    # Download into a sibling temp file and move it into place only when
    # complete. A truncated archive at the final path would otherwise be
    # mistaken for a finished download on the next run.
    partial_path = file_save_path.with_name(f"{file_save_path.name}.part")
    try:
        # The response is used as a context manager so the streamed
        # connection is released even if writing fails.
        with make_request(url, stream=True) as req, open(partial_path, "wb") as f:
            for chunk in req.iter_content(chunk_size=64 * 1024):
                f.write(chunk)
        partial_path.replace(file_save_path)
    except BaseException:
        partial_path.unlink(missing_ok=True)
        raise
# ** extract file
def extract_archive(
src_path: Path,
target_path: Path,
) -> None:
try:
file_archive = tarfile.open(src_path, mode="r")
file_archive.extractall(path=target_path, filter="data")
except Exception as err:
raise RuntimeError(f"An error occurred during TAR file extraction") from err
finally:
file_archive.close()
def path_verify_existence(path: Path) -> None:
    """Raise ``FileNotFoundError`` unless ``path`` exists."""
    if path.exists():
        return
    raise FileNotFoundError(f"Path does not exist: >{path}<")
def get(
    url_metadata: str,
    py_version: str,
    platform: str,
    os_: str,
    file_ext: str,
    package_type: str,
    release_tag: str | None = None,
    # behaviour switches
    reextract: bool = False,
    force: bool = False,
    folder_dl: Path | None = None,  # falls back to the current working directory
) -> Path:
    """Download the requested build archive if needed and extract it.

    The archive is stored in ``folder_dl``. An archive that is already
    present is only extracted again when both ``reextract`` and ``force``
    are set.

    Returns:
        The extraction folder as returned by ``prepare_path``
        (presumably ``folder_dl / "python"`` -- confirm against that helper).

    Raises:
        FileNotFoundError: If ``folder_dl`` does not exist.
    """
    retrieval_info = get_metadata(
        url_metadata=url_metadata,
        py_version=py_version,
        platform=platform,
        os_=os_,
        file_ext=file_ext,
        package_type=package_type,
        release_tag=release_tag,
    )

    # destination folder
    if folder_dl is None:
        folder_dl = Path.cwd()
    path_verify_existence(folder_dl)
    folder_extract = prepare_path(folder_dl, ("python",), None, None, create_folder=True)
    src_file = folder_dl / Path(retrieval_info.file)

    if not src_file.exists():
        print("File not yet available. Download...")
        load_file_from_url(url=retrieval_info.url, file_save_path=src_file)
        extract_archive(src_file, folder_extract)
        print("Downloaded and extraction successfully.")
        return folder_extract

    if not reextract:
        print("File already downloaded. No action performed.")
        return folder_extract

    if force:
        print("File already downloaded. Re-extract file...")
        extract_archive(src_file, folder_extract)
        print("Re-extraction successfully.")
    else:
        print(
            "File already downloaded. No re-extraction. Use >force< option "
            "if reextraction shall be performed."
        )

    return folder_extract
def move_and_delete(
    src_folder: Path,
    overwrite: bool = False,
) -> None:
    """Copy ``src_folder / "python"`` into the distribution folder and remove the source.

    The distribution folder is ``Path.cwd() / CFG["DIST_FOLDER"]``.

    Args:
        src_folder: Folder that contains the extracted ``python`` directory.
        overwrite: When true, an existing distribution folder is deleted
            before it is recreated.

    Raises:
        FileExistsError: If the distribution folder already exists and
            ``overwrite`` is not set (raised by ``mkdir``).
    """
    DIST_FOLDER: Final[str] = CFG["DIST_FOLDER"]
    print("Move files to target directory...")
    target_directory = Path.cwd() / DIST_FOLDER
    if target_directory.exists() and overwrite:
        shutil.rmtree(target_directory)
    target_directory.mkdir()

    src_path = src_folder / "python"
    dest_path = target_directory / "python"
    if src_path.exists():
        shutil.copytree(src_path, dest_path)
        if not os.access(src_path, os.W_OK):
            # Add the owner-write bit to the existing mode. Setting the mode
            # to S_IWUSR alone would drop read/execute and make the
            # directory impossible to traverse for the removal below.
            os.chmod(src_path, src_path.stat().st_mode | stat.S_IWUSR)
        shutil.rmtree(src_path)

    # venv creation script
    # src_script = Path.cwd() / "create_env.py"
    # dest_script = dest_path / "create_env.py"
    # assert src_script.exists(), "env creation script not found"
    # print(src_script, dest_path)
    # shutil.copy(src_script, dest_script)
    print("Moved files successfully.")
def main() -> None:
    """Placeholder entry point; currently performs no work."""
    # target_folder = get(use_default_release_tag=USE_DEFAULT_RELEASE_TAG, reextract=True)
    # move_and_delete(target_folder, overwrite=True)
    return None

181
src/pycage/get.py Normal file
View File

@ -0,0 +1,181 @@
from __future__ import annotations
import tarfile
from pathlib import Path
from typing import cast
import click
import msgspec
import requests
from dopt_basics.io import combine_route
from requests.exceptions import HTTPError
from pycage import config
from pycage.helpers import delete_folder_recursively, path_verify_existence, print_error
from pycage.types import JsonMetadata, RetrievalInfo
# cfg_path = Path.cwd() / "../config/config.toml"
def make_request(
    url: str,
    stream: bool = False,
    timeout: float | tuple[float, float] | None = 30.0,
) -> requests.Response:
    """Send a GET request and return the response only if it answered ``200``.

    Args:
        url: Address to fetch.
        stream: Forwarded to ``requests.get``; when true the body is not
            downloaded up front and must be consumed by the caller.
        timeout: Forwarded to ``requests.get``. Without a timeout a stalled
            server blocks the call indefinitely. Pass ``None`` to restore
            the unbounded wait.

    Returns:
        The response object of the successful request.

    Raises:
        HTTPError: If the status code is anything other than ``200``.
        requests.exceptions.Timeout: If the server does not answer within
            ``timeout``.
    """
    req = requests.get(url, stream=stream, timeout=timeout)
    if req.status_code != 200:
        # Release the connection before bailing out; a streamed response
        # would otherwise keep it checked out of the pool.
        req.close()
        raise HTTPError(
            f"Request to the following URL was not successful:\n{url}",
            response=req,
        )
    return req
def _right_strip_url_components(
url: str,
number: int,
) -> str:
relevant_comps = url.rsplit("/", number)
return relevant_comps[0]
# ** load metadata and construct asset URL
def get_metadata(
    url_metadata: str,
    py_version: str,
    platform: str,
    os_: str,
    file_ext: str,
    package_type: str,
    release_tag: str | None = None,
) -> RetrievalInfo:
    """Resolve the file name and download URL of the requested build.

    The JSON document at ``url_metadata`` is decoded as ``JsonMetadata``.
    Its ``tag`` is used when no ``release_tag`` is given, and the last path
    component of its ``asset_url_prefix`` is dropped to obtain the base URL.

    Returns:
        A ``RetrievalInfo`` holding the target URL and the archive file name.

    Raises:
        HTTPError: Propagated from ``make_request`` when the metadata
            request does not succeed.
    """
    req = make_request(url=url_metadata)
    metadata = msgspec.json.decode(req.content, type=JsonMetadata)
    release_tag = metadata.tag if release_tag is None else release_tag
    asset_url = _right_strip_url_components(metadata.asset_url_prefix, 1)

    target_build: str = (
        f"cpython-{py_version}+{release_tag}-{platform}-{os_}-{package_type}{file_ext}"
    )
    route = f"{release_tag}/{target_build}"
    target_url = combine_route(asset_url, route)
    # Report through click like the rest of this module instead of print().
    click.echo(f"Target build:\t{target_build},\nTarget URL:\t{target_url}")

    return RetrievalInfo(url=target_url, file=target_build)
# ** load file
def load_file_from_url(
    url: str,
    file_save_path: Path,
    overwrite: bool = False,
) -> None:
    """Download ``url`` to ``file_save_path``.

    Args:
        url: Address of the file to download.
        file_save_path: Destination file.
        overwrite: When false and the destination already exists, a notice
            is printed and nothing is downloaded.

    Raises:
        Whatever ``make_request`` or the file system raises. In that case no
        file is left behind at ``file_save_path`` by this call.
    """
    if file_save_path.exists() and not overwrite:
        click.echo("File already exists and overwrite option not set. Operation aborted.")
        return

    # Download into a sibling temp file and move it into place only when
    # complete. A truncated archive at the final path would otherwise be
    # mistaken for a finished download on the next run.
    partial_path = file_save_path.with_name(f"{file_save_path.name}.part")
    try:
        # The response is used as a context manager so the streamed
        # connection is released even if writing fails.
        with make_request(url, stream=True) as req, open(partial_path, "wb") as f:
            for chunk in req.iter_content(chunk_size=64 * 1024):
                f.write(chunk)
        partial_path.replace(file_save_path)
    except BaseException:
        partial_path.unlink(missing_ok=True)
        raise
# ** extract file
def extract_archive(
src_path: Path,
target_path: Path,
) -> None:
try:
file_archive = tarfile.open(src_path, mode="r")
file_archive.extractall(path=target_path, filter="data")
except Exception as err:
raise RuntimeError("An error occurred during TAR file extraction") from err
finally:
file_archive.close()
@click.command(
    help=(
        "download and extract Python standalone images to a specified folder, "
        "distribution is done in folder with name >python<\n"
        "uses full version specifier to download, scheme: major.minor.patch"
    )
)
@click.option(
    "--dl-folder",
    type=click.Path(
        exists=True,
        dir_okay=True,
        writable=True,
        path_type=Path,
    ),
    default=None,
    help="specify a different target location, default: current working directory",
)
@click.option(
    "-f",
    "--force-reextract",
    is_flag=True,
    show_default=True,
    default=False,
    help="forces the re-extraction of an already downloaded archive",
)
@click.option(
    "-rt",
    "--release-tag",
    default=None,
    help="specific release tag from Python standalone repo",
)
@click.argument("version", nargs=1)
def get(
    version: str,
    release_tag: str | None,
    force_reextract: bool,
    dl_folder: Path | None,
) -> None:
    """Download the archive for ``version`` and extract it into the download folder.

    Metadata URL, platform information and package type are read from
    ``config.CFG``. The archive is stored in ``dl_folder`` (current working
    directory when omitted). Before every extraction the ``python`` folder
    inside it is removed. An archive that is already present is only
    extracted again when ``force_reextract`` is set.
    """
    url_metadata = cast(str, config.CFG["metadata"]["URL"])
    os_file_info = config.CFG.os_info
    platform = os_file_info.PLATFORM
    os_ = os_file_info.OS
    file_ext = os_file_info.FILE_EXT
    package_type = cast(str, config.CFG["package"]["PACKAGE_TYPE"])
    # ** get relevant metadata
    retrieval_info = get_metadata(
        url_metadata=url_metadata,
        py_version=version,
        platform=platform,
        os_=os_,
        file_ext=file_ext,
        package_type=package_type,
        release_tag=release_tag,
    )
    filename = Path(retrieval_info.file)
    # destination folder
    dl_folder = dl_folder if dl_folder is not None else Path.cwd()
    path_verify_existence(dl_folder)
    # The archive is unpacked directly into the download folder; the
    # ``python`` folder is what gets wiped before each extraction.
    target_folder = dl_folder / "python"
    folder_extract = dl_folder
    src_file = dl_folder / filename

    try:
        if not src_file.exists():
            click.echo("File not yet available. Download...")
            load_file_from_url(url=retrieval_info.url, file_save_path=src_file)
            delete_folder_recursively(target_folder)
            extract_archive(src_file, folder_extract)
            # click.echo instead of print, consistent with the other messages
            click.echo("Download and extraction successfully.")
        elif force_reextract:
            click.echo("File already downloaded. Re-extract file...")
            delete_folder_recursively(target_folder)
            extract_archive(src_file, folder_extract)
            click.echo("Re-extraction successfully.")
        else:
            click.echo(
                "File already downloaded. No action performed. If you wish to delete and "
                "re-extract the archive, use the flag ``--force-reextract``"
            )
    except Exception as err:
        # NOTE(review): failures are only handed to print_error; whether the
        # process then exits with a non-zero status depends on that helper
        # -- confirm.
        print_error(err)