Apache Cassandra에서 기본 키는 데이터 정의 언어를 사용하여 정의됩니다. 기본 키는 단순, 복합 또는 클러스터링 열이 포함된 혼합 키 중 하나입니다.
Bigtable은 바이트 배열 기준 사전순으로 정렬되는 row-key를 수동으로 생성하는 방식을 지원합니다.
파이프라인은 키 유형에 대한 정보를 자동으로 수집한 후, 여러 값을 기반으로 row-key를 빌드하는 권장사항에 따라 키를 구성합니다.
CASSANDRA_HOSTS: Apache Cassandra 호스트 목록. 여러 호스트가 제공된 경우 쉼표를 이스케이프하는 방법에 대한 안내를 따르세요.
CASSANDRA_KEYSPACE: 테이블이 있는 Apache Cassandra 키스페이스
CASSANDRA_TABLE: 마이그레이션해야 하는 Apache Cassandra 테이블
템플릿 소스 코드
Java
/*
* Copyright (C) 2019 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.bigtable;
import com.datastax.driver.core.Session;
import com.google.cloud.teleport.bigtable.CassandraToBigtable.Options;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.cassandra.CassandraIO;
import org.apache.beam.sdk.io.cassandra.Mapper;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.Row;
/**
 * This Dataflow Template performs a one off copy of one table from Apache Cassandra to Cloud
 * Bigtable. It is designed to require minimal configuration and aims to replicate the table
 * structure in Cassandra as closely as possible in Cloud Bigtable.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_Cassandra_To_Cloud_Bigtable.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Cassandra_To_Cloud_Bigtable",
    category = TemplateCategory.BATCH,
    displayName = "Cassandra to Cloud Bigtable",
    description = {
      "The Apache Cassandra to Cloud Bigtable template copies a table from Apache Cassandra to Cloud Bigtable. "
          + "This template requires minimal configuration and replicates the table structure in Cassandra as closely as possible in Cloud Bigtable.",
      "The Apache Cassandra to Cloud Bigtable template is useful for the following:\n"
          + "- Migrating Apache Cassandra database when short downtime is acceptable.\n"
          + "- Periodically replicating Cassandra tables to Cloud Bigtable for global serving."
    },
    optionsClass = Options.class,
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/cassandra-to-bigtable",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "The target Bigtable table must exist before running the pipeline.",
      "Network connection between Dataflow workers and Apache Cassandra nodes."
    })
final class CassandraToBigtable {

  /** Utility class exposing only the static {@link #main} entry point; never instantiated. */
  private CassandraToBigtable() {}

  /** TODO - refactor to extend BigtableCommonOptions.WriteOptions. */
  public interface Options extends PipelineOptions {

    @TemplateParameter.Text(
        order = 1,
        regexes = {"^[a-zA-Z0-9\\.\\-,]*$"},
        description = "Cassandra Hosts",
        helpText = "The hosts of the Apache Cassandra nodes in a comma-separated list.")
    ValueProvider<String> getCassandraHosts();

    @SuppressWarnings("unused")
    void setCassandraHosts(ValueProvider<String> hosts);

    @TemplateParameter.Integer(
        order = 2,
        optional = true,
        description = "Cassandra Port",
        helpText =
            "The TCP port to use to reach Apache Cassandra on the nodes. The default value is 9042.")
    @Default.Integer(9042)
    ValueProvider<Integer> getCassandraPort();

    @SuppressWarnings("unused")
    void setCassandraPort(ValueProvider<Integer> port);

    @TemplateParameter.Text(
        order = 3,
        regexes = {"^[a-zA-Z0-9][a-zA-Z0-9_]{0,47}$"},
        description = "Cassandra Keyspace",
        helpText = "The Apache Cassandra keyspace where the table is located.")
    ValueProvider<String> getCassandraKeyspace();

    @SuppressWarnings("unused")
    void setCassandraKeyspace(ValueProvider<String> keyspace);

    @TemplateParameter.Text(
        order = 4,
        regexes = {"^[a-zA-Z][a-zA-Z0-9_]*$"},
        description = "Cassandra Table",
        helpText = "The Apache Cassandra table to copy.")
    ValueProvider<String> getCassandraTable();

    @SuppressWarnings("unused")
    void setCassandraTable(ValueProvider<String> cassandraTable);

    @TemplateParameter.ProjectId(
        order = 5,
        description = "Bigtable Project ID",
        helpText = "The Google Cloud project ID associated with the Bigtable instance.")
    ValueProvider<String> getBigtableProjectId();

    @SuppressWarnings("unused")
    void setBigtableProjectId(ValueProvider<String> projectId);

    @TemplateParameter.Text(
        order = 6,
        regexes = {"[a-z][a-z0-9\\-]+[a-z0-9]"},
        description = "Target Bigtable Instance",
        helpText = "The ID of the Bigtable instance that the Apache Cassandra table is copied to.")
    ValueProvider<String> getBigtableInstanceId();

    @SuppressWarnings("unused")
    void setBigtableInstanceId(ValueProvider<String> bigtableInstanceId);

    @TemplateParameter.Text(
        order = 7,
        regexes = {"[_a-zA-Z0-9][-_.a-zA-Z0-9]*"},
        description = "Target Bigtable Table",
        helpText = "The name of the Bigtable table that the Apache Cassandra table is copied to.")
    ValueProvider<String> getBigtableTableId();

    @SuppressWarnings("unused")
    void setBigtableTableId(ValueProvider<String> bigtableTableId);

    @TemplateParameter.Text(
        order = 8,
        optional = true,
        regexes = {"[-_.a-zA-Z0-9]+"},
        description = "The Default Bigtable Column Family",
        helpText =
            "The name of the column family of the Bigtable table. The default value is default.")
    @Default.String("default")
    ValueProvider<String> getDefaultColumnFamily();

    @SuppressWarnings("unused")
    void setDefaultColumnFamily(ValueProvider<String> defaultColumnFamily);

    @TemplateParameter.Text(
        order = 9,
        optional = true,
        description = "The Row Key Separator",
        helpText = "The separator used to build row-keys. The default value is '#'.")
    @Default.String("#")
    ValueProvider<String> getRowKeySeparator();

    @SuppressWarnings("unused")
    void setRowKeySeparator(ValueProvider<String> rowKeySeparator);

    @TemplateParameter.Boolean(
        order = 10,
        optional = true,
        description = "If true, large rows will be split into multiple MutateRows requests",
        helpText =
            "The flag for enabling splitting of large rows into multiple MutateRows requests. Note that when a large row is split between multiple API calls, the updates to the row are not atomic. ")
    ValueProvider<Boolean> getSplitLargeRows();

    // NOTE(review): annotation added for consistency with every other setter in this interface.
    @SuppressWarnings("unused")
    void setSplitLargeRows(ValueProvider<Boolean> splitLargeRows);
  }

  /**
   * Runs a pipeline to copy one Cassandra table to Cloud Bigtable.
   *
   * @param args arguments to the pipeline
   */
  public static void main(String[] args) {
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);

    // Split the Cassandra Hosts value provider into a list value provider.
    ValueProvider.NestedValueProvider<List<String>, String> hosts =
        ValueProvider.NestedValueProvider.of(
            options.getCassandraHosts(),
            (SerializableFunction<String, List<String>>) value -> Arrays.asList(value.split(",")));

    Pipeline p = Pipeline.create(PipelineUtils.tweakPipelineOptions(options));

    // Create a factory method to inject the CassandraRowMapperFn to allow custom type mapping.
    SerializableFunction<Session, Mapper> cassandraObjectMapperFactory =
        new CassandraRowMapperFactory(options.getCassandraTable(), options.getCassandraKeyspace());

    // Read each Cassandra row as a Beam Row; SerializableCoder is required because
    // Beam Row has no default coder for this read path.
    CassandraIO.Read<Row> source =
        CassandraIO.<Row>read()
            .withHosts(hosts)
            .withPort(options.getCassandraPort())
            .withKeyspace(options.getCassandraKeyspace())
            .withTable(options.getCassandraTable())
            .withMapperFactoryFn(cassandraObjectMapperFactory)
            .withEntity(Row.class)
            .withCoder(SerializableCoder.of(Row.class));

    BigtableIO.Write sink =
        BigtableIO.write()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId());

    // Convert each Beam Row to Bigtable mutations, optionally splitting large rows
    // into multiple (non-atomic) MutateRows requests, then write to Bigtable.
    p.apply("Read from Cassandra", source)
        .apply(
            "Convert Row",
            ParDo.of(
                BeamRowToBigtableFn.createWithSplitLargeRows(
                    options.getRowKeySeparator(),
                    options.getDefaultColumnFamily(),
                    options.getSplitLargeRows(),
                    BeamRowToBigtableFn.MAX_MUTATION_PER_REQUEST)))
        .apply("Write to Bigtable", sink);
    p.run();
  }
}