Pub/Sub Avro to BigQuery テンプレート

Pub/Sub Avro to BigQuery テンプレートは、Pub/Sub サブスクリプションから BigQuery テーブルに Avro データを取り込むストリーミング パイプラインです。BigQuery テーブルへの書き込み中に発生したエラーは、Pub/Sub 未処理トピックにストリーミングされます。

パイプラインの要件

  • 入力 Pub/Sub サブスクリプションが存在していること。
  • Avro レコードのスキーマ ファイルが、Cloud Storage に存在していること。
  • 未処理の Pub/Sub トピックが存在していること。
  • 出力 BigQuery データセットが存在していること。

テンプレートのパラメータ

必須パラメータ

  • schemaPath: Avro スキーマ ファイルがある Cloud Storage の場所。例: gs://path/to/my/schema.avsc
  • inputSubscription: 読み取り元の Pub/Sub 入力サブスクリプション(例: projects/<PROJECT_ID>/subscription/<SUBSCRIPTION_ID>)。
  • outputTableSpec: 出力を書き込む BigQuery 出力テーブルの場所。たとえば、<PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME> です。指定された createDisposition によっては、ユーザーが指定した Avro スキーマを使用して出力テーブルが自動的に作成される場合があります。
  • outputTopic: 未処理レコードに使用する Pub/Sub トピック(例: projects/<PROJECT_ID>/topics/<TOPIC_NAME>)。

オプション パラメータ

  • useStorageWriteApiAtLeastOnce: Storage Write API を使用する場合は、書き込みセマンティクスを指定します。at-least-once セマンティクス(https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics)を使用するには、このパラメータを true に設定します。exactly-once セマンティクスを使用するには、パラメータを false に設定します。このパラメータは、useStorageWriteApitrue の場合にのみ適用されます。デフォルト値は false です。
  • writeDisposition : BigQuery WriteDisposition(https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload)の値。例: WRITE_APPENDWRITE_EMPTYWRITE_TRUNCATE。デフォルトは WRITE_APPEND です。
  • createDisposition: BigQuery CreateDisposition(https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload)。たとえば、CREATE_IF_NEEDEDCREATE_NEVER です。デフォルトは CREATE_IF_NEEDED です。
  • useStorageWriteApi: true の場合、パイプラインは BigQuery Storage Write API(https://cloud.google.com/bigquery/docs/write-api)を使用します。デフォルト値は false です。詳細については、Storage Write API の使用(https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api)をご覧ください。
  • numStorageWriteApiStreams: Storage Write API を使用する場合は、書き込みストリームの数を指定します。useStorageWriteApitrue であり、useStorageWriteApiAtLeastOncefalse の場合に、このパラメータを設定する必要があります。デフォルト値は 0 です。
  • storageWriteApiTriggeringFrequencySec: Storage Write API を使用する場合は、トリガーの頻度を秒単位で指定します。useStorageWriteApitrue であり、useStorageWriteApiAtLeastOncefalse の場合に、このパラメータを設定する必要があります。

テンプレートを実行する

コンソールgcloudAPI
  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、[ the Pub/Sub Avro to BigQuery template] を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Avro_to_BigQuery \
    --parameters \
schemaPath=SCHEMA_PATH,\
inputSubscription=SUBSCRIPTION_NAME,\
outputTableSpec=BIGQUERY_TABLE,\
outputTopic=DEADLETTER_TOPIC
  

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • SCHEMA_PATH: Avro スキーマ ファイルへの Cloud Storage パス(例: gs://MyBucket/file.avsc
  • SUBSCRIPTION_NAME: Pub/Sub 入力サブスクリプション名
  • BIGQUERY_TABLE: BigQuery 出力テーブル名
  • DEADLETTER_TOPIC: 未処理のキューに使用する Pub/Sub トピック

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_Avro_to_BigQuery",
      "parameters": {
          "schemaPath": "SCHEMA_PATH",
          "inputSubscription": "SUBSCRIPTION_NAME",
          "outputTableSpec": "BIGQUERY_TABLE",
          "outputTopic": "DEADLETTER_TOPIC"
      }
   }
}
  

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • LOCATION: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • SCHEMA_PATH: Avro スキーマ ファイルへの Cloud Storage パス(例: gs://MyBucket/file.avsc
  • SUBSCRIPTION_NAME: Pub/Sub 入力サブスクリプション名
  • BIGQUERY_TABLE: BigQuery 出力テーブル名
  • DEADLETTER_TOPIC: 未処理のキューに使用する Pub/Sub トピック
Java
/*
 * Copyright (C) 2020 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.options.BigQueryCommonOptions.WriteOptions;
import com.google.cloud.teleport.v2.options.BigQueryStorageApiStreamingOptions;
import com.google.cloud.teleport.v2.options.PubsubCommonOptions.ReadSubscriptionOptions;
import com.google.cloud.teleport.v2.options.PubsubCommonOptions.WriteTopicOptions;
import com.google.cloud.teleport.v2.templates.PubsubAvroToBigQuery.PubsubAvroToBigQueryOptions;
import com.google.cloud.teleport.v2.transforms.BigQueryConverters;
import com.google.cloud.teleport.v2.transforms.ErrorConverters;
import com.google.cloud.teleport.v2.utils.BigQueryIOUtils;
import com.google.cloud.teleport.v2.utils.SchemaUtils;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.schemas.transforms.Convert;
import org.apache.beam.sdk.values.Row;

/**
 * A Dataflow pipeline to stream <a href="https://avro.apache.org/">Apache Avro</a> records from
 * Pub/Sub into a BigQuery table.
 *
 * <p>Any persistent failures while writing to BigQuery will be written to a Pub/Sub dead-letter
 * topic.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/pubsub-binary-to-bigquery/README_PubSub_Avro_to_BigQuery.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "PubSub_Avro_to_BigQuery",
    category = TemplateCategory.STREAMING,
    displayName = "Pub/Sub Avro to BigQuery",
    description =
        "The Pub/Sub Avro to BigQuery template is a streaming pipeline that ingests Avro data from a Pub/Sub "
            + "subscription into a BigQuery table. Any errors which occur while writing to the BigQuery table are streamed into a Pub/Sub unprocessed topic.",
    optionsClass = PubsubAvroToBigQueryOptions.class,
    flexContainerName = "pubsub-avro-to-bigquery",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-avro-to-bigquery",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "The input Pub/Sub subscription must exist.",
      "The schema file for the Avro records must exist on Cloud Storage.",
      "The unprocessed Pub/Sub topic must exist.",
      "The output BigQuery dataset must exist."
    },
    streaming = true,
    supportsAtLeastOnce = true)
public final class PubsubAvroToBigQuery {
  /**
   * Validates input flags and executes the Dataflow pipeline.
   *
   * @param args command line arguments to the pipeline
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    PubsubAvroToBigQueryOptions options =
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(PubsubAvroToBigQueryOptions.class);

    run(options);
  }

  /**
   * Provides custom {@link org.apache.beam.sdk.options.PipelineOptions} required to execute the
   * {@link PubsubAvroToBigQuery} pipeline.
   */
  public interface PubsubAvroToBigQueryOptions
      extends ReadSubscriptionOptions,
          WriteOptions,
          WriteTopicOptions,
          BigQueryStorageApiStreamingOptions {

    @TemplateParameter.GcsReadFile(
        order = 1,
        description = "Cloud Storage path to the Avro schema file",
        helpText =
            "The Cloud Storage location of the Avro schema file. For example, `gs://path/to/my/schema.avsc`.")
    @Required
    String getSchemaPath();

    void setSchemaPath(String schemaPath);

    // Hide the UseStorageWriteApiAtLeastOnce in the UI, because it will automatically be turned
    // on when pipeline is running on ALO mode and using the Storage Write API
    @TemplateParameter.Boolean(
        order = 2,
        optional = true,
        parentName = "useStorageWriteApi",
        parentTriggerValues = {"true"},
        description = "Use at at-least-once semantics in BigQuery Storage Write API",
        helpText =
            " When using the Storage Write API, specifies the write semantics. To use"
                + " at-least-once semantics (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), set this parameter to true. To use exactly-once"
                + " semantics, set the parameter to `false`. This parameter applies only when `useStorageWriteApi` is `true`. The default value is `false`.",
        hiddenUi = true)
    @Default.Boolean(false)
    @Override
    Boolean getUseStorageWriteApiAtLeastOnce();

    void setUseStorageWriteApiAtLeastOnce(Boolean value);
  }

  /**
   * Runs the pipeline with the supplied options.
   *
   * @param options execution parameters to the pipeline
   * @return result of the pipeline execution as a {@link PipelineResult}
   */
  private static PipelineResult run(PubsubAvroToBigQueryOptions options) {
    BigQueryIOUtils.validateBQStorageApiOptionsStreaming(options);

    // Create the pipeline.
    Pipeline pipeline = Pipeline.create(options);

    Schema schema = SchemaUtils.getAvroSchema(options.getSchemaPath());

    WriteResult writeResults =
        pipeline
            .apply(
                "Read Avro records",
                PubsubIO.readAvroGenericRecords(schema)
                    .fromSubscription(options.getInputSubscription())
                    .withDeadLetterTopic(options.getOutputTopic()))
            // Workaround for BEAM-12256. Eagerly convert to rows to avoid
            // the RowToGenericRecord function that doesn't handle all data
            // types.
            // TODO: Remove this workaround when a fix for BEAM-12256 is
            // released.
            .apply(Convert.toRows())
            .apply(
                "Write to BigQuery",
                BigQueryConverters.<Row>createWriteTransform(options).useBeamSchema());

    BigQueryIOUtils.writeResultToBigQueryInsertErrors(writeResults, options)
        .apply(
            "Create error payload",
            ErrorConverters.BigQueryInsertErrorToPubsubMessage.<GenericRecord>newBuilder()
                .setPayloadCoder(AvroCoder.of(schema))
                .setTranslateFunction(BigQueryConverters.TableRowToGenericRecordFn.of(schema))
                .build())
        .apply("Write failed records", PubsubIO.writeMessages().to(options.getOutputTopic()));

    // Execute the pipeline and return the result.
    return pipeline.run();
  }
}

次のステップ