Create a BigQuery Engine for Apache Flink job with the CLI
Learn how to create a BigQuery Engine for Apache Flink job, monitor the status of the job, and view the results. In this quickstart, you can create a job that uses either Java or Python.
Before you begin
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
- To initialize the gcloud CLI, run the following command:
  gcloud init
- Update and install gcloud components:
  gcloud components update
  gcloud components install managed-flink-client
- Create a Google Cloud project:
  gcloud projects create PROJECT_ID
  Replace PROJECT_ID with a name for the Google Cloud project you are creating.
- Make sure that billing is enabled for your Google Cloud project.
- Enable the BigQuery Engine for Apache Flink APIs:
  gcloud services enable managedflink.googleapis.com compute.googleapis.com
- Create local authentication credentials for your user account:
  gcloud auth application-default login
- Grant roles to your user account. Run the following command once for each of the following IAM roles:
  roles/managedflink.developer
  gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
  Replace the following:
  - PROJECT_ID: your project ID.
  - USER_IDENTIFIER: the identifier for your user account. For example, user:myemail@example.com.
  - ROLE: each individual role.
- Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.
- Create a Cloud Storage bucket by running the gcloud storage buckets create command:
  gcloud storage buckets create gs://BUCKET_NAME --location=US
  Replace BUCKET_NAME with a name for the bucket. For information about bucket naming requirements, see Bucket names. A filled-in example of these setup commands follows this list.
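For reference, here is a minimal filled-in sketch of the setup commands above, assuming a hypothetical project ID my-flink-project, the example user identifier user:myemail@example.com, and a hypothetical bucket name my-flink-bucket; substitute your own values. The gcloud config set project line is an optional addition that sets the default project for later gcloud commands.

# Create a project and make it the default for subsequent gcloud commands (optional).
gcloud projects create my-flink-project
gcloud config set project my-flink-project

# Enable the required APIs and create local credentials.
gcloud services enable managedflink.googleapis.com compute.googleapis.com
gcloud auth application-default login

# Grant the developer role to your user account.
gcloud projects add-iam-policy-binding my-flink-project \
    --member="user:myemail@example.com" \
    --role=roles/managedflink.developer

# Create the Cloud Storage bucket used for staging and output.
gcloud storage buckets create gs://my-flink-bucket --location=US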
Prepare the pipeline code
Java
- Download and install a Java Development Kit (JDK). Verify that the JAVA_HOME environment variable is set and points to your JDK installation.
- Clone or download the apache/flink GitHub repository and change into the flink/flink-examples/flink-examples-streaming directory:
  git clone https://github.com/apache/flink.git --branch release-1.19.1
  cd flink/flink-examples/flink-examples-streaming
- Build the JAR file for the example pipeline:
  ../../mvnw clean package
- Verify that this command built a JAR file named WordCount.jar:
  ls target/WordCount.jar
Python
- Install Python 3.11 and pip.
- Clone or download the apache/flink GitHub repository and change into the flink/flink-python/pyflink/examples/table directory:
  git clone https://github.com/apache/flink.git
  cd flink/flink-python/pyflink/examples/table
- Create an archive file that packages the Python virtual environment for the job:
  python -m venv pyflink_venv
  source pyflink_venv/bin/activate
  pip install "apache-flink==1.19.0" venv-pack
  venv-pack -o pyflink_venv.tar.gz
- Upload the archive file to Cloud Storage (an optional verification check follows these steps):
  gcloud storage cp pyflink_venv.tar.gz gs://BUCKET_NAME/pyflink_venv.tar.gz
  Replace BUCKET_NAME with the name of your Cloud Storage bucket.
  For more information about packaging the Python virtual environment, see Python virtual environment.
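Optionally, to confirm that the archive is in place before you create the job, you can list it with the gcloud storage ls command. This verification step is an addition to the quickstart; BUCKET_NAME is your bucket name:

gcloud storage ls gs://BUCKET_NAME/pyflink_venv.tar.gz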
Create a network and subnet
Use the networks create command to create a VPC in your project.
gcloud compute networks create NETWORK_NAME \
--project=PROJECT_ID
Replace the following:
- NETWORK_NAME: a name for the VPC, for example vpc-1.
- PROJECT_ID: your project ID.
Use the subnets create command to add a subnet with Private Google Access enabled.
gcloud compute networks subnets create SUBNET_NAME \
--network=NETWORK_NAME \
--project=PROJECT_ID \
--range=10.0.0.0/24 \
--region=us-central1 \
--enable-private-ip-google-access
Replace the following:
- SUBNET_NAME: a name for the subnet, for example subnet-1.
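Optionally, to confirm that Private Google Access is enabled on the new subnet, you can describe it and read the privateIpGoogleAccess field. This check is an addition to the quickstart:

gcloud compute networks subnets describe SUBNET_NAME \
    --region=us-central1 \
    --project=PROJECT_ID \
    --format="value(privateIpGoogleAccess)"

The command prints True when Private Google Access is enabled.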
Create a deployment
In this step, you create a deployment, which is a dedicated and isolated environment where your Apache Flink jobs run.
The first time you create either a deployment or an on-demand job in a project or in a subnet, the creation can take 30 minutes or more to complete. After that, it takes less time to create a new deployment or job.
To create the deployment, use the gcloud alpha managed-flink deployments create command:
gcloud alpha managed-flink deployments create my-deployment \
--project=PROJECT_ID \
--location=us-central1 \
--network-config-vpc=NETWORK_NAME \
--network-config-subnetwork=SUBNET_NAME \
--max-slots=4
Replace the following:
- PROJECT_ID: your project ID.
- NETWORK_NAME: the name of the VPC.
- SUBNET_NAME: the name of the subnet.
Although the default network has configurations that allow deployments to run jobs, for security reasons, we recommend that you create a separate network for BigQuery Engine for Apache Flink. The default network is not secure, because it is pre-populated with firewall rules that allow incoming connections to instances.
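Deployment creation is a long-running operation. To check its status from the command line, a describe call along the following lines should work; because this is an alpha surface, treat the exact subcommand as an assumption and use the Google Cloud console if your gcloud version doesn't support it:

gcloud alpha managed-flink deployments describe my-deployment \
    --project=PROJECT_ID \
    --location=us-central1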
Grant service account permissions
Grant the Managed Flink Default Workload Identity read and write permissions to the Cloud Storage bucket by running the following command:
gcloud storage buckets add-iam-policy-binding gs://BUCKET_NAME \
--member="serviceAccount:gmf-PROJECT_NUMBER-default@gcp-sa-managedflink-wi.iam.gserviceaccount.com" \
--role=roles/storage.objectAdmin
Replace the following:
- BUCKET_NAME: the name of the bucket.
- PROJECT_NUMBER: your project number. To find your project number, see Identify projects or use the gcloud projects describe command.
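For example, the following gcloud projects describe invocation prints only the project number:

gcloud projects describe PROJECT_ID --format="value(projectNumber)"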
Create a job
In this step, you create a BigQuery Engine for Apache Flink job that runs the example pipeline. To create the job, use the gcloud alpha managed-flink jobs create command:
Java
gcloud alpha managed-flink jobs create ./target/WordCount.jar \
--name=my-job \
--location=us-central1 \
--deployment=my-deployment \
--project=PROJECT_ID \
--staging-location=gs://BUCKET_NAME/jobs/ \
--min-parallelism=1 \
--max-parallelism=4 \
-- --output gs://BUCKET_NAME/
Replace the following:
- PROJECT_ID: your project ID.
- BUCKET_NAME: the name of your Cloud Storage bucket.
The -- option specifies command-line arguments for the pipeline, which are defined by the pipeline code. In the WordCount example, --output specifies the location to write the output.
Python
gcloud alpha managed-flink jobs create ${PWD}/word_count.py \
--name=word-count \
--location=us-central1 \
--deployment=my-deployment \
--project=PROJECT_ID \
--staging-location=gs://BUCKET_NAME/jobs/ \
--min-parallelism=1 \
--max-parallelism=4 \
--python-venv=gs://BUCKET_NAME/pyflink_venv.tar.gz \
-- --output gs://BUCKET_NAME/
Replace the following:
- PROJECT_ID: your project ID.
- BUCKET_NAME: the name of your Cloud Storage bucket.
The -- option specifies command-line arguments for the pipeline, which are defined by the pipeline code. In this example, --output specifies the location to write the output.
The --python-venv option specifies the Cloud Storage location of the Python virtual environment archive.
While the job is being submitted, the gcloud CLI output shows the operation as pending. If the job is successfully submitted, the gcloud CLI output shows the following:
Create request issued for JOB_ID.
The value of JOB_ID is the job ID, which you can use to update or delete the job. For more information, see Create and manage jobs.
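You can also check on the job from the command line. The list and describe subcommands below mirror the jobs create command used in this quickstart, but they are an assumption rather than a documented step; if they aren't available in your gcloud version, use the console as described in the next section:

gcloud alpha managed-flink jobs list \
    --project=PROJECT_ID \
    --location=us-central1
gcloud alpha managed-flink jobs describe JOB_ID \
    --project=PROJECT_ID \
    --location=us-central1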
Monitor the job
- In the Google Cloud console, go to the BigQuery Engine for Apache Flink Jobs page. The Jobs page lists the available jobs, including the job name, job ID, status, and creation time.
- To see additional job details, click the job name.
- Wait for the job to complete. When the job completes, the job status is Finished.
Examine the pipeline output
When the job completes, perform the following steps to see the output from the pipeline:
- In the Google Cloud console, go to the Cloud Storage Buckets page.
- In the bucket list, click the name of the bucket that you created in Before you begin. The Bucket details page opens, with the Objects tab selected.
- The pipeline creates a folder with the naming pattern YYYY-MM-DD--HH. Click the folder name.
- If the pipeline ran successfully, the folder contains a file with the prefix part-; for example, part-4253227c-4a45-4c6e-8918-0106d95bbf86-0. Click this file.
- On the Object details page, click the authenticated URL to view the contents of the output file. The output looks similar to the following:
Java
(to,1)
(be,1)
(or,1)
(not,1)
(to,2)
(be,2)
(that,1)
[....]
Python
{"data":[{"word":"To","count":1}],"type":"INSERT"} {"data":[{"word":"be,","count":1}],"type":"INSERT"} {"data":[{"word":"or","count":1}],"type":"INSERT"} [....]
Clean up
To avoid incurring charges to your Google Cloud account for the resources used on this page, delete the Google Cloud project with the resources.
Delete the project
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
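Alternatively, you can delete the project from the command line with the gcloud projects delete command:

gcloud projects delete PROJECT_ID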
What's next
- Learn how to create and manage jobs.
- Learn how to use the BigQuery Engine for Apache Flink monitoring interface.