使用 Cloud Composer 的工作流

本教程使用 Google Cloud 的以下收费组件:

  • Dataproc
  • Compute Engine
  • Cloud Composer

您可使用价格计算器根据您的预计使用量来估算费用。 Google Cloud 新用户可能有资格申请免费试用

准备工作

设置项目

  1. 登录您的 Google 帐号。

    如果您还没有 Google 帐号,请注册新帐号

  2. 在 Google Cloud Console 的项目选择器页面上,选择或创建一个 Google Cloud 项目。

    转到项目选择器页面

  3. 确保您的 Cloud 项目已启用结算功能。 了解如何确认您的项目是否已启用结算功能

  4. 启用 Dataproc, Compute Engine, and Cloud Composer API。

    启用 API

  5. 安装并初始化 Cloud SDK

创建 Dataproc 工作流模板

在本地终端窗口或 Cloud Shell 中复制并运行下面列出的命令,以创建和定义工作流模板

  1. 创建 sparkpi 工作流模板。
    gcloud dataproc workflow-templates create sparkpi \
        --region=us-central1
          
  2. 将 Spark 作业添加到 sparkpi 工作流模板。“compute”step-id 标志用于识别 SparkPi 作业。
    gcloud dataproc workflow-templates add-job spark \
        --workflow-template=sparkpi \
        --step-id=compute \
        --class=org.apache.spark.examples.SparkPi \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        --region=us-central1 \
        -- 1000
          
  3. 使用托管单节点集群运行工作流。Dataproc 将创建集群,对其运行工作流,然后在工作流完成时删除集群。
    gcloud dataproc workflow-templates set-managed-cluster sparkpi \
        --cluster-name=sparkpi \
        --single-node \
        --region=us-central1
          
  4. 确认创建工作流模板。

    控制台

    点击 Cloud Console 中的 Dataproc 工作流页面上的 sparkpi名称,打开工作流模板详细信息页面。点击工作流模板的名称以确认 sparkpi 模板特性。

    gcloud 命令

    运行以下命令:

    gcloud dataproc workflow-templates describe sparkpi --region=us-central1
        

创建 DAG 并将其上传到 Cloud Storage

  1. 创建或使用现有的 Cloud Composer 环境
  2. 设置环境变量。

    Airflow 界面

    1. 在工具栏中,点击 Admin > Variables
    2. 点击创建
    3. 请输入以下信息:
      • Key:project_id
      • Val:PROJECT_ID - 您的 Google Cloud 项目 ID
    4. 点击保存

    gcloud 命令

    输入以下命令:

    • ENVIRONMENT 是 Cloud Composer 环境的名称
    • LOCATION 是 Cloud Composer 环境所在的地区
    gcloud composer environments run ENVIRONMENT \
        --location LOCATION
        
  3. 将以下 DAG 代码从本地复制到名为“composer-dataproc-dag.py”的文件中,该文件使用 DataprocWorkflowTemplateInstantiateOperator

    
    """Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
    Spark Pi Job.
    
    This DAG relies on an Airflow variable
    https://airflow.apache.org/concepts.html#variables
    * project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
    """
    
    import datetime
    
    from airflow import models
    from airflow.contrib.operators import dataproc_operator
    from airflow.utils.dates import days_ago
    
    project_id = models.Variable.get("project_id")
    
    default_args = {
        # Tell airflow to start one day ago, so that it runs as soon as you upload it
        "start_date": days_ago(1),
        "project_id": project_id,
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
        # The id you will see in the DAG airflow page
        "dataproc_workflow_dag",
        default_args=default_args,
        # The interval with which to schedule the DAG
        schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
    ) as dag:
    
        start_template_job = dataproc_operator.DataprocWorkflowTemplateInstantiateOperator(
            # The task id of your job
            task_id="dataproc_workflow_dag",
            # The template id of your workflow
            template_id="sparkpi",
            project_id=project_id,
            # The region for the template
            region="us-central1",
        )
    
  4. 将 DAG 上传到 Cloud Storage 中的环境文件夹。上传成功完成后,点击 Cloud Composer 环境页面上的 DAGs 文件夹链接。

查看任务的状态

Airflow 界面

  1. 打开 Airflow 网页界面
  2. 在 DAG 页面上,点击 DAG 名称(例如 dataproc_workflow_dag)。
  3. 在 DAG 详细信息页面上,点击 Graph View
  4. 查看状态:
    • 失败:任务被红色框圈起。 您还可以将指针悬停在任务上,然后查看 State: Failed任务被红色框圈起,表示任务失败
    • 成功:任务被绿色框圈起。您还可以将指针悬停在任务上,然后查看 State: Success任务被绿色框圈起,表示任务成功

控制台

点击“工作流”标签页以查看工作流状态。

gcloud 命令

gcloud dataproc operations list \
    --region=us-central1 \
    --filter="labels.goog-dataproc-workflow-template-id=sparkpi"
    

清理

为避免系统向您的 Google Cloud 帐号收取费用,您可以删除本教程中使用的资源:

  1. 删除 Cloud Composer 环境。

  2. 删除工作流模板。

后续步骤