Create and manage jobs

A job runs your Apache Flink pipeline. You can either run jobs within an existing deployment, or you can run an on-demand job.

To create a deployment for your jobs, see Create a deployment.

Required APIs

To create and manage a BigQuery Engine for Apache Flink job, you must enable the BigQuery Engine for Apache Flink API.

gcloud services enable managedflink.googleapis.com

You might need to enable additional APIs, such as the Cloud Storage API, if your pipeline requires them.
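
As a minimal sketch, the following enables the BigQuery Engine for Apache Flink API together with the Cloud Storage API in a single call. It assumes that your pipeline reads from or writes to Cloud Storage. The `enable_apis` helper and the `DRY_RUN` variable are illustrative names, not part of the gcloud CLI. By default the helper only prints the command so that you can review it first.

```shell
# Hypothetical helper: enables the listed APIs for the active gcloud project.
# With DRY_RUN=1 (the default here) the command is printed, not executed.
enable_apis() {
  if [ "${DRY_RUN:-1}" = "1" ]; then
    echo "gcloud services enable $*"
  else
    gcloud services enable "$@"
  fi
}

# managedflink.googleapis.com comes from this page; storage.googleapis.com
# (Cloud Storage) is an assumption about what the pipeline needs.
enable_apis managedflink.googleapis.com storage.googleapis.com
```

Set `DRY_RUN=0` to run the command instead of printing it.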

Required roles and permissions

This section describes the roles required to manage your jobs. For more information about BigQuery Engine for Apache Flink roles, see BigQuery Engine for Apache Flink predefined roles.

Create, update, and delete

To get the permissions that you need to create, update, and delete a job, ask your administrator to grant you the Managed Flink Developer (roles/managedflink.developer) IAM role on your project. For more information about granting roles, see Manage access to projects, folders, and organizations.

This predefined role contains the permissions required to create, update, and delete a job. The exact permissions are listed in the following Required permissions section:

Required permissions

The following permissions are required to create, update, and delete a job:

  • Create a job: managedflink.jobs.create
  • Update a job: managedflink.jobs.update
  • Delete a job: managedflink.jobs.delete

You might also be able to get these permissions with custom roles or other predefined roles.

Get and list

To get the permissions that you need to retrieve information about a job and list jobs, ask your administrator to grant you the Managed Flink Viewer (roles/managedflink.viewer) IAM role on your project. For more information about granting roles, see Manage access to projects, folders, and organizations.

This predefined role contains the permissions required to retrieve information about a job and list jobs. The exact permissions are listed in the following Required permissions section:

Required permissions

The following permissions are required to retrieve information about a job and list jobs:

  • Get details about a job: managedflink.jobs.get
  • List jobs: managedflink.jobs.list

You might also be able to get these permissions with custom roles or other predefined roles.
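
An administrator can grant the roles described in this section with the `gcloud projects add-iam-policy-binding` command. The following sketch is illustrative: the `grant_flink_role` helper, the project ID, and the user email are placeholders, and the helper prints the command by default (`DRY_RUN=1`) so that it can be reviewed before it is run.

```shell
# Hypothetical helper: grants a BigQuery Engine for Apache Flink role to a user.
# Usage: grant_flink_role PROJECT_ID USER_EMAIL developer|viewer
grant_flink_role() {
  project=$1
  email=$2
  case $3 in
    developer|viewer) role="roles/managedflink.$3" ;;
    *) echo "unknown role: $3" >&2; return 1 ;;
  esac
  set -- gcloud projects add-iam-policy-binding "$project" \
    --member="user:$email" --role="$role"
  if [ "${DRY_RUN:-1}" = "1" ]; then
    echo "$*"     # print only
  else
    "$@"          # run the real command
  fi
}

# Placeholder values; replace with your own project and principal.
grant_flink_role my-project alex@example.com developer
grant_flink_role my-project alex@example.com viewer
```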

Properties of a job

BigQuery Engine for Apache Flink jobs have the following properties.

Job ID

The ID for the job. BigQuery Engine for Apache Flink automatically generates the job ID when you create the job.

Job name

An optional name for the job. Job names don't need to be unique.

Location

The location where the job runs. The location must be one of the supported Google Cloud regions. If the job is created within an existing deployment, the job location must match the deployment location. You can't change the location of a job. For a list of available locations, see BigQuery Engine for Apache Flink locations.

Project ID

The ID of the Google Cloud project for the job that you are creating. Your job is created in the project that you specify. If the job is created within an existing deployment, the job project must match the deployment project. You can't change the project of a job. For information about Google Cloud project IDs and project numbers, see Identifying projects.

Deployment ID

The name of the BigQuery Engine for Apache Flink deployment to use for this job. If you don't specify an existing deployment, a one-time deployment is created to run the job. That deployment only exists while the job is running and can't be used to run other jobs.

Job file

When you create a job, you specify a file that defines your Apache Flink pipeline. BigQuery Engine for Apache Flink uses this file to execute your job.

BigQuery Engine for Apache Flink supports JAR, Python, and SQL files.

For more information about these job types, see the Apache Flink documentation.

Python virtual environment

For Python jobs, you must provide an archive file that packages the Python virtual environment for the job. Create the archive file as follows:

  1. Verify that you have Python version 3.11 installed. BigQuery Engine for Apache Flink requires version 3.11.

    python3 --version
    
  2. Create a Python virtual environment.

    python3 -m venv pyflink_venv
    source pyflink_venv/bin/activate
    
  3. Install the apache-flink Python library, along with any other dependencies that your job requires.

    pip install "apache-flink==1.19.0" venv-pack
    # Install other dependencies that your job needs
    # pip install ...
    
  4. Use the venv-pack tool to package the environment.

    venv-pack -o pyflink_venv.tar.gz
    
  5. Use the gcloud storage cp command to upload the archive file to Cloud Storage.

    gcloud storage cp pyflink_venv.tar.gz gs://BUCKET_NAME/pyflink_venv.tar.gz
    
  6. When you run the job, specify the Cloud Storage location in the --python-venv parameter of the gcloud alpha managed-flink jobs create command.

    --python-venv=gs://BUCKET_NAME/pyflink_venv.tar.gz \
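
Because the service requires Python 3.11, it can help to fail fast before you build the archive. The following is a minimal sketch of such a guard. The `is_supported_python` function name is illustrative, and the script assumes that `python3` is on your `PATH`.

```shell
# Hypothetical guard: succeeds only for version strings 3.11 or 3.11.x.
is_supported_python() {
  case $1 in
    3.11|3.11.*) return 0 ;;
    *) return 1 ;;
  esac
}

# `python3 --version` prints a line such as "Python 3.11.9"; keep the number only.
version=$(python3 --version | awk '{print $2}')
if is_supported_python "$version"; then
  echo "Python $version is supported; continue with the virtual environment."
else
  echo "Found Python '$version', but version 3.11 is required." >&2
fi
```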
    

Create an on-demand job

Follow these steps to create an on-demand job. On-demand jobs aren't associated with existing deployments.

To create an on-demand job by using the gcloud CLI, use the gcloud alpha managed-flink jobs create command.

gcloud alpha managed-flink jobs create FILE \
  --location=REGION \
  --project=PROJECT_ID \
  --name=JOB_NAME \
  --staging-location=STAGING_LOCATION \
  --min-parallelism=MINIMUM_SLOTS \
  --max-parallelism=MAXIMUM_SLOTS \
  -- JOB_ARGUMENTS

Replace the following:

  • FILE: the absolute path to the job file. For JAR files, you can also specify a path to an artifact stored in Artifact Registry. For more information, see Use Artifact Registry.
  • REGION: a BigQuery Engine for Apache Flink region, like us-central1
  • PROJECT_ID: your BigQuery Engine for Apache Flink project ID
  • JOB_NAME: a name for the job
  • STAGING_LOCATION: the Cloud Storage location to stage job artifacts
  • MINIMUM_SLOTS: the minimum number of task slots available to your job
  • MAXIMUM_SLOTS: the maximum number of task slots available to your job
  • JOB_ARGUMENTS: a list of job arguments to pass to the Apache Flink job

Depending on your job, you might need to specify the following additional parameters:

  • --class: For Java, specifies the main class of the Apache Flink job. This parameter is required if the JAR file manifest doesn't contain a main class.
  • --jars: Specifies additional JAR files for the job.
  • --python-venv: For Python, specifies the Cloud Storage location of an archived virtual environment for the job. This parameter is required for Python jobs. For more information, see Python virtual environment.

To use service account impersonation, see Use service account impersonation.

The first time you create either a deployment or an on-demand job in a project or in a subnet, the creation can take 30 minutes or more to complete. After that, it takes less time to create a new deployment or job.
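
As an illustration, the following sketch fills in the command for a hypothetical JAR job and checks the parallelism bounds before submitting. The rule that the minimum must be at least 1 and must not exceed the maximum is an assumption about sensible input, not a documented requirement. The bucket, job name, JAR path, main class, and job arguments are all placeholders. The command is only printed; remove `echo` to run it.

```shell
# Hypothetical check: both values are positive integers and min <= max.
valid_parallelism() {
  for n in "$1" "$2"; do
    case $n in
      ''|*[!0-9]*) return 1 ;;   # reject empty or non-numeric input
    esac
  done
  [ "$1" -ge 1 ] && [ "$1" -le "$2" ]
}

MIN_SLOTS=2
MAX_SLOTS=8

if valid_parallelism "$MIN_SLOTS" "$MAX_SLOTS"; then
  # Printed for review; remove `echo` to submit the job.
  echo gcloud alpha managed-flink jobs create /path/to/word-count.jar \
    --location=us-central1 \
    --project=my-project \
    --name=word-count \
    --staging-location=gs://my-bucket/staging \
    --min-parallelism="$MIN_SLOTS" \
    --max-parallelism="$MAX_SLOTS" \
    --class=com.example.WordCount \
    -- --output gs://my-bucket/output
else
  echo "Invalid parallelism: min=$MIN_SLOTS max=$MAX_SLOTS" >&2
fi
```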

Create a job in an existing deployment

Follow these steps to create a job in an existing deployment. To create a deployment, see Create and manage deployments.

To create a job by using the gcloud CLI, use the gcloud alpha managed-flink jobs create command.

gcloud alpha managed-flink jobs create FILE \
  --location=REGION \
  --project=PROJECT_ID \
  --deployment=DEPLOYMENT_ID \
  --name=JOB_NAME \
  --staging-location=STAGING_LOCATION \
  --min-parallelism=MINIMUM_SLOTS \
  --max-parallelism=MAXIMUM_SLOTS \
  -- JOB_ARGUMENTS

Replace the following:

  • FILE: the absolute path to the job file. For JAR files, you can also specify a path to an artifact stored in Artifact Registry. For more information, see Use Artifact Registry.
  • REGION: a BigQuery Engine for Apache Flink region, like us-central1
  • PROJECT_ID: your BigQuery Engine for Apache Flink project ID
  • DEPLOYMENT_ID: the name of your BigQuery Engine for Apache Flink deployment
  • JOB_NAME: a name for the job
  • STAGING_LOCATION: the Cloud Storage location to stage job artifacts
  • MINIMUM_SLOTS: the minimum number of task slots available to your job
  • MAXIMUM_SLOTS: the maximum number of task slots available to your job
  • JOB_ARGUMENTS: a list of job arguments to pass to the Apache Flink job

Depending on your job, you might need to specify the following additional parameters:

  • --class: For Java, specifies the main class of the Apache Flink job. This parameter is required if the JAR file manifest doesn't contain a main class.
  • --jars: Specifies additional JAR files for the job.
  • --python-venv: For Python, specifies the Cloud Storage location of an archived virtual environment for the job. This parameter is required for Python jobs. For more information, see Python virtual environment.

To use service account impersonation, see Use service account impersonation.
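
In the commands on this page, the only difference between the two cases is the `--deployment` flag, so a script can share one code path for both. The sketch below is one hypothetical way to do that for a Python job. The `job_create_cmd` helper and all values are placeholders, and the function prints the command rather than running it.

```shell
# Hypothetical helper: prints a create command for a Python job.
# Usage: job_create_cmd FILE JOB_NAME [DEPLOYMENT_ID]
job_create_cmd() {
  file=$1
  name=$2
  deployment=${3:-}
  set -- gcloud alpha managed-flink jobs create "$file" \
    --location=us-central1 \
    --project=my-project \
    --name="$name" \
    --staging-location=gs://my-bucket/staging \
    --min-parallelism=1 \
    --max-parallelism=4 \
    --python-venv=gs://my-bucket/pyflink_venv.tar.gz
  # Add --deployment only when a deployment ID is given.
  if [ -n "$deployment" ]; then
    set -- "$@" --deployment="$deployment"
  fi
  echo "$*"
}

job_create_cmd /path/to/pipeline.py my-job                 # on-demand job
job_create_cmd /path/to/pipeline.py my-job my-deployment   # existing deployment
```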

Use Artifact Registry

For Java jobs, you can use Artifact Registry to store and manage the JAR file for the job. Using Artifact Registry lets you run the job without having the JAR file on your local machine, and lets CI/CD pipelines submit jobs without building or downloading the JAR file. To submit the job, you need the Artifact Registry Reader (roles/artifactregistry.reader) IAM role.

To run a job with a JAR file stored in Artifact Registry, specify the Artifact Registry path as the job file in the gcloud alpha managed-flink jobs create command. Use the following format for the Artifact Registry path:

ar://PROJECT_ID/LOCATION/REPOSITORY/FILE_PATH

Replace the following:

  • PROJECT_ID: the project ID of the Artifact Registry
  • LOCATION: the region for the repository
  • REPOSITORY: the name of the repository
  • FILE_PATH: the path to the JAR file; for more information, see Listing files

Example:

gcloud alpha managed-flink jobs create \
  ar://my-project/us-central1/my-repo/com/example/word-count/1.0/word-count-1.0-20241021.203909-1.jar \
  ...

For more information about using Artifact Registry to manage Java packages, see Manage Java packages.
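
In a CI/CD script, the Artifact Registry path can be assembled from its parts. The following is a minimal sketch. `ar_path` is an illustrative helper name, and the values reuse the example in this section.

```shell
# Hypothetical helper: builds ar://PROJECT_ID/LOCATION/REPOSITORY/FILE_PATH.
ar_path() {
  if [ "$#" -ne 4 ]; then
    echo "usage: ar_path PROJECT_ID LOCATION REPOSITORY FILE_PATH" >&2
    return 1
  fi
  # Strip one leading slash from FILE_PATH so the result has no "//" in it.
  printf 'ar://%s/%s/%s/%s\n' "$1" "$2" "$3" "${4#/}"
}

ar_path my-project us-central1 my-repo \
  com/example/word-count/1.0/word-count-1.0-20241021.203909-1.jar
```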

Update a job

You can modify the autotuning settings for your jobs. For more information, see BigQuery Engine for Apache Flink autoscaling.

Get details about a job

console

To get information about a job in the Google Cloud console, follow these steps:

  1. In the Google Cloud console, go to the BigQuery Engine for Apache Flink Jobs page.

    The Jobs page displays details of your jobs, including the status of each job.

  2. To open the Job details page, click the name of a job. On the Job details page, you can see the job graph and job metrics.

gcloud

To retrieve information about a job by using the gcloud CLI, use the gcloud alpha managed-flink jobs describe command. This command retrieves the initial job implementation and the state of the job.

gcloud alpha managed-flink jobs describe \
  JOB_ID \
  --project=PROJECT_ID \
  --location=REGION

Replace the following:

  • JOB_ID: the ID of your BigQuery Engine for Apache Flink job
  • PROJECT_ID: your BigQuery Engine for Apache Flink project ID
  • REGION: the region that the BigQuery Engine for Apache Flink job is in

List jobs

console

To see a list of jobs, in the Google Cloud console, go to the BigQuery Engine for Apache Flink Jobs page.

gcloud

To list the jobs in a project by using the gcloud CLI, use the gcloud alpha managed-flink jobs list command. This command lists all of the jobs in the specified region and project.

gcloud alpha managed-flink jobs list \
  REGION \
  --project=PROJECT_ID

Replace the following:

  • REGION: the region that the BigQuery Engine for Apache Flink jobs are in
  • PROJECT_ID: your BigQuery Engine for Apache Flink project ID

Delete jobs

gcloud

To delete a job by using the gcloud CLI, use the gcloud alpha managed-flink jobs delete command.

gcloud alpha managed-flink jobs delete \
  JOB_ID \
  --project=PROJECT_ID \
  --location=REGION

Replace the following:

  • JOB_ID: the ID of your BigQuery Engine for Apache Flink job
  • PROJECT_ID: your BigQuery Engine for Apache Flink project ID
  • REGION: the region that the BigQuery Engine for Apache Flink job is in
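
To delete several jobs, you can loop over their IDs. The sketch below is illustrative: the job IDs, project, and region are placeholders, and the `delete_jobs` helper prints each command instead of running it so that you can confirm the list first.

```shell
# Hypothetical helper: prints one delete command per job ID.
# Usage: delete_jobs PROJECT_ID REGION JOB_ID...
delete_jobs() {
  project=$1
  region=$2
  shift 2
  for job_id in "$@"; do
    # Remove `echo` to actually delete the job.
    echo gcloud alpha managed-flink jobs delete "$job_id" \
      --project="$project" \
      --location="$region"
  done
}

delete_jobs my-project us-central1 job-id-1 job-id-2
```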

Limitations

  • Your Apache Flink pipelines must be compatible with Apache Flink 1.19.
  • Python pipelines must use Python version 3.11.

What's next