Storage Backends#
A storage backend is the single abstraction that controls how data moves in and
out of a snippet execution. Every time a Command runs, a backend is resolved
once and handles all I/O for that execution: making inputs available at the expected
paths, intercepting output writes, persisting results, and tearing everything down
afterwards.
The abstraction exists so that the rest of the framework — snippet code, pipeline execution, profile building — never needs to know whether files live on a local disk, an HPC overlay scratch directory, or a cloud block-volume snapshot.
Overview#
The backend is selected via the PYPE_OVERLAY_MODE environment variable (or
pype_overlay_mode in the config file). Three values are built in; any other
value is resolved as a queue-module name:
| PYPE_OVERLAY_MODE value | Backend |
|---|---|
| any other string | A StorageBackend class resolved from the queue module of that name |
_resolve_backend(log) in pype/utils/overlay.py is the single dispatch point.
No call site inspects PYPE_OVERLAY_MODE directly.
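As a minimal sketch of the selection step described above, the helper below reads the mode from the two documented sources. The function name read_overlay_mode is hypothetical, and the precedence (environment variable first, config key second) is an assumption; the real logic lives inside _resolve_backend and may differ.

```python
import os
from typing import Mapping, Optional


def read_overlay_mode(
    config: Mapping[str, str],
    environ: Optional[Mapping[str, str]] = None,
) -> Optional[str]:
    """Return the configured backend mode, or None if neither source sets it.

    Assumption: PYPE_OVERLAY_MODE in the environment wins over the
    pype_overlay_mode key of the config file.
    """
    env = os.environ if environ is None else environ
    value = env.get("PYPE_OVERLAY_MODE") or config.get("pype_overlay_mode")
    return value.strip() if value else None
```

Passing the environment as a parameter keeps the lookup easy to test without touching the process environment.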
Interface#
All backends implement the following methods. The Direct and Overlay
implementations live in pype/utils/overlay.py; cloud implementations live in
their queue module (e.g. pype/pype_modules/queues/scaleway.py).
Execution-time methods#
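With the exception of check_outputs, these are called in order by Command.run()
and Command.close() on every snippet run; harvest is called once per harvested
OverlayVolume. check_outputs is called by the pipeline when it decides whether
a step can be skipped.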
def prepare_inputs(self, inputs: List[str], overlay_volumes: List[OverlayVolume]) -> None:
    """Make declared inputs available at their expected paths.

    For cloud backends: restore input snapshots as read-only overlayfs lower dirs;
    add input OverlayVolume entries so the path-substitution machinery rewrites
    script and command paths to the merged view.
    For Direct/Overlay: no-op (inputs are already on disk).
    """

def restore_program(self, profile_name: str, program_key: str) -> None:
    """Make the snippet's program (SIF image or conda env) available.

    For cloud backends: restore the program snapshot as an additional overlayfs
    lower dir so the SIF or conda prefix appears at its expected path inside
    /mnt/merged/.
    For Direct/Overlay: no-op (program is already at its profile-defined path).
    """

def prepare_outputs(
    self,
    outputs: List[str],
    output_harvest: Dict[str, bool],
    overlay_volumes: List[OverlayVolume],
) -> None:
    """Intercept writes to declared output paths.

    For Overlay: create per-output-dir scratch subdirs under PYPE_OVERLAY_SCRATCH;
    populate overlay_volumes so the path-substitution machinery redirects writes.
    For cloud backends: provision a scratch block volume; mount the full overlayfs
    (lower dirs from prepare_inputs + restore_program, upper on the scratch volume);
    populate overlay_volumes with /mnt/merged/-prefixed paths.
    For Direct: no-op.
    """

def harvest(self, upper: Path, original_dir: Path) -> None:
    """Persist a completed output directory.

    Called by Command.close() for each OverlayVolume with harvest=True when the
    snippet exits with return code 0.
    For Overlay: rsync upper/ back to original_dir/.
    For cloud backends: copy upper layer to a new block volume, snapshot it,
    write the snapshot ID to snapshot_registry.json in the Command log directory.
    For Direct: no-op.
    """

def cleanup(self) -> None:
    """Tear down all mounts and release all transient storage.

    Always called from Command.close() in a finally block, regardless of exit code.
    For Overlay: rmtree the scratch base directory.
    For cloud backends: umount all overlayfs and bind mounts in reverse order;
    detach and delete all transient block volumes.
    For Direct: no-op.
    """

def check_outputs(self, paths: List[str]) -> Dict[str, Optional[int]]:
    """Return {path: size_in_bytes} for present outputs, {path: None} for missing.

    Used by the pipeline skip logic (LAYER 2a and 2b in exec_snippet_unit) and
    by the prune logic in Pipeline._prune_upstream_jobs to determine whether a
    step's outputs already exist and can be skipped.
    For Direct/Overlay: stat each path directly.
    For cloud backends: group paths by snapshot_id, restore each snapshot once,
    stat the corresponding file inside the mounted volume, then detach and delete.
    """
Build-time methods#
These are called during pype profiles build to set up and tear down the
environment for profile construction. They share the same cleanup path as the
execution-time methods.
def prepare_build(self, paths: List[str], program_keys: Optional[List[str]] = None) -> None:
    """Overlay already-staged program snapshots over the pull paths.

    Called once at the start of build_profiles. For each program key found in
    the snapshot registry, restores the corresponding snapshot as a read-only
    overlayfs lower dir stacked over the original path (e.g. the singularity
    cache directory). pull_profile_images then sees the SIF as already present
    and skips re-pulling it. A single shared scratch block volume is provisioned
    as the upper layer so new pulls land there rather than on the root volume.
    For Direct/Overlay: no-op.
    """

def stage_program(self, profile_name: str, program_key: str, artifact_path: Path) -> None:
    """Persist a freshly pulled program artifact into the backend's storage.

    Called by pull_profile_images after each successful singularity pull or
    conda environment create. The backend snapshots the artifact at artifact_path
    and records the snapshot ID under the key "{profile_name}/programs/{program_key}"
    in the snapshot registry.
    For Direct/Overlay: no-op.
    """
Data flow during snippet execution#
The sequence below shows how the backend integrates with Command for a snippet
that reads /data/sample.bam and writes /results/output.vcf.
Command.__init__()
    backend = _resolve_backend(log)             # once per Command

Command.run()
    backend.prepare_inputs(inputs, ovvols)      # make /data/sample.bam visible
    backend.restore_program(profile, key)       # make the SIF/conda env visible
    backend.prepare_outputs(outputs, ...)       # intercept writes to /results/
    # path-substitution rewrites script and cmd args to overlay paths
    # snippet executes normally

Command.close()
    try:
        for ov in overlay_volumes where ov.harvest:
            backend.harvest(ov.scratch_dir, ov.original_dir)
    finally:
        backend.cleanup()
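The sequence above can be exercised end to end with a small local stand-in. The sketch below is not the Overlay implementation from pype/utils/overlay.py: ScratchBackendSketch and close_command are hypothetical names, a temporary directory stands in for PYPE_OVERLAY_SCRATCH, shutil.copytree stands in for the rsync step, and output_harvest is assumed to be keyed by output path.

```python
import shutil
import tempfile
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List


@dataclass
class OverlayVolume:  # same fields as the dataclass documented on this page
    path: str
    original_dir: str
    scratch_dir: str
    bind_file: str
    harvest: bool


class ScratchBackendSketch:
    """Hypothetical local backend: writes go to scratch, then are copied back."""

    def __init__(self) -> None:
        # A temporary directory stands in for PYPE_OVERLAY_SCRATCH.
        self.scratch_base = Path(tempfile.mkdtemp(prefix="pype-scratch-"))

    def prepare_outputs(
        self,
        outputs: List[str],
        output_harvest: Dict[str, bool],
        overlay_volumes: List[OverlayVolume],
    ) -> None:
        scratch_for: Dict[str, Path] = {}  # one scratch subdir per output directory
        for out in outputs:
            original_dir = str(Path(out).parent)
            if original_dir not in scratch_for:
                scratch_for[original_dir] = self.scratch_base / f"dir{len(scratch_for)}"
                scratch_for[original_dir].mkdir(parents=True)
            scratch_dir = scratch_for[original_dir]
            overlay_volumes.append(OverlayVolume(
                path=out,
                original_dir=original_dir,
                scratch_dir=str(scratch_dir),
                bind_file=str(scratch_dir / Path(out).name),
                # Assumption: output_harvest is keyed by output path.
                harvest=output_harvest.get(out, True),
            ))

    def harvest(self, upper: Path, original_dir: Path) -> None:
        # shutil.copytree stands in for the rsync step (needs Python 3.8+).
        shutil.copytree(upper, original_dir, dirs_exist_ok=True)

    def cleanup(self) -> None:
        shutil.rmtree(self.scratch_base, ignore_errors=True)


def close_command(backend, overlay_volumes: List[OverlayVolume], return_code: int) -> None:
    """Mirror the Command.close() sequence: harvest on success, always clean up."""
    try:
        if return_code == 0:
            for ov in overlay_volumes:
                if ov.harvest:
                    backend.harvest(Path(ov.scratch_dir), Path(ov.original_dir))
    finally:
        backend.cleanup()
```

Because cleanup sits in the finally block, a failed snippet leaves no scratch data behind and nothing is copied to the declared output directory.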
OverlayVolume#
OverlayVolume (defined in pype/utils/overlay.py) is the data structure that
connects the backend to the path-substitution machinery in Command.
from dataclasses import dataclass

@dataclass
class OverlayVolume:
    path: str          # original declared input or output path
    original_dir: str  # parent dir of path
    scratch_dir: str   # scratch dir that maps to original_dir (harvest source)
    bind_file: str     # scratch equivalent of path (substitution target)
    harvest: bool      # whether to harvest this output after execution
prepare_inputs adds entries with harvest=False (inputs are read-only).
prepare_outputs adds entries with harvest=True (outputs are persisted on
success). The same substitution loop in Command handles both directions
unchanged regardless of backend.
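To illustrate how OverlayVolume entries could drive path substitution, here is a minimal sketch. The function substitute_paths is hypothetical and is not the loop in Command; it only assumes, as the field comments state, that path is rewritten to bind_file. It performs a single pass so that replacement text is never rescanned.

```python
import re
from dataclasses import dataclass
from typing import List


@dataclass
class OverlayVolume:  # same fields as the dataclass documented on this page
    path: str
    original_dir: str
    scratch_dir: str
    bind_file: str
    harvest: bool


def substitute_paths(text: str, overlay_volumes: List[OverlayVolume]) -> str:
    """Rewrite every declared path in text to its bind_file equivalent."""
    mapping = {ov.path: ov.bind_file for ov in overlay_volumes}
    if not mapping:
        return text
    # Longest paths first, so a path that is a prefix of another cannot shadow it.
    alternatives = sorted(mapping, key=len, reverse=True)
    pattern = re.compile("|".join(re.escape(p) for p in alternatives))
    # One pass over the text: already substituted output is not matched again.
    return pattern.sub(lambda match: mapping[match.group(0)], text)
```

The single pass matters when a bind_file contains the original path as a substring, as with /mnt/merged/data/sample.bam and /data/sample.bam; chained str.replace calls could rewrite such a path twice.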
Snapshot registry (cloud backends)#
Cloud backends maintain a local JSON file that maps file paths and program keys to snapshot IDs. The registry has two key spaces:
Data entries — keyed by absolute file path:
"/data/sample.bam": {"snapshot_id": "snap-aaa", "in_snapshot_path": "/data/sample.bam"}
Program entries — keyed by "{profile_name}/programs/{program_key}":
"hg38/programs/samtools": {"snapshot_id": "snap-bbb", "in_snapshot_path": "/samtools_1.21.sif"}
The registry path is set via PYPE_SNAPSHOT_REGISTRY (default
/var/lib/pype/snapshot_registry.json). The coordinator distributes it to
each worker instance via cloud-init before the snippet runs. After execution,
harvest() appends the new result snapshot to the registry, which the
coordinator rsyncs back and merges into the working registry for subsequent steps.
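The registry operations described above can be sketched with plain dictionaries. The helper names below are hypothetical, the entry layout follows the two examples shown earlier, and the merge policy (a worker entry replaces a working entry with the same key) is an assumption rather than documented behaviour.

```python
from collections import defaultdict
from typing import Dict, List

Registry = Dict[str, Dict[str, str]]


def program_registry_key(profile_name: str, program_key: str) -> str:
    """Build the documented key for a program entry."""
    return f"{profile_name}/programs/{program_key}"


def group_by_snapshot(registry: Registry, paths: List[str]) -> Dict[str, List[str]]:
    """Group paths by snapshot ID so each snapshot needs to be restored only once."""
    groups: Dict[str, List[str]] = defaultdict(list)
    for path in paths:
        entry = registry.get(path)
        if entry is not None:  # paths without an entry have no snapshot yet
            groups[entry["snapshot_id"]].append(path)
    return dict(groups)


def merge_registries(working: Registry, from_worker: Registry) -> Registry:
    """Return a new registry with the worker's entries merged in.

    Assumption: on a key collision the worker's (newer) entry wins.
    """
    merged = dict(working)
    merged.update(from_worker)
    return merged
```

Grouping first is what lets a cloud check_outputs restore each snapshot a single time, however many declared outputs it holds.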
Implementing a new backend#
A new backend (e.g. for AWS or GCP) is a class in a queue module that implements
all eight methods above. Register it by placing the class at module level under
the name StorageBackend:
# pype/pype_modules/queues/my_cloud.py
class StorageBackend:
    def __init__(self, log) -> None:
        ...

    def prepare_inputs(self, inputs, overlay_volumes): ...
    def restore_program(self, profile_name, program_key): ...
    def prepare_outputs(self, outputs, output_harvest, overlay_volumes): ...
    def harvest(self, upper, original_dir): ...
    def cleanup(self): ...
    def check_outputs(self, paths): ...
    def prepare_build(self, paths, program_keys=None): ...
    def stage_program(self, profile_name, program_key, artifact_path): ...
Activate it with:
export PYPE_OVERLAY_MODE=my_cloud
export PYPE_QUEUES=/path/to/queue/modules
The framework resolves StorageBackend from the module automatically. No
changes to Command, Pipeline, or profile code are required.
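One plausible way such a lookup could work is sketched below. This is not the code of _resolve_backend: the function name, the builtins mapping, and the idea that the queue module is importable by its bare name from the PYPE_QUEUES directory are all assumptions made for illustration.

```python
import importlib
import sys
from typing import Any, Callable, Mapping, Optional


def resolve_backend_sketch(
    mode: str,
    log: Any,
    builtins: Mapping[str, Callable[[Any], Any]],
    queue_dir: Optional[str] = None,
) -> Any:
    """Return a backend instance for mode.

    builtins maps built-in mode names to factories; any other mode is
    treated as the name of a queue module exposing a StorageBackend class.
    """
    if mode in builtins:
        return builtins[mode](log)
    # Assumption: modules under the PYPE_QUEUES directory import by bare name.
    if queue_dir and queue_dir not in sys.path:
        sys.path.insert(0, queue_dir)
    module = importlib.import_module(mode)
    backend_cls = getattr(module, "StorageBackend", None)
    if backend_cls is None:
        raise ValueError(f"queue module {mode!r} defines no StorageBackend class")
    return backend_cls(log)
```

Looking the class up by a fixed attribute name is what lets a new provider plug in without any registration call.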
Note
Direct and Overlay backends are built into pype/utils/overlay.py
and need no queue module. External backends always live in a queue module
because they are provider-specific and often share credentials and API clients
with the queue implementation.