使用数据源和目标

本页面介绍了如何使用 Dataflow SQL 查询数据和写入查询结果。

Dataflow SQL 可以查询以下来源:

Dataflow SQL 可以将查询结果写入以下目标:

Pub/Sub

查询 Pub/Sub 主题

如需使用 Dataflow SQL 查询 Pub/Sub 主题,请完成以下步骤:

  1. 添加 Pub/Sub 主题作为 Dataflow 来源。

  2. 为 Pub/Sub 主题分配架构

  3. 在 Dataflow SQL 查询中使用 Pub/Sub 主题

添加 Pub/Sub 主题

您可以使用 Dataflow SQL 界面添加 Pub/Sub 主题作为 Dataflow 来源。

  1. 转到 Dataflow SQL 界面。

    转到 Dataflow SQL 界面

  2. 在导航面板中,点击添加数据下拉列表,然后选择 Cloud Dataflow 来源

    “添加数据”下拉列表,其中已选择 Cloud Dataflow 来源

  3. 添加 Cloud Dataflow 来源面板中,选择 Cloud Pub/Sub 主题并搜索相应主题。

    以下屏幕截图显示了对 transactions Pub/Sub 主题的搜索:

    “添加 Cloud Dataflow 来源”面板,其中已选择 Pub/Sub 主题选项、已完成事务搜索查询并且已选择事务主题。

  4. 点击添加

在添加相应 Dataflow 来源作为 Dataflow 来源后,Pub/Sub 主题会显示在导航菜单的资源部分中。

如需查找该主题,请展开 Cloud Dataflow 来源 > Cloud Pub/Sub 主题

分配 Pub/Sub 主题架构

Pub/Sub 主题架构包含以下字段:

  • event_timestamp 字段。

    Pub/Sub 事件时间戳用于标识消息的发布时间。时间戳会自动添加到 Pub/Sub 消息中。

  • Pub/Sub 消息中每个键值对的字段。

    例如,消息 {"k1":"v1", "k2":"v2"} 的架构包含两个 STRING 字段,分别名为 k1k2

您可以使用 Cloud Console 或 gcloud 命令行工具为 Pub/Sub 主题分配架构。

控制台

如需为 Pub/Sub 主题分配架构,请完成以下步骤:

  1. 资源面板中选择主题。

  2. 架构标签页中,点击修改架构,以打开显示架构字段的架构侧边面板。

    用于添加或修改架构的侧边面板

  3. 点击添加字段以向架构添加字段,或切换以文本形式修改按钮以复制并粘贴整个架构文本。

    例如,以下是包含销售交易的 Pub/Sub 主题的架构文本。

    [
      {
          "description": "Pub/Sub event timestamp",
          "name": "event_timestamp",
          "mode": "REQUIRED",
          "type": "TIMESTAMP"
      },
      {
          "description": "Transaction time string",
          "name": "tr_time_str",
          "mode": NULLABLE,
          "type": "STRING"
      },
      {
          "description": "First name",
          "name": "first_name",
          "mode": NULLABLE,
          "type": "STRING"
      },
      {
          "description": "Last name",
          "name": "last_name",
          "mode": NULLABLE,
          "type": "STRING"
      },
      {
          "description": "City",
          "name": "city",
          "mode": NULLABLE,
          "type": "STRING"
      },
      {
          "description": "State",
          "name": "state",
          "mode": NULLABLE,
          "type": "STRING"
      },
      {
          "description": "Product",
          "name": "product",
          "mode": NULLABLE,
          "type": "STRING"
      },
      {
          "description": "Amount of transaction",
          "name": "amount",
          "mode": NULLABLE,
          "type": "FLOAT64"
      }
    ]
    
  4. 点击提交

  5. (可选)点击预览主题以检查消息的内容并确认其与您定义的架构匹配。

    Dataflow SQL 界面中的详细信息面板,其中已选择 Pub/Sub 主题并突出显示了“预览主题”按钮

gcloud

如需为 Pub/Sub 主题分配架构,请完成以下步骤:

  1. 使用架构文本创建一个 JSON 文件。

    例如,以下是包含销售交易的 Pub/Sub 主题的架构文本。

    [
      {
          "description": "Pub/Sub event timestamp",
          "name": "event_timestamp",
          "mode": "REQUIRED",
          "type": "TIMESTAMP"
      },
      {
          "description": "Transaction time string",
          "name": "tr_time_str",
          "mode": NULLABLE,
          "type": "STRING"
      },
      {
          "description": "First name",
          "name": "first_name",
          "mode": NULLABLE,
          "type": "STRING"
      },
      {
          "description": "Last name",
          "name": "last_name",
          "mode": NULLABLE,
          "type": "STRING"
      },
      {
          "description": "City",
          "name": "city",
          "mode": NULLABLE,
          "type": "STRING"
      },
      {
          "description": "State",
          "name": "state",
          "mode": NULLABLE,
          "type": "STRING"
      },
      {
          "description": "Product",
          "name": "product",
          "mode": NULLABLE,
          "type": "STRING"
      },
      {
          "description": "Amount of transaction",
          "name": "amount",
          "mode": NULLABLE,
          "type": "FLOAT64"
      }
    ]
     ```
    
  2. 使用 gcloud beta data-catalog entries 命令为 Pub/Sub 主题分配架构。

    gcloud beta data-catalog entries update \
     --lookup-entry='pubsub.topic.`project-id`.`topic-name`'
     --schema-from-file=file-path
    
  3. (可选)确认已成功为 Pub/Sub 主题分配架构。

    gcloud beta data-catalog entries lookup \
     'pubsub.topic.`project-id`.`topic-name`'
    

使用 Pub/Sub 主题

如需在 Dataflow SQL 查询中引用 Pub/Sub,请使用以下标识符:

pubsub.topic.`project-id`.`topic-name`

标识符必须遵循 Dataflow SQL 词法结构。使用反引号将包含非字母、数字或下划线字符的标识符括起来。

例如,以下查询会从项目 dataflow-sql 中的 Dataflow 主题 daily.transactions 选择。

SELECT *
FROM pubsub.topic.`dataflow-sql`.`daily.transactions`

写入 Pub/Sub 主题

您可以使用 Cloud Console 或 gcloud 命令行工具将查询结果写入 Pub/Sub 主题。

控制台

如需将查询结果写入 Pub/Sub 主题,请使用 Dataflow SQL 界面运行查询:

  1. 转到 Dataflow SQL 界面。

    转到 Dataflow SQL 界面

  2. 在查询编辑器中输入 Dataflow SQL 查询。

  3. 点击创建 Cloud Dataflow 作业,打开作业选项面板。

  4. 在该面板的目标部分中,选择输出类型 > Cloud Pub/Sub 主题

  5. 点击选择 Cloud Pub/Sub 主题,然后选择一个主题。

  6. 点击创建

gcloud

如需将查询结果写入 Pub/Sub 主题,请使用 gcloud dataflow sql query 命令的 --pubsub-topic 标志。

gcloud dataflow sql query \
  --job-name=job-name \
  --region=region \
  --pubsub-topic='pubsub.topic.`project-id`.`topic-name`' \
  'query'

目标 Pub/Sub 主题的架构必须与查询结果的架构匹配。如果目标 Pub/Sub 主题没有架构,则系统会自动分配与查询结果匹配的架构。

Cloud Storage

查询 Cloud Storage 文件集

如需使用 Dataflow SQL 查询 Cloud Storage 文件集,请完成以下步骤:

  1. 为 Dataflow SQL 创建 Data Catalog 文件集

  2. 添加 Cloud Storage 文件集作为 Dataflow 来源。

  3. 在 Dataflow SQL 查询中使用 Cloud Storage 文件集

创建 Cloud Storage 文件集

如需创建 Cloud Storage 文件集,请参阅创建条目组和文件集

Cloud Storage 文件集必须具有架构,并且只能包含没有标题行的 CSV 文件。

添加 Cloud Storage 文件集

您可以使用 Dataflow SQL 界面添加 Cloud Storage 文件集作为 Dataflow 来源。

  1. 转到 Dataflow SQL 界面。

    转到 Dataflow SQL 界面

  2. 在导航面板中,点击添加数据下拉列表,然后选择 Cloud Dataflow 来源

    “添加数据”下拉列表,其中已选择 Cloud Dataflow 来源

  3. 添加 Cloud Dataflow 来源面板中,选择 Cloud Storage 文件集并搜索相应主题。

  4. 点击添加

在添加相应 Cloud Storage 文件集作为 Dataflow 来源后,该 Cloud Storage 文件集会显示在导航菜单的资源部分中。

如需查找该文件集,请展开 Cloud Dataflow 来源 > Cloud Storage 主题

使用 Cloud Storage 文件集

如需在 Dataflow SQL 查询中引用 Cloud Storage 表,请使用以下标识符:

datacatalog.entry.`project-id`.region.`entry-group`.`fileset-name`

标识符必须遵循 Dataflow SQL 词法结构。使用反引号将包含非字母、数字或下划线字符的标识符括起来。

例如,以下查询会从项目 dataflow-sql 和条目组 my-fileset-group 中的 Cloud Storage 文件集 daily.registrations 选择。

SELECT *
FROM datacatalog.entry.`dataflow-sql`.`us-central1`.`my-fileset-group`.`daily.registrations`

BigQuery

查询 BigQuery 表

如需使用 Dataflow SQL 查询 BigQuery 表,请完成以下步骤:

  1. 为 Dataflow SQL 创建 BigQuery 表

  2. 在 Dataflow SQL 查询中使用 BigQuery 表

您无需添加 BigQuery 表作为 Dataflow 来源。

创建 BigQuery 表

如需为 Dataflow SQL 创建 BigQuery 表,请参阅创建具有架构定义的空表

在查询中使用 BigQuery 表

如需在 Dataflow SQL 查询中引用 BigQuery 表,请使用以下标识符:

bigquery.table.`project-id`.`dataset-name`.`table-name`

标识符必须遵循 Dataflow SQL 词法结构。使用反引号将包含非字母、数字或下划线字符的标识符括起来。

例如,以下查询会从数据集 dataflow_sql_dataset 和项目 dataflow-sql 中的 BigQuery 表 us_state_salesregions 选择。

SELECT *
FROM bigquery.table.`dataflow-sql`.dataflow_sql_dataset.us_state_salesregions

写入 BigQuery 表

您可以使用 Cloud Console 或 gcloud 命令行工具将查询结果写入 Dataflow SQL 查询。

控制台

如需将查询结果写入 Dataflow SQL 查询,请使用 Dataflow SQL 界面运行查询:

  1. 在查询编辑器中输入 Dataflow SQL 查询。

  2. 点击创建 Cloud Dataflow 作业,打开作业选项面板。

  3. 在该面板的目标部分中,选择输出类型 > BigQuery

  4. 点击数据集 ID,然后选择已加载的数据集 (Loaded dataset) 或新建数据集

  5. 表名称字段中,输入目标表。

  6. (可选)选择如何将数据加载到 BigQuery 表中。

    • 只写入空白表:(默认)仅当表为空时才写入数据。
    • 附加到表:将数据附加到表的末尾。
    • 覆盖表:在写入新数据之前清空表中的所有现有数据。
  7. 点击创建

gcloud

如需将查询结果写入 BigQuery 表,请使用 gcloud dataflow sql query 命令的 --bigquery-table 标志。

gcloud dataflow sql query \
  --job-name=job-name \
  --region=region \
  --bigquery-dataset=dataset-name \
  --bigquery-table=table-name \
  'query'

您可以使用 --bigquery-write-disposition 标志和以下值来选择如何将数据写入 BigQuery 表。

  • write-empty:(默认)仅当表为空时才写入数据。
  • write-append:将数据附加到表的末尾。
  • write-truncate:在写入新数据之前清空表中的所有现有数据。
gcloud dataflow sql query \
  --job-name=job-name \
  --region=region \
  --bigquery-dataset=dataset-name \
  --bigquery-table=table-name \
  --bigquery-write-disposition=write-mode
  'query'

目标 BigQuery 表的架构必须与查询结果的架构匹配。如果目标 BigQuery 表没有架构,则系统会自动分配与查询结果匹配的架构。