Tetap teratur dengan koleksi
Simpan dan kategorikan konten berdasarkan preferensi Anda.
Template AstraDB ke BigQuery adalah pipeline batch yang membaca data dari AstraDB dan menulisnya ke BigQuery.
Jika tabel tujuan tidak ada di BigQuery, pipeline akan membuat
tabel dengan nilai berikut:
Dataset ID, yang diwarisi dari keyspace Cassandra.
Table ID, yang diwarisi dari tabel Cassandra.
Skema tabel tujuan disimpulkan dari tabel Cassandra sumber.
List dan Set dipetakan ke kolom REPEATED BigQuery.
Map dipetakan ke kolom RECORD BigQuery.
Semua jenis lainnya dipetakan ke kolom BigQuery dengan jenis yang sesuai.
Jenis data tuple dan jenis yang ditentukan pengguna Cassandra tidak didukung.
Persyaratan pipeline
Akun AstraDB dengan token
Parameter template
Parameter
Deskripsi
astraToken
Nilai token atau ID resource rahasia. Misalnya: AstraCS:abcdefghij.
astraDatabaseId
ID unik database (uuid). Misalnya: cf7af129-d33a-498f-ad06-d97a6ee6eb7.
astraKeyspace
Nama keyspace Cassandra dalam database Astra.
astraTable
Nama tabel dalam database Cassandra. Misalnya: my_table.
astraQuery
Opsional: Buat kueri untuk memfilter baris, bukan membaca seluruh tabel.
astraDatabaseRegion
Opsional: Jika tidak disediakan, setelan default akan dipilih, yang berguna untuk database multi-region.
minTokenRangesCount
Opsional: Jumlah bagian minimal untuk mendistribusikan kueri.
outputTableSpec
Opsional: Lokasi tabel BigQuery yang akan menjadi tujuan penulisan output. Tabel harus dalam format
<project>:<dataset>.<table_name>. Skema tabel harus cocok dengan objek input.
nama versi, seperti 2023-09-12-00_RC00, untuk menggunakan versi template
tertentu, yang dapat ditemukan bertingkat di folder induk bertanggal masing-masing dalam bucket—
gs://dataflow-templates-REGION_NAME/
REGION_NAME: region tempat Anda ingin men-deploy tugas Dataflow, misalnya us-central1
ASTRA_TOKEN: token Astra
ASTRA_DATABASE_ID: ID database
ASTRA_KEYSPACE: keyspace Cassandra
ASTRA_TABLE: tabel Cassandra
API
Untuk menjalankan template menggunakan REST API, kirim permintaan HTTP POST. Untuk informasi selengkapnya tentang API dan cakupan otorisasinya, lihat projects.templates.launch.
nama versi, seperti 2023-09-12-00_RC00, untuk menggunakan versi template
tertentu, yang dapat ditemukan bertingkat di folder induk bertanggal masing-masing dalam bucket—
gs://dataflow-templates-REGION_NAME/
LOCATION: region tempat Anda ingin men-deploy tugas Dataflow, misalnya us-central1
ASTRA_TOKEN: token Astra
ASTRA_DATABASE_ID: ID database
ASTRA_KEYSPACE: keyspace Cassandra
ASTRA_TABLE: tabel Cassandra
Kode sumber template
Java
/*
* Copyright (C) 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.astradb.templates;
import com.datastax.oss.driver.api.core.CqlSession;
import com.dtsx.astra.sdk.db.DatabaseClient;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.bigquery.DatasetInfo;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.astradb.options.AstraDbToBigQueryOptions;
import com.google.cloud.teleport.v2.astradb.transforms.AstraDbToBigQueryMappingFn;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.utils.SecretManagerUtils;
import java.util.AbstractMap;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.astra.db.AstraDbIO;
import org.apache.beam.sdk.io.astra.db.CqlSessionHolder;
import org.apache.beam.sdk.io.astra.db.mapping.AstraDbMapper;
import org.apache.beam.sdk.io.astra.db.mapping.BeamRowDbMapperFactoryFn;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.Row;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The {@link AstraDbToBigQuery} pipeline is a batch pipeline which ingests data from AstraDB and
* outputs the resulting records to BigQuery.
*
* <p>Check out <a
* href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/astradb-to-bigquery/README_AstraDB_to_BigQuery.md">README</a>
* for instructions on how to use or modify this template.
*/
@Template(
name = "AstraDB_To_BigQuery",
category = TemplateCategory.BATCH,
displayName = "AstraDB to BigQuery",
description = {
"The AstraDB to BigQuery template is a batch pipeline that reads records from AstraDB and writes them to BigQuery.",
"If the destination table doesn't exist in BigQuery, the pipeline creates a table with the following values:\n"
+ "- The `Dataset ID` is inherited from the Cassandra keyspace.\n"
+ "- The `Table ID` is inherited from the Cassandra table.\n",
"The schema of the destination table is inferred from the source Cassandra table.\n"
+ "- `List` and `Set` are mapped to BigQuery `REPEATED` fields.\n"
+ "- `Map` are mapped to BigQuery `RECORD` fields.\n"
+ "- All other types are mapped to BigQuery fields with the corresponding types.\n"
+ "- Cassandra user-defined types (UDTs) and tuple data types are not supported."
},
optionsClass = AstraDbToBigQuery.Options.class,
flexContainerName = "astradb-to-bigquery",
documentation =
"https://cloud.google.com/dataflow/docs/guides/templates/provided/astradb-to-bigquery",
contactInformation = "https://cloud.google.com/support",
preview = true,
requirements = {"AstraDB account with a token"})
public class AstraDbToBigQuery {
/** Logger for the class. */
private static final Logger LOGGER = LoggerFactory.getLogger(AstraDbToBigQuery.class);
/** If not provided, it is the default token range value. */
public static final int DEFAULT_TOKEN_RANGE = 18;
/**
* Options for the sample
*
* <p>Inherits standard configuration options.
*/
public interface Options
extends PipelineOptions,
AstraDbToBigQueryOptions.AstraDbSourceOptions,
AstraDbToBigQueryOptions.BigQueryWriteOptions {}
/** Main operations. */
public static void main(String[] args) {
UncaughtExceptionLogger.register();
LOGGER.info("Starting pipeline");
try {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
LOGGER.debug("Pipeline Arguments (options) validated");
// --------------------------------
// AstraDbIO.Read<Row>
// --------------------------------
// Credentials are read from secrets manager
AbstractMap.SimpleImmutableEntry<String, byte[]> astraCredentials =
parseAstraCredentials(options);
LOGGER.debug("Astra Credentials parsed");
// Map Cassandra Table Schema into BigQuery Table Schema
SerializableFunction<AstraDbIO.Read<?>, TableSchema> bigQuerySchemaFactory =
new AstraDbToBigQueryMappingFn(options.getAstraKeyspace(), options.getAstraTable());
LOGGER.debug("Schema Mapper has been initialized");
// Map Cassandra Rows into (Apache) Beam Rows (DATA)
SerializableFunction<CqlSession, AstraDbMapper<Row>> beamRowMapperFactory =
new BeamRowDbMapperFactoryFn(options.getAstraKeyspace(), options.getAstraTable());
LOGGER.debug("Row Mapper has been initialized");
// Distribute reads across all available Cassandra nodes
int minimalTokenRangesCount =
(options.getMinTokenRangesCount() == null)
? DEFAULT_TOKEN_RANGE
: options.getMinTokenRangesCount();
// Source: AstraDb
AstraDbIO.Read<Row> astraSource =
AstraDbIO.<Row>read()
.withToken(astraCredentials.getKey())
.withSecureConnectBundle(astraCredentials.getValue())
.withKeyspace(options.getAstraKeyspace())
.withTable(options.getAstraTable())
.withMinNumberOfSplits(minimalTokenRangesCount)
.withMapperFactoryFn(beamRowMapperFactory)
.withCoder(SerializableCoder.of(Row.class))
.withEntity(Row.class);
LOGGER.debug("AstraDb Source initialization [OK]");
// --------------------------------
// BigQueryIO.Write<Row>
// --------------------------------
TableReference bqTableRef = parseBigQueryDestinationTable(options);
createBigQueryDestinationTableIfNotExist(options, bqTableRef);
LOGGER.debug("BigQuery Sink Table has been initialized");
// Sink: BigQuery
BigQueryIO.Write<Row> bigQuerySink =
BigQueryIO.<Row>write()
.to(bqTableRef)
// Specialized function reading cassandra source table and mapping to BigQuery Schema
.withSchema(bigQuerySchemaFactory.apply(astraSource))
// Provided by google, convert a Beam Row to a BigQuery TableRow
.withFormatFunction(row -> row != null ? BigQueryUtils.toTableRow(row) : null)
// Table Will be created if not exist
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);
LOGGER.debug("BigQuery Sink initialization [OK]");
// --------------------------------
// Pipeline
// --------------------------------
Pipeline astraDbToBigQueryPipeline = Pipeline.create(options);
astraDbToBigQueryPipeline
.apply("Read From Astra", astraSource)
.apply("Write To BigQuery", bigQuerySink);
astraDbToBigQueryPipeline.run();
} finally {
// Cassandra Connection is stateful and needs to be closed
CqlSessionHolder.cleanup();
}
}
/**
* Parse Astra Credentials from secrets in secret Manager. - SecretManagerUtils is not used as
* only applied to String secrets
*
* @param options pipeline options
* @return a pair with the token and the secure bundle
*/
private static AbstractMap.SimpleImmutableEntry<String, byte[]> parseAstraCredentials(
Options options) {
String astraToken = options.getAstraToken();
if (!astraToken.startsWith("AstraCS")) {
astraToken = SecretManagerUtils.getSecret(options.getAstraToken());
}
LOGGER.info("Astra Token is parsed, value={}", astraToken.substring(0, 10) + "...");
/*
* Accessing the devops Api to retrieve the secure bundle.
*/
DatabaseClient astraDbClient = new DatabaseClient(astraToken, options.getAstraDatabaseId());
if (!astraDbClient.exist()) {
throw new RuntimeException(
"Astra Database does not exist, please check your Astra Token and Database ID");
}
byte[] astraSecureBundle = astraDbClient.downloadDefaultSecureConnectBundle();
if (!StringUtils.isEmpty(options.getAstraDatabaseRegion())) {
astraSecureBundle =
astraDbClient.downloadSecureConnectBundle(options.getAstraDatabaseRegion());
}
LOGGER.info("Astra Bundle is parsed, length={}", astraSecureBundle.length);
return new AbstractMap.SimpleImmutableEntry<>(astraToken, astraSecureBundle);
}
/**
* Create the Bog Query table Reference (provided or based on Cassandra table name).
*
* @param options pipeline options
* @return the big query table reference
*/
private static TableReference parseBigQueryDestinationTable(Options options) {
/*
* bigQueryOutputTableSpec argument is the Big Query table specification. This is parameter
* is optional. If not set, the table specification is built from the cassandra source table
* attributes: keyspace=dataset name, table=table name.
*/
String bigQueryOutputTableSpec = options.getOutputTableSpec();
if (StringUtils.isEmpty(bigQueryOutputTableSpec)) {
bigQueryOutputTableSpec =
options.getProject() + ":" + options.getAstraKeyspace() + "." + options.getAstraTable();
}
TableReference bigQueryTableReference = BigQueryUtils.toTableReference(bigQueryOutputTableSpec);
LOGGER.info("Big Query table spec has been set to {}", bigQueryOutputTableSpec);
return bigQueryTableReference;
}
/**
* Create destination dataset and tables if needed (schema mapped from Cassandra).
*
* @param options pipeline options
* @param bqTableRef big query table reference
*/
private static void createBigQueryDestinationTableIfNotExist(
Options options, TableReference bqTableRef) {
BigQuery bigquery =
BigQueryOptions.newBuilder().setProjectId(options.getProject()).build().getService();
if (null
== bigquery.getDataset(
DatasetId.of(bqTableRef.getProjectId(), bqTableRef.getDatasetId()))) {
LOGGER.info(
"Dataset was not found: creating DataSet {} in region {}",
bqTableRef.getDatasetId(),
options.getWorkerRegion());
bigquery.create(
DatasetInfo.newBuilder(bqTableRef.getDatasetId())
.setLocation(options.getWorkerRegion())
.build());
LOGGER.debug("Dataset has been created [OK]");
} else {
LOGGER.info("Dataset {} already exist", bqTableRef.getDatasetId());
}
}
}
[[["Mudah dipahami","easyToUnderstand","thumb-up"],["Memecahkan masalah saya","solvedMyProblem","thumb-up"],["Lainnya","otherUp","thumb-up"]],[["Hard to understand","hardToUnderstand","thumb-down"],["Incorrect information or sample code","incorrectInformationOrSampleCode","thumb-down"],["Missing the information/samples I need","missingTheInformationSamplesINeed","thumb-down"],["Masalah terjemahan","translationIssue","thumb-down"],["Lainnya","otherDown","thumb-down"]],["Terakhir diperbarui pada 2024-03-29 UTC."],[],[]]