Datastream to Spanner 模板

Datastream to Spanner 模板是一种流处理流水线,可从 Cloud Storage 存储桶中读取 Datastream 事件并将其写入 Spanner 数据库。它适用于从 Datastream 来源到 Spanner 的数据迁移。

在执行模板之前,迁移所需的所有表必须存在于目标 Spanner 数据库中。因此,在数据迁移之前,必须完成从源数据库到目标 Spanner 的架构迁移。在迁移之前,数据可能存在表中。此模板不会将 Datastream 架构更改传播到 Spanner 数据库。

只有在所有数据都写入 Spanner 后,才能在迁移结束时保证数据一致性。为了存储写入 Spanner 的每个记录的排序信息,此模板为 Spanner 数据库中的每个表创建了一个额外的表(称为影子表)。这用于确保迁移结束时的一致性。影子表在迁移后不会被删除,可在迁移结束时用于进行验证。

操作期间发生的任何错误(例如架构不匹配、JSON 文件格式错误或执行转换产生的错误)都会记录在错误队列中。错误队列是一个 Cloud Storage 文件夹,它以文本格式存储遇到错误的所有 Datastream 事件以及错误原因。这些错误可能是暂时性的,也可能是永久性的,它们存储在错误队列的相应 Cloud Storage 文件夹中。系统会自动重试暂时性错误,但不会自动重试永久性错误。如果发生永久性错误,您可以选择在模板运行期间更正更改事件,并将它们转移到可重试的存储桶。

流水线要求

  • 处于正在运行未启动状态的 Datastream 数据流。
  • 要在其中复制 Datastream 事件的 Cloud Storage 存储桶。
  • 包含现有表的 Spanner 数据库。这些表可以为空,也可以包含数据。

模板参数

参数 说明
inputFilePattern Cloud Storage 中要复制的 Datastream 文件的位置。通常,这是数据流的根路径。
streamName 用于轮询架构信息和来源类型的数据流的名称或模板。
instanceId 在其中复制更改的 Spanner 实例。
databaseId 在其中复制更改的 Spanner 数据库。
projectId Spanner 项目 ID。
deadLetterQueueDirectory (可选)用于存储错误队列输出的文件路径。默认值为 Dataflow 作业的临时位置下的目录。
inputFileFormat (可选)Datastream 生成的输出文件的格式。例如:avro,json。默认值:avro
shadowTablePrefix (可选)用于为影子表命名的前缀。默认值:shadow_

运行模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Cloud Datastream to Spanner template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_Spanner \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
streamName=STREAM_NAME,\
instanceId=CLOUDSPANNER_INSTANCE,\
databaseId=CLOUDSPANNER_DATABASE,\
deadLetterQueueDirectory=DLQ
  

请替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 ID
  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • GCS_FILE_PATH:用于存储 Datastream 事件的 Cloud Storage 路径。例如 gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE:Spanner 实例。
  • CLOUDSPANNER_DATABASE:Spanner 数据库。
  • DLQ:错误队列目录的 Cloud Storage 路径。

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_Spanner",
      "parameters": {
          "inputFilePattern": "GCS_FILE_PATH",
          "streamName": "STREAM_NAME"
          "instanceId": "CLOUDSPANNER_INSTANCE"
          "databaseId": "CLOUDSPANNER_DATABASE"
          "deadLetterQueueDirectory": "DLQ"
      }
   }
}
  

请替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 ID
  • JOB_NAME:您选择的唯一性作业名称
  • LOCATION:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • GCS_FILE_PATH:用于存储 Datastream 事件的 Cloud Storage 路径。例如 gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE:Spanner 实例。
  • CLOUDSPANNER_DATABASE:Spanner 数据库。
  • DLQ:错误队列目录的 Cloud Storage 路径。

后续步骤