Using data sources and destinations

Dataflow SQL supports reading from the following sources:

  • Pub/Sub topics
  • Cloud Storage filesets
  • BigQuery tables

Dataflow SQL supports writing to BigQuery tables.

To use Dataflow SQL and add Dataflow sources, you must switch to the Dataflow SQL UI.

Pub/Sub

Reading from a Pub/Sub topic

To read from a Pub/Sub topic in a Dataflow job, complete the following steps:

  1. Add the Pub/Sub topic as a Dataflow source.

  2. Use the topic in a query.

The Dataflow SQL UI provides a way to find Pub/Sub data source objects for any project that you have access to. You don't need to remember the full topic names.

Adding a Pub/Sub topic as a Dataflow source

To add your Pub/Sub topics to the Resources section in the left navigation panel, you must add each topic as a Dataflow source.

  1. In the left navigation panel, click the Add Data drop-down list and select Cloud Dataflow sources.


  2. In the Add Cloud Dataflow source panel that opens on the right, choose Cloud Pub/Sub topics. In the search box, search for a topic name. Select the topic and click Add.

    The following screenshot shows a search for the transactions topic:

    Search for the transactions Pub/Sub topic

  3. The Resources section in the left navigation panel displays the Pub/Sub topics that you added. To see the list of topics, expand Cloud Dataflow sources and then expand Cloud Pub/Sub topics.

To use a topic in a query, the topic must have a schema. You can assign a schema file with the gcloud command-line tool or an inline schema in the Dataflow SQL UI.

Console

The following steps show you how to specify an inline schema in the schema editor:

  1. After adding a Pub/Sub topic as a Dataflow source, select the topic in the Resources panel.

  2. In the Schema tab, click Edit schema. The Schema side panel opens on the right.


  3. The Schema side panel displays the schema metadata. You can individually add fields to the schema or toggle the Edit as text button to copy and paste the entire schema text. Then, click Submit to assign the schema.

    For example, the following schema text is the inline schema for the transactions topic.

    [
      {
          "description": "Transaction time string",
          "name": "tr_time_str",
          "type": "STRING"
      },
      {
          "description": "First name",
          "name": "first_name",
          "type": "STRING"
      },
      {
          "description": "Last name",
          "name": "last_name",
          "type": "STRING"
      },
      {
          "description": "City",
          "name": "city",
          "type": "STRING"
      },
      {
          "description": "State",
          "name": "state",
          "type": "STRING"
      },
      {
          "description": "Product",
          "name": "product",
          "type": "STRING"
      },
      {
          "description": "Amount of transaction",
          "name": "amount",
          "type": "FLOAT"
      }
    ]
    

gcloud

The following steps show you how to assign a schema file to the transactions topic with the gcloud command-line tool:

  1. Create a text file and name it transactions_schema.yaml. Copy and paste the following schema text into transactions_schema.yaml.

    - column: event_timestamp
      description: Pub/Sub event timestamp
      mode: REQUIRED
      type: TIMESTAMP
    - column: attributes
      description: Pub/Sub message attributes
      mode: NULLABLE
      type: MAP<STRING,STRING>
    - column: payload
      description: Pub/Sub message payload
      mode: NULLABLE
      type: STRUCT
      subcolumns:
      - column: tr_time_str
        description: Transaction time string
        mode: NULLABLE
        type: STRING
      - column: first_name
        description: First name
        mode: NULLABLE
        type: STRING
      - column: last_name
        description: Last name
        mode: NULLABLE
        type: STRING
      - column: city
        description: City
        mode: NULLABLE
        type: STRING
      - column: state
        description: State
        mode: NULLABLE
        type: STRING
      - column: product
        description: Product
        mode: NULLABLE
        type: STRING
      - column: amount
        description: Amount of transaction
        mode: NULLABLE
        type: FLOAT
    
  2. Assign the schema using the gcloud command-line tool.

    a. Update the gcloud tool with the following command. Ensure that the gcloud tool version is 242.0.0 or higher.

    gcloud components update
    

    b. Run the following command in a command-line window. Replace project-id with your project ID, and path/to/file with the path to your transactions_schema.yaml file.

    gcloud beta data-catalog entries update \
      --lookup-entry='pubsub.topic.`project-id`.transactions' \
      --schema-from-file=path/to/file/transactions_schema.yaml
    

    For more information about the command's parameters and allowed schema file formats, see the documentation page for gcloud beta data-catalog entries update.

    c. Confirm that your schema was successfully assigned to the transactions Pub/Sub topic. Replace project-id with your project ID.

    gcloud beta data-catalog entries lookup 'pubsub.topic.`project-id`.transactions'
    

Viewing a topic's schema

To see the schema of a specific topic, expand Cloud Dataflow sources in the Resources section in the left navigation panel. Then, expand Cloud Pub/Sub topics. Clicking a topic shows information about the resource in the details panel, including the Schema. The schema includes the following fields:

  • event_timestamp: the field that the watermark tracks
  • attributes: the Pub/Sub message attributes
  • payload: the contents of the Pub/Sub message. Pub/Sub messages must be in JSON format, as described in the JSON streaming library reference page. For example, you can insert messages formatted as {"k1":"v1", "k2":"v2"} into a BigQuery payload nested row with two fields, named k1 and k2, with string data types. You must serialize all fields that are in the schema, regardless of the field's value (null or non-null). For example, for NULLABLE fields, you must serialize the field in the JSON with a NULL value. If your JSON does not contain the field, Dataflow SQL returns an error.

The following screenshot shows the schema for the transactions topic.

View the transactions schema

Using a topic in a query

You can specify a Pub/Sub topic with a fully-qualified, dot-separated list of identifiers that follow the Standard SQL lexical structure.

pubsub.topic.project-id.topic-name

You must use backticks to enclose identifiers that contain characters which are not letters, numbers, or underscores.

For example, the following string specifies the Pub/Sub topic daily.transactions from the project dataflow-sql.

pubsub.topic.`dataflow-sql`.`daily.transactions`
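For example, the following sketch is a minimal query that reads from the transactions topic added earlier. It assumes the schema shown on this page, in which the transaction fields are nested under payload; replace project-id with your project ID.

  SELECT tr.payload.first_name, tr.payload.amount
  FROM pubsub.topic.`project-id`.transactions AS tr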

You can also populate the query editor by selecting the topic in the Resources section in the left navigation panel. Expand Cloud Dataflow sources and then expand Cloud Pub/Sub topics. When you click a topic in the navigation panel, you can then click the Query Topic button on the right side of the details panel to populate the query box with a basic query for that topic.

The following screenshot shows the populated query in the query editor:

The Query topic button populates the query box

Use the topic's schema to write your SQL query. When you enter a query in the Dataflow SQL UI, the query validator verifies the query syntax. A green check mark icon is displayed if the query is valid. If the query is invalid, a red exclamation point icon is displayed. If your query syntax is invalid, clicking on the validator icon provides information about what you need to fix.

The following data enrichment query adds an additional field sales_region to a Pub/Sub stream of events (transactions). The sales regions are from a BigQuery table (us_state_salesregions) that maps states to sales regions.

Enter your query in the editor
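A query along these lines might look like the following sketch. The payload fields come from the transactions schema shown earlier; sr.state_code is an assumed column name in us_state_salesregions, whose schema is not shown on this page. Replace project-id with your project ID.

  SELECT
    tr.payload.tr_time_str,
    tr.payload.state,
    tr.payload.amount,
    sr.sales_region
  FROM pubsub.topic.`project-id`.transactions AS tr
    INNER JOIN bigquery.table.`project-id`.dataflow_sql_dataset.us_state_salesregions AS sr
    ON tr.payload.state = sr.state_code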

Cloud Storage

Reading from a Cloud Storage fileset

To read from a Cloud Storage fileset in a Dataflow job, complete the following steps:

  1. Add the Cloud Storage fileset as a Dataflow source.

  2. Use the fileset in a query.

The Dataflow SQL UI provides a way to find Cloud Storage data source objects for any project that you have access to. You don't need to remember the full fileset names.

Adding a Cloud Storage fileset as a Dataflow source

To add your Cloud Storage filesets as a Dataflow source, you must create a corresponding Google Cloud resource in Data Catalog.

  1. Create an entry group to manage Cloud Storage filesets for Dataflow:

    gcloud beta data-catalog entry-groups create Entry_group_name \
       --location=Region
    
  2. Place a Cloud Storage fileset in the entry group:

    gcloud beta data-catalog entries create Fileset_name \
      --entry-group=Entry_group_name \
      --location=Region \
      --gcs-file-patterns=gs://my-bucket/*.csv \
      --description="Fileset_description"
    

After placing a Cloud Storage fileset in a Data Catalog entry group, add the fileset to the Resources section in the left navigation panel.

  1. In the left navigation panel, click the Add Data drop-down list and select Cloud Dataflow sources.


  2. In the Add Cloud Dataflow source panel that opens on the right, choose Cloud Storage filesets. In the search box, search for a fileset name. Select the fileset and click Add.

  3. The Resources section in the left navigation panel displays the Cloud Storage filesets that you added. To see the list of filesets, expand Cloud Dataflow sources and then expand Cloud Storage filesets.

To use a Cloud Storage fileset in a query, the fileset must have a schema. You can assign a schema file with the gcloud command-line tool or an inline schema in the Dataflow SQL UI.

Console

The following steps show you how to specify an inline schema in the schema editor:

  1. After adding a Cloud Storage fileset as a Dataflow source, select the fileset in the Resources panel.

  2. In the Schema tab, click Edit schema. The Schema side panel opens on the right.


  3. The Schema side panel displays the schema metadata. You can individually add fields to the schema or toggle the Edit as text button to copy and paste the entire schema text. Then, click Submit to assign the schema.

    For example, the following schema text is the inline schema for the transactions fileset.

    [
     {
         "description": "Transaction time string",
         "name": "tr_time_str",
         "mode": "NULLABLE",
         "type": "STRING"
     },
     {
         "description": "Last name",
         "name": "last_name",
         "mode": "NULLABLE",
         "type": "STRING"
     },
     {
         "description": "City",
         "name": "city",
         "mode": "NULLABLE",
         "type": "STRING"
     },
     {
         "description": "State",
         "name": "state",
         "mode": "NULLABLE",
         "type": "STRING"
     },
     {
         "description": "Product",
         "name": "product",
         "mode": "NULLABLE",
         "type": "STRING"
     },
     {
         "description": "amount",
         "name": "Amount of transaction",
         "mode": "NULLABLE",
         "type": "FLOAT"
     }
    ]
    

gcloud

The following steps show you how to assign a schema file to the transactions fileset with the gcloud command-line tool:

  1. Create a text file and name it transactions_schema.yaml. Copy and paste the following schema text into transactions_schema.yaml.

    - column: tr_time_str
      description: Transaction time string
      mode: NULLABLE
      type: STRING
    - column: first_name
      description: First name
      mode: NULLABLE
      type: STRING
    - column: last_name
      description: Last name
      mode: NULLABLE
      type: STRING
    - column: city
      description: City
      mode: NULLABLE
      type: STRING
    - column: state
      description: State
      mode: NULLABLE
      type: STRING
    - column: product
      description: Product
      mode: NULLABLE
      type: STRING
    - column: amount
      description: Amount of transaction
      mode: NULLABLE
      type: FLOAT
    
  2. Assign the schema using the gcloud command-line tool.

    a. Update the gcloud tool with the following command. Ensure that the gcloud tool version is 242.0.0 or higher.

    gcloud components update
    

    b. Run the following command in a command-line window. Replace Project_ID, Region, and Entry_group with your project ID, the entry group's region, and the entry group name, and replace path/to/file with the path to your transactions_schema.yaml file.

    gcloud beta data-catalog entries update \
      --lookup-entry='datacatalog.Project_ID.Region.Entry_group.transactions' \
      --schema-from-file=path/to/file/transactions_schema.yaml
    

    For more information about the command's parameters and allowed schema file formats, see the documentation page for gcloud beta data-catalog entries update.

    c. Confirm that your schema was successfully assigned to the transactions Cloud Storage fileset.

    gcloud beta data-catalog entries lookup 'datacatalog.Project_ID.Region.Entry_group.transactions'
    

Viewing a fileset's schema

To see the schema of a specific fileset, expand Cloud Dataflow sources in the Resources section in the left navigation panel. Then, expand Cloud Storage filesets. Clicking a fileset shows information about the resource in the details panel, including the Schema.

Using a fileset in a query

You can specify a Cloud Storage fileset with a fully-qualified, dot-separated list of identifiers that follow the Standard SQL lexical structure.

datacatalog.Project_ID.Region.Entry_group.Fileset_name

For example, the following string specifies the Cloud Storage fileset daily.registrations from the project dataflow-sql and entry group filesets.

datacatalog.`dataflow-sql`.`us-central1`.filesets.`daily.registrations`
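For example, a minimal query against the transactions fileset added earlier might look like the following sketch. It uses the Project_ID, Region, and Entry_group placeholders from the commands above and the fileset schema assigned with gcloud:

  SELECT tr.last_name, tr.amount
  FROM datacatalog.Project_ID.Region.Entry_group.transactions AS tr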

BigQuery

Reading from a BigQuery table

Adding a BigQuery table as a Dataflow source

You do not need to add your BigQuery table as a Dataflow source to add datasets and tables to the Resources section in the left navigation panel. You can expand pinned projects in the Resources section to view BigQuery datasets and tables. If your BigQuery table is in a project that is not listed in the Resources section, you can pin the project to add it to the list.

Viewing a table's schema

To see the schema of a specific table, find the project in the Resources section in the left navigation panel. Click the expand icon next to your project to display the datasets in that project. Then, click the expand icon next to any dataset to show the tables within that dataset. You can also use the search box to search for a specific dataset or table.

Clicking the table shows information about the resource in the details panel, including the Schema.

Using a table in a query

You can specify a BigQuery table with a fully-qualified, dot-separated list of identifiers that follow the Standard SQL lexical structure.

  bigquery.table.project-id.my_dataset.my_table

You must use backticks to enclose identifiers that contain characters which are not letters, numbers, or underscores.

For example, the following string specifies a BigQuery dataset dataflow_sql_dataset and table us_state_salesregions from the project dataflow-sql.

  bigquery.table.`dataflow-sql`.dataflow_sql_dataset.us_state_salesregions
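For example, a minimal query against that table might look like the following sketch; state_code is an assumed column name, because the table's schema is not shown on this page:

  SELECT sr.state_code, sr.sales_region
  FROM bigquery.table.`dataflow-sql`.dataflow_sql_dataset.us_state_salesregions AS sr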

You can also populate the query editor with this string by selecting the table in the Resources section in the left navigation panel. Find your project in the Resources section, expand the project, and then expand the dataset that contains the table you want to use. When you click a table in the navigation panel, you can then click the Query Table button on the right side of the details panel to populate the query box with a basic query for that table.

The following screenshot shows the populated query in the query editor:

The Query table button populates the query box

Use the table's schema to write your SQL query. When you enter a query in the Dataflow SQL UI, the query validator verifies the query syntax. A green check mark icon is displayed if the query is valid. If the query is invalid, a red exclamation point icon is displayed. If your query syntax is invalid, clicking on the validator icon provides information about what you need to fix.

The following data enrichment query uses a BigQuery table (us_state_salesregions) that maps states to sales regions to add an additional field (sales_region) to a Pub/Sub stream of events.

Enter your query in the editor

Writing to a BigQuery table

When you create a Dataflow job to run your SQL query, you must specify a destination BigQuery table for the results. Keep the following prerequisites and considerations in mind:

  • The destination dataset must exist before you click Create Cloud Dataflow job.
  • If the destination table does not exist, the job creates a new table with the specified name.
  • If the destination table already exists, the table must be empty.