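The Datastream to Spanner template is a streaming pipeline that reads Datastream events from a Cloud Storage bucket and writes them to a Spanner database. It is intended for data migration from Datastream sources to Spanner.
All tables required for migration must exist in the destination Spanner database before the template runs. Schema migration from the source database to the destination Spanner database must therefore be completed before data migration. The tables can already contain data before migration. This template does not propagate Datastream schema changes to the Spanner database.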
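Data consistency is guaranteed only at the end of migration, when all data has been written to Spanner. To store ordering information for each record written to Spanner, this template creates an additional table (called a shadow table) for each table in the Spanner database. The shadow tables are used to ensure consistency at the end of migration. They are not deleted after migration and can be used for validation at the end of migration.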
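As a rough illustration of why ordering information matters, the sketch below keeps, for each primary key, the ordering value of the last change applied and skips any event that is older than what has already been written. It is a minimal in-memory model, not the template's implementation: the class ShadowTableSketch, the use of a single long timestamp as the ordering value, and the HashMaps standing in for the data table and its shadow table are all assumptions made for this example.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory model of one data table and its shadow table. */
final class ShadowTableSketch {
  // Stand-in for the data table: primary key -> current row value.
  private final Map<String, String> dataTable = new HashMap<>();
  // Stand-in for the shadow table: primary key -> ordering value of the last applied change.
  private final Map<String, Long> shadowTable = new HashMap<>();

  /**
   * Applies a change event only if it is newer than the last change applied for the same key.
   *
   * @return true if the event was applied, false if it was skipped as stale
   */
  boolean apply(String primaryKey, String newValue, long eventTimestamp) {
    Long lastApplied = shadowTable.get(primaryKey);
    if (lastApplied != null && lastApplied >= eventTimestamp) {
      return false; // An equal or newer change was already written; skip this one.
    }
    dataTable.put(primaryKey, newValue);
    shadowTable.put(primaryKey, eventTimestamp);
    return true;
  }

  /** Returns the current value for the key, or null if no row exists. */
  String read(String primaryKey) {
    return dataTable.get(primaryKey);
  }
}
```

With this model, two events for the same key that arrive out of order (timestamp 20 first, then timestamp 10) leave the row at the value carried by the timestamp 20 event, which is the property the shadow tables are meant to provide once all data has been written.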
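Any errors that occur during operation, such as schema mismatches, malformed JSON files, or errors resulting from executing transforms, are recorded in an error queue. The error queue is a Cloud Storage folder that stores, in text format, all the Datastream events that encountered errors along with the error reason. Errors can be transient or permanent and are stored in the corresponding Cloud Storage folders in the error queue. Transient errors are retried automatically; permanent errors are not. For permanent errors, you can correct the change events and move them to the retriable bucket while the template is running.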
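The split between transient and permanent errors can be pictured as a small routing function. The sketch below is an illustration only: the enum ErrorKind, the folder names retry and severe, and the rule that a transient error is treated as permanent once it has used up a maximum retry count are assumptions, loosely modelled on the dlqMaxRetryCount parameter rather than taken from the template's code.

```java
/** Hypothetical routing of failed Datastream events inside an error queue. */
final class ErrorQueueSketch {
  enum ErrorKind { TRANSIENT, PERMANENT }

  /**
   * Returns the (assumed) error queue subfolder for a failed event.
   *
   * @param kind whether the failure is considered transient or permanent
   * @param retriesSoFar how many times this event has already been retried
   * @param maxRetryCount upper bound on retries for transient errors
   */
  static String targetFolder(ErrorKind kind, int retriesSoFar, int maxRetryCount) {
    if (kind == ErrorKind.TRANSIENT && retriesSoFar < maxRetryCount) {
      return "retry"; // Picked up again automatically on the next retry cycle.
    }
    return "severe"; // Needs manual correction before it can be retried.
  }
}
```

In this model, an event that an operator has corrected by hand would be moved from the severe folder back to the retry folder, which mirrors the manual step described above for permanent errors.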
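Pipeline requirements
- A Datastream stream in Running or Not started state.
- A Cloud Storage bucket where Datastream events are replicated.
- A Spanner database with existing tables. These tables can be empty or contain data.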
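Template parameters
Required parameters
- instanceId: The Spanner instance where the changes are replicated.
- databaseId: The Spanner database where the changes are replicated.
Optional parameters
- inputFilePattern: The Cloud Storage file location that contains the Datastream files to replicate. Typically, this is the root path for a stream. Support for this feature has been disabled.
- inputFileFormat: The format of the output file produced by Datastream. Allowed values: avro, json. Defaults to avro.
- sessionFilePath: The session file path in Cloud Storage that contains mapping information from HarbourBridge.
- projectId: The Spanner project ID.
- spannerHost: The Cloud Spanner endpoint to call in the template. For example, https://batch-spanner.googleapis.com. Defaults to: https://batch-spanner.googleapis.com.
- gcsPubSubSubscription: The Pub/Sub subscription used in a Cloud Storage notification policy. For the name, use the format projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>.
- streamName: The name or template for the stream to poll for schema information and source type.
- shadowTablePrefix: The prefix used to name shadow tables. Default: shadow_.
- shouldCreateShadowTables: Indicates whether shadow tables must be created in the Cloud Spanner database. Defaults to: true.
- rfcStartDateTime: The starting DateTime used to fetch from Cloud Storage (https://tools.ietf.org/html/rfc3339). Defaults to: 1970-01-01T00:00:00.00Z.
- fileReadConcurrency: The number of concurrent DataStream files to read. Defaults to: 30.
- deadLetterQueueDirectory: The file path used when storing the error queue output. The default file path is a directory under the Dataflow job's temp location.
- dlqRetryMinutes: The number of minutes between dead letter queue retries. Defaults to 10.
- dlqMaxRetryCount: The maximum number of times temporary errors can be retried through the DLQ. Defaults to 500.
- dataStreamRootUrl: The Datastream API root URL. Defaults to: https://datastream.googleapis.com/.
- datastreamSourceType: The type of source database that Datastream connects to. Example: mysql/oracle. Must be set when testing without an actual running Datastream.
- roundJsonDecimals: If set, rounds the decimal values in json columns to a number that can be stored without loss of precision. Defaults to: false.
- runMode: The run mode type: regular or retryDLQ. Defaults to: regular.
- transformationContextFilePath: The transformation context file path in Cloud Storage, used to populate data used in transformations performed during migration. Example: the shard ID to database name mapping that identifies the database from which a row was migrated.
- directoryWatchDurationInMinutes: The duration for which the pipeline should keep polling a directory in GCS. Datastream output files are arranged in a directory structure that reflects the event timestamp, grouped by minute. This parameter should be approximately equal to the maximum delay that can occur between an event occurring in the source database and the same event being written to GCS by Datastream. 99.9 percentile = 10 minutes. Defaults to: 10.
- spannerPriority: The request priority for Cloud Spanner calls. The value must be one of: [HIGH, MEDIUM, LOW]. Defaults to HIGH.
- dlqGcsPubSubSubscription: The Pub/Sub subscription used in a Cloud Storage notification policy for the DLQ retry directory when running in regular mode. For the name, use the format projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>. When set, deadLetterQueueDirectory and dlqRetryMinutes are ignored.
- transformationJarPath: The custom JAR file location in Cloud Storage for the file that contains the custom transformation logic for processing records in forward migration. Defaults to empty.
- transformationClassName: The fully qualified class name that contains the custom transformation logic. Required if transformationJarPath is specified. Defaults to empty.
- transformationCustomParameters: A string containing any custom parameters to pass to the custom transformation class. Defaults to empty.
- filteredEventsDirectory: The file path used to store the events filtered out by the custom transformation. The default is a directory under the Dataflow job's temp location. The default value is enough under most conditions.
- shardingContextFilePath: The sharding context file path in Cloud Storage, used to populate the shard ID in the Spanner database for each source shard. It is of the format Map<stream_name, Map<db_name, shard_id>>.
- tableOverrides: The table name overrides from source to Spanner. They are written in the following format: [{SourceTableName1, SpannerTableName1}, {SourceTableName2, SpannerTableName2}]. For example, [{Singers, Vocalists}, {Albums, Records}] maps the Singers table to Vocalists and the Albums table to Records. Defaults to empty.
- columnOverrides: The column name overrides from source to Spanner. They are written in the following format: [{SourceTableName1.SourceColumnName1, SourceTableName1.SpannerColumnName1}, {SourceTableName2.SourceColumnName1, SourceTableName2.SpannerColumnName1}]. The SourceTableName must remain the same in both the source and Spanner halves of a pair. To override table names, use tableOverrides. For example, [{Singers.SingerName, Singers.TalentName}, {Albums.AlbumName, Albums.RecordName}] maps SingerName to TalentName in the Singers table and AlbumName to RecordName in the Albums table. Defaults to empty.
- schemaOverridesFilePath: A file that specifies the table and column name overrides from source to Spanner. Defaults to empty.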
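To make the override format concrete, here is a minimal sketch that parses a string such as [{Singers, Vocalists}, {Albums, Records}] into a source-to-Spanner name map. The class TableOverridesSketch and its regular expression are hypothetical and written only for this example; the template has its own parser (the source imports SchemaStringOverridesParser), and its behaviour may differ, for instance in how it handles malformed input.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical parser for the tableOverrides / columnOverrides string format. */
final class TableOverridesSketch {
  // Matches one {Source, Target} pair. Names may not contain braces, commas, or whitespace.
  private static final Pattern PAIR =
      Pattern.compile("\\{\\s*([^\\s{},]+)\\s*,\\s*([^\\s{},]+)\\s*\\}");

  /** Returns source name -> Spanner name, in the order the pairs appear. */
  static Map<String, String> parse(String overrides) {
    Map<String, String> result = new LinkedHashMap<>();
    if (overrides == null || overrides.trim().isEmpty()) {
      return result; // The parameter defaults to empty, which means no overrides.
    }
    Matcher m = PAIR.matcher(overrides);
    while (m.find()) {
      result.put(m.group(1), m.group(2));
    }
    return result;
  }
}
```

Because dots are allowed in names, the same sketch also accepts column pairs such as {Singers.SingerName, Singers.TalentName}. It silently ignores anything that does not look like a pair, which a real parser would more likely reject.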
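Run the template
Console
- Go to the Dataflow Create job from template page.
- In the Job name field, enter a unique job name.
- Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1. For a list of regions where you can run a Dataflow job, see Dataflow locations.
- From the Dataflow template drop-down menu, select the Cloud Datastream to Spanner template.
- In the provided parameter fields, enter your parameter values.
- Click Run job.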
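gcloud
In your shell or terminal, run the template:
gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_Spanner \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
streamName=STREAM_NAME,\
instanceId=CLOUDSPANNER_INSTANCE,\
databaseId=CLOUDSPANNER_DATABASE,\
deadLetterQueueDirectory=DLQ
Replace the following:
- PROJECT_ID: the ID of the Google Cloud project where you want to run the Dataflow job
- JOB_NAME: a unique job name of your choice
- REGION_NAME: the region where you want to deploy your Dataflow job, for example us-central1
- VERSION: the version of the template that you want to use. You can use the following values:
  - latest, to use the latest version of the template, which is available in the non-dated parent folder in the bucket (gs://dataflow-templates-REGION_NAME/latest/)
  - the version name, like 2023-09-12-00_RC00, to use a specific version of the template, which is nested in the respective dated parent folder in the bucket (gs://dataflow-templates-REGION_NAME/)
- GCS_FILE_PATH: the Cloud Storage path that is used to store Datastream events. For example: gs://bucket/path/to/data/
- STREAM_NAME: the name of the Datastream stream to poll for schema information and source type
- CLOUDSPANNER_INSTANCE: your Spanner instance.
- CLOUDSPANNER_DATABASE: your Spanner database.
- DLQ: the Cloud Storage path for the error queue directory.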
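If you generate the command from a script, the two derived values are the template location and the comma-separated parameter list. The following sketch shows one way to assemble them; the class GcloudArgsSketch and its method names are hypothetical, and the sketch assumes that no parameter value contains a comma, because it does no escaping.

```java
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper that assembles values for the gcloud command. */
final class GcloudArgsSketch {
  /** Builds the value for --template-file-gcs-location from a region and a version. */
  static String templateLocation(String region, String version) {
    return "gs://dataflow-templates-" + region + "/" + version
        + "/flex/Cloud_Datastream_to_Spanner";
  }

  /** Joins parameters as key=value pairs separated by commas, in map iteration order. */
  static String parametersValue(Map<String, String> params) {
    return params.entrySet().stream()
        .map(e -> e.getKey() + "=" + e.getValue())
        .collect(Collectors.joining(","));
  }
}
```

Passing a LinkedHashMap keeps the parameters in insertion order, which makes the generated command easier to compare against the one shown above.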
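API
To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_Spanner",
    "parameters": {
      "inputFilePattern": "GCS_FILE_PATH",
      "streamName": "STREAM_NAME",
      "instanceId": "CLOUDSPANNER_INSTANCE",
      "databaseId": "CLOUDSPANNER_DATABASE",
      "deadLetterQueueDirectory": "DLQ"
    }
  }
}
Replace the following:
- PROJECT_ID: the ID of the Google Cloud project where you want to run the Dataflow job
- JOB_NAME: a unique job name of your choice
- LOCATION: the region where you want to deploy your Dataflow job, for example us-central1
- VERSION: the version of the template that you want to use. You can use the following values:
  - latest, to use the latest version of the template, which is available in the non-dated parent folder in the bucket (gs://dataflow-templates-REGION_NAME/latest/)
  - the version name, like 2023-09-12-00_RC00, to use a specific version of the template, which is nested in the respective dated parent folder in the bucket (gs://dataflow-templates-REGION_NAME/)
- GCS_FILE_PATH: the Cloud Storage path that is used to store Datastream events. For example: gs://bucket/path/to/data/
- STREAM_NAME: the name of the Datastream stream to poll for schema information and source type
- CLOUDSPANNER_INSTANCE: your Spanner instance.
- CLOUDSPANNER_DATABASE: your Spanner database.
- DLQ: the Cloud Storage path for the error queue directory.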
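As a sketch of what sending this request could look like from Java 11 or later, the following builds the POST request with the standard java.net.http API. The class LaunchRequestSketch is hypothetical, and the sketch assumes you already hold an OAuth 2.0 access token obtained separately; it only constructs the request and does not send it.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Hypothetical builder for the flexTemplates:launch request. */
final class LaunchRequestSketch {
  /**
   * Builds the launch request.
   *
   * @param projectId the Google Cloud project ID
   * @param location the region in which to launch the job
   * @param accessToken an OAuth 2.0 access token, obtained separately (assumption)
   * @param jsonBody the JSON request body, in the shape of the launch_parameter payload
   */
  static HttpRequest build(
      String projectId, String location, String accessToken, String jsonBody) {
    URI uri =
        URI.create(
            "https://dataflow.googleapis.com/v1b3/projects/"
                + projectId
                + "/locations/"
                + location
                + "/flexTemplates:launch");
    return HttpRequest.newBuilder(uri)
        .header("Authorization", "Bearer " + accessToken)
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
        .build();
  }
}
```

To send the request, pass it to HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()) and inspect the status code and body of the response.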
Template source code
Java
/*
* Copyright (C) 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.services.datastream.v1.model.SourceConfig;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.metadata.TemplateParameter.TemplateEnumOption;
import com.google.cloud.teleport.v2.cdc.dlq.DeadLetterQueueManager;
import com.google.cloud.teleport.v2.cdc.dlq.PubSubNotifiedDlqIO;
import com.google.cloud.teleport.v2.cdc.dlq.StringDeadLetterQueueSanitizer;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.datastream.sources.DataStreamIO;
import com.google.cloud.teleport.v2.datastream.utils.DataStreamClient;
import com.google.cloud.teleport.v2.spanner.ddl.Ddl;
import com.google.cloud.teleport.v2.spanner.migrations.schema.ISchemaOverridesParser;
import com.google.cloud.teleport.v2.spanner.migrations.schema.NoopSchemaOverridesParser;
import com.google.cloud.teleport.v2.spanner.migrations.schema.Schema;
import com.google.cloud.teleport.v2.spanner.migrations.schema.SchemaFileOverridesParser;
import com.google.cloud.teleport.v2.spanner.migrations.schema.SchemaStringOverridesParser;
import com.google.cloud.teleport.v2.spanner.migrations.shard.ShardingContext;
import com.google.cloud.teleport.v2.spanner.migrations.transformation.CustomTransformation;
import com.google.cloud.teleport.v2.spanner.migrations.transformation.TransformationContext;
import com.google.cloud.teleport.v2.spanner.migrations.utils.SessionFileReader;
import com.google.cloud.teleport.v2.spanner.migrations.utils.ShardingContextReader;
import com.google.cloud.teleport.v2.spanner.migrations.utils.TransformationContextReader;
import com.google.cloud.teleport.v2.templates.DataStreamToSpanner.Options;
import com.google.cloud.teleport.v2.templates.constants.DatastreamToSpannerConstants;
import com.google.cloud.teleport.v2.templates.datastream.DatastreamConstants;
import com.google.cloud.teleport.v2.templates.spanner.ProcessInformationSchema;
import com.google.cloud.teleport.v2.templates.transform.ChangeEventTransformerDoFn;
import com.google.cloud.teleport.v2.transforms.DLQWriteTransform;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.base.Strings;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTagList;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This pipeline ingests DataStream data from GCS as events. The events are written to Cloud
* Spanner.
*
* <p>NOTE: Future versions will support: Pub/Sub, GCS, or Kafka as per DataStream
*
* <p>Check out <a
* href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/datastream-to-spanner/README_Cloud_Datastream_to_Spanner.md">README</a>
* for instructions on how to use or modify this template.
*/
@Template(
name = "Cloud_Datastream_to_Spanner",
category = TemplateCategory.STREAMING,
displayName = "Datastream to Cloud Spanner",
description = {
"The Datastream to Cloud Spanner template is a streaming pipeline that reads <a"
+ " href=\"https://cloud.google.com/datastream/docs\">Datastream</a> events from a Cloud"
+ " Storage bucket and writes them to a Cloud Spanner database. It is intended for data"
+ " migration from Datastream sources to Cloud Spanner.\n",
"All tables required for migration must exist in the destination Cloud Spanner database prior"
+ " to template execution. Hence schema migration from a source database to destination"
+ " Cloud Spanner must be completed prior to data migration. Data can exist in the tables"
+ " prior to migration. This template does not propagate Datastream schema changes to the"
+ " Cloud Spanner database.\n",
"Data consistency is guaranteed only at the end of migration when all data has been written"
+ " to Cloud Spanner. To store ordering information for each record written to Cloud"
+ " Spanner, this template creates an additional table (called a shadow table) for each"
+ " table in the Cloud Spanner database. This is used to ensure consistency at the end of"
+ " migration. The shadow tables are not deleted after migration and can be used for"
+ " validation purposes at the end of migration.\n",
"Any errors that occur during operation, such as schema mismatches, malformed JSON files, or"
+ " errors resulting from executing transforms, are recorded in an error queue. The error"
+ " queue is a Cloud Storage folder which stores all the Datastream events that had"
+ " encountered errors along with the error reason in text format. The errors can be"
+ " transient or permanent and are stored in appropriate Cloud Storage folders in the"
+ " error queue. The transient errors are retried automatically while the permanent"
+ " errors are not. In case of permanent errors, you have the option of making"
+ " corrections to the change events and moving them to the retriable bucket while the"
+ " template is running."
},
optionsClass = Options.class,
flexContainerName = "datastream-to-spanner",
documentation =
"https://cloud.google.com/dataflow/docs/guides/templates/provided/datastream-to-cloud-spanner",
contactInformation = "https://cloud.google.com/support",
requirements = {
"A Datastream stream in Running or Not started state.",
"A Cloud Storage bucket where Datastream events are replicated.",
"A Cloud Spanner database with existing tables. These tables can be empty or contain data.",
},
streaming = true,
supportsAtLeastOnce = true)
public class DataStreamToSpanner {
private static final Logger LOG = LoggerFactory.getLogger(DataStreamToSpanner.class);
private static final String AVRO_SUFFIX = "avro";
private static final String JSON_SUFFIX = "json";
/**
* Options supported by the pipeline.
*
* <p>Inherits standard configuration options.
*/
public interface Options
extends PipelineOptions, StreamingOptions, DataflowPipelineWorkerPoolOptions {
@TemplateParameter.GcsReadFile(
order = 1,
groupName = "Source",
optional = true,
description =
"File location for Datastream file output in Cloud Storage. Support for this feature has been disabled.",
helpText =
"The Cloud Storage file location that contains the Datastream files to replicate. Typically, "
+ "this is the root path for a stream. Support for this feature has been disabled.")
String getInputFilePattern();
void setInputFilePattern(String value);
@TemplateParameter.Enum(
order = 2,
enumOptions = {@TemplateEnumOption("avro"), @TemplateEnumOption("json")},
optional = true,
description = "Datastream output file format (avro/json).",
helpText =
"The format of the output file produced by Datastream. For example `avro,json`. Defaults to `avro`.")
@Default.String("avro")
String getInputFileFormat();
void setInputFileFormat(String value);
@TemplateParameter.GcsReadFile(
order = 3,
optional = true,
description = "Session File Path in Cloud Storage",
helpText =
"Session file path in Cloud Storage that contains mapping information from"
+ " HarbourBridge")
String getSessionFilePath();
void setSessionFilePath(String value);
@TemplateParameter.Text(
order = 4,
groupName = "Target",
description = "Cloud Spanner Instance Id.",
helpText = "The Spanner instance where the changes are replicated.")
String getInstanceId();
void setInstanceId(String value);
@TemplateParameter.Text(
order = 5,
groupName = "Target",
description = "Cloud Spanner Database Id.",
helpText = "The Spanner database where the changes are replicated.")
String getDatabaseId();
void setDatabaseId(String value);
@TemplateParameter.ProjectId(
order = 6,
groupName = "Target",
optional = true,
description = "Cloud Spanner Project Id.",
helpText = "The Spanner project ID.")
String getProjectId();
void setProjectId(String projectId);
@TemplateParameter.Text(
order = 7,
groupName = "Target",
optional = true,
description = "The Cloud Spanner Endpoint to call",
helpText = "The Cloud Spanner endpoint to call in the template.",
example = "https://batch-spanner.googleapis.com")
@Default.String("https://batch-spanner.googleapis.com")
String getSpannerHost();
void setSpannerHost(String value);
@TemplateParameter.PubsubSubscription(
order = 8,
optional = true,
description = "The Pub/Sub subscription being used in a Cloud Storage notification policy.",
helpText =
"The Pub/Sub subscription being used in a Cloud Storage notification policy. For the name,"
+ " use the format `projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>`.")
String getGcsPubSubSubscription();
void setGcsPubSubSubscription(String value);
@TemplateParameter.Text(
order = 9,
groupName = "Source",
optional = true,
description = "Datastream stream name.",
helpText =
"The name or template for the stream to poll for schema information and source type.")
String getStreamName();
void setStreamName(String value);
@TemplateParameter.Text(
order = 10,
optional = true,
description = "Cloud Spanner shadow table prefix.",
helpText = "The prefix used to name shadow tables. Default: `shadow_`.")
@Default.String("shadow_")
String getShadowTablePrefix();
void setShadowTablePrefix(String value);
@TemplateParameter.Boolean(
order = 11,
optional = true,
description = "If true, create shadow tables in Cloud Spanner.",
helpText =
"This flag indicates whether shadow tables must be created in Cloud Spanner database.")
@Default.Boolean(true)
Boolean getShouldCreateShadowTables();
void setShouldCreateShadowTables(Boolean value);
@TemplateParameter.DateTime(
order = 12,
optional = true,
description =
"The starting DateTime used to fetch from Cloud Storage "
+ "(https://tools.ietf.org/html/rfc3339).",
helpText =
"The starting DateTime used to fetch from Cloud Storage "
+ "(https://tools.ietf.org/html/rfc3339).")
@Default.String("1970-01-01T00:00:00.00Z")
String getRfcStartDateTime();
void setRfcStartDateTime(String value);
@TemplateParameter.Integer(
order = 13,
optional = true,
description = "File read concurrency",
helpText = "The number of concurrent DataStream files to read.")
@Default.Integer(30)
Integer getFileReadConcurrency();
void setFileReadConcurrency(Integer value);
@TemplateParameter.Text(
order = 14,
optional = true,
description = "Dead letter queue directory.",
helpText =
"The file path used when storing the error queue output. "
+ "The default file path is a directory under the Dataflow job's temp location.")
@Default.String("")
String getDeadLetterQueueDirectory();
void setDeadLetterQueueDirectory(String value);
@TemplateParameter.Integer(
order = 15,
optional = true,
description = "Dead letter queue retry minutes",
helpText = "The number of minutes between dead letter queue retries. Defaults to `10`.")
@Default.Integer(10)
Integer getDlqRetryMinutes();
void setDlqRetryMinutes(Integer value);
@TemplateParameter.Integer(
order = 16,
optional = true,
description = "Dead letter queue maximum retry count",
helpText =
"The max number of times temporary errors can be retried through DLQ. Defaults to `500`.")
@Default.Integer(500)
Integer getDlqMaxRetryCount();
void setDlqMaxRetryCount(Integer value);
// DataStream API Root Url (only used for testing)
@TemplateParameter.Text(
order = 17,
optional = true,
description = "Datastream API Root URL (only required for testing)",
helpText = "Datastream API Root URL.")
@Default.String("https://datastream.googleapis.com/")
String getDataStreamRootUrl();
void setDataStreamRootUrl(String value);
@TemplateParameter.Text(
order = 18,
optional = true,
description = "Datastream source type (only required for testing)",
helpText =
"This is the type of source database that Datastream connects to. Example -"
+ " mysql/oracle. Need to be set when testing without an actual running"
+ " Datastream.")
String getDatastreamSourceType();
void setDatastreamSourceType(String value);
@TemplateParameter.Boolean(
order = 19,
optional = true,
description =
"If true, rounds the decimal values in json columns to a number that can be stored"
+ " without loss of precision.",
helpText =
"This flag if set, rounds the decimal values in json columns to a number that can be"
+ " stored without loss of precision.")
@Default.Boolean(false)
Boolean getRoundJsonDecimals();
void setRoundJsonDecimals(Boolean value);
@TemplateParameter.Enum(
order = 20,
optional = true,
description = "Run mode - currently supported are : regular or retryDLQ",
enumOptions = {@TemplateEnumOption("regular"), @TemplateEnumOption("retryDLQ")},
helpText = "This is the run mode type, whether regular or with retryDLQ.")
@Default.String("regular")
String getRunMode();
void setRunMode(String value);
@TemplateParameter.GcsReadFile(
order = 21,
optional = true,
helpText =
"Transformation context file path in cloud storage used to populate data used in"
+ " transformations performed during migrations Eg: The shard id to db name to"
+ " identify the db from which a row was migrated",
description = "Transformation context file path in cloud storage")
String getTransformationContextFilePath();
void setTransformationContextFilePath(String value);
@TemplateParameter.Integer(
order = 22,
optional = true,
description = "Directory watch duration in minutes. Default: 10 minutes",
helpText =
"The Duration for which the pipeline should keep polling a directory in GCS. Datastream"
+ " output files are arranged in a directory structure which depicts the timestamp "
+ "of the event grouped by minutes. This parameter should be approximately equal to"
+ " maximum delay which could occur between event occurring in source database and "
+ "the same event being written to GCS by Datastream. 99.9 percentile = 10 minutes")
@Default.Integer(10)
Integer getDirectoryWatchDurationInMinutes();
void setDirectoryWatchDurationInMinutes(Integer value);
@TemplateParameter.Enum(
order = 23,
enumOptions = {
@TemplateEnumOption("LOW"),
@TemplateEnumOption("MEDIUM"),
@TemplateEnumOption("HIGH")
},
optional = true,
description = "Priority for Spanner RPC invocations",
helpText =
"The request priority for Cloud Spanner calls. The value must be one of:"
+ " [`HIGH`,`MEDIUM`,`LOW`]. Defaults to `HIGH`.")
@Default.Enum("HIGH")
RpcPriority getSpannerPriority();
void setSpannerPriority(RpcPriority value);
@TemplateParameter.PubsubSubscription(
order = 24,
optional = true,
description =
"The Pub/Sub subscription being used in a Cloud Storage notification policy for DLQ"
+ " retry directory when running in regular mode.",
helpText =
"The Pub/Sub subscription being used in a Cloud Storage notification policy for DLQ"
+ " retry directory when running in regular mode. For the name, use the format"
+ " `projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>`. When set, the"
+ " deadLetterQueueDirectory and dlqRetryMinutes are ignored.")
String getDlqGcsPubSubSubscription();
void setDlqGcsPubSubSubscription(String value);
@TemplateParameter.GcsReadFile(
order = 25,
optional = true,
description = "Custom jar location in Cloud Storage",
helpText =
"Custom JAR file location in Cloud Storage for the file that contains the custom transformation logic for processing records"
+ " in forward migration.")
@Default.String("")
String getTransformationJarPath();
void setTransformationJarPath(String value);
@TemplateParameter.Text(
order = 26,
optional = true,
description = "Custom class name",
helpText =
"Fully qualified class name having the custom transformation logic. It is a"
+ " mandatory field in case transformationJarPath is specified")
@Default.String("")
String getTransformationClassName();
void setTransformationClassName(String value);
@TemplateParameter.Text(
order = 27,
optional = true,
description = "Custom parameters for transformation",
helpText =
"String containing any custom parameters to be passed to the custom transformation class.")
@Default.String("")
String getTransformationCustomParameters();
void setTransformationCustomParameters(String value);
@TemplateParameter.Text(
order = 28,
optional = true,
description = "Filtered events directory",
helpText =
"This is the file path to store the events filtered via custom transformation. Default is a directory"
+ " under the Dataflow job's temp location. The default value is enough under most"
+ " conditions.")
@Default.String("")
String getFilteredEventsDirectory();
void setFilteredEventsDirectory(String value);
@TemplateParameter.GcsReadFile(
order = 29,
optional = true,
helpText =
"Sharding context file path in cloud storage is used to populate the shard id in spanner database for each source shard."
+ " It is of the format Map<stream_name, Map<db_name, shard_id>>",
description = "Sharding context file path in cloud storage")
String getShardingContextFilePath();
void setShardingContextFilePath(String value);
@TemplateParameter.Text(
order = 30,
optional = true,
description = "Table name overrides from source to spanner",
regexes =
"^\\[([[:space:]]*\\{[[:space:]]*[[:graph:]]+[[:space:]]*,[[:space:]]*[[:graph:]]+[[:space:]]*\\}[[:space:]]*(,[[:space:]]*)*)*\\]$",
example = "[{Singers, Vocalists}, {Albums, Records}]",
helpText =
"These are the table name overrides from source to spanner. They are written in the"
+ " following format: [{SourceTableName1, SpannerTableName1}, {SourceTableName2, SpannerTableName2}]."
+ " This example shows mapping Singers table to Vocalists and Albums table to Records.")
@Default.String("")
String getTableOverrides();
void setTableOverrides(String value);
@TemplateParameter.Text(
order = 31,
optional = true,
regexes =
"^\\[([[:space:]]*\\{[[:space:]]*[[:graph:]]+\\.[[:graph:]]+[[:space:]]*,[[:space:]]*[[:graph:]]+\\.[[:graph:]]+[[:space:]]*\\}[[:space:]]*(,[[:space:]]*)*)*\\]$",
description = "Column name overrides from source to spanner",
example =
"[{Singers.SingerName, Singers.TalentName}, {Albums.AlbumName, Albums.RecordName}]",
helpText =
"These are the column name overrides from source to spanner. They are written in the"
+ " following format: [{SourceTableName1.SourceColumnName1, SourceTableName1.SpannerColumnName1}, {SourceTableName2.SourceColumnName1, SourceTableName2.SpannerColumnName1}]."
+ " Note that the SourceTableName should remain the same in both the source and spanner pair. To override table names, use tableOverrides."
+ " The example shows mapping SingerName to TalentName and AlbumName to RecordName in Singers and Albums table respectively.")
@Default.String("")
String getColumnOverrides();
void setColumnOverrides(String value);
@TemplateParameter.Text(
order = 32,
optional = true,
description = "File based overrides from source to spanner",
helpText =
"A file which specifies the table and the column name overrides from source to spanner.")
@Default.String("")
String getSchemaOverridesFilePath();
void setSchemaOverridesFilePath(String value);
}
private static void validateSourceType(Options options) {
boolean isRetryMode = "retryDLQ".equals(options.getRunMode());
if (isRetryMode) {
// retry mode does not read from Datastream
return;
}
String sourceType = getSourceType(options);
if (!DatastreamConstants.SUPPORTED_DATASTREAM_SOURCES.contains(sourceType)) {
throw new IllegalArgumentException(
"Unsupported source type found: "
+ sourceType
+ ". Specify one of the following source types: "
+ DatastreamConstants.SUPPORTED_DATASTREAM_SOURCES);
}
options.setDatastreamSourceType(sourceType);
}
static String getSourceType(Options options) {
if (options.getDatastreamSourceType() != null) {
return options.getDatastreamSourceType();
}
if (options.getStreamName() == null) {
throw new IllegalArgumentException("Stream name cannot be empty.");
}
GcpOptions gcpOptions = options.as(GcpOptions.class);
DataStreamClient datastreamClient;
SourceConfig sourceConfig;
try {
datastreamClient = new DataStreamClient(gcpOptions.getGcpCredential());
sourceConfig = datastreamClient.getSourceConnectionProfile(options.getStreamName());
} catch (IOException e) {
// Log and rethrow with the original exception attached so the stack trace is not lost.
LOG.error("IOException Occurred: DataStreamClient failed initialization.", e);
throw new IllegalArgumentException("Unable to initialize DatastreamClient: " + e, e);
}
// TODO: use getPostgresSourceConfig() instead of an else once SourceConfig.java is updated.
if (sourceConfig.getMysqlSourceConfig() != null) {
return DatastreamConstants.MYSQL_SOURCE_TYPE;
} else if (sourceConfig.getOracleSourceConfig() != null) {
return DatastreamConstants.ORACLE_SOURCE_TYPE;
} else {
return DatastreamConstants.POSTGRES_SOURCE_TYPE;
}
// LOG.error("Source Connection Profile Type Not Supported");
// throw new IllegalArgumentException("Unsupported source connection profile type in
// Datastream");
}
/**
* Main entry point for executing the pipeline.
*
* @param args The command-line arguments to the pipeline.
*/
public static void main(String[] args) {
UncaughtExceptionLogger.register();
LOG.info("Starting DataStream to Cloud Spanner");
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
options.setStreaming(true);
validateSourceType(options);
run(options);
}
/**
* Runs the pipeline with the supplied options.
*
* @param options The execution parameters to the pipeline.
* @return The result of the pipeline execution.
*/
public static PipelineResult run(Options options) {
/*
* Stages:
* 1) Ingest and Normalize Data to FailsafeElement with JSON Strings
* 2) Write JSON Strings to Cloud Spanner
* 3) Write Failures to GCS Dead Letter Queue
*/
Pipeline pipeline = Pipeline.create(options);
DeadLetterQueueManager dlqManager = buildDlqManager(options);
// Ingest session file into schema object.
Schema schema = SessionFileReader.read(options.getSessionFilePath());
/*
* Stage 1: Ingest/Normalize Data to FailsafeElement with JSON Strings and
* read Cloud Spanner information schema.
* a) Prepare spanner config and process information schema
* b) Read DataStream data from GCS into JSON String FailsafeElements
* c) Reconsume Dead Letter Queue data from GCS into JSON String FailsafeElements
* d) Flatten DataStream and DLQ Streams
*/
// Prepare Spanner config
SpannerConfig spannerConfig =
SpannerConfig.create()
.withProjectId(ValueProvider.StaticValueProvider.of(options.getProjectId()))
.withHost(ValueProvider.StaticValueProvider.of(options.getSpannerHost()))
.withInstanceId(ValueProvider.StaticValueProvider.of(options.getInstanceId()))
.withDatabaseId(ValueProvider.StaticValueProvider.of(options.getDatabaseId()))
.withRpcPriority(ValueProvider.StaticValueProvider.of(options.getSpannerPriority()))
.withCommitRetrySettings(
RetrySettings.newBuilder()
.setTotalTimeout(org.threeten.bp.Duration.ofMinutes(4))
.setInitialRetryDelay(org.threeten.bp.Duration.ofMinutes(0))
.setRetryDelayMultiplier(1)
.setMaxRetryDelay(org.threeten.bp.Duration.ofMinutes(0))
.setInitialRpcTimeout(org.threeten.bp.Duration.ofMinutes(4))
.setRpcTimeoutMultiplier(1)
.setMaxRpcTimeout(org.threeten.bp.Duration.ofMinutes(4))
.setMaxAttempts(1)
.build());
/* Process information schema
* 1) Read information schema from destination Cloud Spanner database
* 2) Check if shadow tables are present and create if necessary
* 3) Return new information schema
*/
PCollection<Ddl> ddl =
pipeline.apply(
"Process Information Schema",
new ProcessInformationSchema(
spannerConfig,
options.getShouldCreateShadowTables(),
options.getShadowTablePrefix(),
options.getDatastreamSourceType()));
PCollectionView<Ddl> ddlView = ddl.apply("Cloud Spanner DDL as view", View.asSingleton());
    PCollection<FailsafeElement<String, String>> jsonRecords = null;
    // Elements sent to the Dead Letter Queue are to be reconsumed.
    // A DLQManager is to be created using PipelineOptions, and it is in charge
    // of building pieces of the DLQ.
    PCollectionTuple reconsumedElements = null;
    boolean isRegularMode = "regular".equals(options.getRunMode());
    if (isRegularMode && (!Strings.isNullOrEmpty(options.getDlqGcsPubSubSubscription()))) {
      reconsumedElements =
          dlqManager.getReconsumerDataTransformForFiles(
              pipeline.apply(
                  "Read retry from PubSub",
                  new PubSubNotifiedDlqIO(
                      options.getDlqGcsPubSubSubscription(),
                      // file paths to ignore when re-consuming for retry
                      new ArrayList<String>(
                          Arrays.asList("/severe/", "/tmp_retry", "/tmp_severe/", ".temp")))));
    } else {
      reconsumedElements =
          dlqManager.getReconsumerDataTransform(
              pipeline.apply(dlqManager.dlqReconsumer(options.getDlqRetryMinutes())));
    }
    PCollection<FailsafeElement<String, String>> dlqJsonRecords =
        reconsumedElements
            .get(DeadLetterQueueManager.RETRYABLE_ERRORS)
            .setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
    if (isRegularMode) {
      LOG.info("Regular Datastream flow");
      PCollection<FailsafeElement<String, String>> datastreamJsonRecords =
          pipeline.apply(
              new DataStreamIO(
                      options.getStreamName(),
                      options.getInputFilePattern(),
                      options.getInputFileFormat(),
                      options.getGcsPubSubSubscription(),
                      options.getRfcStartDateTime())
                  .withFileReadConcurrency(options.getFileReadConcurrency())
                  .withoutDatastreamRecordsReshuffle()
                  .withDirectoryWatchDuration(
                      Duration.standardMinutes(options.getDirectoryWatchDurationInMinutes())));
      int maxNumWorkers = options.getMaxNumWorkers() != 0 ? options.getMaxNumWorkers() : 1;
      jsonRecords =
          PCollectionList.of(datastreamJsonRecords)
              .and(dlqJsonRecords)
              .apply(Flatten.pCollections())
              .apply(
                  "Reshuffle",
                  Reshuffle.<FailsafeElement<String, String>>viaRandomKey()
                      .withNumBuckets(
                          maxNumWorkers * DatastreamToSpannerConstants.MAX_DOFN_PER_WORKER));
    } else {
      LOG.info("DLQ retry flow");
      jsonRecords =
          PCollectionList.of(dlqJsonRecords)
              .apply(Flatten.pCollections())
              .apply("Reshuffle", Reshuffle.viaRandomKey());
    }
    /*
     * Stage 2: Transform records
     */
    // Ingest transformation context file into memory.
    TransformationContext transformationContext =
        TransformationContextReader.getTransformationContext(
            options.getTransformationContextFilePath());
    // Ingest sharding context file into memory.
    ShardingContext shardingContext =
        ShardingContextReader.getShardingContext(options.getShardingContextFilePath());
    CustomTransformation customTransformation =
        CustomTransformation.builder(
                options.getTransformationJarPath(), options.getTransformationClassName())
            .setCustomParameters(options.getTransformationCustomParameters())
            .build();
    // Create the overrides mapping.
    ISchemaOverridesParser schemaOverridesParser = configureSchemaOverrides(options);
    ChangeEventTransformerDoFn changeEventTransformerDoFn =
        ChangeEventTransformerDoFn.create(
            schema,
            schemaOverridesParser,
            transformationContext,
            shardingContext,
            options.getDatastreamSourceType(),
            customTransformation,
            options.getRoundJsonDecimals(),
            ddlView,
            spannerConfig);
    PCollectionTuple transformedRecords =
        jsonRecords.apply(
            "Apply Transformation to events",
            ParDo.of(changeEventTransformerDoFn)
                .withSideInputs(ddlView)
                .withOutputTags(
                    DatastreamToSpannerConstants.TRANSFORMED_EVENT_TAG,
                    TupleTagList.of(
                        Arrays.asList(
                            DatastreamToSpannerConstants.FILTERED_EVENT_TAG,
                            DatastreamToSpannerConstants.PERMANENT_ERROR_TAG))));
    /*
     * Stage 3: Write filtered records to GCS
     */
    String tempLocation =
        options.as(DataflowPipelineOptions.class).getTempLocation().endsWith("/")
            ? options.as(DataflowPipelineOptions.class).getTempLocation()
            : options.as(DataflowPipelineOptions.class).getTempLocation() + "/";
    String filterEventsDirectory =
        options.getFilteredEventsDirectory().isEmpty()
            ? tempLocation + "filteredEvents/"
            : options.getFilteredEventsDirectory();
    LOG.info("Filtered events directory: {}", filterEventsDirectory);
    transformedRecords
        .get(DatastreamToSpannerConstants.FILTERED_EVENT_TAG)
        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
        .apply(
            "Write Filtered Events To GCS",
            TextIO.write().to(filterEventsDirectory).withSuffix(".json").withWindowedWrites());
    /*
     * Stage 4: Write transformed records to Cloud Spanner
     */
    SpannerTransactionWriter.Result spannerWriteResults =
        transformedRecords
            .get(DatastreamToSpannerConstants.TRANSFORMED_EVENT_TAG)
            .apply(
                "Write events to Cloud Spanner",
                new SpannerTransactionWriter(
                    spannerConfig,
                    ddlView,
                    options.getShadowTablePrefix(),
                    options.getDatastreamSourceType(),
                    isRegularMode));
    /*
     * Stage 5: Write failures to GCS Dead Letter Queue
     * a) Retryable errors are written to retry GCS Dead letter queue
     * b) Severe errors are written to severe GCS Dead letter queue
     */
    // We will write only the original payload from the failsafe event to the DLQ. We are doing
    // that in StringDeadLetterQueueSanitizer.
    spannerWriteResults
        .retryableErrors()
        .apply(
            "DLQ: Write retryable Failures to GCS",
            MapElements.via(new StringDeadLetterQueueSanitizer()))
        .setCoder(StringUtf8Coder.of())
        .apply(
            "Write To DLQ",
            DLQWriteTransform.WriteDLQ.newBuilder()
                .withDlqDirectory(dlqManager.getRetryDlqDirectoryWithDateTime())
                .withTmpDirectory(options.getDeadLetterQueueDirectory() + "/tmp_retry/")
                .setIncludePaneInfo(true)
                .build());
    PCollection<FailsafeElement<String, String>> dlqErrorRecords =
        reconsumedElements
            .get(DeadLetterQueueManager.PERMANENT_ERRORS)
            .setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
    // TODO: Write errors from transformer and spanner writer into separate folders
    PCollection<FailsafeElement<String, String>> permanentErrors =
        PCollectionList.of(dlqErrorRecords)
            .and(spannerWriteResults.permanentErrors())
            .and(transformedRecords.get(DatastreamToSpannerConstants.PERMANENT_ERROR_TAG))
            .apply(Flatten.pCollections());
    // increment the metrics
    permanentErrors
        .apply("Update metrics", ParDo.of(new MetricUpdaterDoFn(isRegularMode)))
        .apply(
            "DLQ: Write Severe errors to GCS",
            MapElements.via(new StringDeadLetterQueueSanitizer()))
        .setCoder(StringUtf8Coder.of())
        .apply(
            "Write To DLQ",
            DLQWriteTransform.WriteDLQ.newBuilder()
                .withDlqDirectory(dlqManager.getSevereDlqDirectoryWithDateTime())
                .withTmpDirectory(options.getDeadLetterQueueDirectory() + "/tmp_severe/")
                .setIncludePaneInfo(true)
                .build());
    // Execute the pipeline and return the result.
    return pipeline.run();
  }
  private static DeadLetterQueueManager buildDlqManager(Options options) {
    String tempLocation =
        options.as(DataflowPipelineOptions.class).getTempLocation().endsWith("/")
            ? options.as(DataflowPipelineOptions.class).getTempLocation()
            : options.as(DataflowPipelineOptions.class).getTempLocation() + "/";
    String dlqDirectory =
        options.getDeadLetterQueueDirectory().isEmpty()
            ? tempLocation + "dlq/"
            : options.getDeadLetterQueueDirectory();
    LOG.info("Dead-letter queue directory: {}", dlqDirectory);
    options.setDeadLetterQueueDirectory(dlqDirectory);
    if ("regular".equals(options.getRunMode())) {
      return DeadLetterQueueManager.create(dlqDirectory, options.getDlqMaxRetryCount());
    } else {
      // Non-regular run mode: the "severe" subdirectory is used as the retry directory.
      String retryDlqUri =
          FileSystems.matchNewResource(dlqDirectory, true)
              .resolve("severe", StandardResolveOptions.RESOLVE_DIRECTORY)
              .toString();
      LOG.info("Dead-letter retry directory: {}", retryDlqUri);
      return DeadLetterQueueManager.create(dlqDirectory, retryDlqUri, 0);
    }
  }
  static ISchemaOverridesParser configureSchemaOverrides(Options options) {
    // Reject configurations that set both file-based and string-based overrides.
    if (!options.getSchemaOverridesFilePath().isEmpty()
        && (!options.getTableOverrides().isEmpty() || !options.getColumnOverrides().isEmpty())) {
      throw new IllegalArgumentException(
          "Only one of file based or string based overrides must be configured! Please correct the configuration and re-run the job");
    }
    // string based overrides
    if (!options.getTableOverrides().isEmpty() || !options.getColumnOverrides().isEmpty()) {
      Map<String, String> userOptionsOverrides = new HashMap<>();
      if (!options.getTableOverrides().isEmpty()) {
        userOptionsOverrides.put("tableOverrides", options.getTableOverrides());
      }
      if (!options.getColumnOverrides().isEmpty()) {
        userOptionsOverrides.put("columnOverrides", options.getColumnOverrides());
      }
      return new SchemaStringOverridesParser(userOptionsOverrides);
    }
    // file based overrides
    if (!options.getSchemaOverridesFilePath().isEmpty()) {
      return new SchemaFileOverridesParser(options.getSchemaOverridesFilePath());
    }
    // no overrides
    return new NoopSchemaOverridesParser();
  }
}
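The directory defaults and the override check in `buildDlqManager` and `configureSchemaOverrides` above can be tried in isolation. The following is a minimal sketch in plain Java with no Beam dependency, not the template's own code: `TemplateDefaultsSketch`, `resolveDirectory`, `selectOverrideMode`, `OverrideMode`, and the `gs://example-bucket` paths are hypothetical names introduced for illustration. Unlike the template code, the sketch also treats a null value as unset, which is an assumption.

```java
/** Hypothetical helper for illustration only; not part of the template source. */
final class TemplateDefaultsSketch {

  /** Which override source is configured (assumed enum, mirrors the three parser choices). */
  enum OverrideMode {
    NONE,
    STRING,
    FILE
  }

  /** Appends a trailing slash when the location does not already end with one. */
  static String withTrailingSlash(String location) {
    return location.endsWith("/") ? location : location + "/";
  }

  /** Treats null and empty strings as "not configured" (null handling is an assumption). */
  static boolean isSet(String value) {
    return value != null && !value.isEmpty();
  }

  /** Uses the configured directory when set, otherwise a subdirectory of the temp location. */
  static String resolveDirectory(String configured, String tempLocation, String defaultSubdir) {
    return isSet(configured) ? configured : withTrailingSlash(tempLocation) + defaultSubdir;
  }

  /** Rejects mixed configurations, then prefers string-based over file-based overrides. */
  static OverrideMode selectOverrideMode(
      String overridesFilePath, String tableOverrides, String columnOverrides) {
    boolean hasFile = isSet(overridesFilePath);
    boolean hasString = isSet(tableOverrides) || isSet(columnOverrides);
    if (hasFile && hasString) {
      throw new IllegalArgumentException(
          "Configure either file-based or string-based overrides, not both");
    }
    if (hasString) {
      return OverrideMode.STRING;
    }
    return hasFile ? OverrideMode.FILE : OverrideMode.NONE;
  }

  public static void main(String[] args) {
    // No directory configured: falls back to <tempLocation>/dlq/.
    System.out.println(resolveDirectory("", "gs://example-bucket/tmp", "dlq/"));
    // Explicit directory: returned unchanged.
    System.out.println(
        resolveDirectory("gs://example-bucket/my-dlq/", "gs://example-bucket/tmp/", "dlq/"));
    // Only a string-based value is set, so string-based overrides are selected.
    System.out.println(selectOverrideMode("", "placeholder", ""));
  }
}
```

Keeping the fallback in one helper would avoid repeating the trailing-slash check for both the filtered-events directory and the dead-letter queue directory; whether that refactoring suits the template itself is a design choice left to its maintainers.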
What's next
- Learn about Dataflow templates.
- See the list of Google-provided templates.