Puoi utilizzare Cloud Composer 2 per pianificare le esecuzioni di Dataform. Dataform non supporta Cloud Composer 1.
Per gestire le pianificazioni delle esecuzioni di Dataform con Cloud Composer 2: puoi utilizzare gli operatori Dataform nei DAG (Directed Acyclic Graph) di Airflow. Puoi creare un DAG Airflow che pianifica Chiamate del flusso di lavoro Dataform.
Dataform fornisce vari operatori Airflow. Sono inclusi gli operatori per ottenere il risultato di una compilazione, la chiamata a un flusso di lavoro e l'annullamento una chiamata a un flusso di lavoro. Per visualizzare l'elenco completo dei dati Dataform disponibili Operatori Airflow, consulta gli operatori di Google Dataform.
Prima di iniziare
- Seleziona o crea un repository Dataform.
- Concedere a Dataform l'accesso a BigQuery.
- Seleziona o crea un'area di lavoro Dataform.
- Crea almeno una tabella.
- Creare un ambiente Cloud Composer 2.
- Concedi il ruolo roles/composer.worker. e roles/dataform.editor ruoli all'account di servizio del tuo ambiente Cloud Composer.
Installa il pacchetto PyPi di google-cloud-dataform
Se usi Cloud Composer 2 versione 2.0.25
e successive, questo pacchetto
è preinstallato nel tuo ambiente. Non è necessario installarlo.
Se usi versioni precedenti di Cloud Composer 2,
installa il pacchetto PyPi google-cloud-dataform
.
Nella sezione dei pacchetti PyPI, specifica la versione ==0.2.0
.
Crea un DAG Airflow che pianifica le chiamate del flusso di lavoro Dataform
Per gestire le esecuzioni pianificate dei flussi di lavoro SQL Dataform con Cloud Composer 2, scrivi il DAG usando gli operatori Dataform Airflow, caricarlo nel bucket dell'ambiente.
Il seguente esempio di codice mostra un DAG Airflow che crea un Dataform il risultato della compilazione e avvia una chiamata del flusso di lavoro Dataform:
from datetime import datetime
from google.cloud.dataform_v1beta1 import WorkflowInvocation
from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
DataformCancelWorkflowInvocationOperator,
DataformCreateCompilationResultOperator,
DataformCreateWorkflowInvocationOperator,
DataformGetCompilationResultOperator,
DataformGetWorkflowInvocationOperator,
)
from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor
DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"
with models.DAG(
DAG_ID,
schedule_interval='@once', # Override to match your needs
start_date=datetime(2022, 1, 1),
catchup=False, # Override to match your needs
tags=['dataform'],
) as dag:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
},
)
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
},
)
create_compilation_result >> create_workflow_invocation
Sostituisci quanto segue:
- PROJECT_ID: il tuo ID progetto Google Cloud Dataform
- REPOSITORY_ID: il nome del tuo repository Dataform
- REGION: la regione in cui l'oggetto Dataform il repository
- COMPILATION_RESULT: il nome del risultato della compilazione da utilizzare per questa chiamata del flusso di lavoro
- GIT_COMMITISH: il commitish Git, ad esempio, un ramo o una SHA Git nel repository Git remoto della versione il codice che vuoi utilizzare
Il seguente esempio di codice mostra un DAG Airflow che:
- Crea il risultato di una compilazione Dataform.
- Avvia una chiamata asincrona del flusso di lavoro Dataform.
- Esamina lo stato del flusso di lavoro fino a quando non riporta lo stato previsto
mediante
DataformWorkflowInvocationStateSensor
.
from datetime import datetime
from google.cloud.dataform_v1beta1 import WorkflowInvocation
from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
DataformCancelWorkflowInvocationOperator,
DataformCreateCompilationResultOperator,
DataformCreateWorkflowInvocationOperator,
DataformGetCompilationResultOperator,
DataformGetWorkflowInvocationOperator,
)
DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"
with models.DAG(
DAG_ID,
schedule_interval='@once', # Override to match your needs
start_date=datetime(2022, 1, 1),
catchup=False, # Override to match your needs
tags=['dataform'],
) as dag:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
},
)
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
asynchronous=True,
workflow_invocation={
"compilation_result": COMPILATION_RESULT
}
)
is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
task_id="is_workflow_invocation_done",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation_id=("{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"),
expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)
create_compilation_result >> create_workflow_invocation
Sostituisci quanto segue:
- PROJECT_ID: il tuo ID progetto Google Cloud Dataform
- REPOSITORY_ID: il nome del tuo repository Dataform
- REGION: la regione in cui l'oggetto Dataform il repository
- COMPILATION_RESULT: il nome del risultato della compilazione da utilizzare per questa chiamata del flusso di lavoro
- GIT_COMMITISH: il commitish Git, ad esempio, un ramo o una SHA Git nel repository Git remoto della versione il codice che vuoi utilizzare
- COMPILATION_RESULT: il nome del risultato della compilazione da utilizzare per questa chiamata del flusso di lavoro
Aggiungi parametri di configurazione della compilazione
Puoi aggiungere ulteriori parametri di configurazione di compilazione
create_compilation_result
Oggetto DAG Airflow. Per ulteriori informazioni
i parametri disponibili, consulta la documentazione di riferimento dell'API Dataform di CodeCompilationConfig
.
- Per aggiungere parametri di configurazione della compilazione a
create_compilation_result
Oggetto DAG Airflow, aggiungi i parametri selezionati acode_compilation_config
nel seguente formato:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
"code_compilation_config": { "PARAMETER": "PARAMETER_VALUE"}
},
)
Sostituisci quanto segue:
- PROJECT_ID: il tuo ID progetto Google Cloud Dataform
- REPOSITORY_ID: il nome del tuo repository Dataform
- REGION con la regione in cui il repository Dataform si trova
- GIT_COMMITISH: il commitish Git, ad esempio, un ramo o una SHA Git nel repository Git remoto della versione il codice che vuoi utilizzare
- PARAMETER: parametro
CodeCompilationConfig
selezionato. Puoi aggiungere più parametri. - PARAMETER_VALUE: valore del parametro selezionato
Il seguente esempio di codice mostra il parametro defaultDatabase
aggiunto alla
create_compilation_result
Oggetto DAG Airflow:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": REMOTE_BRANCH,
"code_compilation_config": { "default_database": "my-custom-gcp-project"}
},
)
Aggiungi parametri di configurazione della chiamata del flusso di lavoro
Puoi aggiungere ulteriori parametri di configurazione della chiamata del flusso di lavoro
create_workflow_invocation
Oggetto DAG Airflow. Per ulteriori informazioni
i parametri disponibili, consulta la documentazione di riferimento dell'API Dataform di InvocationConfig
.
- Per aggiungere parametri di configurazione della chiamata del flusso di lavoro alla
create_workflow_invocation
oggetto DAG Airflow, aggiungi i parametri selezionati ainvocation_config
nel seguente formato:
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
"invocation_config": { "PARAMETER": PARAMETER_VALUE }
},
)
Sostituisci quanto segue:
- PROJECT_ID: il tuo ID progetto Google Cloud Dataform
- REPOSITORY_ID: il nome del tuo repository Dataform
- REGION: la regione in cui il repository Dataform si trova
- PARAMETER: parametro
InvocationConfig
selezionato. Puoi aggiungere più parametri. - PARAMETER_VALUE: valore del parametro selezionato
Il seguente esempio di codice mostra includedTags[]
e
transitiveDependenciesIncluded
parametri aggiunti a
create_workflow_invocation
Oggetto DAG Airflow:
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
"invocation_config": { "included_Tags": ["daily"], "transitive_dependencies_included": true }
},
)
Passaggi successivi
- Per scoprire come configurare gli override delle compilazioni per le esecuzioni dei flussi di lavoro, consulta Configura gli override delle compilazioni.
- Per scoprire di più sull'API Dataform, consulta API Dataform.
- Per saperne di più sugli ambienti Cloud Composer, consulta Panoramica di Cloud Composer.
- Per scoprire come pianificare le esecuzioni con Workflows e Cloud Scheduler, consulta Pianifica le esecuzioni con Workflows e Cloud Scheduler.