Dokumen ini menunjukkan cara melakukan hal berikut di Dataform:
- Menjadwalkan operasi dengan konfigurasi alur kerja.
- Menjadwalkan operasi dengan Workflows dan Cloud Scheduler.
- Menjadwalkan operasi dengan Cloud Composer.
Sebelum memulai
Untuk menjadwalkan operasi dengan konfigurasi alur kerja atau menjadwalkan operasi dengan alur kerja dan Cloud Scheduler, pastikan Anda melakukan hal berikut:
Di konsol Google Cloud, buka halaman Dataform.
Pilih atau buat repositori.
Buat konfigurasi rilis.
Untuk menjadwalkan operasi dengan Cloud Composer, pastikan Anda melakukan hal berikut:
- Pilih atau buat repositori Dataform.
- Berikan akses Dataform ke BigQuery.
- Pilih atau buat ruang kerja Dataform.
- Buat minimal satu tabel.
- Buat lingkungan Cloud Composer 2.
Peran yang diperlukan
Untuk mendapatkan izin yang Anda perlukan untuk menyelesaikan tugas dalam dokumen ini, minta administrator Anda untuk memberi Anda peran IAM berikut:
-
Dataform Admin (
roles/dataform.admin
) di repositori -
Editor Dataform (
roles/dataform.editor
) di repositori dan akun layanan lingkungan Cloud Composer Anda -
Pekerja Composer (
roles/composer.worker
) di akun layanan lingkungan Cloud Composer
Untuk mengetahui informasi selengkapnya tentang cara memberikan peran, lihat Mengelola akses ke project, folder, dan organisasi.
Anda mungkin juga bisa mendapatkan izin yang diperlukan melalui peran khusus atau peran bawaan lainnya.
Untuk menggunakan akun layanan selain akun layanan Dataform default, berikan akses ke akun layanan kustom.
Menjadwalkan operasi dengan konfigurasi alur kerja
Bagian ini menunjukkan cara membuat konfigurasi alur kerja di Dataform untuk menjadwalkan dan mengonfigurasi pengoperasian alur kerja. Anda dapat menggunakan konfigurasi alur kerja untuk menjalankan alur kerja Dataform sesuai jadwal.
Tentang konfigurasi alur kerja
Untuk menjadwalkan pengoperasian Dataform dari semua atau tindakan alur kerja yang dipilih di BigQuery, Anda dapat membuat konfigurasi alur kerja. Dalam konfigurasi alur kerja, Anda memilih konfigurasi rilis kompilasi, memilih tindakan alur kerja untuk dieksekusi, dan menetapkan jadwal operasi.
Kemudian, selama konfigurasi alur kerja Anda dijalankan sesuai jadwal, Dataform akan men-deploy pilihan tindakan Anda dari hasil kompilasi terbaru dalam konfigurasi rilis ke BigQuery. Anda juga dapat memicu pengoperasian konfigurasi alur kerja secara manual dengan workflowConfigs Dataform API.
Konfigurasi alur kerja Dataform berisi setelan jalankan berikut:
- ID konfigurasi alur kerja.
- Konfigurasi rilis.
Akun layanan.
Ini adalah akun layanan yang terkait dengan konfigurasi alur kerja. Anda dapat memilih akun layanan Dataform default atau akun layanan yang terkait dengan project Google Cloud Anda, atau Anda dapat memasukkan akun layanan lain secara manual. Secara default, konfigurasi alur kerja menggunakan akun layanan yang sama dengan repositori-nya.
Tindakan alur kerja yang akan dijalankan:
- Semua tindakan.
- Pemilihan tindakan.
- Pilihan tag.
Jadwal dan zona waktu operasi.
Membuat konfigurasi alur kerja
Untuk membuat konfigurasi alur kerja Dataform, ikuti langkah-langkah berikut:
- Di repositori Anda, buka Rilis & Penjadwalan.
- Di bagian Konfigurasi alur kerja, klik Buat.
Di panel Create workflow configuration, di kolom Configuration ID, masukkan ID unik untuk konfigurasi alur kerja.
ID hanya boleh berisi angka, huruf, tanda hubung, dan garis bawah.
Di menu Konfigurasi rilis, pilih konfigurasi rilis kompilasi.
Opsional: Di kolom Frequency, masukkan frekuensi operasi dalam format unix-cron.
Untuk memastikan Dataform mengeksekusi hasil kompilasi terbaru dalam konfigurasi rilis yang sesuai, pertahankan jeda minimum satu jam antara waktu pembuatan hasil kompilasi dan waktu eksekusi terjadwal.
Di menu Service account, pilih akun layanan untuk konfigurasi alur kerja.
Di menu, Anda dapat memilih akun layanan Dataform default atau akun layanan apa pun yang terkait dengan project Google Cloud yang aksesnya Anda miliki. Jika Anda tidak memilih akun layanan, konfigurasi alur kerja akan menggunakan akun layanan repositori.
Opsional: Di menu Zona waktu, pilih zona waktu untuk dijalankan.
Zona waktu default adalah UTC.
Pilih tindakan alur kerja yang akan dijalankan:
- Untuk menjalankan seluruh alur kerja, klik Semua tindakan.
- Untuk menjalankan tindakan yang dipilih dalam alur kerja, klik Pemilihan tindakan, lalu pilih tindakan.
- Untuk menjalankan tindakan dengan tag yang dipilih, klik Pemilihan tag, lalu pilih tag.
- Opsional: Untuk menjalankan tindakan atau tag yang dipilih dan dependensinya, pilih opsi Sertakan dependensi.
- Opsional: Untuk menjalankan tindakan atau tag yang dipilih dan dependensinya, pilih opsi Sertakan dependensi.
- Opsional: Untuk mem-build ulang semua tabel dari awal, pilih opsi Jalankan dengan pembaruan penuh.
Tanpa opsi ini, Dataform akan memperbarui tabel inkremental tanpa membangun ulang dari awal.
Klik Buat.
Misalnya, konfigurasi alur kerja berikut mengeksekusi tindakan dengan tag hourly
setiap jam di zona waktu CEST:
- ID Konfigurasi:
production-hourly
- Konfigurasi rilis: -
- Frekuensi:
0 * * * *
- Zona waktu:
Central European Summer Time (CEST)
- Pemilihan tindakan alur kerja: pemilihan tag, tag
hourly
Mengedit konfigurasi alur kerja
Untuk mengedit konfigurasi alur kerja, ikuti langkah-langkah berikut:
- Di repositori Anda, buka Rilis & Penjadwalan.
- Di konfigurasi alur kerja yang ingin diedit, klik menu Lainnya, lalu klik Edit.
- Di panel Edit workflow configuration, edit setelan konfigurasi rilis, lalu klik Save.
Menghapus konfigurasi alur kerja
Untuk menghapus konfigurasi alur kerja, ikuti langkah-langkah berikut:
- Di repositori Anda, buka Rilis & Penjadwalan.
- Di samping konfigurasi alur kerja yang ingin Anda hapus, klik menu More, lalu klik Delete.
- Pada dialog Hapus konfigurasi rilis, klik Hapus.
Menjadwalkan operasi dengan Workflows dan Cloud Scheduler
Bagian ini menunjukkan cara menjadwalkan eksekusi alur kerja Dataform menggunakan Workflows dan Cloud Scheduler.
Tentang operasi alur kerja terjadwal
Anda dapat menetapkan frekuensi alur kerja Dataform yang berjalan dengan membuat tugas Cloud Scheduler yang memicu alur kerja Alur Kerja. Alur kerja mengeksekusi layanan dalam alur kerja orkestrasi yang Anda tentukan.
Alur kerja menjalankan alur kerja Dataform Anda dalam proses dua langkah. Pertama, alat ini mengambil kode repositori Dataform dari penyedia Git dan mengompilasikannya menjadi hasil kompilasi. Kemudian, alat ini menggunakan hasil kompilasi untuk membuat alur kerja Dataform dan menjalankannya pada frekuensi yang Anda tetapkan.
Membuat alur kerja orkestrasi terjadwal
Untuk menjadwalkan eksekusi alur kerja Dataform, gunakan Alur kerja untuk membuat alur kerja orkestrasi dan tambahkan tugas Cloud Scheduler sebagai pemicu.
Alur kerja menggunakan akun layanan untuk memberi alur kerja akses ke resourceGoogle Cloud . Buat akun layanan dan berikan peran Identity and Access Management Dataform Editor (
roles/dataform.editor
) serta izin minimum yang diperlukan untuk mengelola alur kerja orkestrasi Anda. Untuk mengetahui informasi selengkapnya, lihat Memberikan izin alur kerja untuk mengakses Google Cloud resource.Buat alur kerja orkestrasi dan gunakan kode sumber YAML berikut sebagai definisi alur kerja Anda:
main: steps: - init: assign: - repository: projects/PROJECT_ID/locations/REPOSITORY_LOCATION/repositories/REPOSITORY_ID - createCompilationResult: call: http.post args: url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"} auth: type: OAuth2 body: gitCommitish: GIT_COMMITISH result: compilationResult - createWorkflowInvocation: call: http.post args: url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"} auth: type: OAuth2 body: compilationResult: ${compilationResult.body.name} result: workflowInvocation - complete: return: ${workflowInvocation.body.name}
Ganti kode berikut:
- PROJECT_ID: ID Google Cloud project Anda.
- REPOSITORY_LOCATION: lokasi repositori Dataform Anda.
- REPOSITORY_ID: nama repositori Dataform Anda.
- GIT_COMMITISH: cabang Git tempat Anda ingin menjalankan kode Dataform. Untuk repositori yang baru dibuat, ganti dengan
main
.
Jadwalkan alur kerja orkestrasi menggunakan Cloud Scheduler.
Menyesuaikan permintaan hasil kompilasi pembuatan alur kerja Dataform
Anda dapat
memperbarui alur kerja orkestrasi yang ada
dan menentukan setelan permintaan hasil
kompilasi pembuatan alur kerja Dataform dalam format YAML. Untuk mengetahui informasi selengkapnya tentang setelan,
lihat
referensi resource REST projects.locations.repositories.compilationResults
.
Misalnya, untuk menambahkan setelan _dev
schemaSuffix
ke semua tindakan selama kompilasi,
ganti isi langkah createCompilationResult
dengan cuplikan kode berikut:
- createCompilationResult:
call: http.post
args:
url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"}
auth:
type: OAuth2
body:
gitCommitish: GIT_COMMITISH
codeCompilationConfig:
schemaSuffix: dev
Anda juga dapat meneruskan setelan tambahan sebagai argumen runtime dalam permintaan eksekusi Alur Kerja dan mengakses argumen tersebut menggunakan variabel. Untuk informasi selengkapnya, lihat Teruskan argumen runtime dalam permintaan eksekusi.
Menyesuaikan permintaan pemanggilan alur kerja Dataform
Anda dapat
memperbarui alur kerja orkestrasi yang ada
dan menentukan setelan permintaan pemanggilan alur kerja Dataform dalam
format YAML. Untuk informasi selengkapnya tentang setelan permintaan pemanggilan,
lihat
referensi resource REST projects.locations.repositories.workflowInvocations
.
Misalnya, untuk hanya mengeksekusi tindakan dengan tag hourly
dengan semua
dependensi transitif yang disertakan, ganti isi createWorkflowInvocation
dengan cuplikan kode berikut:
- createWorkflowInvocation:
call: http.post
args:
url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"}
auth:
type: OAuth2
body:
compilationResult: ${compilationResult.body.name}
invocationConfig:
includedTags:
- hourly
transitiveDependenciesIncluded: true
Anda juga dapat meneruskan setelan tambahan sebagai argumen runtime dalam permintaan eksekusi Alur Kerja dan mengakses argumen tersebut menggunakan variabel. Untuk informasi selengkapnya, lihat Teruskan argumen runtime dalam permintaan eksekusi.
Menjadwalkan operasi dengan Cloud Composer
Anda dapat menggunakan Cloud Composer 2 untuk menjadwalkan pengoperasian Dataform. Dataform tidak mendukung Cloud Composer 1.
Untuk mengelola jadwal pengoperasian Dataform dengan Cloud Composer 2, Anda dapat menggunakan operator Dataform di Directed Acyclic Graph (DAG) Airflow. Anda dapat membuat DAG Airflow yang menjadwalkan pemanggilan alur kerja Dataform.
Dataform menyediakan berbagai operator Airflow. Ini mencakup operator untuk mendapatkan hasil kompilasi, mendapatkan pemanggilan alur kerja, dan membatalkan pemanggilan alur kerja. Untuk melihat daftar lengkap operator Dataform Airflow yang tersedia, lihat Operator Dataform Google.
Menginstal paket PyPi google-cloud-dataform
Jika Anda menggunakan Cloud Composer 2 versi 2.0.25
dan yang lebih baru, paket ini sudah diinstal sebelumnya di lingkungan Anda. Anda tidak perlu menginstalnya.
Jika Anda menggunakan Cloud Composer 2 versi sebelumnya,
instal paket PyPi google-cloud-dataform
.
Di bagian paket PyPI, tentukan versi ==0.2.0
.
Membuat DAG Airflow yang menjadwalkan pemanggilan alur kerja Dataform
Untuk mengelola operasi terjadwal alur kerja Dataform dengan Cloud Composer 2, tulis DAG menggunakan operator Airflow Dataform, lalu upload ke bucket lingkungan Anda.
Contoh kode berikut menunjukkan DAG Airflow yang membuat hasil kompilasi Dataform dan memulai pemanggilan alur kerja Dataform:
from datetime import datetime
from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
DataformCreateCompilationResultOperator,
DataformCreateWorkflowInvocationOperator,
)
DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"
with models.DAG(
DAG_ID,
schedule_interval='@once', # Override to match your needs
start_date=datetime(2022, 1, 1),
catchup=False, # Override to match your needs
tags=['dataform'],
) as dag:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
},
)
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
},
)
create_compilation_result >> create_workflow_invocation
Ganti kode berikut:
- PROJECT_ID: project ID Google Cloud Dataform Anda.
- REPOSITORY_ID: nama repositori Dataform Anda.
- REGION: region tempat repositori Dataform berada.
- COMPILATION_RESULT: nama hasil kompilasi yang ingin Anda gunakan untuk pemanggilan alur kerja ini.
- GIT_COMMITISH: commitish Git di repositori Git jarak jauh dari versi kode yang ingin Anda gunakan—misalnya, cabang atau SHA Git.
Contoh kode berikut menunjukkan DAG Airflow yang melakukan hal berikut:
- Membuat hasil kompilasi Dataform.
- Memulai pemanggilan alur kerja Dataform asinkron.
- Melakukan polling status alur kerja Anda hingga memasuki status yang diharapkan
menggunakan
DataformWorkflowInvocationStateSensor
.
from datetime import datetime
from google.cloud.dataform_v1beta1 import WorkflowInvocation
from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
DataformCreateCompilationResultOperator,
DataformCreateWorkflowInvocationOperator,
)
from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor
DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"
with models.DAG(
DAG_ID,
schedule_interval='@once', # Override to match your needs
start_date=datetime(2022, 1, 1),
catchup=False, # Override to match your needs
tags=['dataform'],
) as dag:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
},
)
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
asynchronous=True,
workflow_invocation={
"compilation_result": COMPILATION_RESULT
}
)
is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
task_id="is_workflow_invocation_done",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation_id=("{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"),
expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)
create_compilation_result >> create_workflow_invocation
Ganti kode berikut:
- PROJECT_ID: projectID Google Cloud Dataform Anda.
- REPOSITORY_ID: nama repositori Dataform Anda.
- REGION: region tempat repositori Dataform berada.
- COMPILATION_RESULT: nama hasil kompilasi yang ingin Anda gunakan untuk pemanggilan alur kerja ini.
- GIT_COMMITISH: commitish Git di repositori Git jarak jauh dari versi kode yang ingin Anda gunakan—misalnya, cabang atau SHA Git.
- COMPILATION_RESULT: nama hasil kompilasi yang ingin Anda gunakan untuk pemanggilan alur kerja ini.
Menambahkan parameter konfigurasi kompilasi
Anda dapat menambahkan parameter konfigurasi kompilasi tambahan ke objek DAG Airflow create_compilation_result
. Untuk mengetahui informasi selengkapnya tentang
parameter yang tersedia, lihat
referensi API Dataform CodeCompilationConfig
.
Untuk menambahkan parameter konfigurasi kompilasi ke objek DAG Airflow
create_compilation_result
, tambahkan parameter yang dipilih ke kolomcode_compilation_config
dalam format berikut:create_compilation_result = DataformCreateCompilationResultOperator( task_id="create_compilation_result", project_id=PROJECT_ID, region=REGION, repository_id=REPOSITORY_ID, compilation_result={ "git_commitish": GIT_COMMITISH, "code_compilation_config": { "PARAMETER": "PARAMETER_VALUE"} }, )
Ganti kode berikut:
- PROJECT_ID: project ID Google Cloud Dataform Anda.
- REPOSITORY_ID: nama repositori Dataform Anda.
- REGION: region tempat repositori Dataform berada.
- GIT_COMMITISH: commitish Git di repositori Git jarak jauh dari versi kode yang ingin Anda gunakan—misalnya, cabang atau SHA Git.
- PARAMETER: parameter
CodeCompilationConfig
yang dipilih. Anda dapat menambahkan beberapa parameter. - PARAMETER_VALUE: nilai parameter yang dipilih.
Contoh kode berikut menunjukkan parameter defaultDatabase
yang ditambahkan ke
objek DAG Airflow create_compilation_result
:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": REMOTE_BRANCH,
"code_compilation_config": { "default_database": "my-custom-gcp-project"}
},
)
Menambahkan parameter konfigurasi pemanggilan alur kerja
Anda dapat menambahkan parameter konfigurasi pemanggilan alur kerja tambahan ke
objek DAG Airflow create_workflow_invocation
. Untuk mengetahui informasi selengkapnya tentang
parameter yang tersedia, lihat
referensi API Dataform InvocationConfig
.
Untuk menambahkan parameter konfigurasi pemanggilan alur kerja ke objek DAG Airflow
create_workflow_invocation
, tambahkan parameter yang dipilih ke kolominvocation_config
dalam format berikut:create_workflow_invocation = DataformCreateWorkflowInvocationOperator( task_id='create_workflow_invocation', project_id=PROJECT_ID, region=REGION, repository_id=REPOSITORY_ID, workflow_invocation={ "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}", "invocation_config": { "PARAMETER": PARAMETER_VALUE } }, )
Ganti kode berikut:
- PROJECT_ID: project ID Google Cloud Dataform Anda.
- REPOSITORY_ID: nama repositori Dataform Anda.
- REGION: region tempat repositori Dataform berada.
- PARAMETER: parameter
InvocationConfig
yang dipilih. Anda dapat menambahkan beberapa parameter. - PARAMETER_VALUE: nilai parameter yang dipilih.
Contoh kode berikut menunjukkan parameter includedTags[]
dan
transitiveDependenciesIncluded
yang ditambahkan ke
objek DAG Airflow create_workflow_invocation
:
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
"invocation_config": { "included_tags": ["daily"], "transitive_dependencies_included": true }
},
)
Langkah berikutnya
- Untuk mempelajari cara mengonfigurasi konfigurasi rilis kompilasi Dataform, lihat Membuat konfigurasi rilis.
- Untuk mempelajari siklus proses kode Dataform lebih lanjut, lihat Pengantar siklus proses kode di Dataform.
- Untuk mempelajari Dataform API lebih lanjut, lihat Dataform API.
- Untuk mempelajari lingkungan Cloud Composer lebih lanjut, lihat Ringkasan Cloud Composer.
- Untuk mempelajari harga Workflows lebih lanjut, lihat Harga Workflows.