从 BigQuery 读取到 Dataflow

本文档介绍了如何将数据从 BigQuery 读取到 Dataflow。

概览

对于大多数用例,请考虑使用托管式 I/O 从 BigQuery 读取数据。托管式 I/O 提供自动升级和一致的配置 API 等功能。从 BigQuery 读取数据时,托管式 I/O 会执行直接表读取,从而提供最佳读取性能。

如果您需要更高级的性能调优,不妨考虑使用 BigQueryIO 连接器。BigQueryIO 连接器支持直接读取表,也支持从 BigQuery 导出作业读取数据。它还可更精细地控制表记录的反序列化。如需了解详情,请参阅本文档中的使用 BigQueryIO 连接器

列投影和过滤

为了减少流水线从 BigQuery 读取的数据量,您可以使用以下技巧:

  • 列投影用于指定要从表中读取的列的子集。如果您的表包含大量列,但您只需要读取其中的一部分,请使用列投影。
  • 行过滤指定要应用于表的谓词。BigQuery 读取操作仅返回与过滤条件匹配的行,这可以减少流水线注入的总数据量。

以下示例从表中读取 "user_name""age" 列,并过滤掉不符合谓词 "age > 18" 的行。此示例使用托管式 I/O。

Java

如需向 Dataflow 进行身份验证,请设置应用默认凭证。如需了解详情,请参阅为本地开发环境设置身份验证

import com.google.common.collect.ImmutableMap;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;

public class BigQueryReadWithProjectionAndFiltering {
  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    String tableSpec = String.format("%s:%s.%s",
        options.getProjectId(),
        options.getDatasetName(),
        options.getTableName());

    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
        .put("table", tableSpec)
        .put("row_restriction", "age > 18")
        .put("fields", List.of("user_name", "age"))
        .build();

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply(Managed.read(Managed.BIGQUERY).withConfig(config)).getSinglePCollection()
        .apply(MapElements
            .into(TypeDescriptors.strings())
            // Access individual fields in the row.
            .via((Row row) -> {
              String output = String.format("Name: %s, Age: %s%n",
                  row.getString("user_name"),
                  row.getInt64("age"));
              System.out.println(output);
              return output;
            }));
    pipeline.run().waitUntilFinish();
  }
}

从查询结果中读取

以下示例使用托管式 I/O 读取 SQL 查询的结果。它针对 BigQuery 公共数据集运行查询。您还可以使用 SQL 查询从 BigQuery 视图或物化视图中读取数据。

Java

如需向 Dataflow 进行身份验证,请设置应用默认凭证。如需了解详情,请参阅为本地开发环境设置身份验证

import com.google.common.collect.ImmutableMap;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;

public class BigQueryReadFromQuery {
  public static void main(String[] args) {
    // The SQL query to run inside BigQuery.
    final String queryString =
        "SELECT repo_name as repo, COUNT(*) as count "
            + "FROM `bigquery-public-data.github_repos.sample_commits` "
            + "GROUP BY repo_name";

    // Parse the pipeline options passed into the application.
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation().create();

    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
        .put("query", queryString)
        .build();

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply(Managed.read(Managed.BIGQUERY).withConfig(config)).getSinglePCollection()
        .apply(MapElements
            .into(TypeDescriptors.strings())
            // Access individual fields in the row.
            .via((Row row) -> {
              String output = String.format("Repo: %s, commits: %d%n",
                  row.getString("repo"),
                  row.getInt64("count"));
              System.out.println(output);
              return output;
            }));
    pipeline.run().waitUntilFinish();
  }
}

使用 BigQueryIO 连接器

BigQueryIO 连接器支持以下序列化方法:

连接器支持以下两种数据读取选项:

  • 导出作业。默认情况下,BigQueryIO 连接器会运行一个 BigQuery 导出作业,该作业会将表数据写入 Cloud Storage。然后,连接器会从 Cloud Storage 读取数据。
  • 直接表读取。此选项比导出作业更快,因为它使用 BigQuery Storage Read API 并跳过导出步骤。如需使用直接表读取,请在构建流水线时调用 withMethod(Method.DIRECT_READ)

选择使用哪种选项时,请考虑以下几点:

  • 通常,我们建议使用直接表读取。Storage Read API 比导出作业更适合数据流水线,因为它不需要导出数据的中间步骤。

  • 如果您使用直接读取,则需要支付 Storage Read API 的使用费。请参阅 BigQuery 价格页面中的数据提取价格

  • 导出作业不产生额外费用。但是,导出作业存在一些限制。如果需要移动大量数据,而及时性最为重要且费用可调整,则建议使用直接读取。

  • Storage Read API 具有配额限制。使用 Google Cloud 指标监控配额用量。

  • 如果您使用导出作业,请设置 --tempLocation 流水线选项,为导出的文件指定 Cloud Storage 存储桶。

  • 使用 Storage Read API 时,您可能会在日志中看到租期到期和会话超时错误,例如:

    • DEADLINE_EXCEEDED
    • Server Unresponsive
    • StatusCode.FAILED_PRECONDITION details = "there was an error operating on 'projects/<projectID>/locations/<location>/sessions/<sessionID>/streams/<streamID>': session

    当操作花费的时间超过超时值时,可能会发生这些错误,通常发生在运行超过 6 小时的流水线中。为了缓解此问题,请改用文件导出功能。

  • 并行度取决于读取方法:

    • 直接读取:I/O 连接器会根据导出请求的大小生成动态数量的流。它直接从 BigQuery 并行读取这些流。

    • 导出作业:BigQuery 确定要写入 Cloud Storage 的文件数量。文件数取决于查询和数据量。I/O 连接器会并行读取导出的文件。

下表显示了各种 BigQuery I/O 读取选项的性能指标。工作负载使用 Java 版 Apache Beam SDK 2.49.0 在一个 e2-standard2 工作器上运行。它们未使用 Runner v2。

1 亿条记录 | 1 KB | 1 列 吞吐量(字节) 吞吐量(元素)
Storage Read 120 MBps 每秒 88,000 个元素
Avro Export 105 MBps 每秒 78,000 个元素
Json Export 110 MBps 每秒 81,000 个元素

这些指标基于简单的批处理流水线。它们旨在比较 I/O 连接器之间的性能,不一定代表实际流水线。Dataflow 流水线性能很复杂,它受到多个因素的影响,包括虚拟机类型、正在处理的数据量、外部来源和接收器的性能以及用户代码。指标基于运行 Java SDK,不代表其他语言 SDK 的性能特征。如需了解详情,请参阅 Beam IO 性能

示例

以下代码示例使用 BigQueryIO 连接器进行直接表读取。如需改用导出作业,请省略对 withMethod 的调用。

读取 Avro 格式的记录

此示例展示了如何使用 BigQueryIO 连接器读取 Avro 格式的记录。

如需将 BigQuery 数据读取到 Avro 格式的记录中,请使用 read(SerializableFunction) 方法。此方法接受应用定义的函数,该函数会解析 SchemaAndRecord 对象并返回自定义数据类型。连接器的输出是自定义数据类型的 PCollection

以下代码会从 BigQuery 表中读取 PCollection<MyData>,其中 MyData 是应用定义的类。

Java

如需向 Dataflow 进行身份验证,请设置应用默认凭证。如需了解详情,请参阅为本地开发环境设置身份验证

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BigQueryReadAvro {

  // A custom datatype to hold a record from the source table.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    // Function to convert Avro records to MyData instances.
    public static class FromSchemaAndRecord
            implements SerializableFunction<SchemaAndRecord, MyData> {
      @Override public MyData apply(SchemaAndRecord elem) {
        MyData data = new MyData();
        GenericRecord record = elem.getRecord();
        data.name = ((Utf8) record.get("user_name")).toString();
        data.age = (Long) record.get("age");
        return data;
      }
    }
  }

  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Read table data into Avro records, using an application-defined parsing function.
        .apply(BigQueryIO.read(new MyData.FromSchemaAndRecord())
            .from(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withMethod(TypedRead.Method.DIRECT_READ))
        // The output from the previous step is a PCollection<MyData>.
        .apply(MapElements
            .into(TypeDescriptor.of(MyData.class))
            .via((MyData x) -> {
              System.out.printf("Name: %s, Age: %d%n", x.name, x.age);
              return x;
            }));
    pipeline.run().waitUntilFinish();
  }
}

read 方法采用 SerializableFunction<SchemaAndRecord, T> 接口,该接口定义了一个函数,用于将 Avro 记录转换为自定义数据类。在上面的代码示例中,MyData.apply 方法实现了此转换函数。示例函数会解析 Avro 记录中的 nameage 字段,并返回 MyData 实例。

如需指定要读取的 BigQuery 表,请调用 from 方法,如上述示例所示。如需了解详情,请参阅 BigQuery I/O 连接器文档中的表名称

读取 TableRow 对象

此示例展示了如何使用 BigQueryIO 连接器读取 TableRow 对象。

readTableRows 方法会将 BigQuery 数据读入 TableRow 对象的 PCollection 中。每个 TableRow 都是一个键值对映射,用于保存一行表数据。可以通过调用 from 方法指定要读取的 BigQuery 表。

以下代码会从 BigQuery 表中读取 PCollection<TableRows>

Java

如需向 Dataflow 进行身份验证,请设置应用默认凭证。如需了解详情,请参阅为本地开发环境设置身份验证

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BiqQueryReadTableRows {
  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Read table data into TableRow objects.
        .apply(BigQueryIO.readTableRows()
            .from(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withMethod(Method.DIRECT_READ)
        )
        // The output from the previous step is a PCollection<TableRow>.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            // Use TableRow to access individual fields in the row.
            .via((TableRow row) -> {
              var name = (String) row.get("user_name");
              var age = (String) row.get("age");
              System.out.printf("Name: %s, Age: %s%n", name, age);
              return row;
            }));
    pipeline.run().waitUntilFinish();
  }
}

此示例还展示了如何访问 TableRow 字典中的值。整数值会被编码为字符串,以匹配 BigQuery 导出的 JSON 格式。

后续步骤