AstraDB to BigQuery テンプレート

AstraDB to BigQuery テンプレートは、AstraDB からレコードを読み取り、BigQuery に書き込むバッチ パイプラインです。

宛先テーブルが BigQuery に存在しない場合、パイプラインは次の値を持つテーブルを作成します。

  • Cassandra キースペースから継承される Dataset ID
  • Cassandra テーブルから継承される Table ID

宛先テーブルのスキーマは、ソース Cassandra テーブルから推定されます。

  • ListSet は BigQuery の REPEATED フィールドにマッピングされます。
  • Map は BigQuery の RECORD フィールドにマッピングされます。
  • 他のすべての型は、対応するタイプの BigQuery フィールドにマッピングされます。
  • Cassandra ユーザー定義タイプ(UDT)とタプルデータ タイプはサポートされていません。

パイプラインの要件

  • トークンを含む AstraDB アカウント

テンプレートのパラメータ

必須パラメータ

  • astraToken: トークン値またはシークレットのリソース ID(例: AstraCS:abcdefghij)。
  • astraDatabaseId: データベースの固有識別子(UUID)(例: cf7af129-d33a-498f-ad06-d97a6ee6eb7)。
  • astraKeyspace: Astra データベース内の Cassandra キースペースの名前。
  • astraTable: Cassandra データベース内のテーブルの名前(例: my_table)。

オプション パラメータ

  • astraQuery: テーブル全体を読み取る代わりに行をフィルタするクエリ。
  • astraDatabaseRegion: 指定しない場合はデフォルトが選択されます。これはマルチリージョン データベースで有用です。
  • minTokenRangesCount: クエリを分散するために使用するスプリットの最小数。
  • outputTableSpec: 出力を書き込む BigQuery テーブルの場所。形式 <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME> を使用します。テーブルのスキーマは、入力オブジェクトと一致する必要があります。

テンプレートを実行する

コンソールgcloudAPI
  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、[ the AstraDB to BigQuery template] を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/AstraDB_To_BigQuery \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
       astraToken=ASTRA_TOKEN,\
       astraDatabaseId=ASTRA_DATABASE_ID,\
       astraKeyspace=ASTRA_KEYSPACE,\
       astraTable=ASTRA_TABLE,\

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • ASTRA_TOKEN: Astra トークン
  • ASTRA_DATABASE_ID: データベースの識別子
  • ASTRA_KEYSPACE: Cassandra キースペース
  • ASTRA_TABLE: Cassandra テーブル

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launchParameter": {
     "jobName": "JOB_NAME",
     "parameters": {
       "astraToken": "ASTRA_TOKEN",
       "astraDatabaseId": "ASTRA_DATABASE_ID",
       "astraKeyspace": "ASTRA_KEYSPACE",
       "astraTable": "ASTRA_TABLE",
     },
     "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/AstraDB_To_BigQuery",
     "environment": { "maxWorkers": "10" }
  }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • LOCATION: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • ASTRA_TOKEN: Astra トークン
  • ASTRA_DATABASE_ID: データベースの識別子
  • ASTRA_KEYSPACE: Cassandra キースペース
  • ASTRA_TABLE: Cassandra テーブル
Java
/*
 * Copyright (C) 2023 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.astradb.templates;

import com.datastax.oss.driver.api.core.CqlSession;
import com.dtsx.astra.sdk.db.DatabaseClient;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.bigquery.DatasetInfo;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.astradb.options.AstraDbToBigQueryOptions;
import com.google.cloud.teleport.v2.astradb.transforms.AstraDbToBigQueryMappingFn;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.utils.SecretManagerUtils;
import java.util.AbstractMap;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.astra.db.AstraDbIO;
import org.apache.beam.sdk.io.astra.db.CqlSessionHolder;
import org.apache.beam.sdk.io.astra.db.mapping.AstraDbMapper;
import org.apache.beam.sdk.io.astra.db.mapping.BeamRowDbMapperFactoryFn;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.Row;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The {@link AstraDbToBigQuery} pipeline is a batch pipeline which ingests data from AstraDB and
 * outputs the resulting records to BigQuery.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/astradb-to-bigquery/README_AstraDB_to_BigQuery.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "AstraDB_To_BigQuery",
    category = TemplateCategory.BATCH,
    displayName = "AstraDB to BigQuery",
    description = {
      "The AstraDB to BigQuery template is a batch pipeline that reads records from AstraDB and writes them to BigQuery.",
      "If the destination table doesn't exist in BigQuery, the pipeline creates a table with the following values:\n"
          + "- The `Dataset ID` is inherited from the Cassandra keyspace.\n"
          + "- The `Table ID` is inherited from the Cassandra table.\n",
      "The schema of the destination table is inferred from the source Cassandra table.\n"
          + "- `List` and `Set` are mapped to BigQuery `REPEATED` fields.\n"
          + "- `Map` are mapped to BigQuery `RECORD` fields.\n"
          + "- All other types are mapped to BigQuery fields with the corresponding types.\n"
          + "- Cassandra user-defined types (UDTs) and tuple data types are not supported."
    },
    optionsClass = AstraDbToBigQuery.Options.class,
    flexContainerName = "astradb-to-bigquery",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/astradb-to-bigquery",
    contactInformation = "https://cloud.google.com/support",
    preview = true,
    requirements = {"AstraDB account with a token"})
public class AstraDbToBigQuery {

  /** Logger for the class. */
  private static final Logger LOGGER = LoggerFactory.getLogger(AstraDbToBigQuery.class);

  /** If not provided, it is the default token range value. */
  public static final int DEFAULT_TOKEN_RANGE = 18;

  /**
   * Options for the sample
   *
   * <p>Inherits standard configuration options.
   */
  public interface Options
      extends PipelineOptions,
          AstraDbToBigQueryOptions.AstraDbSourceOptions,
          AstraDbToBigQueryOptions.BigQueryWriteOptions {}

  /** Main operations. */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();
    LOGGER.info("Starting pipeline");

    try {

      Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
      LOGGER.debug("Pipeline Arguments (options) validated");

      // --------------------------------
      // AstraDbIO.Read<Row>
      // --------------------------------

      // Credentials are read from secrets manager
      AbstractMap.SimpleImmutableEntry<String, byte[]> astraCredentials =
          parseAstraCredentials(options);
      LOGGER.debug("Astra Credentials parsed");

      // Map Cassandra Table Schema into BigQuery Table Schema
      SerializableFunction<AstraDbIO.Read<?>, TableSchema> bigQuerySchemaFactory =
          new AstraDbToBigQueryMappingFn(options.getAstraKeyspace(), options.getAstraTable());
      LOGGER.debug("Schema Mapper has been initialized");

      // Map Cassandra Rows into (Apache) Beam Rows (DATA)
      SerializableFunction<CqlSession, AstraDbMapper<Row>> beamRowMapperFactory =
          new BeamRowDbMapperFactoryFn(options.getAstraKeyspace(), options.getAstraTable());
      LOGGER.debug("Row Mapper has been initialized");

      // Distribute reads across all available Cassandra nodes
      int minimalTokenRangesCount =
          (options.getMinTokenRangesCount() == null)
              ? DEFAULT_TOKEN_RANGE
              : options.getMinTokenRangesCount();

      // Source: AstraDb
      AstraDbIO.Read<Row> astraSource =
          AstraDbIO.<Row>read()
              .withToken(astraCredentials.getKey())
              .withSecureConnectBundle(astraCredentials.getValue())
              .withKeyspace(options.getAstraKeyspace())
              .withTable(options.getAstraTable())
              .withMinNumberOfSplits(minimalTokenRangesCount)
              .withMapperFactoryFn(beamRowMapperFactory)
              .withCoder(SerializableCoder.of(Row.class))
              .withEntity(Row.class);
      LOGGER.debug("AstraDb Source initialization [OK]");

      // --------------------------------
      //  BigQueryIO.Write<Row>
      // --------------------------------

      TableReference bqTableRef = parseBigQueryDestinationTable(options);
      createBigQueryDestinationTableIfNotExist(options, bqTableRef);
      LOGGER.debug("BigQuery Sink Table has been initialized");

      // Sink: BigQuery
      BigQueryIO.Write<Row> bigQuerySink =
          BigQueryIO.<Row>write()
              .to(bqTableRef)
              // Specialized function reading cassandra source table and mapping to BigQuery Schema
              .withSchema(bigQuerySchemaFactory.apply(astraSource))
              // Provided by google, convert a Beam Row to a BigQuery TableRow
              .withFormatFunction(row -> row != null ? BigQueryUtils.toTableRow(row) : null)
              // Table Will be created if not exist
              .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
              .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);
      LOGGER.debug("BigQuery Sink initialization [OK]");

      // --------------------------------
      //  Pipeline
      // --------------------------------

      Pipeline astraDbToBigQueryPipeline = Pipeline.create(options);
      astraDbToBigQueryPipeline
          .apply("Read From Astra", astraSource)
          .apply("Write To BigQuery", bigQuerySink);
      astraDbToBigQueryPipeline.run();

    } finally {
      // Cassandra Connection is stateful and needs to be closed
      CqlSessionHolder.cleanup();
    }
  }

  /**
   * Parse Astra Credentials from secrets in secret Manager. - SecretManagerUtils is not used as
   * only applied to String secrets
   *
   * @param options pipeline options
   * @return a pair with the token and the secure bundle
   */
  private static AbstractMap.SimpleImmutableEntry<String, byte[]> parseAstraCredentials(
      Options options) {

    String astraToken = options.getAstraToken();
    if (!astraToken.startsWith("AstraCS")) {
      astraToken = SecretManagerUtils.getSecret(options.getAstraToken());
    }
    LOGGER.info("Astra Token is parsed, value={}", astraToken.substring(0, 10) + "...");
    /*
     * Accessing the devops Api to retrieve the secure bundle.
     */
    DatabaseClient astraDbClient = new DatabaseClient(astraToken, options.getAstraDatabaseId());
    if (!astraDbClient.exist()) {
      throw new RuntimeException(
          "Astra Database does not exist, please check your Astra Token and Database ID");
    }
    byte[] astraSecureBundle = astraDbClient.downloadDefaultSecureConnectBundle();
    if (!StringUtils.isEmpty(options.getAstraDatabaseRegion())) {
      astraSecureBundle =
          astraDbClient.downloadSecureConnectBundle(options.getAstraDatabaseRegion());
    }
    LOGGER.info("Astra Bundle is parsed, length={}", astraSecureBundle.length);
    return new AbstractMap.SimpleImmutableEntry<>(astraToken, astraSecureBundle);
  }

  /**
   * Create the Bog Query table Reference (provided or based on Cassandra table name).
   *
   * @param options pipeline options
   * @return the big query table reference
   */
  private static TableReference parseBigQueryDestinationTable(Options options) {
    /*
     * bigQueryOutputTableSpec argument is the Big Query table specification. This is parameter
     * is optional. If not set, the table specification is built from the cassandra source table
     * attributes: keyspace=dataset name, table=table name.
     */
    String bigQueryOutputTableSpec = options.getOutputTableSpec();
    if (StringUtils.isEmpty(bigQueryOutputTableSpec)) {
      bigQueryOutputTableSpec =
          options.getProject() + ":" + options.getAstraKeyspace() + "." + options.getAstraTable();
    }
    TableReference bigQueryTableReference = BigQueryUtils.toTableReference(bigQueryOutputTableSpec);
    LOGGER.info("Big Query table spec has been set to {}", bigQueryOutputTableSpec);
    return bigQueryTableReference;
  }

  /**
   * Create destination dataset and tables if needed (schema mapped from Cassandra).
   *
   * @param options pipeline options
   * @param bqTableRef big query table reference
   */
  private static void createBigQueryDestinationTableIfNotExist(
      Options options, TableReference bqTableRef) {
    BigQuery bigquery =
        BigQueryOptions.newBuilder().setProjectId(options.getProject()).build().getService();
    if (null
        == bigquery.getDataset(
            DatasetId.of(bqTableRef.getProjectId(), bqTableRef.getDatasetId()))) {
      LOGGER.info(
          "Dataset was not found: creating DataSet {} in region {}",
          bqTableRef.getDatasetId(),
          options.getWorkerRegion());
      bigquery.create(
          DatasetInfo.newBuilder(bqTableRef.getDatasetId())
              .setLocation(options.getWorkerRegion())
              .build());
      LOGGER.debug("Dataset has been created [OK]");
    } else {
      LOGGER.info("Dataset {} already exist", bqTableRef.getDatasetId());
    }
  }
}

次のステップ