Spanner Change Streams to Source Database 模板

流式处理流水线。从 Spanner 变更数据流读取数据并将其写入来源。

模板参数

参数 说明
changeStreamName 流水线从中读取数据的 Spanner 变更数据流的名称。
instanceId 包含变更数据流的 Spanner 实例的名称。
databaseId 变更数据流监控的 Spanner 数据库的名称。
spannerProjectId Spanner 项目的名称。
metadataInstance 用于存储连接器用于控制变更数据流 API 数据用量的元数据的实例。
metadataDatabase 用于存储连接器用于控制变更数据流 API 数据用量的元数据的数据库。
sourceShardsFilePath 包含源分片连接配置文件信息的 Cloud Storage 文件的路径。
startTimestamp 可选:用于读取更改的开始时间戳。默认值为空。
endTimestamp 可选:读取更改的结束时间戳。如果未提供时间戳,则无限期读取。默认值为空。
shadowTablePrefix 可选:用于为影子表命名的前缀。默认值:shadow_
sessionFilePath 可选:Cloud Storage 中的会话路径,其中包含 HarbourBridge 的映射信息。
filtrationMode 可选:过滤模式。指定如何根据条件删除特定记录。支持的模式包括:none(不过滤任何内容)、forward_migration(过滤使用向前迁移流水线写入的记录)。默认值为 forward_migration
shardingCustomJarPath 可选:Cloud Storage 中的自定义 JAR 文件位置,其中包含用于提取分片 ID 的自定义逻辑。如果您设置此参数,请设置 shardingCustomJarPath 参数。默认值为空。
shardingCustomClassName 可选:包含自定义分片 ID 实现的完全限定类名称。如果指定了 shardingCustomJarPath,则此参数是必需的。默认值为空。
shardingCustomParameters 可选:包含要传递给自定义分片类的任何自定义参数的字符串。默认值为空。
sourceDbTimezoneOffset 可选:源数据库相对于世界协调时间 (UTC) 的时区偏移量。示例值:+10:00。默认值为 +00:00
dlqGcsPubSubSubscription 可选:在常规模式下运行时,Cloud Storage 通知政策中用于 DLQ 重试目录的 Pub/Sub 订阅。名称应采用 projects/<project-id>/subscriptions/<subscription-name> 格式。设置后,系统会忽略 deadLetterQueueDirectory 和 dlqRetryMinutes。
skipDirectoryName 可选:从反向复制中跳过的记录会写入此目录。默认目录名称为“skip”。
maxShardConnections 可选:给定分片可接受的连接数上限。默认值为 10000
deadLetterQueueDirectory 可选:存储错误队列输出时使用的路径。默认路径为 Dataflow 作业的临时位置下的目录。
dlqMaxRetryCount 可选:可通过死信队列重试临时错误的次数上限。默认值为 500。
runMode 可选:运行模式类型。支持的值:regularretryDLQ。默认值:regular。 指定 retryDLQ 表示仅重试严重的邮件转送队列记录。
dlqRetryMinutes 可选:死信队列重试之间的分钟数。默认值为 10。

运行模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Spanner Change Streams to Source Database template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud CLI

在 shell 或终端中,运行模板:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_to_SourceDb \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
       changeStreamName=CHANGE_STREAM_NAME,\
       instanceId=INSTANCE_ID,\
       databaseId=DATABASE_ID,\
       spannerProjectId=SPANNER_PROJECT_ID,\
       metadataInstance=METADATA_INSTANCE,\
       metadataDatabase=METADATA_DATABASE,\
       sourceShardsFilePath=SOURCE_SHARDS_FILE_PATH,\

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • REGION_NAME:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • CHANGE_STREAM_NAME:要从中读取的更改数据流的名称
  • INSTANCE_ID:Cloud Spanner 实例 ID。
  • DATABASE_ID:Cloud Spanner 数据库 ID。
  • SPANNER_PROJECT_ID:Cloud Spanner 项目 ID。
  • METADATA_INSTANCE:从变更数据流读取时用于存储元数据的 Cloud Spanner 实例
  • METADATA_DATABASE:从更改数据流读取时用于存储元数据的 Cloud Spanner 数据库
  • SOURCE_SHARDS_FILE_PATH:包含来源分片详细信息的 GCS 文件的路径

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launchParameter": {
     "jobName": "JOB_NAME",
     "parameters": {
       "changeStreamName": "CHANGE_STREAM_NAME",
       "instanceId": "INSTANCE_ID",
       "databaseId": "DATABASE_ID",
       "spannerProjectId": "SPANNER_PROJECT_ID",
       "metadataInstance": "METADATA_INSTANCE",
       "metadataDatabase": "METADATA_DATABASE",
       "sourceShardsFilePath": "SOURCE_SHARDS_FILE_PATH",
     },
     "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_to_SourceDb",
     "environment": { "maxWorkers": "10" }
  }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 ID
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • LOCATION:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • CHANGE_STREAM_NAME:要从中读取的更改数据流的名称
  • INSTANCE_ID:Cloud Spanner 实例 ID。
  • DATABASE_ID:Cloud Spanner 数据库 ID。
  • SPANNER_PROJECT_ID:Cloud Spanner 项目 ID。
  • METADATA_INSTANCE:从变更数据流读取时用于存储元数据的 Cloud Spanner 实例
  • METADATA_DATABASE:从更改数据流读取时用于存储元数据的 Cloud Spanner 数据库
  • SOURCE_SHARDS_FILE_PATH:包含来源分片详细信息的 GCS 文件的路径