使用 CLI 创建适用于 Apache Flink 的 BigQuery 引擎作业
了解如何创建适用于 Apache Flink 的 BigQuery 引擎作业、监控作业的状态以及查看结果。在本快速入门中,您可以创建使用 Java 或 Python 的作业。
准备工作
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
Install the Google Cloud CLI.
-
If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
After initializing the gcloud CLI, update it and install the required components:
gcloud components update gcloud components install managed-flink-client
-
Create a Google Cloud project.
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Verify that billing is enabled for your Google Cloud project.
-
Enable the BigQuery Engine for Apache Flink APIs:
gcloud services enable managedflink.googleapis.com
compute.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/managedflink.developer
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
Replace the following:
PROJECT_ID
: your project ID.USER_IDENTIFIER
: the identifier for your user account—for example,myemail@example.com
.ROLE
: the IAM role that you grant to your user account.
-
Install the Google Cloud CLI.
-
If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
After initializing the gcloud CLI, update it and install the required components:
gcloud components update gcloud components install managed-flink-client
-
Create a Google Cloud project.
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Verify that billing is enabled for your Google Cloud project.
-
Enable the BigQuery Engine for Apache Flink APIs:
gcloud services enable managedflink.googleapis.com
compute.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/managedflink.developer
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
Replace the following:
PROJECT_ID
: your project ID.USER_IDENTIFIER
: the identifier for your user account—for example,myemail@example.com
.ROLE
: the IAM role that you grant to your user account.
-
Set the environment variable
GOOGLE_APPLICATION_CREDENTIALS
to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again. 通过运行
gcloud storage buckets create
命令创建 Cloud Storage 存储桶:gcloud storage buckets create gs://BUCKET_NAME --location=US
将
BUCKET_NAME
替换为存储桶的名称。 如需了解存储桶命名要求,请参阅存储桶名称。
准备流水线代码
Java
下载并安装 Java Development Kit (JDK)。验证
JAVA_HOME
环境变量是否已设置并指向您的 JDK 安装。克隆或下载
apache/flink
GitHub 代码库,然后切换到flink/flink-examples/flink-examples-streaming
目录。git clone https://github.com/apache/flink.git --branch release-1.19.1 cd flink/flink-examples/flink-examples-streaming
为示例流水线构建 JAR 文件:
../../mvnw clean package
验证此命令是否构建了一个名为
WordCount.jar
的 JAR 文件。ls target/WordCount.jar
Python
安装 Python 3.11 和
pip
。克隆或下载
apache/flink
GitHub 代码库,然后切换到flink/flink-python/pyflink/examples/table
目录。git clone https://github.com/apache/flink.git cd flink/flink-python/pyflink/examples/table
创建一个归档文件,用于为作业打包 Python 虚拟环境:
python -m venv pyflink_venv source pyflink_venv/bin/activate pip install "apache-flink==1.19.0" venv-pack venv-pack -o pyflink_venv.tar.gz
将归档文件上传到 Cloud Storage:
gcloud storage cp pyflink_venv.tar.gz gs://BUCKET_NAME/pyflink_venv.tar.gz
将
BUCKET_NAME
替换为您的 Cloud Storage 存储分区的名称。如需详细了解如何打包 Python 虚拟环境,请参阅 Python 虚拟环境。
创建网络和子网
使用 networks create
命令在项目中创建 VPC。
gcloud compute networks create NETWORK_NAME \
--project=PROJECT_ID
替换以下内容:
NETWORK_NAME
:VPC 的名称,例如vpc-1
。PROJECT_ID
:您的项目 ID。
使用 subnets create
命令添加已启用专用 Google 访问通道的子网。
gcloud compute networks subnets create SUBNET_NAME \
--network=NETWORK_NAME \
--project=PROJECT_ID \
--range=10.0.0.0/24 \
--region=us-central1 \
--enable-private-ip-google-access
替换以下内容:
SUBNET_NAME
:子网的名称,例如subnet-1
。
创建部署
在此步骤中,您将创建一个部署,这是用于运行 Apache Flink 作业的专属独立环境。
首次在项目或子网中创建部署或按需作业时,创建过程可能需要 30 分钟或更长时间才能完成。之后,创建新的部署或作业所需的时间会缩短。
如需创建部署,请使用 gcloud alpha managed-flink deployments create
命令:
gcloud alpha managed-flink deployments create my-deployment \
--project=PROJECT_ID \
--location=us-central1 \
--network-config-vpc=NETWORK_NAME \
--network-config-subnetwork=SUBNET_NAME \
--max-slots=4
替换以下内容:
PROJECT_ID
:您的项目 ID。NETWORK_NAME
:VPC 的名称。SUBNET_NAME
:子网的名称。
虽然 default
网络具有允许部署运行作业的配置,但出于安全考虑,我们建议您为适用于 Apache Flink 的 BigQuery 引擎创建单独的网络。default
网络不安全,因为它预先填充了一些允许连接传入实例的防火墙规则。
授予服务账号权限
运行以下命令,向托管式 Flink 默认工作负载身份授予对 Cloud Storage 存储桶的读写权限:
gcloud storage buckets add-iam-policy-binding gs://BUCKET_NAME \
--member="serviceAccount:gmf-PROJECT_NUMBER-default@gcp-sa-managedflink-wi.iam.gserviceaccount.com" \
--role=roles/storage.objectAdmin
替换以下内容:
BUCKET_NAME
:存储桶的名称。PROJECT_NUMBER
:您的项目编号。如需查找项目编号,请参阅识别项目或使用gcloud projects describe
命令。
创建作业
在此步骤中,您将创建一个用于运行示例流水线的 适用于 Apache Flink 的 BigQuery 引擎作业。如需创建作业,请使用 gcloud alpha managed-flink jobs create
命令:
Java
gcloud alpha managed-flink jobs create ./target/WordCount.jar \
--name=my-job \
--location=us-central1 \
--deployment=my-deployment \
--project=PROJECT_ID \
--staging-location=gs://BUCKET_NAME/jobs/ \
--min-parallelism=1 \
--max-parallelism=4 \
-- --output gs://BUCKET_NAME/
替换以下内容:
PROJECT_ID
:您的项目 IDBUCKET_NAME
:Cloud Storage 存储桶的名称。
--
选项用于指定流水线的命令行参数,这些参数由流水线代码定义。在 WordCount
示例中,output
指定了写入输出的位置。
Python
gcloud alpha managed-flink jobs create ${PWD}/word_count.py \
--name=word-count \
--location=us-central1 \
--deployment=my-deployment \
--project=PROJECT_ID \
--staging-location=gs://BUCKET_NAME/jobs/ \
--min-parallelism=1 \
--max-parallelism=4 \
--python-venv=gs://BUCKET_NAME/pyflink_venv.tar.gz \
-- --output gs://BUCKET_NAME/
替换以下内容:
PROJECT_ID
:您的项目 IDBUCKET_NAME
:Cloud Storage 存储桶的名称。
--
选项用于指定流水线的命令行参数,这些参数由流水线代码定义。在此示例中,--output
指定了写入输出的位置。
--python-venv
选项用于指定 Python 虚拟环境归档文件的 Cloud Storage 位置。
在提交作业时,gcloud CLI 输出会显示操作为待处理。如果作业成功提交,gcloud CLI 输出将显示以下内容:
Create request issued for JOB_ID.
JOB_ID 的值是作业 ID,可用于更新或删除作业。如需了解详情,请参阅创建和管理作业。
监控作业
在 Google Cloud 控制台中,前往“适用于 Apache Flink 的 BigQuery Engine 作业”页面。
作业页面列出了可用作业,包括作业名称、作业 ID、状态和创建时间。
如需查看其他作业详情,请点击作业名称。
等待作业完成。作业完成后,作业状态为
Finished
。
检查流水线输出
作业完成后,请执行以下步骤查看流水线的输出:
在 Google Cloud 控制台中,前往 Cloud Storage 存储分区页面。
在存储桶列表中,点击您在准备工作中创建的存储桶的名称。此时会打开“存储分区详情”页面,其中“对象”标签页已选中。
流水线会创建一个命名模式为
YYYYY-MM-DD--HH
的文件夹。点击相应文件夹名称。如果流水线成功运行,该文件夹中会包含前缀为
part-
的文件;例如part-4253227c-4a45-4c6e-8918-0106d95bbf86-0
。点击此文件。在对象详情页面中,点击经过身份验证的网址以查看输出文件的内容。输出类似于以下内容:
Java
(to,1) (be,1) (or,1) (not,1) (to,2) (be,2) (that,1) [....]
Python
{"data":[{"word":"To","count":1}],"type":"INSERT"} {"data":[{"word":"be,","count":1}],"type":"INSERT"} {"data":[{"word":"or","count":1}],"type":"INSERT"} [....]
清理
为避免因本页面中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的 Google Cloud 项目。
删除项目
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
后续步骤
- 了解如何创建和管理作业。
- 了解如何使用适用于 Apache Flink 的 BigQuery 引擎监控界面。