处理大量输入

本页面介绍如何在使用 Variant Transforms 工具加载大量文件的同时提高性能并降低成本。

Variant Transforms 工具默认设置

运行 Variant Transforms 工具时,它使用以下默认设置:

标志 默认值
--optimize_for_large_inputs
--max_num_workers 自动确定
--worker_machine_type n1-standard-1
--disk_size_gb 250
--worker_disk_type PD
--num_bigquery_write_shards 1

可以调整这些设置以优化工具的性能。您还可以申请 Compute Engine 配额。 以下是每个标志的说明:

--optimize_for_large_inputs

此标志针对大量输入优化 Dataflow 流水线,可以显著节省费用并缩短周转时间。但是,对于小量输入,它产生的额外开销会增加费用并延长周转时间。

当您加载 50000 个以上文件和/或为大量(超过 30 亿)变体启用变体合并时,请将此标志设置为 true

--max_num_workers

默认情况下,Dataflow 使用自动调节算法,该算法会选择运行作业所需的适当数量的工作器实例(受 Compute Engine 配额限制)。

调整此标志会将作业中的最大工作器数量增至 Dataflow 将自动调节的数量。您还可以使用 --num_workers 标志指定最初分配给作业的工作器数量。

--worker_machine_type

默认情况下,Dataflow 使用 n1-standard-1 机器类型,它具有 1 个 vCPU 和 3.75GB 的 RAM。如果要加载大量文件,则可能需要申请更大的机器。如需了解详情,请参阅预定义机器类型

为使 Dataflow 达到最佳性能,请使用大量小型机器而不是少量大型机器。例如,您应该使用 200 个 n1-standard-4 工作器,而不是 25 个 n1-standard-32 工作器。这是因为每台机器上磁盘和网络的每秒输入/输出操作次数 (IOPS) 会受到限制,特别是在启用了变体合并时。

然而,由于磁盘和 IP 地址存在配额限制,因此您不一定能使用大量工作器。在这些情况下,请将 SSD(使用 --worker_disk_type 标志设置)与大型(n1-standard-16 或更大)机器搭配使用。这会产生更高的磁盘 IOPS 并避免空闲 CPU 周期。由于磁盘比 CPU 便宜得多,因此请尝试优化 CPU 使用率而不是磁盘使用情况。

--disk_size_gb

默认情况下,每个工作器的磁盘大小为 250GB。所有工作器的磁盘空间总量必须至少与正在处理的所有 VCF 文件的未压缩大小相同。但是,我们建议的磁盘大小应是正在处理的所有 VCF 文件的未压缩大小的三到四倍,原因如下:

  • 流水线的中间阶段需要额外的磁盘空间。
  • 转换阶段会出现额外的开销。例如,样本名称在 BigQuery 输出中的每个记录中重复,但在 VCF 文件头中仅指定一次。

如果启用了变体合并--num_bigquery_write_shards 标志,则每个工作器可能需要更大的磁盘大小,因为相同的变体将聚合在单个机器上。

--worker_disk_type

Compute Engine 实例有两种主要的可用存储类型:标准和 SSD。

与标准磁盘相比,SSD 可提供更高的 IOPS,但价格也更昂贵。然而,当使用大型机器(如 n1-standard-16)时,SSD 更具性价比。这是因为 SSD 可以避免由于其 IOPS 限制而导致的空闲 CPU 周期。

如果启用了变体合并--num_bigquery_write_shards 标志,请使用 SSD。这是因为这些操作会重排数据,所以需要大量磁盘 I/O。

要使用 SSD,请将 --worker_disk_type 标志设置为 compute.googleapis.com/projects//zones//diskTypes/pd-ssd

--num_bigquery_write_shards

在主要转换完成后,作为后处理步骤,作为 Variant Transforms 工具基础的 Dataflow 流水线会将数据写入 BigQuery。由于 BigQuery 具有写入限制,因此在写入数据时需要对其进行分片。对数据进行分片可以大幅提升向 BigQuery 写入数据的速度,因为它可以并行处理写入过程,并允许一次向 BigQuery 加载大量(大于 5TB)数据。

加载超过 10 亿行(变体合并后)或具有 1TB 输出的数据时,请将 --num_bigquery_write_shards 标志设置为 20

如果使用大量分片(例如 50 个),则向 BigQuery 写入数据的过程可能会失败,这是因为每个 BigQuery 表格的并发写入数量都存在限制。

申请 Compute Engine 配额

默认情况下,Compute Engine 已配置适当的资源配额以防止意外错误使用。通过增加配额,您可以同时启动更多虚拟机,从而提高吞吐量并缩短周转时间。

如果要加载大量文件,但工具性能不佳或无法运行,您应该超出项目默认值申请更多配额。以下是关于增加配额的一些建议:

  • CPU 数:每个工作器至少 1 个,如果使用更大的机器类型则应申请更多 CPU
  • 永久性磁盘(标准,单位为 GB):每个工作器至少 250GB,如果使用更大的磁盘,则应申请更大容量
  • 永久性磁盘(SSD,单位为 GB):仅在 --worker_disk_type 标志设置为 SSD 时才需要。推荐的配额大小为每个工作器至少 250GB,如果使用更大的磁盘,则应申请更大容量
  • 使用中的 IP 地址数:每个工作器 1 个

请注意,Compute Engine 配额代表作业可用资源的上限。例如,如果将使用中的 IP 地址数的配额设置为 10,但您在运行工具时将 --max_num_workers 设置为 20,则作业将仅运行 10 个工作器而不是 20 个工作器。

使用预处理器工具处理大量输入

处理大量输入耗时费力,因此,您应该对 VCF 文件运行预处理器工具,然后再将其加载到 BigQuery 中。 这样做可以避免因无效记录而导致的故障,从而降低费用并缩短周转时间。

根据输入文件的质量,您可能希望使用 --report_all_conflicts 标志运行预处理器工具以获取有关文件的完整报告。这样做可能需要更多时间,但可以提供更准确的数据报告。如果您不确定输入文件的质量,我们强烈建议您执行此步骤。

变体合并

您可以使用变体合并来跨 VCF 文件合并检出。通过传递 --variant_merge_strategy 标志可以指定不同的合并选项。

默认情况下,VCF 文件之间的检出不会合并。例如,如果下面的两个 VCF 文件位于同一个 Cloud Storage 存储分区中,并且您要同时加载它们(通过传递 --input_pattern gs://my-bucket/*.vcf 标志),则 Variant Transforms 工具会将两行加载到 BigQuery 中:

gs://my-bucket/a.vcf

## <headers omitted>
#CHROM  POS   ID    REF   ALT   QUAL  FILTER  INFO    FORMAT  S1      S2
1       100   rs1   A     T     29    PASS    DP=14   GT:GQ   0|1:48  0/0:30

gs://my-bucket/b.vcf

## <headers omitted>
#CHROM  POS   ID    REF   ALT   QUAL  FILTER  INFO         FORMAT  S3
1       100   rs1   A     T     7     q10     DP=11;AA=G   GT:GQ   1/1:2

即使所有检出都具有相同的变体,但一行可包含检出 S1S2,另一行可包含检出 S3

MOVE_TO_CALLS 策略

--variant_merge_strategy MOVE_TO_CALLS 策略合并以下字段相同的文件之间的检出:

  • reference_name
  • start_position
  • end_position
  • reference_bases
  • alternate_bases

默认情况下,这些字段按以下方式合并:

  • calls 的合并没有特定顺序。
  • 选择合并的变体中最高的 quality 值。
  • 合并所有 filter 值并去重。
  • 合并所有 names(相当于 VCF 规范中的 ID 列)并去重。
  • 选择一个 INFO 字段以表示整个合并变体记录,没有特定顺序。 对于重复字段,选择单个集合来表示该特定字段。

您可以使用以下标志自定义合并策略:

  • --copy_quality_to_calls 和/或 --copy_filter_to_calls

    将每个文件中的 quality 和/或 filter 值复制到该文件中指定的检出集合。这在合并单个检出 VCF 文件时非常有用,因为 qualityfilter 值通常对应于检出层级的质量和过滤器。变体层级的 qualityfilter 值仍将根据上述逻辑合并。

  • --info_keys_to_move_to_calls_regex REGEX

    将与提供的正则表达式匹配的 INFO 字段移动到关联的检出。这可确保在合并时保留所有 INFO 字段。

    示例

    • 指定 --info_keys_to_move_to_calls_regex .* 将所有 INFO 键移动到检出。
    • 指定 --info_keys_to_move_to_calls_regex ^(AA|AF)$ 只会将 AAAF 字段移动到检出。

    如果 INFOFORMAT 字段中的键相同,则无法将该键移动到检出,并且将引发错误。

    如需详细了解正则表达式规范,请参阅正则表达式句法规则

示例

要查看有关合并逻辑如何实现的示例,请参阅 GitHub 上 move_to_calls_strategy.py 文件中的代码和文档,请特别注意 get_merged_variants 方法。

如下示例展示了合并上述 gs://my-bucket/a.vcfgs://my-bucket/b.vcf 文件时的输出。

默认设置

下面的示例展示了使用默认设置和 --variant_merge_strategy MOVE_TO_CALLS 标志运行 Variant Transforms 工具的输出:

reference_name: "1"
start_position: 100
end_position: 101
reference_bases: "A"
alternate_bases: {alt: "T"}
names: ["rs1"]
quality: 29
filter: ["PASS", "q10"]
call: {
  [
    name: "S3",
    genotype: [1, 1]
    phaseset: null
    GQ: 2
  ],
  [
    name: "S1",
    genotype: [0, 1]
    phaseset: "*"
    GQ: 48
  ],
  [
    name: "S2",
    genotype: [0, 0]
    phaseset: null
    GQ: 30
  ]
}
DP: 14  # This can also be 11.
AA: "G"

qualityfilter 复制到检出

下面的示例展示了使用默认设置和以下标志运行 Variant Transforms 工具的输出:

  • --variant_merge_strategy MOVE_TO_CALLS
  • --copy_quality_to_calls
  • --copy_filter_to_calls
reference_name: "1"
start_position: 100
end_position: 101
reference_bases: "A"
alternate_bases: {alt: "T"}
names: ["rs1"]
quality: 29
filter: ["PASS", "q10"]
call: {
  [
    name: "S3",
    genotype: [1, 1]
    phaseset: null
    GQ: 2
    quality: 7
    filter: ["q10"]
  ],
  [
    name: "S1",
    genotype: [0, 1]
    phaseset: "*"
    GQ: 48
    quality: 29
    filter: ["PASS"]
  ],
  [
    name: "S2",
    genotype: [0, 0]
    phaseset: null
    GQ: 30
    quality: 29
    filter: ["PASS"]
  ]
}
DP: 14  # This can also be 11.
AA: "G"

使用正则表达式将 INFO 字段移动到关联的检出

下面的示例展示了使用默认设置和以下标志运行 Variant Transforms 工具的输出:

  • --variant_merge_strategy MOVE_TO_CALLS
  • --copy_quality_to_calls
  • --copy_filter_to_calls
  • --info_keys_to_move_to_calls_regex ^DP$
reference_name: "1"
start_position: 100
end_position: 101
reference_bases: "A"
alternate_bases: {alt: "T"}
names: ["rs1"]
quality: 29
filter: ["PASS", "q10"]
call: {
  [
    name: "S3",
    genotype: [1, 1]
    phaseset: null
    GQ: 2
    quality: 7
    filter: ["q10"]
    DP: 11
  ],
  [
    name: "S1",
    genotype: [0, 1]
    phaseset: "*"
    GQ: 48
    quality: 29
    filter: ["PASS"]
    DP: 14
  ],
  [
    name: "S2",
    genotype: [0, 0]
    phaseset: null
    GQ: 30
    quality: 29
    filter: ["PASS"]
    DP: 14
  ]
}
AA: "G"