Menjadwalkan eksekusi dengan Cloud Composer

Dokumen ini menunjukkan cara menjalankan eksekusi terjadwal alur kerja Dataform SQL menggunakan Cloud Composer 2.

Anda dapat menggunakan Cloud Composer 2 untuk menjadwalkan eksekusi Dataform. Formulir data tidak mendukung Cloud Composer 1.

Untuk mengelola jadwal eksekusi Dataform dengan Cloud Composer 2, Anda dapat menggunakan operator Dataform di Airflow Directed Acyclic Graphs (DAG). Anda dapat membuat DAG Airflow yang menjadwalkan pemanggilan alur kerja Dataform.

Dataform menyediakan berbagai operator Airflow. Ini mencakup operator untuk mendapatkan hasil kompilasi, mendapatkan pemanggilan alur kerja, dan membatalkan pemanggilan alur kerja. Untuk melihat daftar lengkap operator Dataform Airflow yang tersedia, lihat Operator Dataform Google.

Sebelum memulai

  1. Pilih atau buat repositori Dataform.
  2. Berikan akses Dataform ke BigQuery.
  3. Pilih atau buat Ruang kerja Dataform.
  4. Buat minimal satu tabel.
  5. Buat lingkungan Cloud Composer 2.
    1. Berikan peran roles/composer.worker dan roles/dataform.editor ke akun layanan lingkungan Cloud Composer.

Menginstal paket PyPi google-cloud-dataform

Jika Anda menggunakan Cloud Composer 2 versi 2.0.25 dan yang lebih baru, paket ini sudah diinstal di lingkungan Anda. Anda tidak perlu menginstalnya.

Jika Anda menggunakan Cloud Composer 2 versi sebelumnya, instal paket PyPi google-cloud-dataform.

Di bagian paket PyPI, tentukan versi ==0.2.0.

Membuat DAG Airflow yang menjadwalkan pemanggilan alur kerja Dataform

Untuk mengelola eksekusi terjadwal alur kerja Dataform SQL dengan Cloud Composer 2, tulis DAG menggunakan operator Dataform Airflow, lalu upload ke bucket lingkungan Anda.

Contoh kode berikut menunjukkan DAG Airflow yang membuat hasil kompilasi Dataform dan memulai pemanggilan alur kerja Dataform:

from datetime import datetime

from google.cloud.dataform_v1beta1 import WorkflowInvocation

from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
    DataformCancelWorkflowInvocationOperator,
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
    DataformGetCompilationResultOperator,
    DataformGetWorkflowInvocationOperator,
)
from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor

DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"

with models.DAG(
    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2022, 1, 1),
    catchup=False,  # Override to match your needs
    tags=['dataform'],
) as dag:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
        },
    )

    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
         workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
        },
    )

create_compilation_result >> create_workflow_invocation

Ganti kode berikut:

  • PROJECT_ID: ID Project Google Cloud Dataform Anda
  • REPOSITORY_ID: nama repositori Dataform Anda
  • REGION: wilayah tempat repositori Dataform berada
  • COMPILATION_RESULT: nama hasil kompilasi yang ingin Anda gunakan untuk pemanggilan alur kerja ini
  • GIT_COMMITISH: commitish Git, misalnya, cabang atau SHA Git, di repositori Git jarak jauh dari versi kode yang ingin Anda gunakan

Contoh kode berikut menunjukkan DAG Airflow yang:

  1. Membuat hasil kompilasi Dataform
  2. Memulai pemanggilan alur kerja Dataform asinkron.
  3. Melakukan polling status alur kerja hingga memasuki status yang diharapkan menggunakan DataformWorkflowInvocationStateSensor.
from datetime import datetime

from google.cloud.dataform_v1beta1 import WorkflowInvocation

from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
    DataformCancelWorkflowInvocationOperator,
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
    DataformGetCompilationResultOperator,
    DataformGetWorkflowInvocationOperator,
)

DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"

with models.DAG(
    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2022, 1, 1),
    catchup=False,  # Override to match your needs
    tags=['dataform'],
) as dag:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
        },
    )

create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
    task_id='create_workflow_invocation',
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    asynchronous=True,
    workflow_invocation={
        "compilation_result": COMPILATION_RESULT
    }
)

is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
    task_id="is_workflow_invocation_done",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id=("{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"),
    expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)


create_compilation_result >> create_workflow_invocation

Ganti kode berikut:

  • PROJECT_ID: ID Project Google Cloud Dataform Anda
  • REPOSITORY_ID: nama repositori Dataform Anda
  • REGION: wilayah tempat repositori Dataform berada
  • COMPILATION_RESULT: nama hasil kompilasi yang ingin Anda gunakan untuk pemanggilan alur kerja ini
  • GIT_COMMITISH: commitish Git, misalnya, cabang atau SHA Git, di repositori Git jarak jauh dari versi kode yang ingin Anda gunakan
  • COMPILATION_RESULT: nama hasil kompilasi yang ingin Anda gunakan untuk pemanggilan alur kerja ini

Menambahkan parameter konfigurasi kompilasi

Anda dapat menambahkan parameter konfigurasi kompilasi tambahan ke objek DAG Airflow create_compilation_result. Untuk mengetahui informasi selengkapnya tentang parameter yang tersedia, lihat Referensi CodeCompilationConfig Dataform API.

  • Untuk menambahkan parameter konfigurasi kompilasi ke objek DAG Airflow create_compilation_result, tambahkan parameter yang Anda pilih ke code_compilation_config dalam format berikut:
    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
            "code_compilation_config": { "PARAMETER": "PARAMETER_VALUE"}
        },
    )

Ganti kode berikut:

  • PROJECT_ID: ID Project Google Cloud Dataform Anda
  • REPOSITORY_ID: nama repositori Dataform Anda
  • REGION dengan wilayah tempat repositori Dataform berada
  • GIT_COMMITISH: commitish Git, misalnya, cabang atau SHA Git, di repositori Git jarak jauh dari versi kode yang ingin Anda gunakan
  • PARAMETER: parameter CodeCompilationConfig yang dipilih. Anda dapat menambahkan beberapa parameter.
  • PARAMETER_VALUE: nilai parameter yang dipilih

Contoh kode berikut menunjukkan parameter defaultDatabase yang ditambahkan ke objek DAG Airflow create_compilation_result:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": REMOTE_BRANCH,
            "code_compilation_config": { "default_database": "my-custom-gcp-project"}
        },
    )

Menambahkan parameter konfigurasi pemanggilan alur kerja

Anda dapat menambahkan parameter konfigurasi pemanggilan alur kerja tambahan ke objek DAG Airflow create_workflow_invocation. Untuk mengetahui informasi selengkapnya tentang parameter yang tersedia, lihat Referensi InvocationConfig Dataform API.

  • Untuk menambahkan parameter konfigurasi pemanggilan alur kerja ke objek DAG Airflow create_workflow_invocation, tambahkan parameter yang dipilih ke invocation_config dalam format berikut:
    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation={
            
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
            
            "invocation_config": { "PARAMETER": PARAMETER_VALUE }
        },

    )

Ganti kode berikut:

  • PROJECT_ID: ID Project Google Cloud Dataform Anda
  • REPOSITORY_ID: nama repositori Dataform Anda
  • REGION: region tempat repositori Dataform berada
  • PARAMETER: parameter InvocationConfig yang dipilih. Anda dapat menambahkan beberapa parameter.
  • PARAMETER_VALUE: nilai parameter yang dipilih

Contoh kode berikut menunjukkan parameter includedTags[] dan transitiveDependenciesIncluded yang ditambahkan ke objek DAG Airflow create_workflow_invocation:

    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
            "invocation_config": { "included_Tags": ["daily"], "transitive_dependencies_included": true }
        },
    )

Langkah selanjutnya