Pipelines#

A pipeline file (in YAML format) describes a sequence of computational steps that transform and connect data. Each step runs either a pipeline (a complex multi-action unit) or a snippet (an atomic reusable component), optionally once per batch item. Dependencies between steps define their execution order.

Note

API 2.1.0 is recommended for new pipelines. It provides a flat, declarative structure that is easier to read and write while supporting complex workflows.

Info Section#

Required fields:

  • description: Brief explanation of pipeline purpose

  • api: API version (2.1.0 recommended, 2.0.0 supported)

Optional fields:

  • arguments: Documentation for pipeline arguments

  • defaults: Default values for arguments

  • batches: Batch processing configurations

Example with all fields:

info:
  description: Process multiple FASTA files
  date: 2025-06-01
  api: 2.1.0
  arguments:
    input_dir: Directory containing FASTA files
    output_dir: Output directory for results
  defaults:
    threads: 4
    quality: "high"
  batches:
    sample_sheet:
      required: ["sample_id", "fasta_file"]
      optional: ["quality"]
      snippet: process_fasta
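As a rough illustration of the required/optional split, the check below works on an info section that has already been loaded into a plain Python dict. The function name `missing_info_fields` is hypothetical and is not part of Bio_pype; it only mirrors the field lists given above.

```python
# Required fields as listed in this section.
REQUIRED_INFO_FIELDS = ("description", "api")


def missing_info_fields(info):
    """Return the required info fields that are absent from `info`."""
    return [name for name in REQUIRED_INFO_FIELDS if name not in info]


# Part of the example info section, written as a plain dict.
info = {
    "description": "Process multiple FASTA files",
    "date": "2025-06-01",
    "api": "2.1.0",
    "defaults": {"threads": 4, "quality": "high"},
}

print(missing_info_fields(info))              # []
print(missing_info_fields({"api": "2.1.0"}))  # ['description']
```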

Steps/Items#

Step types:

  • snippet: Single task execution

  • pipeline: Nested pipeline execution

  • batch_snippet: Parallel snippet execution across batch items

  • batch_pipeline: Parallel pipeline execution across batch items

The steps section defines all pipeline components at a single hierarchical level. Dependencies determine execution order.

Each step key is a unique identifier used in depends_on lists.

steps:
  step_1_fastq_to_bam:
    name: gatk4_fastq_to_bam
    type: pipeline
    depends_on: []
    arguments:
      --bam_output: '%(bam_markdups)s'
      --sample_name: '%(sample_name)s'
      --fastq_batch: '%(fastq_batch)s'
      --dup_metrics: '%(dup_metrics)s'
      --tmp_dir: '%(tmp_dir)s'

  step_2_base_recalibrator:
    name: gatk_base_recalibrator
    type: snippet
    depends_on: [step_1_fastq_to_bam]
    arguments:
      -i: '%(bam_markdups)s'
      -o: '%(recalibration_table)s'

  step_3_apply_bsqr:
    name: gatk_apply_BSQR
    type: snippet
    depends_on: [step_2_base_recalibrator]
    arguments:
      -i: '%(bam_markdups)s'
      -o: '%(bam_recalibrated)s'
      -r: '%(recalibration_table)s'

Dependencies

Each step lists the other steps it depends on under depends_on. Dependencies define which steps must complete before the current one can run.

Rules:

  • depends_on accepts a list of step IDs (e.g., [step_1_fastq_to_bam]).

  • Steps without dependencies ([]) are roots and execute first.

  • A step can depend on multiple others to represent joins or merges.

Example:

depends_on: [step_2_prepare_data, step_3_generate_reference]
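The rules above can be checked mechanically. The sketch below assumes the steps section has been loaded into a dict keyed by step ID; `find_roots` and `unknown_dependencies` are illustrative helper names, not Bio_pype functions.

```python
def find_roots(steps):
    """Return the IDs of steps whose depends_on list is empty."""
    return [sid for sid, step in steps.items() if not step.get("depends_on")]


def unknown_dependencies(steps):
    """Return (step_id, dependency) pairs that name an undefined step."""
    return [
        (sid, dep)
        for sid, step in steps.items()
        for dep in step.get("depends_on", [])
        if dep not in steps
    ]


steps = {
    "step_1_fastq_to_bam": {"depends_on": []},
    "step_2_base_recalibrator": {"depends_on": ["step_1_fastq_to_bam"]},
    "step_3_apply_bsqr": {"depends_on": ["step_2_base_recalibrator"]},
}

print(find_roots(steps))            # ['step_1_fastq_to_bam']
print(unknown_dependencies(steps))  # []
```

Running a check like this before execution catches misspelled step IDs early, since every entry in depends_on must match a step key.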

Arguments

Arguments define how parameters and data are passed into each pipeline or snippet. They correspond to command-line or workflow parameters and can take one of several forms.

  1. Simple Arguments

Most arguments are direct string references or expressions:

arguments:
  -i: '%(bam_input)s'
  -o: '%(recalibrated_bam)s'
  --threads: 8

The value may use string interpolation with pipeline variables (%(variable_name)s).
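The `%(variable_name)s` placeholders follow the same syntax as Python's dictionary-based `%` string formatting. The sketch below shows that substitution on the arguments above; the variable values and the `interpolate` helper are made up for illustration, and Bio_pype's own resolution may involve more steps.

```python
def interpolate(value, variables):
    """Substitute %(name)s placeholders in strings; leave other values as-is."""
    if isinstance(value, str):
        return value % variables
    return value


# Hypothetical variable values for the example arguments.
variables = {"bam_input": "sample.bam", "recalibrated_bam": "sample.recal.bam"}
arguments = {"-i": "%(bam_input)s", "-o": "%(recalibrated_bam)s", "--threads": 8}

resolved = {flag: interpolate(value, variables) for flag, value in arguments.items()}
print(resolved)
# {'-i': 'sample.bam', '-o': 'sample.recal.bam', '--threads': 8}
```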

  2. Multiple Values per Prefix

    Use lists when the same argument appears multiple times:

    arguments:
      -i:
        - '%(tumor_bam)s'
        - '%(normal_bam)s'
    
  3. Extended Argument Objects

    Arguments may include metadata, such as type or special handling flags:

    arguments:
      input_batch:
        value: '%(intervals)s'
        type: batch_file_arg
    

    This supports batched processing or nonstandard argument semantics.

  4. Composite Arguments

    Composite arguments define sub-invocations of other snippets or pipelines. They allow dynamically generating intermediate files or results based on other steps.

    arguments:
      -o: '%(pileup_summary_tumor)s'
      -t: '%(tmp_dir)s'
      -i:
        value:
          snippet_name: gatk_pileup_summary
          result_key: table
          result_arguments:
            input_batch:
              value: '%(intervals)s'
              type: batch_file_arg
            -o: '%(pileup_summary_tumor)s'
        type: composite_arg
    

    How It Works

    • The value describes the nested snippet to invoke (snippet_name).

    • result_key defines which output from that snippet should be used.

    • result_arguments provides the snippet’s own arguments, written using the same compact syntax.

    • type: composite_arg indicates this argument triggers an internal subtask.

    Composite arguments enable reusable sub-snippets for preprocessing, filtering, or dynamic file generation.
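To make the lookup concrete, here is a minimal sketch of how a composite argument could be resolved. It assumes a snippet can be modeled as a function that takes its arguments and returns a dict of named results; the `SNIPPETS` registry, the stand-in `gatk_pileup_summary` function, and `resolve_argument` are all hypothetical and do not reflect Bio_pype's internals.

```python
def gatk_pileup_summary(arguments):
    """Stand-in for a real snippet: report the output path as result 'table'."""
    return {"table": arguments["-o"]}


# Hypothetical registry mapping snippet names to callables.
SNIPPETS = {"gatk_pileup_summary": gatk_pileup_summary}


def resolve_argument(value):
    """Return plain values unchanged; run the nested snippet for composites."""
    if isinstance(value, dict) and value.get("type") == "composite_arg":
        spec = value["value"]
        results = SNIPPETS[spec["snippet_name"]](spec["result_arguments"])
        return results[spec["result_key"]]
    return value


composite = {
    "type": "composite_arg",
    "value": {
        "snippet_name": "gatk_pileup_summary",
        "result_key": "table",
        "result_arguments": {"-o": "tumor_pileups.table"},
    },
}

print(resolve_argument(composite))    # tumor_pileups.table
print(resolve_argument("plain.txt"))  # plain.txt
```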

Example: Combined Pipeline

info:
  description: Somatic variant calling with pileup summaries
  date: 2024-09-20
  api: 2.1.0

steps:
  step_1_align:
    name: bwa_mem
    type: snippet
    depends_on: []
    arguments:
      -r: '%(reference)s'
      -1: '%(fastq_1)s'
      -2: '%(fastq_2)s'
      -o: '%(aligned_bam)s'

  step_2_pileup_tumor:
    name: gatk_pileup_summary
    type: snippet
    depends_on: [step_1_align]
    arguments:
      input_batch:
        value: '%(intervals)s'
        type: batch_file_arg
      -i: '%(aligned_bam)s'
      -o: '%(pileup_summary_tumor)s'

  step_3_mutect2:
    name: gatk_mutect2
    type: batch_snippet
    depends_on: [step_2_pileup_tumor]
    arguments:
      -i:
        - '%(bam_tumor)s'
        - '%(bam_normal)s'
      -n: '%(sample_name_normal)s'
      -o: '%(vcf_output)s'
      input_batch:
        value: '%(intervals)s'
        type: batch_file_arg

DAG Execution Model#

Bio_pype uses a Directed Acyclic Graph (DAG) model to execute pipeline steps efficiently and correctly.

How It Works

Bio_pype builds a flat dependency graph where:

  1. All steps coexist at the same level: No nested hierarchies of dependencies

  2. Topological ordering: Steps execute in the order determined by their dependencies using Kahn’s algorithm

  3. Single execution guarantee: Each step runs exactly once, regardless of how many downstream steps depend on it

Why This Design

When a step has multiple downstream dependents, it needs to run only once, with its results available to all of them. This prevents redundant computation and ensures efficient resource usage.

Example:

Consider a pipeline where two processing steps both depend on data preparation:

prepare_data (root)
    ├── process_type_a (depends on prepare_data)
    └── process_type_b (also depends on prepare_data)

With DAG execution: prepare_data runs once, and both process_type_a and process_type_b use the same prepared data.

Topological Sorting Example

For this dependency structure:

steps:
  step_1_prepare:         # No dependencies - runs first
    depends_on: []
  step_2_process_a:       # Depends on step_1
    depends_on: [step_1_prepare]
  step_2_process_b:       # Also depends on step_1
    depends_on: [step_1_prepare]
  step_3_aggregate:       # Depends on both step_2s
    depends_on: [step_2_process_a, step_2_process_b]

Execution order is determined by topological sorting:

Execution Order:
1. step_1_prepare          (no dependencies, start)
2. step_2_process_a        (after step_1 completes)
3. step_2_process_b        (after step_1 completes)
4. step_3_aggregate        (after both step_2s complete)
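The ordering above can be reproduced with a short sketch of Kahn's algorithm. This is a generic textbook version operating on a dict of steps, not the code Bio_pype ships; the function name `topological_order` is an assumption. Note that step_2_process_a and step_2_process_b have no dependency on each other, so their relative order here simply follows the order in which they are defined.

```python
from collections import deque


def topological_order(steps):
    """Order step IDs with Kahn's algorithm; raise ValueError on a cycle."""
    # Number of unmet dependencies per step.
    remaining = {sid: len(step["depends_on"]) for sid, step in steps.items()}
    # Reverse edges: which steps are waiting on each step.
    dependents = {sid: [] for sid in steps}
    for sid, step in steps.items():
        for dep in step["depends_on"]:
            dependents[dep].append(sid)

    ready = deque(sid for sid, count in remaining.items() if count == 0)
    order = []
    while ready:
        sid = ready.popleft()
        order.append(sid)  # each step is emitted exactly once
        for child in dependents[sid]:
            remaining[child] -= 1
            if remaining[child] == 0:
                ready.append(child)

    if len(order) != len(steps):
        raise ValueError("dependency cycle detected")
    return order


steps = {
    "step_1_prepare": {"depends_on": []},
    "step_2_process_a": {"depends_on": ["step_1_prepare"]},
    "step_2_process_b": {"depends_on": ["step_1_prepare"]},
    "step_3_aggregate": {"depends_on": ["step_2_process_a", "step_2_process_b"]},
}

print(topological_order(steps))
# ['step_1_prepare', 'step_2_process_a', 'step_2_process_b', 'step_3_aggregate']
```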

Benefits

  • No duplicate execution of shared dependencies

  • Correct execution order guaranteed

  • Support for complex dependency graphs

  • Proper data flow between steps

See Progress Tracking for details on the deduplication strategy.

Job Deduplication and Batch Items#

Bio_pype tracks and deduplicates jobs intelligently to handle both:

  • Resuming previously completed work

  • Tracking each item in batch processing separately

How Job Matching Works

Jobs are matched by both their name and their command string. This ensures precise tracking of which specific work has completed.

For example:

# Scenario: batch processing 3 samples with the same step (prepare_sample).
# One job record per batch item: the name repeats, the command differs.
- name: "process_sample"
  command: "bash process.sh --sample sample1.txt"

- name: "process_sample"
  command: "bash process.sh --sample sample2.txt"

- name: "process_sample"
  command: "bash process.sh --sample sample3.txt"

# Each gets a separate progress entry because commands differ
# This allows batch items to track independently

Implementation

When executing a step, the pipeline:

  1. Builds a command string from the step name and arguments

  2. Searches for existing jobs with matching name AND command

  3. Reuses existing job if both match (skip execution)

  4. Creates new job if no match found

Code reference (pype/utils/pipeline.py):

command = " ".join(map(str, [item.name] + arg_str)) if arg_str else item.name

# Try to find existing job matching BOTH name and command
existing_job = None
for jid, job in progress.jobs.items():
    if job.name == friendly_name and job.command == command:
        existing_job = jid
        break

if not existing_job:
    progress.add_job(job_id, friendly_name, command, ...)
else:
    job_id = existing_job  # Reuse existing
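The excerpt above relies on objects defined elsewhere in the module. For experimentation, the following self-contained sketch reproduces the same name-and-command matching with simplified stand-ins; the `Job` and `Progress` classes and the `find_or_add` method are assumptions made for this example and are much simpler than the real progress tracker.

```python
from dataclasses import dataclass, field


@dataclass
class Job:
    name: str
    command: str


@dataclass
class Progress:
    jobs: dict = field(default_factory=dict)

    def find_or_add(self, job_id, name, command):
        """Return (job_id, reused); reuse only if name AND command both match."""
        for jid, job in self.jobs.items():
            if job.name == name and job.command == command:
                return jid, True
        self.jobs[job_id] = Job(name, command)
        return job_id, False


progress = Progress()
for n, sample in enumerate(["sample1.txt", "sample2.txt", "sample3.txt"]):
    command = f"bash process.sh --sample {sample}"
    progress.find_or_add(f"job_{n}", "process_sample", command)

# Submitting identical work again reuses the first entry instead of adding one.
jid, reused = progress.find_or_add(
    "job_99", "process_sample", "bash process.sh --sample sample1.txt"
)
print(jid, reused, len(progress.jobs))  # job_0 True 3
```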

Design Benefits

  • Precise batch tracking: Each batch item is tracked independently based on what it actually executes

  • Correct resume behavior: On resume, identical work is skipped while variations are re-evaluated

  • Accurate runtime tracking: pipeline_runtime.yaml reflects exactly which commands have completed, not just names

  • Parametric pipeline support: Pipelines with varying arguments are properly handled

Example: Batch Processing Pipeline

steps:
  step_1_prepare:
    name: prepare_data
    type: snippet
    depends_on: []
    arguments:
      -i: raw_data.txt
      -o: '%(prepared_data)s'

  step_2_process_batch:
    name: process_sample
    type: batch_snippet
    depends_on: [step_1_prepare]
    arguments:
      batch_snippet: process_snippet
      input_batch:
        value: '%(sample_list)s'
        type: batch_file_arg
      -o: '%(output_prefix)s_%(batch_item)s.txt'

# When executed with 3 samples:
# - step_1_prepare: Creates 1 job (prepare_data command)
# - step_2_process_batch: Creates 3 jobs (one for each sample)
#
# Progress tracking:
# - 4 separate job entries in pipeline_runtime.yaml
# - step_1_prepare: runs once, results available to all batch items
# - Each batch item: tracks independently on resume
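The per-item output names in this example come from filling the `-o` template once per batch item. The sketch below illustrates that expansion with Python's `%` formatting; how Bio_pype actually populates `batch_item` is not described here, so the `expand_batch` helper and the sample values are assumptions.

```python
def expand_batch(template, variables, batch_items):
    """Fill the template once per batch item, exposed as %(batch_item)s."""
    return [template % {**variables, "batch_item": item} for item in batch_items]


outputs = expand_batch(
    "%(output_prefix)s_%(batch_item)s.txt",
    {"output_prefix": "results/run1"},      # hypothetical variable value
    ["sample1", "sample2", "sample3"],      # hypothetical batch items
)
print(outputs)
# ['results/run1_sample1.txt', 'results/run1_sample2.txt', 'results/run1_sample3.txt']
```

Because each expanded command differs, each batch item ends up with its own job entry under the name-and-command matching described above.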

Running Pipelines#

Basic execution:

pype pipelines my_pipeline --input input.fa --output output.fa

With specific queue:

pype pipelines --queue slurm my_pipeline --input input.fa --output output.fa

Batch processing:

pype pipelines my_pipeline --sample_sheet samples.tsv

Complete Example#

Here’s a tested example combining multiple features:

Complete pipeline example#

info:
  description: Reverse Complement Lower case a fasta
  date: 01/10/2020
  api: 2.1.0
steps:
  step_reverse_fa:
    name: reverse_fa
    type: snippet
    depends_on: []
    arguments:
      -i: '%(input_fa)s'
      -o: '%(reverse_fa)s'
  step_complement_fa:
    name: complement_fa
    type: snippet
    depends_on:
    - step_reverse_fa
    arguments:
      -i: '%(reverse_fa)s'
      -o: '%(complement_fa)s'
  step_lower_fa:
    name: lower_fa
    type: snippet
    depends_on:
    - step_complement_fa
    arguments:
      -i: '%(complement_fa)s'
      -o: '%(output)s'

This pipeline:

  1. Takes a FASTA file as input

  2. Reverses the sequences

  3. Creates complement sequences

  4. Converts to lowercase

  5. Demonstrates dependency management
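To see what the three chained steps compute, the sketch below applies the same transformations to a single sequence string in memory. The functions are stand-ins written for this example; the real reverse_fa, complement_fa, and lower_fa snippets operate on FASTA files and are not shown in this document.

```python
# Translation table for DNA complement, preserving case.
COMPLEMENT = str.maketrans("ACGTacgt", "TGCAtgca")


def reverse_seq(seq):
    return seq[::-1]


def complement_seq(seq):
    return seq.translate(COMPLEMENT)


def lower_seq(seq):
    return seq.lower()


def run_chain(seq):
    """Apply the steps in dependency order: reverse, complement, lowercase."""
    for step in (reverse_seq, complement_seq, lower_seq):
        seq = step(seq)
    return seq


print(run_chain("AAGC"))  # gctt
```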


Quick Reference#

Info Section#

Required: description, api

Optional: arguments, defaults, batches

Step Structure (API 2.1.0)#

steps:
  step_id:
    name: snippet_or_pipeline_name
    type: snippet|pipeline|batch_snippet|batch_pipeline
    depends_on: [list_of_step_ids]
    arguments:
      --arg: value

Argument Types#

  • Simple: --arg: '%(variable)s' or --arg: fixed_value

  • List: --arg: ['%(val1)s', '%(val2)s']

  • Batch file: value: '%(var)s' with type: batch_file_arg

  • Batch list: value: {dict-of-lists or list-of-dicts} with type: batch_list_arg

  • Composite: value: {snippet_name, result_key, result_arguments} with type: composite_arg
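The shapes listed above can be told apart by inspecting the value. The following sketch classifies an argument and flattens simple and list arguments into command-line tokens, repeating the prefix for each list item. Both helper names are hypothetical, and the token layout is an assumption based on the description of list arguments, not a statement of how Bio_pype builds its commands.

```python
def classify_argument(value):
    """Name the argument form based on the shapes in the quick reference."""
    if isinstance(value, dict):
        return {
            "batch_file_arg": "batch file",
            "batch_list_arg": "batch list",
            "composite_arg": "composite",
        }.get(value.get("type"), "unknown")
    if isinstance(value, list):
        return "list"
    return "simple"


def to_tokens(arguments):
    """Flatten simple and list arguments into tokens; skip extended objects."""
    tokens = []
    for flag, value in arguments.items():
        if classify_argument(value) not in ("simple", "list"):
            continue
        values = value if isinstance(value, list) else [value]
        for item in values:
            tokens += [flag, str(item)]
    return tokens


print(classify_argument({"value": "intervals.txt", "type": "batch_file_arg"}))
# batch file
print(to_tokens({"-i": ["tumor.bam", "normal.bam"], "--threads": 8}))
# ['-i', 'tumor.bam', '-i', 'normal.bam', '--threads', '8']
```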

See Composite Arguments and Snippet Results Pattern for details on composite arguments.

See Batch List Arguments and Dynamic Batch Expansion for details on batch list arguments and dynamic batch expansion.