Queue Systems#
Bio_pype supports multiple queue/scheduling systems through a modular adapter architecture. This allows pipelines to run on different computational environments without changing pipeline definitions.
Overview#
Queue systems in Bio_pype:
Submit jobs to various schedulers (SLURM, PBS/Torque, SGE, etc.)
Handle job dependencies and execution order
Manage resource requirements (CPUs, memory, walltime)
Track job submission and completion
Support both local and distributed execution
Available Queue Systems#
Local Execution (none)#
Executes jobs directly on the local machine without a queue system.
When to use:
Testing pipelines
Small-scale analyses
Single-machine workflows
Debugging
Usage:
pype pipeline --queue none my_pipeline --input data.txt
Characteristics:
Jobs run immediately and sequentially
No job scheduling or queuing
Resource limits not enforced
Dependencies handled by pipeline
Output in stdout/stderr files
Configuration:
No special configuration needed. This is the default fallback queue.
SLURM#
Submits jobs to the SLURM workload manager (Simple Linux Utility for Resource Management).
When to use:
HPC clusters with SLURM
Parallel job execution
Resource-intensive workflows
Production pipelines
Usage:
pype pipeline --queue slurm my_pipeline --input data.txt
Resource Mapping:
| Bio_pype | SLURM Flag | Example |
|---|---|---|
| ncpu | --cpus-per-task | --cpus-per-task=8 |
| mem | --mem | --mem=16G |
| time | --time | --time=02:00:00 |
Example snippet requirements:
requirements:
  ncpu: 8
  mem: 16gb
  time: '02:00:00'
Translates to SLURM:
sbatch --cpus-per-task=8 --mem=16G --time=02:00:00 job_script.sh
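The translation above can be sketched in a few lines of Python. This is an illustrative sketch only, not Bio_pype's actual implementation: the function name `slurm_flags` is hypothetical, and the memory conversion assumes a whole number followed by one of the unit suffixes shown in this document.

```python
import re


def slurm_flags(requirements):
    """Translate a requirements dict into sbatch flags (hypothetical helper)."""
    flags = []
    if "ncpu" in requirements:
        flags.append(f"--cpus-per-task={requirements['ncpu']}")
    if "mem" in requirements:
        # '16gb' -> '16G': sbatch takes a single-letter unit suffix (K, M, G, T)
        match = re.fullmatch(r"(\d+)\s*([kmgt])b?", str(requirements["mem"]).lower())
        if match is None:
            raise ValueError(f"unrecognised memory value: {requirements['mem']!r}")
        flags.append(f"--mem={match.group(1)}{match.group(2).upper()}")
    if "time" in requirements:
        flags.append(f"--time={requirements['time']}")
    return flags


reqs = {"ncpu": 8, "mem": "16gb", "time": "02:00:00"}
print(" ".join(["sbatch", *slurm_flags(reqs), "job_script.sh"]))
```

Returning a list of flags rather than one string keeps the result safe to pass to `subprocess.run` without shell quoting.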
Configuration:
In your queue module (pype_modules/queues/slurm.py), you can customize:
Default partition
Quality of service (QoS)
Account/project codes
Additional sbatch options
PBS/Torque#
Submits jobs to PBS (Portable Batch System) or Torque resource managers.
When to use:
Clusters with PBS/Torque
Traditional HPC environments
Legacy systems
Usage:
pype pipeline --queue pbs my_pipeline --input data.txt
Resource Mapping:
| Bio_pype | PBS Flag | Example |
|---|---|---|
| ncpu | -l nodes=1:ppn= | -l nodes=1:ppn=8 |
| mem | -l mem= | -l mem=16gb |
| time | -l walltime= | -l walltime=02:00:00 |
Example snippet requirements:
requirements:
  ncpu: 8
  mem: 16gb
  time: '02:00:00'
Translates to PBS:
qsub -l nodes=1:ppn=8,mem=16gb,walltime=02:00:00 job_script.sh
SGE (Sun Grid Engine)#
Submits jobs to SGE or compatible grid engines (e.g., UGE, OGE).
When to use:
Clusters with SGE/UGE/OGE
Grid computing environments
Usage:
pype pipeline --queue sge my_pipeline --input data.txt
Resource Mapping:
| Bio_pype | SGE Flag | Example |
|---|---|---|
| ncpu | -pe smp | -pe smp 8 |
| mem | -l h_vmem= | -l h_vmem=2G (per core) |
| time | -l h_rt= | -l h_rt=02:00:00 |
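The "(per core)" note matters because SGE applies `h_vmem` to each slot, so a total of 16 GB across 8 slots becomes 2 GB per slot. The arithmetic can be sketched as follows. The function name `sge_resource_flags` is hypothetical, the division by `ncpu` is inferred from the table rather than taken from Bio_pype's code, and the parallel environment name `smp` varies between sites.

```python
import math


def sge_resource_flags(ncpu, mem_gb, time):
    """Build SGE resource flags from a total memory request (hypothetical helper)."""
    # h_vmem is enforced per slot, so split the total across the requested slots.
    # Work in whole megabytes and round up so the job never gets less than asked.
    per_slot_mb = math.ceil(mem_gb * 1024 / ncpu)
    if per_slot_mb % 1024 == 0:
        h_vmem = f"{per_slot_mb // 1024}G"
    else:
        h_vmem = f"{per_slot_mb}M"
    return ["-pe", "smp", str(ncpu), "-l", f"h_vmem={h_vmem}", "-l", f"h_rt={time}"]


print(" ".join(["qsub", *sge_resource_flags(8, 16, "02:00:00"), "job_script.sh"]))
```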
Resource Requirements#
Defining Requirements in Snippets#
Markdown snippets:
## requirements
```yaml
ncpu: 8 # Number of CPU cores
mem: 16gb # Memory allocation
time: '02:00:00' # Max runtime (HH:MM:SS)
```
Python snippets:
def requirements():
    return {
        'ncpu': 8,
        'mem': '16gb',
        'time': '02:00:00'
    }
Supported units:
Memory: ‘gb’, ‘GB’, ‘mb’, ‘MB’, ‘kb’, ‘KB’
Time: ‘HH:MM:SS’ format
CPUs: Integer number of cores
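A quick way to validate requirement values before submitting is to parse them yourself. The sketch below is not part of Bio_pype: `parse_mem` and `parse_time` are hypothetical helpers, and treating 1 kb as 1024 bytes is an assumption, since schedulers differ on binary versus decimal units.

```python
import re

# Assumed binary multipliers; adjust if your scheduler uses decimal units.
_UNIT_BYTES = {"kb": 1024, "mb": 1024 ** 2, "gb": 1024 ** 3}


def parse_mem(value):
    """Convert a string such as '16gb' or '512MB' to a number of bytes."""
    match = re.fullmatch(r"(\d+)\s*(kb|mb|gb)", value.strip().lower())
    if match is None:
        raise ValueError(f"unsupported memory value: {value!r}")
    return int(match.group(1)) * _UNIT_BYTES[match.group(2)]


def parse_time(value):
    """Convert an 'HH:MM:SS' string to a number of seconds."""
    hours, minutes, seconds = (int(part) for part in value.split(":"))
    return hours * 3600 + minutes * 60 + seconds


print(parse_mem("16gb"), parse_time("02:00:00"))
```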
Overriding Requirements in Pipelines#
You can override snippet requirements in pipeline definitions:
steps:
  step_1_align:
    name: bwa_mem
    type: snippet
    requirements:
      ncpu: 16          # Override default 8 CPUs
      mem: 32gb         # Override default 16GB
      time: '04:00:00'  # Override default 2 hours
    arguments:
      -i: '%(input_bam)s'
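One way to picture the override is a key-by-key merge in which pipeline values win. This is a sketch under an assumption: the text above does not say whether keys omitted from the pipeline keep their snippet defaults, and `effective_requirements` is a hypothetical name, not a Bio_pype function.

```python
def effective_requirements(snippet_defaults, pipeline_overrides=None):
    """Merge pipeline-level overrides on top of snippet defaults (assumed behaviour)."""
    merged = dict(snippet_defaults)          # copy so the defaults stay untouched
    merged.update(pipeline_overrides or {})  # pipeline values take precedence
    return merged


defaults = {"ncpu": 8, "mem": "16gb", "time": "02:00:00"}
overrides = {"ncpu": 16, "mem": "32gb", "time": "04:00:00"}
print(effective_requirements(defaults, overrides))
```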
Job Dependencies#
Automatic Dependency Management#
Bio_pype automatically handles job dependencies:
Within pipelines: Jobs wait for dependencies to complete
Across steps: Output from one step becomes input to the next
Array jobs: All jobs in group tracked together
Example:
steps:
  step_1_prepare:
    name: prepare_reference
    type: snippet
    depends_on: []
  step_2_align:
    name: align_reads
    type: snippet
    depends_on: [step_1_prepare]  # Waits for step_1
  step_3_sort:
    name: sort_bam
    type: snippet
    depends_on: [step_2_align]    # Waits for step_2
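The execution order implied by `depends_on` is a topological ordering of the step graph. The sketch below reproduces it with Python's standard `graphlib` module (Python 3.9+). It illustrates the idea only and makes no claim about how Bio_pype resolves the order internally.

```python
from graphlib import TopologicalSorter

# Each step maps to the steps it depends on, mirroring depends_on above.
steps = {
    "step_1_prepare": [],
    "step_2_align": ["step_1_prepare"],
    "step_3_sort": ["step_2_align"],
}

# static_order() yields every step after all of its dependencies.
order = list(TopologicalSorter(steps).static_order())
print(order)
```

A cyclic `depends_on` chain would make `static_order()` raise `graphlib.CycleError`, which is a cheap way to catch a misconfigured pipeline before anything is submitted.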
Queue System Dependency Handling#
Different queue systems handle dependencies differently:
SLURM:
sbatch --dependency=afterok:12345 job_script.sh
PBS/Torque:
qsub -W depend=afterok:12345 job_script.sh
SGE:
qsub -hold_jid 12345 job_script.sh
Local (none):
Jobs run sequentially, no queue-level dependencies needed.
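The per-scheduler differences above can be captured in one small helper. This is a hedged sketch rather than Bio_pype code: `dependency_args` is a hypothetical name, and the separators follow the schedulers' documented syntax (colon-separated IDs for SLURM and PBS, comma-separated for SGE).

```python
def dependency_args(queue, job_ids):
    """Return the command-line arguments that make a job wait for job_ids."""
    ids = [str(job_id) for job_id in job_ids]
    if not ids or queue == "none":
        return []  # nothing to wait for, or local sequential execution
    if queue == "slurm":
        return ["--dependency=afterok:" + ":".join(ids)]
    if queue == "pbs":
        return ["-W", "depend=afterok:" + ":".join(ids)]
    if queue == "sge":
        return ["-hold_jid", ",".join(ids)]
    raise ValueError(f"unknown queue system: {queue!r}")


for queue in ("slurm", "pbs", "sge", "none"):
    print(queue, dependency_args(queue, [12345, 12346]))
```

Note that `afterok` starts the job only if its dependencies succeed, whereas SGE's `-hold_jid` releases the job when they finish regardless of exit status.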
Job Submission and Monitoring#
Submitting Jobs#
Bio_pype handles job submission automatically when you run a pipeline:
$ pype pipeline --queue slurm genomic_analysis --input sample.fq
For each snippet in the pipeline:
Checks if job should be skipped (via progress tracking)
Prepares job script with appropriate environment
Submits to queue system
Records queue ID in pipeline_runtime.yaml
Sets up log files
Moves to next job
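The steps above amount to a loop that skips finished jobs and passes queue IDs forward as dependencies. The following sketch shows that control flow with a stand-in submit function. All names (`submit_all`, `fake_submit`, `already_done`) are hypothetical, and the real logic in Bio_pype may differ.

```python
from itertools import count


def submit_all(steps, submit, already_done=()):
    """Submit steps in order; steps is a list of (name, [dependency names])."""
    queue_ids = {}
    for name, depends_on in steps:
        if name in already_done:
            continue  # progress tracking says this job can be skipped
        # Only dependencies submitted in this run have a queue ID to wait on.
        dep_ids = [queue_ids[dep] for dep in depends_on if dep in queue_ids]
        queue_ids[name] = submit(name, dep_ids)
    return queue_ids


_next_id = count(100)
calls = []


def fake_submit(name, dep_ids):
    """Stand-in for a queue adapter: records the call and returns a fake ID."""
    job_id = str(next(_next_id))
    calls.append((name, dep_ids, job_id))
    return job_id


steps = [
    ("prepare_reference", []),
    ("align_reads", ["prepare_reference"]),
    ("sort_bam", ["align_reads"]),
]
result = submit_all(steps, fake_submit, already_done={"prepare_reference"})
print(result)
```

Because `prepare_reference` is skipped, `align_reads` is submitted with no dependencies, while `sort_bam` still waits on the queue ID returned for `align_reads`.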
Monitoring Jobs#
SLURM:
# View all your jobs
squeue -u $USER
# View specific job
squeue -j 12345
# View job details
scontrol show job 12345
PBS/Torque:
# View all your jobs
qstat -u $USER
# View specific job
qstat -f 12345
SGE:
# View all your jobs
qstat
# View specific job
qstat -j 12345
Local (none):
# Check process
ps aux | grep pype
# View logs in real-time
tail -f ~/.bio_pype/logs/*/jobs/*/stdout
Checking Job Status#
Bio_pype progress tracking provides job status:
# View runtime file
cat /path/to/logs/<run_id>_<pipeline>/pipeline_runtime.yaml
# Look for job status
align_reads_abc123:
  status: completed
  queue_id: '12345'
  submitted_at: '2025-01-24T10:00:00'
  completed_at: '2025-01-24T10:30:00'
Custom Queue Adapters#
Creating a Custom Queue Adapter#
You can create custom queue adapters for unsupported systems.
Required structure:
my_queues/
├── __init__.py # Required for module
└── my_custom_queue.py # Your queue adapter
Required functions:
def submit(command, snippet_name, requirements, dependencies, log, profile):
    """
    Submit a job to the queue system.

    Args:
        command: Command string to execute
        snippet_name: Name of the snippet
        requirements: Dict of resource requirements
        dependencies: List of job IDs this job depends on
        log: Logger object
        profile: Profile name

    Returns:
        Queue job ID (string) or None
    """
    # Your implementation here
    pass

def post_run(log):
    """
    Optional: Cleanup or post-processing after pipeline completes.

    Args:
        log: Logger object
    """
    # Optional implementation
    pass
Example custom queue adapter:
import subprocess
import re

def submit(command, snippet_name, requirements, dependencies, log, profile):
    """Submit job to custom queue system."""
    # Extract requirements, falling back to modest defaults
    ncpu = requirements.get('ncpu', 1)
    mem = requirements.get('mem', '4gb')
    time = requirements.get('time', '01:00:00')

    # Build queue command
    queue_cmd = [
        'my_submit_command',
        f'--cpus={ncpu}',
        f'--memory={mem}',
        f'--time={time}',
        '--job-name', snippet_name
    ]

    # Add dependencies
    if dependencies:
        dep_str = ','.join(map(str, dependencies))
        queue_cmd.extend(['--depends-on', dep_str])

    # Add command to execute
    queue_cmd.append(command)

    # Submit job
    log.log.info(f'Submitting to queue: {" ".join(queue_cmd)}')
    result = subprocess.run(queue_cmd, capture_output=True, text=True)

    # Parse job ID from output
    match = re.search(r'Job ID: (\d+)', result.stdout)
    if match:
        job_id = match.group(1)
        log.log.info(f'Job {snippet_name} submitted with ID {job_id}')
        return job_id
    else:
        log.log.error(f'Failed to get job ID: {result.stderr}')
        return None
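The adapter above parses `Job ID: <n>` from a made-up submit command. Real schedulers print their own confirmation lines, so the regular expression has to match the target system. The sketch below lists patterns for the output these commands typically produce. The patterns and the `parse_job_id` helper are assumptions for illustration, so check them against the output on your own cluster.

```python
import re

# Typical confirmation lines (assumed formats; sites can customise them):
#   sbatch:      Submitted batch job 12345
#   qsub (PBS):  12345.headnode
#   qsub (SGE):  Your job 12345 ("job_script.sh") has been submitted
JOB_ID_PATTERNS = {
    "slurm": re.compile(r"Submitted batch job (\d+)"),
    "pbs": re.compile(r"^\s*(\d+)\.", re.MULTILINE),
    "sge": re.compile(r"Your job (\d+) "),
}


def parse_job_id(queue, stdout):
    """Extract the numeric job ID from submit-command output, or return None."""
    pattern = JOB_ID_PATTERNS.get(queue)
    match = pattern.search(stdout) if pattern else None
    return match.group(1) if match else None


print(parse_job_id("slurm", "Submitted batch job 12345\n"))
```

The SGE pattern deliberately ignores array-job output, which uses a different confirmation line.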
Configuration:
# Set queue path in config
export PYPE_QUEUES=/path/to/my_queues
# Use custom queue
pype pipeline --queue my_custom_queue my_pipeline --input data.txt
Best Practices#
Choose appropriate queue: Use none for testing, cluster queues for production
Set realistic resource requirements: Don’t over-request resources
Monitor queue usage: Check for failed or stuck jobs regularly
Use resume functionality: Let Bio_pype handle interrupted runs
Test locally first: Use --queue none before submitting to cluster
Check queue limits: Ensure requirements fit within queue limits
Handle errors gracefully: Check logs when jobs fail
Use job arrays: For batch processing, submit array jobs
Track progress: Monitor pipeline_runtime.yaml for long-running pipelines
Clean up: Remove completed jobs and old log files
Troubleshooting#
Job Won’t Submit#
Symptoms:
No queue ID returned
Job not appearing in queue
Submission errors in logs
Solutions:
Check queue system is available:
which sbatch # For SLURM
which qsub # For PBS/SGE
Verify queue module is installed:
ls $PYPE_QUEUES/
Check resource requirements are valid:
# Look in logs for submission command
cat ~/.bio_pype/logs/*/pipeline.log | grep "Submitting"
Test queue manually:
printf '#!/bin/bash\necho hello\n' | sbatch
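The availability check can also be done from Python, for example inside a custom adapter before it tries to submit anything. This is a minimal sketch using the standard library's `shutil.which`. The mapping of queue names to commands mirrors the `which` checks above, and `submit_command_path` is a hypothetical helper.

```python
import shutil

# Submit command expected on PATH for each queue name used in this document.
SUBMIT_COMMANDS = {"slurm": "sbatch", "pbs": "qsub", "sge": "qsub"}


def submit_command_path(queue):
    """Return the full path of the queue's submit command, or None if absent."""
    command = SUBMIT_COMMANDS.get(queue)
    return shutil.which(command) if command else None


for queue in SUBMIT_COMMANDS:
    print(queue, submit_command_path(queue) or "not found on PATH")
```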
Job Fails Immediately#
Symptoms:
Job completes with error code
Short runtime
Error messages in stderr
Solutions:
Check stderr log:
cat ~/.bio_pype/logs/*/jobs/*/stderr
Verify environment:
# Check if modules/containers are accessible
# Check if paths exist
Test command locally:
pype pipeline --queue none my_pipeline --input data.txt
Check resource allocation:
# Job may have run out of memory or time
Job Stays Pending#
Symptoms:
Job status: PENDING for extended period
Never starts running
Solutions:
Check queue status:
squeue -u $USER # SLURM
qstat -u $USER # PBS/SGE
Check resource availability:
sinfo # SLURM
pbsnodes # PBS
Reduce resource requirements:
# Decrease ncpu, mem, or time in snippet
Check queue limits:
# Verify you haven't exceeded job or resource limits
Dependencies Not Working#
Symptoms:
Jobs run in wrong order
Jobs fail due to missing inputs
Solutions:
Check pipeline dependencies:
# Review depends_on in pipeline YAML
Verify job IDs are tracked:
# Check runtime file for queue_ids
grep "queue_id" pipeline_runtime.yaml
Test without queue:
pype pipeline --queue none my_pipeline ...
Integration with Progress Tracking#
Queue systems integrate seamlessly with Bio_pype’s progress tracking:
Job submission: Queue ID recorded in pipeline_runtime.yaml
Status tracking: Job status updated (pending, running, completed, failed)
Resume: Failed or cancelled jobs can be rerun
Monitoring: pipeline_runtime.yaml shows queue IDs for all jobs
See Progress Tracking and Pipeline Resume for more details.
See Also#
Progress Tracking - Progress tracking system
Pipeline Resume - Pipeline resume functionality
Pipelines - Pipeline definitions
Snippets - Snippet creation
Profiles - Profile configuration