本教程介绍了如何将 适用于 Apache Flink 的 BigQuery Engine 与 Apache Kafka 托管式服务和 BigQuery 搭配使用,以创建端到端流式传输流水线。
目标
本教程将介绍如何执行以下操作:
- 创建包含主题的 Managed Service for Apache Kafka 集群。
- 运行一个适用于 Apache Flink 的 BigQuery 引擎作业,将消息写入该主题。
- 运行第二个适用于 Apache Flink 的 BigQuery Engine 作业,该作业从主题中读取数据、处理消息数据,并将结果写入 BigQuery 表。
费用
在本文档中,您将使用 Google Cloud 的以下收费组件:
您可使用价格计算器根据您的预计使用情况来估算费用。
完成本文档中描述的任务后,您可以通过删除所创建的资源来避免继续计费。如需了解详情,请参阅清理。
准备工作
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Update and install
gcloud
components:gcloud components update
gcloud components install managed-flink-client -
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the BigQuery, Managed Service for Apache Kafka, and BigQuery Engine for Apache Flink APIs:
gcloud services enable bigquery.googleapis.com
managedflink.googleapis.com managedkafka.googleapis.com compute.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/managedflink.developer
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Update and install
gcloud
components:gcloud components update
gcloud components install managed-flink-client -
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the BigQuery, Managed Service for Apache Kafka, and BigQuery Engine for Apache Flink APIs:
gcloud services enable bigquery.googleapis.com
managedflink.googleapis.com managedkafka.googleapis.com compute.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/managedflink.developer
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
-
Set the environment variable
GOOGLE_APPLICATION_CREDENTIALS
to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again. - 下载并安装
Java 开发工具包 (JDK)。验证
JAVA_HOME
环境变量是否已设置并指向您的 JDK 安装。
构建流水线代码
克隆或下载 google/flink-connector-gcp
GitHub 代码库,然后切换到 flink-connector-gcp
目录。
git clone https://github.com/google/flink-connector-gcp.git
cd flink-connector-gcp
为示例流水线构建 JAR 文件:
./mvnw clean package
创建 Google Cloud 资源
在本部分中,您将创建本教程所需的 Google Cloud 资源,包括虚拟私有云资源、Managed Service for Apache Kafka 集群和 BigQuery 表。
创建网络和子网
本教程需要一个启用了专用 Google 访问通道的虚拟私有云子网。您可以使用项目中现有的子网,也可以按如下步骤创建新的子网:
使用
networks create
命令在项目中创建 VPC。gcloud compute networks create NETWORK_NAME \ --project=PROJECT_ID
替换以下内容:
NETWORK_NAME
:VPC 的名称,例如vpc-1
。PROJECT_ID
:您的项目 ID。
使用
subnets create
命令添加已启用专用 Google 访问通道的子网。gcloud compute networks subnets create SUBNET_NAME \ --network=NETWORK_NAME \ --project=PROJECT_ID \ --range=10.0.0.0/24 \ --region=us-central1 \ --enable-private-ip-google-access
替换以下内容:
SUBNET_NAME
:子网的名称,例如subnet-1
。
如需了解详情,请参阅指定网络和子网。
创建 Cloud Storage 存储桶
创建一个 Cloud Storage 存储桶,以用作适用于 Apache Flink 的 BigQuery 引擎作业的暂存位置。
gcloud storage buckets create gs://BUCKET_NAME --location=US
将 BUCKET_NAME
替换为存储桶的名称。如需了解存储桶命名要求,请参阅存储桶名称。
创建 Managed Service for Apache Kafka 集群
在此步骤中,您将创建一个新的 Managed Service for Apache Kafka 集群并添加一个主题。
gcloud beta managed-kafka clusters create CLUSTER \
--location=us-central1 \
--cpu=3 \
--memory=3GiB \
--subnets=projects/PROJECT_ID/regions/us-central1/subnetworks/SUBNET_NAME \
--async
替换以下内容:
CLUSTER
:集群的名称PROJECT_ID
:您的项目 IDSUBNET_NAME
:您要部署集群的子网
创建集群通常需要 20-30 分钟。如需监控进度,请使用Google Cloud 控制台:
创建集群后,运行以下命令以创建主题:
gcloud beta managed-kafka topics create TOPIC_NAME \
--cluster=CLUSTER \
--location=us-central1 \
--partitions=10 \
--replication-factor=3
替换以下内容:
TOPIC_NAME
:要创建的主题的名称
创建 BigQuery 表
在此步骤中,您将创建一个 BigQuery 表,用于写入 Kafka 主题中的数据。
首先,运行以下命令以创建 BigQuery 数据集:
bq mk --dataset --location=us-central1 PROJECT_ID:DATASET_NAME
替换以下内容:
PROJECT_ID
:您的项目 IDDATASET_NAME
:要创建的数据集的名称
接下来,运行以下命令以创建 BigQuery 表:
bq mk --table \
PROJECT_ID:DATASET_NAME.TABLE_NAME \
word:STRING,countStr:STRING
替换以下内容:
PROJECT_ID
:您的项目 IDDATASET_NAME
:数据集的名称TABLE_NAME
:要创建的表的名称
添加 IAM 角色
向托管式 Flink 默认工作负载身份授予以下 Identity and Access Management 角色:
roles/bigquery.dataEditor
roles/managedkafka.client
roles/storage.objectAdmin
针对每个角色运行以下命令:
gcloud projects add-iam-policy-binding PROJECT_ID \
--member=serviceAccount:gmf-PROJECT_NUMBER-default@gcp-sa-managedflink-wi.iam.gserviceaccount.com \
--role=SERVICE_ACCOUNT_ROLE
替换以下内容:
PROJECT_ID
:您的项目 IDPROJECT_NUMBER
:您的项目编号。如需查找项目编号,请参阅识别项目或使用gcloud projects describe
命令。SERVICE_ACCOUNT_ROLE
:角色
这些角色可让适用于 Apache Flink 的 BigQuery 引擎访问本教程中的 Google Cloud资源。
运行写入 Managed Service for Apache Kafka 的作业
在此步骤中,您将创建一个用于将消息写入 Kafka 主题的 BigQuery Engine for Apache Flink 作业。
如需创建作业,请使用 gcloud alpha managed-flink jobs create
命令:
gcloud alpha managed-flink jobs create ./flink-examples-gcp/target/flink-examples-gcp-0.0.0-shaded.jar \
--enable-output \
--name kafka-load-job \
--location=us-central1 \
--project=PROJECT_ID \
--staging-location=gs://BUCKET_NAME \
--autotuning-mode elastic \
--min-parallelism=1 \
--max-parallelism=2 \
--network-config-vpc=NETWORK_NAME \
--network-config-subnetwork=SUBNET_NAME \
--class=flink.connector.gcp.GMKLoadGenerator \
-- --brokers bootstrap.CLUSTER.us-central1.managedkafka.PROJECT_ID.cloud.goog:9092 \
--oauth true \
--kafka-topic TOPIC_NAME \
--project-id PROJECT_ID \
--messagesPerSecond 100 \
--pattern sin
替换以下内容:
PROJECT_ID
:您的项目 IDBUCKET_NAME
:Cloud Storage 存储桶的名称。NETWORK_NAME
:集群所在的 VPCSUBNET_NAME
:集群所在的子网CLUSTER
:Managed Service for Apache Kafka 集群的名称TOPIC_NAME
:Managed Service for Apache Kafka 主题的名称
运行写入 BigQuery 的作业
在此步骤中,您将创建一个作业,该作业使用 适用于 Apache Flink 的 BigQuery 连接器从 Kafka 主题读取消息,并将数据写入 BigQuery 表。
如需创建作业,请运行以下命令:
gcloud alpha managed-flink jobs create ./flink-examples-gcp/target/flink-examples-gcp-0.0.0-shaded.jar \
--name write-to-bq \
--location=us-central1 \
--project=PROJECT_ID \
--staging-location=gs://BUCKET_NAME \
--autotuning-mode elastic \
--min-parallelism=1 \
--max-parallelism=2 \
--network-config-vpc=NETWORK_NAME \
--network-config-subnetwork=SUBNET_NAME \
--class=flink.connector.gcp.GMKToBQWordCount \
-- --brokers bootstrap.CLUSTER.us-central1.managedkafka.PROJECT_ID.cloud.goog:9092 \
--oauth true \
--kafka-topic TOPIC_NAME \
--project-id PROJECT_ID \
--dataset-name DATASET_NAME \
--table-name TABLE_NAME
替换以下内容:
PROJECT_ID
:您的项目 IDBUCKET_NAME
:Cloud Storage 存储桶的名称NETWORK_NAME
:集群所在的 VPCSUBNET_NAME
:集群所在的子网CLUSTER
:Managed Service for Apache Kafka 集群的名称TOPIC_NAME
:Managed Service for Apache Kafka 主题的名称DATASET_NAME
:BigQuery 数据集的名称TABLE_NAME
:BigQuery 表的名称
查看作业的输出结果
当这两个作业都运行时,您可以查看写入 BigQuery 的数据:
在 Google Cloud 控制台中,前往 BigQuery 页面。
在探索器面板中,展开您的项目。
展开您创建的数据集,然后选择表。
在详细信息面板中,点击预览。BigQuery 会显示表的前几行。
清理
为避免因本教程中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的项目,或者保留项目但删除各个资源。
删除项目
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID
删除各个资源
- 删除这两个适用于 Apache Flink 的 BigQuery 引擎作业。
将 JOB_ID 替换为作业 ID。如需获取作业 ID,请前往适用于 Apache Flink 的 BigQuery 引擎监控界面。gcloud alpha managed-flink jobs delete JOB_ID --location=us-central1
-
删除存储分区:
gcloud storage buckets delete BUCKET_NAME
- 删除 BigQuery 数据集和表。
gcloud alpha bq datasets delete DATASET_NAME --remove-tables
- 删除 Managed Service for Apache Kafka 集群。
gcloud beta managed-kafka clusters delete CLUSTER --location=us-central1
后续步骤
- 创建和管理适用于 Apache Flink 的 BigQuery 引擎作业
- 监控 Managed Service for Apache Kafka 集群
- 探索有关 Google Cloud 的参考架构、图表和最佳做法。查看我们的 Cloud 架构中心。