Plantilla de AstraDB a BigQuery

La plantilla de AstraDB a BigQuery es una canalización por lotes que lee registros de AstraDB y los escribe en BigQuery.

Si la tabla de destino no existe en BigQuery, la canalización crea una tabla con los siguientes valores:

  • El Dataset ID, que se hereda del espacio de claves de Cassandra.
  • El Table ID, que se hereda de la tabla de Cassandra.

El esquema de la tabla de destino se infiere de la tabla de origen de Cassandra.

  • List y Set se asignan a los campos REPEATED de BigQuery.
  • Map se asignan a los campos RECORD de BigQuery.
  • Todos los demás tipos se asignan a los campos de BigQuery con los tipos correspondientes.
  • Los tipos definidos por el usuario (UDT) y los tipos de datos de tupla de Cassandra son compatibles.

Requisitos de la canalización

  • Cuenta de AstraDB con un token

Parámetros de la plantilla

Parámetros obligatorios

  • astraToken: Es el valor del token o el ID de recurso secreto. Por ejemplo, AstraCS:abcdefghij
  • astraDatabaseId: Es el identificador único de la base de datos (UUID). Por ejemplo, cf7af129-d33a-498f-ad06-d97a6ee6eb7
  • astraKeyspace: El nombre del espacio de claves de Cassandra dentro de la base de datos de Astra.
  • astraTable: El nombre de la tabla dentro de la base de datos de Cassandra. Por ejemplo, my_table

Parámetros opcionales

  • astraQuery: La consulta que se usará para filtrar filas en lugar de leer toda la tabla.
  • astraDatabaseRegion: Si no se proporciona, se elige uno predeterminado, que es útil con bases de datos multirregionales.
  • minTokenRangesCount: Es la cantidad mínima de divisiones que se usarán para distribuir la consulta.
  • outputTableSpec: La ubicación de la tabla de BigQuery en la que se escribirá el resultado. Usa el formato <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>. El esquema de la tabla debe coincidir con los objetos de entrada.

Ejecuta la plantilla

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. La región predeterminada es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the AstraDB to BigQuery template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haga clic en Ejecutar trabajo.

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/AstraDB_To_BigQuery \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
       astraToken=ASTRA_TOKEN,\
       astraDatabaseId=ASTRA_DATABASE_ID,\
       astraKeyspace=ASTRA_KEYSPACE,\
       astraTable=ASTRA_TABLE,\

Reemplaza lo siguiente:

  • JOB_NAME: Es el nombre del trabajo que elijas
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • REGION_NAME: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • ASTRA_TOKEN: Es el token de Astra
  • ASTRA_DATABASE_ID: Es el identificador de la base de datos
  • ASTRA_KEYSPACE: Es el espacio de claves de Cassandra
  • ASTRA_TABLE: Es la tabla de Cassandra

Para ejecutar la plantilla con la API de REST, envía una solicitud POST HTTP. Para obtener más información de la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launchParameter": {
     "jobName": "JOB_NAME",
     "parameters": {
       "astraToken": "ASTRA_TOKEN",
       "astraDatabaseId": "ASTRA_DATABASE_ID",
       "astraKeyspace": "ASTRA_KEYSPACE",
       "astraTable": "ASTRA_TABLE",
     },
     "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/AstraDB_To_BigQuery",
     "environment": { "maxWorkers": "10" }
  }
}

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • LOCATION: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • ASTRA_TOKEN: Es el token de Astra
  • ASTRA_DATABASE_ID: Es el identificador de la base de datos
  • ASTRA_KEYSPACE: Es el espacio de claves de Cassandra
  • ASTRA_TABLE: Es la tabla de Cassandra
Java
/*
 * Copyright (C) 2023 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.astradb.templates;

import com.datastax.oss.driver.api.core.CqlSession;
import com.dtsx.astra.sdk.db.DatabaseClient;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.bigquery.DatasetInfo;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.astradb.options.AstraDbToBigQueryOptions;
import com.google.cloud.teleport.v2.astradb.transforms.AstraDbToBigQueryMappingFn;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.utils.SecretManagerUtils;
import java.util.AbstractMap;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.astra.db.AstraDbIO;
import org.apache.beam.sdk.io.astra.db.CqlSessionHolder;
import org.apache.beam.sdk.io.astra.db.mapping.AstraDbMapper;
import org.apache.beam.sdk.io.astra.db.mapping.BeamRowDbMapperFactoryFn;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.Row;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The {@link AstraDbToBigQuery} pipeline is a batch pipeline which ingests data from AstraDB and
 * outputs the resulting records to BigQuery.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/astradb-to-bigquery/README_AstraDB_to_BigQuery.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "AstraDB_To_BigQuery",
    category = TemplateCategory.BATCH,
    displayName = "AstraDB to BigQuery",
    description = {
      "The AstraDB to BigQuery template is a batch pipeline that reads records from AstraDB and writes them to BigQuery.",
      "If the destination table doesn't exist in BigQuery, the pipeline creates a table with the following values:\n"
          + "- The `Dataset ID` is inherited from the Cassandra keyspace.\n"
          + "- The `Table ID` is inherited from the Cassandra table.\n",
      "The schema of the destination table is inferred from the source Cassandra table.\n"
          + "- `List` and `Set` are mapped to BigQuery `REPEATED` fields.\n"
          + "- `Map` are mapped to BigQuery `RECORD` fields.\n"
          + "- All other types are mapped to BigQuery fields with the corresponding types.\n"
          + "- Cassandra user-defined types (UDTs) and tuple data types are not supported."
    },
    optionsClass = AstraDbToBigQuery.Options.class,
    flexContainerName = "astradb-to-bigquery",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/astradb-to-bigquery",
    contactInformation = "https://cloud.google.com/support",
    preview = true,
    requirements = {"AstraDB account with a token"})
public class AstraDbToBigQuery {

  /** Logger for the class. */
  private static final Logger LOGGER = LoggerFactory.getLogger(AstraDbToBigQuery.class);

  /** If not provided, it is the default token range value. */
  public static final int DEFAULT_TOKEN_RANGE = 18;

  /**
   * Options for the sample
   *
   * <p>Inherits standard configuration options.
   */
  public interface Options
      extends PipelineOptions,
          AstraDbToBigQueryOptions.AstraDbSourceOptions,
          AstraDbToBigQueryOptions.BigQueryWriteOptions {}

  /** Main operations. */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();
    LOGGER.info("Starting pipeline");

    try {

      Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
      LOGGER.debug("Pipeline Arguments (options) validated");

      // --------------------------------
      // AstraDbIO.Read<Row>
      // --------------------------------

      // Credentials are read from secrets manager
      AbstractMap.SimpleImmutableEntry<String, byte[]> astraCredentials =
          parseAstraCredentials(options);
      LOGGER.debug("Astra Credentials parsed");

      // Map Cassandra Table Schema into BigQuery Table Schema
      SerializableFunction<AstraDbIO.Read<?>, TableSchema> bigQuerySchemaFactory =
          new AstraDbToBigQueryMappingFn(options.getAstraKeyspace(), options.getAstraTable());
      LOGGER.debug("Schema Mapper has been initialized");

      // Map Cassandra Rows into (Apache) Beam Rows (DATA)
      SerializableFunction<CqlSession, AstraDbMapper<Row>> beamRowMapperFactory =
          new BeamRowDbMapperFactoryFn(options.getAstraKeyspace(), options.getAstraTable());
      LOGGER.debug("Row Mapper has been initialized");

      // Distribute reads across all available Cassandra nodes
      int minimalTokenRangesCount =
          (options.getMinTokenRangesCount() == null)
              ? DEFAULT_TOKEN_RANGE
              : options.getMinTokenRangesCount();

      // Source: AstraDb
      AstraDbIO.Read<Row> astraSource =
          AstraDbIO.<Row>read()
              .withToken(astraCredentials.getKey())
              .withSecureConnectBundle(astraCredentials.getValue())
              .withKeyspace(options.getAstraKeyspace())
              .withTable(options.getAstraTable())
              .withMinNumberOfSplits(minimalTokenRangesCount)
              .withMapperFactoryFn(beamRowMapperFactory)
              .withCoder(SerializableCoder.of(Row.class))
              .withEntity(Row.class);
      LOGGER.debug("AstraDb Source initialization [OK]");

      // --------------------------------
      //  BigQueryIO.Write<Row>
      // --------------------------------

      TableReference bqTableRef = parseBigQueryDestinationTable(options);
      createBigQueryDestinationTableIfNotExist(options, bqTableRef);
      LOGGER.debug("BigQuery Sink Table has been initialized");

      // Sink: BigQuery
      BigQueryIO.Write<Row> bigQuerySink =
          BigQueryIO.<Row>write()
              .to(bqTableRef)
              // Specialized function reading cassandra source table and mapping to BigQuery Schema
              .withSchema(bigQuerySchemaFactory.apply(astraSource))
              // Provided by google, convert a Beam Row to a BigQuery TableRow
              .withFormatFunction(row -> row != null ? BigQueryUtils.toTableRow(row) : null)
              // Table Will be created if not exist
              .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
              .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);
      LOGGER.debug("BigQuery Sink initialization [OK]");

      // --------------------------------
      //  Pipeline
      // --------------------------------

      Pipeline astraDbToBigQueryPipeline = Pipeline.create(options);
      astraDbToBigQueryPipeline
          .apply("Read From Astra", astraSource)
          .apply("Write To BigQuery", bigQuerySink);
      astraDbToBigQueryPipeline.run();

    } finally {
      // Cassandra Connection is stateful and needs to be closed
      CqlSessionHolder.cleanup();
    }
  }

  /**
   * Parse Astra Credentials from secrets in secret Manager. - SecretManagerUtils is not used as
   * only applied to String secrets
   *
   * @param options pipeline options
   * @return a pair with the token and the secure bundle
   */
  private static AbstractMap.SimpleImmutableEntry<String, byte[]> parseAstraCredentials(
      Options options) {

    String astraToken = options.getAstraToken();
    if (!astraToken.startsWith("AstraCS")) {
      astraToken = SecretManagerUtils.getSecret(options.getAstraToken());
    }
    LOGGER.info("Astra Token is parsed, value={}", astraToken.substring(0, 10) + "...");
    /*
     * Accessing the devops Api to retrieve the secure bundle.
     */
    DatabaseClient astraDbClient = new DatabaseClient(astraToken, options.getAstraDatabaseId());
    if (!astraDbClient.exist()) {
      throw new RuntimeException(
          "Astra Database does not exist, please check your Astra Token and Database ID");
    }
    byte[] astraSecureBundle = astraDbClient.downloadDefaultSecureConnectBundle();
    if (!StringUtils.isEmpty(options.getAstraDatabaseRegion())) {
      astraSecureBundle =
          astraDbClient.downloadSecureConnectBundle(options.getAstraDatabaseRegion());
    }
    LOGGER.info("Astra Bundle is parsed, length={}", astraSecureBundle.length);
    return new AbstractMap.SimpleImmutableEntry<>(astraToken, astraSecureBundle);
  }

  /**
   * Create the Bog Query table Reference (provided or based on Cassandra table name).
   *
   * @param options pipeline options
   * @return the big query table reference
   */
  private static TableReference parseBigQueryDestinationTable(Options options) {
    /*
     * bigQueryOutputTableSpec argument is the Big Query table specification. This is parameter
     * is optional. If not set, the table specification is built from the cassandra source table
     * attributes: keyspace=dataset name, table=table name.
     */
    String bigQueryOutputTableSpec = options.getOutputTableSpec();
    if (StringUtils.isEmpty(bigQueryOutputTableSpec)) {
      bigQueryOutputTableSpec =
          options.getProject() + ":" + options.getAstraKeyspace() + "." + options.getAstraTable();
    }
    TableReference bigQueryTableReference = BigQueryUtils.toTableReference(bigQueryOutputTableSpec);
    LOGGER.info("Big Query table spec has been set to {}", bigQueryOutputTableSpec);
    return bigQueryTableReference;
  }

  /**
   * Create destination dataset and tables if needed (schema mapped from Cassandra).
   *
   * @param options pipeline options
   * @param bqTableRef big query table reference
   */
  private static void createBigQueryDestinationTableIfNotExist(
      Options options, TableReference bqTableRef) {
    BigQuery bigquery =
        BigQueryOptions.newBuilder().setProjectId(options.getProject()).build().getService();
    if (null
        == bigquery.getDataset(
            DatasetId.of(bqTableRef.getProjectId(), bqTableRef.getDatasetId()))) {
      LOGGER.info(
          "Dataset was not found: creating DataSet {} in region {}",
          bqTableRef.getDatasetId(),
          options.getWorkerRegion());
      bigquery.create(
          DatasetInfo.newBuilder(bqTableRef.getDatasetId())
              .setLocation(options.getWorkerRegion())
              .build());
      LOGGER.debug("Dataset has been created [OK]");
    } else {
      LOGGER.info("Dataset {} already exist", bqTableRef.getDatasetId());
    }
  }
}

¿Qué sigue?