pype.utils.pipeline.PipelineItem#

class pype.utils.pipeline.PipelineItem(item, pype_snippets_modules={'_benchmark_power': <pype.utils.snippets.Snippet object>, '_dowload_files': <pype.utils.snippets.SnippetMd object>, 'complement_fa': <pype.utils.snippets.Snippet object>, 'hello': <pype.utils.snippets.Snippet object>, 'lower_fa': <pype.utils.snippets.Snippet object>, 'merge_fa': <pype.utils.snippets.Snippet object>, 'reverse_fa': <pype.utils.snippets.Snippet object>, 'test_adv': <pype.utils.snippets.Snippet object>, 'test_base': <pype.utils.snippets.SnippetMd object>, 'test_fail_exit_code': <pype.utils.snippets.SnippetMd object>, 'test_fail_missing_output': <pype.utils.snippets.SnippetMd object>, 'test_skip_check_results_md': <pype.utils.snippets.SnippetMd object>, 'test_skip_check_results_py': <pype.utils.snippets.Snippet object>})[source]#
Parameters:
__init__(item, pype_snippets_modules={'_benchmark_power': <pype.utils.snippets.Snippet object>, '_dowload_files': <pype.utils.snippets.SnippetMd object>, 'complement_fa': <pype.utils.snippets.Snippet object>, 'hello': <pype.utils.snippets.Snippet object>, 'lower_fa': <pype.utils.snippets.Snippet object>, 'merge_fa': <pype.utils.snippets.Snippet object>, 'reverse_fa': <pype.utils.snippets.Snippet object>, 'test_adv': <pype.utils.snippets.Snippet object>, 'test_base': <pype.utils.snippets.SnippetMd object>, 'test_fail_exit_code': <pype.utils.snippets.SnippetMd object>, 'test_fail_missing_output': <pype.utils.snippets.SnippetMd object>, 'test_skip_check_results_md': <pype.utils.snippets.SnippetMd object>, 'test_skip_check_results_py': <pype.utils.snippets.Snippet object>})[source]#
Parameters:

Methods

__init__(item[, pype_snippets_modules])

generate_batch_id()

run(argv, queue, profile, log, jobs[, ...])

Run pipeline item (without recursively running dependencies).

__init__(item, pype_snippets_modules={'_benchmark_power': <pype.utils.snippets.Snippet object>, '_dowload_files': <pype.utils.snippets.SnippetMd object>, 'complement_fa': <pype.utils.snippets.Snippet object>, 'hello': <pype.utils.snippets.Snippet object>, 'lower_fa': <pype.utils.snippets.Snippet object>, 'merge_fa': <pype.utils.snippets.Snippet object>, 'reverse_fa': <pype.utils.snippets.Snippet object>, 'test_adv': <pype.utils.snippets.Snippet object>, 'test_base': <pype.utils.snippets.SnippetMd object>, 'test_fail_exit_code': <pype.utils.snippets.SnippetMd object>, 'test_fail_missing_output': <pype.utils.snippets.SnippetMd object>, 'test_skip_check_results_md': <pype.utils.snippets.SnippetMd object>, 'test_skip_check_results_py': <pype.utils.snippets.Snippet object>})[source]#
Parameters:
generate_batch_id()[source]#
run(argv, queue, profile, log, jobs, progress=None, registry=None)[source]#

Run pipeline item (without recursively running dependencies).

In DAG execution model, dependencies are handled by Pipeline.submit(), so this method just executes the current step.

For backwards compatibility, still supports deprecated nested deps.

Parameters:
  • argv (List[str]) – Command line arguments

  • queue (str) – Queue system to use

  • profile (str) – Profile name

  • log (Any) – Logger instance

  • jobs (List[Any]) – List of submitted jobs (job IDs of dependencies)

  • progress (Any | None) – Optional Progress tracker for resumable execution

  • registry (PipelineRegistry | None)

Returns:

List of job results

Return type:

List[Any] | None