從 Dataflow 寫入 Cloud Storage

本文說明如何使用 Apache Beam TextIO I/O 連接器,將 Dataflow 中的文字資料寫入 Cloud Storage。

加入 Google Cloud 程式庫依附元件

如要搭配 Cloud Storage 使用 TextIO 連接器,請加入下列依附元件。這個程式庫提供 "gs://" 檔案名的結構定義處理常式。

Java

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam.version}</version>
</dependency>

Python

apache-beam[gcp]==VERSION

Go

import _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"

詳情請參閱「安裝 Apache Beam SDK」。

在 Dataflow 上啟用 Apache Beam I/O 連接器的 gRPC

您可以透過 Dataflow 上的 Apache Beam I/O 連接器,使用 gRPC 連線至 Cloud StoragegRPC 是 Google 開發的高效能開放原始碼遠端程序呼叫 (RPC) 架構,可用於與 Cloud Storage 互動。

如要加快 Dataflow 工作對 Cloud Storage 的寫入要求,您可以在 Dataflow 上啟用 Apache Beam I/O 連接器,以使用 gRPC。

指令列

  1. 請確認您使用的是 Apache Beam SDK 2.55.0 以上版本。
  2. 如要執行 Dataflow 工作,請使用 --additional-experiments=use_grpc_for_gcs 管道選項。如要瞭解不同的管道選項,請參閱「選用標記」。

Apache Beam SDK

  1. 請確認您使用的是 Apache Beam SDK 2.55.0 以上版本。
  2. 如要執行 Dataflow 工作,請使用 --experiments=use_grpc_for_gcs 管道選項。如要瞭解不同的管道選項,請參閱基本選項

您可以在 Dataflow 上設定 Apache Beam I/O 連接器,在 Cloud Monitoring 中產生 gRPC 相關指標。gRPC 相關指標可協助您執行下列操作:

  • 監控及最佳化傳送至 Cloud Storage 的 gRPC 要求效能。
  • 排解及偵錯問題。
  • 深入瞭解應用程式的使用情形和行為。

如要瞭解如何在 Dataflow 上設定 Apache Beam I/O 連接器,以產生 gRPC 相關指標,請參閱「使用用戶端指標」。 如果您的用途不需要收集指標,可以選擇停用指標收集功能。 如需操作說明,請參閱「停用用戶端指標」。

平行處理工作數量

平行處理數主要取決於分片數量。根據預設,執行器會自動設定這個值。對於大多數管道,建議使用預設行為。請參閱本文中的「最佳做法」。

成效

下表顯示寫入 Cloud Storage 的效能指標。這些工作負載是在一個 e2-standard2 工作站上執行,並使用 Java 適用的 Apache Beam SDK 2.49.0。他們沒有使用 Runner v2。

1 億筆記錄 | 1 kB | 1 個資料欄 處理量 (位元組) 處理量 (元素)
撰寫 130 MBps 每秒 130,000 個元素

這些指標是以簡單的批次管道為依據。這些基準旨在比較 I/O 連接器之間的效能,不一定代表實際的管道。Dataflow 管道效能相當複雜,取決於 VM 類型、處理的資料、外部來源和接收器的效能,以及使用者程式碼。這些指標是根據執行 Java SDK 取得,無法代表其他語言 SDK 的效能特徵。詳情請參閱「Beam IO 效能」。

最佳做法

  • 一般而言,請避免設定特定數量的分片。這樣一來,跑步者就能為你的跑步選擇適當的配速。如要啟用自動分片,請呼叫 .withAutoSharding(),而非 .withNumShards(0)。如果調整分片數量,建議每個分片寫入 100 MB 到 1 GB 的資料。不過,最佳值可能取決於工作負載。

  • Cloud Storage 每秒可處理大量要求。不過,如果管道的寫入量大幅增加,請考慮寫入多個值區,避免任何單一 Cloud Storage 值區暫時過載。

  • 一般來說,每次寫入的資料量越大 (1 KB 以上),寫入 Cloud Storage 的效率就越高。將小型記錄寫入大量檔案,可能會導致每個位元組的效能變差。

  • 產生檔案名稱時,請考慮使用非連續的檔案名稱,以分散負載。詳情請參閱「善用命名慣例,將負載平均分配到各索引鍵範圍」。

  • 命名檔案時,請勿使用「@」符號,後面接著數字或星號「*」。詳情請參閱「@* 和 @N 是保留的分片規格」。

範例:將文字檔案寫入 Cloud Storage

以下範例會建立批次管道,使用 GZIP 壓縮寫入文字檔:

Java

如要向 Dataflow 進行驗證,請設定應用程式預設憑證。 詳情請參閱「為本機開發環境設定驗證」。

import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BatchWriteStorage {
  public interface Options extends PipelineOptions {
    @Description("The Cloud Storage bucket to write to")
    String getBucketName();

    void setBucketName(String value);
  }

  // Write text data to Cloud Storage
  public static void main(String[] args) {
    final List<String> wordsList = Arrays.asList("1", "2", "3", "4");

    var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    var pipeline = Pipeline.create(options);
    pipeline
        .apply(Create
            .of(wordsList))
        .apply(TextIO
            .write()
            .to(options.getBucketName())
            .withSuffix(".txt")
            .withCompression(Compression.GZIP)
        );
    pipeline.run().waitUntilFinish();
  }
}

如果輸入 PCollection 不受限制,您必須在集合中定義視窗或觸發條件,然後呼叫 TextIO.Write.withWindowedWrites 指定視窗寫入。

Python

如要向 Dataflow 進行驗證,請設定應用程式預設憑證。 詳情請參閱「為本機開發環境設定驗證」。

import argparse
from typing import List

import apache_beam as beam
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions

from typing_extensions import Self


def write_to_cloud_storage(argv: List[str] = None) -> None:
    # Parse the pipeline options passed into the application.
    class MyOptions(PipelineOptions):
        @classmethod
        # Define a custom pipeline option that specfies the Cloud Storage bucket.
        def _add_argparse_args(cls: Self, parser: argparse.ArgumentParser) -> None:
            parser.add_argument("--output", required=True)

    wordsList = ["1", "2", "3", "4"]
    options = MyOptions()

    with beam.Pipeline(options=options.view_as(PipelineOptions)) as pipeline:
        (
            pipeline
            | "Create elements" >> beam.Create(wordsList)
            | "Write Files" >> WriteToText(options.output, file_name_suffix=".txt")
        )

輸出路徑請指定 Cloud Storage 路徑,包括值區名稱和檔案名前置字串。舉例來說,如果您指定 gs://my_bucket/output/fileTextIO 連接器會寫入名為 my_bucket 的 Cloud Storage bucket,且輸出檔案的前置字串為 output/file*

根據預設,TextIO 連接器會使用類似 <file-prefix>-00000-of-00001 的命名慣例,將輸出檔案分片。如範例所示,您可以選擇指定檔案名稱後置字元和壓縮配置。

為確保寫入作業具備冪等性,Dataflow 會先寫入暫存檔案,然後將完成的暫存檔案複製到最終檔案。如要控管這些暫存檔案的儲存位置,請使用 withTempDirectory 方法。

後續步驟