Vorlage „Pub/Sub Avro für BigQuery“

Die Vorlage „Pub/Sub Avro für BigQuery“ ist eine Streamingpipeline, die Avro-Daten aus einem Pub/Sub-Abo in eine BigQuery-Tabelle schreibt. Alle Fehler beim Schreiben in die BigQuery-Tabelle werden in ein Pub/Sub-Thema für nicht verarbeitete Datensätze gestreamt.

Pipelineanforderungen

  • Das Pub/Sub-Eingabeabo muss vorhanden sein.
  • Die Schemadatei für die Avro-Einträge muss in Cloud Storage hinterlegt sein.
  • Das Pub/Sub-Thema für nicht verarbeitete Datensätze muss vorhanden sein.
  • Das BigQuery-Ausgabe-Dataset muss vorhanden sein.

Vorlagenparameter

Erforderliche Parameter

  • schemaPath: Der Cloud Storage-Speicherort der Avro-Schemadatei. Beispiel: gs://path/to/my/schema.avsc.
  • inputSubscription: Das Pub/Sub-Eingabeabo, aus dem gelesen werden soll. Beispiel: projects/<PROJECT_ID>/subscription/<SUBSCRIPTION_ID>.
  • outputTableSpec: Der Speicherort der BigQuery-Ausgabetabelle, in die die Ausgabe geschrieben werden soll. Beispiel: <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>.Abhängig von der angegebenen createDisposition kann die Ausgabetabelle automatisch mit dem vom Nutzer angegebenen Avro-Schema erstellt werden.
  • outputTopic: Das Pub/Sub-Thema, das für nicht verarbeitete Datensätze verwendet werden soll. Beispiel: projects/<PROJECT_ID>/topics/<TOPIC_NAME>.

Optionale Parameter

Führen Sie die Vorlage aus.

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Pub/Sub Avro to BigQuery templateaus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Avro_to_BigQuery \
    --parameters \
schemaPath=SCHEMA_PATH,\
inputSubscription=SUBSCRIPTION_NAME,\
outputTableSpec=BIGQUERY_TABLE,\
outputTopic=DEADLETTER_TOPIC
  

Ersetzen Sie Folgendes:

  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • REGION_NAME: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • SCHEMA_PATH: der Cloud Storage-Pfad zur Avro-Schemadatei (z. B. gs://MyBucket/file.avsc)
  • SUBSCRIPTION_NAME: der Name des Pub/Sub-Eingabeabos
  • BIGQUERY_TABLE: der Name der BigQuery-Ausgabetabelle
  • DEADLETTER_TOPIC: Das Pub/Sub-Thema, das für die Warteschlange für nicht verarbeitete Datensätze verwendet werden soll

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_Avro_to_BigQuery",
      "parameters": {
          "schemaPath": "SCHEMA_PATH",
          "inputSubscription": "SUBSCRIPTION_NAME",
          "outputTableSpec": "BIGQUERY_TABLE",
          "outputTopic": "DEADLETTER_TOPIC"
      }
   }
}
  

Ersetzen Sie Folgendes:

  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • LOCATION: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • SCHEMA_PATH: der Cloud Storage-Pfad zur Avro-Schemadatei (z. B. gs://MyBucket/file.avsc)
  • SUBSCRIPTION_NAME: der Name des Pub/Sub-Eingabeabos
  • BIGQUERY_TABLE: der Name der BigQuery-Ausgabetabelle
  • DEADLETTER_TOPIC: Das Pub/Sub-Thema, das für die Warteschlange für nicht verarbeitete Datensätze verwendet werden soll
Java
/*
 * Copyright (C) 2020 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.options.BigQueryCommonOptions.WriteOptions;
import com.google.cloud.teleport.v2.options.BigQueryStorageApiStreamingOptions;
import com.google.cloud.teleport.v2.options.PubsubCommonOptions.ReadSubscriptionOptions;
import com.google.cloud.teleport.v2.options.PubsubCommonOptions.WriteTopicOptions;
import com.google.cloud.teleport.v2.templates.PubsubAvroToBigQuery.PubsubAvroToBigQueryOptions;
import com.google.cloud.teleport.v2.transforms.BigQueryConverters;
import com.google.cloud.teleport.v2.transforms.ErrorConverters;
import com.google.cloud.teleport.v2.utils.BigQueryIOUtils;
import com.google.cloud.teleport.v2.utils.SchemaUtils;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.schemas.transforms.Convert;
import org.apache.beam.sdk.values.Row;

/**
 * A Dataflow pipeline to stream <a href="https://avro.apache.org/">Apache Avro</a> records from
 * Pub/Sub into a BigQuery table.
 *
 * <p>Any persistent failures while writing to BigQuery will be written to a Pub/Sub dead-letter
 * topic.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/pubsub-binary-to-bigquery/README_PubSub_Avro_to_BigQuery.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "PubSub_Avro_to_BigQuery",
    category = TemplateCategory.STREAMING,
    displayName = "Pub/Sub Avro to BigQuery",
    description =
        "The Pub/Sub Avro to BigQuery template is a streaming pipeline that ingests Avro data from a Pub/Sub "
            + "subscription into a BigQuery table. Any errors which occur while writing to the BigQuery table are streamed into a Pub/Sub unprocessed topic.",
    optionsClass = PubsubAvroToBigQueryOptions.class,
    flexContainerName = "pubsub-avro-to-bigquery",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-avro-to-bigquery",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "The input Pub/Sub subscription must exist.",
      "The schema file for the Avro records must exist on Cloud Storage.",
      "The unprocessed Pub/Sub topic must exist.",
      "The output BigQuery dataset must exist."
    },
    streaming = true,
    supportsAtLeastOnce = true)
public final class PubsubAvroToBigQuery {
  /**
   * Validates input flags and executes the Dataflow pipeline.
   *
   * @param args command line arguments to the pipeline
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    PubsubAvroToBigQueryOptions options =
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(PubsubAvroToBigQueryOptions.class);

    run(options);
  }

  /**
   * Provides custom {@link org.apache.beam.sdk.options.PipelineOptions} required to execute the
   * {@link PubsubAvroToBigQuery} pipeline.
   */
  public interface PubsubAvroToBigQueryOptions
      extends ReadSubscriptionOptions,
          WriteOptions,
          WriteTopicOptions,
          BigQueryStorageApiStreamingOptions {

    @TemplateParameter.GcsReadFile(
        order = 1,
        description = "Cloud Storage path to the Avro schema file",
        helpText =
            "The Cloud Storage location of the Avro schema file. For example, `gs://path/to/my/schema.avsc`.")
    @Required
    String getSchemaPath();

    void setSchemaPath(String schemaPath);

    // Hide the UseStorageWriteApiAtLeastOnce in the UI, because it will automatically be turned
    // on when pipeline is running on ALO mode and using the Storage Write API
    @TemplateParameter.Boolean(
        order = 2,
        optional = true,
        parentName = "useStorageWriteApi",
        parentTriggerValues = {"true"},
        description = "Use at at-least-once semantics in BigQuery Storage Write API",
        helpText =
            " When using the Storage Write API, specifies the write semantics. To use"
                + " at-least-once semantics (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), set this parameter to true. To use exactly-once"
                + " semantics, set the parameter to `false`. This parameter applies only when `useStorageWriteApi` is `true`. The default value is `false`.",
        hiddenUi = true)
    @Default.Boolean(false)
    @Override
    Boolean getUseStorageWriteApiAtLeastOnce();

    void setUseStorageWriteApiAtLeastOnce(Boolean value);
  }

  /**
   * Runs the pipeline with the supplied options.
   *
   * @param options execution parameters to the pipeline
   * @return result of the pipeline execution as a {@link PipelineResult}
   */
  private static PipelineResult run(PubsubAvroToBigQueryOptions options) {
    BigQueryIOUtils.validateBQStorageApiOptionsStreaming(options);

    // Create the pipeline.
    Pipeline pipeline = Pipeline.create(options);

    Schema schema = SchemaUtils.getAvroSchema(options.getSchemaPath());

    WriteResult writeResults =
        pipeline
            .apply(
                "Read Avro records",
                PubsubIO.readAvroGenericRecords(schema)
                    .fromSubscription(options.getInputSubscription())
                    .withDeadLetterTopic(options.getOutputTopic()))
            // Workaround for BEAM-12256. Eagerly convert to rows to avoid
            // the RowToGenericRecord function that doesn't handle all data
            // types.
            // TODO: Remove this workaround when a fix for BEAM-12256 is
            // released.
            .apply(Convert.toRows())
            .apply(
                "Write to BigQuery",
                BigQueryConverters.<Row>createWriteTransform(options).useBeamSchema());

    BigQueryIOUtils.writeResultToBigQueryInsertErrors(writeResults, options)
        .apply(
            "Create error payload",
            ErrorConverters.BigQueryInsertErrorToPubsubMessage.<GenericRecord>newBuilder()
                .setPayloadCoder(AvroCoder.of(schema))
                .setTranslateFunction(BigQueryConverters.TableRowToGenericRecordFn.of(schema))
                .build())
        .apply("Write failed records", PubsubIO.writeMessages().to(options.getOutputTopic()));

    // Execute the pipeline and return the result.
    return pipeline.run();
  }
}

Nächste Schritte