DNASeq Pipeline

The Databricks DNASeq pipeline is a GATK best practices compliant pipeline for short read alignment, variant calling, and variant annotation. It uses the following software packages, parallelized using Spark:

  • BWA v0.7.17
  • ADAM v0.25.0
  • GATK HaplotypeCaller v4.0.11.0
  • SnpEff v4.3

For more information about the pipeline implementation and expected runtimes and costs for various option combinations, see Building the Fastest DNASeq Pipeline at Scale.

Beta

The Databricks DNASeq pipeline requires Databricks Runtime HLS, which is in Beta.

Setup

The pipeline is run as a Databricks job. Most likely, a Databricks solutions architect will work with you to set up the initial Databricks job. The necessary details are:

  • The cluster configuration should use Databricks Runtime HLS.
  • The task should be the DNASeq notebook found at the bottom of this page.
  • For best performance, use compute optimized instances with at least 60GB of memory. We recommend c5.9xlarge.
  • If you’re running base quality score recalibration, use general purpose (m5.4xlarge) instances instead since this operation requires more memory.
  • To reduce costs, use all spot workers with the Spot fall back to On-demand option selected.
  • Attach one 500 GB SSD EBS volume.

Parameters

The pipeline accepts parameters that control its behavior. The most important and commonly changed parameters are documented here; the rest can be found in the DNASeq notebook. Parameters can be set for all runs or per-run.

Parameter                Default  Description
manifest                 n/a      The path of the manifest file describing the input.
output                   n/a      The path where pipeline output should be written.
replayMode               skip     One of:
                                    • skip: stages are skipped if output already exists.
                                    • overwrite: existing output is deleted.
exportVCF                false    If true, the pipeline writes results in VCF as well as Delta Lake.
referenceConfidenceMode  NONE     One of:
                                    • NONE: only variant sites are included in the output.
                                    • GVCF: all sites are included, with adjacent reference sites banded.
                                    • BP_RESOLUTION: all sites are included.
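These parameters can be overridden per run as notebook parameters. A sketch of what such an override might look like (the paths here are placeholders, and values are passed as strings):

```json
{
  "manifest": "dbfs:/genomics/manifests/my-manifest.csv",
  "output": "dbfs:/genomics/output_dir",
  "exportVCF": "true",
  "replayMode": "overwrite"
}
```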

Tip

To optimize runtime, set spark.sql.shuffle.partitions in the Spark config to three times the number of cores of the cluster.
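A quick sketch of the arithmetic behind this tip, assuming a hypothetical cluster of ten c5.9xlarge workers (36 vCPUs each); the worker and core counts are placeholders for your own configuration:

```python
# Hypothetical cluster shape; substitute your own values.
num_workers = 10
cores_per_worker = 36  # c5.9xlarge provides 36 vCPUs

# Three times the total core count, per the tip above.
shuffle_partitions = 3 * num_workers * cores_per_worker
print(shuffle_partitions)  # 1080

# In a notebook you would then apply it with:
# spark.conf.set("spark.sql.shuffle.partitions", shuffle_partitions)
```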

Reference genomes

You must configure the reference genome using an environment variable. To use GRCh37, set an environment variable like this:

refGenomeId=grch37

To use GRCh38 instead, replace grch37 with grch38.

To use a reference build other than GRCh37 or GRCh38, follow these steps:

  1. Prepare the reference for use with BWA.

  2. Copy the fasta and index files to the driver node of the cluster (using %fs cp or aws s3 cp).

  3. Build a bwa-jni image:

    import org.broadinstitute.hellbender.utils.bwa._
    BwaMemIndex.createIndexImageFromIndexFiles(fastaPath, imagePath)
    
  4. Save the index image to s3://<refGenome_path>/<refGenomeName>.fa.img.

    Note

    Use a cluster running Databricks Runtime HLS. The image build process will run on the driver.

  5. Create an init script that copies the index image from cloud storage to /mnt/dbnucleus/dbgenomics/<refGenomeId>/data/ so that all nodes in your cluster can access the reference.

    dbutils.fs.put(s"s3://<init_path>/init.sh", raw"""
    #!/bin/bash
    pip install awscli
    aws s3 sync s3://<refGenome_path>/ /mnt/dbnucleus/dbgenomics/<refGenomeId>/data/ --exclude "*" --include "<refGenomeName>*"
    """, true)
    
  6. Configure your cluster to use the init script.

  7. Set the refGenomeName and refGenomePath parameters in the DNASeq notebook.

Manifest format

The manifest is a CSV file describing where to find the input FASTQ or BAM files. An example:

file_path,sample_id,paired_end,read_group_id
*_R1_*.fastq.bgz,HG001,1,read_group
*_R2_*.fastq.bgz,HG001,2,read_group

If your input consists of unaligned BAM files, leave the paired_end field empty:

file_path,sample_id,paired_end,read_group_id
*.bam,HG001,,read_group

Tip

The file_path field in each row may be an absolute path or a path relative to the manifest. You can include globs (*) to match many files.
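As an illustration, a paired-end manifest like the one above can be generated with a short script; the sample ID, read group, and path template here are hypothetical:

```python
import csv
import io

def write_manifest(sample_id, read_group_id, path_template):
    """Build a paired-end manifest for one sample as a CSV string.

    path_template must contain '{end}', which is filled with the read
    number (1 or 2). Paths may be absolute or relative to the manifest
    file, and may contain globs (*).
    """
    buf = io.StringIO()
    writer = csv.writer(buf)
    writer.writerow(["file_path", "sample_id", "paired_end", "read_group_id"])
    for end in (1, 2):
        writer.writerow([path_template.format(end=end), sample_id, end, read_group_id])
    return buf.getvalue()

manifest = write_manifest("HG001", "read_group", "*_R{end}_*.fastq.bgz")
print(manifest)
```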

Supported input formats

  • SAM
  • BAM
  • CRAM
  • Parquet
  • FASTQ
    • bgzip *.fastq.bgz (recommended); bgzipped files with the *.fastq.gz extension are also recognized as bgz.
    • uncompressed *.fastq
    • gzip *.fastq.gz

Important

Gzipped files are not splittable. Choose autoscaling clusters to minimize cost for these files.

To block compress a FASTQ, install htslib, which includes the bgzip executable.

Locally
gunzip -c <my_file>.gz | bgzip -c | aws s3 cp - s3://<my_s3_file_path>.bgz
From S3
aws s3 cp s3://<my_s3_file_path>.gz - | gunzip -c | bgzip -c | aws s3 cp - s3://<my_s3_file_path>.bgz

Output

The aligned reads, called variants, and annotated variants are all written out to Delta tables inside the provided output directory. Each table is partitioned by sample ID. In addition, if you configured the pipeline to export VCFs or GVCFs, they’ll appear under the output directory as well.

output
|---alignments
    |---sampleId=HG001
        |---Delta files
    |---sampleId=HG002
|---annotations
    |---sampleId=HG001
        |---Delta files
|---annotations.vcf
    |---sampleId=HG001
        |---HG001.vcf
|---genotypes
    |---sampleId=HG001
        |---Delta files
|---genotypes.vcf
    |---sampleId=HG001
        |---HG001.g.vcf

When you run the pipeline on a new sample, it’ll appear as a new partition. If you run the pipeline for a sample that already appears in the output directory, that partition will be overwritten.

Since all the information is available in Delta, you can easily analyze it with Spark in SQL, Scala, Python, or R. For example:

# Load the data
df = spark.read.format("delta").load("/genomics/output_dir/genotypes")
# Show all variants from chromosome 12
display(df.where("contigName == '12'").orderBy("sampleId", "start"))
-- Register the table in the catalog
CREATE TABLE genotypes
USING delta
LOCATION '/genomics/output_dir/genotypes'

Troubleshooting

Job is slow and few tasks are running
Usually indicates that the input FASTQ files are compressed with gzip instead of bgzip. Gzipped files are not splittable, so the input cannot be processed in parallel. Try using the parallel bgzip notebook to convert a directory of files from gzip to bgzip in parallel.
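To check whether a .fastq.gz file is really block-compressed, you can inspect its leading bytes: BGZF (the format bgzip writes) is ordinary gzip with the FEXTRA flag set and a 'BC' extra subfield, per the BGZF layout in the SAM specification. A sketch of such a check:

```python
import struct

def is_bgzf(header: bytes) -> bool:
    """Return True if these leading bytes look like a BGZF (bgzip) block."""
    if len(header) < 18:
        return False
    if header[0] != 0x1F or header[1] != 0x8B:
        return False  # not gzip at all
    if not header[3] & 0x04:
        return False  # FEXTRA flag not set: plain gzip
    xlen = struct.unpack_from("<H", header, 10)[0]
    extra = header[12:12 + xlen]
    # Scan the extra subfields for SI1='B', SI2='C', SLEN=2.
    pos = 0
    while pos + 4 <= len(extra):
        si1, si2 = extra[pos], extra[pos + 1]
        slen = struct.unpack_from("<H", extra, pos + 2)[0]
        if si1 == ord("B") and si2 == ord("C") and slen == 2:
            return True
        pos += 4 + slen
    return False

# Usage: read the first bytes of the file in question.
# with open("sample.fastq.gz", "rb") as f:
#     print("bgzip" if is_bgzf(f.read(18)) else "plain gzip")
```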

Run programmatically

In addition to using the UI, you can start runs of the pipeline programmatically using the Databricks CLI.

Find the job id

After setting up the pipeline job in the UI, copy the Job ID so that you can pass it to the jobs run-now CLI command.

Here’s an example bash script that you can adapt for your workflow:

# Generate a manifest file
cat <<HERE >manifest.csv
file_path,sample_id,paired_end,read_group_id
dbfs:/genomics/my_new_sample/*_R1_*.fastq.bgz,my_new_sample,1,read_group
dbfs:/genomics/my_new_sample/*_R2_*.fastq.bgz,my_new_sample,2,read_group
HERE

# Upload the file to DBFS
DBFS_PATH=dbfs:/genomics/manifests/$(date +"%Y-%m-%dT%H-%M-%S")-manifest.csv
databricks fs cp manifest.csv $DBFS_PATH

# Start a new run
databricks jobs run-now --job-id <job-id> --notebook-params "{\"manifest\": \"$DBFS_PATH\"}"

In addition to starting runs from the command line, you can use this pattern to invoke the pipeline from automated systems like Jenkins.
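When driving the CLI from a script, building the --notebook-params JSON with a real JSON serializer avoids shell-quoting mistakes. A sketch (the job ID and manifest path are placeholders, and run_now_command is a hypothetical helper):

```python
import json
import shlex

def run_now_command(job_id: int, manifest_path: str) -> str:
    """Build a `databricks jobs run-now` invocation with JSON notebook params."""
    params = json.dumps({"manifest": manifest_path})
    return "databricks jobs run-now --job-id {} --notebook-params {}".format(
        job_id, shlex.quote(params)
    )

cmd = run_now_command(42, "dbfs:/genomics/manifests/manifest.csv")
print(cmd)
```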