Cloud Functions 및 Pub/Sub 메시지를 사용하여 DAG 트리거

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

이 페이지에서는 Pub/Sub 주제 변경에 대한 응답으로 Cloud Composer DAG를 트리거하여 이벤트 기반 푸시 아키텍처를 만드는 방법을 설명합니다. 이 튜토리얼의 예시에서는 DAG 프로세스의 일부로 구독 관리를 포함한 Pub/Sub 관리의 전체 주기를 처리하는 방법을 보여줍니다. DAG를 트리거해야 하지만 추가 액세스 권한을 설정하지 않으려는 경우와 같은 일반적인 사용 사례에 적합합니다.

예를 들어 보안상의 이유로 Cloud Composer 환경에 직접 액세스할 수 없게 하려면 Pub/Sub를 통해 전송된 메시지를 솔루션으로 사용할 수 있습니다. Pub/Sub 메시지를 만들어 Pub/Sub 주제에 게시하는 Cloud Run 함수를 구성할 수 있습니다. 그런 다음 Pub/Sub 메시지를 가져온 후 이러한 메시지를 처리하는 DAG를 만들 수 있습니다.

이 특정 예시에서는 Cloud Run 함수를 만들고 두 DAG를 배포합니다. 첫 번째 DAG는 Pub/Sub 메시지를 가져와서 Pub/Sub 메시지 콘텐츠에 따라 두 번째 DAG를 트리거합니다.

이 튜토리얼에서는 사용자가 Python 및 Google Cloud 콘솔에 대해 잘 알고 있다고 가정합니다.

목표

비용

이 튜토리얼에서는 비용이 청구될 수 있는 다음과 같은 Google Cloud 구성요소를 사용합니다.

이 튜토리얼을 마치면 만든 리소스를 삭제하여 비용이 계속 청구되지 않도록 할 수 있습니다. 자세한 내용은 삭제를 참조하세요.

시작하기 전에

이 튜토리얼에는 Google Cloud 프로젝트가 필요합니다. 다음과 같은 방법으로 프로젝트를 구성합니다.

  1. Google Cloud 콘솔에서 프로젝트를 선택하거나 만듭니다.

    프로젝트 선택기로 이동

  2. 프로젝트에 결제가 사용 설정되어 있는지 확인합니다. 프로젝트에 결제가 사용 설정되어 있는지 확인하는 방법을 알아보세요.

  3. Google Cloud 프로젝트 사용자에게 필요한 리소스를 만들 수 있는 다음 역할이 있는지 확인합니다.

    • 서비스 계정 사용자(roles/iam.serviceAccountUser)
    • Pub/Sub 편집자(roles/pubsub.editor)
    • 환경 및 스토리지 객체 관리자(roles/composer.environmentAndStorageObjectAdmin)
    • Cloud Run 함수 관리자(roles/cloudfunctions.admin)
    • 로그 뷰어(roles/logging.viewer)
  4. Cloud Run 함수를 실행하는 서비스 계정에 프로젝트에서 Pub/Sub에 액세스할 수 있는 충분한 권한이 있는지 확인합니다. 기본적으로 Cloud Run 함수는 App Engine 기본 서비스 계정을 사용합니다. 이 서비스 계정에는 이 튜토리얼에 대한 충분한 권한이 있는 편집자 역할이 있습니다.

프로젝트에 API 사용 설정

콘솔

Enable the Cloud Composer, Cloud Run functions, and Pub/Sub APIs.

Enable the APIs

gcloud

Enable the Cloud Composer, Cloud Run functions, and Pub/Sub APIs:

gcloud services enable composer.googleapis.com cloudfunctions.googleapis.com pubsub.googleapis.com

Terraform

Terraform 스크립트에 다음 리소스 정의를 추가하여 프로젝트에서 Cloud Composer API를 사용 설정합니다.

resource "google_project_service" "composer_api" {
  project = "<PROJECT_ID>"
  service = "composer.googleapis.com"
  // Disabling Cloud Composer API might irreversibly break all other
  // environments in your project.
  // This parameter prevents automatic disabling
  // of the API when the resource is destroyed.
  // We recommend to disable the API only after all environments are deleted.
  disable_on_destroy = false
// this flag is introduced in 5.39.0 version of Terraform. If set to true it will
//prevent you from disabling composer_api through Terraform if any environment was
//there in the last 30 days
  check_if_service_has_usage_on_destroy = true
}

resource "google_project_service" "pubsub_api" {
  project = "<PROJECT_ID>"
  service = "pubsub.googleapis.com"
  disable_on_destroy = false
}

resource "google_project_service" "functions_api" {
  project = "<PROJECT_ID>"
  service = "cloudfunctions.googleapis.com"
  disable_on_destroy = false
}

<PROJECT_ID>를 해당 프로젝트의 프로젝트 ID로 바꿉니다. 예를 들면 example-project입니다.

Cloud Composer 환경 만들기

Cloud Composer 2 환경을 만듭니다.

이 절차의 일부로 Cloud Composer v2 API 서비스 에이전트 확장(roles/composer.ServiceAgentV2Ext) 역할을 Composer 서비스 에이전트 계정에 부여합니다. Cloud Composer는 이 계정을 사용하여 Google Cloud 프로젝트에서 작업을 수행합니다.

Pub/Sub 주제 만들기

이 예시에서는 Pub/Sub 주제에 푸시된 메시지에 대한 응답으로 DAG를 트리거합니다. 이 예시에서 사용할 Pub/Sub 주제를 만듭니다.

콘솔

  1. Google Cloud 콘솔에서 Pub/Sub 주제 페이지로 이동합니다.

    Pub/Sub 주제로 이동

  2. 주제 만들기를 클릭합니다.

  3. 주제 ID 필드에 주제의 ID로 dag-topic-trigger를 입력합니다.

  4. 다른 옵션은 기본값으로 유지합니다.

  5. 주제 만들기를 클릭합니다.

gcloud

주제를 만들려면 Google Cloud CLI에서 gcloud pubsub topics create 명령어를 실행합니다.

gcloud pubsub topics create dag-topic-trigger

Terraform

Terraform 스크립트에 다음 리소스 정의를 추가합니다.

resource "google_pubsub_topic" "trigger" {
  project                    = "<PROJECT_ID>"
  name                       = "dag-topic-trigger"
  message_retention_duration = "86600s"
}

<PROJECT_ID>를 해당 프로젝트의 프로젝트 ID로 바꿉니다. 예를 들면 example-project입니다.

DAG 업로드

DAG를 환경에 업로드합니다.

  1. 다음 DAG 파일을 로컬 컴퓨터에 저장합니다.
  2. <PROJECT_ID>를 해당 프로젝트의 프로젝트 ID로 바꿉니다. 예를 들면 example-project입니다.
  3. 환경에 수정한 DAG 파일을 업로드합니다.
from __future__ import annotations

from datetime import datetime
import time

from airflow import DAG
from airflow import XComArg
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.google.cloud.operators.pubsub import (
    PubSubCreateSubscriptionOperator,
    PubSubPullOperator,
)

PROJECT_ID = "<PROJECT_ID>"
TOPIC_ID = "dag-topic-trigger"
SUBSCRIPTION = "trigger_dag_subscription"


def handle_messages(pulled_messages, context):
    dag_ids = list()
    for idx, m in enumerate(pulled_messages):
        data = m.message.data.decode("utf-8")
        print(f"message {idx} data is {data}")
        dag_ids.append(data)
    return dag_ids


# This DAG will run minutely and handle pub/sub messages by triggering target DAG
with DAG(
    "trigger_dag",
    start_date=datetime(2021, 1, 1),
    schedule_interval="* * * * *",
    max_active_runs=1,
    catchup=False,
) as trigger_dag:
    # If subscription exists, we will use it. If not - create new one
    subscribe_task = PubSubCreateSubscriptionOperator(
        task_id="subscribe_task",
        project_id=PROJECT_ID,
        topic=TOPIC_ID,
        subscription=SUBSCRIPTION,
    )

    subscription = subscribe_task.output

    # Proceed maximum 50 messages in callback function handle_messages
    # Here we acknowledge messages automatically. You can use PubSubHook.acknowledge to acknowledge in downstream tasks
    # https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/hooks/pubsub/index.html#airflow.providers.google.cloud.hooks.pubsub.PubSubHook.acknowledge
    pull_messages_operator = PubSubPullOperator(
        task_id="pull_messages_operator",
        project_id=PROJECT_ID,
        ack_messages=True,
        messages_callback=handle_messages,
        subscription=subscription,
        max_messages=50,
    )

    # Here we use Dynamic Task Mapping to trigger DAGs according to messages content
    # https://airflow.apache.org/docs/apache-airflow/2.3.0/concepts/dynamic-task-mapping.html
    trigger_target_dag = TriggerDagRunOperator.partial(task_id="trigger_target").expand(
        trigger_dag_id=XComArg(pull_messages_operator)
    )

    (subscribe_task >> pull_messages_operator >> trigger_target_dag)


def _some_heavy_task():
    print("Do some operation...")
    time.sleep(1)
    print("Done!")


# Simple target DAG
with DAG(
    "target_dag",
    start_date=datetime(2022, 1, 1),
    # Not scheduled, trigger only
    schedule_interval=None,
    catchup=False,
) as target_dag:
    some_heavy_task = PythonOperator(
        task_id="some_heavy_task", python_callable=_some_heavy_task
    )

    (some_heavy_task)

샘플 코드에는 2개의 DAG trigger_dagtarget_dag가 포함되어 있습니다.

trigger_dag DAG는 Pub/Sub 주제를 구독하고 Pub/Sub 메시지를 가져오고 Pub/Sub 메시지 데이터의 DAG ID에 지정된 다른 DAG를 트리거합니다. 이 예시에서 trigger_dag는 태스크 로그에 메시지를 출력하는 target_dag DAG를 트리거합니다.

trigger_dag DAG에는 다음 태스크가 포함되어 있습니다.

  • subscribe_task: Pub/Sub 주제를 구독합니다.
  • pull_messages_operator: PubSubPullOperator로 Pub/Sub 메시지 데이터를 읽습니다.
  • trigger_target_dag: Pub/Sub 주제에서 가져온 메시지의 데이터에 따라 다른 DAG(이 예시에서는 target_dag)를 트리거합니다.

target_dag DAG에는 output_to_logs 태스크 하나만 포함되어 있습니다. 이 태스크는 1초 지연으로 메시지를 태스크 로그에 출력합니다.

Pub/Sub 주제에 메시지를 게시하는 Cloud Run 함수 배포

이 섹션에서는 Pub/Sub 주제에 메시지를 게시하는 Cloud Run 함수를 배포합니다.

Cloud Run 함수 만들기 및 구성 지정

콘솔

  1. Google Cloud 콘솔에서 Cloud Run 함수 페이지로 이동합니다.

    Cloud Run 함수로 이동

  2. 함수 만들기를 클릭합니다.

  3. 환경 필드에서 1세대를 선택합니다.

  4. 함수 이름 필드에 함수 이름 pubsub-publisher를 입력합니다.

  5. 트리거 유형 필드에서 HTTP를 선택합니다.

  6. 인증 섹션에서 인증되지 않은 호출 허용을 선택합니다. 이 옵션은 인증되지 않은 사용자에게 HTTP 함수를 호출할 수 있는 권한을 부여합니다.

  7. 저장을 클릭합니다.

  8. 다음을 클릭하여 코드 단계로 이동합니다.

Terraform

Terraform에서 함수의 소스 코드를 간단하게 관리할 수 있는 방법이 없으므로 이 단계에서는 Google Cloud 콘솔을 사용하는 것이 좋습니다.

이 예시에서는 Cloud Storage 버킷을 만들고 이 버킷에 파일을 저장한 다음 버킷의 파일을 Cloud Run 함수의 소스로 사용하여 로컬 zip 파일에서 Cloud Run 함수를 업로드하는 방법을 보여줍니다. 이 방법을 사용하면 새 보관 파일을 만들어도 Terraform에서 함수의 소스 코드를 자동으로 업데이트하지 않습니다. 함수 코드를 다시 업로드하려면 보관 파일의 파일 이름을 변경하면 됩니다.

  1. pubsub_publisher.pyrequirements.txt 파일을 다운로드합니다.
  2. pubsub_publisher.py 파일에서 <PROJECT_ID>를 프로젝트의 프로젝트 ID로 바꿉니다. 예를 들면 example-project입니다.
  3. pbusub_publisner.pyrequirements.txt 파일을 사용하여 pubsub_function.zip이라는 ZIP 파일을 만듭니다.
  4. Terraform 스크립트가 저장된 디렉터리에 ZIP 파일을 저장합니다.
  5. Terraform 스크립트에 다음 리소스 정의를 추가하고 <PROJECT_ID>를 프로젝트의 프로젝트 ID로 바꿉니다.
resource "google_storage_bucket" "cloud_function_bucket" {
  project        = <PROJECT_ID>
  name           = "<PROJECT_ID>-cloud-function-source-code"
  location       = "US"
  force_destroy  = true
  uniform_bucket_level_access = true
}

resource "google_storage_bucket_object" "cloud_function_source" {
  name   = "pubsub_function.zip"
  bucket = google_storage_bucket.cloud_function_bucket.name
  source = "./pubsub_function.zip"
}

resource "google_cloudfunctions_function" "pubsub_function" {
  project = <PROJECT_ID>
  name    = "pubsub-publisher"
  runtime = "python310"
  region  = "us-central1"

  available_memory_mb   = 128
  source_archive_bucket = google_storage_bucket.cloud_function_bucket.name
  source_archive_object = "pubsub_function.zip"
  timeout               = 60
  entry_point           = "pubsub_publisher"
  trigger_http          = true
}

Cloud Run 함수 코드 매개변수 지정

콘솔

  1. 코드 단계의 런타임 필드에서 함수가 사용하는 언어 런타임을 선택합니다. 이 예시에서는 Python 3.10을 선택합니다.

  2. 진입점 필드에 pubsub_publisher를 입력합니다. 이는 Cloud Run 함수가 실행될 때 실행되는 코드입니다. 이 플래그의 값은 소스 코드에 있는 함수 이름 또는 정규화된 클래스 이름이어야 합니다.

Terraform

이 단계는 건너뜁니다. Cloud Run 함수 매개변수는 이미 google_cloudfunctions_function 리소스에 정의되어 있습니다.

Cloud Run 함수 코드 업로드

콘솔

소스 코드 필드에서 함수 소스 코드 제공 방법에 적합한 옵션을 선택합니다. 이 튜토리얼에서는 Cloud Run 함수 인라인 편집기를 사용하여 함수 코드를 추가합니다. 또는 ZIP 파일을 업로드하거나 Cloud Source Repositories를 사용할 수 있습니다.

  1. 다음 코드 예시를 main.py 파일에 넣습니다.
  2. <PROJECT_ID>를 해당 프로젝트의 프로젝트 ID로 바꿉니다. 예를 들면 example-project입니다.
from google.cloud import pubsub_v1

project = "<PROJECT_ID>"
topic = "dag-topic-trigger"


def pubsub_publisher(request):
    """Publish message from HTTP request to Pub/Sub topic.
    Args:
        request (flask.Request): HTTP request object.
    Returns:
        The response text with message published into Pub/Sub topic
        Response object using
        `make_response <http://flask.pocoo.org/docs/1.0/api/#flask.Flask.make_response>`.
    """
    request_json = request.get_json()
    print(request_json)
    if request.args and "message" in request.args:
        data_str = request.args.get("message")
    elif request_json and "message" in request_json:
        data_str = request_json["message"]
    else:
        return "Message content not found! Use 'message' key to specify"

    publisher = pubsub_v1.PublisherClient()
    # The `topic_path` method creates a fully qualified identifier
    # in the form `projects/{project_id}/topics/{topic_id}`
    topic_path = publisher.topic_path(project, topic)

    # The required data format is a bytestring
    data = data_str.encode("utf-8")
    # When you publish a message, the client returns a future.
    message_length = len(data_str)
    future = publisher.publish(topic_path, data, message_length=str(message_length))
    print(future.result())

    return f"Message {data} with message_length {message_length} published to {topic_path}."

Terraform

이 단계는 건너뜁니다. Cloud Run 함수 매개변수는 이미 google_cloudfunctions_function 리소스에 정의되어 있습니다.

Cloud Run 함수 종속 항목 지정

콘솔

requirements.txt 메타데이터 파일에 함수 종속 항목을 지정합니다.

requests-toolbelt==1.0.0
google-auth==2.19.1
google-cloud-pubsub==2.21.5

함수를 배포하면 Cloud Run 함수가 requirements.txt 파일에 선언된 종속 항목을 패키지당 한 줄로 다운로드하여 설치합니다. 이 파일은 함수 코드를 포함하는 main.py 파일과 동일한 디렉터리에 있어야 합니다. 자세한 내용은 pip 문서의 요구사항 파일을 참조하세요.

Terraform

이 단계는 건너뜁니다. Cloud Run 함수 종속 항목은 pubsub_function.zip 보관 파일의 requirements.txt 파일에 정의되어 있습니다.

Cloud Run 함수 배포

콘솔

배포를 클릭합니다. 배포가 성공적으로 완료되면 Google Cloud 콘솔의 Cloud Run 함수 페이지에서 함수가 녹색 체크표시로 표시됩니다.

Cloud Run 함수를 실행하는 서비스 계정에 프로젝트에서 Pub/Sub에 액세스할 수 있는 충분한 권한이 있는지 확인합니다.

Terraform

  1. Terraform을 초기화합니다.

    terraform init
    
  2. 구성을 검토하고 Terraform에서 만들거나 업데이트할 리소스가 예상과 일치하는지 확인합니다.

    terraform plan
    
  3. 구성이 유효한지 확인하려면 다음 명령어를 실행합니다.

    terraform validate
    
  4. 다음 명령어를 실행하고 프롬프트에 yes를 입력하여 Terraform 구성을 적용합니다.

    terraform apply
    

Terraform에 '적용 완료' 메시지가 표시될 때까지 기다립니다.

Google Cloud 콘솔에서 UI의 리소스로 이동하여 Terraform이 리소스를 만들었거나 업데이트했는지 확인합니다.

Cloud Run 함수 테스트

함수가 Pub/Sub 주제에 메시지를 게시하고 DAG 예시가 의도한 대로 작동하는지 확인하려면 다음 안내를 따르세요.

  1. DAG가 활성 상태인지 확인합니다.

    1. Google Cloud 콘솔에서 환경 페이지로 이동합니다.

      환경으로 이동

    2. 환경 목록에서 환경 이름을 클릭합니다. 환경 세부정보 페이지가 열립니다.

    3. DAG 탭으로 이동합니다.

    4. 상태 열에서 trigger_dagtarget_dag라는 DAG의 값을 확인합니다. 두 DAG는 모두 Active 상태여야 합니다.

  2. 테스트 Pub/Sub 메시지를 푸시합니다. 이 작업은 Cloud Shell에서 수행할 수 있습니다.

    1. Google Cloud 콘솔에서 Functions 페이지로 이동합니다.

      Cloud Run 함수로 이동

    2. 함수 이름 pubsub-publisher를 클릭합니다.

    3. 테스트 탭으로 이동합니다.

    4. 트리거 이벤트 구성 섹션에서 JSON 키-값 {"message": "target_dag"}를 입력합니다. 이 메시지는 나중에 테스트 DAG를 트리거하므로 키-값 쌍을 수정하지 마세요.

    5. 테스트 명령어 섹션에서 Cloud Shell에서 테스트를 클릭합니다.

    6. Cloud Shell 터미널에서 명령어가 자동으로 표시될 때까지 기다립니다. Enter를 눌러 이 명령어를 실행합니다.

    7. Cloud Shell 승인 메시지가 표시되면 승인을 클릭합니다.

    8. 메시지 콘텐츠가 Pub/Sub 메시지에 해당하는지 확인합니다. 이 예시에서 출력 메시지는 함수의 응답으로 Message b'target_dag' with message_length 10 published to로 시작해야 합니다.

  3. target_dag가 트리거되었는지 확인합니다.

    1. trigger_dag의 새 DAG 실행이 완료될 수 있도록 1분 이상 기다립니다.

    2. Google Cloud 콘솔에서 환경 페이지로 이동합니다.

      환경으로 이동

    3. 환경 목록에서 환경 이름을 클릭합니다. 환경 세부정보 페이지가 열립니다.

    4. DAG 탭으로 이동합니다.

    5. trigger_dag를 클릭하여 DAG 세부정보 페이지로 이동합니다. 실행 탭에 trigger_dag DAG의 DAG 실행 목록이 표시됩니다.

      이 DAG는 1분마다 실행되며 함수에서 전송된 모든 Pub/Sub 메시지를 처리합니다. 메시지가 전송되지 않으면 DAG 태스크 로그에서 trigger_target 태스크가 Skipped로 표시됩니다. DAG가 트리거되면 trigger_target 태스크가 Success로 표시됩니다.

    6. 최근 여러 DAG 실행을 검토하여 세 가지 태스크(subscribe_task, pull_messages_operator, trigger_target)가 모두 Success 상태인 DAG 실행을 찾습니다.

    7. DAG 탭으로 돌아가서 target_dag DAG의 성공한 실행 열에 성공한 실행이 한 개 있는지 확인합니다.

요약

이 튜토리얼에서는 Cloud Run 함수를 사용하여 Pub/Sub 주제에 메시지를 게시하는 방법과 Pub/Sub 주제를 구독하고 Pub/Sub 메시지를 가져오고 메시지 데이터의 DAG ID에 지정된 다른 DAG를 트리거하는 DAG를 배포하는 방법을 학습했습니다.

Pub/Sub 구독 만들기 및 관리와 이 튜토리얼의 범위를 벗어나는 DAG 트리거를 위한 다른 방법도 있습니다. 예를 들어 지정된 이벤트가 발생할 때 Cloud Run 함수를 사용하여 Airflow DAG를 트리거할 수 있습니다. 튜토리얼을 참조하여 직접 다른 Google Cloud 기능을 사용해 보세요.

삭제

이 튜토리얼에서 사용된 리소스 비용이 Google Cloud 계정에 청구되지 않도록 하려면 리소스가 포함된 프로젝트를 삭제하거나 프로젝트를 유지하고 개별 리소스를 삭제하세요.

프로젝트 삭제

    Delete a Google Cloud project:

    gcloud projects delete PROJECT_ID

개별 리소스 삭제

여러 튜토리얼과 빠른 시작을 살펴보려는 경우 프로젝트를 재사용하면 프로젝트 할당량 한도 초과를 방지할 수 있습니다.

콘솔

  1. Cloud Composer 환경을 삭제합니다. 이 절차 중에 환경의 버킷도 삭제합니다.
  2. Pub/Sub 주제 dag-topic-trigger를 삭제합니다.
  3. Cloud Run 함수를 삭제합니다.

    1. Google Cloud 콘솔에서 Cloud Run 함수로 이동합니다.

      Cloud Run 함수로 이동

    2. 삭제할 함수 pubsub-publisher의 체크박스를 클릭합니다.

    3. 삭제를 클릭한 후 안내를 따릅니다.

Terraform

  1. 프로젝트에 여전히 필요한 리소스 항목이 Terraform 스크립트에 포함되어 있지 않은지 확인합니다. 예를 들어 일부 API를 사용하도록 설정하고 IAM 권한을 계속 할당된 상태로 유지할 수 있습니다(Terraform 스크립트에 이러한 정의를 추가한 경우).
  2. terraform destroy를 실행합니다.
  3. 환경의 버킷을 수동으로 삭제합니다. Cloud Composer는 자동으로 삭제하지 않습니다. 이 작업은 Google Cloud 콘솔 또는 Google Cloud CLI에서 수행할 수 있습니다.

다음 단계