Menjalankan tugas jumlah kata Hadoop di cluster Dataproc

Cloud Composer 1 | Cloud Composer 2

Tutorial ini menunjukkan cara menggunakan Cloud Composer untuk membuat DAG (Directed Acyclic Graph) Apache Airflow yang menjalankan tugas jumlah kata Apache Hadoop di cluster Dataproc.

Tujuan

  1. Akses lingkungan Cloud Composer Anda dan gunakan Airflow UI.
  2. Membuat dan melihat variabel lingkungan Airflow.
  3. Buat dan jalankan DAG yang mencakup tugas-tugas berikut:
    1. Membuat cluster Dataproc.
    2. Menjalankan tugas penghitungan kata Apache Hadoop di cluster.
    3. Menghasilkan hasil jumlah kata ke bucket Cloud Storage.
    4. Menghapus cluster.

Biaya

Dalam dokumen ini, Anda menggunakan komponen Google Cloud yang dapat ditagih berikut:

  • Cloud Composer
  • Dataproc
  • Cloud Storage

Untuk membuat perkiraan biaya berdasarkan proyeksi penggunaan Anda, gunakan kalkulator harga. Pengguna baru Google Cloud mungkin memenuhi syarat untuk mendapatkan uji coba gratis.

Sebelum memulai

  • Pastikan API berikut diaktifkan di project Anda:

    Konsol

    Enable the Dataproc, Cloud Storage APIs.

    Enable the APIs

    gcloud

    Enable the Dataproc, Cloud Storage APIs:

    gcloud services enable dataproc.googleapis.com storage-component.googleapis.com

  • Di project Anda, buat bucket Cloud Storage dari kelas dan region penyimpanan apa pun untuk menyimpan hasil tugas jumlah kata Hadoop.

  • Catat jalur bucket yang Anda buat, misalnya gs://example-bucket. Anda akan menentukan variabel Airflow untuk jalur ini dan menggunakan variabel tersebut dalam contoh DAG nanti dalam tutorial ini.

  • Buat lingkungan Cloud Composer dengan parameter default. Tunggu hingga pembuatan lingkungan selesai. Setelah selesai, tanda centang hijau akan ditampilkan di sebelah kiri nama lingkungan.

  • Catat region tempat Anda membuat lingkungan, misalnya us-central. Anda akan menentukan variabel Airflow untuk region ini dan menggunakannya dalam contoh DAG untuk menjalankan cluster Dataproc di region yang sama.

Menetapkan variabel Airflow

Tetapkan variabel Airflow untuk digunakan nanti dalam contoh DAG. Misalnya, Anda dapat menetapkan variabel Airflow di UI Airflow.

Variabel Airflow Nilai
gcp_project Project ID dari project yang Anda gunakan untuk tutorial ini, seperti example-project.
gcs_bucket Bucket Cloud Storage URI yang Anda buat untuk tutorial ini, seperti gs://example-bucket.
gce_region Region tempat Anda membuat lingkungan, seperti us-central1. Ini adalah region tempat cluster Dataproc Anda akan dibuat.

Lihat contoh alur kerja

DAG Airflow adalah kumpulan tugas terorganisir yang ingin Anda jadwalkan dan jalankan. DAG ditentukan dalam file Python standar. Kode yang ditampilkan di hadoop_tutorial.py adalah kode alur kerja.

Aliran udara 2

"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_region - Google Compute Engine region where Cloud Dataproc cluster should be
  created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
  See https://cloud.google.com/storage/docs/creating-buckets for creating a
  bucket.
"""

import datetime
import os

from airflow import models
from airflow.providers.google.cloud.operators import dataproc
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
output_file = (
    os.path.join(
        "{{ var.value.gcs_bucket }}",
        "wordcount",
        datetime.datetime.now().strftime("%Y%m%d-%H%M%S"),
    )
    + os.sep
)
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = "file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar"
# Arguments to pass to Cloud Dataproc job.
input_file = "gs://pub/shakespeare/rose.txt"
wordcount_args = ["wordcount", input_file, output_file]

HADOOP_JOB = {
    "reference": {"project_id": "{{ var.value.gcp_project }}"},
    "placement": {"cluster_name": "composer-hadoop-tutorial-cluster-{{ ds_nodash }}"},
    "hadoop_job": {
        "main_jar_file_uri": WORDCOUNT_JAR,
        "args": wordcount_args,
    },
}

CLUSTER_CONFIG = {
    "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
    "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
}

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
    # If a task fails, retry it once after waiting at least 5 minutes
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "project_id": "{{ var.value.gcp_project }}",
    "region": "{{ var.value.gce_region }}",
}

with models.DAG(
    "composer_hadoop_tutorial",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc.DataprocCreateClusterOperator(
        task_id="create_dataproc_cluster",
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        cluster_config=CLUSTER_CONFIG,
        region="{{ var.value.gce_region }}",
    )

    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc.DataprocSubmitJobOperator(
        task_id="run_dataproc_hadoop", job=HADOOP_JOB
    )

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc.DataprocDeleteClusterOperator(
        task_id="delete_dataproc_cluster",
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        region="{{ var.value.gce_region }}",
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE,
    )

    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

Aliran udara 1

"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_region - Google Compute Engine region where Cloud Dataproc cluster should be
  created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
  See https://cloud.google.com/storage/docs/creating-buckets for creating a
  bucket.
"""

import datetime
import os

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
output_file = (
    os.path.join(
        "{{ var.value.gcs_bucket }}",
        "wordcount",
        datetime.datetime.now().strftime("%Y%m%d-%H%M%S"),
    )
    + os.sep
)
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = "file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar"
# Arguments to pass to Cloud Dataproc job.
input_file = "gs://pub/shakespeare/rose.txt"
wordcount_args = ["wordcount", input_file, output_file]

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
    # If a task fails, retry it once after waiting at least 5 minutes
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "project_id": "{{ var.value.gcp_project }}",
}

with models.DAG(
    "composer_hadoop_tutorial",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id="create_dataproc_cluster",
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        num_workers=2,
        region="{{ var.value.gce_region }}",
        master_machine_type="n1-standard-2",
        worker_machine_type="n1-standard-2",
    )

    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id="run_dataproc_hadoop",
        main_jar=WORDCOUNT_JAR,
        region="{{ var.value.gce_region }}",
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        arguments=wordcount_args,
    )

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id="delete_dataproc_cluster",
        cluster_name="composer-hadoop-tutorial-cluster-{{ ds_nodash }}",
        region="{{ var.value.gce_region }}",
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE,
    )

    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

Operator

Untuk mengorkestrasi tiga tugas dalam contoh alur kerja, DAG mengimpor tiga operator Airflow berikut:

  • DataprocClusterCreateOperator: Membuat cluster Dataproc.

  • DataProcHadoopOperator: Mengirim tugas jumlah kata Hadoop dan menulis hasilnya ke bucket Cloud Storage.

  • DataprocClusterDeleteOperator: Menghapus cluster untuk menghindari timbulnya biaya Compute Engine yang berkelanjutan.

Dependensi

Anda mengatur tugas yang ingin dijalankan dengan cara yang mencerminkan hubungan dan dependensinya. Tugas dalam DAG ini berjalan secara berurutan.

Aliran udara 2

# Define DAG dependencies.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

Aliran udara 1

# Define DAG dependencies.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

Penjadwalan

Nama DAG adalah composer_hadoop_tutorial, dan DAG berjalan sekali setiap hari. Karena start_date yang diteruskan ke default_dag_args ditetapkan ke yesterday, Cloud Composer menjadwalkan alur kerja untuk dimulai segera setelah DAG diupload ke bucket lingkungan.

Aliran udara 2

with models.DAG(
    "composer_hadoop_tutorial",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

Aliran udara 1

with models.DAG(
    "composer_hadoop_tutorial",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

Mengupload DAG ke bucket lingkungan

Cloud Composer menyimpan DAG di folder /dags di bucket lingkungan Anda.

Untuk mengupload DAG:

  1. Di komputer lokal Anda, simpan hadoop_tutorial.py.

  2. Di konsol Google Cloud, buka halaman Environments.

    Buka Lingkungan

  3. Dalam daftar lingkungan, pada kolom folder DAGs untuk lingkungan Anda, klik link DAGs.

  4. Klik Upload file.

  5. Pilih hadoop_tutorial.py di komputer lokal Anda dan klik Open.

Cloud Composer menambahkan DAG ke Airflow dan menjadwalkan DAG secara otomatis. Perubahan DAG terjadi dalam 3-5 menit.

Pelajari operasi DAG

Lihat status tugas

Saat Anda mengupload file DAG ke folder dags/ di Cloud Storage, Cloud Composer akan mengurai file tersebut. Jika berhasil diselesaikan, nama alur kerja akan muncul di listingan DAG, dan alur kerja akan dimasukkan ke dalam antrean untuk segera dijalankan.

  1. Untuk melihat status tugas, buka antarmuka web Airflow, lalu klik DAG di toolbar.

  2. Untuk membuka halaman detail DAG, klik composer_hadoop_tutorial. Halaman ini menyertakan representasi grafis dari tugas dan dependensi alur kerja.

  3. Untuk melihat status setiap tugas, klik Graph View, lalu arahkan kursor ke grafik untuk setiap tugas.

Mengantrekan alur kerja lagi

Untuk menjalankan alur kerja lagi dari Graph View:

  1. Di Tampilan Grafik UI Airflow, klik grafis create_dataproc_cluster.
  2. Untuk mereset ketiga tugas tersebut, klik Hapus, lalu klik Oke untuk mengonfirmasi.
  3. Klik create_dataproc_cluster lagi di Graph View.
  4. Untuk mengantrekan alur kerja lagi, klik Jalankan.

Melihat hasil tugas

Anda juga dapat memeriksa status dan hasil alur kerja composer_hadoop_tutorial dengan membuka halaman Konsol Google Cloud berikut:

  • Cluster Dataproc: untuk memantau pembuatan dan penghapusan cluster. Perhatikan bahwa cluster yang dibuat oleh alur kerja bersifat sementara: cluster hanya ada selama durasi alur kerja dan dihapus sebagai bagian dari tugas alur kerja terakhir.

    Buka Cluster Dataproc

  • Tugas Dataproc: untuk melihat atau memantau tugas jumlah kata Apache Hadoop. Klik ID Tugas untuk melihat output log tugas.

    Buka Tugas Dataproc

  • Browser Cloud Storage: untuk melihat hasil jumlah kata di folder wordcount di bucket Cloud Storage yang Anda buat untuk tutorial ini.

    Buka Browser Cloud Storage

Pembersihan

Hapus resource yang digunakan dalam tutorial ini:

  1. Menghapus lingkungan Cloud Composer, termasuk menghapus bucket lingkungan secara manual.

  2. Hapus bucket Cloud Storage yang menyimpan hasil tugas jumlah kata Hadoop.