Create a BigQuery Engine for Apache Flink job with the CLI

Learn how to create a BigQuery Engine for Apache Flink job, monitor the status of the job, and view the results. In this quickstart, you can create a job that uses either Java or Python.

Before you begin

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init

  4. Update and install gcloud components:

    gcloud components update
    gcloud components install managed-flink-client

  5. Create a Google Cloud project.

    gcloud projects create PROJECT_ID

    Replace PROJECT_ID with a name for the Google Cloud project you are creating.

  6. Make sure that billing is enabled for your Google Cloud project.

  7. Enable the BigQuery Engine for Apache Flink APIs:

    gcloud services enable managedflink.googleapis.com compute.googleapis.com

  8. Create local authentication credentials for your user account:

    gcloud auth application-default login

  9. Grant the roles/managedflink.developer IAM role to your user account:

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, myemail@example.com. The command already supplies the user: prefix.
    • Replace ROLE with roles/managedflink.developer.

  10. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.

  11. Create a Cloud Storage bucket by running the gcloud storage buckets create command:

    gcloud storage buckets create gs://BUCKET_NAME --location=US

    Replace BUCKET_NAME with a name for the bucket. For information about bucket naming requirements, see Bucket names.
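
Before you create the bucket, it can help to check the name locally. The following is a minimal sketch, not a complete implementation of the naming requirements: it checks only the basic rules (3 to 63 characters; lowercase letters, digits, dashes, underscores, and dots; a letter or digit at each end). The function name is_valid_bucket_name and the sample name are hypothetical.

```shell
# Partial check of Cloud Storage bucket naming rules (not the complete rule set):
# 3-63 characters; lowercase letters, digits, '-', '_' and '.';
# must start and end with a letter or digit.
is_valid_bucket_name() {
  printf '%s\n' "$1" | LC_ALL=C grep -Eq '^[a-z0-9][a-z0-9._-]{1,61}[a-z0-9]$'
}

# Example: validate a hypothetical name before passing it to gcloud.
if is_valid_bucket_name "my-flink-quickstart"; then
  echo "name looks valid"
else
  echo "name breaks a basic naming rule" >&2
fi
```

This sketch does not cover every requirement, such as restricted prefixes or the longer limit for names that contain dots, so a name that passes can still be rejected when you create the bucket.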

Prepare the pipeline code

Java

  1. Download and install a Java Development Kit (JDK). Verify that the JAVA_HOME environment variable is set and points to your JDK installation.

  2. Clone or download the apache/flink GitHub repository and change into the flink/flink-examples/flink-examples-streaming directory.

    git clone https://github.com/apache/flink.git --branch release-1.19.1
    cd flink/flink-examples/flink-examples-streaming
    
  3. Build the JAR file for the example pipeline:

    ../../mvnw clean package
    
  4. Verify that this command built a JAR file named WordCount.jar.

    ls target/WordCount.jar
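
If the build fails early, one possible cause is a JAVA_HOME value that is unset or points to a runtime without a compiler. The following sketch shows one way to check it, assuming a JDK is recognizable by an executable bin/javac under JAVA_HOME. The function name looks_like_jdk is hypothetical.

```shell
# Succeeds when the given directory looks like a JDK,
# that is, it contains an executable bin/javac.
looks_like_jdk() {
  [ -n "$1" ] && [ -x "$1/bin/javac" ]
}

if looks_like_jdk "${JAVA_HOME:-}"; then
  echo "JAVA_HOME points to a JDK: ${JAVA_HOME}"
else
  echo "JAVA_HOME is unset or does not contain bin/javac" >&2
fi
```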
    

Python

  1. Install Python 3.11 and pip.

  2. Clone or download the apache/flink GitHub repository and change into the flink/flink-python/pyflink/examples/table directory.

    git clone https://github.com/apache/flink.git
    cd flink/flink-python/pyflink/examples/table
    
  3. Create an archive file that packages the Python virtual environment for the job:

    python -m venv pyflink_venv
    source pyflink_venv/bin/activate
    pip install "apache-flink==1.19.0" venv-pack
    venv-pack -o pyflink_venv.tar.gz
    
  4. Upload the archive file to Cloud Storage:

    gcloud storage cp pyflink_venv.tar.gz gs://BUCKET_NAME/pyflink_venv.tar.gz
    

    Replace BUCKET_NAME with the name of your Cloud Storage bucket.

    For more information about packaging the Python virtual environment, see Python virtual environment.
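
Because the archive packages the interpreter that created the virtual environment, you might want to confirm that it is Python 3.11 before you run venv-pack. The following is a sketch under the assumption that the interpreter is invoked as python, as in the commands above. The function name is_python_3_11 is hypothetical.

```shell
# Succeeds when a version string such as "Python 3.11.9" reports Python 3.11.
is_python_3_11() {
  case "$1" in
    "Python 3.11"|"Python 3.11."*) return 0 ;;
    *) return 1 ;;
  esac
}

# Assumption: the interpreter on PATH is named "python".
if is_python_3_11 "$(python --version 2>&1)"; then
  echo "Python 3.11 detected"
else
  echo "python is missing or is not version 3.11" >&2
fi
```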

Create a network and subnet

Use the networks create command to create a VPC in your project.

gcloud compute networks create NETWORK_NAME \
  --project=PROJECT_ID

Replace the following:

  • NETWORK_NAME: a name for the VPC, for example vpc-1.
  • PROJECT_ID: your project ID.

Use the subnets create command to add a subnet with Private Google Access enabled.

gcloud compute networks subnets create SUBNET_NAME \
    --network=NETWORK_NAME \
    --project=PROJECT_ID \
    --range=10.0.0.0/24 \
    --region=us-central1 \
    --enable-private-ip-google-access

Replace the following:

  • SUBNET_NAME: a name for the subnet, for example subnet-1.

Create a deployment

In this step, you create a deployment, which is a dedicated and isolated environment where your Apache Flink jobs run.

The first time you create either a deployment or an on-demand job in a project or in a subnet, the creation can take 30 minutes or more to complete. After that, it takes less time to create a new deployment or job.

To create the deployment, use the gcloud alpha managed-flink deployments create command:

gcloud alpha managed-flink deployments create my-deployment \
  --project=PROJECT_ID \
  --location=us-central1 \
  --network-config-vpc=NETWORK_NAME \
  --network-config-subnetwork=SUBNET_NAME \
  --max-slots=4

Replace the following:

  • PROJECT_ID: your project ID.
  • NETWORK_NAME: the name of the VPC.
  • SUBNET_NAME: the name of the subnet.

Although the default network has configurations that allow deployments to run jobs, for security reasons, we recommend that you create a separate network for BigQuery Engine for Apache Flink. The default network is not secure, because it is pre-populated with firewall rules that allow incoming connections to instances.

Grant service account permissions

Grant the Managed Flink Default Workload Identity read and write permissions to the Cloud Storage bucket, by running the following command:

gcloud storage buckets add-iam-policy-binding gs://BUCKET_NAME \
  --member="serviceAccount:gmf-PROJECT_NUMBER-default@gcp-sa-managedflink-wi.iam.gserviceaccount.com" \
  --role=roles/storage.objectAdmin

Replace the following:

  • BUCKET_NAME: the name of your Cloud Storage bucket.
  • PROJECT_NUMBER: your project number.
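
The member string in the command above embeds the project number, not the project ID. The following sketch builds that string from a project number so that you can paste it into the --member flag. The function name flink_default_member and the project number 123456789012 are made up for illustration.

```shell
# Builds the IAM member string for the Managed Flink Default Workload Identity
# from a project number, following the pattern used in the command above.
flink_default_member() {
  printf 'serviceAccount:gmf-%s-default@gcp-sa-managedflink-wi.iam.gserviceaccount.com\n' "$1"
}

# Example with a made-up project number. To look up the real one, you can run:
#   gcloud projects describe PROJECT_ID --format="value(projectNumber)"
flink_default_member 123456789012
# prints: serviceAccount:gmf-123456789012-default@gcp-sa-managedflink-wi.iam.gserviceaccount.com
```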

Create a job

In this step, you create a BigQuery Engine for Apache Flink job that runs the example pipeline. To create the job, use the gcloud alpha managed-flink jobs create command:

Java

gcloud alpha managed-flink jobs create ./target/WordCount.jar \
--name=my-job \
--location=us-central1 \
--deployment=my-deployment \
--project=PROJECT_ID \
--staging-location=gs://BUCKET_NAME/jobs/ \
--min-parallelism=1 \
--max-parallelism=4 \
-- --output gs://BUCKET_NAME/

Replace the following:

  • PROJECT_ID: your project ID
  • BUCKET_NAME: the name of your Cloud Storage bucket

The -- option specifies command-line arguments for the pipeline, which are defined by the pipeline code. In the WordCount example, --output specifies the location to write the output.

Python

gcloud alpha managed-flink jobs create ${PWD}/word_count.py \
--name=word-count \
--location=us-central1 \
--deployment=my-deployment \
--project=PROJECT_ID \
--staging-location=gs://BUCKET_NAME/jobs/ \
--min-parallelism=1 \
--max-parallelism=4 \
--python-venv=gs://BUCKET_NAME/pyflink_venv.tar.gz \
-- --output gs://BUCKET_NAME/

Replace the following:

  • PROJECT_ID: your project ID
  • BUCKET_NAME: the name of your Cloud Storage bucket

The -- option specifies command-line arguments for the pipeline, which are defined by the pipeline code. In this example, --output specifies the location to write the output.

The --python-venv option specifies the Cloud Storage location of the Python virtual environment archive.
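
To make the role of the -- separator concrete, the following sketch prints only the arguments that follow the first --. It illustrates the general convention described above and is not how the gcloud CLI is implemented. The function name pipeline_args is hypothetical.

```shell
# Prints, one per line, only the arguments that follow the first "--".
# This mirrors the convention described above; it is not gcloud's own code.
pipeline_args() {
  # Skip everything up to and including the first "--".
  while [ "$#" -gt 0 ]; do
    if [ "$1" = "--" ]; then
      shift
      break
    fi
    shift
  done
  # Everything that remains belongs to the pipeline.
  for arg in "$@"; do
    printf '%s\n' "$arg"
  done
}

pipeline_args --name=my-job --location=us-central1 -- --output gs://BUCKET_NAME/
# prints:
#   --output
#   gs://BUCKET_NAME/
```

In the job commands above, everything before the separator is a gcloud flag, and everything after it is passed to the pipeline.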

While the job is being submitted, the gcloud CLI output shows the operation as pending. If the job is successfully submitted, the gcloud CLI output shows the following:

Create request issued for JOB_ID.

The value of JOB_ID is the job ID, which you can use to update or delete the job. For more information, see Create and manage jobs.

Monitor the job

  1. In the Google Cloud console, go to the BigQuery Engine for Apache Flink Jobs page.

    The Jobs page lists the available jobs, including the job name, job ID, status, and creation time.

  2. To see additional job details, click the job name.

  3. Wait for the job to complete. When the job completes, the job status is Finished.
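
If you prefer to wait from a script rather than watch the console, the waiting step can be sketched as a polling loop. This is only an illustration: this page does not specify a command that prints the job status, so the sketch takes that command as an argument and uses a stand-in named fake_status. Both wait_until_finished and fake_status are hypothetical names.

```shell
# Polls a status command until it prints "Finished" or the attempts run out.
#   $1: maximum number of attempts
#   $2: seconds to sleep between attempts
#   remaining arguments: the command that prints the current job status
wait_until_finished() {
  attempts="$1"
  delay="$2"
  shift 2
  i=1
  while [ "$i" -le "$attempts" ]; do
    status="$("$@")"
    if [ "$status" = "Finished" ]; then
      echo "job finished after $i check(s)"
      return 0
    fi
    sleep "$delay"
    i=$((i + 1))
  done
  echo "job not finished after $attempts check(s); last status: $status" >&2
  return 1
}

# Stand-in for a real status lookup (hypothetical; replace with your own command).
fake_status() { echo "Finished"; }

wait_until_finished 3 0 fake_status
# prints: job finished after 1 check(s)
```

The loop checks only for the Finished status named above. A job that ends in any other state keeps the loop running until the attempts are exhausted.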

Examine the pipeline output

When the job completes, perform the following steps to see the output from the pipeline:

  1. In the Google Cloud console, go to the Cloud Storage Buckets page.

  2. In the bucket list, click the name of the bucket that you created in Before you begin. The Bucket details page opens, with the Objects tab selected.

  3. The pipeline creates a folder with the naming pattern YYYY-MM-DD--HH. Click the folder name.

  4. If the pipeline ran successfully, the folder contains a file with the prefix part-; for example, part-4253227c-4a45-4c6e-8918-0106d95bbf86-0. Click this file.

  5. In the Object details page, click the authenticated URL to view the contents of the output file. The output looks similar to the following:

    Java

    (to,1)
    (be,1)
    (or,1)
    (not,1)
    (to,2)
    (be,2)
    (that,1)
    [....]
    

    Python

    {"data":[{"word":"To","count":1}],"type":"INSERT"}
    {"data":[{"word":"be,","count":1}],"type":"INSERT"}
    {"data":[{"word":"or","count":1}],"type":"INSERT"}
    [....]
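
In the Java output, each line is a running count, so a word such as "to" appears once for every occurrence and the last line for a word carries its total. The following sketch reduces output in that (word,count) format to the final count per word. The function name final_counts is hypothetical, and the sample lines are the ones shown above.

```shell
# Reads lines such as "(to,2)" on stdin and prints "word count",
# keeping the last count seen for each word. Other lines are ignored.
final_counts() {
  tr -d '()' |
    awk -F',' 'NF == 2 { count[$1] = $2 } END { for (w in count) print w, count[w] }' |
    LC_ALL=C sort
}

printf '%s\n' '(to,1)' '(be,1)' '(or,1)' '(not,1)' '(to,2)' '(be,2)' '(that,1)' | final_counts
# prints:
#   be 2
#   not 1
#   or 1
#   that 1
#   to 2
```

To apply this to a real output file, you can pipe the file contents into the function, for example from gcloud storage cat with the object path of the part- file.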
    

Clean up

To avoid incurring charges to your Google Cloud account for the resources used on this page, delete the Google Cloud project with the resources.

Delete the project

  1. In the Google Cloud console, go to the Manage resources page.

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

What's next