Use BigQuery Engine for Apache Flink to run a SQL query

Learn how to create a BigQuery Engine for Apache Flink job that executes an Apache Flink SQL query.

Apache Flink SQL lets you define a stream processing pipeline by using SQL statements. Apache Flink SQL is a separate dialect of SQL from GoogleSQL, which BigQuery uses. For more information, see SQL in the Apache Flink documentation.

Before you begin

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Update and install gcloud components:

    gcloud components update
    gcloud components install managed-flink-client
  5. Create a Google Cloud project.

    gcloud projects create PROJECT_ID

    Replace PROJECT_ID with a name for the Google Cloud project you are creating.

  6. Make sure that billing is enabled for your Google Cloud project.

  7. Enable the BigQuery Engine for Apache Flink APIs:

    gcloud services enable managedflink.googleapis.com compute.googleapis.com
  8. Create local authentication credentials for your user account:

    gcloud auth application-default login
  9. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/managedflink.developer

    gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE

    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.
    • Replace ROLE with each individual role.
  10. Create a Cloud Storage bucket by running the gcloud storage buckets create command:

    gcloud storage buckets create gs://BUCKET_NAME --location=US

    Replace BUCKET_NAME with a name for the bucket. For information about bucket naming requirements, see Bucket names.
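
Before you create the bucket, it can help to sanity-check the name locally. The following is a minimal sketch, not an official validator: the function name is_plausible_bucket_name is hypothetical, and it covers only a subset of the naming rules (length of 3 to 63 characters, the allowed character set, and starting and ending with a letter or number). The Bucket names documentation remains the authority.

```shell
# Hypothetical helper: rough local check of a Cloud Storage bucket name.
# Covers only part of the documented rules; dotted names and reserved
# prefixes are not handled here.
is_plausible_bucket_name() {
  name="$1"
  len=${#name}
  # Length must be between 3 and 63 characters.
  [ "$len" -ge 3 ] && [ "$len" -le 63 ] || return 1
  # Only lowercase letters, digits, dots, underscores, and dashes are allowed.
  case "$name" in
    *[!abcdefghijklmnopqrstuvwxyz0123456789._-]*) return 1 ;;
  esac
  # The name must start and end with a letter or digit.
  case "$name" in
    [._-]*|*[._-]) return 1 ;;
  esac
  return 0
}

# Usage (my-flink-bucket is a placeholder name):
# is_plausible_bucket_name my-flink-bucket && echo "name looks OK"
```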

Create the SQL query file

  1. Use a text editor to create a new file named create-table.sql.

  2. Paste the following into the file:

    CREATE TABLE table1 (
       UserId int,
       Name varchar(256)
    ) WITH (
       'connector' = 'filesystem',
       'path' = 'gs://BUCKET_NAME/output/',
       'format' = 'csv'
    );
    
    INSERT INTO table1 VALUES(1, 'Alice'),(2, 'Bob'),(3, 'Charles');
    

    Replace BUCKET_NAME with the name of your Cloud Storage bucket.

  3. Save the file.

This query creates a table that is backed by Cloud Storage and inserts some data into the table. When the query runs, it writes the table data as one or more CSV files.
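
As an alternative to editing the file by hand, you could generate it from the shell. This sketch reuses the SQL from this section unchanged and only substitutes the bucket name; the function name render_create_table_sql is a hypothetical helper, not part of any tool.

```shell
# Hypothetical helper: print the SQL from this section with the
# bucket name filled in.
render_create_table_sql() {
  bucket="$1"
  cat <<EOF
CREATE TABLE table1 (
   UserId int,
   Name varchar(256)
) WITH (
   'connector' = 'filesystem',
   'path' = 'gs://${bucket}/output/',
   'format' = 'csv'
);

INSERT INTO table1 VALUES(1, 'Alice'),(2, 'Bob'),(3, 'Charles');
EOF
}

# Usage (my-flink-bucket is a placeholder name):
# render_create_table_sql my-flink-bucket > create-table.sql
```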

Create a network and subnet

Use the networks create command to create a VPC in your project.

gcloud compute networks create NETWORK_NAME \
  --project=PROJECT_ID

Replace the following:

  • NETWORK_NAME: a name for the VPC, for example vpc-1.
  • PROJECT_ID: your project ID.

Use the subnets create command to add a subnet with Private Google Access enabled.

gcloud compute networks subnets create SUBNET_NAME \
    --network=NETWORK_NAME \
    --project=PROJECT_ID \
    --range=10.0.0.0/24 \
    --region=us-central1 \
    --enable-private-ip-google-access

Replace the following:

  • SUBNET_NAME: a name for the subnet, for example subnet-1.
  • NETWORK_NAME: the name of the VPC that you created.
  • PROJECT_ID: your project ID.
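
To confirm that Private Google Access is enabled on the new subnet, you can read the privateIpGoogleAccess field from the subnet description. The wrapper below is a sketch: the function name private_access_status is hypothetical, and it assumes that gcloud prints the boolean as True or False.

```shell
# Hypothetical helper: report whether Private Google Access is enabled
# for a subnet. Arguments: subnet name, region, project ID.
private_access_status() {
  subnet="$1"; region="$2"; project="$3"
  value="$(gcloud compute networks subnets describe "$subnet" \
    --region="$region" \
    --project="$project" \
    --format='value(privateIpGoogleAccess)')"
  # Assumption: the field is printed as True or False.
  case "$value" in
    True|true) echo "enabled" ;;
    *) echo "disabled" ;;
  esac
}

# Usage:
# private_access_status SUBNET_NAME us-central1 PROJECT_ID
```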

Create a deployment

In this step, you create a deployment, which is a dedicated and isolated environment where your Apache Flink jobs run.

The first time you create either a deployment or an on-demand job in a project or in a subnet, the creation can take 30 minutes or more to complete. After that, it takes less time to create a new deployment or job.

To create the deployment, use the gcloud alpha managed-flink deployments create command:

gcloud alpha managed-flink deployments create my-deployment \
  --project=PROJECT_ID \
  --location=us-central1 \
  --network-config-vpc=NETWORK_NAME \
  --network-config-subnetwork=SUBNET_NAME \
  --max-slots=4

Replace the following:

  • PROJECT_ID: your project ID.
  • NETWORK_NAME: the name of the VPC.
  • SUBNET_NAME: the name of the subnet.

Although the default network has configurations that allow deployments to run jobs, for security reasons, we recommend that you create a separate network for BigQuery Engine for Apache Flink. The default network is not secure, because it is pre-populated with firewall rules that allow incoming connections to instances.

Grant service account permissions

Grant the Managed Flink Default Workload Identity read and write permissions to the Cloud Storage bucket, by running the following command:

gcloud storage buckets add-iam-policy-binding gs://BUCKET_NAME \
  --member="serviceAccount:gmf-PROJECT_NUMBER-default@gcp-sa-managedflink-wi.iam.gserviceaccount.com" \
  --role=roles/storage.objectAdmin

Replace the following:

  • BUCKET_NAME: the name of your Cloud Storage bucket.
  • PROJECT_NUMBER: your project number.
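
If you don't know the project number, one option is to look it up with gcloud projects describe and build the member string from it. In this sketch, the function name flink_workload_identity_member is hypothetical; the service account pattern is the one shown in the preceding command.

```shell
# Hypothetical helper: build the IAM member string for the Managed Flink
# Default Workload Identity from a project number.
flink_workload_identity_member() {
  project_number="$1"
  echo "serviceAccount:gmf-${project_number}-default@gcp-sa-managedflink-wi.iam.gserviceaccount.com"
}

# Usage:
# PROJECT_NUMBER="$(gcloud projects describe PROJECT_ID --format='value(projectNumber)')"
# gcloud storage buckets add-iam-policy-binding gs://BUCKET_NAME \
#   --member="$(flink_workload_identity_member "$PROJECT_NUMBER")" \
#   --role=roles/storage.objectAdmin
```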

Create a job

In this step, you create a BigQuery Engine for Apache Flink job that runs the SQL query. To create the job, use the gcloud alpha managed-flink jobs create command:

gcloud alpha managed-flink jobs create ./create-table.sql \
  --name=my-job \
  --location=us-central1 \
  --deployment=my-deployment \
  --project=PROJECT_ID \
  --staging-location=gs://BUCKET_NAME/jobs/ \
  --min-parallelism=1 \
  --max-parallelism=4

Replace the following:

  • PROJECT_ID: your project ID
  • BUCKET_NAME: the name of your Cloud Storage bucket

While the job is being submitted, the gcloud CLI output shows the operation as pending. If the job is successfully submitted, the gcloud CLI output shows the following:

Create request issued for JOB_ID.

The value of JOB_ID is the job ID, which you can use to update or delete the job. For more information, see Create and manage jobs.
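
If you script the job submission, you might want to capture the job ID from the CLI output. The following sketch assumes that the confirmation line has exactly the form shown above; the real output can differ in punctuation or might be written to standard error, so treat the pattern as an assumption to adjust. The function name extract_job_id is hypothetical.

```shell
# Hypothetical helper: read CLI output on standard input and print the job ID
# from a line of the form "Create request issued for JOB_ID."
extract_job_id() {
  sed -n 's/^Create request issued for \(.*\)\.$/\1/p'
}

# Usage (assumes the command output was saved to submit.log):
# JOB_ID="$(extract_job_id < submit.log)"
```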

Monitor the job

  1. In the Google Cloud console, go to the BigQuery Engine for Apache Flink Jobs page.

    The Jobs page lists the available jobs, including the job name, job ID, status, and creation time.

  2. To see additional job details, click the job name.

  3. Wait for the job to complete.

Examine the pipeline output

When the job completes, perform the following steps to see the output from the pipeline:

  1. Run the following command to download the pipeline output:

    gcloud storage cp "gs://BUCKET_NAME/output/*" .
    

    Replace BUCKET_NAME with the name of your Cloud Storage bucket.

  2. The job creates one or more files with the prefix part-; for example, part-4253227c-4a45-4c6e-8918-0106d95bbf86-0. Examine the contents of these files:

    more part-*
    

    The output looks similar to the following:

    1,Alice
    2,Bob
    3,Charles
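
Because the job can write the rows across more than one part- file, a scripted check should compare the combined rows without relying on file or row order. The following is a minimal sketch of such a check; the function name check_pipeline_output is hypothetical, and the expected rows are the three rows inserted by the query in this tutorial.

```shell
# Hypothetical helper: succeed if the combined rows of all part-* files in a
# directory match the three rows inserted by the query, ignoring row order.
check_pipeline_output() {
  dir="$1"
  expected="$(printf '1,Alice\n2,Bob\n3,Charles\n')"
  # Concatenate every part file and sort so that ordering does not matter.
  actual="$(cat "$dir"/part-* | sort)"
  [ "$actual" = "$expected" ]
}

# Usage, from the directory that contains the downloaded files:
# check_pipeline_output . && echo "output matches"
```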
    

Clean up

To avoid incurring charges to your Google Cloud account for the resources used on this page, delete the Google Cloud project with the resources.

Delete the project

  1. In the Google Cloud console, go to the Manage resources page.

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.
