使用 CLI 创建适用于 Apache Flink 的 BigQuery 引擎作业
了解如何创建适用于 Apache Flink 的 BigQuery 引擎作业、监控作业的状态以及查看结果。在本快速入门中,您可以创建使用 Java 或 Python 的作业。
准备工作
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Update and install
gcloud
components:gcloud components update
gcloud components install managed-flink-client -
Create a Google Cloud project.
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Make sure that billing is enabled for your Google Cloud project.
-
Enable the BigQuery Engine for Apache Flink APIs:
gcloud services enable managedflink.googleapis.com
compute.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/managedflink.developer
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Update and install
gcloud
components:gcloud components update
gcloud components install managed-flink-client -
Create a Google Cloud project.
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Make sure that billing is enabled for your Google Cloud project.
-
Enable the BigQuery Engine for Apache Flink APIs:
gcloud services enable managedflink.googleapis.com
compute.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/managedflink.developer
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
-
Set the environment variable
GOOGLE_APPLICATION_CREDENTIALS
to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again. 通过运行
gcloud storage buckets create
命令创建 Cloud Storage 存储桶:gcloud storage buckets create gs://BUCKET_NAME --location=US
将
BUCKET_NAME
替换为存储桶的名称。 如需了解存储桶命名要求,请参阅存储桶名称。
准备流水线代码
Java
下载并安装 Java Development Kit (JDK)。验证
JAVA_HOME
环境变量是否已设置并指向您的 JDK 安装。克隆或下载
apache/flink
GitHub 代码库,然后切换到flink/flink-examples/flink-examples-streaming
目录。git clone https://github.com/apache/flink.git --branch release-1.19.1 cd flink/flink-examples/flink-examples-streaming
为示例流水线构建 JAR 文件:
../../mvnw clean package
验证此命令是否构建了一个名为
WordCount.jar
的 JAR 文件。ls target/WordCount.jar
Python
安装 Python 3.11 和
pip
。克隆或下载
apache/flink
GitHub 代码库,然后切换到flink/flink-python/pyflink/examples/table
目录。git clone https://github.com/apache/flink.git cd flink/flink-python/pyflink/examples/table
创建一个归档文件,用于为作业打包 Python 虚拟环境:
python -m venv pyflink_venv source pyflink_venv/bin/activate pip install "apache-flink==1.19.0" venv-pack venv-pack -o pyflink_venv.tar.gz
将归档文件上传到 Cloud Storage:
gcloud storage cp pyflink_venv.tar.gz gs://BUCKET_NAME/pyflink_venv.tar.gz
将
BUCKET_NAME
替换为您的 Cloud Storage 存储分区的名称。如需详细了解如何打包 Python 虚拟环境,请参阅 Python 虚拟环境。
创建网络和子网
使用 networks create
命令在项目中创建 VPC。
gcloud compute networks create NETWORK_NAME \
--project=PROJECT_ID
替换以下内容:
NETWORK_NAME
:VPC 的名称,例如vpc-1
。PROJECT_ID
:您的项目 ID。
使用 subnets create
命令添加已启用专用 Google 访问通道的子网。
gcloud compute networks subnets create SUBNET_NAME \
--network=NETWORK_NAME \
--project=PROJECT_ID \
--range=10.0.0.0/24 \
--region=us-central1 \
--enable-private-ip-google-access
替换以下内容:
SUBNET_NAME
:子网的名称,例如subnet-1
。
创建部署
在此步骤中,您将创建一个部署,这是用于运行 Apache Flink 作业的专属独立环境。
首次在项目或子网中创建部署或按需作业时,创建过程可能需要 30 分钟或更长时间才能完成。之后,创建新的部署或作业所需的时间会缩短。
如需创建部署,请使用 gcloud alpha managed-flink deployments create
命令:
gcloud alpha managed-flink deployments create my-deployment \
--project=PROJECT_ID \
--location=us-central1 \
--network-config-vpc=NETWORK_NAME \
--network-config-subnetwork=SUBNET_NAME \
--max-slots=4
替换以下内容:
PROJECT_ID
:您的项目 ID。NETWORK_NAME
:VPC 的名称。SUBNET_NAME
:子网的名称。
虽然 default
网络具有允许部署运行作业的配置,但出于安全考虑,我们建议您为适用于 Apache Flink 的 BigQuery 引擎创建单独的网络。default
网络不安全,因为它预先填充了一些允许连接传入实例的防火墙规则。
授予服务账号权限
运行以下命令,向托管式 Flink 默认工作负载身份授予对 Cloud Storage 存储桶的读写权限:
gcloud storage buckets add-iam-policy-binding gs://BUCKET_NAME \
--member="serviceAccount:gmf-PROJECT_NUMBER-default@gcp-sa-managedflink-wi.iam.gserviceaccount.com" \
--role=roles/storage.objectAdmin
替换以下内容:
BUCKET_NAME
:存储桶的名称。PROJECT_NUMBER
:您的项目编号。如需查找项目编号,请参阅识别项目或使用gcloud projects describe
命令。
创建作业
在此步骤中,您将创建一个用于运行示例流水线的 适用于 Apache Flink 的 BigQuery 引擎作业。如需创建作业,请使用 gcloud alpha managed-flink jobs create
命令:
Java
gcloud alpha managed-flink jobs create ./target/WordCount.jar \
--name=my-job \
--location=us-central1 \
--deployment=my-deployment \
--project=PROJECT_ID \
--staging-location=gs://BUCKET_NAME/jobs/ \
--min-parallelism=1 \
--max-parallelism=4 \
-- --output gs://BUCKET_NAME/
替换以下内容:
PROJECT_ID
:您的项目 IDBUCKET_NAME
:Cloud Storage 存储桶的名称。
--
选项用于指定流水线的命令行参数,这些参数由流水线代码定义。在 WordCount
示例中,output
指定了写入输出的位置。
Python
gcloud alpha managed-flink jobs create ${PWD}/word_count.py \
--name=word-count \
--location=us-central1 \
--deployment=my-deployment \
--project=PROJECT_ID \
--staging-location=gs://BUCKET_NAME/jobs/ \
--min-parallelism=1 \
--max-parallelism=4 \
--python-venv=gs://BUCKET_NAME/pyflink_venv.tar.gz \
-- --output gs://BUCKET_NAME/
替换以下内容:
PROJECT_ID
:您的项目 IDBUCKET_NAME
:Cloud Storage 存储桶的名称。
--
选项用于指定流水线的命令行参数,这些参数由流水线代码定义。在此示例中,--output
指定了写入输出的位置。
--python-venv
选项用于指定 Python 虚拟环境归档文件的 Cloud Storage 位置。
在提交作业时,gcloud CLI 输出会显示操作为待处理。如果作业成功提交,gcloud CLI 输出将显示以下内容:
Create request issued for JOB_ID.
JOB_ID 的值是作业 ID,可用于更新或删除作业。如需了解详情,请参阅创建和管理作业。
监控作业
在 Google Cloud 控制台中,前往“适用于 Apache Flink 的 BigQuery Engine 作业”页面。
作业页面列出了可用作业,包括作业名称、作业 ID、状态和创建时间。
如需查看其他作业详情,请点击作业名称。
等待作业完成。作业完成后,作业状态为
Finished
。
检查流水线输出
作业完成后,请执行以下步骤查看流水线的输出:
在 Google Cloud 控制台中,前往 Cloud Storage 存储分区页面。
在存储桶列表中,点击您在准备工作中创建的存储桶的名称。此时会打开“存储分区详情”页面,其中“对象”标签页已选中。
流水线会创建一个命名模式为
YYYYY-MM-DD--HH
的文件夹。点击相应文件夹名称。如果流水线成功运行,该文件夹中会包含前缀为
part-
的文件;例如part-4253227c-4a45-4c6e-8918-0106d95bbf86-0
。点击此文件。在对象详情页面中,点击经过身份验证的网址以查看输出文件的内容。输出类似于以下内容:
Java
(to,1) (be,1) (or,1) (not,1) (to,2) (be,2) (that,1) [....]
Python
{"data":[{"word":"To","count":1}],"type":"INSERT"} {"data":[{"word":"be,","count":1}],"type":"INSERT"} {"data":[{"word":"or","count":1}],"type":"INSERT"} [....]
清理
为避免因本页面中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的 Google Cloud 项目。
删除项目
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
后续步骤
- 了解如何创建和管理作业。
- 了解如何使用适用于 Apache Flink 的 BigQuery 引擎监控界面。