Advanced Features

Martian supports a number of advanced features to provide:

  • Performance and resource efficiency that scales from single servers to large high-performance compute clusters
  • Early error detection
  • Data storage efficiency
  • Loosely-coupled integration with other systems

Disabling sub-pipelines

Sometimes the choice of which sub-pipelines to run depends on the data itself. For example, the first part of a pipeline might determine which of several algorithms is likely to yield the best results on the given data set. To support this, Martian 3.0 allows calls to be disabled, e.g.

pipeline DUPLICATE_FINDER(
    in  txt  unsorted,
    out txt  duplicates,
)
{
    call CHOOSE_METHOD(
        unsorted = self.unsorted,
    )
    call SORT_1(
        unsorted = self.unsorted,
    ) using (
        disabled = CHOOSE_METHOD.disable1,
    )
    call SORT_2(
        unsorted = self.unsorted,
    ) using (
        disabled = CHOOSE_METHOD.disable2,
    )

    call FIND_DUPLICATES(
        method_1_used = CHOOSE_METHOD.disable2,
        sorted1       = SORT_1.sorted,
        sorted2       = SORT_2.sorted,
    )
    return (
        duplicates = FIND_DUPLICATES.duplicates,
    )
}

Disabled pipelines or stages will not run, and their outputs will be populated with null values. Downstream stages must be prepared to deal with this case.
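
For example, if FIND_DUPLICATES above were implemented with the Python adapter, its stage code would need to tolerate null (None) inputs whenever the corresponding sort was disabled. A minimal sketch (the duplicate-finding logic itself is illustrative):

def main(args, outs):
    # Inputs bound to a disabled stage's outputs arrive as None.
    path = args.sorted1 if args.sorted1 is not None else args.sorted2
    if path is None:
        # Both upstream sorts were disabled; produce an empty output.
        open(outs.duplicates, 'w').close()
        return
    seen, dups = set(), []
    with open(path) as f:
        for line in f:
            if line in seen:
                dups.append(line)
            seen.add(line)
    with open(outs.duplicates, 'w') as out:
        out.writelines(dups)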

Parallelization

Subject to resource constraints, Martian parallelizes work by breaking pipeline logic into chunks and parallelizing them in two ways. First, stages can run in parallel if they don’t depend on each other’s outputs. Second, individual stages may split themselves into several chunks.

Stages which split are specified in mro as, for example,

stage SUM_SQUARES(
    in  float[] values,
    out float   sum,
    src comp    "sum_squares",
) split (
    in  float   value,
    out float   value,
)

In this example, the stage takes an array of “values” as input. The “split” function (see writing stages) determines how to distribute the input data across chunks, giving a “value” to each, and may also set thread and memory requirements for each chunk and for the join. Then, after the chunks run, the join phase aggregates the outputs of all of the chunks into the single output of the stage.
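
The mro above declares a compiled stage (src comp); if the stage were instead implemented with the Python adapter (src py), its phases might look like this sketch:

def split(args):
    # One chunk per input value; each chunk def becomes that chunk's args.
    return {'chunks': [{'value': x} for x in args.values]}

def main(args, outs):
    # Each chunk squares its single value.
    outs.value = args.value * args.value

def join(args, outs, chunk_defs, chunk_outs):
    # Aggregate the chunk outputs into the stage's single output.
    outs.sum = sum(chunk_out.value for chunk_out in chunk_outs)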

Resource consumption

Martian is designed to run stages in parallel. In both local and cluster mode, it tries to ensure sufficient threads and memory are available for each job running in parallel. The default reservations are controlled by the jobmanagers/config.json file (see below). If a job needs more resources than the default, there are two ways to request them.

If the stage splits (see above), the split phase can override the default reservations for the chunk or join phases by setting the __mem_gb or __threads keys in the chunk or join portions of the chunk_defs it returns. This is required if, for example, the split, chunk, and join phases don’t all have the same requirements, or if the split needs to compute the requirements dynamically. Alternatively, the mro file can set the resource requirements for the split, or statically declare the resources for all three phases (or just the chunk, if there is no split), e.g.

stage SUM_SQUARES(
    in  float[] values,
    out float   sum,
    src comp    "sum_squares",
) split (
    in  float   value,
    out float   value,
) using (
    mem_gb  = 4,
    threads = 16,
)
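
Alternatively, to compute the reservations dynamically as described above, the split can attach them to the chunk and join definitions it returns (the sizes here are illustrative):

def split(args):
    return {
        # Request 2 GB and 1 thread for each chunk...
        'chunks': [{'value': x, '__mem_gb': 2, '__threads': 1} for x in args.values],
        # ...and 1 GB for the join phase.
        'join': {'__mem_gb': 1},
    }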

As a special signal to the runtime, a stage may request a negative quantity of memory or threads. A negative value tells the runtime that the stage requires at least the absolute value of the requested amount, but can use more if available. That is, if a stage requests threads = -4 and mrp was started with --localcores=2, the pipestance will fail; but if mrp was started with --localcores=8, the stage is treated as if it had asked for 8 threads. The job can check the metadata in the _jobinfo file to find out how many threads it actually got.
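
A minimal sketch of how a Python chunk might inspect its actual allocation; the relative location of the _jobinfo file and the key holding the granted thread count are assumptions which may vary between Martian versions:

import json

def main(args, outs):
    # The _jobinfo file lives in the chunk's metadata directory, assumed
    # here to be the parent of the stage code's working directory.
    with open("../_jobinfo") as f:
        jobinfo = json.load(f)
    # The key name is an assumption; inspect the file for your version.
    threads = jobinfo.get("threads")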

Job Management

Broadly speaking, Martian has two ways to run stage jobs: Local Mode and Cluster Mode.

Local Mode

In local mode, stage jobs run as child processes of mrp on the same machine. Even in cluster mode, stages may be executed in local mode if the mro invokes them with call local. Several options to mrp control job scheduling behavior for local jobs:

--localcores=NUM
    Specifies the number of threads’ worth of work mrp should schedule simultaneously. The default is the number of logical cores on the machine.

--localmem=NUM
    Specifies the amount of memory, in gigabytes, which mrp will allow to be reserved by jobs running in parallel. The default is 90% of the system’s total memory.

--limit-loadavg
    Instructs mrp to monitor the system load average and avoid starting new jobs if the difference between the number of logical cores on the system and the current one-minute load average is less than the number of threads requested by a job. This may be useful for throttling the start of new jobs on heavily loaded shared systems, especially ones which do not enforce any kind of quota. Note, however, that CPU load tends to fluctuate, so in many cases this only delays a job’s start until the next time the load drops temporarily.
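
For example, to run a pipestance locally with at most 16 threads and 64 GB in use at once (the pipeline and pipestance names are hypothetical):

mrp pipeline.mro PIPESTANCE_ID --localcores=16 --localmem=64 --limit-loadavg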

Cluster Mode

Larger research groups often have infrastructure for shared, distributed workloads, such as SGE, Slurm, or LSF. Martian supports distributing stage chunks on such platforms through a flexible, extensible interface.

If mrp is started with --jobmode=MODE and MODE is not “local”, it looks in its jobmanagers/config.json file for a key in the jobmodes element corresponding to MODE. In that object, the following values are used to configure the job:

cmd
    The command (executable) used to submit batch work to the cluster manager.

args
    Additional arguments to pass to that executable. Ideally the batch-submit executable has a mode in which it returns just the job ID on standard output, without any other formatting. If an argument is required to enable this mode, it should be added here, along with any other fixed arguments which are required.

queue_query
    A script, which mrp looks for in its jobmanagers directory, that accepts a newline-separated list of job IDs on standard input and returns on standard output the newline-separated list of those job IDs which the job manager reports as still queued or running. Martian uses this to detect whether a job failed without having a chance to write its metadata files, for example if the job template incorrectly specified the environment.

queue_query_grace_secs
    If queue_query was specified, this sets the minimum time mrp will wait, after the queue_query command reports that a job is no longer running, before declaring the job dead. In many cases, because of the way filesystems cache metadata, the completion notification files a job produces may not yet be visible from where mrp is running when the job manager reports the job complete, especially when the filesystem is under heavy load. The grace period prevents jobs which succeeded from being declared failed.

env
    Specifies environment variables which are required to be set in this job mode.
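
A sketch of such an entry for a hypothetical Slurm job mode, using only the keys described above (the queue_query script name, grace period, and argument list are illustrative; sbatch --parsable prints just the job ID):

{
  "jobmodes": {
    "slurm": {
      "cmd": "sbatch",
      "args": ["--parsable"],
      "queue_query": "slurm_queue.py",
      "queue_query_grace_secs": 30
    }
  }
}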

mrp will execute the specified cmd with the specified args and pipe a job script to its standard input (see Templates below for how the job script is generated). If the command’s standard output consists of a string with no newlines or whitespace, it is interpreted as a job ID and recorded with the job, to potentially be used later with the queue_query script.

Templates

In addition to the information in the config.json file, mrp looks for a file in the jobmanagers directory named MODE.template for the specified MODE. mrp performs string substitutions on the content of that file to produce the job script. The substitutions are of the form __MRO_VALUE__, where VALUE is one of the following:

JOB_NAME
    The fully qualified stage/chunk name.

THREADS
    The number of threads requested for the job.

STDOUT / STDERR
    The absolute paths to the intended destinations for the standard output and standard error files.

JOB_WORKDIR
    The working directory in which the stage code is expected to run.

CMD
    The actual command line to execute.

MEM_GB
    The amount of memory, in GB, which the job is expected to use. MEM_MB, MEM_KB, and MEM_B provide the value in other units if required.

MEM_GB_PER_THREAD
    MEM_GB divided by THREADS. MB, KB, and B variants are available as well.
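
For example, a minimal template for a hypothetical slurm job mode might look like the following; the #SBATCH flags are site-specific, and only the __MRO_...__ substitutions are supplied by Martian:

#!/usr/bin/env bash
#SBATCH -J __MRO_JOB_NAME__
#SBATCH --cpus-per-task=__MRO_THREADS__
#SBATCH --mem=__MRO_MEM_GB__G
#SBATCH -o __MRO_STDOUT__
#SBATCH -e __MRO_STDERR__

cd __MRO_JOB_WORKDIR__
__MRO_CMD__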

Cluster mode command line options

There are several options for throttling how mrp uses the cluster job manager.

--maxjobs=NUM
    Limits the number of jobs queued or pending on the cluster simultaneously. 0 is treated as unlimited.

--jobinterval=NUM
    Limits the rate at which jobs are submitted to the cluster.

--mempercore=NUM
    For clusters which do not manage memory reservations, specifies the amount of memory mrp should expect to be available per core. If a job’s memory-to-core ratio exceeds this value, extra threads are reserved for the job to ensure it gets enough memory; for example, with --mempercore=8, a job requesting 1 thread and 24 GB of memory would be charged for 3 cores. A very high value is effectively ignored; a low value results in many wasted CPUs. By default, no such accounting is done.

The “special” resource

In addition to threads and memory, MRO and stage split definitions may include a request for the special resource. This is only used in cluster mode, and is intended for cases where the cluster manager requires special additional flags for some stages, for example if there is a separate queue for jobs which require very large amounts of memory.
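
For example, assuming the special resource can be requested with the same using() syntax shown above for mem_gb and threads, a hypothetical stage might declare:

stage MERGE_EVERYTHING(
    in  txt[] inputs,
    out txt   merged,
    src comp  "merge_everything",
) using (
    special = "highmem",
)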

The resopt parameter in the jobmanagers/config.json file configures the way such resources are incorporated into the job template for your cluster. For SGE, for example, the parameter is #$ -l __RESOURCES__

The MRO_JOBRESOURCES environment variable may contain a semicolon-separated list of key:value pairs. If the special resource requested for the job corresponds to one of those keys, then __MRO_RESOURCES__ in the job template is replaced with the resopt value from jobmanagers/config.json, with the value corresponding to the given key substituted for __RESOURCES__.

For example, if the resopt config parameter is #$ -l __RESOURCES__, MRO_JOBRESOURCES=highmem:mem640=TRUE;lowmem:mem64=TRUE, and the job requests the special resource "highmem", then __MRO_RESOURCES__ in the job template is replaced with #$ -l mem640=TRUE.

Additional job management settings

There are a few additional options available in the jobmanagers/config.json file which apply in both cluster and local modes, under the settings key.

Several popular third-party libraries use environment variables to control the number of threads they parallelize over, for example OMP_NUM_THREADS for OpenMP. The thread_envs key specifies a list of environment variables which should be set equal to the job’s thread reservation. These are applied in cluster mode, and in local mode when mrp’s total thread count is constrained. In local mode without such a constraint they are not set, since the user is assumed not to be sharing the machine with others; limiting internal parallelism there would only leave CPU cycles idle.

Additionally, for jobs which do not specify their thread or memory requirements, the threads_per_job and memGB_per_job keys specify default values.
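
Putting these together, a settings block might look like this sketch (the values are illustrative, and the exact shape of thread_envs is an assumption):

{
  "settings": {
    "threads_per_job": 1,
    "memGB_per_job": 4,
    "thread_envs": ["OMP_NUM_THREADS"]
  }
}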

Debugging options

The --debug option to mrp causes it to log additional information which may be helpful for debugging.

For debugging stage code, the --stackvars flag sets an option in the jobinfo file given to stage code. For the python adapter, this flag causes it to dump all local variables from every stack frame on failure.

Resource Overrides

By specifying an appropriate json file on the mrp command line with the --overrides=<FILE> flag, one can override the resource reservations, whether they were left at the default or specified in the mro source or the split. The format of this json file is

{
  "TOP_LEVEL_PIPELINE.SUBPIPELINE_1.INNER_SUBPIPELINE_1.STAGE_NAME": {
    "split.mem_gb": 2,
    "chunk.mem_gb": 24,
    "join.mem_gb": 2,
    "chunk.threads": 2
  },
  "TOP_LEVEL_PIPELINE.SUBPIPELINE_2.INNER_SUBPIPELINE_2.OTHER_STAGE": {
    "split.mem_gb": 2,
    "chunk.mem_gb": 24,
    "join.mem_gb": 2,
    "chunk.threads": 2
  }
}

In addition to threads and memory, overrides can be used to turn volatility (see Storage Management) on or off by setting "force_volatile": true or false.
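
For example, to pin a stage’s outputs as non-volatile while also raising its chunk memory reservation (using the same hypothetical stage path as above):

{
  "TOP_LEVEL_PIPELINE.SUBPIPELINE_1.INNER_SUBPIPELINE_1.STAGE_NAME": {
    "chunk.mem_gb": 32,
    "force_volatile": false
  }
}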

Preflight Checks

Preflight checks are used to “sanity check” the environment and the top-level pipeline inputs, for example ensuring that all required software dependencies are available on the user’s PATH, or that specified input files are present. A stage can be marked as a preflight in its call, e.g.

call PREFLIGHT_STAGE(
    arg1 = self.input1,
) using (
    preflight = true,
)

Preflight stages cannot have outputs and cannot have their inputs bound to outputs of other stages. Even when embedded in a sub-pipeline, they will always run before any other stages.
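
For example, a preflight stage implemented with the Python adapter might verify that an external tool is installed. This sketch assumes the adapter’s martian.exit helper for reporting user-facing failures, and the samtools dependency is hypothetical:

from shutil import which

import martian

def main(args, outs):
    # Fail fast, with a readable message, if a required tool is missing.
    if which('samtools') is None:
        martian.exit('samtools was not found in your PATH.')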

Preflight stages may also specify local = true in the call properties to require that the stage runs as a child process of mrp even in cluster mode. Use this option with care, as mrp may be running on a submit host with very limited resources.

Volatile Data Removal

See Storage Management.

Parameter Sweeping

Often when testing changes to a pipeline, one wants to try the pipeline with several different possible values for one or more of the pipeline inputs. Parameter sweeping is intended for this use case. A call such as

call PIPELINE_NAME(
    arg1 = "foo",
    arg2 = sweep(
        "bar",
        "baz",
    ),
)

will run the pipeline twice, once with arg2 = "bar" and again with arg2 = "baz". The runtime is clever enough to avoid rerunning stages which do not depend on arg2 either directly or by depending on a stage which does. In some cases this can save substantial computing resources.

This feature is intended for testing purposes only and should not be used in production pipelines. It can be confusing to figure out which version of the final pipeline outputs corresponded to which fork, and the runtime may behave poorly if multiple parameters are swept over in the same pipestance.

Performance Analysis

The _jobinfo file in each stage’s split/chunk/join directories includes several performance metrics, including cpu, memory, and I/O usage. On successful pipestance completion these statistics are aggregated into the top-level _perf file.

mrp can be started with the --profile=MODE option to enable various profiling tools for stage code. The cpu and mem modes depend on the language-specific adapter. For Python, the cpu mode uses cProfile, and mem uses an allocator hook. For Go stages, cpu and mem use Go’s native runtime/pprof functionality.

The perf profile mode uses Linux perf record. By default, this will record the task-clock and bpf-output events at 200Hz for the first 2400 seconds of each job (to prevent excessive disk space usage). One can override those values with the MRO_PERF_EVENTS, MRO_PERF_FREQ, and MRO_PERF_DURATION environment variables, respectively.
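
For example, to sample at a higher frequency for a longer window (assuming --profile=perf selects this mode):

MRO_PERF_FREQ=997 MRO_PERF_DURATION=7200 mrp pipeline.mro PIPESTANCE_ID --profile=perf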

Completion Hooks

The --onfinish=EXEC option to mrp specifies a command to run when the pipestance completes. The hook is invoked with the following arguments:

  • path to pipestance
  • {complete|failed}
  • pipestance ID
  • path to error file (if there was an error)
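
A minimal hook sketch (the logic is illustrative):

#!/usr/bin/env bash
# Invoked by mrp as: hook <pipestance_path> <complete|failed> <id> [error_file]
pipestance_path=$1
status=$2
pipestance_id=$3
error_file=$4  # only provided if there was an error

if [ "$status" = "failed" ]; then
    echo "pipestance $pipestance_id at $pipestance_path failed; see $error_file" >&2
fi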

mrp Options

Most mrp options are described in the sections above; the rest are described here:

--zip
    Zip metadata files after the pipestance completes.

--tags=TAGS
    Tag the pipestance with comma-separated key:value pairs.

--autoretry=NUM
    Automatically retry failed runs up to NUM times.

All other options are covered in the sections above.