Handling large inputs

This page shows how to improve performance and reduce costs when using the Variant Transforms tool to load large numbers of files.

Variant Transforms tool default settings

When you run the Variant Transforms tool, it uses the following default settings:

Flag                          Default
--optimize_for_large_inputs   False
--max_num_workers             Automatically determined
--worker_machine_type         n1-standard-1
--disk_size_gb                250
--worker_disk_type            PD (standard persistent disk)
--num_bigquery_write_shards   1

You can adjust each of these settings to optimize the tool's performance. You can also request additional Compute Engine quota. Each flag is explained below:

--optimize_for_large_inputs

This flag optimizes the Cloud Dataflow pipeline for large inputs, which can save significant costs and reduce turnaround time. However, for small inputs, the additional overhead it creates can increase costs and turnaround time.

Set this flag to true when you are loading more than 50,000 files, or when variant merging is enabled for a large number of variants (more than 3 billion).
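
For example, a run over a large cohort might look like the following sketch. It assumes the `python -m gcp_variant_transforms.vcf_to_bq` entry point described in the project's README; the project, bucket, and table names are placeholders:

python -m gcp_variant_transforms.vcf_to_bq \
  --input_pattern "gs://my-bucket/*.vcf" \
  --output_table my-project:my_dataset.my_table \
  --project my-project \
  --temp_location gs://my-bucket/temp \
  --runner DataflowRunner \
  --optimize_for_large_inputs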

--max_num_workers

By default, Cloud Dataflow uses an autoscaling algorithm that chooses the appropriate number of worker instances required to run your job (limited by your Compute Engine quota).

This flag sets the maximum number of workers to which Cloud Dataflow can autoscale the job. You can also use the --num_workers flag to specify the number of workers initially assigned to the job.
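
For example, the following sketch (same hypothetical entry point and placeholder names as above) caps autoscaling at 200 workers and starts the job with 20:

python -m gcp_variant_transforms.vcf_to_bq \
  --input_pattern "gs://my-bucket/*.vcf" \
  --output_table my-project:my_dataset.my_table \
  --project my-project \
  --temp_location gs://my-bucket/temp \
  --runner DataflowRunner \
  --max_num_workers 200 \
  --num_workers 20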

--worker_machine_type

By default, Cloud Dataflow uses the n1-standard-1 machine type, which has 1 vCPU and 3.75 GB of RAM. If you are loading a large number of files, you might need to use a larger machine type. For details, see Predefined machine types.

For best performance with Cloud Dataflow, use a large number of small machines rather than a small number of large machines. For example, you should use 200 n1-standard-4 workers rather than 25 n1-standard-32 workers. This recommendation is provided because disk and network input/output operations per second (IOPS) are limited on each machine, particularly if variant merging is enabled.

However, due to quota limits on disk and IP addresses, using a large number of workers may not always be possible. In these cases, use SSDs (set with the --worker_disk_type flag) with a large (n1-standard-16 or larger) machine. This yields higher disk IOPS and avoids idle CPU cycles. Because disk is significantly cheaper than CPU, try to optimize for high CPU utilization rather than disk usage.
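For example, the many-small-machines setup recommended above might look like the following sketch (same hypothetical entry point and placeholder names as earlier; the worker count is illustrative):

python -m gcp_variant_transforms.vcf_to_bq \
  --input_pattern "gs://my-bucket/*.vcf" \
  --output_table my-project:my_dataset.my_table \
  --project my-project \
  --temp_location gs://my-bucket/temp \
  --runner DataflowRunner \
  --worker_machine_type n1-standard-4 \
  --max_num_workers 200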

--disk_size_gb

By default, each worker has a 250 GB disk. The total disk space across all workers must be at least as large as the uncompressed size of all of the VCF files being processed. However, the recommended disk size is three to four times the uncompressed size of the VCF files, for the following reasons:

  • The intermediate stages of the pipeline require additional disk space.
  • The transform stages have additional overhead. For example, the sample name is repeated in every record in the BigQuery output, but is specified only once in the VCF header.

If variant merging is enabled or the --num_bigquery_write_shards flag is set, each worker might require a larger disk because the same variants are aggregated together on a single machine.
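
As an illustrative sizing exercise (the numbers are hypothetical): 1 TB of uncompressed VCFs and the three-to-four-times guideline suggest roughly 3.5 TB of total disk; spread across 10 workers, that is about 350 GB per worker:

# Hypothetical sizing: 1 TB of uncompressed VCFs x 3.5 = ~3.5 TB total disk,
# spread across 10 workers = ~350 GB per worker.
python -m gcp_variant_transforms.vcf_to_bq \
  --input_pattern "gs://my-bucket/*.vcf" \
  --output_table my-project:my_dataset.my_table \
  --project my-project \
  --temp_location gs://my-bucket/temp \
  --runner DataflowRunner \
  --max_num_workers 10 \
  --disk_size_gb 350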

--worker_disk_type

There are two main types of storage available for Compute Engine instances: standard and SSD.

SSDs provide significantly more IOPS than standard disks, but are more expensive. However, when working with a large machine such as n1-standard-16, SSDs can yield net cost savings, because their higher IOPS keeps the CPUs from sitting idle while waiting on disk I/O.

Use SSDs if variant merging or the --num_bigquery_write_shards flag is enabled. These operations shuffle data, which requires significant amounts of disk I/O.

To use SSDs, set the --worker_disk_type flag to compute.googleapis.com/projects//zones//diskTypes/pd-ssd.
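
For example (same hypothetical entry point and placeholder names as the earlier sketches):

python -m gcp_variant_transforms.vcf_to_bq \
  --input_pattern "gs://my-bucket/*.vcf" \
  --output_table my-project:my_dataset.my_table \
  --project my-project \
  --temp_location gs://my-bucket/temp \
  --runner DataflowRunner \
  --worker_machine_type n1-standard-16 \
  --worker_disk_type compute.googleapis.com/projects//zones//diskTypes/pd-ssd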

--num_bigquery_write_shards

The Cloud Dataflow pipeline that underlies the Variant Transforms tool writes data to BigQuery as a postprocessing step after the main transforms have completed. Because BigQuery has write limitations, the data must be sharded as it is written. Sharding makes the write significantly faster because it parallelizes the write process and allows loading large (greater than 5 TB) datasets into BigQuery in a single run.

When loading data that has more than 1 billion rows (after variant merging) or more than 1 TB of output, set the --num_bigquery_write_shards flag to 20.

If you use a large number of shards, such as 50, the process of writing data to BigQuery might fail because there is a limit on the number of concurrent writes for each BigQuery table.
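
For example, a large load might combine sharding with the flags above (same hypothetical entry point and placeholder names as the earlier sketches):

python -m gcp_variant_transforms.vcf_to_bq \
  --input_pattern "gs://my-bucket/*.vcf" \
  --output_table my-project:my_dataset.my_table \
  --project my-project \
  --temp_location gs://my-bucket/temp \
  --runner DataflowRunner \
  --optimize_for_large_inputs \
  --num_bigquery_write_shards 20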

Requesting Compute Engine quota

By default, Compute Engine has resource quotas in place to prevent inadvertent usage. By increasing quotas, you can launch more virtual machines concurrently, increasing throughput and reducing turnaround time.

If you are loading a large number of files and the tool is performing poorly or not running at all, you can request additional quota above your project's defaults. Some recommendations for quota increases are:

  • CPUs: Minimum 1 per worker, more if a larger machine type is used
  • Persistent Disk Standard (GB): At least 250 GB per worker, more if a larger disk is used
  • Persistent Disk SSD (GB): Only needed if the --worker_disk_type flag is set to SSD. The recommended quota is at least 250 GB per worker, more if a larger disk is used
  • In-use IP Addresses: 1 per worker

Note that Compute Engine quotas are an upper limit on the resources available to a job. For example, if the quota for in-use IP addresses is set to 10, but you run the tool with --max_num_workers set to 20, then the job runs with only 10 workers instead of 20.
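
To see your current quotas and usage in the region where the job runs, you can describe that region. For example (the quota math in the comments is illustrative):

gcloud compute regions describe us-central1

# Illustrative quota math for 200 n1-standard-4 workers with default disks:
#   CPUs:                     200 x 4 vCPUs = 800
#   Persistent Disk Standard: 200 x 250 GB  = 50,000 GB
#   In-use IP addresses:      200 x 1       = 200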

Handling large inputs with the preprocessor tool

Processing large inputs can be expensive and take several hours. Therefore, you should run the preprocessor tool on your VCF files before loading them into BigQuery. Doing so can avoid failures due to invalid records and therefore reduce costs and turnaround time.

Depending on the quality of the input files, you might want to run the preprocessor tool with the --report_all_conflicts flag to get a full report on the files. Doing so can take more time, but provides a more accurate report on the data. This step is highly recommended if you are unsure about the quality of the input files.
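
For example, the following sketch assumes the preprocessor's vcf_to_bq_preprocess entry point and --report_path flag from the project's README; the bucket and paths are placeholders:

python -m gcp_variant_transforms.vcf_to_bq_preprocess \
  --input_pattern "gs://my-bucket/*.vcf" \
  --report_path gs://my-bucket/reports/report.tsv \
  --report_all_conflicts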

Variant merging

You can use variant merging to merge calls across VCF files. There are different merging options available which you can specify by passing the --variant_merge_strategy flag.

By default, calls between VCF files are not merged. For example, if the two VCF files below were in the same Cloud Storage bucket, and you were to load them at the same time (by passing the --input_pattern gs://my-bucket/*.vcf flag), the Variant Transforms tool would load two rows into BigQuery:

gs://my-bucket/a.vcf:

## <headers omitted>
#CHROM  POS   ID    REF   ALT   QUAL  FILTER  INFO    FORMAT  S1      S2
1       100   rs1   A     T     29    PASS    DP=14   GT:GQ   0|1:48  0/0:30

gs://my-bucket/b.vcf:

## <headers omitted>
#CHROM  POS   ID    REF   ALT   QUAL  FILTER  INFO         FORMAT  S3
1       100   rs1   A     T     7     q10     DP=11;AA=G   GT:GQ   1/1:2

Even though the calls belong to the same variant, one row would contain calls S1 and S2, and the other row would contain call S3.
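
A plain load of both files, with no merge flags, would therefore produce those two rows (same hypothetical entry point and placeholder names as the earlier sketches):

python -m gcp_variant_transforms.vcf_to_bq \
  --input_pattern "gs://my-bucket/*.vcf" \
  --output_table my-project:my_dataset.my_table \
  --project my-project \
  --temp_location gs://my-bucket/temp \
  --runner DataflowRunner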

MOVE_TO_CALLS strategy

The --variant_merge_strategy MOVE_TO_CALLS strategy merges calls across files for variants that share all of the following fields:

  • reference_name
  • start_position
  • end_position
  • reference_bases
  • alternate_bases

By default, the merge works as follows:

  • Calls are merged in no particular order.
  • The highest quality value is chosen from among the variants being merged.
  • All filter values are merged and de-duplicated.
  • All names (equivalent to the ID column in the VCF spec) are merged and de-duplicated.
  • For each INFO field, a single value is chosen, in no particular order, to represent the merged variant record. For repeated fields, a single set of values is chosen to represent that field.

You can customize the merge strategy with the following flags:

  • --copy_quality_to_calls and/or --copy_filter_to_calls

    Copies the quality and/or filter values in each file to the set of calls specified in that file. This is useful when merging single-call VCF files, because the quality and filter values typically correspond to call-level quality and filter. The variant-level quality and filter values are still merged according to the logic described above.

  • --info_keys_to_move_to_calls_regex REGEX

    Moves INFO fields that match the provided regular expression to the associated calls. This ensures that all INFO fields are kept when merging.

    Examples:

    • Specifying --info_keys_to_move_to_calls_regex .* moves all INFO keys to calls.
    • Specifying --info_keys_to_move_to_calls_regex ^(AA|AF)$ only moves AA and AF fields to calls.

    If a key is the same in both the INFO and FORMAT fields, the key cannot be moved to calls and an error will be raised.

    See Regular Expression Syntax for details on regular expression specifications.
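
Putting these customization flags together, a merge run corresponding to the last example below might look like this (same hypothetical entry point and placeholder names as the earlier sketches):

python -m gcp_variant_transforms.vcf_to_bq \
  --input_pattern "gs://my-bucket/*.vcf" \
  --output_table my-project:my_dataset.my_table \
  --project my-project \
  --temp_location gs://my-bucket/temp \
  --runner DataflowRunner \
  --variant_merge_strategy MOVE_TO_CALLS \
  --copy_quality_to_calls \
  --copy_filter_to_calls \
  --info_keys_to_move_to_calls_regex "^DP$"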

Examples

To see an example of how the merging logic is implemented, read the code and documentation in the move_to_calls_strategy.py file on GitHub. Note in particular the get_merged_variants method.

The following examples show the output from merging the gs://my-bucket/a.vcf and gs://my-bucket/b.vcf files from above.

Default settings

The following example shows the output from running the Variant Transforms tool with its default settings and the --variant_merge_strategy MOVE_TO_CALLS flag:

reference_name: "1"
start_position: 100
end_position: 101
reference_bases: "A"
alternate_bases: {alt: "T"}
names: ["rs1"]
quality: 29
filter: ["PASS", "q10"]
call: {
  [
    name: "S3",
    genotype: [1, 1]
    phaseset: null
    GQ: 2
  ],
  [
    name: "S1",
    genotype: [0, 1]
    phaseset: "*"
    GQ: 48
  ],
  [
    name: "S2",
    genotype: [0, 0]
    phaseset: null
    GQ: 30
  ]
}
DP: 14  # This can also be 11.
AA: "G"

Copying quality and filter to calls

The following example shows the output from running the Variant Transforms tool with its default settings and the following flags:

  • --variant_merge_strategy MOVE_TO_CALLS
  • --copy_quality_to_calls
  • --copy_filter_to_calls

reference_name: "1"
start_position: 100
end_position: 101
reference_bases: "A"
alternate_bases: {alt: "T"}
names: ["rs1"]
quality: 29
filter: ["PASS", "q10"]
call: {
  [
    name: "S3",
    genotype: [1, 1]
    phaseset: null
    GQ: 2
    quality: 7
    filter: ["q10"]
  ],
  [
    name: "S1",
    genotype: [0, 1]
    phaseset: "*"
    GQ: 48
    quality: 29
    filter: ["PASS"]
  ],
  [
    name: "S2",
    genotype: [0, 0]
    phaseset: null
    GQ: 30
    quality: 29
    filter: ["PASS"]
  ]
}
DP: 14  # This can also be 11.
AA: "G"

Moving INFO fields to associated calls with a regular expression

The following example shows the output from running the Variant Transforms tool with its default settings and the following flags:

  • --variant_merge_strategy MOVE_TO_CALLS
  • --copy_quality_to_calls
  • --copy_filter_to_calls
  • --info_keys_to_move_to_calls_regex ^DP$

reference_name: "1"
start_position: 100
end_position: 101
reference_bases: "A"
alternate_bases: {alt: "T"}
names: ["rs1"]
quality: 29
filter: ["PASS", "q10"]
call: {
  [
    name: "S3",
    genotype: [1, 1]
    phaseset: null
    GQ: 2
    quality: 7
    filter: ["q10"]
    DP: 11
  ],
  [
    name: "S1",
    genotype: [0, 1]
    phaseset: "*"
    GQ: 48
    quality: 29
    filter: ["PASS"]
    DP: 14
  ],
  [
    name: "S2",
    genotype: [0, 0]
    phaseset: null
    GQ: 30
    quality: 29
    filter: ["PASS"]
    DP: 14
  ]
}
AA: "G"