.. index:: Queue Systems

.. _queues:

Queue Systems
=============

Bio_pype supports multiple queue/scheduling systems through a modular adapter
architecture. This allows pipelines to run on different computational
environments without changing pipeline definitions.

Overview
--------

Queue systems in Bio_pype:

- Submit jobs to various schedulers (SLURM, PBS/Torque, SGE, etc.)
- Handle job dependencies and execution order
- Manage resource requirements (CPUs, memory, walltime)
- Track job submission and completion
- Support both local and distributed execution

Available Queue Systems
-----------------------

Local Execution (none)
^^^^^^^^^^^^^^^^^^^^^^

Executes jobs directly on the local machine without a queue system.

**When to use:**

- Testing pipelines
- Small-scale analyses
- Single-machine workflows
- Debugging

**Usage**::

    pype pipeline --queue none my_pipeline --input data.txt

**Characteristics:**

- Jobs run immediately and sequentially
- No job scheduling or queuing
- Resource limits are not enforced
- Dependencies are handled by the pipeline
- Output is written to stdout/stderr files

**Configuration:**

No special configuration is needed. This is the default fallback queue.

SLURM
^^^^^

Submits jobs to the SLURM workload manager (Simple Linux Utility for Resource
Management).

**When to use:**

- HPC clusters with SLURM
- Parallel job execution
- Resource-intensive workflows
- Production pipelines

**Usage**::

    pype pipeline --queue slurm my_pipeline --input data.txt

**Resource Mapping:**

.. list-table::
   :widths: 33 33 33
   :header-rows: 1

   * - Bio_pype
     - SLURM Flag
     - Example
   * - ncpu
     - ``--cpus-per-task``
     - ``--cpus-per-task=8``
   * - mem
     - ``--mem``
     - ``--mem=16G``
   * - time
     - ``--time``
     - ``--time=02:00:00``

**Example snippet requirements**::

    requirements:
      ncpu: 8
      mem: 16gb
      time: '02:00:00'

**Translates to SLURM**::

    sbatch --cpus-per-task=8 --mem=16G --time=02:00:00 job_script.sh

**Configuration:**

In your queue module (``pype_modules/queues/slurm.py``), you can customize:

- Default partition
- Quality of service (QoS)
- Account/project codes
- Additional ``sbatch`` options

PBS/Torque
^^^^^^^^^^

Submits jobs to PBS (Portable Batch System) or Torque resource managers.

**When to use:**

- Clusters with PBS/Torque
- Traditional HPC environments
- Legacy systems

**Usage**::

    pype pipeline --queue pbs my_pipeline --input data.txt

**Resource Mapping:**

.. list-table::
   :widths: 33 33 33
   :header-rows: 1

   * - Bio_pype
     - PBS Flag
     - Example
   * - ncpu
     - ``-l nodes=1:ppn=``
     - ``-l nodes=1:ppn=8``
   * - mem
     - ``-l mem=``
     - ``-l mem=16gb``
   * - time
     - ``-l walltime=``
     - ``-l walltime=02:00:00``

**Example snippet requirements**::

    requirements:
      ncpu: 8
      mem: 16gb
      time: '02:00:00'

**Translates to PBS**::

    qsub -l nodes=1:ppn=8,mem=16gb,walltime=02:00:00 job_script.sh

SGE (Sun Grid Engine)
^^^^^^^^^^^^^^^^^^^^^

Submits jobs to SGE or compatible grid engines (e.g., UGE, OGE).

**When to use:**

- Clusters with SGE/UGE/OGE
- Grid computing environments

**Usage**::

    pype pipeline --queue sge my_pipeline --input data.txt

**Resource Mapping:**

.. list-table::
   :widths: 33 33 33
   :header-rows: 1

   * - Bio_pype
     - SGE Flag
     - Example
   * - ncpu
     - ``-pe smp``
     - ``-pe smp 8``
   * - mem
     - ``-l h_vmem=``
     - ``-l h_vmem=2G`` (per core)
   * - time
     - ``-l h_rt=``
     - ``-l h_rt=02:00:00``

``h_vmem`` is enforced per slot, so the example ``2G`` corresponds to a total
of 16gb spread over 8 cores.

Resource Requirements
---------------------

Defining Requirements in Snippets
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

**Markdown snippets**::

    ## requirements

    ```yaml
    ncpu: 8              # Number of CPU cores
    mem: 16gb            # Memory allocation
    time: '02:00:00'     # Max runtime (HH:MM:SS)
    ```

**Python snippets**::

    def requirements():
        return {
            'ncpu': 8,
            'mem': '16gb',
            'time': '02:00:00'
        }

**Supported units:**

- Memory: ``gb``, ``GB``, ``mb``, ``MB``, ``kb``, ``KB``
- Time: ``HH:MM:SS`` format
- CPUs: integer number of cores

Overriding Requirements in Pipelines
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

You can override snippet requirements in pipeline definitions::

    steps:
      step_1_align:
        name: bwa_mem
        type: snippet
        requirements:
          ncpu: 16          # Override default 8 CPUs
          mem: 32gb         # Override default 16GB
          time: '04:00:00'  # Override default 2 hours
        arguments:
          -i: '%(input_bam)s'

Job Dependencies
----------------

Automatic Dependency Management
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Bio_pype automatically handles job dependencies:

1. **Within pipelines**: Jobs wait for their dependencies to complete
2. **Across steps**: Output from one step becomes input to the next
3. **Array jobs**: All jobs in a group are tracked together

**Example**::

    steps:
      step_1_prepare:
        name: prepare_reference
        type: snippet
        depends_on: []
      step_2_align:
        name: align_reads
        type: snippet
        depends_on: [step_1_prepare]  # Waits for step_1
      step_3_sort:
        name: sort_bam
        type: snippet
        depends_on: [step_2_align]    # Waits for step_2

Queue System Dependency Handling
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Different queue systems express dependencies differently. In each example
below, ``12345`` is the ID of the job that must finish first. ``afterok``
(SLURM, PBS/Torque) starts the dependent job only if the parent exited
successfully; SGE's ``-hold_jid`` waits for the parent job to finish.

**SLURM**::

    sbatch --dependency=afterok:12345 job_script.sh

**PBS/Torque**::

    qsub -W depend=afterok:12345 job_script.sh

**SGE**::

    qsub -hold_jid 12345 job_script.sh

**Local (none)**: Jobs run sequentially, so no queue-level dependencies are
needed.

Job Submission and Monitoring
-----------------------------

Submitting Jobs
^^^^^^^^^^^^^^^

Bio_pype handles job submission automatically when you run a pipeline::

    $ pype pipeline --queue slurm genomic_analysis --input sample.fq

For each snippet in the pipeline, Bio_pype:

1. Checks whether the job should be skipped (via progress tracking)
2. Prepares the job script with the appropriate environment
3. Submits the job to the queue system
4. Records the queue ID in ``pipeline_runtime.yaml``
5. Sets up log files
6. Moves to the next job

Monitoring Jobs
^^^^^^^^^^^^^^^

**SLURM**::

    # View all your jobs
    squeue -u $USER

    # View a specific job
    squeue -j 12345

    # View job details
    scontrol show job 12345

**PBS/Torque**::

    # View all your jobs
    qstat -u $USER

    # View a specific job
    qstat -f 12345

**SGE**::

    # View all your jobs
    qstat -u $USER

    # View a specific job
    qstat -j 12345

**Local (none)**::

    # Check the process
    ps aux | grep pype

    # View logs in real time
    tail -f ~/.bio_pype/logs/*/jobs/*/stdout

Checking Job Status
^^^^^^^^^^^^^^^^^^^

Bio_pype progress tracking provides job status::

    # View the runtime file
    cat /path/to/logs/_/pipeline_runtime.yaml

    # Look for the job's status entry, for example:
    align_reads_abc123:
      status: completed
      queue_id: '12345'
      submitted_at: '2025-01-24T10:00:00'
      completed_at: '2025-01-24T10:30:00'

Custom Queue Adapters
---------------------

Creating a Custom Queue Adapter
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

You can create custom queue adapters for unsupported systems.

**Required structure**::

    my_queues/
    ├── __init__.py          # Required for module
    └── my_custom_queue.py   # Your queue adapter

**Required functions** (``post_run`` is optional)::

    def submit(command, snippet_name, requirements, dependencies, log, profile):
        """
        Submit a job to the queue system.

        Args:
            command: Command string to execute
            snippet_name: Name of the snippet
            requirements: Dict of resource requirements
            dependencies: List of job IDs this job depends on
            log: Logger object
            profile: Profile name

        Returns:
            Queue job ID (string) or None
        """
        # Your implementation here
        pass

    def post_run(log):
        """
        Optional: Cleanup or post-processing after pipeline completes.

        Args:
            log: Logger object
        """
        # Optional implementation
        pass

**Example custom queue adapter**::

    import re
    import subprocess

    def submit(command, snippet_name, requirements, dependencies, log, profile):
        """Submit job to custom queue system."""
        # Extract requirements, falling back to defaults
        ncpu = requirements.get('ncpu', 1)
        mem = requirements.get('mem', '4gb')
        time = requirements.get('time', '01:00:00')

        # Build the submission command
        # ('my_submit_command' is a placeholder for your scheduler's tool)
        queue_cmd = [
            'my_submit_command',
            f'--cpus={ncpu}',
            f'--memory={mem}',
            f'--time={time}',
            '--job-name', snippet_name
        ]

        # Add dependencies
        if dependencies:
            dep_str = ','.join(map(str, dependencies))
            queue_cmd.extend(['--depends-on', dep_str])

        # Add command to execute
        queue_cmd.append(command)

        # Submit job
        log.log.info(f'Submitting to queue: {" ".join(queue_cmd)}')
        result = subprocess.run(queue_cmd, capture_output=True, text=True)

        # Parse job ID from output; treat a non-zero exit status as failure
        match = re.search(r'Job ID: (\d+)', result.stdout)
        if result.returncode == 0 and match:
            job_id = match.group(1)
            log.log.info(f'Job {snippet_name} submitted with ID {job_id}')
            return job_id
        log.log.error(f'Failed to get job ID: {result.stderr}')
        return None

**Configuration**::

    # Point Bio_pype at the directory containing your queue modules
    export PYPE_QUEUES=/path/to/my_queues

    # Use custom queue
    pype pipeline --queue my_custom_queue my_pipeline --input data.txt

Best Practices
--------------

1. **Choose an appropriate queue**: Use ``none`` for testing and cluster queues for production
2. **Set realistic resource requirements**: Don't over-request resources
3. **Monitor queue usage**: Check for failed or stuck jobs regularly
4. **Use resume functionality**: Let Bio_pype handle interrupted runs
5. **Test locally first**: Use ``--queue none`` before submitting to a cluster
6. **Check queue limits**: Ensure requirements fit within the queue's limits
7. **Handle errors gracefully**: Check logs when jobs fail
8. **Use job arrays**: For batch processing, submit array jobs
9. **Track progress**: Monitor ``pipeline_runtime.yaml`` for long-running pipelines
10. **Clean up**: Remove completed jobs and old log files

Troubleshooting
---------------

Job Won't Submit
^^^^^^^^^^^^^^^^

**Symptoms:**

- No queue ID returned
- Job not appearing in the queue
- Submission errors in the logs

**Solutions:**

1. Check that the queue system is available::

       which sbatch   # For SLURM
       which qsub     # For PBS/SGE

2. Verify that the queue module is installed::

       ls $PYPE_QUEUES/

3. Check that the resource requirements are valid::

       # Look in the logs for the submission command
       grep "Submitting" ~/.bio_pype/logs/*/pipeline.log

4. Test the queue manually::

       printf '#!/bin/bash\necho hello\n' | sbatch

Job Fails Immediately
^^^^^^^^^^^^^^^^^^^^^

**Symptoms:**

- Job completes with an error code
- Short runtime
- Error messages in stderr

**Solutions:**

1. Check the stderr log::

       cat ~/.bio_pype/logs/*/jobs/*/stderr

2. Verify the environment::

       # Check that modules/containers are accessible
       # Check that paths exist

3. Test the command locally::

       pype pipeline --queue none my_pipeline --input data.txt

4. Check the resource allocation::

       # The job may have run out of memory or time, e.g. on SLURM:
       sacct -j 12345 --format=JobID,State,ExitCode,MaxRSS,Elapsed

Job Stays Pending
^^^^^^^^^^^^^^^^^

**Symptoms:**

- Job status is PENDING for an extended period
- Job never starts running

**Solutions:**

1. Check the queue status::

       squeue -u $USER    # SLURM
       qstat -u $USER     # PBS/SGE

2. Check resource availability::

       sinfo              # SLURM
       pbsnodes -a        # PBS

3. Reduce resource requirements::

       # Decrease ncpu, mem, or time in the snippet

4. Check queue limits::

       # Verify you haven't exceeded job or resource limits

Dependencies Not Working
^^^^^^^^^^^^^^^^^^^^^^^^

**Symptoms:**

- Jobs run in the wrong order
- Jobs fail due to missing inputs

**Solutions:**

1. Check the pipeline dependencies::

       # Review depends_on in the pipeline YAML

2. Verify that job IDs are tracked::

       # Check the runtime file for queue IDs
       grep "queue_id" pipeline_runtime.yaml

3. Test without a queue::

       pype pipeline --queue none my_pipeline ...
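
When jobs run in the wrong order, it can help to check the ``depends_on`` graph
itself before submitting anything. The following is a minimal Python sketch,
not part of Bio_pype: it assumes the step names and their ``depends_on`` lists
have already been loaded from the pipeline YAML into a plain dict (the
``check_dependencies`` helper and the ``steps`` dict are hypothetical). It
flags references to undefined steps and uses the standard library's
``graphlib.TopologicalSorter`` (Python 3.9+) to detect cycles and produce one
valid execution order.

.. code-block:: python

    from graphlib import CycleError, TopologicalSorter


    def check_dependencies(steps):
        """Return a valid execution order for ``steps``.

        ``steps`` maps each step name to its list of ``depends_on`` entries
        (hypothetical input format). Raises ValueError for undefined
        dependencies or dependency cycles.
        """
        # Every name listed in depends_on must be a defined step
        missing = {
            (step, dep)
            for step, deps in steps.items()
            for dep in deps
            if dep not in steps
        }
        if missing:
            details = ", ".join(f"{s} -> {d}" for s, d in sorted(missing))
            raise ValueError(f"undefined dependencies: {details}")

        # TopologicalSorter expects node -> predecessors, which is what
        # depends_on already describes
        sorter = TopologicalSorter({s: set(d) for s, d in steps.items()})
        try:
            return list(sorter.static_order())
        except CycleError as err:
            # err.args[1] is the list of nodes forming the cycle
            cycle = " -> ".join(err.args[1])
            raise ValueError(f"dependency cycle: {cycle}") from err


    if __name__ == "__main__":
        # Mirrors the depends_on example under "Automatic Dependency Management"
        steps = {
            "step_1_prepare": [],
            "step_2_align": ["step_1_prepare"],
            "step_3_sort": ["step_2_align"],
        }
        print(check_dependencies(steps))

For the three-step example this prints
``['step_1_prepare', 'step_2_align', 'step_3_sort']``; a typo in a
``depends_on`` entry or a circular reference raises ``ValueError`` instead.
Loading the actual pipeline YAML is left out because its exact layout is not
covered on this page.
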

Integration with Progress Tracking
----------------------------------

Queue systems integrate seamlessly with Bio_pype's progress tracking:

- **Job submission**: The queue ID is recorded in ``pipeline_runtime.yaml``
- **Status tracking**: Job status is updated (pending, running, completed, failed)
- **Resume**: Failed or cancelled jobs can be rerun
- **Monitoring**: ``pipeline_runtime.yaml`` shows the queue IDs of all jobs

See :ref:`progress` and :ref:`resume` for more details.

See Also
--------

- :ref:`progress` - Progress tracking system
- :ref:`resume` - Pipeline resume functionality
- :ref:`pipelines` - Pipeline definitions
- :ref:`snippets` - Snippet creation
- :ref:`profiles` - Profile configuration