Pianifica le esecuzioni con Cloud Composer

Questo documento mostra come eseguire esecuzioni pianificate di Dataform Flussi di lavoro SQL con Cloud Composer 2.

Puoi utilizzare Cloud Composer 2 per pianificare le esecuzioni di Dataform. Dataform non supporta Cloud Composer 1.

Per gestire le pianificazioni delle esecuzioni di Dataform con Cloud Composer 2: puoi utilizzare gli operatori Dataform nei DAG (Directed Acyclic Graph) di Airflow. Puoi creare un DAG Airflow che pianifica Chiamate del flusso di lavoro Dataform.

Dataform fornisce vari operatori Airflow. Sono inclusi gli operatori per ottenere il risultato di una compilazione, la chiamata a un flusso di lavoro e l'annullamento una chiamata a un flusso di lavoro. Per visualizzare l'elenco completo dei dati Dataform disponibili Operatori Airflow, consulta gli operatori di Google Dataform.

Prima di iniziare

  1. Seleziona o crea un repository Dataform.
  2. Concedere a Dataform l'accesso a BigQuery.
  3. Seleziona o crea un'area di lavoro Dataform.
  4. Crea almeno una tabella.
  5. Creare un ambiente Cloud Composer 2.
    1. Concedi il ruolo roles/composer.worker. e roles/dataform.editor ruoli all'account di servizio del tuo ambiente Cloud Composer.

Installa il pacchetto PyPi di google-cloud-dataform

Se usi Cloud Composer 2 versione 2.0.25 e successive, questo pacchetto è preinstallato nel tuo ambiente. Non è necessario installarlo.

Se usi versioni precedenti di Cloud Composer 2, installa il pacchetto PyPi google-cloud-dataform.

Nella sezione dei pacchetti PyPI, specifica la versione ==0.2.0.

Crea un DAG Airflow che pianifica le chiamate del flusso di lavoro Dataform

Per gestire le esecuzioni pianificate dei flussi di lavoro SQL Dataform con Cloud Composer 2, scrivi il DAG usando gli operatori Dataform Airflow, caricarlo nel bucket dell'ambiente.

Il seguente esempio di codice mostra un DAG Airflow che crea un Dataform il risultato della compilazione e avvia una chiamata del flusso di lavoro Dataform:

from datetime import datetime

from google.cloud.dataform_v1beta1 import WorkflowInvocation

from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
    DataformCancelWorkflowInvocationOperator,
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
    DataformGetCompilationResultOperator,
    DataformGetWorkflowInvocationOperator,
)
from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor

DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"

with models.DAG(
    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2022, 1, 1),
    catchup=False,  # Override to match your needs
    tags=['dataform'],
) as dag:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
        },
    )

    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
         workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
        },
    )

create_compilation_result >> create_workflow_invocation

Sostituisci quanto segue:

  • PROJECT_ID: il tuo ID progetto Google Cloud Dataform
  • REPOSITORY_ID: il nome del tuo repository Dataform
  • REGION: la regione in cui l'oggetto Dataform il repository
  • COMPILATION_RESULT: il nome del risultato della compilazione da utilizzare per questa chiamata del flusso di lavoro
  • GIT_COMMITISH: il commitish Git, ad esempio, un ramo o una SHA Git nel repository Git remoto della versione il codice che vuoi utilizzare

Il seguente esempio di codice mostra un DAG Airflow che:

  1. Crea il risultato di una compilazione Dataform.
  2. Avvia una chiamata asincrona del flusso di lavoro Dataform.
  3. Esamina lo stato del flusso di lavoro fino a quando non riporta lo stato previsto mediante DataformWorkflowInvocationStateSensor.
from datetime import datetime

from google.cloud.dataform_v1beta1 import WorkflowInvocation

from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
    DataformCancelWorkflowInvocationOperator,
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
    DataformGetCompilationResultOperator,
    DataformGetWorkflowInvocationOperator,
)

DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"

with models.DAG(
    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2022, 1, 1),
    catchup=False,  # Override to match your needs
    tags=['dataform'],
) as dag:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
        },
    )

create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
    task_id='create_workflow_invocation',
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    asynchronous=True,
    workflow_invocation={
        "compilation_result": COMPILATION_RESULT
    }
)

is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
    task_id="is_workflow_invocation_done",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id=("{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"),
    expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)


create_compilation_result >> create_workflow_invocation

Sostituisci quanto segue:

  • PROJECT_ID: il tuo ID progetto Google Cloud Dataform
  • REPOSITORY_ID: il nome del tuo repository Dataform
  • REGION: la regione in cui l'oggetto Dataform il repository
  • COMPILATION_RESULT: il nome del risultato della compilazione da utilizzare per questa chiamata del flusso di lavoro
  • GIT_COMMITISH: il commitish Git, ad esempio, un ramo o una SHA Git nel repository Git remoto della versione il codice che vuoi utilizzare
  • COMPILATION_RESULT: il nome del risultato della compilazione da utilizzare per questa chiamata del flusso di lavoro

Aggiungi parametri di configurazione della compilazione

Puoi aggiungere ulteriori parametri di configurazione di compilazione create_compilation_result Oggetto DAG Airflow. Per ulteriori informazioni i parametri disponibili, consulta la documentazione di riferimento dell'API Dataform di CodeCompilationConfig.

  • Per aggiungere parametri di configurazione della compilazione a create_compilation_result Oggetto DAG Airflow, aggiungi i parametri selezionati a code_compilation_config nel seguente formato:
    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
            "code_compilation_config": { "PARAMETER": "PARAMETER_VALUE"}
        },
    )

Sostituisci quanto segue:

  • PROJECT_ID: il tuo ID progetto Google Cloud Dataform
  • REPOSITORY_ID: il nome del tuo repository Dataform
  • REGION con la regione in cui il repository Dataform si trova
  • GIT_COMMITISH: il commitish Git, ad esempio, un ramo o una SHA Git nel repository Git remoto della versione il codice che vuoi utilizzare
  • PARAMETER: parametro CodeCompilationConfig selezionato. Puoi aggiungere più parametri.
  • PARAMETER_VALUE: valore del parametro selezionato

Il seguente esempio di codice mostra il parametro defaultDatabase aggiunto alla create_compilation_result Oggetto DAG Airflow:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": REMOTE_BRANCH,
            "code_compilation_config": { "default_database": "my-custom-gcp-project"}
        },
    )

Aggiungi parametri di configurazione della chiamata del flusso di lavoro

Puoi aggiungere ulteriori parametri di configurazione della chiamata del flusso di lavoro create_workflow_invocation Oggetto DAG Airflow. Per ulteriori informazioni i parametri disponibili, consulta la documentazione di riferimento dell'API Dataform di InvocationConfig.

  • Per aggiungere parametri di configurazione della chiamata del flusso di lavoro alla create_workflow_invocation oggetto DAG Airflow, aggiungi i parametri selezionati a invocation_config nel seguente formato:
    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation={
            
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
            
            "invocation_config": { "PARAMETER": PARAMETER_VALUE }
        },

    )

Sostituisci quanto segue:

  • PROJECT_ID: il tuo ID progetto Google Cloud Dataform
  • REPOSITORY_ID: il nome del tuo repository Dataform
  • REGION: la regione in cui il repository Dataform si trova
  • PARAMETER: parametro InvocationConfig selezionato. Puoi aggiungere più parametri.
  • PARAMETER_VALUE: valore del parametro selezionato

Il seguente esempio di codice mostra includedTags[] e transitiveDependenciesIncluded parametri aggiunti a create_workflow_invocation Oggetto DAG Airflow:

    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
            "invocation_config": { "included_Tags": ["daily"], "transitive_dependencies_included": true }
        },
    )

Passaggi successivi