使用适用于 Apache Flink 的 BigQuery 引擎运行 SQL 查询

了解如何创建用于执行 Apache Flink SQL 查询的适用于 Apache Flink 的 BigQuery 引擎作业。

借助 Apache Flink SQL,您可以使用 SQL 语句定义流处理流水线。Apache Flink SQL 是与 BigQuery 使用的 GoogleSQL 不同的 SQL 方言。如需了解详情,请参阅 Apache Flink 文档中的 SQL


  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Update and install gcloud components:

    gcloud components update
    gcloud components install managed-flink-client
  5. Create a Google Cloud project.

    gcloud projects create PROJECT_ID

    Replace PROJECT_ID with a name for the Google Cloud project you are creating.

  6. Make sure that billing is enabled for your Google Cloud project.

  7. Enable the BigQuery Engine for Apache Flink APIs:

    gcloud services enable managedflink.googleapis.com compute.googleapis.com
  8. Create local authentication credentials for your user account:

    gcloud auth application-default login
  9. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/managedflink.developer

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  18. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.

  19. 通过运行 gcloud storage buckets create 命令创建 Cloud Storage 存储桶:

    gcloud storage buckets create gs://BUCKET_NAME --location=US

    BUCKET_NAME 替换为存储桶的名称。 如需了解存储桶命名要求,请参阅存储桶名称

创建 SQL 查询文件

  1. 使用文本编辑器创建名为 create-table.sql 的新文件。

  2. 将以下内容粘贴到文件中:

    CREATE TABLE table1 (
       UserId int,
       Name varchar(256)
    ) WITH (
       'connector' = 'filesystem',
       'path' = 'gs://BUCKET_NAME/output/',
       'format' = 'csv'
    INSERT INTO table1 VALUES(1, 'Alice'),(2, 'Bob'),(3, 'Charles');

    BUCKET_NAME 替换为您的 Cloud Storage 存储分区的名称。

  3. 保存文件。

此查询会创建一个由 Cloud Storage 支持的表,并将一些数据插入该表中。查询运行时,它会将表数据写入一个或多个 CSV 文件。


使用 networks create 命令在项目中创建 VPC。

gcloud compute networks create NETWORK_NAME \


  • NETWORK_NAME:VPC 的名称,例如 vpc-1
  • PROJECT_ID:您的项目 ID。

使用 subnets create 命令添加已启用专用 Google 访问通道的子网。

gcloud compute networks subnets create SUBNET_NAME \
    --network=NETWORK_NAME \
    --project=PROJECT_ID \
    --range= \
    --region=us-central1 \


  • SUBNET_NAME:子网的名称,例如 subnet-1


在此步骤中,您将创建一个部署,这是用于运行 Apache Flink 作业的专属独立环境。

首次在项目或子网中创建部署或按需作业时,创建过程可能需要 30 分钟或更长时间才能完成。之后,创建新的部署或作业所需的时间会缩短。

如需创建部署,请使用 gcloud alpha managed-flink deployments create 命令:

gcloud alpha managed-flink deployments create my-deployment \
  --project=PROJECT_ID \
  --location=us-central1 \
  --network-config-vpc=NETWORK_NAME \
  --network-config-subnetwork=SUBNET_NAME \


  • PROJECT_ID:您的项目 ID。
  • SUBNET_NAME:子网的名称。

虽然 default 网络具有允许部署运行作业的配置,但出于安全考虑,我们建议您为适用于 Apache Flink 的 BigQuery 引擎创建单独的网络。default 网络不安全,因为它预先填充了一些允许连接传入实例的防火墙规则。


运行以下命令,向托管式 Flink 默认工作负载身份授予对 Cloud Storage 存储桶的读写权限:

gcloud storage buckets add-iam-policy-binding gs://BUCKET_NAME \
  --member="serviceAccount:gmf-PROJECT_NUMBER-default@gcp-sa-managedflink-wi.iam.gserviceaccount.com" \



在此步骤中,您将创建一个用于运行 SQL 查询的 适用于 Apache Flink 的 BigQuery Engine 作业。如需创建作业,请使用 gcloud alpha managed-flink jobs create 命令:

gcloud alpha managed-flink jobs create ./create-table.sql \
  --name=my-job \
  --location=us-central1 \
  --deployment=my-deployment \
  --project=PROJECT_ID \
  --staging-location=gs://BUCKET_NAME/jobs/ \
  --min-parallelism=1 \


  • PROJECT_ID:您的项目 ID
  • BUCKET_NAME:Cloud Storage 存储桶的名称

在提交作业时,gcloud CLI 输出会显示操作为待处理。如果作业成功提交,gcloud CLI 输出将显示以下内容:

Create request issued for JOB_ID.

JOB_ID 的值是作业 ID,您可以使用该 ID 更新或删除作业。如需了解详情,请参阅创建和管理作业


  1. 在 Google Cloud 控制台中,前往“适用于 Apache Flink 的 BigQuery Engine”作业页面。


    作业页面列出了可用作业,包括作业名称、作业 ID、状态和创建时间。

  2. 如需查看其他作业详情,请点击作业名称。

  3. 等待作业完成。



  1. 运行以下命令以下载流水线输出:

    gsutil cp gs://BUCKET_NAME/output/* .

    BUCKET_NAME 替换为您的 Cloud Storage 存储分区的名称。

  2. 作业会创建一个前缀为 part- 的文件;例如 part-4253227c-4a45-4c6e-8918-0106d95bbf86-0。检查此文件的内容。

    more part-*




为避免因本页面中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的 Google Cloud 项目。


  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.
