Workflow using Cloud Composer

In this document, you use the following billable components of Google Cloud:

  • Dataproc
  • Compute Engine
  • Cloud Composer

To generate a cost estimate based on your projected usage, use the pricing calculator. New Google Cloud users might be eligible for a free trial.

Before you begin

Set up your project

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc, Compute Engine, and Cloud Composer APIs.

    Enable the APIs

  5. Install the Google Cloud CLI.
  6. To initialize the gcloud CLI, run the following command:

    gcloud init

Create a Dataproc workflow template

Copy and run the commands listed below in a local terminal window or in Cloud Shell to create and define a workflow template.

  1. Create the sparkpi workflow template.
    gcloud dataproc workflow-templates create sparkpi \
        --region=us-central1
          
  2. Add the spark job to the sparkpi workflow template. The "compute" step-id flag identifies the SparkPi job.
    gcloud dataproc workflow-templates add-job spark \
        --workflow-template=sparkpi \
        --step-id=compute \
        --class=org.apache.spark.examples.SparkPi \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        --region=us-central1 \
        -- 1000
          
  3. Use a managed single-node cluster to run the workflow. Dataproc creates the cluster, runs the workflow on it, and then deletes the cluster when the workflow completes.
    gcloud dataproc workflow-templates set-managed-cluster sparkpi \
        --cluster-name=sparkpi \
        --single-node \
        --region=us-central1
          
  4. Confirm workflow template creation.

    Console

    On the Dataproc Workflows page in the Google Cloud console, click the sparkpi name to open the Workflow template details page, then confirm the sparkpi template attributes.

    gcloud command

    Run the following command:

    gcloud dataproc workflow-templates describe sparkpi --region=us-central1
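
    You can also verify the template programmatically. The following is a minimal sketch using the google-cloud-dataproc Python client library; PROJECT_ID is a placeholder for your project ID, and the template is assumed to be in us-central1 as created above:

        from google.cloud import dataproc_v1

        # Dataproc resources in a specific region are served from a
        # regional endpoint.
        client = dataproc_v1.WorkflowTemplateServiceClient(
            client_options={"api_endpoint": "us-central1-dataproc.googleapis.com:443"}
        )

        # Placeholder: replace PROJECT_ID with your Google Cloud project ID.
        name = client.workflow_template_path("PROJECT_ID", "us-central1", "sparkpi")

        # Fetch the template and print its attributes, mirroring the
        # describe command above.
        template = client.get_workflow_template(name=name)
        print(template)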
        

Create and upload a DAG to Cloud Storage

  1. Create or use an existing Cloud Composer environment.
  2. Set Airflow variables.

    Airflow UI

    1. In the toolbar, click Admin > Variables.
    2. Click Create.
    3. Enter the following information:
      • Key: project_id
      • Val: PROJECT_ID (your Google Cloud project ID)
    4. Click Save.

    gcloud command

    Enter the following command:

    • ENVIRONMENT is the name of the Cloud Composer environment
    • LOCATION is the region where the Cloud Composer environment is located
    • PROJECT_ID is the project ID for the project that contains the Cloud Composer environment
        gcloud composer environments run ENVIRONMENT --location LOCATION variables set -- project_id PROJECT_ID
        
  3. Copy the following DAG code locally into a file titled "composer-dataproc-dag.py". The DAG uses the DataprocInstantiateWorkflowTemplateOperator.

    Airflow 2

    
    """Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
    Spark Pi Job.
    
    This DAG relies on an Airflow variable
    https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
    * project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
    """
    
    import datetime
    
    from airflow import models
    from airflow.providers.google.cloud.operators.dataproc import (
        DataprocInstantiateWorkflowTemplateOperator,
    )
    from airflow.utils.dates import days_ago
    
    project_id = "{{var.value.project_id}}"
    
    
    default_args = {
        # Tell airflow to start one day ago, so that it runs as soon as you upload it
        "start_date": days_ago(1),
        "project_id": project_id,
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
        # The id you will see in the DAG airflow page
        "dataproc_workflow_dag",
        default_args=default_args,
        # The interval with which to schedule the DAG
        schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
    ) as dag:
        start_template_job = DataprocInstantiateWorkflowTemplateOperator(
            # The task id of your job
            task_id="dataproc_workflow_dag",
            # The template id of your workflow
            template_id="sparkpi",
            project_id=project_id,
            # The region for the template
            region="us-central1",
        )
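
    The DataprocInstantiateWorkflowTemplateOperator used here is part of the apache-airflow-providers-google package, which Cloud Composer environments running Airflow 2 include preinstalled.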
    

    Airflow 1

    
    """Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
    Spark Pi Job.
    
    This DAG relies on an Airflow variable
    https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
    * project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
    """
    
    import datetime
    
    from airflow import models
    from airflow.contrib.operators import dataproc_operator
    from airflow.utils.dates import days_ago
    
    project_id = "{{var.value.project_id}}"
    
    
    default_args = {
        # Tell airflow to start one day ago, so that it runs as soon as you upload it
        "start_date": days_ago(1),
        "project_id": project_id,
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
        # The id you will see in the DAG airflow page
        "dataproc_workflow_dag",
        default_args=default_args,
        # The interval with which to schedule the DAG
        schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
    ) as dag:
        start_template_job = dataproc_operator.DataprocWorkflowTemplateInstantiateOperator(
            # The task id of your job
            task_id="dataproc_workflow_dag",
            # The template id of your workflow
            template_id="sparkpi",
            project_id=project_id,
            # The region for the template
            # For more info on regions where Dataproc is available, see:
            # https://cloud.google.com/dataproc/docs/concepts/regional-endpoints
            region="us-central1",
        )
    
  4. Upload your DAG to the environment's folder in Cloud Storage. After the upload completes successfully, click the DAGs Folder link on the Cloud Composer Environments page.
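
If you prefer to script the upload, the DAGs folder is a path inside your environment's Cloud Storage bucket. The following is a minimal Python sketch using the google-cloud-storage client library; the bucket name is a placeholder that you replace with the bucket shown in your environment's DAGs Folder link:

    from google.cloud import storage

    # Placeholder: replace with your environment's bucket, shown in the
    # DAGs Folder link on the Cloud Composer Environments page.
    BUCKET_NAME = "your-composer-environment-bucket"

    client = storage.Client()
    bucket = client.bucket(BUCKET_NAME)

    # Cloud Composer reads DAG files from the dags/ prefix of the bucket.
    blob = bucket.blob("dags/composer-dataproc-dag.py")
    blob.upload_from_filename("composer-dataproc-dag.py")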

View the job status

Airflow UI

  1. Open the Airflow web interface.
  2. On the DAGs page, click the DAG name (for example, dataproc_workflow_dag).
  3. On the DAG details page, click Graph View.
  4. Check the status:
    • Failed: The task has a red box around it. You can also hold the pointer over the task and look for State: Failed.
    • Success: The task has a green box around it. You can also hold the pointer over the task and look for State: Success.

Console

Click the Workflows tab to see the workflow status.

gcloud command

gcloud dataproc operations list \
    --region=us-central1 \
    --filter="labels.goog-dataproc-workflow-template-id=sparkpi"
    

Clean up

To avoid incurring charges to your Google Cloud account, you can delete the resources used in this tutorial:

  1. Delete the Cloud Composer environment.

  2. Delete the workflow template.
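
You can delete the workflow template with the gcloud dataproc workflow-templates delete command, or programmatically. The following is a minimal sketch using the google-cloud-dataproc Python client library, assuming the sparkpi template in us-central1 and PROJECT_ID as a placeholder for your project ID:

    from google.cloud import dataproc_v1

    # Use the regional endpoint that matches where the template was created.
    client = dataproc_v1.WorkflowTemplateServiceClient(
        client_options={"api_endpoint": "us-central1-dataproc.googleapis.com:443"}
    )

    # Placeholder: replace PROJECT_ID with your Google Cloud project ID.
    name = client.workflow_template_path("PROJECT_ID", "us-central1", "sparkpi")
    client.delete_workflow_template(name=name)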

What's next