Implementing Datastream and Dataflow for analytics

Overview

For businesses with many isolated data sources, accessing enterprise data across the organization, especially in real time, can be difficult. The result is limited, slow data access that hinders the organization's ability to gain insight from its own data.

Datastream provides near-real-time access to change data from a variety of on-premises and cloud-based data sources. It offers a simple setup experience and a unified consumption API that democratizes access to the freshest available enterprise data across the organization, powering integrated near-real-time scenarios.

One such scenario is transferring data from a source database into a cloud-based storage service or messaging queue and transforming this data into a form that's readable by other applications and services, such as Dataflow, that communicate with this storage service or messaging queue. Dataflow is a managed service for stream and batch data processing on Google Cloud.

In this tutorial, you'll learn how Datastream integrates seamlessly with Dataflow through streamlined Dataflow templates to power up-to-date materialized views in BigQuery for analytics.

You'll learn how to use Datastream to stream changes (data that's inserted, updated, or deleted) from a source MySQL database into a folder in a Cloud Storage bucket.

You'll configure the Cloud Storage bucket to send notifications that Dataflow will use to learn about any new files containing the data changes that Datastream streams from the source database. A Dataflow job will then process the files and transfer the changes into BigQuery.

integration user flow diagram

Objectives

In this tutorial, you:

  • Create a bucket in Cloud Storage. This is the destination bucket into which Datastream will stream schemas, tables, and data from a source MySQL database.
  • Enable Pub/Sub notifications for the Cloud Storage bucket. By doing this, you're configuring the bucket to send notifications that Dataflow will use to learn about any new files that are ready for processing. These files contain changes to data that Datastream streams from the source database into the bucket.
  • Create datasets in BigQuery. BigQuery uses datasets to contain the data that it receives from Dataflow. This data represents the changes in the source database that Datastream streams into the Cloud Storage bucket.
  • Create and manage connection profiles for a source database and a destination bucket in Cloud Storage. A stream in Datastream uses the information in the connection profiles to transfer data from the source database into the bucket.
  • Create and start a stream. This stream will transfer data, schemas, and tables from the source database into the bucket.
  • Verify that Datastream transfers the data and tables associated with a schema of the source database into the bucket.
  • Create a job in Dataflow. After Datastream streams data changes from the source database into the Cloud Storage bucket, notifications will be sent to Dataflow about new files containing the changes. The Dataflow job processes the files and transfers the changes into BigQuery.
  • Verify that Dataflow processes the files containing changes associated with this data, and transfers the changes into BigQuery. As a result, you have an end-to-end integration between Datastream and BigQuery.
  • Clean up the resources that you created on Datastream, Cloud Storage, Pub/Sub, Dataflow, and BigQuery so they won't take up quota and you won't be billed for them in the future.

Costs

This tutorial uses the following billable components of Google Cloud:

  • Datastream
  • Cloud Storage
  • Pub/Sub
  • Dataflow
  • BigQuery

To generate a cost estimate based on your projected usage, use the pricing calculator. New Google Cloud users might be eligible for a free trial.

Before you begin

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Cloud project. Learn how to check if billing is enabled on a project.

  4. Enable the Datastream API.

    Enable the API

  5. Make sure that you have the Datastream Admin role assigned to your user account.

    Go to the IAM page

  6. Make sure that you have a source database that Datastream can access. For this tutorial, a MySQL database is used as the source.
  7. Make sure that you have data, tables, and schemas in the source database.
  8. Configure the source database to allow incoming connections from Datastream public IP addresses. See IP allowlists and regions for a list of all Datastream regions and their associated public IP addresses.
  9. Set up change data capture (CDC) for the source database. For more information, see Configure your source MySQL database.
  10. Make sure that you have configured a destination Cloud Storage bucket that Datastream can access.
  11. Make sure that you meet all prerequisites to enable Pub/Sub notifications for Cloud Storage.

    In this tutorial, you'll create a destination bucket in Cloud Storage and enable Pub/Sub notifications for the bucket. By doing this, Dataflow can receive notifications about new files that Datastream writes to the bucket. These files contain changes to data that Datastream streams from the source database into the bucket.
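If you prefer working from the command line, the APIs that this tutorial relies on can be enabled in one step with the gcloud CLI. This is a sketch; it assumes that gcloud is installed and authenticated, and that your active project is the one you selected above:

```shell
# Enable every service that this tutorial uses in the active project.
gcloud services enable \
    datastream.googleapis.com \
    dataflow.googleapis.com \
    pubsub.googleapis.com \
    bigquery.googleapis.com \
    storage.googleapis.com
```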

Requirements

Datastream offers a variety of source options, destination options, and networking connectivity methods.

For this tutorial, we assume that you're using a standalone MySQL database and a destination Cloud Storage service. For the source database, you should be able to configure your network to add an inbound firewall rule. The source database can be on-premises or in a cloud provider. For the Cloud Storage destination, no connectivity configuration is required.

Because we can't know the specifics of your environment, we can't provide detailed steps for your networking configuration.

For this tutorial, you'll select IP allowlisting as the network connectivity method. IP allowlisting is a security feature for limiting and controlling access to the data in your source database to trusted users. You can use IP allowlists to create lists of trusted IP addresses or IP ranges from which your users and other Google Cloud services, such as Datastream, can access this data. To use IP allowlists, you must open the source database or firewall to incoming connections from Datastream.

Time

This tutorial takes approximately 45 minutes to complete.

Creating a bucket in Cloud Storage

In this section, you create a bucket in Cloud Storage. This is the destination bucket into which Datastream will stream schemas, tables, and data from a source MySQL database.

  1. Go to the Browser page for Cloud Storage in the Google Cloud console.

    Go to the Browser page

  2. Click CREATE BUCKET. The Create a bucket page appears.

  3. In the text field of the Name your bucket section, enter my-integration-bucket, and then click CONTINUE.

  4. Accept the default settings for each remaining section of the page. To do this, click CONTINUE at the bottom of each section.

  5. Click CREATE.
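As an alternative to the console, the bucket can be created from Cloud Shell. This is a sketch; bucket names are globally unique, so my-integration-bucket may already be taken and you might need a variant:

```shell
# Create the destination bucket with the default location and storage class.
gsutil mb gs://my-integration-bucket
```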

Enabling Pub/Sub notifications for the Cloud Storage bucket

In this section, you enable Pub/Sub notifications for the Cloud Storage bucket that you created. By doing this, you're configuring the bucket to send notifications that Dataflow will use to learn about any new files that Datastream writes to the bucket. These files contain changes to data that Datastream streams from a source MySQL database into the bucket.

  1. Access the Cloud Storage bucket that you created. The Bucket details page appears.

  2. Click the Activate Cloud Shell button in the upper-right corner of the page.

  3. At the prompt, enter the following command:

    gsutil notification create -t my_integration_notifs -f json gs://my-integration-bucket

  4. Optional. If an Authorize Cloud Shell window appears, then click AUTHORIZE.

  5. Verify that you see output similar to the following:

    Created Cloud Pub/Sub topic projects/project-name/topics/my_integration_notifs
    Created notification config projects/_/buckets/my-integration-bucket/notificationConfigs/1
    
  6. Go to the Topics page for Pub/Sub in the Google Cloud console.

    Go to the Topics page

  7. Click the my_integration_notifs topic that you created in this procedure.

  8. On the my_integration_notifs page, scroll to the bottom of the page. The SUBSCRIPTIONS tab is active. Also, a No subscriptions to display message appears.

  9. Click CREATE SUBSCRIPTION, and then select the Create subscription item from the drop-down menu that appears.

  10. Populate the Add subscription to topic page, as follows:

    1. In the Subscription ID field, enter an ID for the subscription. For this tutorial, enter my_integration_notifs_sub in the field.
    2. Leave all other default values on the page.
    3. Click CREATE.

Later in this tutorial, you'll create a Dataflow job. As part of creating this job, you'll assign Dataflow to be a subscriber of the my_integration_notifs_sub subscription. By doing this, Dataflow can receive notifications about new files that Datastream writes to Cloud Storage, process the files, and transfer the data changes into BigQuery.
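If you'd rather create the subscription from Cloud Shell, the equivalent command is shown below (a sketch, assuming the my_integration_notifs topic already exists from the earlier gsutil notification step):

```shell
# Create a pull subscription on the notification topic; the Dataflow
# job will read file-arrival notifications from this subscription.
gcloud pubsub subscriptions create my_integration_notifs_sub \
    --topic=my_integration_notifs
```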

Creating Datasets in BigQuery

In this section, you create datasets in BigQuery. BigQuery uses datasets to contain the data that it receives from Dataflow. This data represents the changes in the source MySQL database that Datastream streams into your Cloud Storage bucket.

  1. Go to the SQL workspace page for BigQuery in the Google Cloud console.

    Go to the SQL workspace page

  2. In the Explorer pane, click the View actions button to the right of your Google Cloud project name. This button looks like a vertical ellipsis.

  3. Select Create dataset from the drop-down menu that appears.

  4. Populate the Create dataset window, as follows:

    1. In the Dataset ID field, enter an ID for the dataset. For this tutorial, enter My_integration_dataset_log in the field.
    2. Leave all other default values in the window.
    3. Click CREATE DATASET.
  5. In the Explorer pane, click the node icon to the left of your Google Cloud project name, and verify that you see the dataset that you created.

  6. Use the steps in this procedure to create a second dataset: My_integration_dataset_final.

  7. Expand the node to the left of each dataset.

  8. Verify that each dataset is empty.

After Datastream streams data changes from the source database into your Cloud Storage bucket, a Dataflow job will process the files containing the changes and transfer the changes into the BigQuery datasets.
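The two datasets can also be created with the bq command-line tool. This is a sketch; bq operates on your default project unless you qualify each dataset name with a project-id: prefix:

```shell
# Create the staging (log) dataset and the replica (final) dataset.
bq mk --dataset My_integration_dataset_log
bq mk --dataset My_integration_dataset_final
```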

Creating connection profiles in Datastream

In this section, you create connection profiles in Datastream for a source database and a destination. As part of creating the connection profiles, you'll select MySQL as the profile type for your source connection profile, and Cloud Storage as the profile type for your destination connection profile.

Datastream uses the information defined in the connection profiles to connect to both the source and the destination so that it can stream data from the source database into your destination bucket in Cloud Storage.

Creating a source connection profile for MySQL database

  1. Go to the Connection profiles page for Datastream in the Google Cloud console.

    Go to the Connection profiles page

  2. Click CREATE PROFILE.

  3. In the Create a connection profile page, click the MySQL profile type (because you want to create a source connection profile for MySQL database).

  4. Supply the following information in the Define connection settings section of the Create MySQL profile page:

    • Enter My Source Connection Profile as the Connection profile name for your source database.
    • Keep the auto-generated Connection profile ID.
    • Select the Region where the connection profile will be stored.

    • Enter Connection details:

      • In the Hostname or IP field, enter a hostname or public IP address that Datastream can use to connect to the source database. You're providing a public IP address because IP allowlisting will be used as the network connectivity method for this tutorial.
      • In the Port field, enter the port number that's reserved for the source database. For a MySQL database, the default port is 3306.
      • Enter a Username and Password to authenticate to your source database.
  5. In the Define connection settings section, click CONTINUE. The Secure your connection to your source section of the Create MySQL profile page is active.

  6. From the Encryption type menu, select None. For more information about this menu, see Create a connection profile for MySQL database.

  7. In the Secure your connection to your source section, click CONTINUE. The Define connectivity method section of the Create MySQL profile page is active.

  8. Choose the networking method that you'd like to use to establish connectivity between the source database and your destination bucket in Cloud Storage. For this tutorial, use the Connectivity method drop-down list to select IP allowlisting as the networking method.

  9. Configure your source database to allow incoming connections from the Datastream public IP addresses that appear.

  10. In the Define connectivity method section, click CONTINUE. The Test connection profile section of the Create MySQL profile page is active.

  11. Click RUN TEST to verify that the source database and Datastream can communicate with each other.

  12. Verify that you see the "Test passed" status.

  13. Click CREATE.

Creating a destination connection profile for Cloud Storage

  1. Go to the Connection profiles page for Datastream in the Google Cloud console.

    Go to the Connection profiles page

  2. Click CREATE PROFILE.

  3. In the Create a connection profile page, click the Cloud Storage profile type (because you want to create a destination connection profile for Cloud Storage).

  4. Supply the following information in the Create Cloud Storage profile page:

    • Enter My Destination Connection Profile as the Connection profile name for your destination Cloud Storage service.
    • Keep the auto-generated Connection profile ID.
    • Select the Region where the connection profile will be stored.
    • In the Connection details pane, click BROWSE to select the my-integration-bucket that you created earlier in this tutorial. This is the bucket into which Datastream will transfer data from the source database. After making your selection, click SELECT.

      Your bucket appears in the Bucket name field of the Connection details pane.

    • In the Connection profile path prefix field, provide a prefix for the path that will be appended to the bucket name when Datastream streams data to the destination. For this tutorial, enter /integration/tutorial in the field.

  5. Click CREATE.

After creating a source connection profile for MySQL database and a destination connection profile for Cloud Storage, you can use them to create a stream.
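For reference, roughly equivalent gcloud commands are shown below. This is only a sketch: the profile IDs, region, hostname, and credentials are placeholders, and flag names can change between gcloud releases, so verify them with gcloud datastream connection-profiles create --help before running:

```shell
# Source connection profile for the MySQL database (placeholder values).
gcloud datastream connection-profiles create my-source-profile \
    --location=us-central1 \
    --type=mysql \
    --display-name="My Source Connection Profile" \
    --mysql-hostname=203.0.113.10 \
    --mysql-port=3306 \
    --mysql-username=datastream-user \
    --mysql-password=your-password \
    --static-ip-connectivity

# Destination connection profile for the Cloud Storage bucket.
gcloud datastream connection-profiles create my-destination-profile \
    --location=us-central1 \
    --type=google-cloud-storage \
    --display-name="My Destination Connection Profile" \
    --bucket=my-integration-bucket \
    --root-path=/integration/tutorial
```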

Creating a stream in Datastream

In this section, you create a stream. This stream will transfer data from a source MySQL database into a destination bucket in Cloud Storage.

Creating a stream includes:

  • Defining settings for the stream.
  • Selecting the connection profile that you created for your source database (the source connection profile). For this tutorial, this is My Source Connection Profile.
  • Configuring information about the source database for the stream by specifying the tables and schemas in the source database that Datastream:
    • Can transfer into the destination.
    • Is restricted from transferring into the destination.
  • Determining whether Datastream will backfill historical data, as well as stream ongoing changes into the destination, or stream only changes to the data.
  • Selecting the connection profile that you created for Cloud Storage (the destination connection profile). For this tutorial, this is My Destination Connection Profile.
  • Configuring information about the destination bucket for the stream. This information includes:
    • The folder of the destination bucket into which Datastream will transfer schemas, tables, and data from the source database.
    • The output format of files written to Cloud Storage. Datastream currently supports two output formats: Avro and JSON. For this tutorial, Avro is the file format.

Defining settings for the stream

  1. Go to the Streams page for Datastream in the Google Cloud console.

    Go to the Streams page

  2. Click CREATE STREAM.

  3. Supply the following information in the Define stream details panel of the Create stream page:

    • Enter My Stream as the Stream name.
    • Keep the auto-generated Stream ID.
    • From the Region menu, select the region where you created your source and destination connection profiles.
    • From the Source type menu, select the MySQL profile type.
    • From the Destination type menu, select the Cloud Storage profile type.
  4. Review the required prerequisites that are generated automatically to reflect how your environment must be prepared for a stream. These prerequisites can include how to configure the source database and how to connect Datastream to the destination bucket in Cloud Storage.

  5. Click CONTINUE. The Define MySQL connection profile panel of the Create stream page appears.

Specifying information about the source connection profile

  1. From the Source connection profile menu, select your source connection profile for MySQL database.

  2. Click RUN TEST to verify that the source database and Datastream can communicate with each other.

    If the test fails, then the issue associated with the connection profile appears. Make the necessary changes to correct the issue, and then retest.

  3. Click CONTINUE. The Configure stream source panel of the Create stream page appears.

Configuring information about the source database for the stream

  1. Use the Objects to include menu to specify the tables and schemas in your source database that Datastream can transfer into a folder in the destination bucket in Cloud Storage.

    For this tutorial, you want Datastream to transfer all tables and schemas. Therefore, select All tables from all schemas from the menu.

  2. Click CONTINUE. The Define Cloud Storage connection profile panel of the Create stream page appears.

Selecting a destination connection profile

  1. From the Destination connection profile menu, select your destination connection profile for Cloud Storage.

  2. Click CONTINUE. The Configure stream destination panel of the Create stream page appears.

Configuring information about the destination for the stream

  1. In the Output format field, select the format of files written to Cloud Storage. For this tutorial, Avro is the file format.

  2. Click CONTINUE. The Review stream details and create panel of the Create stream page appears.

Creating the stream

  1. Verify details about the stream as well as the source and destination connection profiles that the stream will use to transfer data from a source MySQL database to a destination bucket in Cloud Storage.

  2. Click RUN VALIDATION to validate the stream. By validating a stream, Datastream checks that the source is configured properly, validates that the stream can connect to both the source and the destination, and verifies the end-to-end configuration of the stream.

  3. After all validation checks pass, click CREATE.

  4. In the Create stream? dialog box, click CREATE.

After creating a stream, you can start it.

Starting the stream

In the previous section of the tutorial, you created a stream, but you didn't start it. You can do this now.

For this tutorial, you create and start the stream separately because starting a stream can place additional load on your source database. By creating the stream first and starting it later, you can defer that load to a time when your database can handle it.

By starting the stream, Datastream can transfer data, schemas, and tables from the source database to the destination.

  1. Go to the Streams page for Datastream in the Google Cloud console.

    Go to the Streams page

  2. Select the check box to the left of the stream that you want to start. For this tutorial, this is My Stream.

  3. Click START.

  4. In the dialog box, click START. The status of the stream changes from Not started to Starting to Running.

After starting a stream, you can verify that Datastream transferred data from the source database to the destination.
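A stream can also be started from the command line by setting its desired state to RUNNING. This is a sketch; my-stream and us-central1 are placeholders for your actual stream ID and region:

```shell
# Start the stream; Datastream begins the backfill and CDC replication.
gcloud datastream streams update my-stream \
    --location=us-central1 \
    --state=RUNNING \
    --update-mask=state
```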

Verifying the stream

In this section, you confirm that Datastream transfers the data from all tables of a source MySQL database into the /integration/tutorial folder of your Cloud Storage destination bucket. For this tutorial, the name of your bucket is my-integration-bucket.

  1. Go to the Streams page for Datastream in the Google Cloud console.

    Go to the Streams page

  2. Click the stream that you created. For this tutorial, this is My Stream.

  3. In the Stream details page, click the link that appears below the Destination write path field. The Bucket details page of Cloud Storage opens in a separate tab.

  4. Verify that you see folders that represent tables of the source database.

  5. Click one of the table folders and drill down until you see data that's associated with the table.
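You can perform the same check from Cloud Shell by listing the stream's write path recursively (a sketch; expect one folder per source table, each containing the files that Datastream wrote):

```shell
# List everything Datastream has written under the stream's write path.
gsutil ls -r gs://my-integration-bucket/integration/tutorial
```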

Creating a Dataflow job

In this section, you create a job in Dataflow. After Datastream streams data changes from a source MySQL database into your Cloud Storage bucket, notifications will be sent to Dataflow about new files containing the changes. The Dataflow job processes the files and transfers the changes into BigQuery.

  1. Go to the Jobs page for Dataflow in the Google Cloud console.

    Go to the Jobs page

  2. Click CREATE JOB FROM TEMPLATE.

  3. In the Job name field of the Create job from template page, enter a name for the Dataflow job that you're creating. For this tutorial, enter my-dataflow-integration-job in the field.

  4. From the Regional endpoint menu, select the region where the job will be stored. This is the same region that you selected for the source connection profile, destination connection profile, and stream that you created.

  5. From the Dataflow template menu, select the template that you're using to create the job. For this tutorial, select Datastream to BigQuery.

    After making this selection, additional fields related to this template appear.

  6. In the File location for Datastream file output in Cloud Storage field, enter the path that contains the name of your Cloud Storage bucket. For this tutorial, enter gs://my-integration-bucket in the field.

  7. In the Pub/Sub subscription being used in a Cloud Storage notification policy field, enter the path that contains the name of your Pub/Sub subscription. For this tutorial, enter projects/project-name/subscriptions/my_integration_notifs_sub in the field.

  8. In the Datastream output file format (avro/json) field, enter avro because, for this tutorial, Avro is the file format of files that Datastream writes to Cloud Storage.

  9. In the Name or template for the dataset to contain staging tables field, enter My_integration_dataset_log because Dataflow will use this dataset to stage the data changes that it receives from Datastream.

  10. In the Template for the dataset to contain replica tables field, enter My_integration_dataset_final because this is the dataset where the changes that are staged in the My_integration_dataset_log dataset will be merged to create a 1-to-1 replica of the tables in the source database.

  11. In the Dead letter queue directory field, enter the path that contains the name of your Cloud Storage bucket and a folder for a dead letter queue. Any data changes that Dataflow fails to transfer into BigQuery will be stored in the queue. You can fix the content in the queue so that Dataflow can reprocess it.

    For this tutorial, enter gs://my-integration-bucket/dlq in the Dead letter queue directory field (where dlq is the folder for the dead letter queue).

  12. Click RUN JOB.
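The console steps above map roughly onto a single gcloud invocation of the Datastream to BigQuery flex template. This is a sketch: the template path and parameter names are taken from the public template but may change between versions, and project-name and the region are placeholders:

```shell
# Launch the Datastream to BigQuery flex template with the same values
# entered in the console steps above.
gcloud dataflow flex-template run my-dataflow-integration-job \
    --region=us-central1 \
    --template-file-gcs-location=gs://dataflow-templates-us-central1/latest/flex/Cloud_Datastream_to_BigQuery \
    --parameters \
inputFilePattern=gs://my-integration-bucket/,\
gcsPubSubSubscription=projects/project-name/subscriptions/my_integration_notifs_sub,\
inputFileFormat=avro,\
outputStagingDatasetTemplate=My_integration_dataset_log,\
outputDatasetTemplate=My_integration_dataset_final,\
deadLetterQueueDirectory=gs://my-integration-bucket/dlq
```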

Verifying the integration

In the Verifying the stream section of this tutorial, you confirmed that Datastream transferred the data from all tables of a source MySQL database into the /integration/tutorial folder of your Cloud Storage destination bucket.

In this section, you verify that Dataflow processes the files containing changes associated with this data, and transfers the changes into BigQuery. As a result, you have an end-to-end integration between Datastream and BigQuery.

  1. Go to the SQL workspace page for BigQuery in the Google Cloud console.

    Go to the SQL workspace page

  2. In the Explorer pane, expand the node to the left of the name of your Google Cloud project.

  3. Expand the nodes to the left of the My_integration_dataset_log and My_integration_dataset_final datasets.

  4. Verify that each dataset now contains data. This confirms that Dataflow processed the files containing changes associated with the data that Datastream streamed into Cloud Storage, and transferred these changes into BigQuery.
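The same result can be checked quickly from the command line (a sketch; bq lists the tables that the Dataflow job has created so far):

```shell
# Each dataset should now contain one table per replicated source table.
bq ls My_integration_dataset_log
bq ls My_integration_dataset_final
```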

Clean up

To avoid incurring charges to your Google Cloud account for the resources used in this tutorial, follow these steps.

  1. Use the Google Cloud console to delete your project, Datastream stream, and Datastream connection profiles, stop the Dataflow job, and delete the BigQuery datasets, Pub/Sub topic and subscription, and Cloud Storage bucket.

Cleaning up the resources that you created on Datastream, Dataflow, BigQuery, Pub/Sub, and Cloud Storage ensures that they won't take up quota and that you won't be billed for them in the future. The following sections describe how to delete or turn off these resources.

Deleting your project

The easiest way to eliminate billing is to delete the project that you created for this tutorial.

  1. In the Cloud console, go to the Manage resources page.

    Go to the Manage resources page

  2. In the project list, select the project that you want to delete, and then click Delete.

  3. In the dialog box, type the project ID, and then click Shut down to delete the project.

Deleting the stream

  1. Go to the Streams page for Datastream in the Google Cloud console.

    Go to the Streams page

  2. Click the stream that you want to delete. For this tutorial, this is My Stream.

  3. Click PAUSE.

  4. In the dialog box, click PAUSE.

  5. In the Stream status pane of the Stream details page, verify that the status of the stream is Paused.

  6. Click DELETE.

  7. In the dialog box, enter Delete in the text field, and then click DELETE.

Deleting the connection profiles

  1. Go to the Connection profiles page for Datastream in the Google Cloud console.

    Go to the Connection profiles page

  2. Select the check box for each connection profile that you want to delete. For this tutorial, select the check boxes for My Source Connection Profile and My Destination Connection Profile.

  3. Click DELETE.

  4. In the dialog box, click DELETE.

Stopping the Dataflow job

  1. Go to the Jobs page for Dataflow in the Google Cloud console.

    Go to the Jobs page

  2. Click the job that you want to stop. For this tutorial, this is my-dataflow-integration-job.

  3. Click STOP.

  4. In the Stop job dialog box, select the Drain option, and then click STOP JOB.

Deleting the BigQuery datasets

  1. Go to the SQL workspace page for BigQuery in the Google Cloud console.

    Go to the SQL workspace page

  2. In the Explorer pane, expand the node to the left of your Google Cloud project name.

  3. Click the View actions button to the right of one of the datasets that you created in Creating Datasets in BigQuery. This button looks like a vertical ellipsis.

    For this tutorial, click the View actions button to the right of My_integration_dataset_log.

  4. Select Delete from the drop-down menu that appears.

  5. In the Delete dataset? dialog box, enter delete in the text field, and then click DELETE.

  6. Use the steps in this procedure to delete the second dataset that you created: My_integration_dataset_final.

Deleting the Pub/Sub subscription and topic

  1. Go to the Subscriptions page for Pub/Sub in the Google Cloud console.

    Go to the Subscriptions page

  2. Click the check box to the left of the subscription that you want to delete. For this tutorial, click the check box to the left of my_integration_notifs_sub.

  3. Click DELETE.

  4. In the Delete subscription dialog box, click DELETE.

  5. Go to the Topics page for Pub/Sub in the Google Cloud console.

    Go to the Topics page

  6. Click the check box to the left of the topic that you want to delete. For this tutorial, click the check box to the left of my_integration_notifs.

  7. Click DELETE.

  8. In the Delete topic dialog box, enter delete in the text field, and then click DELETE.

Deleting your Cloud Storage bucket

  1. Go to the Browser page for Cloud Storage in the Google Cloud console.

    Go to the Browser page

  2. Select the check box to the left of your bucket. For this tutorial, this is my-integration-bucket.

  3. Click DELETE.

  4. In the dialog box, enter DELETE in the text field, and then click DELETE.
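If you'd rather clean up from the command line, the console steps in the preceding sections correspond roughly to the commands below. This is a sketch: the stream ID, connection profile IDs, region, and JOB_ID are placeholders, so list your actual resource IDs first with the matching list commands (for example, gcloud dataflow jobs list):

```shell
# Drain the Dataflow job so in-flight data is processed before it stops.
gcloud dataflow jobs drain JOB_ID --region=us-central1

# Pause the stream (mirroring the console flow), then delete it and
# both connection profiles.
gcloud datastream streams update my-stream --location=us-central1 \
    --state=PAUSED --update-mask=state
gcloud datastream streams delete my-stream --location=us-central1
gcloud datastream connection-profiles delete my-source-profile --location=us-central1
gcloud datastream connection-profiles delete my-destination-profile --location=us-central1

# Remove both BigQuery datasets and all tables inside them.
bq rm -r -f --dataset My_integration_dataset_log
bq rm -r -f --dataset My_integration_dataset_final

# Remove the Pub/Sub subscription and topic.
gcloud pubsub subscriptions delete my_integration_notifs_sub
gcloud pubsub topics delete my_integration_notifs

# Remove the Cloud Storage bucket and everything in it.
gsutil rm -r gs://my-integration-bucket
```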

What's next

  • Learn more about Datastream.
  • Try out other Google Cloud features for yourself. Have a look at our tutorials.