Alur kerja menggunakan Cloud Composer

Dalam dokumen ini, Anda akan menggunakan komponen Google Cloudyang dapat ditagih berikut:

  • Dataproc
  • Compute Engine
  • Cloud Composer

Untuk membuat perkiraan biaya berdasarkan proyeksi penggunaan Anda, gunakan kalkulator harga.

Pengguna Google Cloud baru mungkin memenuhi syarat untuk mendapatkan uji coba gratis.

Sebelum memulai

Menyiapkan project

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc, Compute Engine, and Cloud Composer APIs.

    Enable the APIs

  5. Install the Google Cloud CLI.

  6. Jika Anda menggunakan penyedia identitas (IdP) eksternal, Anda harus login ke gcloud CLI dengan identitas gabungan Anda terlebih dahulu.

  7. Untuk melakukan inisialisasi gcloud CLI, jalankan perintah berikut:

    gcloud init
  8. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  9. Verify that billing is enabled for your Google Cloud project.

  10. Enable the Dataproc, Compute Engine, and Cloud Composer APIs.

    Enable the APIs

  11. Install the Google Cloud CLI.

  12. Jika Anda menggunakan penyedia identitas (IdP) eksternal, Anda harus login ke gcloud CLI dengan identitas gabungan Anda terlebih dahulu.

  13. Untuk melakukan inisialisasi gcloud CLI, jalankan perintah berikut:

    gcloud init
  14. Buat template alur kerja Dataproc

    Salin dan jalankan perintah berikut di jendela terminal lokal atau di Cloud Shell untuk membuat dan menentukan template alur kerja.

    1. Buat template alur kerja sparkpi.
      gcloud dataproc workflow-templates create sparkpi \
          --region=us-central1
            
    2. Tambahkan tugas spark ke template alur kerja sparkpi. Flag "compute" step-id mengidentifikasi tugas SparkPi.
      gcloud dataproc workflow-templates add-job spark \
          --workflow-template=sparkpi \
          --step-id=compute \
          --class=org.apache.spark.examples.SparkPi \
          --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
          --region=us-central1 \
          -- 1000
            
    3. Gunakan cluster terkelola, node tunggal untuk menjalankan alur kerja. Dataproc akan membuat cluster, menjalankan alur kerja di cluster tersebut, lalu menghapus cluster saat alur kerja selesai.
      gcloud dataproc workflow-templates set-managed-cluster sparkpi \
          --cluster-name=sparkpi \
          --single-node \
          --region=us-central1
            
    4. Konfirmasi pembuatan template alur kerja.

      Konsol

      Klik nama sparkpi di halaman Workflows Dataproc di konsol Google Cloud untuk membuka halaman Workflow template details. Klik nama template alur kerja Anda untuk mengonfirmasi atribut template sparkpi.

      Perintah gcloud

      Jalankan perintah berikut:

      gcloud dataproc workflow-templates describe sparkpi --region=us-central1
          

    Membuat dan mengupload DAG ke Cloud Storage

    1. Buat atau gunakan lingkungan Cloud Composer yang sudah ada.
    2. Menetapkan variabel lingkungan.

      UI Airflow

      1. Di toolbar, klik Admin > Variabel.
      2. Klik Buat.
      3. Masukkan informasi berikut:
        • Kunci:project_id
        • Val: PROJECT_ID — Google Cloud project ID Anda
      4. Klik Simpan.

      Perintah gcloud

      Masukkan perintah berikut:

      • ENVIRONMENT adalah nama lingkungan Cloud Composer
      • LOCATION adalah region tempat lingkungan Cloud Composer berada
      • PROJECT_ID adalah project ID untuk project yang berisi lingkungan Cloud Composer
          gcloud composer environments run ENVIRONMENT --location LOCATION variables set -- project_id PROJECT_ID
          
    3. Salin kode DAG berikut secara lokal ke dalam file berjudul "composer-dataproc-dag.py", yang menggunakan DataprocInstantiateWorkflowTemplateOperator.

      Airflow 2

      
      """Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
      Spark Pi Job.
      
      This DAG relies on an Airflow variable
      https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
      * project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
      """
      
      import datetime
      
      from airflow import models
      from airflow.providers.google.cloud.operators.dataproc import (
          DataprocInstantiateWorkflowTemplateOperator,
      )
      from airflow.utils.dates import days_ago
      
      project_id = "{{var.value.project_id}}"
      
      
      default_args = {
          # Tell airflow to start one day ago, so that it runs as soon as you upload it
          "start_date": days_ago(1),
          "project_id": project_id,
      }
      
      # Define a DAG (directed acyclic graph) of tasks.
      # Any task you create within the context manager is automatically added to the
      # DAG object.
      with models.DAG(
          # The id you will see in the DAG airflow page
          "dataproc_workflow_dag",
          default_args=default_args,
          # The interval with which to schedule the DAG
          schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
      ) as dag:
          start_template_job = DataprocInstantiateWorkflowTemplateOperator(
              # The task id of your job
              task_id="dataproc_workflow_dag",
              # The template id of your workflow
              template_id="sparkpi",
              project_id=project_id,
              # The region for the template
              region="us-central1",
          )
      

      Aliran udara 1

      
      """Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
      Spark Pi Job.
      
      This DAG relies on an Airflow variable
      https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
      * project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
      """
      
      import datetime
      
      from airflow import models
      from airflow.contrib.operators import dataproc_operator
      from airflow.utils.dates import days_ago
      
      project_id = "{{var.value.project_id}}"
      
      
      default_args = {
          # Tell airflow to start one day ago, so that it runs as soon as you upload it
          "start_date": days_ago(1),
          "project_id": project_id,
      }
      
      # Define a DAG (directed acyclic graph) of tasks.
      # Any task you create within the context manager is automatically added to the
      # DAG object.
      with models.DAG(
          # The id you will see in the DAG airflow page
          "dataproc_workflow_dag",
          default_args=default_args,
          # The interval with which to schedule the DAG
          schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
      ) as dag:
          start_template_job = dataproc_operator.DataprocWorkflowTemplateInstantiateOperator(
              # The task id of your job
              task_id="dataproc_workflow_dag",
              # The template id of your workflow
              template_id="sparkpi",
              project_id=project_id,
              # The region for the template
              # For more info on regions where Dataflow is available see:
              # https://cloud.google.com/dataflow/docs/resources/locations
              region="us-central1",
          )
      
    4. Upload DAG ke folder lingkungan Anda di Cloud Storage. Setelah upload berhasil diselesaikan, klik link Folder DAG di halaman Lingkungan Cloud Composer.

    Melihat status tugas

    UI Airflow

    1. Buka antarmuka web Airflow.
    2. Di halaman DAG, klik nama DAG (misalnya, dataproc_workflow_dag).
    3. Di halaman Detail DAG, klik Graph View.
    4. Periksa status:
      • Gagal: Tugas memiliki kotak merah di sekelilingnya. Anda juga dapat menahan kursor di atas tugas dan mencari Status: Gagal. tugas memiliki kotak merah di sekelilingnya, yang menunjukkan bahwa tugas tersebut gagal
      • Berhasil: Tugas memiliki kotak hijau di sekelilingnya. Anda juga dapat mengarahkan kursor ke tugas dan memeriksa Status: Berhasil. tugas memiliki kotak hijau di sekitarnya, yang menunjukkan bahwa tugas berhasil

    Konsol

    Klik tab Workflows untuk melihat status alur kerja.

    Perintah gcloud

    gcloud dataproc operations list \
        --region=us-central1 \
        --filter="labels.goog-dataproc-workflow-template-id=sparkpi"
        

    Pembersihan

    Agar tidak menimbulkan biaya pada akun Google Cloud Anda, Anda dapat menghapus resource yang digunakan dalam tutorial ini:

    1. Hapus lingkungan Cloud Composer.

    2. Hapus template alur kerja.

    Langkah berikutnya