從 Dataflow 寫入 BigQuery

本文說明如何將 Dataflow 的資料寫入 BigQuery。

總覽

在大多數情況下,建議使用受管理 I/O 寫入 BigQuery。代管 I/O 提供自動升級和一致的設定 API 等功能。寫入 BigQuery 時,Managed I/O 會自動為批次或串流工作選擇最佳寫入方法。

如需更進階的效能調整,請考慮使用 BigQueryIO 連接器。詳情請參閱本文中的「使用 BigQueryIO 連接器」。

成效

下表列出各種工作負載的效能指標。這些工作負載是在一個 e2-standard2 工作站上執行,使用的 Apache Beam SDK 2.49.0 適用於 Java。他們沒有使用 Runner v2。

1 億筆記錄 | 1 kB | 1 個資料欄 處理量 (位元組) 處理量 (元素)
儲存空間寫入 55 MBps 每秒 54,000 個元素
Avro Load 78 MB/秒 每秒 77,000 個元素
Json Load 54 MBps 每秒 53,000 個元素

這些指標是以簡單的批次管道為依據。這些基準旨在比較 I/O 連接器之間的效能,不一定代表實際的管道。Dataflow 管道效能相當複雜,取決於 VM 類型、處理的資料、外部來源和接收器的效能,以及使用者程式碼。這些指標是根據執行 Java SDK 取得,無法代表其他語言 SDK 的效能特徵。詳情請參閱「Beam IO 效能」。

使用 BigQueryIO 連接器

BigQuery I/O 連接器支援下列 BigQuery 寫入方法:

  • STORAGE_WRITE_API。在這個模式下,連接器會使用 BigQuery Storage Write API,直接將資料寫入 BigQuery 儲存空間。Storage Write API 將串流擷取和批次載入功能合併為單一的高效能 API。這個模式可確保語意為「剛好一次」。
  • STORAGE_API_AT_LEAST_ONCE。這個模式也會使用 Storage Write API,但至少會提供一次語意。這個模式可降低大多數管道的延遲時間。但可能會重複寫入。
  • FILE_LOADS. 在這個模式下,連接器會將輸入資料寫入 Cloud Storage 中的暫存檔案。然後執行 BigQuery load job,將資料載入 BigQuery。這個模式是受限 PCollections 的預設模式,最常見於批次管道。
  • STREAMING_INSERTS。在此模式下,連接器會使用舊版串流 API。這個模式是無界限 PCollections 的預設模式,但不建議用於新專案。

選擇寫入方法時,請考量下列幾點:

  • 如果是串流工作,請考慮使用 STORAGE_WRITE_APISTORAGE_API_AT_LEAST_ONCE,因為這些模式會直接寫入 BigQuery 儲存空間,不會使用中繼暫存檔案。
  • 如果使用至少一次串流模式執行管道,請將寫入模式設為 STORAGE_API_AT_LEAST_ONCE。這項設定效率更高,且符合「至少一次」串流模式的語意。
  • 檔案載入和 Storage Write API 有不同的配額和限制
  • 載入工作會使用共用的 BigQuery 運算單元集區或預留運算單元。如要使用預留運算單元,請在具有 PIPELINE 類型保留項目指派作業的專案中執行載入工作。如果您使用共用 BigQuery 運算單元集區,載入工作就不會產生費用。不過,BigQuery 不保證共用集區的可用容量。詳情請參閱「預留項目簡介」。

平行處理工作數量

  • 在串流管道中,連接器會將 FILE_LOADSSTORAGE_WRITE_API 的資料分片至多個檔案或串流。一般來說,我們建議呼叫 withAutoSharding 來啟用自動分片。

  • 在批次管道中,連接器會將資料寫入已分割的檔案,然後平行載入 BigQuery。FILE_LOADS

  • 在批次管道中,每個工作人員會建立一或多個串流,以便寫入 BigQuery,具體數量取決於分片總數。STORAGE_WRITE_API

  • 對於 STORAGE_API_AT_LEAST_ONCE,只有一個預設寫入串流。多個工作站會附加至這個串流。

最佳做法

  • Storage Write API 有配額限制。連接器會處理大部分管道的這些限制。不過,在某些情況下,可用的 Storage Write API 串流可能會耗盡。舉例來說,如果管線使用自動分片和自動調度資源,且目的地數量龐大,就可能發生這個問題,尤其是在長時間執行的工作負載變動劇烈時。如果發生這個問題,請考慮使用 STORAGE_WRITE_API_AT_LEAST_ONCE,即可避免問題。

  • 使用 Google Cloud 指標監控 Storage Write API 配額用量。

  • 使用檔案載入時,Avro 通常比 JSON 表現更出色。如要使用 Avro,請呼叫 withAvroFormatFunction

  • 根據預設,載入工作會在與 Dataflow 工作相同的專案中執行。如要指定其他專案,請呼叫 withLoadJobProjectId

  • 使用 Java SDK 時,請考慮建立代表 BigQuery 資料表結構定義的類別。然後在管道中呼叫 useBeamSchema,自動在 Apache Beam Row 和 BigQuery TableRow 類型之間轉換。如需結構定義類別的範例,請參閱 ExampleModel.java

  • 如果載入的資料表含有數千個欄位的複雜結構定義,請考慮呼叫 withMaxBytesPerPartition,為每個載入工作設定較小的最大大小。

  • 根據預設,BigQueryIO 會使用適用於大多數管道的 Storage Write API 設定。不過,如果發現效能問題,可以設定管道選項來調整這些設定。詳情請參閱 Apache Beam 說明文件中的「調整 Storage Write API」。

串流管道

以下建議適用於串流管道。

  • 如果是串流管道,建議使用 Storage Write API (STORAGE_WRITE_APISTORAGE_API_AT_LEAST_ONCE)。

  • 串流管道可以使用檔案載入,但這種做法有缺點:

    • 因此需要視窗化才能寫入檔案。您無法使用全域視窗。
    • 使用共用運算單元集區時,BigQuery 會盡量載入檔案。記錄寫入後,可能要過一段時間才會在 BigQuery 中顯示。
    • 如果載入工作失敗 (例如,由於資料有誤或結構定義不符),整個管道就會失敗。
  • 建議盡可能使用 STORAGE_WRITE_API_AT_LEAST_ONCE。這可能會導致重複的記錄寫入 BigQuery,但與 STORAGE_WRITE_API 相比,費用較低且更具擴充性。

  • 一般來說,請避免使用 STREAMING_INSERTS。串流插入的費用比 Storage Write API 高,效能也不如後者。

  • 資料分片可提升串流管道的效能。對於大多數管道,自動分片是個不錯的起點。不過,您可以按照下列方式調整分片:

  • 如果您使用串流插入,建議將 retryTransientErrors 設為重試政策

批次管道

下列建議適用於批次管道。

  • 對於大多數大型批次管道,我們建議先嘗試 FILE_LOADS。批次管道可以使用 STORAGE_WRITE_API,但如果規模龐大 (超過 1,000 個 vCPU) 或同時執行多個管道,就可能超出配額限制。Apache Beam 不會限制批次 STORAGE_WRITE_API 工作的寫入串流數量上限,因此工作最終會達到 BigQuery Storage API 限制。

  • 使用 FILE_LOADS 時,您可能會用盡共用 BigQuery 運算單元集區或保留運算單元集區。如果發生這類失敗情況,請嘗試下列方法:

    • 減少工作的工作站數量上限或工作站大小。
    • 購買更多預留運算單元
    • 建議使用 STORAGE_WRITE_API
  • 如果管道規模不大 (少於 1,000 個 vCPU),建議使用 STORAGE_WRITE_API。如果是這類較小的工作,建議您使用 STORAGE_WRITE_API (如需無法傳送的訊息佇列),或在 FILE_LOADS 共用運算單元集區不足時使用。

  • 如果可以容忍重複資料,請考慮使用 STORAGE_WRITE_API_AT_LEAST_ONCE。這個模式可能會導致重複的記錄寫入 BigQuery,但費用可能比 STORAGE_WRITE_API 選項低。

  • 不同的寫入模式可能會根據管道的特性而有不同的執行方式。進行實驗,找出最適合工作負載的寫入模式。

處理列層級錯誤

本節說明如何處理可能在列層級發生的錯誤,例如輸入資料格式錯誤或結構定義不符。

如果是 Storage Write API,無法寫入的資料列會放入另一個 PCollection。如要取得這個集合,請在 WriteResult 物件上呼叫 getFailedStorageApiInserts。如需這個方法的範例,請參閱「將資料串流至 BigQuery」。

建議您將錯誤傳送至死信佇列或資料表,以供日後處理。如要進一步瞭解這個模式,請參閱BigQueryIO dead letter 模式

如果是 FILE_LOADS,載入資料時發生錯誤,載入工作就會失敗,管道也會擲回執行階段例外狀況。您可以在 Dataflow 記錄中查看錯誤,或查看 BigQuery 工作記錄。I/O 連接器不會傳回個別失敗資料列的相關資訊。

如要進一步瞭解如何排解錯誤,請參閱「BigQuery 連接器錯誤」。

範例

下列範例說明如何使用 Dataflow 寫入 BigQuery。這些範例使用 BigQueryIO 連接器。

寫入現有資料表

以下範例會建立批次管道,將 PCollection<MyData> 寫入 BigQuery,其中 MyData 是自訂資料型別。

BigQueryIO.write() 方法會傳回 BigQueryIO.Write<T> 型別,用於設定寫入作業。詳情請參閱 Apache Beam 說明文件中的「寫入資料表」。這個程式碼範例會寫入現有資料表 (CREATE_NEVER),並將新資料列附加至資料表 (WRITE_APPEND)。

Java

如要向 Dataflow 進行驗證,請設定應用程式預設憑證。 詳情請參閱「為本機開發環境設定驗證」。

import com.google.api.services.bigquery.model.TableRow;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BigQueryWrite {
  // A custom datatype for the source data.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    public MyData() {}

    public MyData(String name, Long age) {
      this.name = name;
      this.age = age;
    }
  }

  public static void main(String[] args) {
    // Example source data.
    final List<MyData> data = Arrays.asList(
        new MyData("Alice", 40L),
        new MyData("Bob", 30L),
        new MyData("Charlie", 20L)
    );

    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Create an in-memory PCollection of MyData objects.
        .apply(Create.of(data))
        // Write the data to an exiting BigQuery table.
        .apply(BigQueryIO.<MyData>write()
            .to(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withFormatFunction(
                (MyData x) -> new TableRow().set("user_name", x.name).set("age", x.age))
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withMethod(Write.Method.STORAGE_WRITE_API));
    pipeline.run().waitUntilFinish();
  }
}

寫入新資料表或現有資料表

如果目的地資料表不存在,下列範例會將建立處置設為 CREATE_IF_NEEDED,藉此建立新資料表。使用這個選項時,您必須提供表格結構定義。如果連接器建立新資料表,就會使用這個結構定義。

Java

如要向 Dataflow 進行驗證,請設定應用程式預設憑證。 詳情請參閱「為本機開發環境設定驗證」。

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BigQueryWriteWithSchema {
  // A custom datatype for the source data.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    public MyData() {}

    public MyData(String name, Long age) {
      this.name = name;
      this.age = age;
    }
  }

  public static void main(String[] args) {
    // Example source data.
    final List<MyData> data = Arrays.asList(
        new MyData("Alice", 40L),
        new MyData("Bob", 30L),
        new MyData("Charlie", 20L)
    );

    // Define a table schema. A schema is required for write disposition CREATE_IF_NEEDED.
    TableSchema schema = new TableSchema()
        .setFields(
            Arrays.asList(
                new TableFieldSchema()
                    .setName("user_name")
                    .setType("STRING")
                    .setMode("REQUIRED"),
                new TableFieldSchema()
                    .setName("age")
                    .setType("INT64") // Defaults to NULLABLE
            )
        );

    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Create an in-memory PCollection of MyData objects.
        .apply(Create.of(data))
        // Write the data to a new or existing BigQuery table.
        .apply(BigQueryIO.<MyData>write()
            .to(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withFormatFunction(
                (MyData x) -> new TableRow().set("user_name", x.name).set("age", x.age))
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            .withSchema(schema)
            .withMethod(Write.Method.STORAGE_WRITE_API)
        );
    pipeline.run().waitUntilFinish();
  }
}

將資料串流至 BigQuery

以下範例說明如何透過將寫入模式設為 STORAGE_WRITE_API,使用「只傳送一次」語意串流資料:

並非所有串流管道都需要「僅限一次」語意。舉例來說,您或許可以手動移除目的地表格中的重複項目。如果您的情境可接受重複記錄,請考慮將寫入方法設為 STORAGE_API_AT_LEAST_ONCE,使用至少一次語意。一般來說,這種方法效率較高,且大多數管道的延遲時間較短。

Java

如要向 Dataflow 進行驗證,請設定應用程式預設憑證。 詳情請參閱「為本機開發環境設定驗證」。

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class BigQueryStreamExactlyOnce {
  // Create a PTransform that sends simulated streaming data. In a real application, the data
  // source would be an external source, such as Pub/Sub.
  private static TestStream<String> createEventSource() {
    Instant startTime = new Instant(0);
    return TestStream.create(StringUtf8Coder.of())
        .advanceWatermarkTo(startTime)
        .addElements(
            TimestampedValue.of("Alice,20", startTime),
            TimestampedValue.of("Bob,30",
                startTime.plus(Duration.standardSeconds(1))),
            TimestampedValue.of("Charles,40",
                startTime.plus(Duration.standardSeconds(2))),
            TimestampedValue.of("Dylan,Invalid value",
                startTime.plus(Duration.standardSeconds(2))))
        .advanceWatermarkToInfinity();
  }

  public static PipelineResult main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);
    options.setStreaming(true);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Add a streaming data source.
        .apply(createEventSource())
        // Map the event data into TableRow objects.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            .via((String x) -> {
              String[] columns = x.split(",");
              return new TableRow().set("user_name", columns[0]).set("age", columns[1]);
            }))
        // Write the rows to BigQuery
        .apply(BigQueryIO.writeTableRows()
            .to(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withMethod(Write.Method.STORAGE_WRITE_API)
            // For exactly-once processing, set the triggering frequency.
            .withTriggeringFrequency(Duration.standardSeconds(5)))
        // Get the collection of write errors.
        .getFailedStorageApiInserts()
        .apply(MapElements.into(TypeDescriptors.strings())
            // Process each error. In production systems, it's useful to write the errors to
            // another destination, such as a dead-letter table or queue.
            .via(
                x -> {
                  System.out.println("Failed insert: " + x.getErrorMessage());
                  System.out.println("Row: " + x.getRow());
                  return "";
                }));
    return pipeline.run();
  }
}

後續步驟