Menjadwalkan operasi

Dokumen ini menunjukkan cara melakukan hal berikut di Dataform:

Sebelum memulai

Untuk menjadwalkan operasi dengan konfigurasi alur kerja atau menjadwalkan operasi dengan alur kerja dan Cloud Scheduler, pastikan Anda melakukan hal berikut:

  1. Di konsol Google Cloud, buka halaman Dataform.

    Buka Dataform

  2. Pilih atau buat repositori.

  3. Buat konfigurasi rilis.

Untuk menjadwalkan operasi dengan Cloud Composer, pastikan Anda melakukan hal berikut:

  1. Pilih atau buat repositori Dataform.
  2. Berikan akses Dataform ke BigQuery.
  3. Pilih atau buat ruang kerja Dataform.
  4. Buat minimal satu tabel.
  5. Buat lingkungan Cloud Composer 2.

Peran yang diperlukan

Untuk mendapatkan izin yang Anda perlukan untuk menyelesaikan tugas dalam dokumen ini, minta administrator Anda untuk memberi Anda peran IAM berikut:

Untuk mengetahui informasi selengkapnya tentang cara memberikan peran, lihat Mengelola akses ke project, folder, dan organisasi.

Anda mungkin juga bisa mendapatkan izin yang diperlukan melalui peran khusus atau peran bawaan lainnya.

Untuk menggunakan akun layanan selain akun layanan Dataform default, berikan akses ke akun layanan kustom.

Menjadwalkan operasi dengan konfigurasi alur kerja

Bagian ini menunjukkan cara membuat konfigurasi alur kerja di Dataform untuk menjadwalkan dan mengonfigurasi pengoperasian alur kerja. Anda dapat menggunakan konfigurasi alur kerja untuk menjalankan alur kerja Dataform sesuai jadwal.

Tentang konfigurasi alur kerja

Untuk menjadwalkan pengoperasian Dataform dari semua atau tindakan alur kerja yang dipilih di BigQuery, Anda dapat membuat konfigurasi alur kerja. Dalam konfigurasi alur kerja, Anda memilih konfigurasi rilis kompilasi, memilih tindakan alur kerja untuk dieksekusi, dan menetapkan jadwal operasi.

Kemudian, selama konfigurasi alur kerja Anda dijalankan sesuai jadwal, Dataform akan men-deploy pilihan tindakan Anda dari hasil kompilasi terbaru dalam konfigurasi rilis ke BigQuery. Anda juga dapat memicu pengoperasian konfigurasi alur kerja secara manual dengan workflowConfigs Dataform API.

Konfigurasi alur kerja Dataform berisi setelan jalankan berikut:

  • ID konfigurasi alur kerja.
  • Konfigurasi rilis.
  • Akun layanan.

    Ini adalah akun layanan yang terkait dengan konfigurasi alur kerja. Anda dapat memilih akun layanan Dataform default atau akun layanan yang terkait dengan project Google Cloud Anda, atau Anda dapat memasukkan akun layanan lain secara manual. Secara default, konfigurasi alur kerja menggunakan akun layanan yang sama dengan repositori-nya.

  • Tindakan alur kerja yang akan dijalankan:

    • Semua tindakan.
    • Pemilihan tindakan.
    • Pilihan tag.
  • Jadwal dan zona waktu operasi.

Membuat konfigurasi alur kerja

Untuk membuat konfigurasi alur kerja Dataform, ikuti langkah-langkah berikut:

  1. Di repositori Anda, buka Rilis & Penjadwalan.
  2. Di bagian Konfigurasi alur kerja, klik Buat.
  3. Di panel Create workflow configuration, di kolom Configuration ID, masukkan ID unik untuk konfigurasi alur kerja.

    ID hanya boleh berisi angka, huruf, tanda hubung, dan garis bawah.

  4. Di menu Konfigurasi rilis, pilih konfigurasi rilis kompilasi.

  5. Opsional: Di kolom Frequency, masukkan frekuensi operasi dalam format unix-cron.

    Untuk memastikan Dataform mengeksekusi hasil kompilasi terbaru dalam konfigurasi rilis yang sesuai, pertahankan jeda minimum satu jam antara waktu pembuatan hasil kompilasi dan waktu eksekusi terjadwal.

  6. Di menu Service account, pilih akun layanan untuk konfigurasi alur kerja.

    Di menu, Anda dapat memilih akun layanan Dataform default atau akun layanan apa pun yang terkait dengan project Google Cloud yang aksesnya Anda miliki. Jika Anda tidak memilih akun layanan, konfigurasi alur kerja akan menggunakan akun layanan repositori.

  7. Opsional: Di menu Zona waktu, pilih zona waktu untuk dijalankan.

    Zona waktu default adalah UTC.

  8. Pilih tindakan alur kerja yang akan dijalankan:

    • Untuk menjalankan seluruh alur kerja, klik Semua tindakan.
    • Untuk menjalankan tindakan yang dipilih dalam alur kerja, klik Pemilihan tindakan, lalu pilih tindakan.
    • Untuk menjalankan tindakan dengan tag yang dipilih, klik Pemilihan tag, lalu pilih tag.
    • Opsional: Untuk menjalankan tindakan atau tag yang dipilih dan dependensinya, pilih opsi Sertakan dependensi.
    • Opsional: Untuk menjalankan tindakan atau tag yang dipilih dan dependensinya, pilih opsi Sertakan dependensi.
    • Opsional: Untuk mem-build ulang semua tabel dari awal, pilih opsi Jalankan dengan pembaruan penuh.

    Tanpa opsi ini, Dataform akan memperbarui tabel inkremental tanpa membangun ulang dari awal.

  9. Klik Buat.

Misalnya, konfigurasi alur kerja berikut mengeksekusi tindakan dengan tag hourly setiap jam di zona waktu CEST:

  • ID Konfigurasi: production-hourly
  • Konfigurasi rilis: -
  • Frekuensi: 0 * * * *
  • Zona waktu: Central European Summer Time (CEST)
  • Pemilihan tindakan alur kerja: pemilihan tag, tag hourly

Mengedit konfigurasi alur kerja

Untuk mengedit konfigurasi alur kerja, ikuti langkah-langkah berikut:

  1. Di repositori Anda, buka Rilis & Penjadwalan.
  2. Di konfigurasi alur kerja yang ingin diedit, klik menu Lainnya, lalu klik Edit.
  3. Di panel Edit workflow configuration, edit setelan konfigurasi rilis, lalu klik Save.

Menghapus konfigurasi alur kerja

Untuk menghapus konfigurasi alur kerja, ikuti langkah-langkah berikut:

  1. Di repositori Anda, buka Rilis & Penjadwalan.
  2. Di samping konfigurasi alur kerja yang ingin Anda hapus, klik menu More, lalu klik Delete.
  3. Pada dialog Hapus konfigurasi rilis, klik Hapus.

Menjadwalkan operasi dengan Workflows dan Cloud Scheduler

Bagian ini menunjukkan cara menjadwalkan eksekusi alur kerja Dataform menggunakan Workflows dan Cloud Scheduler.

Tentang operasi alur kerja terjadwal

Anda dapat menetapkan frekuensi alur kerja Dataform yang berjalan dengan membuat tugas Cloud Scheduler yang memicu alur kerja Alur Kerja. Alur kerja mengeksekusi layanan dalam alur kerja orkestrasi yang Anda tentukan.

Alur kerja menjalankan alur kerja Dataform Anda dalam proses dua langkah. Pertama, alat ini mengambil kode repositori Dataform dari penyedia Git dan mengompilasikannya menjadi hasil kompilasi. Kemudian, alat ini menggunakan hasil kompilasi untuk membuat alur kerja Dataform dan menjalankannya pada frekuensi yang Anda tetapkan.

Membuat alur kerja orkestrasi terjadwal

Untuk menjadwalkan eksekusi alur kerja Dataform, gunakan Alur kerja untuk membuat alur kerja orkestrasi dan tambahkan tugas Cloud Scheduler sebagai pemicu.

  1. Alur kerja menggunakan akun layanan untuk memberi alur kerja akses ke resourceGoogle Cloud . Buat akun layanan dan berikan peran Identity and Access Management Dataform Editor (roles/dataform.editor) serta izin minimum yang diperlukan untuk mengelola alur kerja orkestrasi Anda. Untuk mengetahui informasi selengkapnya, lihat Memberikan izin alur kerja untuk mengakses Google Cloud resource.

  2. Buat alur kerja orkestrasi dan gunakan kode sumber YAML berikut sebagai definisi alur kerja Anda:

    main:
        steps:
        - init:
            assign:
            - repository: projects/PROJECT_ID/locations/REPOSITORY_LOCATION/repositories/REPOSITORY_ID
        - createCompilationResult:
            call: http.post
            args:
                url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"}
                auth:
                    type: OAuth2
                body:
                    gitCommitish: GIT_COMMITISH
            result: compilationResult
        - createWorkflowInvocation:
            call: http.post
            args:
                url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"}
                auth:
                    type: OAuth2
                body:
                    compilationResult: ${compilationResult.body.name}
            result: workflowInvocation
        - complete:
            return: ${workflowInvocation.body.name}
    

    Ganti kode berikut:

    • PROJECT_ID: ID Google Cloud project Anda.
    • REPOSITORY_LOCATION: lokasi repositori Dataform Anda.
    • REPOSITORY_ID: nama repositori Dataform Anda.
    • GIT_COMMITISH: cabang Git tempat Anda ingin menjalankan kode Dataform. Untuk repositori yang baru dibuat, ganti dengan main.
  3. Jadwalkan alur kerja orkestrasi menggunakan Cloud Scheduler.

Menyesuaikan permintaan hasil kompilasi pembuatan alur kerja Dataform

Anda dapat memperbarui alur kerja orkestrasi yang ada dan menentukan setelan permintaan hasil kompilasi pembuatan alur kerja Dataform dalam format YAML. Untuk mengetahui informasi selengkapnya tentang setelan, lihat referensi resource REST projects.locations.repositories.compilationResults.

Misalnya, untuk menambahkan setelan _dev schemaSuffix ke semua tindakan selama kompilasi, ganti isi langkah createCompilationResult dengan cuplikan kode berikut:

    - createCompilationResult:
        call: http.post
        args:
            url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"}
            auth:
                type: OAuth2
            body:
                gitCommitish: GIT_COMMITISH
                codeCompilationConfig:
                    schemaSuffix: dev

Anda juga dapat meneruskan setelan tambahan sebagai argumen runtime dalam permintaan eksekusi Alur Kerja dan mengakses argumen tersebut menggunakan variabel. Untuk informasi selengkapnya, lihat Teruskan argumen runtime dalam permintaan eksekusi.

Menyesuaikan permintaan pemanggilan alur kerja Dataform

Anda dapat memperbarui alur kerja orkestrasi yang ada dan menentukan setelan permintaan pemanggilan alur kerja Dataform dalam format YAML. Untuk informasi selengkapnya tentang setelan permintaan pemanggilan, lihat referensi resource REST projects.locations.repositories.workflowInvocations.

Misalnya, untuk hanya mengeksekusi tindakan dengan tag hourly dengan semua dependensi transitif yang disertakan, ganti isi createWorkflowInvocation dengan cuplikan kode berikut:

    - createWorkflowInvocation:
        call: http.post
        args:
            url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"}
            auth:
                type: OAuth2
            body:
                compilationResult: ${compilationResult.body.name}
                invocationConfig:
                    includedTags:
                    - hourly
                    transitiveDependenciesIncluded: true
                

Anda juga dapat meneruskan setelan tambahan sebagai argumen runtime dalam permintaan eksekusi Alur Kerja dan mengakses argumen tersebut menggunakan variabel. Untuk informasi selengkapnya, lihat Teruskan argumen runtime dalam permintaan eksekusi.

Menjadwalkan operasi dengan Cloud Composer

Anda dapat menggunakan Cloud Composer 2 untuk menjadwalkan pengoperasian Dataform. Dataform tidak mendukung Cloud Composer 1.

Untuk mengelola jadwal pengoperasian Dataform dengan Cloud Composer 2, Anda dapat menggunakan operator Dataform di Directed Acyclic Graph (DAG) Airflow. Anda dapat membuat DAG Airflow yang menjadwalkan pemanggilan alur kerja Dataform.

Dataform menyediakan berbagai operator Airflow. Ini mencakup operator untuk mendapatkan hasil kompilasi, mendapatkan pemanggilan alur kerja, dan membatalkan pemanggilan alur kerja. Untuk melihat daftar lengkap operator Dataform Airflow yang tersedia, lihat Operator Dataform Google.

Menginstal paket PyPi google-cloud-dataform

Jika Anda menggunakan Cloud Composer 2 versi 2.0.25 dan yang lebih baru, paket ini sudah diinstal sebelumnya di lingkungan Anda. Anda tidak perlu menginstalnya.

Jika Anda menggunakan Cloud Composer 2 versi sebelumnya, instal paket PyPi google-cloud-dataform.

Di bagian paket PyPI, tentukan versi ==0.2.0.

Membuat DAG Airflow yang menjadwalkan pemanggilan alur kerja Dataform

Untuk mengelola operasi terjadwal alur kerja Dataform dengan Cloud Composer 2, tulis DAG menggunakan operator Airflow Dataform, lalu upload ke bucket lingkungan Anda.

Contoh kode berikut menunjukkan DAG Airflow yang membuat hasil kompilasi Dataform dan memulai pemanggilan alur kerja Dataform:

from datetime import datetime

from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
)

DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"

with models.DAG(
    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2022, 1, 1),
    catchup=False,  # Override to match your needs
    tags=['dataform'],
) as dag:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
        },
    )
    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
         workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
        },
    )


create_compilation_result >> create_workflow_invocation

Ganti kode berikut:

  • PROJECT_ID: project ID Google Cloud Dataform Anda.
  • REPOSITORY_ID: nama repositori Dataform Anda.
  • REGION: region tempat repositori Dataform berada.
  • COMPILATION_RESULT: nama hasil kompilasi yang ingin Anda gunakan untuk pemanggilan alur kerja ini.
  • GIT_COMMITISH: commitish Git di repositori Git jarak jauh dari versi kode yang ingin Anda gunakan—misalnya, cabang atau SHA Git.

Contoh kode berikut menunjukkan DAG Airflow yang melakukan hal berikut:

  1. Membuat hasil kompilasi Dataform.
  2. Memulai pemanggilan alur kerja Dataform asinkron.
  3. Melakukan polling status alur kerja Anda hingga memasuki status yang diharapkan menggunakan DataformWorkflowInvocationStateSensor.
from datetime import datetime

from google.cloud.dataform_v1beta1 import WorkflowInvocation

from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
)
from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor

DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"

with models.DAG(
    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2022, 1, 1),
    catchup=False,  # Override to match your needs
    tags=['dataform'],
) as dag:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
        },
    )

create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
    task_id='create_workflow_invocation',
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    asynchronous=True,
    workflow_invocation={
        "compilation_result": COMPILATION_RESULT
    }
)

is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
    task_id="is_workflow_invocation_done",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id=("{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"),
    expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)


create_compilation_result >> create_workflow_invocation

Ganti kode berikut:

  • PROJECT_ID: projectID Google Cloud Dataform Anda.
  • REPOSITORY_ID: nama repositori Dataform Anda.
  • REGION: region tempat repositori Dataform berada.
  • COMPILATION_RESULT: nama hasil kompilasi yang ingin Anda gunakan untuk pemanggilan alur kerja ini.
  • GIT_COMMITISH: commitish Git di repositori Git jarak jauh dari versi kode yang ingin Anda gunakan—misalnya, cabang atau SHA Git.
  • COMPILATION_RESULT: nama hasil kompilasi yang ingin Anda gunakan untuk pemanggilan alur kerja ini.

Menambahkan parameter konfigurasi kompilasi

Anda dapat menambahkan parameter konfigurasi kompilasi tambahan ke objek DAG Airflow create_compilation_result. Untuk mengetahui informasi selengkapnya tentang parameter yang tersedia, lihat referensi API Dataform CodeCompilationConfig.

  • Untuk menambahkan parameter konfigurasi kompilasi ke objek DAG Airflow create_compilation_result, tambahkan parameter yang dipilih ke kolom code_compilation_config dalam format berikut:

        create_compilation_result = DataformCreateCompilationResultOperator(
            task_id="create_compilation_result",
            project_id=PROJECT_ID,
            region=REGION,
            repository_id=REPOSITORY_ID,
            compilation_result={
                "git_commitish": GIT_COMMITISH,
                "code_compilation_config": { "PARAMETER": "PARAMETER_VALUE"}
            },
        )
    

    Ganti kode berikut:

    • PROJECT_ID: project ID Google Cloud Dataform Anda.
    • REPOSITORY_ID: nama repositori Dataform Anda.
    • REGION: region tempat repositori Dataform berada.
    • GIT_COMMITISH: commitish Git di repositori Git jarak jauh dari versi kode yang ingin Anda gunakan—misalnya, cabang atau SHA Git.
    • PARAMETER: parameter CodeCompilationConfig yang dipilih. Anda dapat menambahkan beberapa parameter.
    • PARAMETER_VALUE: nilai parameter yang dipilih.

Contoh kode berikut menunjukkan parameter defaultDatabase yang ditambahkan ke objek DAG Airflow create_compilation_result:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": REMOTE_BRANCH,
            "code_compilation_config": { "default_database": "my-custom-gcp-project"}
        },
    )

Menambahkan parameter konfigurasi pemanggilan alur kerja

Anda dapat menambahkan parameter konfigurasi pemanggilan alur kerja tambahan ke objek DAG Airflow create_workflow_invocation. Untuk mengetahui informasi selengkapnya tentang parameter yang tersedia, lihat referensi API Dataform InvocationConfig.

  • Untuk menambahkan parameter konfigurasi pemanggilan alur kerja ke objek DAG Airflow create_workflow_invocation, tambahkan parameter yang dipilih ke kolom invocation_config dalam format berikut:

        create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
            task_id='create_workflow_invocation',
            project_id=PROJECT_ID,
            region=REGION,
            repository_id=REPOSITORY_ID,
            workflow_invocation={
                "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
                "invocation_config": { "PARAMETER": PARAMETER_VALUE }
            },
        )
    
    

    Ganti kode berikut:

    • PROJECT_ID: project ID Google Cloud Dataform Anda.
    • REPOSITORY_ID: nama repositori Dataform Anda.
    • REGION: region tempat repositori Dataform berada.
    • PARAMETER: parameter InvocationConfig yang dipilih. Anda dapat menambahkan beberapa parameter.
    • PARAMETER_VALUE: nilai parameter yang dipilih.

Contoh kode berikut menunjukkan parameter includedTags[] dan transitiveDependenciesIncluded yang ditambahkan ke objek DAG Airflow create_workflow_invocation:

    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
            "invocation_config": { "included_tags": ["daily"], "transitive_dependencies_included": true }
        },
    )

Langkah berikutnya