使用 Cloud Run 函数的工作流

准备工作

如果您尚未设置 Google Cloud 项目和两个(2)Cloud Storage 存储分区,请执行此操作。

设置项目

  1. 登录您的 Google Cloud 账号。如果您是 Google Cloud 新手,请创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. 确保您的 Google Cloud 项目已启用结算功能

  4. Enable the Dataproc, Compute Engine, Cloud Storage, and Cloud Run functions APIs.

    Enable the APIs

  5. Install the Google Cloud CLI.
  6. To initialize the gcloud CLI, run the following command:

    gcloud init
  7. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  8. 确保您的 Google Cloud 项目已启用结算功能

  9. Enable the Dataproc, Compute Engine, Cloud Storage, and Cloud Run functions APIs.

    Enable the APIs

  10. Install the Google Cloud CLI.
  11. To initialize the gcloud CLI, run the following command:

    gcloud init

在项目中创建或使用两个(2)Cloud Storage 存储分区

您的项目中需要两个 Cloud Storage 存储分区:一个用于输入文件,一个用于输出。

  1. In the Google Cloud console, go to the Cloud Storage Buckets page.

    Go to Buckets page

  2. Click Create bucket.
  3. On the Create a bucket page, enter your bucket information. To go to the next step, click Continue.
    • For Name your bucket, enter a name that meets the bucket naming requirements.
    • For Choose where to store your data, do the following:
      • Select a Location type option.
      • Select a Location option.
    • For Choose a default storage class for your data, select a storage class.
    • For Choose how to control access to objects, select an Access control option.
    • For Advanced settings (optional), specify an encryption method, a retention policy, or bucket labels.
  4. Click Create.

创建工作流模板。

在本地终端窗口或 Cloud Shell 中复制并运行下面列出的命令,以创建和定义工作流模板

注意:

  • 这些命令指定“us-central1”区域。如果您之前运行过 gcloud config set compute/region 来设置区域属性,则可以指定其他区域或删除 --region 标志。
  • “-”(破折号空格)序列将参数传递到 jar 文件。wordcount input_bucket output_dir 命令将针对 Cloud Storage input_bucket 中包含的文本文件运行 jar 的 Wordcount 应用,然后将 Wordcount 文件输出到 output_bucket。您将对 wordcount 输入存储分区参数进行参数化,以允许您的函数提供此参数。

  1. 创建工作流模板。

    gcloud dataproc workflow-templates create wordcount-template \
        --region=us-central1
    

  2. 将 wordcount 作业添加到工作流模板。
    1. 在运行命令之前指定 output-bucket-name(您的函数将提供输入存储分区)。插入输出存储分区名称后,输出存储分区参数应如下所示:gs://your-output-bucket/wordcount-output"
    2. “count”步骤 ID 是必需的,用于标识已添加的作业。

    gcloud dataproc workflow-templates add-job hadoop \
        --workflow-template=wordcount-template \
        --step-id=count \
        --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
        --region=us-central1 \
        -- wordcount gs://input-bucket gs://output-bucket-name/wordcount-output
    

  3. 使用托管单节点集群运行工作流。Dataproc 将创建集群,对其运行工作流,然后在工作流完成时删除集群。

    gcloud dataproc workflow-templates set-managed-cluster wordcount-template \
        --cluster-name=wordcount \
        --single-node \
        --region=us-central1
    

  4. 点击 Dataproc 上的 wordcount-template 名称 工作流程 页面,以打开 工作流模板详情页面。确认 wordcount 模板属性。

参数化工作流模板。

参数化输入存储分区变量以传递给工作流模板。

  1. 将工作流模板导出到 wordcount.yaml 文本文件以进行参数化。
    gcloud dataproc workflow-templates export wordcount-template \
        --destination=wordcount.yaml \
        --region=us-central1
    
  2. 使用文本编辑器打开 wordcount.yaml,然后将 parameters 块添加到 YAML 文件的末尾,以便在工作流触发时,Cloud Storage INPUT_BUCKET_URI 可以作为 args[1] 传递到字数二进制文件。

    导出的 YAML 文件示例如下所示。您可以采用以下两种方法之一来更新模板:

    1. 在将 your-output_bucket 替换为您的输出存储分区名称后,复制然后粘贴整个文件以替换导出的 wordcount.yaml,或者
    2. 复制并粘贴 parameters 部分到导出的 wordcount.yaml 文件的末尾。
    .
    jobs:
    - hadoopJob:
        args:
        - wordcount
        - gs://input-bucket
        - gs://your-output-bucket/wordcount-output
        mainJarFileUri: file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar
      stepId: count
    placement:
      managedCluster:
        clusterName: wordcount
        config:
          softwareConfig:
            properties:
              dataproc:dataproc.allow.zero.workers: 'true'
    parameters:
    - name: INPUT_BUCKET_URI
      description: wordcount input bucket URI
      fields:
      - jobs['count'].hadoopJob.args[1]
    
  3. 导入参数化的 wordcount.yaml 文本文件。当系统要求您覆盖模板时,请键入“Y”。
    gcloud dataproc workflow-templates import  wordcount-template \
        --source=wordcount.yaml \
        --region=us-central1
    

创建 Cloud Functions 函数

  1. 打开 Cloud Run functions 页面, Google Cloud 控制台,然后点击“创建函数”。

  2. 创建函数页面上,输入或选择以下信息:

    1. 名称:wordcount
    2. 分配的内存:保留默认选项。
    3. 触发器:
      • Cloud Storage
      • 事件类型:完成/创建
      • 存储分区:选择您的输入存储分区(请参阅在项目中创建 Cloud Storage 存储分区)。将文件添加到此存储分区时,该函数将触发工作流。该工作流将运行 Wordcount 应用,该应用将处理存储分区中的所有文本文件。
    4. 源代码:

      • 內嵌编辑器
      • Runtime:Node.js 8
      • INDEX.JS 标签:将默认代码段替换为以下代码,然后编辑 const projectId 行以提供 -your-project-id-(开头或结尾“-”)
      const dataproc = require('@google-cloud/dataproc').v1;
      
      exports.startWorkflow = (data) => {
       const projectId = '-your-project-id-'
       const region = 'us-central1'
       const workflowTemplate = 'wordcount-template'
      
      const client = new dataproc.WorkflowTemplateServiceClient({
         apiEndpoint: `${region}-dataproc.googleapis.com`,
      });
      
      const file = data;
      console.log("Event: ", file);
      
      const inputBucketUri = `gs://${file.bucket}/${file.name}`;
      
      const request = {
        name: client.projectRegionWorkflowTemplatePath(projectId, region, workflowTemplate),
        parameters: {"INPUT_BUCKET_URI": inputBucketUri}
      };
      
      client.instantiateWorkflowTemplate(request)
        .then(responses => {
          console.log("Launched Dataproc Workflow:", responses[1]);
        })
        .catch(err => {
          console.error(err);
        });
      };
      
      • PACKAGE.JSON 标签:将默认代码段替换为以下代码。
      {
        "name": "dataproc-workflow",
        "version": "1.0.0",
        "dependencies":{ "@google-cloud/dataproc": ">=1.0.0"}
      }
      
      • 要执行的函数:Insert:"startWorkflow"。
    5. 点击“创建”。

测试函数

  1. 将公开的文件 rose.txt 复制到您的存储分区以触发该函数。在命令中插入 your-input-bucket-name(用于触发函数的存储分区)。

    gcloud storage cp gs://pub/shakespeare/rose.txt gs://your-input-bucket-name
    

  2. 等待 30 秒,然后运行以下命令以验证函数是否已成功完成。

    gcloud functions logs read wordcount
    
    ...
    Function execution took 1348 ms, finished with status: 'ok'
    

  3. 如需从 Google Cloud 控制台中的函数列表页面查看函数日志,请点击 wordcount 函数名称,然后点击函数详情页面上的“查看日志”。

  4. 您可以在输出存储桶中查看“wordcount-output”文件夹 从 Cloud Storage 存储分区的 Storage 浏览器页面 Google Cloud 控制台。

  5. 工作流完成后,作业详情将保留在 Google Cloud 控制台中。点击 Dataproc 作业 页面上列出的 count... 作业以查看工作流作业详情。

清理

本教程中的工作流将在工作流完成时删除其托管集群。为避免重复收费,您可以删除与本教程关联的其他资源。

删除项目

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

删除 Cloud Storage 存储分区

  1. In the Google Cloud console, go to the Cloud Storage Buckets page.

    Go to Buckets

  2. Click the checkbox for the bucket that you want to delete.
  3. To delete the bucket, click Delete, and then follow the instructions.

删除工作流模板

gcloud dataproc workflow-templates delete wordcount-template \
    --region=us-central1

删除 Cloud Functions 函数

在 Google Cloud 控制台中打开 Cloud Run 函数页面,选中 wordcount 函数左侧的框,然后点击“删除”。

后续步骤