使用 CLI 创建适用于 Apache Flink 的 BigQuery 引擎作业

了解如何创建适用于 Apache Flink 的 BigQuery 引擎作业、监控作业的状态以及查看结果。在本快速入门中,您可以创建使用 Java 或 Python 的作业。

准备工作

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Update and install gcloud components:

    gcloud components update
    gcloud components install managed-flink-client
  5. Create a Google Cloud project.

    gcloud projects create PROJECT_ID

    Replace PROJECT_ID with a name for the Google Cloud project you are creating.

  6. Make sure that billing is enabled for your Google Cloud project.

  7. Enable the BigQuery Engine for Apache Flink APIs:

    gcloud services enable managedflink.googleapis.com compute.googleapis.com
  8. Create local authentication credentials for your user account:

    gcloud auth application-default login
  9. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/managedflink.developer

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  10. Install the Google Cloud CLI.
  11. To initialize the gcloud CLI, run the following command:

    gcloud init
  12. Update and install gcloud components:

    gcloud components update
    gcloud components install managed-flink-client
  13. Create a Google Cloud project.

    gcloud projects create PROJECT_ID

    Replace PROJECT_ID with a name for the Google Cloud project you are creating.

  14. Make sure that billing is enabled for your Google Cloud project.

  15. Enable the BigQuery Engine for Apache Flink APIs:

    gcloud services enable managedflink.googleapis.com compute.googleapis.com
  16. Create local authentication credentials for your user account:

    gcloud auth application-default login
  17. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/managedflink.developer

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  18. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.

  19. 通过运行 gcloud storage buckets create 命令创建 Cloud Storage 存储桶:

    gcloud storage buckets create gs://BUCKET_NAME --location=US

    BUCKET_NAME 替换为存储桶的名称。 如需了解存储桶命名要求,请参阅存储桶名称

准备流水线代码

Java

  1. 下载并安装 Java Development Kit (JDK)。验证 JAVA_HOME 环境变量是否已设置并指向您的 JDK 安装。

  2. 克隆或下载 apache/flink GitHub 代码库,然后切换到 flink/flink-examples/flink-examples-streaming 目录。

    git clone https://github.com/apache/flink.git --branch release-1.19.1
    cd flink/flink-examples/flink-examples-streaming
    
  3. 为示例流水线构建 JAR 文件:

    ../../mvnw clean package
    
  4. 验证此命令是否构建了一个名为 WordCount.jar 的 JAR 文件。

    ls target/WordCount.jar
    

Python

  1. 安装 Python 3.11 和 pip

  2. 克隆或下载 apache/flink GitHub 代码库,然后切换到 flink/flink-python/pyflink/examples/table 目录。

    git clone https://github.com/apache/flink.git
    cd flink/flink-python/pyflink/examples/table
    
  3. 创建一个归档文件,用于为作业打包 Python 虚拟环境:

    python -m venv pyflink_venv
    source pyflink_venv/bin/activate
    pip install "apache-flink==1.19.0" venv-pack
    venv-pack -o pyflink_venv.tar.gz
    
  4. 将归档文件上传到 Cloud Storage:

    gcloud storage cp pyflink_venv.tar.gz gs://BUCKET_NAME/pyflink_venv.tar.gz
    

    BUCKET_NAME 替换为您的 Cloud Storage 存储分区的名称。

    如需详细了解如何打包 Python 虚拟环境,请参阅 Python 虚拟环境

创建网络和子网

使用 networks create 命令在项目中创建 VPC。

gcloud compute networks create NETWORK_NAME \
  --project=PROJECT_ID

替换以下内容:

  • NETWORK_NAME:VPC 的名称,例如 vpc-1
  • PROJECT_ID:您的项目 ID。

使用 subnets create 命令添加已启用专用 Google 访问通道的子网。

gcloud compute networks subnets create SUBNET_NAME \
    --network=NETWORK_NAME \
    --project=PROJECT_ID \
    --range=10.0.0.0/24 \
    --region=us-central1 \
    --enable-private-ip-google-access

替换以下内容:

  • SUBNET_NAME:子网的名称,例如 subnet-1

创建部署

在此步骤中,您将创建一个部署,这是用于运行 Apache Flink 作业的专属独立环境。

首次在项目或子网中创建部署或按需作业时,创建过程可能需要 30 分钟或更长时间才能完成。之后,创建新的部署或作业所需的时间会缩短。

如需创建部署,请使用 gcloud alpha managed-flink deployments create 命令:

gcloud alpha managed-flink deployments create my-deployment \
  --project=PROJECT_ID \
  --location=us-central1 \
  --network-config-vpc=NETWORK_NAME \
  --network-config-subnetwork=SUBNET_NAME \
  --max-slots=4

替换以下内容:

  • PROJECT_ID:您的项目 ID。
  • NETWORK_NAME:VPC 的名称。
  • SUBNET_NAME:子网的名称。

虽然 default 网络具有允许部署运行作业的配置,但出于安全考虑,我们建议您为适用于 Apache Flink 的 BigQuery 引擎创建单独的网络。default 网络不安全,因为它预先填充了一些允许连接传入实例的防火墙规则。

授予服务账号权限

运行以下命令,向托管式 Flink 默认工作负载身份授予对 Cloud Storage 存储桶的读写权限:

gcloud storage buckets add-iam-policy-binding gs://BUCKET_NAME \
  --member="serviceAccount:gmf-PROJECT_NUMBER-default@gcp-sa-managedflink-wi.iam.gserviceaccount.com" \
  --role=roles/storage.objectAdmin

替换以下内容:

创建作业

在此步骤中,您将创建一个用于运行示例流水线的 适用于 Apache Flink 的 BigQuery 引擎作业。如需创建作业,请使用 gcloud alpha managed-flink jobs create 命令:

Java

gcloud alpha managed-flink jobs create ./target/WordCount.jar \
--name=my-job \
--location=us-central1 \
--deployment=my-deployment \
--project=PROJECT_ID \
--staging-location=gs://BUCKET_NAME/jobs/ \
--min-parallelism=1 \
--max-parallelism=4 \
-- --output gs://BUCKET_NAME/

替换以下内容:

  • PROJECT_ID:您的项目 ID
  • BUCKET_NAME:Cloud Storage 存储桶的名称。

-- 选项用于指定流水线的命令行参数,这些参数由流水线代码定义。在 WordCount 示例中,output 指定了写入输出的位置。

Python

gcloud alpha managed-flink jobs create  ${PWD}/word_count.py \
--name=word-count \
--location=us-central1 \
--deployment=my-deployment \
--project=PROJECT_ID \
--staging-location=gs://BUCKET_NAME/jobs/ \
--min-parallelism=1 \
--max-parallelism=4 \
--python-venv=gs://BUCKET_NAME/pyflink_venv.tar.gz \
-- --output gs://BUCKET_NAME/

替换以下内容:

  • PROJECT_ID:您的项目 ID
  • BUCKET_NAME:Cloud Storage 存储桶的名称。

-- 选项用于指定流水线的命令行参数,这些参数由流水线代码定义。在此示例中,--output 指定了写入输出的位置。

--python-venv 选项用于指定 Python 虚拟环境归档文件的 Cloud Storage 位置。

在提交作业时,gcloud CLI 输出会显示操作为待处理。如果作业成功提交,gcloud CLI 输出将显示以下内容:

Create request issued for JOB_ID.

JOB_ID 的值是作业 ID,可用于更新或删除作业。如需了解详情,请参阅创建和管理作业

监控作业

  1. 在 Google Cloud 控制台中,前往“适用于 Apache Flink 的 BigQuery Engine 作业”页面。

    转到作业

    作业页面列出了可用作业,包括作业名称、作业 ID、状态和创建时间。

  2. 如需查看其他作业详情,请点击作业名称。

  3. 等待作业完成。作业完成后,作业状态为 Finished

检查流水线输出

作业完成后,请执行以下步骤查看流水线的输出:

  1. 在 Google Cloud 控制台中,前往 Cloud Storage 存储分区页面。

    进入“存储桶”

  2. 在存储桶列表中,点击您在准备工作中创建的存储桶的名称。此时会打开“存储分区详情”页面,其中“对象”标签页已选中。

  3. 流水线会创建一个命名模式为 YYYYY-MM-DD--HH 的文件夹。点击相应文件夹名称。

  4. 如果流水线成功运行,该文件夹中会包含前缀为 part- 的文件;例如 part-4253227c-4a45-4c6e-8918-0106d95bbf86-0。点击此文件。

  5. 对象详情页面中,点击经过身份验证的网址以查看输出文件的内容。输出类似于以下内容:

    Java

    (to,1)
    (be,1)
    (or,1)
    (not,1)
    (to,2)
    (be,2)
    (that,1)
    [....]
    

    Python

    {"data":[{"word":"To","count":1}],"type":"INSERT"}
    {"data":[{"word":"be,","count":1}],"type":"INSERT"}
    {"data":[{"word":"or","count":1}],"type":"INSERT"}
    [....]
    

清理

为避免因本页面中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的 Google Cloud 项目。

删除项目

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

后续步骤