Modèle AstraDB vers BigQuery

Le modèle AstraDB vers BigQuery est un pipeline de traitement par lot qui lit les enregistrements depuis AstraDB et les écrit dans BigQuery.

Si la table de destination n'existe pas dans BigQuery, le pipeline crée une table avec les valeurs suivantes :

  • Le Dataset ID, hérité de l'espace de clés Cassandra.
  • Le Table ID, hérité de la table Cassandra.

Le schéma de la table de destination est déduit de la table Cassandra source.

  • List et Set sont mappés avec les champs BigQuery REPEATED.
  • Map est mappé avec les champs BigQuery RECORD.
  • Tous les autres types sont mappés avec les champs BigQuery des types correspondants.
  • Les types définis par l'utilisateur (UDT) et les types de données tuple Cassandra ne sont pas acceptés.

Conditions requises pour ce pipeline

  • Compte AstraDB avec un jeton

Paramètres de modèle

Paramètres obligatoires

  • astraToken: valeur du jeton ou ID de ressource du secret. Exemple :AstraCS:abcdefghij
  • astraDatabaseId: identifiant unique de la base de données (UUID). Exemple :cf7af129-d33a-498f-ad06-d97a6ee6eb7
  • astraKeyspace: nom de l'espace de clés Cassandra dans la base de données Astra.
  • astraTable: nom de la table dans la base de données Cassandra. Exemple :my_table

Paramètres facultatifs

  • astraQuery: requête à utiliser pour filtrer les lignes au lieu de lire la table entière.
  • astraDatabaseRegion: si cette valeur n'est pas spécifiée, une valeur par défaut est choisie, ce qui est utile pour les bases de données multirégionales.
  • minTokenRangesCount: nombre minimal de divisions à utiliser pour distribuer la requête.
  • outputTableSpec: emplacement de la table BigQuery dans lequel écrire la sortie. Utilisez le format <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>. Le schéma de la table doit correspondre aux objets d'entrée.

Exécuter le modèle

  1. Accédez à la page Dataflow Créer un job à partir d'un modèle.
  2. Accéder à la page Créer un job à partir d'un modèle
  3. Dans le champ Nom du job, saisissez un nom de job unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the AstraDB to BigQuery template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Cliquez sur Run Job (Exécuter la tâche).

Dans le shell ou le terminal, exécutez le modèle :

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/AstraDB_To_BigQuery \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
       astraToken=ASTRA_TOKEN,\
       astraDatabaseId=ASTRA_DATABASE_ID,\
       astraKeyspace=ASTRA_KEYSPACE,\
       astraTable=ASTRA_TABLE,\

Remplacez les éléments suivants :

  • JOB_NAME : nom de job unique de votre choix
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • REGION_NAME : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • ASTRA_TOKEN : jeton Astra
  • ASTRA_DATABASE_ID : identifiant de la base de données
  • ASTRA_KEYSPACE : espace de clés Cassandra
  • ASTRA_TABLE : table Cassandra

Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launchParameter": {
     "jobName": "JOB_NAME",
     "parameters": {
       "astraToken": "ASTRA_TOKEN",
       "astraDatabaseId": "ASTRA_DATABASE_ID",
       "astraKeyspace": "ASTRA_KEYSPACE",
       "astraTable": "ASTRA_TABLE",
     },
     "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/AstraDB_To_BigQuery",
     "environment": { "maxWorkers": "10" }
  }
}

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • JOB_NAME : nom de job unique de votre choix
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • LOCATION : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • ASTRA_TOKEN : jeton Astra
  • ASTRA_DATABASE_ID : identifiant de la base de données
  • ASTRA_KEYSPACE : espace de clés Cassandra
  • ASTRA_TABLE : table Cassandra
Java
/*
 * Copyright (C) 2023 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.astradb.templates;

import com.datastax.oss.driver.api.core.CqlSession;
import com.dtsx.astra.sdk.db.DatabaseClient;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.bigquery.DatasetInfo;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.astradb.options.AstraDbToBigQueryOptions;
import com.google.cloud.teleport.v2.astradb.transforms.AstraDbToBigQueryMappingFn;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.utils.SecretManagerUtils;
import java.util.AbstractMap;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.astra.db.AstraDbIO;
import org.apache.beam.sdk.io.astra.db.CqlSessionHolder;
import org.apache.beam.sdk.io.astra.db.mapping.AstraDbMapper;
import org.apache.beam.sdk.io.astra.db.mapping.BeamRowDbMapperFactoryFn;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.Row;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The {@link AstraDbToBigQuery} pipeline is a batch pipeline which ingests data from AstraDB and
 * outputs the resulting records to BigQuery.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/astradb-to-bigquery/README_AstraDB_to_BigQuery.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "AstraDB_To_BigQuery",
    category = TemplateCategory.BATCH,
    displayName = "AstraDB to BigQuery",
    description = {
      "The AstraDB to BigQuery template is a batch pipeline that reads records from AstraDB and writes them to BigQuery.",
      "If the destination table doesn't exist in BigQuery, the pipeline creates a table with the following values:\n"
          + "- The `Dataset ID` is inherited from the Cassandra keyspace.\n"
          + "- The `Table ID` is inherited from the Cassandra table.\n",
      "The schema of the destination table is inferred from the source Cassandra table.\n"
          + "- `List` and `Set` are mapped to BigQuery `REPEATED` fields.\n"
          + "- `Map` are mapped to BigQuery `RECORD` fields.\n"
          + "- All other types are mapped to BigQuery fields with the corresponding types.\n"
          + "- Cassandra user-defined types (UDTs) and tuple data types are not supported."
    },
    optionsClass = AstraDbToBigQuery.Options.class,
    flexContainerName = "astradb-to-bigquery",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/astradb-to-bigquery",
    contactInformation = "https://cloud.google.com/support",
    preview = true,
    requirements = {"AstraDB account with a token"})
public class AstraDbToBigQuery {

  /** Logger for the class. */
  private static final Logger LOGGER = LoggerFactory.getLogger(AstraDbToBigQuery.class);

  /** If not provided, it is the default token range value. */
  public static final int DEFAULT_TOKEN_RANGE = 18;

  /**
   * Options for the sample
   *
   * <p>Inherits standard configuration options.
   */
  public interface Options
      extends PipelineOptions,
          AstraDbToBigQueryOptions.AstraDbSourceOptions,
          AstraDbToBigQueryOptions.BigQueryWriteOptions {}

  /** Main operations. */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();
    LOGGER.info("Starting pipeline");

    try {

      Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
      LOGGER.debug("Pipeline Arguments (options) validated");

      // --------------------------------
      // AstraDbIO.Read<Row>
      // --------------------------------

      // Credentials are read from secrets manager
      AbstractMap.SimpleImmutableEntry<String, byte[]> astraCredentials =
          parseAstraCredentials(options);
      LOGGER.debug("Astra Credentials parsed");

      // Map Cassandra Table Schema into BigQuery Table Schema
      SerializableFunction<AstraDbIO.Read<?>, TableSchema> bigQuerySchemaFactory =
          new AstraDbToBigQueryMappingFn(options.getAstraKeyspace(), options.getAstraTable());
      LOGGER.debug("Schema Mapper has been initialized");

      // Map Cassandra Rows into (Apache) Beam Rows (DATA)
      SerializableFunction<CqlSession, AstraDbMapper<Row>> beamRowMapperFactory =
          new BeamRowDbMapperFactoryFn(options.getAstraKeyspace(), options.getAstraTable());
      LOGGER.debug("Row Mapper has been initialized");

      // Distribute reads across all available Cassandra nodes
      int minimalTokenRangesCount =
          (options.getMinTokenRangesCount() == null)
              ? DEFAULT_TOKEN_RANGE
              : options.getMinTokenRangesCount();

      // Source: AstraDb
      AstraDbIO.Read<Row> astraSource =
          AstraDbIO.<Row>read()
              .withToken(astraCredentials.getKey())
              .withSecureConnectBundle(astraCredentials.getValue())
              .withKeyspace(options.getAstraKeyspace())
              .withTable(options.getAstraTable())
              .withMinNumberOfSplits(minimalTokenRangesCount)
              .withMapperFactoryFn(beamRowMapperFactory)
              .withCoder(SerializableCoder.of(Row.class))
              .withEntity(Row.class);
      LOGGER.debug("AstraDb Source initialization [OK]");

      // --------------------------------
      //  BigQueryIO.Write<Row>
      // --------------------------------

      TableReference bqTableRef = parseBigQueryDestinationTable(options);
      createBigQueryDestinationTableIfNotExist(options, bqTableRef);
      LOGGER.debug("BigQuery Sink Table has been initialized");

      // Sink: BigQuery
      BigQueryIO.Write<Row> bigQuerySink =
          BigQueryIO.<Row>write()
              .to(bqTableRef)
              // Specialized function reading cassandra source table and mapping to BigQuery Schema
              .withSchema(bigQuerySchemaFactory.apply(astraSource))
              // Provided by google, convert a Beam Row to a BigQuery TableRow
              .withFormatFunction(row -> row != null ? BigQueryUtils.toTableRow(row) : null)
              // Table Will be created if not exist
              .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
              .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);
      LOGGER.debug("BigQuery Sink initialization [OK]");

      // --------------------------------
      //  Pipeline
      // --------------------------------

      Pipeline astraDbToBigQueryPipeline = Pipeline.create(options);
      astraDbToBigQueryPipeline
          .apply("Read From Astra", astraSource)
          .apply("Write To BigQuery", bigQuerySink);
      astraDbToBigQueryPipeline.run();

    } finally {
      // Cassandra Connection is stateful and needs to be closed
      CqlSessionHolder.cleanup();
    }
  }

  /**
   * Parse Astra Credentials from secrets in secret Manager. - SecretManagerUtils is not used as
   * only applied to String secrets
   *
   * @param options pipeline options
   * @return a pair with the token and the secure bundle
   */
  private static AbstractMap.SimpleImmutableEntry<String, byte[]> parseAstraCredentials(
      Options options) {

    String astraToken = options.getAstraToken();
    if (!astraToken.startsWith("AstraCS")) {
      astraToken = SecretManagerUtils.getSecret(options.getAstraToken());
    }
    LOGGER.info("Astra Token is parsed, value={}", astraToken.substring(0, 10) + "...");
    /*
     * Accessing the devops Api to retrieve the secure bundle.
     */
    DatabaseClient astraDbClient = new DatabaseClient(astraToken, options.getAstraDatabaseId());
    if (!astraDbClient.exist()) {
      throw new RuntimeException(
          "Astra Database does not exist, please check your Astra Token and Database ID");
    }
    byte[] astraSecureBundle = astraDbClient.downloadDefaultSecureConnectBundle();
    if (!StringUtils.isEmpty(options.getAstraDatabaseRegion())) {
      astraSecureBundle =
          astraDbClient.downloadSecureConnectBundle(options.getAstraDatabaseRegion());
    }
    LOGGER.info("Astra Bundle is parsed, length={}", astraSecureBundle.length);
    return new AbstractMap.SimpleImmutableEntry<>(astraToken, astraSecureBundle);
  }

  /**
   * Create the Bog Query table Reference (provided or based on Cassandra table name).
   *
   * @param options pipeline options
   * @return the big query table reference
   */
  private static TableReference parseBigQueryDestinationTable(Options options) {
    /*
     * bigQueryOutputTableSpec argument is the Big Query table specification. This is parameter
     * is optional. If not set, the table specification is built from the cassandra source table
     * attributes: keyspace=dataset name, table=table name.
     */
    String bigQueryOutputTableSpec = options.getOutputTableSpec();
    if (StringUtils.isEmpty(bigQueryOutputTableSpec)) {
      bigQueryOutputTableSpec =
          options.getProject() + ":" + options.getAstraKeyspace() + "." + options.getAstraTable();
    }
    TableReference bigQueryTableReference = BigQueryUtils.toTableReference(bigQueryOutputTableSpec);
    LOGGER.info("Big Query table spec has been set to {}", bigQueryOutputTableSpec);
    return bigQueryTableReference;
  }

  /**
   * Create destination dataset and tables if needed (schema mapped from Cassandra).
   *
   * @param options pipeline options
   * @param bqTableRef big query table reference
   */
  private static void createBigQueryDestinationTableIfNotExist(
      Options options, TableReference bqTableRef) {
    BigQuery bigquery =
        BigQueryOptions.newBuilder().setProjectId(options.getProject()).build().getService();
    if (null
        == bigquery.getDataset(
            DatasetId.of(bqTableRef.getProjectId(), bqTableRef.getDatasetId()))) {
      LOGGER.info(
          "Dataset was not found: creating DataSet {} in region {}",
          bqTableRef.getDatasetId(),
          options.getWorkerRegion());
      bigquery.create(
          DatasetInfo.newBuilder(bqTableRef.getDatasetId())
              .setLocation(options.getWorkerRegion())
              .build());
      LOGGER.debug("Dataset has been created [OK]");
    } else {
      LOGGER.info("Dataset {} already exist", bqTableRef.getDatasetId());
    }
  }
}

Étape suivante