将数据从 Cloud Storage 读取到 Dataflow

如需将数据从 Cloud Storage 读取到 Dataflow,请使用 Apache Beam TextIOAvroIO I/O 连接器

添加 Google Cloud 库依赖项

如需将 TextIOAvroIO 连接器与 Cloud Storage 搭配使用,请添加以下依赖项。此库为 "gs://" 文件名提供架构处理程序。

Java

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam.version}</version>
</dependency>

Python

apache-beam[gcp]==VERSION

Go

import _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"

如需了解详情,请参阅安装 Apache Beam SDK

在 Dataflow 上为 Apache Beam I/O 连接器启用 gRPC

您可以通过 Dataflow 上的 Apache Beam I/O 连接器使用 gRPC 连接到 Cloud StoragegRPC 是 Google 开发的高性能开源远程过程调用 (RPC) 框架,可用于与 Cloud Storage 交互。

如需加快 Dataflow 作业对 Cloud Storage 的读取请求速度,您可以在 Dataflow 上启用 Apache Beam I/O 连接器以使用 gRPC。

命令行

  1. 确保您使用的是 Apache Beam SDK 版本 2.55.0 或更高版本。
  2. 如需运行 Dataflow 作业,请使用 --additional-experiments=use_grpc_for_gcs 流水线选项。如需了解不同的流水线选项,请参阅可选标志

Apache Beam SDK

  1. 确保您使用的是 Apache Beam SDK 版本 2.55.0 或更高版本。
  2. 如需运行 Dataflow 作业,请使用 --experiments=use_grpc_for_gcs 流水线选项。如需了解不同的流水线选项,请参阅基本选项

您可以在 Dataflow 上配置 Apache Beam I/O 连接器,以便在 Cloud Monitoring 中生成与 gRPC 相关的指标。gRPC 相关指标可帮助您执行以下操作:

  • 监控和优化对 Cloud Storage 的 gRPC 请求的性能。
  • 排查和调试问题。
  • 深入了解应用的使用情况和行为。

如需了解如何在 Dataflow 上配置 Apache Beam I/O 连接器以生成与 gRPC 相关的指标,请参阅使用客户端指标。 如果您的用例不需要收集指标,您可以选择停用指标收集。 如需了解相关说明,请参阅停用客户端指标

最大并行数量

TextIOAvroIO 连接器支持两级并行处理:

  • 各个文件都有单独的键,因此多个工作器都可以读取它们。
  • 如果文件未压缩,则连接器可以单独读取每个文件的子范围,从而产生高度并行性。只有当文件中的每一行都是有意义的记录时,才能进行这种拆分。例如,在默认情况下,它不适用于 JSON 文件。

性能

下表展示了从 Cloud Storage 读取数据的性能指标。工作负载使用 Java 版 Apache Beam SDK 2.49.0 在一个 e2-standard2 工作器上运行。它们未使用 Runner v2。

1 亿条记录 | 1 KB | 1 列 吞吐量(字节) 吞吐量(元素)
读取 320 MBps 每秒 320,000 个元素

这些指标基于简单的批处理流水线。它们旨在比较 I/O 连接器之间的性能,不一定代表实际流水线。Dataflow 流水线性能很复杂,它受到多个因素的影响,包括虚拟机类型、正在处理的数据量、外部来源和接收器的性能以及用户代码。指标基于运行 Java SDK,不代表其他语言 SDK 的性能特征。如需了解详情,请参阅 Beam IO 性能

最佳做法

  • 避免将 watchForNewFiles 与 Cloud Storage 搭配使用。对于大型生产流水线来说,此方法的扩缩性较差,因为连接器必须在内存中保留可见文件的列表。该列表无法从内存中清空,这会随着时间的推移减少工作器的工作记忆。请考虑改用适用于 Cloud Storage 的 Pub/Sub 通知。如需了解详情,请参阅文件处理模式

  • 如果文件名和文件内容都是有用的数据,请使用 FileIO 类别读取文件名。例如,文件名可能包含在处理文件中的数据时有用的元数据。如需了解详情,请参阅访问文件名FileIO 文档还介绍了此模式的示例。

示例

以下示例展示了如何从 Cloud Storage 读取数据。

Java

如需向 Dataflow 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

public class ReadFromStorage {
  public static Pipeline createPipeline(Options options) {
    var pipeline = Pipeline.create(options);
    pipeline
        // Read from a text file.
        .apply(TextIO.read().from(
            "gs://" + options.getBucket() + "/*.txt"))
        .apply(
            MapElements.into(TypeDescriptors.strings())
                .via(
                    (x -> {
                      System.out.println(x);
                      return x;
                    })));
    return pipeline;
  }
}

后续步骤