Cloud Composer を使用するワークフロー

このチュートリアルでは、課金対象である次の Google Cloud コンポーネントを使用します。

  • Dataproc
  • Compute Engine
  • Cloud Composer

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。新しい Google Cloud ユーザーは無料トライアルをご利用いただけます。

始める前に

プロジェクトを設定する

  1. Google アカウントにログインします。

    Google アカウントをまだお持ちでない場合は、新しいアカウントを登録します。

  2. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    [プロジェクトの選択] ページに移動

  3. Cloud プロジェクトに対して課金が有効になっていることを確認します。プロジェクトに対して課金が有効になっていることを確認する方法を学習する

  4. Dataproc, Compute Engine, and Cloud Composer API を有効にします。

    API を有効にする

  5. Cloud SDK をインストールして初期化します。

Dataproc ワークフロー テンプレートの作成

ローカル ターミナルウィンドウまたは Cloud Shell で以下に表示されているコマンドをコピーして実行し、ワークフロー テンプレートを作成して定義します。

  1. sparkpi ワークフロー テンプレートを作成します。
    gcloud dataproc workflow-templates create sparkpi \
        --region=us-central1
          
  2. Spark ジョブを sparkpi ワークフロー テンプレートに追加します。「compute」 step-id フラグは、SparkPi ジョブを示します。
    gcloud dataproc workflow-templates add-job spark \
        --workflow-template=sparkpi \
        --step-id=compute \
        --class=org.apache.spark.examples.SparkPi \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        --region=us-central1 \
        -- 1000
          
  3. ワークフローを実行するには、マネージド単一ノードクラスタを使用します。Dataproc によってクラスタが作成され、ワークフローがクラスタ上で実行され、ワークフローの完了時にクラスタが削除されます。
    gcloud dataproc workflow-templates set-managed-cluster sparkpi \
        --cluster-name=sparkpi \
        --single-node \
        --region=us-central1
          
  4. ワークフロー テンプレートの作成を確認します。

    Console

    Cloud Console の [Dataproc ワークフロー] ページで sparkpi 名をクリックし、[ワークフロー テンプレートの詳細] ページを開きます。ワークフロー テンプレートの名前をクリックして、sparkpi テンプレートの属性を確認します。

    gcloud コマンド

    次のコマンドを実行します。

    gcloud dataproc workflow-templates describe sparkpi --region=us-central1
        

DAG を作成して Cloud Storage にアップロードする

  1. 既存の Cloud Composer 環境を作成または使用します。
  2. 環境変数を設定します。

    Airflow UI

    1. ツールバーで、[Admin] > [Variables] をクリックします。
    2. [作成] をクリックします。
    3. 次の情報を入力します。
      • Key:project_id
      • Val: PROJECT_ID - お客様の Google Cloud プロジェクト ID
    4. [保存] をクリックします。

    gcloud コマンド

    次のコマンドを入力します。

    • ENVIRONMENT は、Cloud Composer 環境の名前です。
    • LOCATION は、Cloud Composer 環境が配置されているリージョンです。
    gcloud composer environments run ENVIRONMENT \
        --location LOCATION
        
  3. 次の DAG コードを、DataprocWorkflowTemplateInstantiateOperator を使用する「composer-dataproc-dag.py」という名前のファイルにローカルにコピーします。

    
    """Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
    Spark Pi Job.
    
    This DAG relies on an Airflow variable
    https://airflow.apache.org/concepts.html#variables
    * project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
    """
    
    import datetime
    
    from airflow import models
    from airflow.contrib.operators import dataproc_operator
    from airflow.utils.dates import days_ago
    
    project_id = models.Variable.get("project_id")
    
    default_args = {
        # Tell airflow to start one day ago, so that it runs as soon as you upload it
        "start_date": days_ago(1),
        "project_id": project_id,
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
        # The id you will see in the DAG airflow page
        "dataproc_workflow_dag",
        default_args=default_args,
        # The interval with which to schedule the DAG
        schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
    ) as dag:
    
        start_template_job = dataproc_operator.DataprocWorkflowTemplateInstantiateOperator(
            # The task id of your job
            task_id="dataproc_workflow_dag",
            # The template id of your workflow
            template_id="sparkpi",
            project_id=project_id,
            # The region for the template
            region="us-central1",
        )
    
  4. Cloud Storage の環境フォルダに DAG をアップロードします。アップロードが正常に完了したら、Cloud Composer 環境のページで [DAG Folder] リンクをクリックします。

タスクのステータスの表示

Airflow UI

  1. Airflow ウェブ インターフェースを開きます。
  2. DAG ページで、DAG 名(dataproc_workflow_dag など)をクリックします。
  3. DAG の詳細ページで、[Graph View] をクリックします。
  4. ステータスを確認します。
    • Failed: タスクの周囲に赤いボックスが表示されます。 タスクにカーソルを合わせて [State: Failed] を探すこともできます。タスクの周囲に失敗したことを示す赤いボックスが表示されている
    • Success: タスクの周囲に緑色のボックスが表示されます。 タスクにカーソルを合わせて [State: Success] を確認することもできます。タスクの周囲に成功したことを示す緑色のボックスが表示されている

Console

[Workflow] タブをクリックして、ワークフローのステータスを確認します。

gcloud コマンド

gcloud dataproc operations list \
    --region=us-central1 \
    --filter="labels.goog-dataproc-workflow-template-id=sparkpi"
    

クリーンアップ

Google Cloud アカウントに課金されないようにするには、このチュートリアルで使用したリソースを削除します。

  1. Cloud Composer 環境を削除します。

  2. ワークフロー テンプレートを削除します。

次のステップ