从 Dataflow 写入 BigQuery

本文档介绍如何使用 Apache Beam BigQuery I/O 连接器将数据从 Dataflow 写入 BigQuery。

Apache Beam SDK 中提供 BigQuery I/O 连接器。我们建议使用最新的 SDK 版本。如需了解详情,请参阅 Apache Beam 2.x SDK

我们还提供了针对 Python 的跨语言支持

概览

BigQuery I/O 连接器支持使用以下方法将数据写入 BigQuery:

  • STORAGE_WRITE_API。在此模式下,连接器使用 BigQuery Storage Write API 直接将数据写入 BigQuery 存储。Storage Write API 将流式提取和批量加载整合到一个高性能 API 中。此模式可保证“正好一次”语义。
  • STORAGE_API_AT_LEAST_ONCE。此模式也使用 Storage Write API,但提供“至少一次”语义。此模式可缩短大多数流水线的延迟时间。不过,可能会出现重复写入。
  • FILE_LOADS。在此模式下,连接器将输入数据写入 Cloud Storage 中的暂存文件。然后,连接器运行 BigQuery 加载作业以将数据加载到 BigQuery 中。 该模式是有界限 PCollections 的默认模式,最常见于批处理流水线中。
  • STREAMING_INSERTS。在此模式中,连接器使用旧版流式插入 API。此模式是无界限 PCollections 的默认模式,但不建议用于新项目。

选择写入方法时,请考虑以下几点:

  • 对于流处理作业,请考虑使用 STORAGE_WRITE_APISTORAGE_API_AT_LEAST_ONCE,因为这些模式会直接写入 BigQuery 存储,而不使用中间暂存文件。
  • 如果使用“至少一次”流处理模式运行流水线,请将写入模式设置为 STORAGE_API_AT_LEAST_ONCE。此设置更高效,并且与“至少一次”流处理模式的语义相匹配。
  • 文件加载和 Storage Write API 具有不同的配额和限制
  • 加载作业使用共享 BigQuery 槽池或预留槽。如需使用预留槽,请在预留分配类型为 PIPELINE 的项目中运行加载作业。如果您使用共享 BigQuery 槽池,加载作业是免费的。不过,BigQuery 不保证共享数据池的可用容量。如需了解详情,请参阅预留简介

最大并行数量

  • 对于流式处理流水线中的 FILE_LOADSSTORAGE_WRITE_API,连接器会将数据分片为多个文件或流。通常,我们建议调用 withAutoSharding 以启用自动分片。

  • 对于批处理流水线中的 FILE_LOADS,连接器会将数据写入分区文件,然后分区文件会并行加载到 BigQuery 中。

  • 对于批处理流水线中的 STORAGE_WRITE_API,每个工作器都会创建一个或多个要写入 BigQuery 的流(由分片总数决定)。

  • 对于 STORAGE_API_AT_LEAST_ONCE,有单个默认写入流。多个工作器附加到此写入流。

性能

下表显示了各种 BigQuery I/O 读取选项的性能指标。工作负载使用 Java 版 Apache Beam SDK 2.49.0 在一个 e2-standard2 工作器上运行。它们未使用 Runner v2。

1 亿条记录 | 1 KB | 1 列 吞吐量(字节) 吞吐量(元素)
Storage Write 55 MBps 每秒 54,000 个元素
Avro Load 78 MBps 每秒 77,000 个元素
Json Load 54 MBps 每秒 53,000 个元素

这些指标基于简单的批处理流水线。它们旨在比较 I/O 连接器之间的性能,不一定代表实际流水线。Dataflow 流水线性能很复杂,它受到多个因素的影响,包括虚拟机类型、正在处理的数据量、外部来源和接收器的性能以及用户代码。指标基于运行 Java SDK,不代表其他语言 SDK 的性能特征。如需了解详情,请参阅 Beam IO 性能

最佳做法

本部分介绍有关从 Dataflow 写入 BigQuery 的最佳实践。

一般注意事项

  • Storage Write API 具有配额限制。对于大多数流水线,连接器会处理这些限制。但是,某些场景可能会耗尽可用的 Storage Write API 流。例如,如果流水线使用自动分片和自动扩缩并具有大量目标位置,则可能会发生此问题,尤其是在具有多变工作负载的长时间运行的作业中。如果发生此问题,请考虑使用 STORAGE_WRITE_API_AT_LEAST_ONCE 来避免此问题。

  • 使用 Google Cloud 指标来监控 Storage Write API 配额用量。

  • 使用文件加载时,Avro 的性能通常优于 JSON。如需使用 Avro,请调用 withAvroFormatFunction

  • 默认情况下,加载作业与 Dataflow 作业在同一项目中运行。如需指定其他项目,请调用 withLoadJobProjectId

  • 使用 Java SDK 时,请考虑创建一个表示 BigQuery 表架构的类。然后,在流水线中调用 useBeamSchema,以便在 Apache Beam Row 和 BigQuery TableRow 类型之间自动转换。如需查看架构类的示例,请参阅 ExampleModel.java

  • 如果要加载具有包含数千个字段的复杂架构的表,请考虑调用 withMaxBytesPerPartition 来为每个加载作业设置较小的大小上限。

流处理流水线

以下建议适用于流处理流水线。

  • 对于流式处理流水线,我们建议使用 Storage Write API(STORAGE_WRITE_APISTORAGE_API_AT_LEAST_ONCE)。

  • 流式处理流水线可以使用文件加载,但这种方法有以下缺点:

    • 它需要使用数据选取功能才能写入文件。您无法使用全局窗口。
    • 使用共享槽池时,BigQuery 会尽力而为地加载文件。写入记录与记录在 BigQuery 中可用之间可能存在明显延迟。
    • 如果加载作业失败(例如由于数据错误或架构不匹配),则整个流水线将会失败。
  • 请尽可能考虑使用 STORAGE_WRITE_API_AT_LEAST_ONCE。这可能会导致将重复的记录写入 BigQuery,但比 STORAGE_WRITE_API 费用更低,且可伸缩性更强。

  • 一般而言,请避免使用 STREAMING_INSERTS。流式插入比 Storage Write API 要贵,并且性能更低。

  • 数据分片可以提高流式处理流水线的性能。对于大多数流水线而言,自动分片是一个很好的起点。但是,您可以按如下方式调整分片:

  • 如果您使用流式插入,我们建议您将 retryTransientErrors 设置为重试政策

批处理流水线

以下建议适用于批处理流水线。

  • 对于大多数大型批处理流水线,我们建议先尝试 FILE_LOADS。批处理流水线可以使用 STORAGE_WRITE_API,但对于大规模情况(1,000 个或更多的 vCPU),或者如果并发流水线正在运行,可能会超出配额限制。Apache Beam 不会限制批量 STORAGE_WRITE_API 作业的写入流数量上限,因此作业最终会达到 BigQuery Storage API 限制。

  • 使用 FILE_LOADS 时,您可能会耗尽共享 BigQuery 槽池或预留槽池。如果遇到此类故障,请尝试以下方法:

    • 减小作业的工作器数量上限或工作器大小。
    • 购买更多预留槽
    • 考虑使用 STORAGE_WRITE_API
  • 使用 STORAGE_WRITE_API 可能会使中小型流水线(少于 1,000 个 vCPU)受益。对于这些较小的作业,如果您需要死信队列FILE_LOADS 共享槽池不足,请考虑使用 STORAGE_WRITE_API

  • 如果您可以容忍重复数据,请考虑使用 STORAGE_WRITE_API_AT_LEAST_ONCE。此模式可能会导致将重复记录写入 BigQuery,但费用可能会比 STORAGE_WRITE_API 选项低。

  • 不同写入模式的执行方式可能会因流水线的特性而异。可通过实验找到最适合您的工作负载的写入模式。

处理行级错误

本部分介绍了如何处理可能在行级层发生的错误,例如输入数据格式错误或架构不匹配。

对于 Storage Write API,所有无法写入的行都会被放入单独的 PCollection 中。如需获取此集合,请对 WriteResult 对象调用 getFailedStorageApiInserts。如需查看此方法的示例,请参阅将数据流式插入 BigQuery

最好将错误发送到死信队列或表,以供日后进行处理。如需详细了解此模式,请参阅 BigQueryIO 死信模式

对于 FILE_LOADS,如果在加载数据时发生错误,则加载作业会失败,并且流水线会抛出运行时异常。您可以在 Dataflow 日志中查看错误或查看 BigQuery 作业历史记录。I/O 连接器不会返回个别失败行的相关信息。

如需详细了解如何排查错误,请参阅 BigQuery 连接器错误

示例

以下示例展示了如何使用 Dataflow 写入 BigQuery。

写入现有表

以下示例会创建一个将 PCollection<MyData> 写入 BigQuery 的批处理流水线,其中 MyData 是自定义数据类型。

BigQueryIO.write() 方法会返回 BigQueryIO.Write<T> 类型,用于配置写入操作。如需了解详情,请参阅 Apache Beam 文档中的写入表格。此代码示例会将数据写入现有表 (CREATE_NEVER) 并将新行附加到表 (WRITE_APPEND)。

Java

如需向 Dataflow 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

import com.google.api.services.bigquery.model.TableRow;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BigQueryWrite {
  // A custom datatype for the source data.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    public MyData() {}

    public MyData(String name, Long age) {
      this.name = name;
      this.age = age;
    }
  }

  public static void main(String[] args) {
    // Example source data.
    final List<MyData> data = Arrays.asList(
        new MyData("Alice", 40L),
        new MyData("Bob", 30L),
        new MyData("Charlie", 20L)
    );

    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Create an in-memory PCollection of MyData objects.
        .apply(Create.of(data))
        // Write the data to an exiting BigQuery table.
        .apply(BigQueryIO.<MyData>write()
            .to(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withFormatFunction(
                (MyData x) -> new TableRow().set("user_name", x.name).set("age", x.age))
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withMethod(Write.Method.STORAGE_WRITE_API));
    pipeline.run().waitUntilFinish();
  }
}

写入新表或现有表

以下示例在目标表不存在时通过将创建处置方式设置为 CREATE_IF_NEEDED来创建新表。使用此选项时,您必须提供表架构。如果创建新表,则连接器使用此架构。

Java

如需向 Dataflow 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BigQueryWriteWithSchema {
  // A custom datatype for the source data.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    public MyData() {}

    public MyData(String name, Long age) {
      this.name = name;
      this.age = age;
    }
  }

  public static void main(String[] args) {
    // Example source data.
    final List<MyData> data = Arrays.asList(
        new MyData("Alice", 40L),
        new MyData("Bob", 30L),
        new MyData("Charlie", 20L)
    );

    // Define a table schema. A schema is required for write disposition CREATE_IF_NEEDED.
    TableSchema schema = new TableSchema()
        .setFields(
            Arrays.asList(
                new TableFieldSchema()
                    .setName("user_name")
                    .setType("STRING")
                    .setMode("REQUIRED"),
                new TableFieldSchema()
                    .setName("age")
                    .setType("INT64") // Defaults to NULLABLE
            )
        );

    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Create an in-memory PCollection of MyData objects.
        .apply(Create.of(data))
        // Write the data to a new or existing BigQuery table.
        .apply(BigQueryIO.<MyData>write()
            .to(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withFormatFunction(
                (MyData x) -> new TableRow().set("user_name", x.name).set("age", x.age))
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            .withSchema(schema)
            .withMethod(Write.Method.STORAGE_WRITE_API)
        );
    pipeline.run().waitUntilFinish();
  }
}

将数据流式传输到 BigQuery

以下示例展示了如何通过将写入模式设置为 STORAGE_WRITE_API,使用“正好一次”语义来流式插入数据

并非所有流处理流水线都需要“正好一次”语义。例如,您可以从目标表中手动移除重复项。如果您的场景可以接受重复记录的可能性,请考虑将写入方法设置为 STORAGE_API_AT_LEAST_ONCE 来使用“至少一次”语义。此方法通常更高效,可缩短大多数流水线的延迟时间。

Java

如需向 Dataflow 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class BigQueryStreamExactlyOnce {
  // Create a PTransform that sends simulated streaming data. In a real application, the data
  // source would be an external source, such as Pub/Sub.
  private static TestStream<String> createEventSource() {
    Instant startTime = new Instant(0);
    return TestStream.create(StringUtf8Coder.of())
        .advanceWatermarkTo(startTime)
        .addElements(
            TimestampedValue.of("Alice,20", startTime),
            TimestampedValue.of("Bob,30",
                startTime.plus(Duration.standardSeconds(1))),
            TimestampedValue.of("Charles,40",
                startTime.plus(Duration.standardSeconds(2))),
            TimestampedValue.of("Dylan,Invalid value",
                startTime.plus(Duration.standardSeconds(2))))
        .advanceWatermarkToInfinity();
  }

  public static PipelineResult main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);
    options.setStreaming(true);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Add a streaming data source.
        .apply(createEventSource())
        // Map the event data into TableRow objects.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            .via((String x) -> {
              String[] columns = x.split(",");
              return new TableRow().set("user_name", columns[0]).set("age", columns[1]);
            }))
        // Write the rows to BigQuery
        .apply(BigQueryIO.writeTableRows()
            .to(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withMethod(Write.Method.STORAGE_WRITE_API)
            // For exactly-once processing, set the triggering frequency.
            .withTriggeringFrequency(Duration.standardSeconds(5)))
        // Get the collection of write errors.
        .getFailedStorageApiInserts()
        .apply(MapElements.into(TypeDescriptors.strings())
            // Process each error. In production systems, it's useful to write the errors to
            // another destination, such as a dead-letter table or queue.
            .via(
                x -> {
                  System.out.println("Failed insert: " + x.getErrorMessage());
                  System.out.println("Row: " + x.getRow());
                  return "";
                }));
    return pipeline.run();
  }
}

后续步骤