从 BigQuery 读取到 Dataflow

本文档介绍如何使用 Apache Beam BigQuery I/O 连接器将数据从 BigQuery 读取到 Dataflow。

概览

BigQuery I/O 连接器支持使用以下两种方法从 BigQuery 读取数据:

  • 直接表读取。这个选项是最快的,因为它使用 BigQuery Storage Read API
  • 导出作业。如果使用此选项,BigQuery 会运行导出作业,以将表数据写入 Cloud Storage。然后,连接器会从 Cloud Storage 读取导出的数据。此选项效率较低,因为它需要导出步骤。

导出作业是默认选项。如需指定直接读取,请调用 withMethod(Method.DIRECT_READ)

连接器将表数据序列化为 PCollectionPCollection 中的每个元素表示一个表行。连接器支持以下序列化方法:

最大并行数量

此连接器中的并行性取决于读取方法:

  • 直接读取:I/O 连接器会根据导出请求的大小生成动态数量的流。它直接从 BigQuery 并行读取这些流。

  • 导出作业:BigQuery 确定要写入 Cloud Storage 的文件数量。文件数取决于查询和数据量。I/O 连接器会并行读取导出的文件。

性能

下表显示了各种 BigQuery I/O 读取选项的性能指标。工作负载使用 Java 版 Apache Beam SDK 2.49.0 在一个 e2-standard2 工作器上运行。它们未使用 Runner v2。

1 亿条记录 | 1 KB | 1 列 吞吐量(字节) 吞吐量(元素)
Storage Read 120 MBps 每秒 88,000 个元素
Avro Export 105 MBps 每秒 78,000 个元素
Json Export 110 MBps 每秒 81,000 个元素

这些指标基于简单的批处理流水线。它们旨在比较 I/O 连接器之间的性能,不一定代表实际流水线。Dataflow 流水线性能很复杂,它受到多个因素的影响,包括虚拟机类型、正在处理的数据量、外部来源和接收器的性能以及用户代码。指标基于运行 Java SDK,不代表其他语言 SDK 的性能特征。如需了解详情,请参阅 Beam IO 性能

最佳做法

  • 通常,我们建议使用直接表读取 (Method.DIRECT_READ)。Storage Read API 比导出作业更适合数据流水线,因为它不需要导出数据的中间步骤。

  • 如果您使用直接读取,则需要支付 Storage Read API 的使用费。请参阅 BigQuery 价格页面中的数据提取价格

  • 导出作业不产生额外费用。但是,导出作业存在一些限制。如果需要移动大量数据,而及时性最为重要且费用可调整,则建议使用直接读取。

  • Storage Read API 具有配额限制。使用 Google Cloud 指标监控配额用量。

  • 使用 Storage Read API 时,您可能会在日志中看到租期到期和会话超时错误,例如:

    • DEADLINE_EXCEEDED
    • Server Unresponsive
    • StatusCode.FAILED_PRECONDITION details = "there was an error operating on 'projects/<projectID>/locations/<location>/sessions/<sessionID>/streams/<streamID>': session`

    当操作花费的时间超过超时值时,可能会发生这些错误,通常发生在运行超过 6 小时的流水线中。为了缓解此问题,请改用文件导出功能。

  • 使用 Java SDK 时,请考虑创建一个表示 BigQuery 表架构的类。然后,在流水线中调用 useBeamSchema,以便在 Apache Beam Row 和 BigQuery TableRow 类型之间自动转换。如需查看架构类的示例,请参阅 ExampleModel.java

示例

本部分中的代码示例使用直接表读取。

如需改用导出作业,请省略对 withMethod 的调用或指定 Method.EXPORT。然后设置 --tempLocation 流水线选项,为导出的文件指定 Cloud Storage 存储桶。

这些代码示例假定源表有以下列:

  • name(字符串)
  • age(整数)

指定为 JSON 架构文件

[
  {"name":"user_name","type":"STRING","mode":"REQUIRED"},
  {"name":"age","type":"INTEGER","mode":"REQUIRED"}
]

读取 Avro 格式的记录

如需将 BigQuery 数据读入 Avro 格式的记录,请使用 read(SerializableFunction) 方法。此方法接受应用定义的函数,该函数会解析 SchemaAndRecord 对象并返回自定义数据类型。连接器的输出是自定义数据类型的 PCollection

以下代码会从 BigQuery 表中读取 PCollection<MyData>,其中 MyData 是应用定义的类。

Java

如需向 Dataflow 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BigQueryReadAvro {

  // A custom datatype to hold a record from the source table.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    // Function to convert Avro records to MyData instances.
    public static class FromSchemaAndRecord
            implements SerializableFunction<SchemaAndRecord, MyData> {
      @Override public MyData apply(SchemaAndRecord elem) {
        MyData data = new MyData();
        GenericRecord record = elem.getRecord();
        data.name = ((Utf8) record.get("user_name")).toString();
        data.age = (Long) record.get("age");
        return data;
      }
    }
  }

  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Read table data into Avro records, using an application-defined parsing function.
        .apply(BigQueryIO.read(new MyData.FromSchemaAndRecord())
            .from(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withMethod(TypedRead.Method.DIRECT_READ))
        // The output from the previous step is a PCollection<MyData>.
        .apply(MapElements
            .into(TypeDescriptor.of(MyData.class))
            .via((MyData x) -> {
              System.out.printf("Name: %s, Age: %d%n", x.name, x.age);
              return x;
            }));
    pipeline.run().waitUntilFinish();
  }
}

read 方法采用 SerializableFunction<SchemaAndRecord, T> 接口,该接口定义了一个函数,用于将 Avro 记录转换为自定义数据类。在上面的代码示例中,MyData.apply 方法实现了此转换函数。示例函数会解析 Avro 记录中的 nameage 字段,并返回 MyData 实例。

如需指定要读取的 BigQuery 表,请调用 from 方法,如上述示例所示。如需了解详情,请参阅 BigQuery I/O 连接器文档中的表名称

读取 TableRow 对象

readTableRows 方法会将 BigQuery 数据读入 TableRow 对象的 PCollection。每个 TableRow 都是一个键值对映射,用于保存一行表数据。可以通过调用 from 方法指定要读取的 BigQuery 表。

以下代码会从 BigQuery 表中读取 PCollection<TableRows>

Java

如需向 Dataflow 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BiqQueryReadTableRows {
  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Read table data into TableRow objects.
        .apply(BigQueryIO.readTableRows()
            .from(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withMethod(Method.DIRECT_READ)
        )
        // The output from the previous step is a PCollection<TableRow>.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            // Use TableRow to access individual fields in the row.
            .via((TableRow row) -> {
              var name = (String) row.get("user_name");
              var age = (String) row.get("age");
              System.out.printf("Name: %s, Age: %s%n", name, age);
              return row;
            }));
    pipeline.run().waitUntilFinish();
  }
}

此示例还展示了如何访问 TableRow 字典中的值。整数值会被编码为字符串,以匹配 BigQuery 导出的 JSON 格式。

列投影和过滤

使用直接读取 (Method.DIRECT_READ) 时,您可以减少从 BigQuery 读取并通过网络发送的数据量,从而提高读取操作的效率。

  • 列投影:调用 withSelectedFields 以从表中读取部分列。这样,当表包含许多列时,可实现高效读取。
  • 行过滤:调用 withRowRestriction 以指定在服务器端过滤数据的谓词。

过滤条件谓词必须具有确定性,并且不支持汇总。

以下示例会投影 "user_name""age" 列,并过滤掉与谓词 "age > 18" 不匹配的行。

Java

如需向 Dataflow 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

import com.google.api.services.bigquery.model.TableRow;
import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BigQueryReadWithProjectionAndFiltering {
  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply(BigQueryIO.readTableRows()
            // Read rows from a specified table.
            .from(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withMethod(TypedRead.Method.DIRECT_READ)
            .withSelectedFields(Arrays.asList("user_name", "age"))
            .withRowRestriction("age > 18")
        )
        // The output from the previous step is a PCollection<TableRow>.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            // Use TableRow to access individual fields in the row.
            .via((TableRow row) -> {
              var name = (String) row.get("user_name");
              var age = row.get("age");
              System.out.printf("Name: %s, Age: %s%n", name, age);
              return row;
            }));
    pipeline.run().waitUntilFinish();
  }
}

从查询结果中读取

前面的示例展示了如何从表中读取行。您还可以通过调用 fromQuery 从 SQL 查询结果中读取。此方法将一些计算工作迁移到 BigQuery。您还可以使用此方法对 BigQuery 视图或具体化视图运行查询,从而从中读取数据。

以下示例会对 BigQuery 公共数据集运行查询并读取结果。流水线运行后,您可以在 BigQuery 作业历史记录中看到查询作业。

Java

如需向 Dataflow 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BigQueryReadFromQuery {
  public static void main(String[] args) {
    // The SQL query to run inside BigQuery.
    final String queryString =
        "SELECT repo_name as repo, COUNT(*) as count "
            + "FROM `bigquery-public-data.github_repos.sample_commits` "
            + "GROUP BY repo_name";

    // Parse the pipeline options passed into the application.
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation().create();

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Read the query results into TableRow objects.
        .apply(BigQueryIO.readTableRows()
            .fromQuery(queryString)
            .usingStandardSql()
            .withMethod(TypedRead.Method.DIRECT_READ))
        // The output from the previous step is a PCollection<TableRow>.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            .via((TableRow row) -> {
              System.out.printf("Repo: %s, commits: %s%n", row.get("repo"), row.get("count"));
              return row;
            }));
    pipeline.run().waitUntilFinish();
  }
}

后续步骤