Vorlage "BigQuery für Bigtable"

Die Vorlage "BigQuery für Bigtable" ist eine Batchpipeline, die Daten aus einer BigQuery-Tabelle in eine vorhandene Bigtable-Tabelle kopiert. Die Vorlage kann entweder die gesamte Tabelle oder bestimmte Datensätze mithilfe einer angegebenen Abfrage lesen.

Pipelineanforderungen

  • Die BigQuery-Quelltabelle
  • Die Bigtable-Tabelle muss vorhanden sein.
  • Das Worker-Dienstkonto benötigt die Berechtigung roles/bigquery.datasets.create. Weitere Informationen finden Sie unter Einführung in IAM.

Vorlagenparameter

Erforderliche Parameter

  • readIdColumn: Der Name der BigQuery-Spalte, in der die eindeutige Kennung der Zeile gespeichert ist.
  • bigtableWriteInstanceId: Die ID der Bigtable-Instanz, die die Tabelle enthält.
  • bigtableWriteTableId: Die ID der Bigtable-Tabelle, in die Daten geschrieben werden sollen.
  • bigtableWriteColumnFamily: Der Name der Spaltenfamilie der Bigtable-Tabelle, in die Daten geschrieben werden sollen.

Optionale Parameter

  • inputTableSpec: Die BigQuery-Tabelle, aus der gelesen werden soll. Wenn Sie inputTableSpec angeben, liest die Vorlage die Daten mithilfe der BigQuery Storage Read API direkt aus dem BigQuery-Speicher (https://cloud.google.com/bigquery/docs/reference/storage). Informationen zu Einschränkungen in der Storage Read API finden Sie unter https://cloud.google.com/bigquery/docs/reference/storage#limitations. Sie müssen entweder inputTableSpec oder query angeben. Wenn Sie beide Parameter festlegen, verwendet die Vorlage den Parameter query. Beispiel: <BIGQUERY_PROJECT>:<DATASET_NAME>.<INPUT_TABLE>.
  • outputDeadletterTable: Die BigQuery-Tabelle für Nachrichten, die die Ausgabetabelle nicht erreicht haben. Wenn eine Tabelle nicht vorhanden ist, wird sie während der Pipelineausführung erstellt. Falls nichts angegeben wird, wird <outputTableSpec>_error_records verwendet. Beispiel: <PROJECT_ID>:<DATASET_NAME>.<DEADLETTER_TABLE>
  • query: Die SQL-Abfrage zum Lesen von Daten aus BigQuery. Wenn sich das BigQuery-Dataset in einem anderen Projekt als der Dataflow-Job befindet, geben Sie den vollständigen Dataset-Namen in der SQL-Abfrage an, z. B.: <PROJECT_ID>.<DATASET_NAME>.<TABLE_NAME>. Standardmäßig wird für den Parameter query GoogleSQL verwendet (https://cloud.google.com/bigquery/docs/introduction-sql), es sei denn, useLegacySql ist true. Sie müssen entweder inputTableSpec oder query angeben. Wenn Sie beide Parameter festlegen, verwendet die Vorlage den Parameter query. Beispiel: select * from sampledb.sample_table.
  • useLegacySql: Legen Sie true fest, um Legacy-SQL zu verwenden. Dieser Parameter gilt nur, wenn der Parameter query verwendet wird. Die Standardeinstellung ist false.
  • queryLocation: Wird benötigt, wenn aus einer autorisierten Ansicht ohne Berechtigung der zugrunde liegenden Tabelle gelesen wird. Beispiel: US.
  • queryTempDataset: Mit dieser Option können Sie ein vorhandenes Dataset festlegen, um die temporäre Tabelle zum Speichern der Ergebnisse der Abfrage zu erstellen. Beispiel: temp_dataset.
  • KMSEncryptionKey: Wenn Sie mithilfe der Abfragequelle Daten aus BigQuery lesen, verwenden Sie diesen Cloud KMS-Schlüssel, um alle erstellten temporären Tabellen zu verschlüsseln. Beispiel: projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key.
  • bigtableRpcAttemptTimeoutMs: Das Zeitlimit für jeden Bigtable-RPC-Versuch in Millisekunden.
  • bigtableRpcTimeoutMs: Das Gesamtzeitlimit für einen Bigtable-RPC-Vorgang in Millisekunden.
  • bigtableAdditionalRetryCodes: Die zusätzlichen Wiederholungscodes. Beispiel: RESOURCE_EXHAUSTED,DEADLINE_EXCEEDED.
  • bigtableWriteAppProfile: Die ID des Bigtable-Anwendungsprofils, das für den Export verwendet werden soll. Wenn Sie kein Anwendungsprofil angeben, verwendet Bigtable das standardmäßige Anwendungsprofil (https://cloud.google.com/bigtable/docs/app-profiles#default-app-profile) der Instanz.
  • bigtableWriteProjectId: Die ID des Google Cloud-Projekts, das die Bigtable-Instanz enthält, in die Daten geschrieben werden sollen.
  • bigtableBulkWriteLatencyTargetMs: Das Latenzziel von Bigtable in Millisekunden für die latenzbasierte Drosselung.
  • bigtableBulkWriteMaxRowKeyCount: Die maximale Anzahl von Zeilenschlüsseln in einem Bigtable-Batch-Schreibvorgang.
  • bigtableBulkWriteMaxRequestSizeBytes: Die maximale Anzahl von Bytes, die pro Batchschreibvorgang in Bigtable einbezogen werden soll.

Führen Sie die Vorlage aus.

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the BigQuery to Bigtable templateaus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/BigQuery_to_Bigtable \
    --parameters \
readIdColumn=READ_COLUMN_ID,\
inputTableSpec=INPUT_TABLE_SPEC,\
bigtableWriteInstanceId=BIGTABLE_INSTANCE_ID,\
bigtableWriteTableId=BIGTABLE_TABLE_ID,\
bigtableWriteColumnFamily=BIGTABLE_COLUMN_FAMILY

Ersetzen Sie dabei Folgendes:

  • PROJECT_ID: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • REGION_NAME: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • READ_COLUMN_ID: Ihre eindeutige BigQuery-ID-Spalte.
  • INPUT_TABLE_SPEC: ist der BigQuery-Tabellenname
  • BIGTABLE_INSTANCE_ID: Ihre Bigtable-Instanz-ID.
  • BIGTABLE_TABLE_ID: Ihre Bigtable-Tabellen-ID.
  • BIGTABLE_COLUMN_FAMILY: die Bigtable-Tabellenspaltenfamilie.

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "readIdColumn": "READ_COLUMN_ID",
          "inputTableSpec": "INPUT_TABLE_SPEC",
          "bigtableWriteInstanceId": "BIGTABLE_INSTANCE_ID",
          "bigtableWriteTableId": "BIGTABLE_TABLE_ID",
          "bigtableWriteColumnFamily": "BIGTABLE_COLUMN_FAMILY"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/BigQuery_to_Bigtable",
   }
}

Ersetzen Sie dabei Folgendes:

  • PROJECT_ID: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • LOCATION: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • READ_COLUMN_ID: Ihre eindeutige BigQuery-ID-Spalte.
  • INPUT_TABLE_SPEC: ist der BigQuery-Tabellenname
  • BIGTABLE_INSTANCE_ID: Ihre Bigtable-Instanz-ID.
  • BIGTABLE_TABLE_ID: Ihre Bigtable-Tabellen-ID.
  • BIGTABLE_COLUMN_FAMILY: die Bigtable-Tabellenspaltenfamilie.
Java
/*
 * Copyright (C) 2021 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import static com.google.cloud.teleport.v2.bigtable.utils.BigtableConfig.generateCloudBigtableWriteConfiguration;

import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableTableConfiguration;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.v2.bigtable.options.BigtableCommonOptions;
import com.google.cloud.teleport.v2.bigtable.transforms.BigtableConverters;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.templates.BigQueryToBigtable.BigQueryToBigtableOptions;
import com.google.cloud.teleport.v2.transforms.BigQueryConverters;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.hadoop.hbase.client.Mutation;

/**
 * Dataflow template which reads BigQuery data and writes it to Bigtable. The source data can be
 * either a BigQuery table or an SQL query.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/bigquery-to-bigtable/README_BigQuery_to_Bigtable.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "BigQuery_to_Bigtable",
    category = TemplateCategory.BATCH,
    displayName = "BigQuery to Bigtable",
    description = "A pipeline to export a BigQuery table into Bigtable.",
    optionsClass = BigQueryToBigtableOptions.class,
    optionsOrder = {
      BigQueryToBigtableOptions.class,
      BigQueryConverters.BigQueryReadOptions.class,
      BigtableCommonOptions.class,
      BigtableCommonOptions.WriteOptions.class
    },
    optionalOptions = {"inputTableSpec"},
    flexContainerName = "bigquery-to-bigtable",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/bigquery-to-bigtable",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "The source BigQuery table must exist.",
      "The Bigtable table must exist.",
      "The <a href=\"https://cloud.google.com/dataflow/docs/concepts/security-and-permissions#worker-service-account\">worker service account</a>"
          + " needs the <code>roles/bigquery.datasets.create</code> permission. For"
          + " more information, see <a href=\"https://cloud.google.com/bigquery/docs/access-control\">Introduction to IAM</a>."
    })
public class BigQueryToBigtable {

  /**
   * The {@link BigQueryToBigtableOptions} class provides the custom execution options passed by the
   * executor at the command-line.
   */
  public interface BigQueryToBigtableOptions
      extends BigQueryConverters.BigQueryReadOptions,
          BigtableCommonOptions.WriteOptions,
          GcpOptions {

    @TemplateParameter.Text(
        order = 1,
        regexes = {"[A-Za-z_][A-Za-z_0-9]*"},
        description = "Unique identifier column",
        helpText = "The name of the BigQuery column storing the unique identifier of the row.")
    @Required
    String getReadIdColumn();

    void setReadIdColumn(String value);
  }

  /**
   * Runs a pipeline which reads data from BigQuery and writes it to Bigtable.
   *
   * @param args arguments to the pipeline
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    BigQueryToBigtableOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigQueryToBigtableOptions.class);

    CloudBigtableTableConfiguration bigtableTableConfig =
        generateCloudBigtableWriteConfiguration(options);

    Pipeline pipeline = Pipeline.create(options);

    pipeline
        .apply(
            "AvroToMutation",
            BigQueryConverters.ReadBigQuery.<Mutation>newBuilder()
                .setOptions(options.as(BigQueryToBigtableOptions.class))
                .setReadFunction(
                    BigQueryIO.read(
                        BigtableConverters.AvroToMutation.newBuilder()
                            .setColumnFamily(options.getBigtableWriteColumnFamily())
                            .setRowkey(options.getReadIdColumn())
                            .build()))
                .build())
        .apply("WriteToTable", CloudBigtableIO.writeToTable(bigtableTableConfig));

    pipeline.run();
  }
}

Nächste Schritte