Using data sources and destinations

This page explains how to query data and write query results with Dataflow SQL.

Dataflow SQL can query the following sources:

  • Pub/Sub topics
  • Cloud Storage filesets
  • BigQuery tables

Dataflow SQL can write query results to the following destinations:

  • Pub/Sub topics
  • BigQuery tables

Pub/Sub

Querying Pub/Sub topics

To query a Pub/Sub topic with Dataflow SQL, complete the following steps:

  1. Add the Pub/Sub topic as a Dataflow source.

  2. Assign a schema to the Pub/Sub topic.

  3. Use the Pub/Sub topic in a Dataflow SQL query.

Adding a Pub/Sub topic

You can add a Pub/Sub topic as a Dataflow source using the Dataflow SQL UI.

  1. Go to the Dataflow SQL UI.

  2. In the navigation panel, click the Add Data drop-down list and select Cloud Dataflow sources.

  3. In the Add Cloud Dataflow source panel, select Cloud Pub/Sub topics and search for the topic.

    For example, search for and select the transactions Pub/Sub topic.

  4. Click Add.

After adding the Pub/Sub topic as a Dataflow source, the topic appears in the Resources section of the navigation menu.

To find the topic, expand Cloud Dataflow sources > Cloud Pub/Sub topics.

Assigning a Pub/Sub topic schema

Pub/Sub topic schemas consist of the following fields:

  • An event_timestamp field.

    Pub/Sub event timestamps identify when the messages are published. The timestamps are automatically added to Pub/Sub messages.

  • A field for each key-value pair in the Pub/Sub messages.

    For example, the schema for the message {"k1":"v1", "k2":"v2"} includes two STRING fields, named k1 and k2.
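
    For example, you can publish a message with this shape using the gcloud command-line tool; this is a minimal sketch, and the topic name my-topic is a placeholder:

    gcloud pubsub topics publish my-topic --message='{"k1":"v1", "k2":"v2"}'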

You can assign a schema to a Pub/Sub topic using the Cloud Console or gcloud command-line tool.

Console

To assign a schema to a Pub/Sub topic, complete the following steps:

  1. Select the topic in the Resources panel.

  2. In the Schema tab, click Edit schema to open the Schema side panel that displays the schema fields.

  3. Click Add field to add a field to the schema or toggle the Edit as text button to copy and paste the entire schema text.

    For example, the following is the schema text for a Pub/Sub topic with sales transactions.

    [
      {
          "description": "Pub/Sub event timestamp",
          "name": "event_timestamp",
          "mode": "REQUIRED",
          "type": "TIMESTAMP"
      },
      {
          "description": "Transaction time string",
          "name": "tr_time_str",
          "mode": "NULLABLE",
          "type": "STRING"
      },
      {
          "description": "First name",
          "name": "first_name",
          "mode": "NULLABLE",
          "type": "STRING"
      },
      {
          "description": "Last name",
          "name": "last_name",
          "mode": "NULLABLE",
          "type": "STRING"
      },
      {
          "description": "City",
          "name": "city",
          "mode": "NULLABLE",
          "type": "STRING"
      },
      {
          "description": "State",
          "name": "state",
          "mode": "NULLABLE",
          "type": "STRING"
      },
      {
          "description": "Product",
          "name": "product",
          "mode": "NULLABLE",
          "type": "STRING"
      },
      {
          "description": "Amount of transaction",
          "name": "amount",
          "mode": "NULLABLE",
          "type": "FLOAT64"
      }
    ]
    
  4. Click Submit.

  5. (Optional) Click Preview topic to examine the content of your messages and confirm that they match the schema you defined.

gcloud

To assign a schema to a Pub/Sub topic, complete the following steps:

  1. Create a JSON file with the schema text.

    For example, the following is the schema text for a Pub/Sub topic with sales transactions.

    [
      {
          "description": "Pub/Sub event timestamp",
          "name": "event_timestamp",
          "mode": "REQUIRED",
          "type": "TIMESTAMP"
      },
      {
          "description": "Transaction time string",
          "name": "tr_time_str",
          "mode": "NULLABLE",
          "type": "STRING"
      },
      {
          "description": "First name",
          "name": "first_name",
          "mode": "NULLABLE",
          "type": "STRING"
      },
      {
          "description": "Last name",
          "name": "last_name",
          "mode": "NULLABLE",
          "type": "STRING"
      },
      {
          "description": "City",
          "name": "city",
          "mode": "NULLABLE",
          "type": "STRING"
      },
      {
          "description": "State",
          "name": "state",
          "mode": "NULLABLE",
          "type": "STRING"
      },
      {
          "description": "Product",
          "name": "product",
          "mode": "NULLABLE",
          "type": "STRING"
      },
      {
          "description": "Amount of transaction",
          "name": "amount",
          "mode": "NULLABLE",
          "type": "FLOAT64"
      }
    ]
    
  2. Assign the schema to the Pub/Sub topic using the gcloud beta data-catalog entries update command (a filled-in example follows these steps).

    gcloud beta data-catalog entries update \
     --lookup-entry='pubsub.topic.`project-id`.`topic-name`' \
     --schema-from-file=file-path
    
  3. (Optional) Confirm that your schema was successfully assigned to the Pub/Sub topic.

    gcloud beta data-catalog entries lookup \
     'pubsub.topic.`project-id`.`topic-name`'
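
For example, a sketch of the update command with hypothetical values filled in (project my-project, topic transactions, and a schema file named schema.json):

gcloud beta data-catalog entries update \
  --lookup-entry='pubsub.topic.`my-project`.`transactions`' \
  --schema-from-file=schema.json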
    

Using a Pub/Sub topic

To refer to a Pub/Sub topic in a Dataflow SQL query, use the following identifier:

pubsub.topic.`project-id`.`topic-name`

The identifiers must follow the Dataflow SQL lexical structure. Use backticks to enclose identifiers with characters that are not letters, numbers, or underscores.

For example, the following query selects from the Pub/Sub topic daily.transactions in the project dataflow-sql.

SELECT *
FROM pubsub.topic.`dataflow-sql`.`daily.transactions`
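
Standard projections and filters also apply. The following sketch assumes the transactions schema shown earlier and keeps only larger transactions:

SELECT event_timestamp, first_name, last_name, amount
FROM pubsub.topic.`dataflow-sql`.`daily.transactions`
WHERE amount > 100.0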

Writing to Pub/Sub topics

You can write query results to a Pub/Sub topic using the Cloud Console or gcloud command-line tool.

Console

To write query results to a Pub/Sub topic, run the query with the Dataflow SQL UI:

  1. Go to the Dataflow SQL UI.

  2. Enter the Dataflow SQL query into the query editor.

  3. Click Create Cloud Dataflow job to open a panel of job options.

  4. In the Destination section of the panel, select Output type > Cloud Pub/Sub topic.

  5. Click Select a Cloud Pub/Sub topic and choose a topic.

  6. Click Create.

gcloud

To write query results to a Pub/Sub topic, use the --pubsub-topic flag of the gcloud dataflow sql query command.

gcloud dataflow sql query \
  --job-name=job-name \
  --region=region \
  --pubsub-topic='pubsub.topic.`project-id`.`topic-name`' \
  'query'
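
For example, the following sketch, which uses hypothetical project, topic, and job names, continuously copies messages from one topic to another:

gcloud dataflow sql query \
  --job-name=copy-transactions \
  --region=us-central1 \
  --pubsub-topic='pubsub.topic.`my-project`.`transactions-copy`' \
  'SELECT * FROM pubsub.topic.`my-project`.`transactions`'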

The schema of the destination Pub/Sub topic must match the schema of the query results. If a destination Pub/Sub topic does not have a schema, a schema that matches the query results is automatically assigned.

Cloud Storage

Querying Cloud Storage filesets

To query a Cloud Storage fileset with Dataflow SQL, complete the following steps:

  1. Create a Data Catalog fileset for Dataflow SQL.

  2. Add the Cloud Storage fileset as a Dataflow source.

  3. Use the Cloud Storage fileset in a Dataflow SQL query.

Creating Cloud Storage filesets

To create a Cloud Storage fileset, see Creating entry groups and filesets.

The Cloud Storage fileset must have a schema and only contain CSV files without header rows.
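
For example, if the fileset schema defined two fields, name (STRING) and amount (FLOAT64), which are hypothetical here, every CSV file in the fileset would contain only data rows such as the following:

Alice,100.50
Bob,27.00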

Adding Cloud Storage filesets

You can add a Cloud Storage fileset as a Dataflow source using the Dataflow SQL UI.

  1. Go to the Dataflow SQL UI.

  2. In the navigation panel, click the Add Data drop-down list and select Cloud Dataflow sources.

  3. In the Add Cloud Dataflow source panel, select Cloud Storage filesets and search for the fileset.

  4. Click Add.

After adding the Cloud Storage fileset as a Dataflow source, the Cloud Storage fileset appears in the Resources section of the navigation menu.

To find the fileset, expand Cloud Dataflow sources > Cloud Storage filesets.

Using a Cloud Storage fileset

To refer to a Cloud Storage fileset in a Dataflow SQL query, use the following identifier:

datacatalog.`project-id`.region.`entry-group`.`fileset-name`

The identifiers must follow the Dataflow SQL lexical structure. Use backticks to enclose identifiers with characters that are not letters, numbers, or underscores.

For example, the following query selects from the Cloud Storage fileset daily.registrations in the project dataflow-sql and entry group my-fileset-group.

SELECT *
FROM datacatalog.`dataflow-sql`.`us-central1`.`my-fileset-group`.`daily.registrations`

BigQuery

Querying BigQuery tables

To query a BigQuery table with Dataflow SQL, complete the following steps:

  1. Create a BigQuery table for Dataflow SQL.

  2. Use the BigQuery table in a Dataflow SQL query.

You do not need to add a BigQuery table as a Dataflow source.

Creating a BigQuery table

To create a BigQuery table for Dataflow SQL, see Creating an empty table with a schema definition.
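
As a minimal sketch, you can also create such a table with the bq command-line tool. The dataset and table names match the example below, while the columns state_code and sales_region are assumptions:

bq mk --dataset dataflow_sql_dataset
bq mk --table dataflow_sql_dataset.us_state_salesregions state_code:STRING,sales_region:STRING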

Using a BigQuery table in a query

To refer to a BigQuery table in a Dataflow SQL query, use the following identifier:

bigquery.table.`project-id`.`dataset-name`.`table-name`

The identifiers must follow the Dataflow SQL lexical structure. Use backticks to enclose identifiers with characters that are not letters, numbers, or underscores.

For example, the following query selects from the BigQuery table us_state_salesregions in the dataset dataflow_sql_dataset and project dataflow-sql.

SELECT *
FROM bigquery.table.`dataflow-sql`.dataflow_sql_dataset.us_state_salesregions
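
A BigQuery table is often joined to a streaming source in the same query. The following sketch assumes the transactions Pub/Sub topic and schema from earlier, plus hypothetical state_code and sales_region columns in the BigQuery table:

SELECT tr.*, sr.sales_region
FROM pubsub.topic.`dataflow-sql`.`daily.transactions` AS tr
  INNER JOIN bigquery.table.`dataflow-sql`.dataflow_sql_dataset.us_state_salesregions AS sr
  ON tr.state = sr.state_code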

Writing to a BigQuery table

You can write query results to a BigQuery table using the Cloud Console or gcloud command-line tool.

Console

To write query results to a BigQuery table, run the query with the Dataflow SQL UI:

  1. Enter the Dataflow SQL query into the query editor.

  2. Click Create Cloud Dataflow job to open a panel of job options.

  3. In the Destination section of the panel, select Output type > BigQuery.

  4. Click Dataset ID and select a Loaded dataset or Create new dataset.

  5. In the Table name field, enter a destination table.

  6. (Optional) Choose how to load data into a BigQuery table.

    • Write if empty: (Default) Writes the data only if the table is empty.
    • Append to table: Appends the data to the end of the table.
    • Overwrite table: Erases all existing data in a table before writing the new data.
  7. Click Create.

gcloud

To write query results to a BigQuery table, use the --bigquery-table flag of the gcloud dataflow sql query command.

gcloud dataflow sql query \
  --job-name=job-name \
  --region=region \
  --bigquery-dataset=dataset-name \
  --bigquery-table=table-name \
  'query'

You can use the --bigquery-write-disposition flag and the following values to choose how to write data to a BigQuery table.

  • write-empty: (Default) Writes the data only if the table is empty.
  • write-append: Appends the data to the end of the table.
  • write-truncate: Erases all existing data in a table before writing the new data.

gcloud dataflow sql query \
  --job-name=job-name \
  --region=region \
  --bigquery-dataset=dataset-name \
  --bigquery-table=table-name \
  --bigquery-write-disposition=write-mode \
  'query'

The schema of the destination BigQuery table must match the schema of the query results. If a destination BigQuery table does not have a schema, a schema that matches the query results is automatically assigned.
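
For example, the following sketch appends query results to a BigQuery table, using hypothetical project, job, dataset, and table names:

gcloud dataflow sql query \
  --job-name=transactions-to-bq \
  --region=us-central1 \
  --bigquery-dataset=dataflow_sql_dataset \
  --bigquery-table=transactions_copy \
  --bigquery-write-disposition=write-append \
  'SELECT * FROM pubsub.topic.`my-project`.`daily.transactions`'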