Introduction to Nextflow

Anuved Verma and Anja Bog, Software Engineers at 23andMe

17 min read · Oct 27, 2021


Are you planning to run “some kind of workflow” and don’t want to worry about infrastructure, monitoring, or resumability? Do you want to orchestrate different kinds of tasks, like Python scripts, command-line tools, or applications written in other programming languages? Yes? Then stay with us and have a look at Nextflow!

Nextflow is open-source software developed by Seqera Labs. We’ve found it especially useful for data-intensive computational pipelines where you want to string together command-line tools and code artifacts. We’ll demonstrate how to accomplish this with some basic concepts and then look at one of the pipelines we are running at 23andMe. Let’s get started!

Nextflow Concepts

By the end of this read, you will be familiar with Nextflow scripting and some of the features we use for our imputation pipelines. These principles can be adapted for your own use cases.

The Nextflow scripting language is an extension of Groovy and is quite extensive, so we will focus on a few key concepts to help get started. Many of the definitions and examples here are borrowed from the Nextflow docs. These are a great resource with plenty of examples and can expand on the starter code we’ll be providing.

Dataflow Programming Model

The Nextflow language is based on the Dataflow Programming Model. In this context, that’s a fancy way of saying “you can visualize it as the flow of data through a directed acyclic graph (DAG).” The arrows of the DAG are asynchronous FIFO queues from which data can be consumed. These are called Channels in Nextflow. The nodes of the DAG consume from these Channels, perform some tasks, and produce data outputs. These nodes are called Processes in Nextflow.

Channels

At the risk of oversimplifying, you can think of Nextflow Channels as queues that hold data for a Process to read from. Nextflow offers two types of channels called Queue Channels and Value Channels. We will demonstrate the difference between the two below.

To explicitly create a Channel, you would use one of the Nextflow-provided Channel Factory methods. Go here for a full list of Channel factory methods.

Queue Channels

Queue Channels are asynchronous unidirectional FIFO queues. So let’s break that down:

  • Asynchronous: sending data through the Channel is non-blocking; the producing Process does not wait for the receiving Process to consume the data or complete
  • Unidirectional: If Channel X connects Process A to Process B, then Process A can send data to Process B via Channel X, but Process B cannot send data back to Process A via the same Channel X
  • FIFO: first-in-first-out — data that is first added to the Channel queue will be the first to be received by the consuming Process

Here are a few examples of Channel Factory methods used to create Queue Channels:

of
The of factory method allows you to create a queue channel that emits a sequence of values defined in the method parameters. For example:

ch = Channel.of(1, 3, 5, 7)

Check out the Processes — Generic Inputs section below to see how this channel might get consumed.

fromPath
The fromPath factory method creates Channels that emit one or more file paths. Here’s a basic example for a single file path:

file_ch = Channel.fromPath('data/example.txt')

fromPath also interprets glob patterns, so you can easily emit multiple files from the same channel. For example:

file_ch = Channel.fromPath('data/*.txt')

Value Channels

Nextflow’s Value Channels are similar to Queue Channels in that they are also asynchronous, unidirectional, and are created using Channel Factory methods. However, Value Channels are bound to a single value and can be read an unlimited number of times without their content being consumed.

value
To create a value channel, Nextflow offers a straightforward channel factory method called value, which can be used like this:

pi = Channel.value('3.1415')

Processes

The docs say it best: “In Nextflow a process is the basic processing primitive to execute a user script.” In other words, a Nextflow process is basically an isolated user-defined script, wrapped with additional parameters (e.g., directives, inputs, and outputs, explained below) that define how the script should be executed in the Nextflow pipeline.

The full syntax of a Nextflow process is outlined here. An example process might look something like the following:

// allows you to define processes to be used for modular libraries
nextflow.enable.dsl = 2

workflow {
    ids = Channel.fromPath('data/ids.txt')
    chunksize = Channel.value(1000)
    split_ids(ids, chunksize)
}

process split_ids {
    input:
    path(ids)
    val(chunksize)

    output:
    file('batch-*')

    shell:
    """
    split -l !{chunksize} !{ids} batch-
    """
}

Here, we have a file ids.txt containing a large list of IDs, which are to be split into chunks of size 1000. The process will take the ids.txt file as input, and output smaller files (batch-aa, batch-ab, etc.), each containing a separate chunk of 1000 IDs from the original ids.txt file.

Inputs

If you remember from the “Dataflow Programming Model” section, Nextflow Processes consume data through Channels. The Input block of a Process defines which Channels the Process should receive data from.

The Inputs to a Nextflow Process directly influence how many tasks the Process spawns. The Process consumes the next element from each Input Channel and spawns a new task execution, repeating this until one or more Input Channels have no more content to consume. Nextflow creates an execution directory for each task, where the relevant files, logs, standard output, etc. for that task are stored. This directory is identified by a unique hash that Nextflow generates for each task (see the Tracing and Visualization section of this post).

There are a variety of ways that a Process can define its Inputs to consume Channels. We’ll focus on ways to consume the example Channels we created above.

Generic Inputs

Recall the channel we created with the of factory method above. You can consume it in a process like so:

ch = Channel.of(1, 3, 5, 7)

process basicExample {
    input:
    val x from ch

    shell:
    "echo $x"
}

This process will generate four tasks, each consuming a different element from ch and “echoing” that element to standard output.

Consuming from a value channel is no different, except that there is no limit to the number of times a process can consume from it:

value_ch = Channel.value(3.14)
queue_ch = Channel.of(1, 3, 5, 7)

process basicExample {
    input:
    val x from value_ch
    val y from queue_ch

    shell:
    "echo $x $y"
}

This process will generate four tasks, each consuming one element from queue_ch and the same element repeatedly from value_ch.

Files

There are several ways to pass file inputs into a process. Nextflow will create a symlink to the original file in the relevant task execution directory when a process is handling an input file through the methods described below. The file can then be accessed by the script using the name specified in the input declaration.

Here is an example of how one might process multiple files in a directory in parallel:

proteins = Channel.fromPath('data/*.txt')

process catThemAll {
    input:
    file query_file from proteins

    shell:
    "cat ${query_file}"
}

Each file ending with .txt in the data/ directory will be processed by a separate task generated by the catThemAll process. Each task can be identified by a unique task name (e.g., workflow:catThemAll (2)) or task_id hash (e.g., 7a/7b3084). Nextflow will stage a symlink to the specific file that the task will process in the task's execution directory, e.g., /work/tasks/7a/7b3084/.file-2.txt.

Another way to process files is using the path qualifier. The file and path qualifiers are similar, except that the former expects file objects, whereas the latter can also interpret strings as the path of the input file. Note that when using raw strings, the path qualifier does not interpret special characters (e.g., wildcards), so this syntax works best if you know the absolute string path of your file. Here is the path qualifier in action:

process catThemAll {
    input:
    path x1 from file('data/example-*.txt')
    path x2 from 'file:///absolute/path/to/working-dir/data/ids.txt'

    shell:
    """
    cat ${x1}
    cat ${x2}
    """
}

Assume you had two files under your data/ directory: example-1.txt and example-2.txt. Will the above process spawn one task or two? In this case, the path-string qualifier for x2 behaves more like a Value Channel, allowing the process to consume it infinitely many times. As a result the process will spawn tasks until x1 runs out of files, resulting in two tasks.

Outputs

Similar to Process Inputs, the Output block of a Process defines which Channels the Process should send its results to. For example:

customer_ids = Channel.from(1, 2, 3, 4)

process get_data_for_ids {
    input:
    val id from customer_ids

    output:
    file 'data_for_id_*' into data_for_ids

    shell:
    '''
    echo !{id} > data_for_id_!{id}.txt
    '''
}

data_for_ids.view()
>> /path/to/dir/work/36/1ecd790e4eeb3a786c2e5e288b/data_for_id_3.txt
>> /path/to/dir/work/aa/19b1ac052387cd05ab04021f5f/data_for_id_2.txt
>> /path/to/dir/work/6e/a5ce292656c2c8802108293c97/data_for_id_4.txt
>> /path/to/dir/work/89/0efb6979d54a412274f0ad685d/data_for_id_1.txt

In the above example, the get_data_for_ids process sends the files generated by the shell command into the data_for_ids channel, which downstream processes can then consume. As with Inputs, anything from values to files to stdout can be output to the channel. For more information on how to leverage the Output block, see the Nextflow documentation.
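
To make this concrete, here is a minimal sketch (the process, channel, and file names are hypothetical, not taken from our pipeline) of a single process emitting a value, a set of files, and its standard output into three separate channels:

sample_ids = Channel.from(1, 2, 3)

process make_summary {
    input:
    val id from sample_ids

    output:
    val id into processed_ids            // emit a plain value
    file 'summary_*.txt' into summaries  // emit any files matching the glob
    stdout into log_lines                // emit whatever the script prints

    shell:
    '''
    echo "processing id !{id}"
    echo "summary for id !{id}" > summary_!{id}.txt
    '''
}

Each of the three output channels (processed_ids, summaries, log_lines) could then be consumed by a different downstream process.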

Directives

Finally, processes allow you to define Directives, which are optional settings that affect the execution of the current process. These include specifying what hardware the process should use, which Docker container to run the process in (if any), the retry strategy, how much memory and CPU to allocate, what scripts to run before or after the process, and more.

The following is an example of one way you can use Directives to scale up resource allocation for a task based on the number of retries:

proteins = Channel.fromPath('data/*.txt')

process catThemAll {
    errorStrategy 'retry'
    maxRetries 4
    cpus { 3 + task.attempt }
    memory { 5.GB * task.cpus }

    input:
    file query_file from proteins

    shell:
    "cat ${query_file}"
}

There are numerous directives available, outlined here in the documentation.

Operators

So far we’ve talked about Channels and Processes, and how Processes can consume from a Channel (via Input declarations) or output to a Channel (via Output declarations). But what if you want to transform the contents of a Channel before a Process consumes it? One way to achieve this is to create another Process that handles the transformation and outputs the result into a new channel; however, this can lead to a proliferation of Processes built just for these common transformations. Thankfully, Nextflow offers an easier way to create a channel directly from another channel, with the desired transformation applied in between: Operators.

Nextflow operators are methods that allow you to connect channels or to transform the values emitted by a channel by applying some user-provided rules. Some common reasons we’ve used Operators at 23andMe are for filtering, transforming, splitting, and combining. Building on our example above, here’s how we might leverage the map operator to transform the contents of our channel:

squares = Channel.from(1, 2, 3, 4, 5).map { it * it }
squares.view()
>> 1
>> 4
>> 9
>> 16
>> 25

Another example of how we might flatten a Channel with lists:

flat_list = Channel.from([1, [2, 3]], 4, [5, [6]]).flatten()
flat_list.view()
>> 1
>> 2
>> 3
>> 4
>> 5
>> 6
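
And as a hedged example of two more of the categories mentioned above, filtering and combining (the values here are made up for illustration), filter and combine could be chained to keep only the odd numbers and then pair each of them with every letter:

odds    = Channel.from(1, 2, 3, 4, 5).filter { it % 2 == 1 }  // keep 1, 3, 5
letters = Channel.from('a', 'b')
pairs   = odds.combine(letters)  // cartesian product: [1, a], [1, b], [3, a], [3, b], [5, a], [5, b]
pairs.view()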

There are numerous other Operators available for all sorts of Channel transformations, and it would be worthwhile to explore the docs to get a sense of all the options Nextflow offers.

What we use Nextflow for at 23andMe

At 23andMe, we use Nextflow to run one of our largest data creation pipelines for product research: Genotype Imputation. Genotype imputation statistically infers the markers of the genome we haven’t observed via our microarrays. (A marker is a DNA sequence with a known physical location on a chromosome.)

Why is this important? Our raw genome data is microarray-based, meaning it covers only a portion of the whole genome. Additionally, our products evolve over time, as do our microarrays. As a result, we have customer data originating from different microarrays that overlap to some degree. For research into new features for our customers, we run genome-wide association studies (GWAS). GWASs are most powerful when run on the whole genome and when they can access the same set of markers across all included individuals. Genotype imputation helps us create a data set with these properties.

At a high level, our genotype imputation workflow consists of several sequential steps:

  1. Reshape the input data for the set of individuals to be processed from a highly compressed 23andMe-internal format to the Variant Call Format (VCF), the de facto standard in bioinformatics. We do this conversion so that we can run standard bioinformatics tools on the data set.
  2. Phase the raw input data using Eagle. Phasing is a prerequisite for imputation. Our genomes are the composition of our mom’s and dad’s genomes, and phasing is the statistical estimation of the groups of alleles inherited from a single parent.
  3. Impute using Beagle.
  4. Update metadata database. This is an important step to allow downstream applications to discover and use the data.

The following graph is a simplified version of our Nextflow workflow showing the major steps, how many tasks we run per step, and some of the transformations happening in between. The number of tasks run is especially interesting.

As an example, we start the process with a list of 20,000 IDs. Then, controlled by one of our internal parameters, that large list gets split into subsets of 100 IDs each, and for each of those smaller lists we run the “Reshape input data” step. That step therefore runs 200 times, once for each split input list. As its output, we get 200 VCF files, one per group of 100 individuals, each containing all genotype calls for those individuals.
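
A minimal sketch of that fan-out (not our production code; the file name and chunk size here are illustrative) could use the splitText operator, which breaks a text file into chunked files that each become one downstream task:

id_chunks = Channel.fromPath('data/ids.txt')
                   .splitText( by: 100, file: true )  // emit one file per chunk of 100 lines
// with 20,000 IDs this channel emits 200 chunk files, so a consuming process spawns 200 tasks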

We’ve profiled the individual steps and realized that different steps are cost-effective at different data dimensions. For example, the phasing sweet spot lies in running on a large number of individuals, but a smaller subset of markers. In our case, each phasing step takes in 20,000 individuals, but splits up the chromosomes into smaller region slices to process — 293 regions in total — and that’s our number of tasks for that step.

In contrast, our imputation step takes in only 1000 individuals at a time, but runs on whole chromosomes. Since we impute against 3 different reference panels that cover different characteristics and run a Beagle task for each chromosome (1, …, 22, X.nonpar, X.par1, X.par2¹), we see 25 [chromosomes] × 20 [batches] × 3 [panels] = 1,500 tasks. Nextflow manages monitoring all of the tasks for us. It moves the flow forward as soon as all inputs for a specific task are available.
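
One way to express that fan-out in Nextflow (a simplified sketch, not our actual workflow code; the panel names are placeholders) is to take the cartesian product of the three dimensions with the combine operator, so that each (chromosome region, batch, panel) triple drives one Beagle task:

chromosomes = Channel.from( (1..22) + ['X.nonpar', 'X.par1', 'X.par2'] )  // 25 regions
batches     = Channel.from( 1..20 )                                       // 20 batches of 1000 individuals
panels      = Channel.from( 'panel_a', 'panel_b', 'panel_c' )             // 3 placeholder reference panels
impute_jobs = chromosomes.combine(batches).combine(panels)                // 25 * 20 * 3 = 1500 combinations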

All the statistics and runs shown in this post are based on 1000 Genomes (1KG) Project data. For runs that required more individuals than the 1KG project provides, we’ve multiplied the individual data for scale. As a result, there are slight statistical differences compared to 23andMe customer data production runs, but none that are relevant for the purpose of this post.

Running on AWS Batch

The Nextflow architecture allows for a complete separation between the definition of the pipeline and where it is run. The same pipeline script can be run locally or in the cloud by simply changing the `Executor` and some additional parameters in the configuration handed to the Nextflow process at kick-off time. Aside from local execution, Nextflow supports AWS Batch, Kubernetes, and quite a few more executors.

A prerequisite is that the infrastructure supporting the chosen Executor is set up. For AWS Batch, this means a Job Queue and Compute Environment are set up, and a container is defined in which all dependencies and scripts needed to run the workflow are installed.

Here is what changes in our configuration file between a local execution running in docker (top) and one run on AWS Batch (bottom):

// local execution running in docker:
docker.enabled = true
process.container = "<your_image>"

// execution on AWS Batch:
docker.enabled = true
process {
    executor = "awsbatch"
    queue = "<job_queue_arn>"
    container = "<job_definition_name>"
}
aws.region = "<your_region>"

You can see that the most important difference is the executor specification. If not provided, Nextflow assumes a local execution. The container is encapsulated in the AWS Job definition. The AWS Job Queue specifies the compute environment the tasks will be executed on. No changes to the actual workflow are required at all! If you specify a container instead of a job definition name, Nextflow will automatically create the job definition that encapsulates your container for you. We use our own job definition to be able to control a few more parameters.
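
As a hedged sketch of that variant (the image name is a placeholder), you would swap the job definition for a container image URI and let Nextflow register the job definition itself:

process {
    executor = "awsbatch"
    queue = "<job_queue_arn>"
    container = "<registry>/<your_image>:<tag>"  // an image URI; Nextflow creates the job definition for you
}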

A benefit that Nextflow gives us for free is staging or even streaming of inputs from S3 to the executors of the respective process steps. All we need to do is define the input S3 location as a Channel and the Nextflow framework makes sure it is going to be available for the task.
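
For example (the bucket and prefix below are placeholders), pointing a channel at an S3 glob is all it takes for Nextflow to stage those objects into each task's working directory:

vcf_ch = Channel.fromPath('s3://my-bucket/imputation/input/*.vcf.gz')  // placeholder bucket and prefix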

We use S3 to preserve and restore our sessions. We trap the EXIT signal of the script from which we run Nextflow and sync the local .nextflow directory to S3 as our last action. This directory is where logs, caches, and status are stored for the run. Before calling Nextflow in a subsequent run, that same script restores the contents it finds at the S3 .nextflow backup location to the local environment. This gives us the ability to have Nextflow automatically resume a pipeline from the last successful task if it failed or was interrupted in a previous run. Note that the cache of a run can become quite large, as it stores all intermediary artifacts. Frequent cleanups are a necessity; for example, when running on AWS and storing data in S3, lifecycle policies can expire objects after a certain time. Another important aspect to think about is the directory structure of the cache: a job should be able to find its own previous cache to resume from without having to read through a lot of data from other jobs.

Tracing and Visualization

Nextflow offers valuable tracing and visualizations to help monitor workflows and identify and debug performance bottlenecks. The following examples are taken from a small (manually interrupted) test run of our imputation pipeline on 1KG data with only a few chromosomes.

During execution we’ve included the options -with-trace, -with-timeline and -with-report to log additional processing data that can help to tune and debug a pipeline.

Trace

You can specify the format of your trace as part of your Nextflow configuration file to include desired metrics. The following is the configuration for our example trace output and it only includes a subset of the possible fields that can be tracked.

trace {
    fields = '''
        task_id,
        hash,
        name,
        status,
        duration,
        %cpu,
        peak_vmem,
        workdir
    '''
}

The output contains a line for each task that has been kicked off with the desired information:

Timeline

The timeline gives you a visual understanding of how your tasks are run and the resources (time and data transfers) they require. See below for a 30,000-foot view of one of our smaller imputation runs on just three chromosomes. You’ll see a small gap in the upper part where we’ve skipped over a few initial steps to fit into a decent-sized picture.

Report

The report gives you an even more detailed view of all the tasks run, statistics, and resource utilization, and helps with fine-tuning. The following report is from one of our full-scale test runs. You see that a Nextflow head job runs and monitors 5292 tasks in total. You also see that some tasks have failed, but Nextflow resubmitted them, so the entire workflow was able to succeed. These failures in our case were all results of AWS spot compute terminations, so they were fully “retryable.” The report ends with a table that shows the full trace measures for each task (not shown here).

The most interesting section of the report is the ‘Resource Usage’ section. It can help tune the resources given to each task job. Resource usage is visualized for CPU, memory, job duration, and I/O. The following figure shows the CPU usage visualization. The % Allocated tab in particular helps you understand how close the resource specifications in the workflow are to actual usage. We can see that in our test run, many processes were actually pretty close to 100% allocation, but there were a few that we could tune. For example, we should look into cutting down the allocated CPUs for the merge_raw_data task and increasing them for the publish task.
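
One way to apply such a tweak without touching the workflow script (a sketch with made-up numbers, since the right values come from your own report) is a process selector in the configuration file:

process {
    withName: merge_raw_data {
        cpus = 2   // scale down: the report showed this task over-allocating CPU
    }
    withName: publish {
        cpus = 4   // scale up: the report showed this task needs more CPU
    }
}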

Getting the resource specifications right is immensely important when running a lot of different tasks on the same compute environment, for example, AWS Batch, where large instance types will execute multiple tasks. The closer the task specs are to actual usage, the better the utilization of the assigned hardware.

Unit Testing

Currently, Nextflow does not provide any built-in way to unit test your pipeline (there is an open ticket for this; maybe you can be the one to add it!). To get around this, we’ve created our own Python library that works as a wrapper around Nextflow, specifically to be used in unit tests for verifying that the outputs of each pipeline step match our expectations.

To see how we use this wrapper library to unit test our pipeline, look here. At a high level, what this NextflowUtils Python wrapper does is:

  • Executes a given Nextflow pipeline
  • Parses the trace report (`trace.txt` file discussed above) that gets generated by the pipeline run; this file contains details of the directories where the outputs of each intermediate process are stored (see docs for more details)
  • Offers an interface for easy access to the files in these intermediate directories, which can be read and compared with your expected outputs… aka unit tests!

What it looked like in production

Here are some stats from our production run and the initial scale we ran at:

  • 1 Nextflow head job processes 20,000 individuals
  • 8–10 Nextflow head jobs ran in parallel, with 1 kicked off every half hour via AWS Lambda, triggered by an AWS CloudWatch rule
  • >5000 task jobs per head job running on AWS Spot Instances
  • ~2000 task jobs on average in runnable/starting/running state at all times in the AWS Batch dashboard
  • >800K succeeded task jobs showing up in the AWS Batch dashboard
  • Less than 1.5% of task jobs fail, mostly due to spot terminations, and all are retryable
  • Processed ~1M individuals per day
  • The compute environment taking care of the task jobs scaled up and down between 4K and 10K vCPUs automatically based on the request load

The following screenshots are just momentary snapshots of what the AWS Batch dashboard looked like at some point during processing.

The following is an overview of the number of vCPUs that are being scaled up and down automatically based on the job pressure over the course of 3 days:

We have capped our compute environment to use a maximum of 10K vCPUs, as there are a few more optimizations that we would need to consider if we wanted to scale up higher. One example would be working around AWS request throttling (e.g., S3 slowdowns) that results from a high number of processes running in parallel and reading input files from S3.

Summary

At the end of the day, the sky’s the limit. With a few tweaks here or there, we could scale even higher. Nextflow does a lot of work so a developer delivering features does not need to “reinvent the wheel” over and over again. It comes with many tools to:

  • build robust and resumable workflows with minimal coding.
  • separate the functional logic from the underlying computation environment to run a workflow in different local or cloud environments without any change.
  • optimize and tune workflows using the configurable collection of runtime metrics and visualizations for task jobs.

¹ Go here for a deeper understanding of the split of the X chromosome into par1 and par2 regions; nonpar comprises the region between par1 and par2.

Acknowledgments

Big thanks to the authors of the Nextflow library and documentation, which were heavily referenced while writing this blog post.

About the Authors

Anuved Verma is a Software Engineer at 23andMe on the Big Data Platform team. He loves building software to help genetics research at scale.

Anja Bog is an Engineering Manager on the Big Data Platform team. She’s passionate about contributing to a world where we can all live healthier longer.

23andMe is hiring! Check out our current openings!
