Quickstart Using Templates

This page shows you how to create a streaming pipeline using a Google-provided Cloud Dataflow template. Specifically, it uses the Cloud Pub/Sub to BigQuery template as an example.

Before you begin

  1. Sign in to your Google Account.

    If you don't already have one, sign up for a new account.

  2. Select or create a GCP project.

    Go to the Manage resources page

  3. Make sure that billing is enabled for your project.

    Learn how to enable billing

  4. Enable the Cloud Dataflow, Compute Engine, Stackdriver Logging, Google Cloud Storage, Google Cloud Storage JSON, BigQuery, and Pub/Sub APIs. (A Cloud Shell alternative for this step and the next is sketched after this list.)

    Enable the APIs

  5. Create a Cloud Storage bucket:
    1. In the GCP Console, go to the Cloud Storage browser.

      Go to the Cloud Storage browser

    2. Click Create bucket.
    3. In the Create bucket dialog, specify the following attributes:
      • Name: A unique bucket name. Do not include sensitive information in the bucket name, as the bucket namespace is global and publicly visible.
      • Storage class: Multi-Regional
      • Location: A location where the bucket data will be stored.
    4. Click Create.
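
If you prefer the command line, you can enable the APIs and create the bucket from Cloud Shell instead. This is a minimal sketch: the service names and the bucket name my-dataflow-bucket are assumptions, so substitute your own globally unique bucket name and verify the service names for your project with gcloud services list.

    # Enable the APIs used in this quickstart (service names assumed; verify for your project).
    gcloud services enable \
        dataflow.googleapis.com \
        compute.googleapis.com \
        logging.googleapis.com \
        storage-component.googleapis.com \
        storage-api.googleapis.com \
        bigquery.googleapis.com \
        pubsub.googleapis.com

    # Create a Multi-Regional bucket; replace my-dataflow-bucket with a unique name.
    gsutil mb -c multi_regional -l us gs://my-dataflow-bucket/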

Create a BigQuery dataset and table

Create a BigQuery dataset and table with the appropriate schema for your Cloud Pub/Sub topic, using Google Cloud Shell or the GCP Console.

In this example, the name of the dataset is taxirides and the name of the table is realtime.

Using Cloud Shell

Use Cloud Shell to create a dataset and table.

  1. Create your dataset by running the command:
    bq mk taxirides
    Your output should look similar to:
    Dataset 'myprojectid:taxirides' successfully created
  2. Create your table by running the command:
    bq mk \
    --time_partitioning_field timestamp \
    --schema ride_id:string,point_idx:integer,latitude:float,longitude:float,\
    timestamp:timestamp,meter_reading:float,meter_increment:float,ride_status:string,\
    passenger_count:integer -t taxirides.realtime
    Your output should look similar to:
    Table 'myprojectid:taxirides.realtime' successfully created

    The table is partitioned to decrease query costs and improve performance.
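
To confirm the table was created as expected, you can inspect it from Cloud Shell with bq show; the output should list the schema fields and a timePartitioning block whose field is timestamp:

    bq show --format=prettyjson taxirides.realtime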

Using the Google Cloud Platform Console

Use the Google Cloud Platform Console to create a dataset and table.

  1. Go to the BigQuery Web UI.
  2. Click the down arrow icon next to your project name in the navigation, then click Create new dataset. Enter taxirides as your dataset ID.

    Dataset IDs are unique on a per-project basis. Click the question mark icon to see ID limitations.

  3. Leave all of the other default settings in place and click OK.
  4. In the navigation, hover over the dataset ID that you just created. Click the down arrow icon next to the ID and click Create new table.
  5. Next to Source Data, select the Create empty table option.
  6. Under Destination Table, select the taxirides dataset and enter realtime as the table name.
  7. Under Schema, select Edit as Text and enter:
    ride_id:string,point_idx:integer,latitude:float,longitude:float,timestamp:timestamp,
    meter_reading:float,meter_increment:float,ride_status:string,passenger_count:integer
  8. Under Options, select the Day option for the Partitioning type field.
  9. Under Options, select the timestamp column for the Partitioning field selector.
  10. Click the Create Table button.

Run the pipeline

Run a streaming pipeline using the Google-provided Cloud Pub/Sub to BigQuery template.

  1. Go to the Cloud Dataflow Web UI.
  2. Enter a Job name for your Cloud Dataflow job.
  3. Under Cloud Dataflow template, select the Cloud Pub/Sub to BigQuery template.
  4. Under Cloud Pub/Sub input topic, enter projects/pubsub-public-data/topics/taxirides-realtime.
  5. Under BigQuery output table, enter <myprojectid>:taxirides.realtime.
  6. Under Temporary Location, enter gs://<mybucket>/tmp/.
  7. Click the Run job button.
  8. View the data written to BigQuery. Go to the BigQuery Web UI.
    You can submit queries using standard SQL. For example, the following query selects all rows that were added in the past 24 hours:
    SELECT * FROM `myprojectid.taxirides.realtime`
    WHERE `timestamp` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
    LIMIT 1000
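
If you prefer to launch the template from the command line, gcloud provides an equivalent to the Web UI steps above. The following is a sketch under the same assumptions: taxi-quickstart is a placeholder job name, myprojectid and mybucket are your project ID and bucket, and the template is read from the public gs://dataflow-templates location.

    gcloud dataflow jobs run taxi-quickstart \
        --gcs-location gs://dataflow-templates/latest/PubSub_to_BigQuery \
        --staging-location gs://mybucket/tmp/ \
        --parameters inputTopic=projects/pubsub-public-data/topics/taxirides-realtime,outputTableSpec=myprojectid:taxirides.realtime

The query above can also be run from Cloud Shell:

    bq query --use_legacy_sql=false \
    'SELECT * FROM `myprojectid.taxirides.realtime`
     WHERE `timestamp` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
     LIMIT 1000'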

Clean up

To avoid incurring charges to your Google Cloud Platform account for the resources used in this quickstart:

  1. Go to the Cloud Dataflow Web UI.
    1. You may need to select your streaming job from the job list in the Google Cloud Platform Console.
    2. In the navigation, click Cancel.
    3. In the Cancel dialog box, choose to either Cancel or Drain your pipeline.
  2. Go to the BigQuery Web UI.
    1. In the navigation, hover over the taxirides dataset you created.
    2. Click the down arrow icon next to your dataset name in the navigation, then click Delete dataset.
    3. In the Delete dataset dialog box, confirm the delete command by typing the name of your dataset (taxirides) and clicking OK.
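
The same cleanup can be performed from Cloud Shell. This sketch assumes the resources created above; replace JOB_ID with the ID shown in your job list, and note that bq rm -r -f deletes the dataset and all of its tables without prompting:

    # Find the ID of the running streaming job.
    gcloud dataflow jobs list --status=active

    # Cancel the job, or use `gcloud dataflow jobs drain JOB_ID` to flush in-flight data first.
    gcloud dataflow jobs cancel JOB_ID

    # Delete the taxirides dataset, including the realtime table.
    bq rm -r -f taxirides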
