Cloud Composer を使用するワークフロー

このドキュメントでは、Google Cloud の次の課金対象のコンポーネントを使用します。

  • Dataproc
  • Compute Engine
  • Cloud Composer

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。 新しい Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

始める前に

プロジェクトを設定する

  1. Google Cloud アカウントにログインします。Google Cloud を初めて使用する場合は、アカウントを作成して、実際のシナリオでの Google プロダクトのパフォーマンスを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
  2. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  3. Google Cloud プロジェクトで課金が有効になっていることを確認します

  4. Dataproc, Compute Engine, and Cloud Composer API を有効にします。

    API を有効にする

  5. Google Cloud CLI をインストールします。
  6. gcloud CLI を初期化するには:

    gcloud init
  7. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  8. Google Cloud プロジェクトで課金が有効になっていることを確認します

  9. Dataproc, Compute Engine, and Cloud Composer API を有効にします。

    API を有効にする

  10. Google Cloud CLI をインストールします。
  11. gcloud CLI を初期化するには:

    gcloud init

Dataproc ワークフロー テンプレートの作成

ローカル ターミナルウィンドウまたは Cloud Shell で以下に表示されているコマンドをコピーして実行し、ワークフロー テンプレートを作成して定義します。

  1. sparkpi ワークフロー テンプレートを作成します。
    gcloud dataproc workflow-templates create sparkpi \
        --region=us-central1
          
  2. Spark ジョブを sparkpi ワークフロー テンプレートに追加します。「compute」 step-id フラグは、SparkPi ジョブを示します。
    gcloud dataproc workflow-templates add-job spark \
        --workflow-template=sparkpi \
        --step-id=compute \
        --class=org.apache.spark.examples.SparkPi \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        --region=us-central1 \
        -- 1000
          
  3. ワークフローを実行するには、マネージド単一ノードクラスタを使用します。Dataproc によってクラスタが作成され、ワークフローがクラスタ上で実行され、ワークフローの完了時にクラスタが削除されます。
    gcloud dataproc workflow-templates set-managed-cluster sparkpi \
        --cluster-name=sparkpi \
        --single-node \
        --region=us-central1
          
  4. ワークフロー テンプレートの作成を確認します。

    Console

    Google Cloud コンソールの [Dataproc ワークフロー] ページで sparkpi 名をクリックし、[ワークフロー テンプレートの詳細] ページを開きます。ワークフロー テンプレートの名前をクリックして、sparkpi テンプレート属性を確認します。

    gcloud コマンド

    次のコマンドを実行します。

    gcloud dataproc workflow-templates describe sparkpi --region=us-central1
        

DAG を作成して Cloud Storage にアップロードする

  1. Cloud Composer 環境を作成するか、既存の環境を使用します。
  2. 環境変数を設定します。

    Airflow UI

    1. ツールバーで、[Admin] > [Variables] をクリックします。
    2. [作成] をクリックします。
    3. 次の情報を入力します。
      • Key:project_id
      • Val: PROJECT_ID - Google Cloud プロジェクト ID
    4. [保存] をクリックします。

    gcloud コマンド

    次のコマンドを入力します。

    • ENVIRONMENT は、Cloud Composer 環境の名前です。
    • LOCATION は、Cloud Composer 環境が配置されているリージョンです。
    • PROJECT_ID は、Cloud Composer 環境を含むプロジェクトのプロジェクト ID です。
        gcloud composer environments run ENVIRONMENT --location LOCATION variables set -- project_id PROJECT_ID
        
  3. 次の DAG コードを、ローカルで「composer-dataproc-dag.py」という名前のファイルにコピーします。このコードは、DataprocWorkflowTemplateInstantiateOperator を使用します。

    Airflow 2

    
    """Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
    Spark Pi Job.
    
    This DAG relies on an Airflow variable
    https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
    * project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
    """
    
    import datetime
    
    from airflow import models
    from airflow.providers.google.cloud.operators.dataproc import (
        DataprocInstantiateWorkflowTemplateOperator,
    )
    from airflow.utils.dates import days_ago
    
    project_id = "{{var.value.project_id}}"
    
    default_args = {
        # Tell airflow to start one day ago, so that it runs as soon as you upload it
        "start_date": days_ago(1),
        "project_id": project_id,
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
        # The id you will see in the DAG airflow page
        "dataproc_workflow_dag",
        default_args=default_args,
        # The interval with which to schedule the DAG
        schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
    ) as dag:
        start_template_job = DataprocInstantiateWorkflowTemplateOperator(
            # The task id of your job
            task_id="dataproc_workflow_dag",
            # The template id of your workflow
            template_id="sparkpi",
            project_id=project_id,
            # The region for the template
            region="us-central1",
        )
    

    Airflow 1

    
    """Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
    Spark Pi Job.
    
    This DAG relies on an Airflow variable
    https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
    * project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
    """
    
    import datetime
    
    from airflow import models
    from airflow.contrib.operators import dataproc_operator
    from airflow.utils.dates import days_ago
    
    project_id = "{{var.value.project_id}}"
    
    default_args = {
        # Tell airflow to start one day ago, so that it runs as soon as you upload it
        "start_date": days_ago(1),
        "project_id": project_id,
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
        # The id you will see in the DAG airflow page
        "dataproc_workflow_dag",
        default_args=default_args,
        # The interval with which to schedule the DAG
        schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
    ) as dag:
        start_template_job = dataproc_operator.DataprocWorkflowTemplateInstantiateOperator(
            # The task id of your job
            task_id="dataproc_workflow_dag",
            # The template id of your workflow
            template_id="sparkpi",
            project_id=project_id,
            # The region for the template
            # For more info on regions where Dataflow is available see:
            # https://cloud.google.com/dataflow/docs/resources/locations
            region="us-central1",
        )
    
  4. Cloud Storage の環境フォルダに DAG をアップロードします。アップロードが正常に完了したら、Cloud Composer 環境のページにある DAG フォルダのリンクをクリックします。

タスクのステータスの表示

Airflow UI

  1. Airflow ウェブ インターフェースを開きます。
  2. DAG ページで、DAG 名(例: dataproc_workflow_dag)をクリックします。
  3. DAG の詳細ページで、[Graph View] をクリックします。
  4. ステータスを確認します。
    • Failed: タスクの周囲に赤いボックスが表示されます。 タスクにカーソルを合わせて [State: Failed] を探すこともできます。タスクの周囲に失敗したことを示す赤いボックスが表示されている
    • Success: タスクの周囲に緑色のボックスが表示されます。 タスクにカーソルを合わせて [State: Success] を確認することもできます。タスクの周囲に成功したことを示す緑色のボックスが表示されている

Console

[Workflow] タブをクリックして、ワークフローのステータスを確認します。

gcloud コマンド

gcloud dataproc operations list \
    --region=us-central1 \
    --filter="labels.goog-dataproc-workflow-template-id=sparkpi"
    

クリーンアップ

Google Cloud アカウントに課金されないようにするには、このチュートリアルで使用したリソースを削除します。

  1. Cloud Composer 環境を削除します。

  2. ワークフロー テンプレートを削除します。

次のステップ