이 문서에서는 Dataform에서 다음을 수행하는 방법을 보여줍니다.
시작하기 전에
워크플로 구성으로 실행을 예약하거나 워크플로 및 Cloud Scheduler로 실행을 예약하려면 다음을 실행해야 합니다.
Cloud Composer로 실행을 예약하려면 다음을 실행하세요.
- Dataform 저장소를 만들거나 선택합니다.
- BigQuery에 Dataform에 대한 액세스 권한을 부여합니다.
- Dataform 작업공간을 만들거나 선택합니다.
- 테이블을 하나 이상 만듭니다.
- Cloud Composer 2 환경을 만듭니다.
필요한 역할
이 문서의 태스크를 완료하는 데 필요한 권한을 얻으려면 관리자에게 다음 IAM 역할을 부여해 달라고 요청하세요.
-
저장소에 대한Dataform 관리자(
roles/dataform.admin
) -
저장소 및 Cloud Composer 환경의 서비스 계정에 대한 Dataform 편집자 (
roles/dataform.editor
) -
Cloud Composer 환경의 서비스 계정에 있는 Composer Worker (
roles/composer.worker
)
역할 부여에 대한 자세한 내용은 프로젝트, 폴더, 조직에 대한 액세스 관리를 참조하세요.
커스텀 역할이나 다른 사전 정의된 역할을 통해 필요한 권한을 얻을 수도 있습니다.
기본 Dataform 서비스 계정 이외의 서비스 계정을 사용하려면 커스텀 서비스 계정에 액세스 권한을 부여합니다.
워크플로 구성으로 실행 예약
이 섹션에서는 Dataform에서 워크플로 실행을 예약하고 구성하기 위해 워크플로 구성을 만드는 방법을 보여줍니다. 워크플로 구성을 사용하여 일정에 따라 Dataform 워크플로를 실행할 수 있습니다.
워크플로 구성 정보
BigQuery에서 모든 또는 선택된 워크플로 작업에 대해 Dataform 실행을 예약하려면 워크플로 구성을 만들면 됩니다. 워크플로 구성에서 컴파일 출시 구성을 선택하고 실행할 워크플로 작업을 선택한 후 실행 일정을 설정합니다.
그러면 워크플로 구성 예약 실행 중에 Dataform에서 출시 구성의 최신 컴파일 결과에서 선택한 작업을 BigQuery에 배포합니다. Dataform API workflowConfigs를 사용하여 워크플로 구성 실행을 수동으로 트리거할 수도 있습니다.
Dataform 워크플로 구성에는 다음과 같은 실행 설정이 포함됩니다.
- 워크플로 구성의 ID입니다.
- 출시 구성
서비스 계정
워크플로 구성과 연결된 서비스 계정입니다. 기본 Dataform 서비스 계정 또는 Google Cloud 프로젝트와 연결된 서비스 계정을 선택하거나 다른 서비스 계정을 직접 입력할 수 있습니다. 기본적으로 워크플로 구성에는 저장소와 동일한 서비스 계정이 사용됩니다.
실행할 워크플로 작업:
- 모든 작업
- 작업 선택
- 태그 선택
실행 일정 및 시간대
워크플로 구성 만들기
Dataform 워크플로 구성을 만들려면 다음 단계를 따르세요.
- 저장소에서 출시 및 일정으로 이동합니다.
- 워크플로 구성 섹션에서 만들기를 클릭합니다.
워크플로 구성 만들기 창에서 구성 ID 필드에 워크플로 구성의 고유 ID를 입력합니다.
ID에는 숫자, 문자, 하이픈, 밑줄만 포함할 수 있습니다.
출시 구성 메뉴에서 컴파일 출시 구성을 선택합니다.
선택사항: 빈도 필드에 unix-cron 형식으로 실행 빈도를 입력합니다.
Dataform이 해당 출시 구성에서 최신 컴파일 결과를 실행하도록 하려면 컴파일 결과 생성 시간과 예약된 실행 시간 사이의 공백이 최소 1시간 이상 유지되도록 합니다.
서비스 계정 메뉴에서 워크플로 구성의 서비스 계정을 선택합니다.
메뉴에서 기본 Dataform 서비스 계정 또는 액세스 권한이 있는 Google Cloud 프로젝트와 연결된 서비스 계정을 선택할 수 있습니다. 서비스 계정을 선택하지 않으면 워크플로 구성에서 저장소 서비스 계정을 사용합니다.
선택사항: 시간대 메뉴에서 실행 시간대를 선택합니다.
기본 시간대는 UTC입니다.
실행할 워크플로 작업을 선택합니다.
- 전체 워크플로를 실행하려면 모든 작업을 클릭합니다.
- 워크플로에서 선택한 작업을 실행하려면 작업 선택을 클릭한 다음 작업을 선택합니다.
- 선택한 태그로 작업을 실행하려면 태그 선택을 클릭한 다음 태그를 선택합니다.
- 선택사항: 선택한 작업 또는 태그 및 종속 항목을 실행하려면 종속 항목 포함 옵션을 선택합니다.
- 선택사항: 선택한 작업 또는 태그 및 종속자를 실행하려면 종속자 포함 옵션을 선택합니다.
- 선택사항: 모든 테이블을 처음부터 다시 빌드하려면 전체 새로고침으로 실행 옵션을 선택합니다.
이 옵션이 없으면 Dataform은 증분 테이블을 처음부터 다시 빌드하지 않고 업데이트합니다.
만들기를 클릭합니다.
예를 들어 다음 워크플로 구성은 1시간(CEST 시간대)마다 hourly
태그가 있는 작업을 실행합니다.
- 구성 ID:
production-hourly
- 출시 구성: -
- 빈도:
0 * * * *
- 시간대:
Central European Summer Time (CEST)
- 워크플로 작업 선택: 태그 선택,
hourly
태그
워크플로 구성 수정
워크플로 구성을 수정하려면 다음 단계를 따르세요.
- 저장소에서 출시 및 일정으로 이동합니다.
- 수정하려는 워크플로 구성 옆에 있는 더보기 메뉴를 클릭한 다음 수정을 클릭합니다.
- 워크플로 구성 수정 창에서 출시 구성 설정을 수정한 후 저장을 클릭합니다.
워크플로 구성 삭제
워크플로 구성을 삭제하려면 다음 단계를 따르세요.
- 저장소에서 출시 및 일정으로 이동합니다.
- 삭제하려는 워크플로 구성 옆에 있는 더보기 메뉴를 클릭한 다음 삭제를 클릭합니다.
- 출시 구성 삭제 대화상자에서 삭제를 클릭합니다.
Workflows 및 Cloud Scheduler로 실행 예약
이 섹션에서는 Workflows 및 Cloud Scheduler를 사용하여 Dataform 워크플로 실행을 예약하는 방법을 보여줍니다.
예약된 워크플로 실행 정보
Workflows 워크플로를 트리거하는 Cloud Scheduler 작업을 만들어 Dataform 워크플로 실행 빈도를 설정할 수 있습니다. Workflows는 사용자가 정의한 조정 워크플로에서 서비스를 실행합니다.
Workflows는 데이터 양식 워크플로를 2단계 프로세스로 실행합니다. 먼저 Git 제공업체에서 Dataform 저장소 코드를 가져와서 컴파일 결과로 컴파일합니다. 그런 다음 컴파일 결과를 사용하여 Dataform 워크플로를 만들고 설정한 빈도로 실행합니다.
예약된 조정 워크플로 만들기
Dataform 워크플로 실행을 예약하려면 Workflows를 사용하여 조정 워크플로를 만들고 Cloud Scheduler 작업을 트리거로 추가합니다.
Workflows는 서비스 계정을 사용하여 워크플로에Google Cloud 리소스에 대한 액세스 권한을 부여합니다. 서비스 계정을 만들고 Dataform 편집자 (
roles/dataform.editor
) Identity and Access Management 역할과 오케스트레이션 워크플로를 관리하는 데 필요한 최소 권한을 부여합니다. 자세한 내용은 워크플로에 리소스에 대한 액세스 권한 부여를 참고하세요. Google Cloud조정 워크플로를 만들고 다음 YAML 소스 코드를 워크플로 정의로 사용합니다.
main: steps: - init: assign: - repository: projects/PROJECT_ID/locations/REPOSITORY_LOCATION/repositories/REPOSITORY_ID - createCompilationResult: call: http.post args: url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"} auth: type: OAuth2 body: gitCommitish: GIT_COMMITISH result: compilationResult - createWorkflowInvocation: call: http.post args: url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"} auth: type: OAuth2 body: compilationResult: ${compilationResult.body.name} result: workflowInvocation - complete: return: ${workflowInvocation.body.name}
다음을 바꿉니다.
- PROJECT_ID: Google Cloud 프로젝트의 ID입니다.
- REPOSITORY_LOCATION: Dataform 저장소의 위치
- REPOSITORY_ID: Dataform 저장소의 이름입니다.
- GIT_COMMITISH: Dataform 코드를 실행할 Git 브랜치입니다. 새로 만든 저장소의 경우
main
로 바꿉니다.
Dataform 워크플로 컴파일 결과 생성 요청 맞춤설정
기존 조정 워크플로를 업데이트하고 YAML 형식으로 Dataform 워크플로 만들기 컴파일 결과 요청 설정을 정의할 수 있습니다. 설정에 관한 자세한 내용은 projects.locations.repositories.compilationResults
REST 리소스 참조를 참고하세요.
예를 들어 컴파일 중에 모든 작업에 _dev
schemaSuffix
설정을 추가하려면 createCompilationResult
단계 본문을 다음 코드 스니펫으로 바꿉니다.
- createCompilationResult:
call: http.post
args:
url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"}
auth:
type: OAuth2
body:
gitCommitish: GIT_COMMITISH
codeCompilationConfig:
schemaSuffix: dev
또한 Workflows 실행 요청에서 추가 설정을 런타임 인수로 전달하고 변수를 사용하여 이러한 인수에 액세스할 수 있습니다. 자세한 내용은 실행 요청에서 런타임 인수 전달을 참조하세요.
Dataform 워크플로 호출 요청 맞춤설정
기존 조정 워크플로를 업데이트하고 YAML 형식으로 Dataform 워크플로 호출 요청 설정을 정의할 수 있습니다. 호출 요청 설정에 관한 자세한 내용은 projects.locations.repositories.workflowInvocations
REST 리소스 참조를 참고하세요.
예를 들어 모든 전이 종속 항목이 포함된 hourly
태그로 작업만 실행하려면 createWorkflowInvocation
본문을 다음 코드 스니펫으로 바꿉니다.
- createWorkflowInvocation:
call: http.post
args:
url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"}
auth:
type: OAuth2
body:
compilationResult: ${compilationResult.body.name}
invocationConfig:
includedTags:
- hourly
transitiveDependenciesIncluded: true
또한 Workflows 실행 요청에서 추가 설정을 런타임 인수로 전달하고 변수를 사용하여 이러한 인수에 액세스할 수 있습니다. 자세한 내용은 실행 요청에서 런타임 인수 전달을 참조하세요.
Cloud Composer로 실행 예약
Cloud Composer 2를 사용하여 Dataform 실행을 예약할 수 있습니다. Dataform은 Cloud Composer 1을 지원하지 않습니다.
Cloud Composer 2로 Dataform 실행 일정을 관리하려면 Airflow 방향성 비순환 그래프 (DAG)에서 Dataform 연산자를 사용할 수 있습니다. Dataform 워크플로 호출을 예약하는 Airflow DAG를 만들 수 있습니다.
Dataform은 다양한 Airflow 연산자를 제공합니다. 여기에는 컴파일 결과를 가져오고, 워크플로 호출을 가져오고, 워크플로 호출을 취소하는 연산자가 포함됩니다. 사용 가능한 Dataform Airflow 연산자의 전체 목록을 보려면 Google Dataform 연산자를 참고하세요.
google-cloud-dataform
PyPI 패키지 설치
Cloud Composer 2 버전 2.0.25
이상을 사용하는 경우 이 패키지는 환경에 사전 설치되므로 설치할 필요가 없습니다.
이전 버전의 Cloud Composer 2를 사용하는 경우 google-cloud-dataform
PyPi 패키지를 설치합니다.
PyPI 패키지 섹션에서 버전 ==0.2.0
를 지정합니다.
Dataform 워크플로 호출을 예약하는 Airflow DAG 만들기
Cloud Composer 2로 Dataform 워크플로의 예약된 실행을 관리하려면 Dataform Airflow 연산자를 사용하여 DAG를 작성한 다음 환경의 버킷에 업로드합니다.
다음 코드 샘플은 Dataform 컴파일 결과를 만들고 Dataform 워크플로 호출을 시작하는 Airflow DAG를 보여줍니다.
from datetime import datetime
from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
DataformCreateCompilationResultOperator,
DataformCreateWorkflowInvocationOperator,
)
DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"
with models.DAG(
DAG_ID,
schedule_interval='@once', # Override to match your needs
start_date=datetime(2022, 1, 1),
catchup=False, # Override to match your needs
tags=['dataform'],
) as dag:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
},
)
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
},
)
create_compilation_result >> create_workflow_invocation
다음을 바꿉니다.
- PROJECT_ID: Dataform Google Cloud 프로젝트 ID입니다.
- REPOSITORY_ID: Dataform 저장소의 이름입니다.
- REGION: Dataform 저장소가 있는 리전입니다.
- COMPILATION_RESULT: 이 워크플로 호출에 사용할 컴파일 결과의 이름입니다.
- GIT_COMMITISH: 사용하려는 코드 버전의 원격 Git 저장소에 있는 Git commitish(예: 브랜치 또는 Git SHA)
다음 코드 샘플은 다음을 실행하는 Airflow DAG를 보여줍니다.
- Dataform 컴파일 결과를 만듭니다.
- 비동기 Dataform 워크플로 호출을 시작합니다.
DataformWorkflowInvocationStateSensor
를 사용하여 예상 상태가 될 때까지 워크플로의 상태를 폴링합니다.
from datetime import datetime
from google.cloud.dataform_v1beta1 import WorkflowInvocation
from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
DataformCreateCompilationResultOperator,
DataformCreateWorkflowInvocationOperator,
)
from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor
DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"
with models.DAG(
DAG_ID,
schedule_interval='@once', # Override to match your needs
start_date=datetime(2022, 1, 1),
catchup=False, # Override to match your needs
tags=['dataform'],
) as dag:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
},
)
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
asynchronous=True,
workflow_invocation={
"compilation_result": COMPILATION_RESULT
}
)
is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
task_id="is_workflow_invocation_done",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation_id=("{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"),
expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)
create_compilation_result >> create_workflow_invocation
다음을 바꿉니다.
- PROJECT_ID: Dataform Google Cloud 프로젝트 ID입니다.
- REPOSITORY_ID: Dataform 저장소의 이름입니다.
- REGION: Dataform 저장소가 있는 리전입니다.
- COMPILATION_RESULT: 이 워크플로 호출에 사용할 컴파일 결과의 이름입니다.
- GIT_COMMITISH: 사용하려는 코드 버전의 원격 Git 저장소에 있는 Git commitish(예: 브랜치 또는 Git SHA)
- COMPILATION_RESULT: 이 워크플로 호출에 사용할 컴파일 결과의 이름입니다.
컴파일 구성 매개변수 추가
컴파일 구성 매개변수를 create_compilation_result
Airflow DAG 객체에 더 추가할 수 있습니다. 사용 가능한 매개변수에 관한 자세한 내용은 CodeCompilationConfig
Dataform API 참조를 참고하세요.
create_compilation_result
Airflow DAG 객체에 컴파일 구성 매개변수를 추가하려면 다음 형식으로 선택한 매개변수를code_compilation_config
필드에 추가합니다.create_compilation_result = DataformCreateCompilationResultOperator( task_id="create_compilation_result", project_id=PROJECT_ID, region=REGION, repository_id=REPOSITORY_ID, compilation_result={ "git_commitish": GIT_COMMITISH, "code_compilation_config": { "PARAMETER": "PARAMETER_VALUE"} }, )
다음을 바꿉니다.
- PROJECT_ID: Dataform Google Cloud 프로젝트 ID입니다.
- REPOSITORY_ID: Dataform 저장소의 이름입니다.
- REGION: Dataform 저장소가 있는 리전입니다.
- GIT_COMMITISH: 사용하려는 코드 버전의 원격 Git 저장소에 있는 Git commitish(예: 브랜치 또는 Git SHA)
- PARAMETER: 선택한
CodeCompilationConfig
매개변수입니다. 매개변수를 여러 개 추가할 수 있습니다. - PARAMETER_VALUE: 선택한 매개변수의 값입니다.
다음 코드 샘플은 create_compilation_result
Airflow DAG 객체에 추가된 defaultDatabase
매개변수를 보여줍니다.
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": REMOTE_BRANCH,
"code_compilation_config": { "default_database": "my-custom-gcp-project"}
},
)
워크플로 호출 구성 매개변수 추가
워크플로 호출 구성 매개변수를 create_workflow_invocation
Airflow DAG 객체에 더 추가할 수 있습니다. 사용 가능한 매개변수에 관한 자세한 내용은 InvocationConfig
Dataform API 참조를 참고하세요.
create_workflow_invocation
Airflow DAG 객체에 워크플로 구성 매개변수를 추가하려면 다음 형식으로 선택한 매개변수를invocation_config
필드에 추가합니다.create_workflow_invocation = DataformCreateWorkflowInvocationOperator( task_id='create_workflow_invocation', project_id=PROJECT_ID, region=REGION, repository_id=REPOSITORY_ID, workflow_invocation={ "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}", "invocation_config": { "PARAMETER": PARAMETER_VALUE } }, )
다음을 바꿉니다.
- PROJECT_ID: Dataform Google Cloud 프로젝트 ID입니다.
- REPOSITORY_ID: Dataform 저장소의 이름입니다.
- REGION: Dataform 저장소가 있는 리전입니다.
- PARAMETER: 선택한
InvocationConfig
매개변수입니다. 매개변수를 여러 개 추가할 수 있습니다. - PARAMETER_VALUE: 선택한 매개변수의 값입니다.
다음 코드 샘플은 create_workflow_invocation
Airflow DAG 객체에 추가된 includedTags[]
및 transitiveDependenciesIncluded
매개변수를 보여줍니다.
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
"invocation_config": { "included_tags": ["daily"], "transitive_dependencies_included": true }
},
)
다음 단계
- Dataform 컴파일 출시 구성을 구성하는 방법은 출시 구성 만들기를 참조하세요.
- Dataform의 코드 수명 주기에 대한 자세한 내용은 Dataform의 코드 수명 주기 소개를 참조하세요.
- Dataform API에 대한 자세한 내용은 Dataform API를 참조하세요.
- Cloud Composer 환경에 대해 자세히 알아보려면 Cloud Composer 개요를 참고하세요.
- Workflows 가격 책정에 대한 자세한 내용은 Workflows 가격 책정을 참고하세요.