Spanner change streams to Cloud Storage 模板

Spanner change streams to Cloud Storage 模板是一种流处理流水线,可使用 Dataflow Runner v2 流式传输 Spanner 数据更改记录并将其写入 Cloud Storage 存储桶。

流水线根据 Spanner 变更数据流记录的时间戳将其分组到窗口中,每个窗口代表一个时长,您可以使用此模板配置该时长。时间戳属于某个窗口的所有记录都保证在该窗口中;不会有延迟到达。您还可以定义多个输出分片。流水线会为每个窗口的每个分片创建一个 Cloud Storage 输出文件。在输出文件中,记录是无序的。输出文件可以采用 JSON 或 AVRO 格式编写,具体取决于用户配置。

请注意,通过在与 Spanner 实例或 Cloud Storage 存储桶相同的区域运行 Dataflow 作业,您可以最大限度地减少网络延迟和网络传输费用。如果您使用位于作业区域之外的源、接收器、暂存文件位置或临时文件位置,则数据可能会跨区域发送。详细了解 Dataflow 区域

详细了解变更数据流如何构建变更数据流 Dataflow 流水线最佳实践

流水线要求

  • 在运行流水线之前,Spanner 实例必须已存在。
  • 在运行流水线之前,Spanner 数据库必须已存在。
  • 在运行流水线之前,Spanner 元数据实例必须已存在。
  • 在运行流水线之前,Spanner 元数据数据库必须已存在。
  • 在运行流水线之前,Spanner 变更数据流必须已存在。
  • 在运行流水线之前,Cloud Storage 输出存储桶必须已存在。

模板参数

必需参数

  • spannerInstanceId:要从中读取变更数据流数据的 Spanner 实例 ID。
  • spannerDatabase:要从中读取变更数据流数据的 Spanner 数据库。
  • spannerMetadataInstanceId:要用于变更数据流连接器元数据表的 Spanner 实例 ID。
  • spannerMetadataDatabase:要用于变更数据流连接器元数据表的 Spanner 数据库。
  • spannerChangeStreamName:要从中读取数据的 Spanner 变更数据流的名称。
  • gcsOutputDirectory:用于写入输出文件的路径和文件名前缀。必须以斜杠结尾。 日期时间格式用于解析日期和时间格式化程序的目录路径。(示例:gs://your-bucket/your-path)。

可选参数

  • spannerProjectId:包含要从中读取变更数据流的 Spanner 数据库的 Google Cloud 项目的 ID。此项目也是创建变更数据流连接器元数据表的项目。此参数的默认项目是 Dataflow 流水线在其中运行的项目。
  • spannerDatabaseRole:运行模板时使用的 Spanner 数据库角色。仅当运行模板的 IAM 主账号是精细访问权限控制用户时,才需要此参数。数据库角色必须拥有变更数据流的 SELECT 特权和变更数据流的读取函数的 EXECUTE 特权。如需了解详情,请参阅变更数据流的精细访问权限控制 (https://cloud.google.com/spanner/docs/fgac-change-streams)。
  • spannerMetadataTableName:要使用的 Spanner 变更数据流连接器元数据表名称。如果未提供,系统会在流水线执行期间自动创建 Spanner 变更数据流元数据表。更新现有流水线时,您必须为此参数提供一个值。否则,请勿使用此参数。
  • startTimestamp:用于读取变更数据流的起始日期时间(含边界值),格式为 Ex-2021-10-12T07:20:50.52Z。默认为流水线启动时的时间戳,即当前时间。
  • endTimestamp:要用于读取变更数据流的结束日期时间(含边界值)。例如 Ex-2021-10-12T07:20:50.52Z。默认为未来的无限时间。
  • spannerHost:要在模板中调用的 Cloud Spanner 端点。仅用于测试。(示例:https://spanner.googleapis.com)。默认值为 https://spanner.googleapis.com
  • outputFileFormat:输出 Cloud Storage 文件的格式。允许的格式为 TEXT 和 AVRO。默认值为 AVRO。
  • windowDuration:窗口时长是将数据写入输出目录的时间间隔。请根据流水线的吞吐量配置时长。例如,较高的吞吐量可能需要较短的窗口时长,以便数据适应内存。默认值为 5m(5 分钟),最少为 1s(1 秒)。允许的格式为 [int](以秒为单位,例如 5s)、[int]m(以分钟为单位,例如 12m)、[int]h(以小时为单位,例如 2h)。 (示例:5m)。
  • rpcPriority:Spanner 调用的请求优先级。该值必须为“HIGH”“MEDIUM”或“LOW”。默认值为:HIGH。
  • outputFilenamePrefix:要在各窗口文件上放置的前缀。(示例:output-)。默认值为:output。
  • numShards:写入时生成的输出分片数上限。分片数越多,写入 Cloud Storage 的吞吐量越高,但处理输出 Cloud Storage 文件时跨分片聚合数据的费用也可能更高。默认值为 20。

运行模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Cloud Spanner change streams to Google Cloud Storage template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_Google_Cloud_Storage \
    --region REGION_NAME \
    --parameters \
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabase=SPANNER_DATABASE,\
spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
gcsOutputDirectory=GCS_OUTPUT_DIRECTORY

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • REGION_NAME:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • SPANNER_INSTANCE_ID:Cloud Spanner 实例 ID
  • SPANNER_DATABASE:Cloud Spanner 数据库
  • SPANNER_METADATA_INSTANCE_ID:Cloud Spanner 元数据实例 ID
  • SPANNER_METADATA_DATABASE:Cloud Spanner 元数据数据库
  • SPANNER_CHANGE_STREAM:Cloud Spanner 变更数据流
  • GCS_OUTPUT_DIRECTORY:变更数据流输出的文件位置

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "spannerInstanceId": "SPANNER_INSTANCE_ID",
          "spannerDatabase": "SPANNER_DATABASE",
          "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
          "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
          "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
          "gcsOutputDirectory": "GCS_OUTPUT_DIRECTORY"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_Google_Cloud_Storage",
   }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 ID
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • LOCATION:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • SPANNER_INSTANCE_ID:Cloud Spanner 实例 ID
  • SPANNER_DATABASE:Cloud Spanner 数据库
  • SPANNER_METADATA_INSTANCE_ID:Cloud Spanner 元数据实例 ID
  • SPANNER_METADATA_DATABASE:Cloud Spanner 元数据数据库
  • SPANNER_CHANGE_STREAM:Cloud Spanner 变更数据流
  • GCS_OUTPUT_DIRECTORY:变更数据流输出的文件位置

后续步骤