239 lines
7.0 KiB
Python
239 lines
7.0 KiB
Python
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
from lang_main import io
|
|
from lang_main.errors import (
|
|
NoPerformableActionError,
|
|
OutputInPipelineContainerError,
|
|
WrongActionTypeError,
|
|
)
|
|
from lang_main.pipelines import base
|
|
|
|
PIPELINE_NAME = 'test'
|
|
|
|
|
|
@pytest.fixture(scope='module')
|
|
def working_dir() -> Path:
|
|
work_dir = Path.cwd() / 'tests/work_dir'
|
|
if not work_dir.exists():
|
|
work_dir.mkdir()
|
|
return work_dir
|
|
|
|
|
|
@pytest.fixture(scope='function')
|
|
def pipeline_container(working_dir) -> base.PipelineContainer:
|
|
return base.PipelineContainer(name=PIPELINE_NAME, working_dir=working_dir)
|
|
|
|
|
|
@pytest.fixture(scope='function')
|
|
def pipeline(working_dir) -> base.Pipeline:
|
|
return base.Pipeline(name=PIPELINE_NAME, working_dir=working_dir)
|
|
|
|
|
|
def test_empty_pipeline_container(pipeline_container, working_dir):
|
|
container = pipeline_container
|
|
assert container.name == PIPELINE_NAME
|
|
assert container.working_dir == working_dir
|
|
assert len(container.actions) == 0
|
|
assert len(container.action_names) == 0
|
|
assert len(container.action_skip) == 0
|
|
assert container.curr_proc_idx == 1
|
|
|
|
with pytest.raises(NoPerformableActionError):
|
|
container.prep_run()
|
|
|
|
assert container.post_run() is None
|
|
|
|
|
|
@pytest.mark.parametrize('skip', [True, False])
|
|
def test_pipeline_container_valid(pipeline_container, skip):
|
|
test_string = 'test'
|
|
|
|
def valid_action(): # pragma: no cover
|
|
nonlocal test_string
|
|
test_string += '_2'
|
|
|
|
pipeline_container.add(valid_action, skip=skip)
|
|
assert len(pipeline_container.actions) == 1
|
|
assert len(pipeline_container.action_names) == 1
|
|
assert len(pipeline_container.action_skip) == 1
|
|
|
|
ret = pipeline_container.run()
|
|
assert pipeline_container.curr_proc_idx == 2
|
|
assert ret is None
|
|
if skip:
|
|
assert test_string == 'test'
|
|
else:
|
|
assert test_string == 'test_2'
|
|
|
|
pipeline_container.prep_run()
|
|
assert pipeline_container.curr_proc_idx == 1
|
|
|
|
|
|
def test_pipeline_container_invalid_action(pipeline_container):
|
|
test_string = 'test'
|
|
|
|
def invalid_action():
|
|
nonlocal test_string
|
|
test_string += '_2'
|
|
new = 'ret'
|
|
return new
|
|
|
|
with pytest.raises(WrongActionTypeError):
|
|
pipeline_container.add(test_string, skip=False)
|
|
|
|
pipeline_container.add(invalid_action, skip=False)
|
|
with pytest.raises(OutputInPipelineContainerError):
|
|
pipeline_container.run()
|
|
|
|
|
|
def test_empty_pipeline(pipeline, working_dir):
|
|
pipe = pipeline
|
|
assert pipe.name == PIPELINE_NAME
|
|
assert pipe.working_dir == working_dir
|
|
assert len(pipe.actions) == 0
|
|
assert len(pipe.action_names) == 0
|
|
assert len(pipe.actions_kwargs) == 0
|
|
assert len(pipe.save_results) == 0
|
|
assert len(pipe.load_results) == 0
|
|
assert pipe.curr_proc_idx == 1
|
|
assert pipe._intermediate_result is None
|
|
|
|
with pytest.raises(NoPerformableActionError):
|
|
pipe.prep_run()
|
|
|
|
assert pipe.post_run() is None
|
|
|
|
|
|
@pytest.mark.parametrize('alter_content', [True, False])
|
|
def test_pipeline_valid(pipeline, alter_content):
|
|
pipe = pipeline
|
|
test_string = 'test'
|
|
|
|
# action preparation
|
|
def valid_action(string, add_content=False):
|
|
if add_content:
|
|
string += '_2'
|
|
return string
|
|
|
|
pipe.add(valid_action, {'add_content': alter_content})
|
|
assert len(pipe.actions) == 1
|
|
assert len(pipe.action_names) == 1
|
|
assert len(pipe.actions_kwargs) == 1
|
|
assert len(pipe.action_skip) == 1
|
|
assert len(pipe.save_results) == 1
|
|
assert len(pipe.load_results) == 1
|
|
assert pipe.save_results[0] == (False, None)
|
|
assert pipe.load_results[0] == (False, None)
|
|
# filenames and saving/loading
|
|
target_filename = f'Pipe-{pipe.name}_Step-{pipe.curr_proc_idx}_valid_action'
|
|
target_pth = (pipe.working_dir / target_filename).with_suffix('.pkl')
|
|
ret_pth, action_name = pipe.get_result_path(0, filename=None)
|
|
assert ret_pth == target_pth
|
|
assert action_name == 'valid_action'
|
|
filename = 'test'
|
|
ret_pth, action_name = pipe.get_result_path(0, filename=filename)
|
|
target_pth = (pipe.working_dir / filename).with_suffix('.pkl')
|
|
assert ret_pth == target_pth
|
|
assert action_name == 'valid_action'
|
|
# load non-existing files
|
|
with pytest.raises(FileNotFoundError):
|
|
pipe.load_step(0, 'non_existing')
|
|
|
|
# running
|
|
ret = pipe.run(starting_values=(test_string,))
|
|
assert isinstance(ret, tuple)
|
|
assert pipe._intermediate_result == ret
|
|
assert pipe.curr_proc_idx == 2
|
|
assert ret is not None
|
|
if alter_content:
|
|
assert ret[0] == 'test_2'
|
|
else:
|
|
assert ret[0] == 'test'
|
|
|
|
pipe.prep_run()
|
|
assert pipe.curr_proc_idx == 1
|
|
|
|
# load existing files
|
|
loaded_res = pipe.load_step(0, None)
|
|
assert loaded_res is not None
|
|
assert isinstance(loaded_res, tuple)
|
|
assert loaded_res[0] == ret[0]
|
|
|
|
|
|
def test_pipeline_valid_action_load(pipeline, working_dir):
|
|
pipe = pipeline
|
|
test_string = 'test'
|
|
|
|
# action preparation
|
|
def valid_action(string, add_content=False): # pragma: no cover
|
|
if add_content:
|
|
string += '_2'
|
|
return string
|
|
|
|
pipe.add(valid_action, {'add_content': False}, load_result=True)
|
|
assert len(pipe.actions) == 1
|
|
assert len(pipe.action_names) == 1
|
|
assert len(pipe.actions_kwargs) == 1
|
|
assert len(pipe.action_skip) == 1
|
|
assert len(pipe.save_results) == 1
|
|
assert len(pipe.load_results) == 1
|
|
assert pipe.save_results[0] == (False, None)
|
|
assert pipe.load_results[0] == (True, None)
|
|
|
|
ret = pipe.run(starting_values=(test_string,))
|
|
assert isinstance(ret, tuple)
|
|
assert pipe._intermediate_result == ret
|
|
assert pipe.curr_proc_idx == 2
|
|
assert ret is not None
|
|
|
|
# load non-tuple result
|
|
filename = 'non_tuple.pkl'
|
|
save_pth = working_dir / filename
|
|
io.save_pickle(test_string, save_pth)
|
|
with pytest.raises(TypeError):
|
|
pipe.load_step(0, filename)
|
|
|
|
|
|
def test_pipeline_multiple_actions(pipeline):
|
|
pipe = pipeline
|
|
test_string = 'test'
|
|
|
|
# action preparation
|
|
def valid_action(string, add_content=True):
|
|
if add_content:
|
|
string += '_2'
|
|
return string
|
|
|
|
def valid_action_2(string, add_content=True):
|
|
if add_content:
|
|
string += '_3'
|
|
return string
|
|
|
|
pipe.add(valid_action, {'add_content': True}, skip=True)
|
|
pipe.add(valid_action, {'add_content': True})
|
|
pipe.add(valid_action_2)
|
|
assert len(pipe.actions) == 3
|
|
assert len(pipe.action_names) == 3
|
|
assert len(pipe.actions_kwargs) == 3
|
|
assert len(pipe.action_skip) == 3
|
|
assert len(pipe.save_results) == 3
|
|
assert len(pipe.load_results) == 3
|
|
assert pipe.save_results[1] == (False, None)
|
|
assert pipe.load_results[1] == (False, None)
|
|
|
|
ret = pipe.run(starting_values=(test_string,))
|
|
assert isinstance(ret, tuple)
|
|
assert pipe._intermediate_result == ret
|
|
assert pipe.curr_proc_idx == 4
|
|
assert ret is not None
|
|
assert ret[0] == 'test_2_3'
|
|
|
|
|
|
def test_pipeline_invalid_action(pipeline):
|
|
test_string = 'test'
|
|
|
|
with pytest.raises(WrongActionTypeError):
|
|
pipeline.add(test_string, skip=False)
|