Anda dapat menggunakan Cloud Composer 2 untuk menjadwalkan eksekusi Dataform. Formulir data tidak mendukung Cloud Composer 1.
Untuk mengelola jadwal eksekusi Dataform dengan Cloud Composer 2, Anda dapat menggunakan operator Dataform di Airflow Directed Acyclic Graphs (DAG). Anda dapat membuat DAG Airflow yang menjadwalkan pemanggilan alur kerja Dataform.
Dataform menyediakan berbagai operator Airflow. Ini mencakup operator untuk mendapatkan hasil kompilasi, mendapatkan pemanggilan alur kerja, dan membatalkan pemanggilan alur kerja. Untuk melihat daftar lengkap operator Dataform Airflow yang tersedia, lihat Operator Dataform Google.
Sebelum memulai
- Pilih atau buat repositori Dataform.
- Berikan akses Dataform ke BigQuery.
- Pilih atau buat Ruang kerja Dataform.
- Buat minimal satu tabel.
- Buat lingkungan Cloud Composer 2.
- Berikan peran roles/composer.worker dan roles/dataform.editor ke akun layanan lingkungan Cloud Composer.
Menginstal paket PyPi google-cloud-dataform
Jika Anda menggunakan Cloud Composer 2 versi 2.0.25
dan yang lebih baru, paket ini sudah diinstal di lingkungan Anda. Anda tidak perlu menginstalnya.
Jika Anda menggunakan Cloud Composer 2 versi sebelumnya, instal paket PyPi google-cloud-dataform
.
Di bagian paket PyPI, tentukan versi ==0.2.0
.
Membuat DAG Airflow yang menjadwalkan pemanggilan alur kerja Dataform
Untuk mengelola eksekusi terjadwal alur kerja Dataform SQL dengan Cloud Composer 2, tulis DAG menggunakan operator Dataform Airflow, lalu upload ke bucket lingkungan Anda.
Contoh kode berikut menunjukkan DAG Airflow yang membuat hasil kompilasi Dataform dan memulai pemanggilan alur kerja Dataform:
from datetime import datetime
from google.cloud.dataform_v1beta1 import WorkflowInvocation
from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
DataformCancelWorkflowInvocationOperator,
DataformCreateCompilationResultOperator,
DataformCreateWorkflowInvocationOperator,
DataformGetCompilationResultOperator,
DataformGetWorkflowInvocationOperator,
)
from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor
DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"
with models.DAG(
DAG_ID,
schedule_interval='@once', # Override to match your needs
start_date=datetime(2022, 1, 1),
catchup=False, # Override to match your needs
tags=['dataform'],
) as dag:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
},
)
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
},
)
create_compilation_result >> create_workflow_invocation
Ganti kode berikut:
- PROJECT_ID: ID Project Google Cloud Dataform Anda
- REPOSITORY_ID: nama repositori Dataform Anda
- REGION: wilayah tempat repositori Dataform berada
- COMPILATION_RESULT: nama hasil kompilasi yang ingin Anda gunakan untuk pemanggilan alur kerja ini
- GIT_COMMITISH: commitish Git, misalnya, cabang atau SHA Git, di repositori Git jarak jauh dari versi kode yang ingin Anda gunakan
Contoh kode berikut menunjukkan DAG Airflow yang:
- Membuat hasil kompilasi Dataform
- Memulai pemanggilan alur kerja Dataform asinkron.
- Melakukan polling status alur kerja hingga memasuki status yang diharapkan
menggunakan
DataformWorkflowInvocationStateSensor
.
from datetime import datetime
from google.cloud.dataform_v1beta1 import WorkflowInvocation
from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
DataformCancelWorkflowInvocationOperator,
DataformCreateCompilationResultOperator,
DataformCreateWorkflowInvocationOperator,
DataformGetCompilationResultOperator,
DataformGetWorkflowInvocationOperator,
)
DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"
with models.DAG(
DAG_ID,
schedule_interval='@once', # Override to match your needs
start_date=datetime(2022, 1, 1),
catchup=False, # Override to match your needs
tags=['dataform'],
) as dag:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
},
)
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
asynchronous=True,
workflow_invocation={
"compilation_result": COMPILATION_RESULT
}
)
is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
task_id="is_workflow_invocation_done",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation_id=("{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"),
expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)
create_compilation_result >> create_workflow_invocation
Ganti kode berikut:
- PROJECT_ID: ID Project Google Cloud Dataform Anda
- REPOSITORY_ID: nama repositori Dataform Anda
- REGION: wilayah tempat repositori Dataform berada
- COMPILATION_RESULT: nama hasil kompilasi yang ingin Anda gunakan untuk pemanggilan alur kerja ini
- GIT_COMMITISH: commitish Git, misalnya, cabang atau SHA Git, di repositori Git jarak jauh dari versi kode yang ingin Anda gunakan
- COMPILATION_RESULT: nama hasil kompilasi yang ingin Anda gunakan untuk pemanggilan alur kerja ini
Menambahkan parameter konfigurasi kompilasi
Anda dapat menambahkan parameter konfigurasi kompilasi tambahan ke
objek DAG Airflow create_compilation_result
. Untuk mengetahui informasi selengkapnya tentang parameter yang tersedia, lihat Referensi CodeCompilationConfig
Dataform API.
- Untuk menambahkan parameter konfigurasi kompilasi ke objek DAG Airflow
create_compilation_result
, tambahkan parameter yang Anda pilih kecode_compilation_config
dalam format berikut:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
"code_compilation_config": { "PARAMETER": "PARAMETER_VALUE"}
},
)
Ganti kode berikut:
- PROJECT_ID: ID Project Google Cloud Dataform Anda
- REPOSITORY_ID: nama repositori Dataform Anda
- REGION dengan wilayah tempat repositori Dataform berada
- GIT_COMMITISH: commitish Git, misalnya, cabang atau SHA Git, di repositori Git jarak jauh dari versi kode yang ingin Anda gunakan
- PARAMETER: parameter
CodeCompilationConfig
yang dipilih. Anda dapat menambahkan beberapa parameter. - PARAMETER_VALUE: nilai parameter yang dipilih
Contoh kode berikut menunjukkan parameter defaultDatabase
yang ditambahkan ke
objek DAG Airflow create_compilation_result
:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": REMOTE_BRANCH,
"code_compilation_config": { "default_database": "my-custom-gcp-project"}
},
)
Menambahkan parameter konfigurasi pemanggilan alur kerja
Anda dapat menambahkan parameter konfigurasi pemanggilan alur kerja tambahan ke
objek DAG Airflow create_workflow_invocation
. Untuk mengetahui informasi selengkapnya tentang parameter yang tersedia, lihat Referensi InvocationConfig
Dataform API.
- Untuk menambahkan parameter konfigurasi pemanggilan alur kerja ke objek DAG Airflow
create_workflow_invocation
, tambahkan parameter yang dipilih keinvocation_config
dalam format berikut:
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
"invocation_config": { "PARAMETER": PARAMETER_VALUE }
},
)
Ganti kode berikut:
- PROJECT_ID: ID Project Google Cloud Dataform Anda
- REPOSITORY_ID: nama repositori Dataform Anda
- REGION: region tempat repositori Dataform berada
- PARAMETER: parameter
InvocationConfig
yang dipilih. Anda dapat menambahkan beberapa parameter. - PARAMETER_VALUE: nilai parameter yang dipilih
Contoh kode berikut menunjukkan parameter includedTags[]
dan
transitiveDependenciesIncluded
yang ditambahkan ke
objek DAG Airflow create_workflow_invocation
:
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
"invocation_config": { "included_Tags": ["daily"], "transitive_dependencies_included": true }
},
)
Langkah selanjutnya
- Untuk mempelajari cara mengonfigurasi penggantian kompilasi untuk eksekusi alur kerja, lihat Mengonfigurasi penggantian kompilasi.
- Untuk mempelajari Dataform API lebih lanjut, lihat Dataform API.
- Untuk mempelajari lingkungan Cloud Composer lebih lanjut, lihat Ringkasan Cloud Composer.
- Untuk mempelajari cara menjadwalkan eksekusi dengan Workflows dan Cloud Scheduler, lihat Menjadwalkan eksekusi dengan Workflows dan Cloud Scheduler.