Spanner change streams to Cloud Storage 模板

Spanner change streams to Cloud Storage 模板是一种流处理流水线,可使用 Dataflow Runner v2 流式传输 Spanner 数据更改记录并将其写入 Cloud Storage 存储桶。

流水线根据 Spanner 变更数据流记录的时间戳将其分组到窗口中,每个窗口代表一个时长,您可以使用此模板配置该时长。时间戳属于某个窗口的所有记录都保证在该窗口中;不会有延迟到达。您还可以定义多个输出分片。流水线会为每个窗口的每个分片创建一个 Cloud Storage 输出文件。在输出文件中,记录是无序的。输出文件可以采用 JSON 或 AVRO 格式编写,具体取决于用户配置。

请注意,通过在与 Spanner 实例或 Cloud Storage 存储桶相同的区域运行 Dataflow 作业,您可以最大限度地减少网络延迟和网络传输费用。如果您使用位于作业区域之外的源、接收器、暂存文件位置或临时文件位置,则数据可能会跨区域发送。详细了解 Dataflow 区域

详细了解变更数据流如何构建变更数据流 Dataflow 流水线最佳实践

流水线要求

  • 在运行流水线之前,Spanner 实例必须已存在。
  • 在运行流水线之前,Spanner 数据库必须已存在。
  • 在运行流水线之前,Spanner 元数据实例必须已存在。
  • 在运行流水线之前,Spanner 元数据数据库必须已存在。
  • 在运行流水线之前,Spanner 变更数据流必须已存在。
  • 在运行流水线之前,Cloud Storage 输出存储桶必须已存在。

模板参数

参数 说明
spannerInstanceId 要从中读取变更数据流数据的 Spanner 实例 ID。
spannerDatabase 要从中读取变更数据流数据的 Spanner 数据库。
spannerDatabaseRole (可选)运行模板时使用的 Spanner 数据库角色。仅当运行模板的 IAM 主账号是精细访问权限控制用户时,才需要此参数。数据库角色必须拥有变更数据流的 SELECT 特权和变更数据流的读取函数的 EXECUTE 特权。如需了解详情,请参阅变更数据流的精细访问权限控制
spannerMetadataInstanceId 要用于变更数据流连接器元数据表的 Spanner 实例 ID。
spannerMetadataDatabase 要用于变更数据流连接器元数据表的 Spanner 数据库。
spannerChangeStreamName 要从中读取数据的 Spanner 变更数据流的名称。
gcsOutputDirectory 变更数据流输出在 Cloud Storage 中的文件位置,格式为:“gs://${BUCKET}/${ROOT_PATH}/”。
outputFilenamePrefix (可选)要写入的文件的文件名前缀。默认文件前缀设置为“output”。
spannerProjectId (可选)从中读取变更数据流的项目。这也是创建变更数据流连接器元数据表的项目。此参数的默认项目是 Dataflow 流水线在其中运行的项目。
startTimestamp (可选)要用于读取变更数据流的起始 DateTime(含边界值)。Ex-2021-10-12T07:20:50.52Z。默认为流水线启动时的时间戳,即当前时间。
endTimestamp (可选)要用于读取变更数据流的结束 DateTime(含边界值)。Ex-2021-10-12T07:20:50.52Z。默认为未来的无限时间。
outputFileFormat (可选)输出 Cloud Storage 文件的格式。允许的格式为 TEXT、AVRO。默认为 AVRO。
windowDuration (可选)窗口时长是将数据写入输出目录的时间间隔。请根据流水线的吞吐量配置时长。例如,较高的吞吐量可能需要较短的窗口时长,以便数据适应内存。默认为 5 分钟,最短可为 1 秒。允许的格式如下:[int]s(表示数秒,例如 5s)、[int]m(表示数分钟,例如 12m)、[int]h(表示数小时,例如 2h)。
rpcPriority (可选)Spanner 调用的请求优先级。该值必须为 [HIGH,MEDIUM,LOW] 之一。(默认值:HIGH)
numShards (可选)写入时产生的最大输出分片数。默认值为 20。分片数量越多,写入 Cloud Storage 的吞吐量越高,但处理输出 Cloud Storage 文件时跨分片聚合数据的费用也可能更高。
spannerMetadataTableName (可选)要使用的 Spanner 变更数据流连接器元数据表名称。如果未提供,系统会在流水线流期间自动创建 Spanner 变更数据流元数据表。更新现有流水线时必须提供此参数,其他情况下则不应提供。

运行模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Cloud Spanner change streams to Google Cloud Storage template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_Google_Cloud_Storage \
    --region REGION_NAME \
    --parameters \
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabase=SPANNER_DATABASE,\
spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
gcsOutputDirectory=GCS_OUTPUT_DIRECTORY

请替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • REGION_NAME:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • SPANNER_INSTANCE_ID:Cloud Spanner 实例 ID
  • SPANNER_DATABASE:Cloud Spanner 数据库
  • SPANNER_METADATA_INSTANCE_ID:Cloud Spanner 元数据实例 ID
  • SPANNER_METADATA_DATABASE:Cloud Spanner 元数据数据库
  • SPANNER_CHANGE_STREAM:Cloud Spanner 变更数据流
  • GCS_OUTPUT_DIRECTORY:变更数据流输出的文件位置

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "spannerInstanceId": "SPANNER_INSTANCE_ID",
          "spannerDatabase": "SPANNER_DATABASE",
          "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
          "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
          "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
          "gcsOutputDirectory": "GCS_OUTPUT_DIRECTORY"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_Google_Cloud_Storage",
   }
}

请替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 ID
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • LOCATION:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • SPANNER_INSTANCE_ID:Cloud Spanner 实例 ID
  • SPANNER_DATABASE:Cloud Spanner 数据库
  • SPANNER_METADATA_INSTANCE_ID:Cloud Spanner 元数据实例 ID
  • SPANNER_METADATA_DATABASE:Cloud Spanner 元数据数据库
  • SPANNER_CHANGE_STREAM:Cloud Spanner 变更数据流
  • GCS_OUTPUT_DIRECTORY:变更数据流输出的文件位置

后续步骤