从 Dataflow 写入 Cloud Storage

本文档介绍了如何使用 Apache Beam TextIO I/O 连接器将文本数据从 Dataflow 写入 Cloud Storage。

添加 Google Cloud 库依赖项

如需将 TextIO 连接器与 Cloud Storage 搭配使用,请添加以下依赖项。此库为 "gs://" 文件名提供架构处理程序。

Java

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam.version}</version>
</dependency>

Python

apache-beam[gcp]==VERSION

Go

import _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"

如需了解详情,请参阅安装 Apache Beam SDK

最大并行数量

并行性主要由分片数决定。默认情况下,运行程序会自动设置此值。对于大多数流水线,建议使用默认行为。在本文档中,请参阅最佳实践

性能

下表展示了将数据写入 Cloud Storage 的性能指标。工作负载使用 Java 版 Apache Beam SDK 2.49.0 在一个 e2-standard2 工作器上运行。它们未使用 Runner v2。

1 亿条记录 | 1 KB | 1 列 吞吐量(字节) 吞吐量(元素)
Write 130 MBps 每秒 130,000 个元素

这些指标基于简单的批处理流水线。它们旨在比较 I/O 连接器之间的性能,不一定代表实际流水线。Dataflow 流水线性能很复杂,它受到多个因素的影响,包括虚拟机类型、正在处理的数据量、外部来源和接收器的性能以及用户代码。指标基于运行 Java SDK,不代表其他语言 SDK 的性能特征。如需了解详情,请参阅 Beam IO 性能

最佳做法

  • 一般来说,请避免设置特定数量的分片。这样,运行程序就可以为您的缩放比例选择合适的值。如果调节分片数量,建议每个分片写入 100 MB 到 1 GB 的数据。不过,最佳值可能取决于工作负载。

  • Cloud Storage 可以扩容以支持每秒处理大量请求。但是,如果流水线写入量急剧增加,请考虑写入多个存储桶,以避免任何单个 Cloud Storage 存储桶暂时发生过载。

  • 一般而言,当每次写入量都很大(1 KB 或更大)时,将数据写入 Cloud Storage 会更加高效。将小型记录写入大量文件可能会导致每个字节的性能变差。

  • 生成文件名时,请考虑使用非连续的文件名,以便分摊负载。如需了解详情,请参阅使用命名惯例将负载均匀分配到多个键范围中

  • 命名文件时,请勿使用“@”符号,后跟数字或星号“*”。如需了解详情,请参阅“@*”和“@N”是预留的分片规范

示例:将文本文件写入 Cloud Storage

以下示例会创建一个批处理流水线,该流水线使用 GZIP 压缩来写入文本文件:

Java

如需向 Dataflow 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BatchWriteStorage {
  public interface Options extends PipelineOptions {
    @Description("The Cloud Storage bucket to write to")
    String getBucketName();

    void setBucketName(String value);
  }

  // Write text data to Cloud Storage
  public static void main(String[] args) {
    final List<String> wordsList = Arrays.asList("1", "2", "3", "4");

    var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    var pipeline = Pipeline.create(options);
    pipeline
        .apply(Create
            .of(wordsList))
        .apply(TextIO
            .write()
            .to(options.getBucketName())
            .withSuffix(".txt")
            .withCompression(Compression.GZIP)
        );
    pipeline.run().waitUntilFinish();
  }
}

如果输入 PCollection 是无界限的,您必须在集合上定义窗口或触发器,然后通过调用 TextIO.Write.withWindowedWrites 指定窗口化写入。

Python

如需向 Dataflow 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

import argparse
from typing import List

import apache_beam as beam
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions

from typing_extensions import Self


def write_to_cloud_storage(argv: List[str] = None) -> None:
    # Parse the pipeline options passed into the application.
    class MyOptions(PipelineOptions):
        @classmethod
        # Define a custom pipeline option that specfies the Cloud Storage bucket.
        def _add_argparse_args(cls: Self, parser: argparse.ArgumentParser) -> None:
            parser.add_argument("--output", required=True)

    wordsList = ["1", "2", "3", "4"]
    options = MyOptions()

    with beam.Pipeline(options=options.view_as(PipelineOptions)) as pipeline:
        (
            pipeline
            | "Create elements" >> beam.Create(wordsList)
            | "Write Files" >> WriteToText(options.output, file_name_suffix=".txt")
        )

对于输出路径,请指定包含存储桶名称和文件名前缀的 Cloud Storage 路径。例如,如果您指定 gs://my_bucket/output/file,则 TextIO 连接器将写入名为 my_bucket 的 Cloud Storage 存储桶,并且输出文件的前缀为 output/file*

默认情况下,TextIO 连接器使用像 <file-prefix>-00000-of-00001 这样的命名惯例对输出文件进行分片。(可选)您可以指定文件名后缀和压缩方案,如示例中所示。

为确保幂等性写入,Dataflow 将数据写入临时文件,然后将完成的临时文件复制到最终文件中。 如需控制这些临时文件的存储位置,请使用 withTempDirectory 方法。

后续步骤