Creating Workflows with Snakemake
So far we have been creating single rules (each representing a single step in our analysis), which are the building blocks of a Snakemake workflow. Now let's practice connecting multiple rules to each other to create a workflow.
As an example, let's expand the clean_data rule from the previous section. Once we have the cleaned data, we might want to extract specific columns from it for downstream analysis. This means we will have a new rule that takes the clean version of the data as its input.
rule clean_data:
    input: "data/raw/dataset.csv"
    output: "data/processed/clean_dataset.csv"
    run:
        import pandas as pd

        # Drop rows containing missing values
        df = pd.read_csv(input[0])
        df_clean = df.dropna()
        df_clean.to_csv(output[0], index=False)

rule extract_columns:
    input: "data/processed/clean_dataset.csv"
    output: "data/processed/specific_columns.csv"
    run:
        import pandas as pd

        # Assuming we want columns 'A' and 'B'
        df = pd.read_csv(input[0])
        df_extracted = df[['A', 'B']]
        df_extracted.to_csv(output[0], index=False)

No need to run every single rule: since the second rule depends on the first (i.e. the input of the second rule is the output of the first), running only the second rule is enough. Snakemake will automatically detect this dependency and run the first rule as well. So the command to run the whole workflow would be:
snakemake --cores 1 extract_columns
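Before executing anything, you can also ask Snakemake for a dry run, which prints the jobs it would run (here both clean_data and extract_columns, assuming neither output file exists yet) without creating any files:

snakemake --cores 1 --dry-run extract_columns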
Let's go through some exercises and practice creating Snakemake workflows.

Section 1: Creating Workflows
Setup: Let's create a new Snakefile for the following exercises:
- inside the workflows folder, create a new folder called workflow2
- inside the workflow2 folder, create a Snakefile
- as you go through the exercises, please implement the rules inside this newly created Snakefile
- also, feel free to re-use the rules we created in the previous session when applicable
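In a terminal, this setup might look as follows (a sketch assuming you start from the folder containing workflows; adjust the paths to your own layout):

mkdir -p workflows/workflow2          # create the new workflow folder
touch workflows/workflow2/Snakefile   # create an empty Snakefile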
Exercise: Create a workflow that contains two rules:
- the first rule combines array1.npy and array2.npy and saves the result as a new file called combined_array.npy
- the second rule depends on the first rule such that it takes combined_array.npy and saves the standardized version of it as combined_array_standardized.npy.
Please run the workflow and check if it works.
Solution:
rule combine_arrays:
    input:
        "data/raw/array1.npy",
        "data/raw/array2.npy"
    output: "data/processed/combined_array.npy"
    run:
        import numpy as np

        # Load both arrays and concatenate them into one
        array1 = np.load(input[0])
        array2 = np.load(input[1])
        combined = np.concatenate([array1, array2])
        np.save(output[0], combined)

rule standardize_combined:
    input: "data/processed/combined_array.npy"
    output: "data/processed/combined_array_standardized.npy"
    run:
        import numpy as np

        # Standardize to zero mean and unit standard deviation
        array = np.load(input[0])
        standardized = (array - np.mean(array)) / np.std(array)
        np.save(output[0], standardized)

Run with:
snakemake --cores 1 standardize_combined
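As a quick sanity check (a sketch assuming the output path used above), the standardized array should have a mean close to 0 and a standard deviation close to 1:

import numpy as np

# Load the workflow's final output and inspect its statistics
arr = np.load("data/processed/combined_array_standardized.npy")
print(arr.mean(), arr.std())  # expect values close to 0 and 1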
Exercise: Create a workflow that contains two rules:
- the first rule combines array1.npy and array2.npy and saves the result as a new file called combined_array.npy. Note that we do not need to create this rule again, since we already implemented it in the previous exercise.
- the second rule depends on the first rule such that it takes combined_array.npy and saves the normalized version of it as combined_array_normalized.npy.
Please run the workflow and check if it works.
Solution:
rule combine_arrays:
    input:
        "data/raw/array1.npy",
        "data/raw/array2.npy"
    output: "data/processed/combined_array.npy"
    run:
        import numpy as np

        array1 = np.load(input[0])
        array2 = np.load(input[1])
        combined = np.concatenate([array1, array2])
        np.save(output[0], combined)

rule normalize_combined:
    input: "data/processed/combined_array.npy"
    output: "data/processed/combined_array_normalized.npy"
    run:
        import numpy as np

        # Min-max normalize the values into the range [0, 1]
        array = np.load(input[0])
        normalized = (array - np.min(array)) / (np.max(array) - np.min(array))
        np.save(output[0], normalized)

Run with:
snakemake --cores 1 normalize_combined
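Similarly (again assuming the output path used above), a min-max normalized array should span exactly the range [0, 1]:

import numpy as np

arr = np.load("data/processed/combined_array_normalized.npy")
print(arr.min(), arr.max())  # expect 0.0 and 1.0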
Exercise: Create a workflow that contains two rules:
- the first rule takes session.csv (in the data/raw folder) and saves a new file session_valid.csv (in the data/processed folder) containing only valid trials.
- the second rule takes session_valid.csv and saves a new file session_valid_correct_response.csv (in the data/processed folder) containing only valid trials in which the subject's response was correct (i.e. response=1).
Please run the workflow and check if it works.
Solution:
rule extract_valid_trials:
    input: "data/raw/session.csv"
    output: "data/processed/session_valid.csv"
    run:
        import pandas as pd

        # Keep only rows marked as valid trials
        df = pd.read_csv(input[0])
        df_valid = df.loc[df['valid'] == 1]
        df_valid.to_csv(output[0], index=False)

rule extract_correct_response:
    input: "data/processed/session_valid.csv"
    output: "data/processed/session_valid_correct_response.csv"
    run:
        import pandas as pd

        # Keep only trials with a correct response
        df = pd.read_csv(input[0])
        df_correct = df.loc[df['response'] == 1]
        df_correct.to_csv(output[0], index=False)

Run with:
snakemake --cores 1 extract_correct_response
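To check the result (a sketch assuming the column names used in the solution), every remaining row should be a valid trial with a correct response:

import pandas as pd

df = pd.read_csv("data/processed/session_valid_correct_response.csv")
print((df['valid'] == 1).all(), (df['response'] == 1).all())  # expect True True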
Section 2: Running multiple workflows together

So far we have created multiple workflows that are independent of each other (i.e. parallel workflows), and we can run each of them by calling the name of its final rule. But can we run multiple workflows that are independent of each other using just one command?
Yes! To do this we can use rule all. All we need to do is add a new rule called all at the very top of our Snakefile and list the final output files of all workflows as its inputs. Since Snakemake uses the first rule in the Snakefile as the default target, running snakemake without naming a target will then build everything rule all asks for:
# Define rule all with all final output files as inputs
rule all:
    input:
        "data/processed/combined_array_standardized.npy",
        "data/processed/session_valid_correct_response.csv"

With this example, if we now run the following command, the workflows responsible for creating the listed output files will run:
snakemake --cores 1
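Before executing, a dry run shows exactly which jobs rule all will trigger, and if Graphviz is installed, Snakemake can also render the dependency graph of the whole workflow:

snakemake --cores 1 --dry-run

# Requires Graphviz's dot; writes the workflow DAG to an image
snakemake --dag | dot -Tpng > dag.png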
Exercise: Modify the existing Snakefile such that all the output files are created by just running snakemake --cores 1.

Solution:
rule all:
    input:
        "data/processed/combined_array_standardized.npy",
        "data/processed/combined_array_normalized.npy",
        "data/processed/session_valid_correct_response.csv"

rule combine_arrays:
    input:
        "data/raw/array1.npy",
        "data/raw/array2.npy"
    output: "data/processed/combined_array.npy"
    run:
        import numpy as np

        array1 = np.load(input[0])
        array2 = np.load(input[1])
        combined = np.concatenate([array1, array2])
        np.save(output[0], combined)

rule standardize_combined:
    input: "data/processed/combined_array.npy"
    output: "data/processed/combined_array_standardized.npy"
    run:
        import numpy as np

        array = np.load(input[0])
        standardized = (array - np.mean(array)) / np.std(array)
        np.save(output[0], standardized)

rule normalize_combined:
    input: "data/processed/combined_array.npy"
    output: "data/processed/combined_array_normalized.npy"
    run:
        import numpy as np

        array = np.load(input[0])
        normalized = (array - np.min(array)) / (np.max(array) - np.min(array))
        np.save(output[0], normalized)

rule extract_valid_trials:
    input: "data/raw/session.csv"
    output: "data/processed/session_valid.csv"
    run:
        import pandas as pd

        df = pd.read_csv(input[0])
        df_valid = df.loc[df['valid'] == 1]
        df_valid.to_csv(output[0], index=False)

rule extract_correct_response:
    input: "data/processed/session_valid.csv"
    output: "data/processed/session_valid_correct_response.csv"
    run:
        import pandas as pd

        df = pd.read_csv(input[0])
        df_correct = df.loc[df['response'] == 1]
        df_correct.to_csv(output[0], index=False)

Run with:
snakemake --cores 1
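Note that Snakemake only re-runs rules whose outputs are missing or out of date, so invoking it a second time will typically report that there is nothing to be done. To force every rule to run again from scratch:

snakemake --cores 1 --forceall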