Alur kerja menggunakan Cloud Composer

Dalam dokumen ini, Anda menggunakan komponen Google Cloud yang dapat ditagih berikut:

  • Dataproc
  • Compute Engine
  • Cloud Composer

Untuk membuat perkiraan biaya berdasarkan proyeksi penggunaan Anda, gunakan kalkulator harga. Pengguna baru Google Cloud mungkin memenuhi syarat untuk mendapatkan uji coba gratis.

Sebelum memulai

Menyiapkan project

  1. Login ke akun Google Cloud Anda. Jika Anda baru menggunakan Google Cloud, buat akun untuk mengevaluasi performa produk kami dalam skenario dunia nyata. Pelanggan baru juga mendapatkan kredit gratis senilai $300 untuk menjalankan, menguji, dan men-deploy workload.
  2. Di konsol Google Cloud, pada halaman pemilih project, pilih atau buat project Google Cloud.

    Buka pemilih project

  3. Pastikan penagihan telah diaktifkan untuk project Google Cloud Anda.

  4. Enable the Dataproc, Compute Engine, and Cloud Composer APIs.

    Enable the APIs

  5. Menginstal Google Cloud CLI.
  6. Untuk initialize gcloud CLI, jalankan perintah berikut:

    gcloud init
  7. Di konsol Google Cloud, pada halaman pemilih project, pilih atau buat project Google Cloud.

    Buka pemilih project

  8. Pastikan penagihan telah diaktifkan untuk project Google Cloud Anda.

  9. Enable the Dataproc, Compute Engine, and Cloud Composer APIs.

    Enable the APIs

  10. Menginstal Google Cloud CLI.
  11. Untuk initialize gcloud CLI, jalankan perintah berikut:

    gcloud init

Membuat template alur kerja Dataproc

Salin dan jalankan perintah yang tercantum di bawah di jendela terminal lokal atau di Cloud Shell untuk membuat dan menentukan template alur kerja.

  1. Buat template alur kerja sparkpi.
    gcloud dataproc workflow-templates create sparkpi \
        --region=us-central1
          
  2. Tambahkan tugas spark ke template alur kerja sparkpi. Flag step-id "compute" mengidentifikasi tugas SparkPi.
    gcloud dataproc workflow-templates add-job spark \
        --workflow-template=sparkpi \
        --step-id=compute \
        --class=org.apache.spark.examples.SparkPi \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        --region=us-central1 \
        -- 1000
          
  3. Gunakan cluster node tunggal terkelola untuk menjalankan alur kerja. Dataproc akan membuat cluster, menjalankan alur kerja di dalamnya, lalu menghapus cluster saat alur kerja selesai.
    gcloud dataproc workflow-templates set-managed-cluster sparkpi \
        --cluster-name=sparkpi \
        --single-node \
        --region=us-central1
          
  4. Konfirmasi pembuatan template alur kerja.

    Konsol

    Klik nama sparkpi di halaman Workflows Dataproc di konsol Google Cloud untuk membuka halaman Workflow template details. Klik nama template alur kerja Anda untuk mengonfirmasi atribut template sparkpi.

    Perintah gcloud

    Jalankan perintah berikut:

    gcloud dataproc workflow-templates describe sparkpi --region=us-central1
        

Membuat dan Mengupload DAG ke Cloud Storage

  1. Buat atau gunakan lingkungan Cloud Composer yang sudah ada.
  2. Menetapkan variabel lingkungan.

    UI Airflow

    1. Di toolbar, klik Admin > Variabel.
    2. Klik Create.
    3. Masukkan informasi berikut:
      • Tombol:project_id
      • Val: PROJECT_ID — Project ID Google Cloud Anda
    4. Klik Save.

    Perintah gcloud

    Masukkan perintah berikut:

    • ENVIRONMENT adalah nama lingkungan Cloud Composer
    • LOCATION adalah region tempat lingkungan Cloud Composer berada
    • PROJECT_ID adalah project ID untuk project yang berisi lingkungan Cloud Composer
        gcloud composer environments run ENVIRONMENT --location LOCATION variables set -- project_id PROJECT_ID
        
  3. Salin kode DAG berikut secara lokal ke dalam file berjudul "composer-dataproc-dag.py", yang menggunakan DataprocInstantiateWorkflowTemplateOperator.

    Aliran udara 2

    
    """Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
    Spark Pi Job.
    
    This DAG relies on an Airflow variable
    https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
    * project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
    """
    
    import datetime
    
    from airflow import models
    from airflow.providers.google.cloud.operators.dataproc import (
        DataprocInstantiateWorkflowTemplateOperator,
    )
    from airflow.utils.dates import days_ago
    
    project_id = "{{var.value.project_id}}"
    
    default_args = {
        # Tell airflow to start one day ago, so that it runs as soon as you upload it
        "start_date": days_ago(1),
        "project_id": project_id,
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
        # The id you will see in the DAG airflow page
        "dataproc_workflow_dag",
        default_args=default_args,
        # The interval with which to schedule the DAG
        schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
    ) as dag:
        start_template_job = DataprocInstantiateWorkflowTemplateOperator(
            # The task id of your job
            task_id="dataproc_workflow_dag",
            # The template id of your workflow
            template_id="sparkpi",
            project_id=project_id,
            # The region for the template
            region="us-central1",
        )
    

    Aliran udara 1

    
    """Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
    Spark Pi Job.
    
    This DAG relies on an Airflow variable
    https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
    * project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
    """
    
    import datetime
    
    from airflow import models
    from airflow.contrib.operators import dataproc_operator
    from airflow.utils.dates import days_ago
    
    project_id = "{{var.value.project_id}}"
    
    default_args = {
        # Tell airflow to start one day ago, so that it runs as soon as you upload it
        "start_date": days_ago(1),
        "project_id": project_id,
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
        # The id you will see in the DAG airflow page
        "dataproc_workflow_dag",
        default_args=default_args,
        # The interval with which to schedule the DAG
        schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
    ) as dag:
        start_template_job = dataproc_operator.DataprocWorkflowTemplateInstantiateOperator(
            # The task id of your job
            task_id="dataproc_workflow_dag",
            # The template id of your workflow
            template_id="sparkpi",
            project_id=project_id,
            # The region for the template
            # For more info on regions where Dataflow is available see:
            # https://cloud.google.com/dataflow/docs/resources/locations
            region="us-central1",
        )
    
  4. Upload DAG ke folder lingkungan Anda di Cloud Storage. Setelah upload berhasil diselesaikan, klik link DAGs Folder di halaman Lingkungan Cloud Composer.

Melihat status tugas

UI Airflow

  1. Buka Airflow web interface.
  2. Di halaman DAG, klik nama DAG (misalnya, dataproc_workflow_dag).
  3. Di halaman Detail DAG, klik Graph View.
  4. Periksa status:
    • Gagal: Ada kotak merah di sekeliling tugas. Anda juga dapat menahan pointer pada tugas dan mencari State: Failed. terdapat kotak merah di sekeliling tugas, yang menunjukkan bahwa tugas gagal
    • Berhasil: Tugas memiliki kotak hijau di sekelilingnya. Anda juga dapat menahan pointer ke tugas dan memeriksa State: Success. tugas memiliki kotak hijau di sekelilingnya, yang menunjukkan bahwa tugas telah berhasil

Konsol

Klik tab Workflows untuk melihat status alur kerja.

Perintah gcloud

gcloud dataproc operations list \
    --region=us-central1 \
    --filter="labels.goog-dataproc-workflow-template-id=sparkpi"
    

Pembersihan

Untuk menghindari timbulnya biaya pada akun Google Cloud Anda, Anda dapat menghapus resource yang digunakan dalam tutorial ini:

  1. Menghapus lingkungan Cloud Composer.

  2. Hapus template alur kerja.

Langkah selanjutnya