使用 Cloud Composer 的工作流

在本文档中,您将使用 Google Cloud 的以下收费组件:

  • Dataproc
  • Compute Engine
  • Cloud Composer

您可使用价格计算器根据您的预计使用情况来估算费用。 Google Cloud 新用户可能有资格申请免费试用

准备工作

设置项目

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc, Compute Engine, and Cloud Composer APIs.

    Enable the APIs

  5. Install the Google Cloud CLI.
  6. To initialize the gcloud CLI, run the following command:

    gcloud init
  7. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  8. Make sure that billing is enabled for your Google Cloud project.

  9. Enable the Dataproc, Compute Engine, and Cloud Composer APIs.

    Enable the APIs

  10. Install the Google Cloud CLI.
  11. To initialize the gcloud CLI, run the following command:

    gcloud init

创建 Dataproc 工作流模板

在本地终端窗口或 Cloud Shell 中复制并运行下面列出的命令,以创建和定义工作流模板

  1. 创建 sparkpi 工作流模板。
    gcloud dataproc workflow-templates create sparkpi \
        --region=us-central1
          
  2. 将 Spark 作业添加到 sparkpi 工作流模板。“compute”step-id 标志用于识别 SparkPi 作业。
    gcloud dataproc workflow-templates add-job spark \
        --workflow-template=sparkpi \
        --step-id=compute \
        --class=org.apache.spark.examples.SparkPi \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        --region=us-central1 \
        -- 1000
          
  3. 使用托管单节点集群运行工作流。Dataproc 将创建集群,对其运行工作流,然后在工作流完成时删除集群。
    gcloud dataproc workflow-templates set-managed-cluster sparkpi \
        --cluster-name=sparkpi \
        --single-node \
        --region=us-central1
          
  4. 确认创建工作流模板。

    控制台

    点击 Google Cloud 控制台中 Dataproc 工作流页面上的 sparkpi 名称,打开工作流模板详细信息页面。点击工作流模板的名称以确认 sparkpi 模板属性。

    gcloud 命令

    运行以下命令:

    gcloud dataproc workflow-templates describe sparkpi --region=us-central1
        

创建 DAG 并将其上传到 Cloud Storage

  1. 创建或使用现有的 Cloud Composer 环境
  2. 设置环境变量。

    Airflow 界面

    1. 在工具栏中,点击 Admin > Variables
    2. 点击创建
    3. 请输入以下信息:
      • Key:project_id
      • Val:PROJECT_ID - 您的 Google Cloud 项目 ID
    4. 点击保存

    gcloud 命令

    输入以下命令:

    • ENVIRONMENT 是 Cloud Composer 环境的名称
    • LOCATION 是 Cloud Composer 环境所在的地区
    • PROJECT_ID 是包含 Cloud Composer 环境的项目的项目 ID
        gcloud composer environments run ENVIRONMENT --location LOCATION variables set -- project_id PROJECT_ID
        
  3. 将以下 DAG 代码从本地复制到名为“composer-dataproc-dag.py”的文件中,该文件使用 DataprocInstantiateWorkflowTemplateOperator

    Airflow 2

    
    """Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
    Spark Pi Job.
    
    This DAG relies on an Airflow variable
    https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
    * project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
    """
    
    import datetime
    
    from airflow import models
    from airflow.providers.google.cloud.operators.dataproc import (
        DataprocInstantiateWorkflowTemplateOperator,
    )
    from airflow.utils.dates import days_ago
    
    project_id = "{{var.value.project_id}}"
    
    
    default_args = {
        # Tell airflow to start one day ago, so that it runs as soon as you upload it
        "start_date": days_ago(1),
        "project_id": project_id,
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
        # The id you will see in the DAG airflow page
        "dataproc_workflow_dag",
        default_args=default_args,
        # The interval with which to schedule the DAG
        schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
    ) as dag:
        start_template_job = DataprocInstantiateWorkflowTemplateOperator(
            # The task id of your job
            task_id="dataproc_workflow_dag",
            # The template id of your workflow
            template_id="sparkpi",
            project_id=project_id,
            # The region for the template
            region="us-central1",
        )
    

    Airflow 1

    
    """Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
    Spark Pi Job.
    
    This DAG relies on an Airflow variable
    https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
    * project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
    """
    
    import datetime
    
    from airflow import models
    from airflow.contrib.operators import dataproc_operator
    from airflow.utils.dates import days_ago
    
    project_id = "{{var.value.project_id}}"
    
    
    default_args = {
        # Tell airflow to start one day ago, so that it runs as soon as you upload it
        "start_date": days_ago(1),
        "project_id": project_id,
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
        # The id you will see in the DAG airflow page
        "dataproc_workflow_dag",
        default_args=default_args,
        # The interval with which to schedule the DAG
        schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
    ) as dag:
        start_template_job = dataproc_operator.DataprocWorkflowTemplateInstantiateOperator(
            # The task id of your job
            task_id="dataproc_workflow_dag",
            # The template id of your workflow
            template_id="sparkpi",
            project_id=project_id,
            # The region for the template
            # For more info on regions where Dataflow is available see:
            # https://cloud.google.com/dataflow/docs/resources/locations
            region="us-central1",
        )
    
  4. 将 DAG 上传到 Cloud Storage 中的环境文件夹。上传成功完成后,点击 Cloud Composer 环境页面上的 DAGs 文件夹链接。

查看任务的状态

Airflow 界面

  1. 打开 Airflow 网页界面
  2. 在 DAG 页面上,点击 DAG 名称(例如 dataproc_workflow_dag)。
  3. 在 DAG 详细信息页面上,点击 Graph View
  4. 查看状态:
    • 失败:任务被红色框圈起。 您还可以将指针悬停在任务上,然后查看 State: Failed任务被红色框圈起,表示任务失败
    • 成功:任务被绿色框圈起。您还可以将指针悬停在任务上,然后查看 State: Success任务被绿色框圈起,表示任务成功

控制台

点击“工作流”标签页以查看工作流状态。

gcloud 命令

gcloud dataproc operations list \
    --region=us-central1 \
    --filter="labels.goog-dataproc-workflow-template-id=sparkpi"
    

清理

为避免系统向您的 Google Cloud 账号收取费用,您可以删除本教程中使用的资源:

  1. 删除 Cloud Composer 环境。

  2. 删除工作流模板。

后续步骤