使用数据源和目标

本页面介绍如何使用 Dataflow SQL 查询数据和写入查询结果。

Dataflow SQL 可以查询以下来源:

Dataflow SQL 可以将查询结果写入以下目标:

Pub/Sub

查询 Pub/Sub 主题

如需使用 Dataflow SQL 查询 Pub/Sub 主题,请完成以下步骤:

  1. 添加 Pub/Sub 主题作为 Dataflow 来源。

  2. 为 Pub/Sub 主题分配架构

  3. 在 Dataflow SQL 查询中使用 Pub/Sub 主题

添加 Pub/Sub 主题

您可以使用 BigQuery 网页界面添加 Pub/Sub 主题作为 Dataflow 来源。

  1. 在 Google Cloud Console 中,转到 BigQuery 页面,在此页面中,您可以使用 Dataflow SQL。

    转到 BigQuery

  2. 在导航面板中,点击添加数据下拉列表,然后选择 Cloud Dataflow 来源

    “添加数据”下拉列表,其中已选择 Cloud Dataflow 来源

  3. 添加 Cloud Dataflow 来源面板中,选择 Cloud Pub/Sub 主题并搜索相应主题。

    以下屏幕截图显示了对 transactions Pub/Sub 主题的搜索:

    “添加 Cloud Dataflow 来源”面板,其中已选择 Pub/Sub 主题选项、已完成事务搜索查询并且已选择事务主题。

  4. 点击添加

在添加相应 Pub/Sub 主题作为 Dataflow 来源后,该 Pub/Sub 主题会显示在导航菜单的资源部分中。

如需查找该主题,请展开 Cloud Dataflow 来源 > Cloud Pub/Sub 主题

分配 Pub/Sub 主题架构

Pub/Sub 主题架构包含以下字段:

  • event_timestamp 字段。

    Pub/Sub 事件时间戳用于标识消息的发布时间。时间戳会自动添加到 Pub/Sub 消息中。

  • Pub/Sub 消息中每个键值对的字段。

    例如,消息 {"k1":"v1", "k2":"v2"} 的架构包含两个 STRING 字段,分别名为 k1k2

您可以使用 Cloud Console 或 Google Cloud CLI 为 Pub/Sub 主题分配架构。

控制台

如需为 Pub/Sub 主题分配架构,请完成以下步骤:

  1. 资源面板中选择主题。

  2. 架构标签页中,点击修改架构,以打开显示架构字段的架构侧边面板。

    用于添加或修改架构的侧边面板

  3. 点击添加字段以向架构添加字段,或切换以文本形式修改按钮以复制并粘贴整个架构文本。

    例如,以下是包含销售交易的 Pub/Sub 主题的架构文本:

    [
      {
          "description": "Pub/Sub event timestamp",
          "name": "event_timestamp",
          "mode": "REQUIRED",
          "type": "TIMESTAMP"
      },
      {
          "description": "Transaction time string",
          "name": "tr_time_str",
          "mode": "NULLABLE",
          "type": "STRING"
      },
      {
          "description": "First name",
          "name": "first_name",
          "mode": "NULLABLE",
          "type": "STRING"
      },
      {
          "description": "Last name",
          "name": "last_name",
          "mode": "NULLABLE",
          "type": "STRING"
      },
      {
          "description": "City",
          "name": "city",
          "mode": "NULLABLE",
          "type": "STRING"
      },
      {
          "description": "State",
          "name": "state",
          "mode": "NULLABLE",
          "type": "STRING"
      },
      {
          "description": "Product",
          "name": "product",
          "mode": "NULLABLE",
          "type": "STRING"
      },
      {
          "description": "Amount of transaction",
          "name": "amount",
          "mode": "NULLABLE",
          "type": "FLOAT64"
      }
    ]
    
  4. 点击提交

  5. (可选)点击预览主题,检查消息的内容并确认其匹配您定义的架构。

    Dataflow SQL 页面中的详细信息面板,其中已选择 Pub/Sub 主题并突出显示了“预览主题”按钮

gcloud

如需为 Pub/Sub 主题分配架构,请完成以下步骤:

  1. 使用架构文本创建一个 JSON 文件。

    例如,以下是包含销售交易的 Pub/Sub 主题的架构文本:

    [
      {
          "description": "Pub/Sub event timestamp",
          "column": "event_timestamp",
          "mode": "REQUIRED",
          "type": "TIMESTAMP"
      },
      {
          "description": "Transaction time string",
          "column": "tr_time_str",
          "mode": "NULLABLE",
          "type": "STRING"
      },
      {
          "description": "First name",
          "column": "first_name",
          "mode": "NULLABLE",
          "type": "STRING"
      },
      {
          "description": "Last name",
          "column": "last_name",
          "mode": "NULLABLE",
          "type": "STRING"
      },
      {
          "description": "City",
          "column": "city",
          "mode": "NULLABLE",
          "type": "STRING"
      },
      {
          "description": "State",
          "column": "state",
          "mode": "NULLABLE",
          "type": "STRING"
      },
      {
          "description": "Product",
          "column": "product",
          "mode": "NULLABLE",
          "type": "STRING"
      },
      {
          "description": "Amount of transaction",
          "column": "amount",
          "mode": "NULLABLE",
          "type": "FLOAT64"
      }
    ]
    
  2. 使用 gcloud data-catalog entries 命令为 Pub/Sub 主题分配架构:

    gcloud data-catalog entries update \
     --lookup-entry='pubsub.topic.`PROJECT_ID`.`TOPIC_NAME`' \
     --schema-from-file=FILE_PATH
    

    请替换以下内容:

    • PROJECT_ID:您的项目 ID
    • TOPIC_NAME:您的 Pub/Sub 主题名称
    • FILE_PATH:包含架构文本的 JSON 文件的路径
  3. (可选)通过运行以下命令,确认已成功为 Pub/Sub 主题分配架构:

    gcloud data-catalog entries lookup \
     'pubsub.topic.`PROJECT_ID`.`TOPIC_NAME`'
    

使用 Pub/Sub 主题

如需在 Dataflow SQL 查询中引用 Pub/Sub,请使用以下标识符:

pubsub.topic.`PROJECT_ID`.`TOPIC_NAME`

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • TOPIC_NAME:您的 Pub/Sub 主题名称
标识符必须遵循 Dataflow SQL 词法结构。使用反引号将包含非字母、数字或下划线字符的标识符括起来。

例如,以下查询会从项目 dataflow-sql 中的 Dataflow 主题 daily.transactions 选择。

SELECT *
FROM pubsub.topic.`dataflow-sql`.`daily.transactions`

写入 Pub/Sub 主题

您可以使用 Cloud Console 或 Google Cloud CLI 将查询结果写入 Pub/Sub 主题。

控制台

如需将查询结果写入 Pub/Sub 主题,请使用 Dataflow SQL 运行查询:

  1. 在 Cloud Console 中,转到 BigQuery 页面,在此页面中,您可以使用 Dataflow SQL。

    转到 BigQuery

  2. 在查询编辑器中输入 Dataflow SQL 查询。

  3. 点击创建 Cloud Dataflow 作业,打开作业选项面板。

  4. 在该面板的目标部分中,选择输出类型 > Cloud Pub/Sub 主题

  5. 点击选择 Cloud Pub/Sub 主题,然后选择一个主题。

  6. 点击创建

gcloud

如需将查询结果写入 Pub/Sub 主题,请使用 gcloud dataflow sql query 命令的 --pubsub-topic 标志:

gcloud dataflow sql query \
  --job-name=JOB_NAME \
  --region=REGION \
  --pubsub-project=PROJECT_ID \
  --pubsub-topic=TOPIC_NAME \
  'QUERY'

请替换以下内容:

  • JOB_NAME:您选择的作业名称
  • REGION:地区端点(例如 us-west1
  • PROJECT_ID:您的项目 ID
  • TOPIC_NAME:您的 Pub/Sub 主题名称
  • QUERY:Dataflow SQL 查询

目标 Pub/Sub 主题的架构必须与查询结果的架构匹配。如果目标 Pub/Sub 主题没有架构,则系统会自动分配与查询结果匹配的架构。

Cloud Storage

查询 Cloud Storage 文件集

如需使用 Dataflow SQL 查询 Cloud Storage 文件集,请完成以下步骤:

  1. 为 Dataflow SQL 创建 Data Catalog 文件集

  2. 添加 Cloud Storage 文件集作为 Dataflow 来源。

  3. 在 Dataflow SQL 查询中使用 Cloud Storage 文件集

创建 Cloud Storage 文件集

如需创建 Cloud Storage 文件集,请参阅创建条目组和文件集

Cloud Storage 文件集必须具有架构,并且只能包含没有标题行的 CSV 文件。

添加 Cloud Storage 文件集

您可以使用 Dataflow SQL 添加 Cloud Storage 文件集作为 Dataflow 来源:

  1. 在 Cloud Console 中,转到 BigQuery 页面,在此页面中,您可以使用 Dataflow SQL。

    转到 BigQuery

  2. 在导航面板中,点击添加数据下拉列表,然后选择 Cloud Dataflow 来源

    “添加数据”下拉列表,其中已选择 Cloud Dataflow 来源

  3. 添加 Cloud Dataflow 来源面板中,选择 Cloud Storage 文件集并搜索相应主题。

  4. 点击添加

在添加相应 Cloud Storage 文件集作为 Dataflow 来源后,该 Cloud Storage 文件集会显示在导航菜单的资源部分中。

如需查找该文件集,请展开 Cloud Dataflow 来源 > Cloud Storage 主题

使用 Cloud Storage 文件集

如需在 Dataflow SQL 查询中引用 Cloud Storage 表,请使用以下标识符:

datacatalog.entry.`PROJECT_ID`.REGION.`ENTRY_GROUP`.`FILESET_NAME`

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • REGION:地区端点(例如 us-west1
  • ENTRY_GROUP:Cloud Storage 文件集的条目组
  • FILESET_NAME:Cloud Storage 文件集的名称
标识符必须遵循 Dataflow SQL 词法结构。使用反引号将包含非字母、数字或下划线字符的标识符括起来。

例如,以下查询会从项目 dataflow-sql 和条目组 my-fileset-group 中的 Cloud Storage 文件集 daily.registrations 选择。

SELECT *
FROM datacatalog.entry.`dataflow-sql`.`us-central1`.`my-fileset-group`.`daily.registrations`

BigQuery

查询 BigQuery 表

如需使用 Dataflow SQL 查询 BigQuery 表,请完成以下步骤:

  1. 为 Dataflow SQL 创建 BigQuery 表

  2. 在 Dataflow SQL 查询中使用 BigQuery 表

您无需添加 BigQuery 表作为 Dataflow 来源。

创建 BigQuery 表

如需为 Dataflow SQL 创建 BigQuery 表,请参阅创建具有架构定义的空表

在查询中使用 BigQuery 表

如需在 Dataflow SQL 查询中引用 BigQuery 表,请使用以下标识符:

bigquery.table.`PROJECT_ID`.`DATASET_NAME`.`TABLE_NAME`

标识符必须遵循 Dataflow SQL 词法结构。使用反引号将包含非字母、数字或下划线字符的标识符括起来。

例如,以下查询会从数据集 dataflow_sql_dataset 和项目 dataflow-sql 中的 BigQuery 表 us_state_salesregions 选择。

SELECT *
FROM bigquery.table.`dataflow-sql`.dataflow_sql_dataset.us_state_salesregions

写入 BigQuery 表

您可以使用 Cloud Console 或 Google Cloud CLI 将查询结果写入 Dataflow SQL 查询。

控制台

如需将查询结果写入 Dataflow SQL 查询,请使用 Dataflow SQL 运行查询:

  1. 在 Cloud Console 中,转到 BigQuery 页面,在此页面中,您可以使用 Dataflow SQL。

    转到 BigQuery

  2. 在查询编辑器中输入 Dataflow SQL 查询。

  3. 点击创建 Cloud Dataflow 作业,打开作业选项面板。

  4. 在该面板的目标部分中,选择输出类型 > BigQuery

  5. 点击数据集 ID,然后选择已加载的数据集 (Loaded dataset) 或新建数据集

  6. 表名称字段中,输入目标表。

  7. (可选)选择如何将数据加载到 BigQuery 表中。

    • 只写入空白表:(默认)仅当表为空时才写入数据。
    • 附加到表:将数据附加到表的末尾。
    • 覆盖表:在写入新数据之前清空表中的所有现有数据。
  8. 点击创建

gcloud

如需将查询结果写入 BigQuery 表,请使用 gcloud dataflow sql query 命令的 --bigquery-table 标志:

gcloud dataflow sql query \
  --job-name=JOB_NAME \
  --region=REGION \
  --bigquery-dataset=DATASET_NAME \
  --bigquery-table=TABLE_NAME \
  'QUERY'

请替换以下内容:

  • JOB_NAME:您选择的作业名称
  • REGION:地区端点(例如 us-west1
  • DATASET_NAME:您的 BigQuery 数据集名称
  • TABLE_NAME:您的 BigQuery 表名称
  • QUERY:Dataflow SQL 查询

如需选择将数据写入 BigQuery 表的方式,您可以使用 --bigquery-write-disposition 标志和以下值:

  • write-empty:(默认)仅当表为空时才写入数据。
  • write-append:将数据附加到表的末尾。
  • write-truncate:在写入新数据之前清空表中的所有现有数据。
gcloud dataflow sql query \
  --job-name=JOB_NAME \
  --region=REGION \
  --bigquery-dataset=DATASET_NAME \
  --bigquery-table=TABLE_NAME \
  --bigquery-write-disposition=WRITE_MODE
  'QUERY'

WRITE_MODE 替换为 BigQuery 写入处置值。

目标 BigQuery 表的架构必须与查询结果的架构匹配。如果目标 BigQuery 表没有架构,则系统会自动分配与查询结果匹配的架构。