使用适用于 Apache Flink 的 BigQuery 引擎运行 SQL 查询
了解如何创建用于执行 Apache Flink SQL 查询的适用于 Apache Flink 的 BigQuery 引擎作业。
借助 Apache Flink SQL,您可以使用 SQL 语句定义流处理流水线。Apache Flink SQL 是与 BigQuery 使用的 GoogleSQL 不同的 SQL 方言。如需了解详情,请参阅 Apache Flink 文档中的 SQL。
准备工作
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Update and install
gcloud
components:gcloud components update
gcloud components install managed-flink-client -
Create a Google Cloud project.
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Make sure that billing is enabled for your Google Cloud project.
-
Enable the BigQuery Engine for Apache Flink APIs:
gcloud services enable managedflink.googleapis.com
compute.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/managedflink.developer
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Update and install
gcloud
components:gcloud components update
gcloud components install managed-flink-client -
Create a Google Cloud project.
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Make sure that billing is enabled for your Google Cloud project.
-
Enable the BigQuery Engine for Apache Flink APIs:
gcloud services enable managedflink.googleapis.com
compute.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/managedflink.developer
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
-
Set the environment variable
GOOGLE_APPLICATION_CREDENTIALS
to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again. 通过运行
gcloud storage buckets create
命令创建 Cloud Storage 存储桶:gcloud storage buckets create gs://BUCKET_NAME --location=US
将
BUCKET_NAME
替换为存储桶的名称。 如需了解存储桶命名要求,请参阅存储桶名称。
创建 SQL 查询文件
使用文本编辑器创建名为
create-table.sql
的新文件。将以下内容粘贴到文件中:
CREATE TABLE table1 ( UserId int, Name varchar(256) ) WITH ( 'connector' = 'filesystem', 'path' = 'gs://BUCKET_NAME/output/', 'format' = 'csv' ); INSERT INTO table1 VALUES(1, 'Alice'),(2, 'Bob'),(3, 'Charles');
将
BUCKET_NAME
替换为您的 Cloud Storage 存储分区的名称。保存文件。
此查询会创建一个由 Cloud Storage 支持的表,并将一些数据插入该表中。查询运行时,它会将表数据写入一个或多个 CSV 文件。
创建网络和子网
使用 networks create
命令在项目中创建 VPC。
gcloud compute networks create NETWORK_NAME \
--project=PROJECT_ID
替换以下内容:
NETWORK_NAME
:VPC 的名称,例如vpc-1
。PROJECT_ID
:您的项目 ID。
使用 subnets create
命令添加已启用专用 Google 访问通道的子网。
gcloud compute networks subnets create SUBNET_NAME \
--network=NETWORK_NAME \
--project=PROJECT_ID \
--range=10.0.0.0/24 \
--region=us-central1 \
--enable-private-ip-google-access
替换以下内容:
SUBNET_NAME
:子网的名称,例如subnet-1
。
创建部署
在此步骤中,您将创建一个部署,这是用于运行 Apache Flink 作业的专属独立环境。
首次在项目或子网中创建部署或按需作业时,创建过程可能需要 30 分钟或更长时间才能完成。之后,创建新的部署或作业所需的时间会缩短。
如需创建部署,请使用 gcloud alpha managed-flink deployments create
命令:
gcloud alpha managed-flink deployments create my-deployment \
--project=PROJECT_ID \
--location=us-central1 \
--network-config-vpc=NETWORK_NAME \
--network-config-subnetwork=SUBNET_NAME \
--max-slots=4
替换以下内容:
PROJECT_ID
:您的项目 ID。NETWORK_NAME
:VPC 的名称。SUBNET_NAME
:子网的名称。
虽然 default
网络具有允许部署运行作业的配置,但出于安全考虑,我们建议您为适用于 Apache Flink 的 BigQuery 引擎创建单独的网络。default
网络不安全,因为它预先填充了一些允许连接传入实例的防火墙规则。
授予服务账号权限
运行以下命令,向托管式 Flink 默认工作负载身份授予对 Cloud Storage 存储桶的读写权限:
gcloud storage buckets add-iam-policy-binding gs://BUCKET_NAME \
--member="serviceAccount:gmf-PROJECT_NUMBER-default@gcp-sa-managedflink-wi.iam.gserviceaccount.com" \
--role=roles/storage.objectAdmin
替换以下内容:
BUCKET_NAME
:存储桶的名称。PROJECT_NUMBER
:您的项目编号。如需查找项目编号,请参阅识别项目或使用gcloud projects describe
命令。
创建作业
在此步骤中,您将创建一个用于运行 SQL 查询的 适用于 Apache Flink 的 BigQuery Engine 作业。如需创建作业,请使用 gcloud alpha managed-flink jobs create
命令:
gcloud alpha managed-flink jobs create ./create-table.sql \
--name=my-job \
--location=us-central1 \
--deployment=my-deployment \
--project=PROJECT_ID \
--staging-location=gs://BUCKET_NAME/jobs/ \
--min-parallelism=1 \
--max-parallelism=4
替换以下内容:
PROJECT_ID
:您的项目 IDBUCKET_NAME
:Cloud Storage 存储桶的名称
在提交作业时,gcloud CLI 输出会显示操作为待处理。如果作业成功提交,gcloud CLI 输出将显示以下内容:
Create request issued for JOB_ID.
JOB_ID 的值是作业 ID,您可以使用该 ID 更新或删除作业。如需了解详情,请参阅创建和管理作业。
监控作业
在 Google Cloud 控制台中,前往“适用于 Apache Flink 的 BigQuery Engine”作业页面。
作业页面列出了可用作业,包括作业名称、作业 ID、状态和创建时间。
如需查看其他作业详情,请点击作业名称。
等待作业完成。
检查流水线输出
作业完成后,请执行以下步骤查看流水线的输出:
运行以下命令以下载流水线输出:
gsutil cp gs://BUCKET_NAME/output/* .
将
BUCKET_NAME
替换为您的 Cloud Storage 存储分区的名称。作业会创建一个前缀为
part-
的文件;例如part-4253227c-4a45-4c6e-8918-0106d95bbf86-0
。检查此文件的内容。more part-*
输出类似于以下内容:
1,Alice 2,Bob 3,Charles
清理
为避免因本页面中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的 Google Cloud 项目。
删除项目
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
后续步骤
- 了解如何创建和管理作业。
- 了解如何使用适用于 Apache Flink 的 BigQuery 引擎监控界面。