Modèle BigQuery vers Bigtable

Le modèle BigQuery vers Bigtable est un pipeline par lots qui copie les données d'une table BigQuery dans une table Bigtable existante. Le modèle peut lire la table entière ou lire des enregistrements spécifiques indiqués par une requête fournie.

Conditions requises pour ce pipeline

Paramètres de modèle

Paramètres obligatoires

  • readIdColumn: nom de la colonne BigQuery stockant l'identifiant unique de la ligne.
  • bigtableWriteInstanceId: ID de l'instance Bigtable qui contient la table.
  • bigtableWriteTableId: ID de la table Bigtable dans laquelle écrire les données.
  • bigtableWriteColumnFamily: nom de la famille de colonnes de la table Bigtable dans laquelle écrire les données.

Paramètres facultatifs

  • inputTableSpec: table BigQuery à lire. Si vous spécifiez inputTableSpec, le modèle lit les données directement à partir de l'espace de stockage BigQuery à l'aide de l'API BigQuery Storage Read (https://cloud.google.com/bigquery/docs/reference/storage). Pour en savoir plus sur les limites de l'API Storage Read, consultez https://cloud.google.com/bigquery/docs/reference/storage#limitations. Vous devez spécifier inputTableSpec ou query. Si vous définissez les deux paramètres, le modèle utilise le paramètre query. Par exemple, <BIGQUERY_PROJECT>:<DATASET_NAME>.<INPUT_TABLE>.
  • outputDeadletterTable: table BigQuery des messages qui n'ont pas pu atteindre la table de sortie. En l'absence de table existante, une table va être créée lors de l'exécution du pipeline. Si aucune valeur n'est spécifiée, <outputTableSpec>_error_records est utilisé. Par exemple, <PROJECT_ID>:<DATASET_NAME>.<DEADLETTER_TABLE>.
  • query: requête SQL à utiliser pour lire les données à partir de BigQuery. Si l'ensemble de données BigQuery se trouve dans un projet différent de celui de la tâche Dataflow, spécifiez le nom complet de l'ensemble de données dans la requête SQL, par exemple : <ID_PROJET>.<NOM_ENSEMBLE_DE_DONNÉES>.<NOM_TABLE>. Par défaut, le paramètre query utilise GoogleSQL (https://cloud.google.com/bigquery/docs/introduction-sql), sauf si useLegacySql est true. Vous devez spécifier inputTableSpec ou query. Si vous définissez les deux paramètres, le modèle utilise le paramètre query. Par exemple, select * from sampledb.sample_table.
  • useLegacySql: définissez la valeur sur true pour utiliser l'ancien SQL. Ce paramètre s'applique uniquement lorsque vous utilisez le paramètre query. La valeur par défaut est false.
  • queryLocation: requis lors de la lecture à partir d'une vue autorisée sans l'autorisation de la table sous-jacente. Exemple :US
  • queryTempDataset: avec cette option, vous pouvez définir un ensemble de données existant pour créer la table temporaire qui stockera les résultats de la requête. Exemple :temp_dataset
  • KMSEncryptionKey: si vous lisez à partir de BigQuery à l'aide de la source de requête, utilisez cette clé Cloud KMS pour chiffrer les tables temporaires créées. Exemple :projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key
  • bigtableRpcAttemptTimeoutMs: délai avant expiration (en millisecondes) de chaque tentative de RPC Bigtable.
  • bigtableRpcTimeoutMs: délai avant expiration total (en millisecondes) d'une opération RPC Bigtable.
  • bigtableAdditionalRetryCodes: codes de nouvelles tentatives supplémentaires. Exemple :RESOURCE_EXHAUSTED,DEADLINE_EXCEEDED
  • bigtableWriteAppProfile: ID du profil d'application Bigtable à utiliser pour l'exportation. Si vous ne spécifiez pas de profil d'application, Bigtable utilise le profil d'application par défaut (https://cloud.google.com/bigtable/docs/app-profiles#default-app-profile) de l'instance.
  • bigtableWriteProjectId: ID du projet Google Cloud contenant l'instance Bigtable dans laquelle écrire les données.
  • bigtableBulkWriteLatencyTargetMs: cible de latence de Bigtable en millisecondes pour la limitation basée sur la latence.
  • bigtableBulkWriteMaxRowKeyCount: nombre maximal de clés de ligne dans une opération d'écriture par lot Bigtable.
  • bigtableBulkWriteMaxRequestSizeBytes: nombre maximal d'octets à inclure par opération d'écriture par lot Bigtable.

Exécuter le modèle

  1. Accédez à la page Dataflow Créer un job à partir d'un modèle.
  2. Accéder à la page Créer un job à partir d'un modèle
  3. Dans le champ Nom du job, saisissez un nom de job unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the BigQuery to Bigtable template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Cliquez sur Run Job (Exécuter la tâche).

Dans le shell ou le terminal, exécutez le modèle :

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/BigQuery_to_Bigtable \
    --parameters \
readIdColumn=READ_COLUMN_ID,\
inputTableSpec=INPUT_TABLE_SPEC,\
bigtableWriteInstanceId=BIGTABLE_INSTANCE_ID,\
bigtableWriteTableId=BIGTABLE_TABLE_ID,\
bigtableWriteColumnFamily=BIGTABLE_COLUMN_FAMILY

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • JOB_NAME : nom de job unique de votre choix
  • REGION_NAME : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • READ_COLUMN_ID : colonne BigQuery d'ID uniques.
  • INPUT_TABLE_SPEC : nom de votre table BigQuery.
  • BIGTABLE_INSTANCE_ID : ID de votre instance Bigtable.
  • BIGTABLE_TABLE_ID : ID de votre table Bigtable.
  • BIGTABLE_COLUMN_FAMILY : famille de colonnes de votre table Bigtable.

Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "readIdColumn": "READ_COLUMN_ID",
          "inputTableSpec": "INPUT_TABLE_SPEC",
          "bigtableWriteInstanceId": "BIGTABLE_INSTANCE_ID",
          "bigtableWriteTableId": "BIGTABLE_TABLE_ID",
          "bigtableWriteColumnFamily": "BIGTABLE_COLUMN_FAMILY"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/BigQuery_to_Bigtable",
   }
}

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • JOB_NAME : nom de job unique de votre choix
  • LOCATION : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • READ_COLUMN_ID : colonne BigQuery d'ID uniques.
  • INPUT_TABLE_SPEC : nom de votre table BigQuery.
  • BIGTABLE_INSTANCE_ID : ID de votre instance Bigtable.
  • BIGTABLE_TABLE_ID : ID de votre table Bigtable.
  • BIGTABLE_COLUMN_FAMILY : famille de colonnes de votre table Bigtable.
Java
/*
 * Copyright (C) 2021 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import static com.google.cloud.teleport.v2.bigtable.utils.BigtableConfig.generateCloudBigtableWriteConfiguration;

import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableTableConfiguration;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.v2.bigtable.options.BigtableCommonOptions;
import com.google.cloud.teleport.v2.bigtable.transforms.BigtableConverters;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.templates.BigQueryToBigtable.BigQueryToBigtableOptions;
import com.google.cloud.teleport.v2.transforms.BigQueryConverters;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.hadoop.hbase.client.Mutation;

/**
 * Dataflow template which reads BigQuery data and writes it to Bigtable. The source data can be
 * either a BigQuery table or an SQL query.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/bigquery-to-bigtable/README_BigQuery_to_Bigtable.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "BigQuery_to_Bigtable",
    category = TemplateCategory.BATCH,
    displayName = "BigQuery to Bigtable",
    description = "A pipeline to export a BigQuery table into Bigtable.",
    optionsClass = BigQueryToBigtableOptions.class,
    optionsOrder = {
      BigQueryToBigtableOptions.class,
      BigQueryConverters.BigQueryReadOptions.class,
      BigtableCommonOptions.class,
      BigtableCommonOptions.WriteOptions.class
    },
    optionalOptions = {"inputTableSpec"},
    flexContainerName = "bigquery-to-bigtable",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/bigquery-to-bigtable",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "The source BigQuery table must exist.",
      "The Bigtable table must exist.",
      "The <a href=\"https://cloud.google.com/dataflow/docs/concepts/security-and-permissions#worker-service-account\">worker service account</a>"
          + " needs the <code>roles/bigquery.datasets.create</code> permission. For"
          + " more information, see <a href=\"https://cloud.google.com/bigquery/docs/access-control\">Introduction to IAM</a>."
    })
public class BigQueryToBigtable {

  /**
   * The {@link BigQueryToBigtableOptions} class provides the custom execution options passed by the
   * executor at the command-line.
   */
  public interface BigQueryToBigtableOptions
      extends BigQueryConverters.BigQueryReadOptions,
          BigtableCommonOptions.WriteOptions,
          GcpOptions {

    @TemplateParameter.Text(
        order = 1,
        regexes = {"[A-Za-z_][A-Za-z_0-9]*"},
        description = "Unique identifier column",
        helpText = "The name of the BigQuery column storing the unique identifier of the row.")
    @Required
    String getReadIdColumn();

    void setReadIdColumn(String value);
  }

  /**
   * Runs a pipeline which reads data from BigQuery and writes it to Bigtable.
   *
   * @param args arguments to the pipeline
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    BigQueryToBigtableOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigQueryToBigtableOptions.class);

    CloudBigtableTableConfiguration bigtableTableConfig =
        generateCloudBigtableWriteConfiguration(options);

    Pipeline pipeline = Pipeline.create(options);

    pipeline
        .apply(
            "AvroToMutation",
            BigQueryConverters.ReadBigQuery.<Mutation>newBuilder()
                .setOptions(options.as(BigQueryToBigtableOptions.class))
                .setReadFunction(
                    BigQueryIO.read(
                        BigtableConverters.AvroToMutation.newBuilder()
                            .setColumnFamily(options.getBigtableWriteColumnFamily())
                            .setRowkey(options.getReadIdColumn())
                            .build()))
                .build())
        .apply("WriteToTable", CloudBigtableIO.writeToTable(bigtableTableConfig));

    pipeline.run();
  }
}

Étape suivante