Queue Systems

Bio_pype supports multiple queue/scheduling systems through a modular adapter architecture. This allows pipelines to run on different computational environments without changing pipeline definitions.

Overview

Queue systems in Bio_pype:

  • Submit jobs to various schedulers (SLURM, PBS/Torque, SGE, etc.)

  • Handle job dependencies and execution order

  • Manage resource requirements (CPUs, memory, walltime)

  • Track job submission and completion

  • Support both local and distributed execution

Available Queue Systems

Local Execution (none)

Executes jobs directly on the local machine without a queue system.

When to use:

  • Testing pipelines

  • Small-scale analyses

  • Single-machine workflows

  • Debugging

Usage:

pype pipeline --queue none my_pipeline --input data.txt

Characteristics:

  • Jobs run immediately and sequentially

  • No job scheduling or queuing

  • Resource limits not enforced

  • Dependencies handled by pipeline

  • Job output is written to stdout/stderr files

Configuration:

No special configuration needed. This is the default fallback queue.

SLURM

Submits jobs to the SLURM workload manager (Simple Linux Utility for Resource Management).

When to use:

  • HPC clusters with SLURM

  • Parallel job execution

  • Resource-intensive workflows

  • Production pipelines

Usage:

pype pipeline --queue slurm my_pipeline --input data.txt

Resource Mapping:

Bio_pype   SLURM flag         Example
ncpu       --cpus-per-task    --cpus-per-task=8
mem        --mem              --mem=16G
time       --time             --time=02:00:00

Example snippet requirements:

requirements:
  ncpu: 8
  mem: 16gb
  time: '02:00:00'

Translates to SLURM:

sbatch --cpus-per-task=8 --mem=16G --time=02:00:00 job_script.sh
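The translation above can be sketched in Python. This is a minimal illustration, not Bio_pype's own SLURM module. The helper names slurm_mem and to_sbatch_args are hypothetical, and the sketch assumes memory values use only the kb/mb/gb units listed under Supported units.

```python
import re


def slurm_mem(mem: str) -> str:
    """Convert a Bio_pype-style size such as '16gb' to SLURM's '16G' form (hypothetical helper)."""
    match = re.fullmatch(r"(\d+)\s*([kmg])b", mem.strip(), flags=re.IGNORECASE)
    if match is None:
        raise ValueError(f"unrecognised memory value: {mem!r}")
    amount, unit = match.groups()
    return f"{amount}{unit.upper()}"  # SLURM accepts K, M and G suffixes


def to_sbatch_args(requirements: dict) -> list:
    """Map ncpu/mem/time onto sbatch flags (hypothetical helper)."""
    args = []
    if "ncpu" in requirements:
        args.append(f"--cpus-per-task={int(requirements['ncpu'])}")
    if "mem" in requirements:
        args.append(f"--mem={slurm_mem(requirements['mem'])}")
    if "time" in requirements:
        args.append(f"--time={requirements['time']}")
    return args


reqs = {"ncpu": 8, "mem": "16gb", "time": "02:00:00"}
print(" ".join(["sbatch", *to_sbatch_args(reqs), "job_script.sh"]))
# sbatch --cpus-per-task=8 --mem=16G --time=02:00:00 job_script.sh
```

Keys that are absent are simply left out, so SLURM's cluster defaults apply to them.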

Configuration:

In your queue module (pype_modules/queues/slurm.py), you can customize:

  • Default partition

  • Quality of service (QoS)

  • Account/project codes

  • Additional sbatch options

PBS/Torque

Submits jobs to PBS (Portable Batch System) or Torque resource managers.

When to use:

  • Clusters with PBS/Torque

  • Traditional HPC environments

  • Legacy systems

Usage:

pype pipeline --queue pbs my_pipeline --input data.txt

Resource Mapping:

Bio_pype   PBS flag          Example
ncpu       -l nodes=1:ppn=   -l nodes=1:ppn=8
mem        -l mem=           -l mem=16gb
time       -l walltime=      -l walltime=02:00:00

Example snippet requirements:

requirements:
  ncpu: 8
  mem: 16gb
  time: '02:00:00'

Translates to PBS:

qsub -l nodes=1:ppn=8,mem=16gb,walltime=02:00:00 job_script.sh
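PBS/Torque takes all three resources in a single comma-separated -l value. The following is a hypothetical sketch of how that string could be assembled from a requirements dict. It assumes a single-node job, which matches the nodes=1 mapping above.

```python
def to_pbs_resources(requirements: dict) -> str:
    """Build the comma-separated value for qsub's -l option (hypothetical helper)."""
    parts = []
    if "ncpu" in requirements:
        # Single-node request: ppn = processors per node
        parts.append(f"nodes=1:ppn={int(requirements['ncpu'])}")
    if "mem" in requirements:
        parts.append(f"mem={requirements['mem'].strip().lower()}")  # e.g. 16gb
    if "time" in requirements:
        parts.append(f"walltime={requirements['time']}")
    return ",".join(parts)


reqs = {"ncpu": 8, "mem": "16gb", "time": "02:00:00"}
print(" ".join(["qsub", "-l", to_pbs_resources(reqs), "job_script.sh"]))
# qsub -l nodes=1:ppn=8,mem=16gb,walltime=02:00:00 job_script.sh
```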

SGE (Sun Grid Engine)

Submits jobs to SGE or compatible grid engines (e.g., UGE, OGE).

When to use:

  • Clusters with SGE/UGE/OGE

  • Grid computing environments

Usage:

pype pipeline --queue sge my_pipeline --input data.txt

Resource Mapping:

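Bio_pype   SGE flag      Example
ncpu       -pe smp       -pe smp 8
mem        -l h_vmem=    -l h_vmem=2G (per core)
time       -l h_rt=      -l h_rt=02:00:00

SGE usually applies h_vmem per slot, so the total memory is divided by the number of cores. For example, a 16gb request with ncpu: 8 becomes -l h_vmem=2G. The parallel environment name (smp here) is configured per cluster and may differ on yours.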
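The per-slot split is a small calculation: 16gb is 16384 MB, and 16384 / 8 = 2048 MB, which is h_vmem=2G. The sketch below performs it. It assumes that h_vmem is a per-slot limit and that the parallel environment is named smp; both are site-specific. sge_args is a hypothetical helper, not part of Bio_pype.

```python
import math
import re

# Conversion factors to megabytes for the units Bio_pype accepts
_TO_MB = {"kb": 1 / 1024, "mb": 1, "gb": 1024}


def sge_args(requirements: dict, pe_name: str = "smp") -> list:
    """Build qsub options, splitting total memory into a per-slot h_vmem."""
    ncpu = int(requirements.get("ncpu", 1))
    args = ["-pe", pe_name, str(ncpu)]
    if "mem" in requirements:
        match = re.fullmatch(r"(\d+)\s*([kmg]b)", requirements["mem"].strip().lower())
        if match is None:
            raise ValueError(f"unrecognised memory value: {requirements['mem']!r}")
        total_mb = int(match.group(1)) * _TO_MB[match.group(2)]
        per_slot_mb = math.ceil(total_mb / ncpu)  # round up so the total is never short
        if per_slot_mb % 1024 == 0:
            per_slot = f"{per_slot_mb // 1024}G"
        else:
            per_slot = f"{per_slot_mb}M"
        args += ["-l", f"h_vmem={per_slot}"]
    if "time" in requirements:
        args += ["-l", f"h_rt={requirements['time']}"]
    return args


reqs = {"ncpu": 8, "mem": "16gb", "time": "02:00:00"}
print(" ".join(["qsub", *sge_args(reqs), "job_script.sh"]))
# qsub -pe smp 8 -l h_vmem=2G -l h_rt=02:00:00 job_script.sh
```

The sketch rounds up rather than down because, when the total does not divide evenly, a slightly larger per-slot limit is safer than one that leaves the job short of memory.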

Resource Requirements

Defining Requirements in Snippets

Markdown snippets:

## requirements

```yaml
ncpu: 8          # Number of CPU cores
mem: 16gb        # Memory allocation
time: '02:00:00' # Max runtime (HH:MM:SS)
```

Python snippets:

def requirements():
    return {
        'ncpu': 8,
        'mem': '16gb',
        'time': '02:00:00'
    }

Supported units:

  • Memory: ‘gb’, ‘GB’, ‘mb’, ‘MB’, ‘kb’, ‘KB’

  • Time: ‘HH:MM:SS’ format

  • CPUs: Integer number of cores
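A pre-submission check against these units might look like the following. check_requirements is a hypothetical helper, not part of Bio_pype. It assumes the accepted forms are exactly those listed above: an integer ncpu, a memory string ending in kb, mb or gb in either case, and an HH:MM:SS time string. Allowing more than two hour digits is also an assumption.

```python
import re

_MEM = re.compile(r"\d+(kb|mb|gb)", re.IGNORECASE)
_TIME = re.compile(r"\d{2,}:[0-5]\d:[0-5]\d")  # HH:MM:SS, hours may exceed 99 (assumption)


def check_requirements(req: dict) -> list:
    """Return a list of problems with a requirements dict (empty if it looks valid)."""
    problems = []
    ncpu = req.get("ncpu")
    # bool is a subclass of int, so exclude it explicitly
    if ncpu is not None and (isinstance(ncpu, bool) or not isinstance(ncpu, int) or ncpu < 1):
        problems.append(f"ncpu must be a positive integer, got {ncpu!r}")
    mem = req.get("mem")
    if mem is not None and not _MEM.fullmatch(str(mem)):
        problems.append(f"mem must look like '16gb' or '512MB', got {mem!r}")
    time = req.get("time")
    if time is not None and not _TIME.fullmatch(str(time)):
        problems.append(f"time must be HH:MM:SS, got {time!r}")
    return problems


print(check_requirements({"ncpu": 8, "mem": "16gb", "time": "02:00:00"}))
# []
```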

Overriding Requirements in Pipelines

You can override snippet requirements in pipeline definitions:

steps:
  step_1_align:
    name: bwa_mem
    type: snippet
    requirements:
      ncpu: 16      # Override default 8 CPUs
      mem: 32gb     # Override default 16GB
      time: '04:00:00'  # Override default 2 hours
    arguments:
      -i: '%(input_bam)s'
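This section does not spell out how an override combines with the snippet's own values. One plausible reading, sketched below as an assumption, is a key-by-key merge: any key set in the pipeline replaces the snippet's value, and unset keys keep the snippet default. effective_requirements is a hypothetical name.

```python
from typing import Optional


def effective_requirements(snippet_defaults: dict, pipeline_overrides: Optional[dict]) -> dict:
    """Merge requirements, letting pipeline values win key by key (assumed semantics)."""
    merged = dict(snippet_defaults)  # copy so the snippet defaults are not mutated
    merged.update(pipeline_overrides or {})
    return merged


defaults = {"ncpu": 8, "mem": "16gb", "time": "02:00:00"}
print(effective_requirements(defaults, {"ncpu": 16, "mem": "32gb"}))
# {'ncpu': 16, 'mem': '32gb', 'time': '02:00:00'}
```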

Job Dependencies

Automatic Dependency Management

Bio_pype automatically handles job dependencies:

  1. Within pipelines: Jobs wait for dependencies to complete

  2. Across steps: Output from one step becomes input to the next

  3. Array jobs: All jobs in a group are tracked together

Example:

steps:
  step_1_prepare:
    name: prepare_reference
    type: snippet
    depends_on: []

  step_2_align:
    name: align_reads
    type: snippet
    depends_on: [step_1_prepare]  # Waits for step_1

  step_3_sort:
    name: sort_bam
    type: snippet
    depends_on: [step_2_align]  # Waits for step_2
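The depends_on lists form a directed acyclic graph, and any topological ordering of that graph is a valid submission order. The sketch below computes one with Python's standard graphlib module (Python 3.9+). It illustrates the idea and does not necessarily reflect how Bio_pype orders steps internally. submission_order is a hypothetical helper.

```python
from graphlib import CycleError, TopologicalSorter


def submission_order(depends_on: dict) -> list:
    """Return step names so that every step comes after the steps it depends on."""
    try:
        # TopologicalSorter expects {node: iterable_of_predecessors}
        return list(TopologicalSorter(depends_on).static_order())
    except CycleError as err:
        # err.args[1] holds the nodes that form the cycle
        raise ValueError(f"circular depends_on: {err.args[1]}") from err


steps = {
    "step_1_prepare": [],
    "step_2_align": ["step_1_prepare"],
    "step_3_sort": ["step_2_align"],
}
print(submission_order(steps))
# ['step_1_prepare', 'step_2_align', 'step_3_sort']
```

Raising on a cycle surfaces a misconfigured pipeline before anything is submitted, rather than leaving jobs held forever in the queue.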

Queue System Dependency Handling

Different queue systems handle dependencies differently:

SLURM:

sbatch --dependency=afterok:12345 job_script.sh

PBS/Torque:

qsub -W depend=afterok:12345 job_script.sh

SGE:

qsub -hold_jid 12345 job_script.sh

Local (none):

Jobs run sequentially, no queue-level dependencies needed.
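When a job depends on several others, the IDs are joined differently. SLURM and PBS/Torque separate them with colons after afterok, while SGE takes a comma-separated list. In standard SGE, -hold_jid waits for the listed jobs to finish, not specifically to succeed. The helper below is a hypothetical sketch of this formatting.

```python
def dependency_args(queue: str, job_ids: list) -> list:
    """Return scheduler options that hold a job until job_ids finish (hypothetical helper)."""
    ids = [str(j) for j in job_ids]
    if not ids or queue == "none":
        return []  # nothing to wait for, or jobs already run sequentially
    if queue == "slurm":
        return [f"--dependency=afterok:{':'.join(ids)}"]  # start only if all succeed
    if queue == "pbs":
        return ["-W", f"depend=afterok:{':'.join(ids)}"]
    if queue == "sge":
        return ["-hold_jid", ",".join(ids)]  # released once all have finished
    raise ValueError(f"unknown queue system: {queue!r}")


print(dependency_args("slurm", ["12345", "12346"]))
# ['--dependency=afterok:12345:12346']
```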

Job Submission and Monitoring

Submitting Jobs

Bio_pype handles job submission automatically when you run a pipeline:

$ pype pipeline --queue slurm genomic_analysis --input sample.fq

For each snippet in the pipeline:

  1. Checks if job should be skipped (via progress tracking)

  2. Prepares job script with appropriate environment

  3. Submits to queue system

  4. Records queue ID in pipeline_runtime.yaml

  5. Sets up log files

  6. Moves to next job
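The per-snippet loop can be sketched as follows. Everything here is hypothetical:

- jobs is assumed to be already in dependency order.
- runtime stands in for the parsed pipeline_runtime.yaml.
- submit_fn stands in for a queue adapter's submit function.

Job-script preparation and log setup (steps 2 and 5) are omitted.

```python
def submit_pipeline(jobs, runtime, submit_fn):
    """Submit jobs in order, skipping completed ones and recording queue IDs.

    jobs: list of (name, command, requirements, depends_on) tuples in dependency order.
    runtime: dict standing in for pipeline_runtime.yaml, updated in place.
    submit_fn: callable(command, name, requirements, dependency_ids) -> job ID or None.
    """
    for name, command, requirements, depends_on in jobs:
        if runtime.get(name, {}).get("status") == "completed":
            continue  # finished in a previous run: skip it
        # Wait only on predecessors that are not already completed and have a queue ID
        dep_ids = [
            runtime[dep]["queue_id"]
            for dep in depends_on
            if runtime.get(dep, {}).get("status") != "completed"
            and runtime.get(dep, {}).get("queue_id")
        ]
        queue_id = submit_fn(command, name, requirements, dep_ids)
        runtime[name] = {
            "status": "pending" if queue_id else "failed",
            "queue_id": queue_id,
        }
    return runtime
```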

Monitoring Jobs

SLURM:

# View all your jobs
squeue -u $USER

# View specific job
squeue -j 12345

# View job details
scontrol show job 12345

PBS/Torque:

# View all your jobs
qstat -u $USER

# View specific job
qstat -f 12345

SGE:

# View all your jobs
qstat -u $USER

# View specific job
qstat -j 12345

Local (none):

# Check process
ps aux | grep pype

# View logs in real-time
tail -f ~/.bio_pype/logs/*/jobs/*/stdout

Checking Job Status

Bio_pype's progress tracking records the status of each job in pipeline_runtime.yaml:

# View runtime file
cat /path/to/logs/<run_id>_<pipeline>/pipeline_runtime.yaml

Each job has an entry such as:

align_reads_abc123:
  status: completed
  queue_id: '12345'
  submitted_at: '2025-01-24T10:00:00'
  completed_at: '2025-01-24T10:30:00'
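For long pipelines, a tally of statuses can be quicker than reading the file. The sketch below makes several assumptions:

- The file follows the layout shown above, with one top-level key per job and status and queue_id fields.
- The file has already been loaded into a dict, for example with PyYAML's yaml.safe_load.

status_summary is a hypothetical helper, and the job names in the example data are illustrative.

```python
from collections import Counter


def status_summary(runtime: dict) -> tuple:
    """Count jobs per status and list the failed ones with their queue IDs."""
    jobs = {name: entry for name, entry in runtime.items() if isinstance(entry, dict)}
    counts = Counter(entry.get("status", "unknown") for entry in jobs.values())
    failed = sorted(
        (name, entry.get("queue_id"))
        for name, entry in jobs.items()
        if entry.get("status") == "failed"
    )
    return dict(counts), failed


runtime = {
    "align_reads_abc123": {"status": "completed", "queue_id": "12345"},
    "sort_bam_def456": {"status": "failed", "queue_id": "12346"},
}
print(status_summary(runtime))
# ({'completed': 1, 'failed': 1}, [('sort_bam_def456', '12346')])
```

The queue IDs of failed jobs can then be passed to the scheduler commands under Monitoring Jobs.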

Custom Queue Adapters

Creating a Custom Queue Adapter

You can create custom queue adapters for unsupported systems.

Required structure:

my_queues/
├── __init__.py           # Required for module
└── my_custom_queue.py    # Your queue adapter

Functions to implement (submit is required; post_run is optional):

def submit(command, snippet_name, requirements, dependencies, log, profile):
    """
    Submit a job to the queue system.

    Args:
        command: Command string to execute
        snippet_name: Name of the snippet
        requirements: Dict of resource requirements
        dependencies: List of job IDs this job depends on
        log: Logger object
        profile: Profile name

    Returns:
        Queue job ID (string) or None
    """
    # Your implementation here
    pass

def post_run(log):
    """
    Optional: Cleanup or post-processing after pipeline completes.

    Args:
        log: Logger object
    """
    # Optional implementation
    pass
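Before writing a real adapter, it can help to have one that only records what it would submit. The sketch below is a hypothetical dry-run adapter that implements the submit/post_run interface above. Like the example adapter that follows, it assumes log.log is a standard logging.Logger. It returns sequential fake job IDs, so dependency handling can be exercised without a scheduler.

```python
import itertools

# Module-level counter producing fake, sequential job IDs
_next_id = itertools.count(1)


def submit(command, snippet_name, requirements, dependencies, log, profile):
    """Log the job instead of submitting it, and return a fake queue ID."""
    job_id = str(next(_next_id))
    log.log.info(
        f"[dry-run] {snippet_name}: id={job_id} profile={profile} "
        f"requirements={requirements} after={list(dependencies or [])}"
    )
    log.log.info(f"[dry-run] command: {command}")
    return job_id


def post_run(log):
    """Called once the pipeline completes; here it only logs a message."""
    log.log.info("[dry-run] pipeline finished, nothing was submitted")
```

If saved as a hypothetical my_queues/dry_run.py, it could be selected with --queue dry_run, in the same way as the configuration shown later in this section.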

Example custom queue adapter:

import subprocess
import re

def submit(command, snippet_name, requirements, dependencies, log, profile):
    """Submit job to custom queue system."""

    # Extract requirements
    ncpu = requirements.get('ncpu', 1)
    mem = requirements.get('mem', '4gb')
    time = requirements.get('time', '01:00:00')

    # Build queue command
    queue_cmd = [
        'my_submit_command',
        f'--cpus={ncpu}',
        f'--memory={mem}',
        f'--time={time}',
        '--job-name', snippet_name
    ]

    # Add dependencies
    if dependencies:
        dep_str = ','.join(map(str, dependencies))
        queue_cmd.extend(['--depends-on', dep_str])

    # Add command to execute
    queue_cmd.append(command)

    # Submit job
    log.log.info(f'Submitting to queue: {" ".join(queue_cmd)}')
    result = subprocess.run(queue_cmd, capture_output=True, text=True)

    # Parse job ID from output
    match = re.search(r'Job ID: (\d+)', result.stdout)
    if match:
        job_id = match.group(1)
        log.log.info(f'Job {snippet_name} submitted with ID {job_id}')
        return job_id
    else:
        log.log.error(f'Failed to get job ID: {result.stderr}')
        return None

Configuration:

# Set queue path in config
export PYPE_QUEUES=/path/to/my_queues

# Use custom queue
pype pipeline --queue my_custom_queue my_pipeline --input data.txt

Best Practices

  1. Choose appropriate queue: Use none for testing, cluster queues for production

  2. Set realistic resource requirements: Don’t over-request resources

  3. Monitor queue usage: Check for failed or stuck jobs regularly

  4. Use resume functionality: Let Bio_pype handle interrupted runs

  5. Test locally first: Use --queue none before submitting to cluster

  6. Check queue limits: Ensure requirements fit within queue limits

  7. Handle errors gracefully: Check logs when jobs fail

  8. Use job arrays: For batch processing, submit array jobs

  9. Track progress: Monitor pipeline_runtime.yaml for long-running pipelines

  10. Clean up: Remove completed jobs and old log files

Troubleshooting

Job Won’t Submit

Symptoms:

  • No queue ID returned

  • Job not appearing in queue

  • Submission errors in logs

Solutions:

  1. Check queue system is available:

    which sbatch  # For SLURM
    which qsub    # For PBS/SGE
    
  2. Verify queue module is installed:

    ls $PYPE_QUEUES/
    
  3. Check resource requirements are valid:

    # Look in logs for submission command
    cat ~/.bio_pype/logs/*/pipeline.log | grep "Submitting"
    
  4. Test queue manually:

    printf '#!/bin/bash\necho hello\n' | sbatch
    

Job Fails Immediately

Symptoms:

  • Job completes with error code

  • Short runtime

  • Error messages in stderr

Solutions:

  1. Check stderr log:

    cat ~/.bio_pype/logs/*/jobs/*/stderr
    
  2. Verify environment:

    # Check if modules/containers are accessible
    # Check if paths exist
    
  3. Test command locally:

    pype pipeline --queue none my_pipeline --input data.txt
    
  4. Check resource allocation:

    # Job may have run out of memory or time
    

Job Stays Pending

Symptoms:

  • Job status: PENDING for an extended period

  • Never starts running

Solutions:

  1. Check queue status:

    squeue -u $USER  # SLURM
    qstat -u $USER   # PBS/SGE
    
  2. Check resource availability:

    sinfo  # SLURM
    pbsnodes  # PBS
    qhost     # SGE
    
  3. Reduce resource requirements:

    # Decrease ncpu, mem, or time in snippet
    
  4. Check queue limits:

    # Verify you haven't exceeded job or resource limits
    

Dependencies Not Working

Symptoms:

  • Jobs run in wrong order

  • Jobs fail due to missing inputs

Solutions:

  1. Check pipeline dependencies:

    # Review depends_on in pipeline YAML
    
  2. Verify job IDs are tracked:

    # Check runtime file for queue_ids
    grep "queue_id" pipeline_runtime.yaml
    
  3. Test without queue:

    pype pipeline --queue none my_pipeline ...
    

Integration with Progress Tracking

Queue systems feed into Bio_pype’s progress tracking at each stage of a job:

  • Job submission: Queue ID recorded in pipeline_runtime.yaml

  • Status tracking: Job status updated (pending, running, completed, failed)

  • Resume: Failed or cancelled jobs can be rerun

  • Monitoring: pipeline_runtime.yaml shows queue IDs for all jobs

See Progress Tracking and Pipeline Resume for more details.

See Also