Cloud Composer로 실행 예약

이 문서에서는 Cloud Composer 2를 사용하여 Dataform SQL 워크플로의 예약 실행을 실행하는 방법을 보여줍니다.

Cloud Composer 2를 사용하여 Dataform 실행을 예약할 수 있습니다. Dataform은 Cloud Composer 1을 지원하지 않습니다.

Cloud Composer 2로 Dataform 실행 일정을 관리하려면 Airflow 방향성 비순환 그래프(DAG)에서 Dataform 연산자를 사용할 수 있습니다. Dataform 워크플로 호출을 예약하는 Airflow DAG를 만들 수 있습니다.

Dataform은 다양한 Airflow 연산자를 제공합니다. 여기에는 컴파일 결과 가져오기, 워크플로 호출 가져오기, 워크플로 호출 취소를 위한 연산자가 포함됩니다. 사용 가능한 Dataform Airflow 연산자의 전체 목록을 보려면 Google Dataform 연산자를 참조하세요.

시작하기 전에

  1. Dataform 저장소를 만들거나 선택합니다.
  2. BigQuery에 Dataform에 대한 액세스 권한을 부여합니다.
  3. Dataform 작업공간을 만들거나 선택합니다.
  4. 테이블을 하나 이상 만듭니다.
  5. Cloud Composer 2 환경을 만듭니다.
    1. Cloud Composer 환경의 서비스 계정roles/composer.workerroles/dataform.editor 역할을 부여합니다.

google-cloud-dataform PyPI 패키지 설치

Cloud Composer 2 버전 2.0.25 이상을 사용하는 경우 이 패키지는 환경에 사전 설치되므로 설치하지 않아도 됩니다.

이전 버전의 Cloud Composer 2를 사용하는 경우 google-cloud-dataform PyPi 패키지를 설치합니다.

PyPI 패키지 섹션에서 ==0.2.0 버전을 지정합니다.

Dataform 워크플로 호출을 예약하는 Airflow DAG 만들기

Cloud Composer 2로 Dataform SQL 워크플로의 예약 실행을 관리하려면 Dataform Airflow 연산자를 사용하여 DAG를 작성한 다음 환경의 버킷에 업로드합니다.

다음 코드 샘플은 Dataform 컴파일 결과를 만들고 Dataform 워크플로 호출을 시작하는 Airflow DAG를 보여줍니다.

from datetime import datetime

from google.cloud.dataform_v1beta1 import WorkflowInvocation

from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
    DataformCancelWorkflowInvocationOperator,
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
    DataformGetCompilationResultOperator,
    DataformGetWorkflowInvocationOperator,
)
from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor

DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"

with models.DAG(
    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2022, 1, 1),
    catchup=False,  # Override to match your needs
    tags=['dataform'],
) as dag:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
        },
    )

    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
         workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
        },
    )

create_compilation_result >> create_workflow_invocation

다음을 바꿉니다.

  • PROJECT_ID: Dataform Google Cloud 프로젝트 ID
  • REPOSITORY_ID: Dataform 저장소의 이름
  • REGION: Dataform 저장소가 있는 리전
  • COMPILATION_RESULT: 이 워크플로 호출에 사용할 컴파일 결과의 이름
  • GIT_COMMITISH: 사용하려는 코드 버전의 원격 Git 저장소에 있는 Git commitish(예: 브랜치 또는 Git SHA)

다음 코드 샘플은 다음과 같은 Airflow DAG를 보여줍니다.

  1. Dataform 컴파일 결과를 만듭니다.
  2. 비동기 Dataform 워크플로 호출을 시작합니다.
  3. DataformWorkflowInvocationStateSensor를 사용하여 예상 상태가 될 때까지 워크플로 상태를 폴링합니다.
from datetime import datetime

from google.cloud.dataform_v1beta1 import WorkflowInvocation

from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
    DataformCancelWorkflowInvocationOperator,
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
    DataformGetCompilationResultOperator,
    DataformGetWorkflowInvocationOperator,
)

DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"

with models.DAG(
    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2022, 1, 1),
    catchup=False,  # Override to match your needs
    tags=['dataform'],
) as dag:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
        },
    )

create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
    task_id='create_workflow_invocation',
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    asynchronous=True,
    workflow_invocation={
        "compilation_result": COMPILATION_RESULT
    }
)

is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
    task_id="is_workflow_invocation_done",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id=("{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"),
    expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)


create_compilation_result >> create_workflow_invocation

다음을 바꿉니다.

  • PROJECT_ID: Dataform Google Cloud 프로젝트 ID
  • REPOSITORY_ID: Dataform 저장소의 이름
  • REGION: Dataform 저장소가 있는 리전
  • COMPILATION_RESULT: 이 워크플로 호출에 사용할 컴파일 결과의 이름
  • GIT_COMMITISH: 사용하려는 코드 버전의 원격 Git 저장소에 있는 Git commitish(예: 브랜치 또는 Git SHA)
  • COMPILATION_RESULT: 이 워크플로 호출에 사용할 컴파일 결과의 이름

컴파일 구성 매개변수 추가

create_compilation_result Airflow DAG 객체에 추가 컴파일 구성 매개변수를 추가할 수 있습니다. 사용 가능한 매개변수에 대한 자세한 내용은 CodeCompilationConfig Dataform API 참조를 확인하세요.

  • create_compilation_result Airflow DAG 객체에 컴파일 구성 매개변수를 추가하려면 다음 형식으로 선택한 매개변수를 code_compilation_config에 추가합니다.
    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
            "code_compilation_config": { "PARAMETER": "PARAMETER_VALUE"}
        },
    )

다음을 바꿉니다.

  • PROJECT_ID: Dataform Google Cloud 프로젝트 ID
  • REPOSITORY_ID: Dataform 저장소의 이름
  • REGION: Dataform 저장소가 있는 리전
  • GIT_COMMITISH: 사용하려는 코드 버전의 원격 Git 저장소에 있는 Git commitish(예: 브랜치 또는 Git SHA)
  • PARAMETER: 선택한 CodeCompilationConfig 매개변수 여러 매개변수를 추가할 수 있습니다.
  • PARAMETER_VALUE: 선택한 매개변수의 값

다음 코드 샘플은 create_compilation_result Airflow DAG 객체에 추가된 defaultDatabase 매개변수를 보여줍니다.

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": REMOTE_BRANCH,
            "code_compilation_config": { "default_database": "my-custom-gcp-project"}
        },
    )

워크플로 호출 구성 매개변수 추가

워크플로 호출 구성 매개변수를 create_workflow_invocation Airflow DAG 객체에 더 추가할 수 있습니다. 사용 가능한 매개변수에 대한 자세한 내용은 InvocationConfig Dataform API 참조를 확인하세요.

  • create_workflow_invocation Airflow DAG 객체에 워크플로 구성 매개변수를 추가하려면 다음 형식으로 선택한 매개변수를 invocation_config에 추가합니다.
    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation={
            
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
            
            "invocation_config": { "PARAMETER": PARAMETER_VALUE }
        },

    )

다음을 바꿉니다.

  • PROJECT_ID: Dataform Google Cloud 프로젝트 ID
  • REPOSITORY_ID: Dataform 저장소의 이름
  • REGION: Dataform 저장소가 있는 리전
  • PARAMETER: 선택한 InvocationConfig 매개변수. 여러 매개변수를 추가할 수 있습니다.
  • PARAMETER_VALUE: 선택한 매개변수의 값

다음 코드 샘플은 create_workflow_invocation Airflow DAG 객체에 추가된 includedTags[]transitiveDependenciesIncluded 매개변수를 보여줍니다.

    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
            "invocation_config": { "included_Tags": ["daily"], "transitive_dependencies_included": true }
        },
    )

다음 단계