Cloud Composer로 Dataproc 서버리스 워크로드 실행

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

이 페이지에서는 Cloud Composer 2를 사용해서 Google Cloud에서 Dataproc Serverless 워크로드를 실행하는 방법을 설명합니다.

다음 섹션의 예시에서는 Dataproc 서버리스 일괄 워크로드를 관리하는 연산자를 사용하는 방법을 보여줍니다. Dataproc 서버리스 Spark 일괄 워크로드를 만들고, 삭제하고, 나열하고, 가져오는 DAG에서 다음 연산자를 사용합니다.

시작하기 전에

  1. Dataproc API 사용 설정:

    콘솔

    Enable the Dataproc API.

    Enable the API

    gcloud

    Enable the Dataproc API:

    gcloud services enable dataproc.googleapis.com

  2. 일괄 워크로드 파일의 위치를 선택합니다. 다음 옵션 중에서 사용할 수 있습니다.

    • 이 파일을 저장하는 Cloud Storage 버킷을 만듭니다.
    • 사용자 환경의 버킷을 사용합니다. 이 파일을 Airflow와 동기화할 필요가 없으므로 /dags 또는 /data 폴더 외부에 별개의 하위 폴더를 만들 수 있습니다. 예를 들면 /batches입니다.
    • 기존 버킷을 사용합니다.

파일 및 Airflow 변수 설정

이 섹션에서는 이 튜토리얼에서 파일을 설정하고 Airflow 변수를 구성하는 방법을 보여줍니다.

버킷에 Dataproc Serverless Spark ML 워크로드 파일 업로드

이 튜토리얼의 워크로드는 pyspark 스크립트를 실행합니다.

  1. 모든 pyspark 스크립트를 spark-job.py라는 로컬 파일에 저장합니다. 예를 들어 샘플 pyspark 스크립트를 사용할 수 있습니다.

  2. 시작하기 전에에서 선택한 위치에 파일을 업로드합니다.

Airflow 변수 설정

다음 섹션의 예시에는 Airflow 변수가 사용됩니다. Airflow에서 이러한 변수의 값을 설정하면 DAG 코드에서 이러한 값에 액세스할 수 있습니다.

이 튜토리얼의 예시에서는 다음 Airflow 변수를 사용합니다. 사용하는 예시에 따라 필요한 대로 설정할 수 있습니다.

DAG 코드에서 사용할 다음 Airflow 변수를 설정합니다.

  • project_id: 프로젝트 ID.
  • bucket_name: 워크로드의 기본 Python 파일(spark-job.py)이 있는 버킷의 URI입니다. 시작하기 전에에서 이 위치를 선택했습니다.
  • phs_cluster: 영구 기록 서버 클러스터 이름입니다. 영구 기록 서버 만들기를 수행할 때 이 변수를 설정합니다.
  • image_name: 커스텀 컨테이너 이미지(image:tag)의 이름과 태그입니다. DataprocCreateBatchOperator와 함께 커스텀 컨테이너 이미지를 사용할 때 이 변수를 설정합니다.
  • metastore_cluster: Dataproc Metastore 서비스 이름입니다. DataprocCreateBatchOperator와 함께 Dataproc Metastore 서비스 사용 시 이 변수를 설정합니다.
  • region_name: Dataproc Metastore 서비스가 있는 리전입니다. DataprocCreateBatchOperator와 함께 Dataproc Metastore 서비스 사용 시 이 변수를 설정합니다.

Google Cloud 콘솔 및 Airflow UI를 사용하여 각 Airflow 변수 설정

  1. Google Cloud 콘솔에서 환경 페이지로 이동합니다.

    환경으로 이동

  2. 환경 목록에서 해당 환경의 Airflow 링크를 클릭합니다. Airflow UI가 열립니다.

  3. Airflow UI에서 관리 > 변수를 선택합니다.

  4. 새 레코드 추가를 클릭합니다.

  5. 필드에 변수 이름을 지정하고 필드에 해당 값을 설정합니다.

  6. 저장을 클릭합니다.

영구 기록 서버 만들기

영구 기록 서버(PHS)를 사용하여 일괄 워크로드의 Spark 기록 파일을 확인합니다.

  1. 영구 기록 서버 만들기
  2. phs_cluster Airflow 변수에 PHS 클러스터의 이름을 지정했는지 확인합니다.

DataprocCreateBatchOperator

다음 DAG는 Dataproc 서버리스 일괄 워크로드를 시작합니다.

DataprocCreateBatchOperator 인수에 대한 자세한 내용은 연산자의 소스 코드를 참조하세요.

DataprocCreateBatchOperatorbatch 매개변수에 전달할 수 있는 속성에 대한 자세한 내용은 Batch 클래스 설명을 참조하세요.


"""
Examples below show how to use operators for managing Dataproc Serverless batch workloads.
 You use these operators in DAGs that create, delete, list, and get a Dataproc Serverless Spark batch workload.
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* project_id is the Google Cloud Project ID to use for the Cloud Dataproc Serverless.
* bucket_name is the URI of a bucket where the main python file of the workload (spark-job.py) is located.
* phs_cluster is the Persistent History Server cluster name.
* image_name is the name and tag of the custom container image (image:tag).
* metastore_cluster is the Dataproc Metastore service name.
* region_name is the region where the Dataproc Metastore service is located.
"""

import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateBatchOperator,
    DataprocDeleteBatchOperator,
    DataprocGetBatchOperator,
    DataprocListBatchesOperator,
)
from airflow.utils.dates import days_ago

PROJECT_ID = "{{ var.value.project_id }}"
REGION = "{{ var.value.region_name}}"
BUCKET = "{{ var.value.bucket_name }}"
PHS_CLUSTER = "{{ var.value.phs_cluster }}"
METASTORE_CLUSTER = "{{var.value.metastore_cluster}}"
DOCKER_IMAGE = "{{var.value.image_name}}"

PYTHON_FILE_LOCATION = "gs://{{var.value.bucket_name }}/spark-job.py"
# for e.g.  "gs//my-bucket/spark-job.py"
# Start a single node Dataproc Cluster for viewing Persistent History of Spark jobs
PHS_CLUSTER_PATH = "projects/{{ var.value.project_id }}/regions/{{ var.value.region_name}}/clusters/{{ var.value.phs_cluster }}"
# for e.g. projects/my-project/regions/my-region/clusters/my-cluster"
SPARK_BIGQUERY_JAR_FILE = "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"
# use this for those pyspark jobs that need a spark-bigquery connector
# https://cloud.google.com/dataproc/docs/tutorials/bigquery-connector-spark-example
# Start a Dataproc MetaStore Cluster
METASTORE_SERVICE_LOCATION = "projects/{{var.value.project_id}}/locations/{{var.value.region_name}}/services/{{var.value.metastore_cluster }}"
# for e.g. projects/my-project/locations/my-region/services/my-cluster
CUSTOM_CONTAINER = "us.gcr.io/{{var.value.project_id}}/{{ var.value.image_name}}"
# for e.g. "us.gcr.io/my-project/quickstart-image",

default_args = {
    # Tell airflow to start one day ago, so that it runs as soon as you upload it
    "start_date": days_ago(1),
    "project_id": PROJECT_ID,
    "region": REGION,
}
with models.DAG(
    "dataproc_batch_operators",  # The id you will see in the DAG airflow page
    default_args=default_args,  # The interval with which to schedule the DAG
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:
    create_batch = DataprocCreateBatchOperator(
        task_id="batch_create",
        batch={
            "pyspark_batch": {
                "main_python_file_uri": PYTHON_FILE_LOCATION,
                "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE],
            },
            "environment_config": {
                "peripherals_config": {
                    "spark_history_server_config": {
                        "dataproc_cluster": PHS_CLUSTER_PATH,
                    },
                },
            },
        },
        batch_id="batch-create-phs",
    )
    list_batches = DataprocListBatchesOperator(
        task_id="list-all-batches",
    )

    get_batch = DataprocGetBatchOperator(
        task_id="get_batch",
        batch_id="batch-create-phs",
    )
    delete_batch = DataprocDeleteBatchOperator(
        task_id="delete_batch",
        batch_id="batch-create-phs",
    )
    create_batch >> list_batches >> get_batch >> delete_batch

DataprocCreateBatchOperator와 함께 커스텀 컨테이너 이미지 사용

다음 예시에서는 커스텀 컨테이너 이미지를 사용하여 워크로드를 실행하는 방법을 보여줍니다. 예를 들어 커스텀 컨테이너를 사용하면 기본 컨테이너 이미지로 제공되지 않는 Python 종속 항목을 추가할 수 있습니다.

커스텀 컨테이너 이미지를 사용하려면 다음 안내를 따르세요.

  1. 커스텀 컨테이너 이미지를 만들고 Container Registry에 업로드합니다.

  2. image_name Airflow 변수에 이미지를 지정합니다.

  3. 커스텀 이미지에서 DataprocCreateBatchOperator를 사용합니다.

create_batch_with_custom_container = DataprocCreateBatchOperator(
    task_id="dataproc_custom_container",
    batch={
        "pyspark_batch": {
            "main_python_file_uri": PYTHON_FILE_LOCATION,
            "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE],
        },
        "environment_config": {
            "peripherals_config": {
                "spark_history_server_config": {
                    "dataproc_cluster": PHS_CLUSTER_PATH,
                },
            },
        },
        "runtime_config": {
            "container_image": CUSTOM_CONTAINER,
        },
    },
    batch_id="batch-custom-container",
)
get_batch_custom = DataprocGetBatchOperator(
    task_id="get_batch_custom",
    batch_id="batch-custom-container",
)
delete_batch_custom = DataprocDeleteBatchOperator(
    task_id="delete_batch_custom",
    batch_id="batch-custom-container",
)
create_batch_with_custom_container >> get_batch_custom >> delete_batch_custom

DataprocCreateBatchOperator와 함께 Dataproc Metastore 서비스 사용

DAG에서 Dataproc Metastore 서비스를 사용하려면 다음 안내를 따르세요.

  1. 메타스토어 서비스가 이미 시작되었는지 확인합니다.

    메타스토어 서비스 시작에 대한 자세한 내용은 Dataproc Metastore 사용 설정 및 사용 중지를 참조하세요.

    구성을 만드는 일괄 연산자에 대한 자세한 내용은 PeripheralsConfig를 참조하세요.

  2. 메타스토어 서비스가 작동되어 실행되면 metastore_cluster 변수에 이름을 지정하고 region_name Airflow 변수에 리전을 지정합니다.

  3. DataprocCreateBatchOperator에서 메타스토어 서비스를 사용합니다.

create_batch_with_metastore = DataprocCreateBatchOperator(
    task_id="dataproc_metastore",
    batch={
        "pyspark_batch": {
            "main_python_file_uri": PYTHON_FILE_LOCATION,
            "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE],
        },
        "environment_config": {
            "peripherals_config": {
                "metastore_service": METASTORE_SERVICE_LOCATION,
                "spark_history_server_config": {
                    "dataproc_cluster": PHS_CLUSTER_PATH,
                },
            },
        },
    },
    batch_id="dataproc-metastore",
)
get_batch_metastore = DataprocGetBatchOperator(
    task_id="get_batch_metatstore",
    batch_id="dataproc-metastore",
)
delete_batch_metastore = DataprocDeleteBatchOperator(
    task_id="delete_batch_metastore",
    batch_id="dataproc-metastore",
)

create_batch_with_metastore >> get_batch_metastore >> delete_batch_metastore

DataprocDeleteBatchOperator

DataprocDeleteBatchOperator를 사용하여 워크로드의 배치 ID를 기반으로 배치를 삭제할 수 있습니다.

delete_batch = DataprocDeleteBatchOperator(
    task_id="delete_batch",
    batch_id="batch-create-phs",
)

DataprocListBatchesOperator

DataprocDeleteBatchOperator는 지정된 project_id 및 리전 내에 존재하는 배치를 나열합니다.

list_batches = DataprocListBatchesOperator(
    task_id="list-all-batches",
)

DataprocGetBatchOperator

DataprocGetBatchOperator는 특정 일괄 워크로드 하나를 가져옵니다.

get_batch = DataprocGetBatchOperator(
    task_id="get_batch",
    batch_id="batch-create-phs",
)

다음 단계