Datastream to MySQL or PostgreSQL (Stream) 模板

Datastream to SQL 模板是一种流处理流水线,可读取 Datastream 数据并将其复制到任何 MySQL 或 PostgreSQL 数据库。该模板使用 Pub/Sub 通知从 Cloud Storage 中读取数据,并将此数据复制到 SQL 副本表。

该模板不支持数据定义语言 (DDL),要求数据库中已存在所有表。复制使用 Dataflow 有状态转换来过滤过时的数据,并确保数据无序的一致性。例如,如果传递了某行的较新版本,则会忽略该行较晚到达的版本。执行的数据操纵语言 (DML) 是将源数据最佳复制到目标数据的最佳尝试。执行的 DML 语句遵循以下规则:

  • 如果存在主键,则插入和更新操作使用插入/更新语法(例如 INSERT INTO table VALUES (...) ON CONFLICT (...) DO UPDATE)。
  • 如果存在主键,则删除将作为删除 DML 复制。
  • 如果不存在主键,则会将插入和更新操作插入表中。
  • 如果不存在主键,则会忽略删除操作。

如果您使用的是 Oracle to Postgres 实用程序,请在 SQL 中添加 ROWID 作为主键(如果主键不存在)。

流水线要求

  • 已准备好或已在复制数据的 Datastream 数据流。
  • 已经为 Datastream 数据启用 Cloud Storage Pub/Sub 通知
  • 为 PostgreSQL 数据库提供了所需的架构。
  • 已在 Dataflow 工作器和 PostgreSQL 之间设置网络访问权限。

模板参数

参数 说明
inputFilePattern Cloud Storage 中要复制的 Datastream 文件的位置。此文件位置通常是数据流的根路径。
gcsPubSubSubscription 包含 Datastream 文件通知的 Pub/Sub 订阅。例如 projects/my-project-id/subscriptions/my-subscription-id
inputFileFormat Datastream 生成的输出文件的格式。例如:avro,json。默认值:avro
databaseHost 要连接的 SQL 主机。
databaseUser 具有写入副本中所有表所需的全部权限的 SQL 用户。
databasePassword 给定 SQL 用户的密码。
databasePort (可选)要连接到的 SQL 数据库端口。默认值:5432。
databaseName (可选)要连接到的 SQL 数据库的名称。默认值:postgres。
streamName (可选)用于轮询架构信息的数据流的名称或模板。默认值:{_metadata_stream}

运行模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Cloud Datastream to SQL template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --enable-streaming-engine \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_SQL \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
databaseHost=DATABASE_HOST,\
databaseUser=DATABASE_USER,\
databasePassword=DATABASE_PASSWORD
  

请替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 ID
  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

  • GCS_FILE_PATH:Datastream 数据的 Cloud Storage 路径。例如 gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME:要从中读取已更改文件的 Pub/Sub 订阅。例如:projects/my-project-id/subscriptions/my-subscription-id
  • DATABASE_HOST:您的 SQL 主机 IP。
  • DATABASE_USER:您的 SQL 用户。
  • DATABASE_PASSWORD:您的 SQL 密码。

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {

          "inputFilePattern": "GCS_FILE_PATH",
          "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
          "databaseHost": "DATABASE_HOST",
          "databaseUser": "DATABASE_USER",
          "databasePassword": "DATABASE_PASSWORD"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_Datastream_to_SQL",
   }
}
  

请替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 ID
  • JOB_NAME:您选择的唯一性作业名称
  • LOCATION:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

  • GCS_FILE_PATH:Datastream 数据的 Cloud Storage 路径。例如 gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME:要从中读取已更改文件的 Pub/Sub 订阅。例如:projects/my-project-id/subscriptions/my-subscription-id
  • DATABASE_HOST:您的 SQL 主机 IP。
  • DATABASE_USER:您的 SQL 用户。
  • DATABASE_PASSWORD:您的 SQL 密码。

后续步骤