处理来自 Managed Service for Apache Kafka 的实时数据


本教程介绍了如何将 适用于 Apache Flink 的 BigQuery EngineApache Kafka 托管式服务BigQuery 搭配使用,以创建端到端流式传输流水线。

目标

本教程将介绍如何执行以下操作:

  • 创建包含主题的 Managed Service for Apache Kafka 集群。
  • 运行一个适用于 Apache Flink 的 BigQuery 引擎作业,将消息写入该主题。
  • 运行第二个适用于 Apache Flink 的 BigQuery Engine 作业,该作业从主题中读取数据、处理消息数据,并将结果写入 BigQuery 表。

费用

在本文档中,您将使用 Google Cloud 的以下收费组件:

您可使用价格计算器根据您的预计使用情况来估算费用。 Google Cloud 新用户可能有资格申请免费试用

完成本文档中描述的任务后,您可以通过删除所创建的资源来避免继续计费。如需了解详情,请参阅清理

准备工作

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Update and install gcloud components:

    gcloud components update
    gcloud components install managed-flink-client
  5. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  6. Make sure that billing is enabled for your Google Cloud project.

  7. Enable the BigQuery, Managed Service for Apache Kafka, and BigQuery Engine for Apache Flink APIs:

    gcloud services enable bigquery.googleapis.com  managedflink.googleapis.com managedkafka.googleapis.com compute.googleapis.com
  8. Create local authentication credentials for your user account:

    gcloud auth application-default login
  9. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/managedflink.developer

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  10. Install the Google Cloud CLI.
  11. To initialize the gcloud CLI, run the following command:

    gcloud init
  12. Update and install gcloud components:

    gcloud components update
    gcloud components install managed-flink-client
  13. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  14. Make sure that billing is enabled for your Google Cloud project.

  15. Enable the BigQuery, Managed Service for Apache Kafka, and BigQuery Engine for Apache Flink APIs:

    gcloud services enable bigquery.googleapis.com  managedflink.googleapis.com managedkafka.googleapis.com compute.googleapis.com
  16. Create local authentication credentials for your user account:

    gcloud auth application-default login
  17. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/managedflink.developer

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  18. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.

  19. 下载并安装 Java 开发工具包 (JDK)。验证 JAVA_HOME 环境变量是否已设置并指向您的 JDK 安装。

构建流水线代码

克隆或下载 google/flink-connector-gcp GitHub 代码库,然后切换到 flink-connector-gcp 目录。

git clone https://github.com/google/flink-connector-gcp.git
cd flink-connector-gcp

为示例流水线构建 JAR 文件:

./mvnw clean package

创建 Google Cloud 资源

在本部分中,您将创建本教程所需的 Google Cloud 资源,包括虚拟私有云资源、Managed Service for Apache Kafka 集群和 BigQuery 表。

创建网络和子网

本教程需要一个启用了专用 Google 访问通道的虚拟私有云子网。您可以使用项目中现有的子网,也可以按如下步骤创建新的子网:

  1. 使用 networks create 命令在项目中创建 VPC。

    gcloud compute networks create NETWORK_NAME \
      --project=PROJECT_ID
    

    替换以下内容:

    • NETWORK_NAME:VPC 的名称,例如 vpc-1
    • PROJECT_ID:您的项目 ID。
  2. 使用 subnets create 命令添加已启用专用 Google 访问通道的子网。

    gcloud compute networks subnets create SUBNET_NAME \
        --network=NETWORK_NAME \
        --project=PROJECT_ID \
        --range=10.0.0.0/24 \
        --region=us-central1 \
        --enable-private-ip-google-access
    

    替换以下内容:

    • SUBNET_NAME:子网的名称,例如 subnet-1

如需了解详情,请参阅指定网络和子网

创建 Cloud Storage 存储桶

创建一个 Cloud Storage 存储桶,以用作适用于 Apache Flink 的 BigQuery 引擎作业的暂存位置。

gcloud storage buckets create gs://BUCKET_NAME --location=US

BUCKET_NAME 替换为存储桶的名称。如需了解存储桶命名要求,请参阅存储桶名称

创建 Managed Service for Apache Kafka 集群

在此步骤中,您将创建一个新的 Managed Service for Apache Kafka 集群并添加一个主题。

gcloud beta managed-kafka clusters create CLUSTER \
--location=us-central1 \
--cpu=3 \
--memory=3GiB \
--subnets=projects/PROJECT_ID/regions/us-central1/subnetworks/SUBNET_NAME \
--async

替换以下内容:

  • CLUSTER:集群的名称
  • PROJECT_ID:您的项目 ID
  • SUBNET_NAME:您要部署集群的子网

创建集群通常需要 20-30 分钟。如需监控进度,请使用Google Cloud 控制台:

前往“集群”

创建集群后,运行以下命令以创建主题:

gcloud beta managed-kafka topics create TOPIC_NAME \
--cluster=CLUSTER \
--location=us-central1 \
--partitions=10 \
--replication-factor=3

替换以下内容:

  • TOPIC_NAME:要创建的主题的名称

创建 BigQuery 表

在此步骤中,您将创建一个 BigQuery 表,用于写入 Kafka 主题中的数据。

首先,运行以下命令以创建 BigQuery 数据集:

bq mk --dataset --location=us-central1 PROJECT_ID:DATASET_NAME

替换以下内容:

  • PROJECT_ID:您的项目 ID
  • DATASET_NAME:要创建的数据集的名称

接下来,运行以下命令以创建 BigQuery 表:

bq mk --table \
  PROJECT_ID:DATASET_NAME.TABLE_NAME \
  word:STRING,countStr:STRING

替换以下内容:

  • PROJECT_ID:您的项目 ID
  • DATASET_NAME:数据集的名称
  • TABLE_NAME:要创建的表的名称

添加 IAM 角色

托管式 Flink 默认工作负载身份授予以下 Identity and Access Management 角色:

  • roles/bigquery.dataEditor
  • roles/managedkafka.client
  • roles/storage.objectAdmin

针对每个角色运行以下命令:

gcloud projects add-iam-policy-binding PROJECT_ID \
    --member=serviceAccount:gmf-PROJECT_NUMBER-default@gcp-sa-managedflink-wi.iam.gserviceaccount.com \
    --role=SERVICE_ACCOUNT_ROLE

替换以下内容:

  • PROJECT_ID:您的项目 ID
  • PROJECT_NUMBER:您的项目编号。如需查找项目编号,请参阅识别项目或使用 gcloud projects describe 命令。
  • SERVICE_ACCOUNT_ROLE:角色

这些角色可让适用于 Apache Flink 的 BigQuery 引擎访问本教程中的 Google Cloud资源。

运行写入 Managed Service for Apache Kafka 的作业

在此步骤中,您将创建一个用于将消息写入 Kafka 主题的 BigQuery Engine for Apache Flink 作业

如需创建作业,请使用 gcloud alpha managed-flink jobs create 命令:

gcloud alpha managed-flink jobs create ./flink-examples-gcp/target/flink-examples-gcp-0.0.0-shaded.jar  \
--enable-output \
--name kafka-load-job \
--location=us-central1  \
--project=PROJECT_ID \
--staging-location=gs://BUCKET_NAME \
--autotuning-mode elastic \
--min-parallelism=1  \
--max-parallelism=2  \
--network-config-vpc=NETWORK_NAME \
--network-config-subnetwork=SUBNET_NAME  \
--class=flink.connector.gcp.GMKLoadGenerator \
-- --brokers bootstrap.CLUSTER.us-central1.managedkafka.PROJECT_ID.cloud.goog:9092  \
--oauth true  \
--kafka-topic TOPIC_NAME \
--project-id PROJECT_ID \
--messagesPerSecond 100  \
--pattern sin

替换以下内容:

  • PROJECT_ID:您的项目 ID
  • BUCKET_NAME:Cloud Storage 存储桶的名称。
  • NETWORK_NAME:集群所在的 VPC
  • SUBNET_NAME:集群所在的子网
  • CLUSTER:Managed Service for Apache Kafka 集群的名称
  • TOPIC_NAME:Managed Service for Apache Kafka 主题的名称

运行写入 BigQuery 的作业

在此步骤中,您将创建一个作业,该作业使用 适用于 Apache Flink 的 BigQuery 连接器从 Kafka 主题读取消息,并将数据写入 BigQuery 表。

如需创建作业,请运行以下命令:

gcloud alpha managed-flink jobs create ./flink-examples-gcp/target/flink-examples-gcp-0.0.0-shaded.jar \
--name write-to-bq \
--location=us-central1 \
--project=PROJECT_ID \
--staging-location=gs://BUCKET_NAME \
--autotuning-mode elastic \
--min-parallelism=1 \
--max-parallelism=2 \
--network-config-vpc=NETWORK_NAME \
--network-config-subnetwork=SUBNET_NAME \
--class=flink.connector.gcp.GMKToBQWordCount \
-- --brokers bootstrap.CLUSTER.us-central1.managedkafka.PROJECT_ID.cloud.goog:9092  \
--oauth true \
--kafka-topic TOPIC_NAME \
--project-id PROJECT_ID \
--dataset-name DATASET_NAME  \
--table-name TABLE_NAME

替换以下内容:

  • PROJECT_ID:您的项目 ID
  • BUCKET_NAME:Cloud Storage 存储桶的名称
  • NETWORK_NAME:集群所在的 VPC
  • SUBNET_NAME:集群所在的子网
  • CLUSTER:Managed Service for Apache Kafka 集群的名称
  • TOPIC_NAME:Managed Service for Apache Kafka 主题的名称
  • DATASET_NAME:BigQuery 数据集的名称
  • TABLE_NAME:BigQuery 表的名称

查看作业的输出结果

当这两个作业都运行时,您可以查看写入 BigQuery 的数据:

  1. 在 Google Cloud 控制台中,前往 BigQuery 页面。

    转到 BigQuery

  2. 探索器面板中,展开您的项目。

  3. 展开您创建的数据集,然后选择表。

  4. 在详细信息面板中,点击预览。BigQuery 会显示表的前几行。

清理

为避免因本教程中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的项目,或者保留项目但删除各个资源。

删除项目

    Delete a Google Cloud project:

    gcloud projects delete PROJECT_ID

删除各个资源

  1. 删除这两个适用于 Apache Flink 的 BigQuery 引擎作业。
    gcloud alpha managed-flink jobs delete JOB_ID --location=us-central1
    JOB_ID 替换为作业 ID。如需获取作业 ID,请前往适用于 Apache Flink 的 BigQuery 引擎监控界面
  2. 删除存储分区:
    gcloud storage buckets delete BUCKET_NAME
  3. 删除 BigQuery 数据集和表。
    gcloud alpha bq datasets delete DATASET_NAME --remove-tables
  4. 删除 Managed Service for Apache Kafka 集群。
    gcloud beta managed-kafka clusters delete CLUSTER --location=us-central1

后续步骤