Data dependent conditional execution - RomainFeron/workshop-snakemake-sibdays2020 GitHub Wiki

In some cases, one or multiple steps of your workflow will have an output that cannot be predicted before executing the workflow, e.g. when splitting a file, clustering samples, or processing files in batches. Moreover, you may want to execute a step differently based on the output of a previous step; for instance, you may want to adjust a program's parameters based on the size of its input file, which is generated by another rule.

To handle these situations, Snakemake implements data-dependent conditional execution of rules. In practice, rules that produce variable or unpredictable outputs can be defined as "checkpoints" with the keyword checkpoint instead of rule. Then, if a rule requires the output of this checkpoint, a specific syntax is used in an input function to collect the checkpoint's output. At this stage, Snakemake recomputes the entire Directed Acyclic Graph (DAG) to update inputs and outputs of rules based on the checkpoint's output.

To clarify this concept, the following example shows a checkpoint variable_output_rule that creates an unknown number of output files. The output of this checkpoint is required by the rule aggregate, whose input is given by the input function collect_input:

```python
import os


checkpoint variable_output_rule:  # Checkpoint creating an unknown number of output files
    input:
        'data/{sample}.txt'
    output:
        directory('results/{sample}')
    shell:
        # Create the output directory, then split the input into files of
        # 1000 lines each, named with the prefix {output}/ and a .txt suffix
        # (--additional-suffix requires GNU split)
        'mkdir -p {output} && '
        'split -l 1000 --additional-suffix .txt {input} {output}/'


def collect_input(wildcards):
    '''
    Input function for the rule "aggregate".
    '''
    # Retrieve the output of the checkpoint "variable_output_rule" for the current
    # wildcard values. Here, "checkpoint_output" is the path to the directory
    # containing the split files; calling get() also makes Snakemake wait until
    # the checkpoint has completed.
    checkpoint_output = checkpoints.variable_output_rule.get(**wildcards).output[0]
    # Use the expand syntax to generate a list of all split files for this specific sample.
    # The "magic" happens with the new wildcard {i}, whose values are automatically
    # inferred from the checkpoint's output with glob_wildcards().
    full_output = expand('results/{sample}/{i}.txt', sample=wildcards.sample,
                         i=glob_wildcards(os.path.join(checkpoint_output, '{i}.txt')).i)
    return full_output


rule aggregate:  # Downstream rule requiring the output of the checkpoint
    input:
        collect_input
    output:
        'results/{sample}.txt'
    shell:
        'cat {input} > {output}'
```
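To clarify the role of glob_wildcards() in the input function above, here is a simplified, didactic sketch of what it does: match a wildcard pattern against file names and collect the values taken by the wildcard. The function glob_wildcards_sketch and its handling of a single hard-coded wildcard {i} are assumptions for illustration only; Snakemake's real glob_wildcards() scans the filesystem, supports any number of wildcards, and returns a named tuple.

```python
import re


def glob_wildcards_sketch(pattern, filenames):
    """Didactic sketch of Snakemake's glob_wildcards(): collect the values
    matched by the {i} placeholder in a pattern. NOT the real implementation,
    which scans the filesystem and handles arbitrary wildcards."""
    # Turn '{i}' into a named regex group matching anything except '/'.
    # Simplification: literal parts of the pattern (e.g. '.') are not escaped.
    regex = re.compile(re.sub(r'\{(\w+)\}', r'(?P<\1>[^/]+)', pattern) + '$')
    values = []
    for name in filenames:
        match = regex.match(name)
        if match:
            values.append(match.group('i'))
    return values


# File names as 'split' might have produced them inside results/sampleA/
files = ['results/sampleA/aa.txt', 'results/sampleA/ab.txt', 'results/sampleA/ac.txt']
print(glob_wildcards_sketch('results/sampleA/{i}.txt', files))
# → ['aa', 'ab', 'ac']
```

In collect_input, the list of {i} values recovered this way is what expand() combines with the sample wildcard to build the full list of split files.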

In summary, checkpoints allow you to access the output of a rule in an input function before executing the next rule. Because the DAG is recomputed at that point, the complete DAG for your workflow cannot be known before the last checkpoint has completed successfully. This means you will not be able to fully check your workflow with --dry-run or to visualize the complete DAG with --dag until the checkpoints have run. For more information about checkpoints, check the relevant section of the official documentation.