Questo documento mostra come eseguire le seguenti operazioni in Dataform:
- Pianifica le esecuzioni con le configurazioni dei flussi di lavoro.
- Pianifica le esecuzioni con Workflows e Cloud Scheduler.
- Pianifica le esecuzioni con Cloud Composer.
Prima di iniziare
Per pianificare le esecuzioni con le configurazioni dei flussi di lavoro o pianificare le esecuzioni con i flussi di lavoro e Cloud Scheduler, assicurati di svolgere le seguenti operazioni:
Nella console Google Cloud, vai alla pagina Dataform.
Seleziona o crea un repository.
Crea una configurazione della release.
Per pianificare le esecuzioni con Cloud Composer, assicurati di svolgere quanto segue:
- Seleziona o crea un repository Dataform.
- Concedi a Dataform l'accesso a BigQuery.
- Seleziona o crea uno spazio di lavoro Dataform.
- Crea almeno una tabella.
- Crea un ambiente Cloud Composer 2.
Ruoli obbligatori
Per ottenere le autorizzazioni necessarie per completare le attività descritte in questo documento, chiedi all'amministratore di concederti i seguenti ruoli IAM:
-
Amministratore Dataform (
roles/dataform.admin
) nei repository -
Editor Dataform (
roles/dataform.editor
) nei repository e nell'account di servizio del tuo ambiente Cloud Composer -
Worker Composer (
roles/composer.worker
) nell'account di servizio dell'ambiente Cloud Composer
Per ulteriori informazioni sulla concessione dei ruoli, consulta Gestire l'accesso a progetti, cartelle e organizzazioni.
Potresti anche riuscire a ottenere le autorizzazioni richieste tramite i ruoli personalizzati o altri ruoli predefiniti.
Per utilizzare un account di servizio diverso dall'account di servizio Dataform predefinito, concedi l'accesso all'account di servizio personalizzato.
Pianificare le esecuzioni con le configurazioni del flusso di lavoro
Questa sezione mostra come creare una configurazione del flusso di lavoro in Dataform per pianificare e configurare le esecuzioni del flusso di lavoro. Puoi utilizzare le configurazioni del workflow per eseguire i flussi di lavoro Dataform in base a una pianificazione.
Informazioni sulle configurazioni dei workflow
Per pianificare le esecuzioni di Dataform di tutte o di alcune azioni di workflow in BigQuery, puoi creare configurazioni di workflow. In una configurazione del flusso di lavoro, seleziona una configurazione della release di compilazione, le azioni del flusso di lavoro per l'esecuzione e imposta la pianificazione dell'esecuzione.
Poi, durante un'esecuzione pianificata della configurazione del flusso di lavoro, Dataform esegue il deployment della selezione di azioni dall'ultimo risultato di compilazione nella configurazione della release in BigQuery. Puoi anche attivare manualmente l'esecuzione di una configurazione del flusso di lavoro con workflowConfigs dell'API Dataform.
Una configurazione del flusso di lavoro Dataform contiene le seguenti impostazioni di esecuzione:
- ID della configurazione del flusso di lavoro.
- Configurazione della release.
Account di servizio.
Si tratta dell'account di servizio associato alla configurazione del flusso di lavoro. Puoi selezionare l'account di servizio Dataform predefinito o un account di servizio associato al tuo progetto Google Cloud oppure inserire manualmente un altro account di servizio. Per impostazione predefinita, le configurazioni dei flussi di lavoro utilizzano gli stessi account di servizio dei relativi repository.
Azioni del flusso di lavoro da eseguire:
- Tutte le azioni.
- Selezione di azioni.
- Selezione di tag.
Pianificazione delle esecuzioni e fuso orario.
Crea una configurazione di flusso di lavoro
Per creare una configurazione del flusso di lavoro di Dataform:
- Nel repository, vai a Uscite e pianificazione.
- Nella sezione Configurazioni dei flussi di lavoro, fai clic su Crea.
Nel riquadro Crea configurazione del flusso di lavoro, inserisci un ID univoco per la configurazione del flusso di lavoro nel campo ID configurazione.
Gli ID possono includere solo numeri, lettere, trattini e trattini bassi.
Nel menu Configurazione della release, seleziona una configurazione della release della compilazione.
(Facoltativo) Nel campo Frequenza, inserisci la frequenza delle esecuzioni nel formato unix-cron.
Per assicurarti che Dataform esegua il risultato di compilazione più recente nella configurazione della release corrispondente, mantieni un'interruzione minima di un'ora tra l'ora di creazione del risultato di compilazione e l'ora di esecuzione pianificata.
Nel menu Service account, seleziona un service account per la configurazione del flusso di lavoro.
Nel menu, puoi selezionare l'account di servizio Dataform predefinito o qualsiasi account di servizio associato al tuo progetto Google Cloud a cui hai accesso. Se non selezioni un account di servizio, la configurazione del flusso di lavoro utilizza l'account di servizio del repository.
(Facoltativo) Nel menu Fuso orario, seleziona il fuso orario per le esecuzioni.
Il fuso orario predefinito è UTC.
Seleziona le azioni del flusso di lavoro da eseguire:
- Per eseguire l'intero flusso di lavoro, fai clic su Tutte le azioni.
- Per eseguire le azioni selezionate nel flusso di lavoro, fai clic su Selezione di azioni e poi seleziona le azioni.
- Per eseguire azioni con i tag selezionati, fai clic su Selezione di tag e seleziona i tag.
- (Facoltativo) Per eseguire le azioni o i tag selezionati e le relative dipendenze, seleziona l'opzione Includi dipendenze.
- (Facoltativo) Per eseguire le azioni o i tag selezionati e i relativi elementi dipendenti, seleziona l'opzione Includi elementi dipendenti.
- (Facoltativo) Per ricostruire tutte le tabelle da zero, seleziona l'opzione Esegui con aggiornamento completo.
Senza questa opzione, Dataform aggiorna le tabelle incrementali senza ricostruirle da zero.
Fai clic su Crea.
Ad esempio, la seguente configurazione del flusso di lavoro esegue azioni con il
hourly
tag ogni ora nel fuso orario CEST:
- ID configurazione:
production-hourly
- Configurazione della release: -
- Frequenza:
0 * * * *
- Fuso orario:
Central European Summer Time (CEST)
- Selezione di azioni di flusso di lavoro: selezione di tag, tag
hourly
Modificare una configurazione del flusso di lavoro
Per modificare una configurazione del flusso di lavoro:
- Nel repository, vai a Uscite e pianificazione.
- Accanto alla configurazione del flusso di lavoro che vuoi modificare, fai clic sul menu Altro e poi su Modifica.
- Nel riquadro Modifica configurazione del flusso di lavoro, modifica le impostazioni della configurazione della release e poi fai clic su Salva.
Eliminare una configurazione del flusso di lavoro
Per eliminare una configurazione del flusso di lavoro:
- Nel repository, vai a Uscite e pianificazione.
- Accanto alla configurazione del flusso di lavoro che vuoi eliminare, fai clic sul menu Altro e poi su Elimina.
- Nella finestra di dialogo Elimina configurazione della release, fai clic su Elimina.
Pianificare le esecuzioni con Workflows e Cloud Scheduler
Questa sezione mostra come pianificare le esecuzioni dei flussi di lavoro Dataform utilizzando Workflows e Cloud Scheduler.
Informazioni sulle esecuzioni pianificate del flusso di lavoro
Puoi impostare la frequenza di esecuzione dei flussi di lavoro di Dataform creando un job Cloud Scheduler che attivi un flusso di lavoro Workflows. Workflows esegue i servizi in un flusso di lavoro di orchestrazione definito da te.
Workflows esegue il flusso di lavoro Dataform in un procedura in due fasi. Innanzitutto, estrae il codice del repository Dataform dal provider Git e lo compila in un risultato di compilazione. Quindi, utilizza il risultato della compilazione per creare un flusso di lavoro Dataform e lo esegue con la frequenza impostata.
Creare un flusso di lavoro di orchestrazione pianificato
Per pianificare le esecuzioni del flusso di lavoro Dataform, utilizza Workflows per creare un flusso di lavoro di orchestrazione e aggiungi un job Cloud Scheduler come attivatore.
Workflows utilizza gli account di servizio per concedere ai flussi di lavoro l'accesso alle Google Cloud risorse. Crea un account di servizio e concedigli il ruolo di Gestione di identità e accessi Editor di Dataform (
roles/dataform.editor
) nonché le autorizzazioni minime necessarie per gestire il flusso di lavoro di orchestrazione. Per ulteriori informazioni, vedi Concedere l'autorizzazione dei workflow per l'accesso alle Google Cloud risorse.Crea un flusso di lavoro di orchestrazione e utilizza il seguente codice sorgente YAML come definizione del flusso di lavoro:
main: steps: - init: assign: - repository: projects/PROJECT_ID/locations/REPOSITORY_LOCATION/repositories/REPOSITORY_ID - createCompilationResult: call: http.post args: url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"} auth: type: OAuth2 body: gitCommitish: GIT_COMMITISH result: compilationResult - createWorkflowInvocation: call: http.post args: url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"} auth: type: OAuth2 body: compilationResult: ${compilationResult.body.name} result: workflowInvocation - complete: return: ${workflowInvocation.body.name}
Sostituisci quanto segue:
- PROJECT_ID: l'ID del tuo Google Cloud progetto.
- REPOSITORY_LOCATION: la posizione del repository Dataform.
- REPOSITORY_ID: il nome del tuo repository Dataform.
- GIT_COMMITISH: il ramo Git da cui vuoi eseguire il codice Dataform. Per un repository appena creato, sostituisci con
main
.
Pianifica il flusso di lavoro di orchestrazione utilizzando Cloud Scheduler.
Personalizzare la richiesta di risultato della compilazione della creazione del flusso di lavoro Dataform
Puoi
aggiornare il flusso di lavoro di orchestrazione esistente
e definire le impostazioni della richiesta di creazione del risultato della compilazione del flusso di lavoro Dataform in formato YAML. Per ulteriori informazioni sulle impostazioni, consulta la projects.locations.repositories.compilationResults
guida di riferimento alle risorse REST.
Ad esempio, per aggiungere un'impostazione _dev
schemaSuffix
a tutte le azioni durante la compilazione,
sostituire il corpo del passaggio createCompilationResult
con il seguente snippet di codice:
- createCompilationResult:
call: http.post
args:
url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"}
auth:
type: OAuth2
body:
gitCommitish: GIT_COMMITISH
codeCompilationConfig:
schemaSuffix: dev
Puoi anche passare impostazioni aggiuntive come argomenti di runtime in una richiesta di esecuzione di Workflows e accedere a questi argomenti utilizzando le variabili. Per ulteriori informazioni, consulta Passare gli argomenti di runtime in una richiesta di esecuzione.
Personalizzare la richiesta di chiamata del flusso di lavoro Dataform
Puoi
aggiornare il flusso di lavoro di orchestrazione esistente
e definire le impostazioni della richiesta di chiamata del flusso di lavoro Dataform in
formato YAML. Per ulteriori informazioni sulle impostazioni della richiesta di invocazione, consulta la projects.locations.repositories.workflowInvocations
guida di riferimento delle risorse REST.
Ad esempio, per eseguire solo azioni con il tag hourly
con tutte le dipendenze trascendenti incluse, sostituisci il corpo createWorkflowInvocation
con lo snippet di codice seguente:
- createWorkflowInvocation:
call: http.post
args:
url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"}
auth:
type: OAuth2
body:
compilationResult: ${compilationResult.body.name}
invocationConfig:
includedTags:
- hourly
transitiveDependenciesIncluded: true
Puoi anche passare impostazioni aggiuntive come argomenti di runtime in una richiesta di esecuzione di Workflows e accedere a questi argomenti utilizzando le variabili. Per ulteriori informazioni, consulta Passare gli argomenti di runtime in una richiesta di esecuzione.
Pianificare le esecuzioni con Cloud Composer
Puoi utilizzare Cloud Composer 2 per pianificare le esecuzioni di Dataform. Dataform non supporta Cloud Composer 1.
Per gestire le pianificazioni delle esecuzioni di Dataform con Cloud Composer 2, puoi utilizzare operatori Dataform nei grafici diretti aciclici (DAG) di Airflow. Puoi creare un DAG Airflow che pianifica le invocazioni del flusso di lavoro Dataform.
Dataform fornisce vari operatori Airflow. Sono inclusi gli operatori per ottenere un risultato di compilazione, un'invocazione di flusso di lavoro e l'annullamento di un'invocazione di flusso di lavoro. Per visualizzare l'elenco completo degli operatori Dataform Airflow disponibili, consulta Operatori Google Dataform.
Installa il pacchetto PyPi google-cloud-dataform
Se utilizzi le versioni 2.0.25
e successive di Cloud Composer 2, questo pacchetto è preinstallato nel tuo ambiente. Non è necessario installarlo.
Se utilizzi versioni precedenti di Cloud Composer 2,
installa il pacchetto PyPi google-cloud-dataform
.
Nella sezione dei pacchetti PyPI, specifica la versione ==0.2.0
.
Crea un DAG Airflow che pianifica le invocazioni del flusso di lavoro Dataform
Per gestire le esecuzioni pianificate dei flussi di lavoro Dataform con Cloud Composer 2, scrivi il DAG utilizzando gli operatori Airflow di Dataform, quindi caricalo nel bucket del tuo ambiente.
Il seguente esempio di codice mostra un DAG Airflow che crea un risultato di compilazione Dataform e avvia un'invocazione del flusso di lavoro Dataform:
from datetime import datetime
from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
DataformCreateCompilationResultOperator,
DataformCreateWorkflowInvocationOperator,
)
DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"
with models.DAG(
DAG_ID,
schedule_interval='@once', # Override to match your needs
start_date=datetime(2022, 1, 1),
catchup=False, # Override to match your needs
tags=['dataform'],
) as dag:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
},
)
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
},
)
create_compilation_result >> create_workflow_invocation
Sostituisci quanto segue:
- PROJECT_ID: l'ID del tuo progetto Google Cloud Dataform.
- REPOSITORY_ID: il nome del tuo repository Dataform.
- REGION: la regione in cui si trova il repository Dataform.
- COMPILATION_RESULT: il nome del risultato della compilazione che vuoi utilizzare per l'invocazione di questo flusso di lavoro.
- GIT_COMMITISH: il commitish Git nel repository Git remoto della versione del codice che vuoi utilizzare, ad esempio un ramo o un SHA Git.
Il seguente esempio di codice mostra un DAG Airflow che esegue le seguenti operazioni:
- Crea un risultato di compilazione di Dataform.
- Avvia un'invocazione asincrona del flusso di lavoro Dataform.
- Esegue il polling dello stato del flusso di lavoro finché non entra nello stato previsto utilizzando
DataformWorkflowInvocationStateSensor
.
from datetime import datetime
from google.cloud.dataform_v1beta1 import WorkflowInvocation
from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
DataformCreateCompilationResultOperator,
DataformCreateWorkflowInvocationOperator,
)
from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor
DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"
with models.DAG(
DAG_ID,
schedule_interval='@once', # Override to match your needs
start_date=datetime(2022, 1, 1),
catchup=False, # Override to match your needs
tags=['dataform'],
) as dag:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
},
)
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
asynchronous=True,
workflow_invocation={
"compilation_result": COMPILATION_RESULT
}
)
is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
task_id="is_workflow_invocation_done",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation_id=("{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"),
expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)
create_compilation_result >> create_workflow_invocation
Sostituisci quanto segue:
- PROJECT_ID: il tuo ID progetto Google Cloud Dataform.
- REPOSITORY_ID: il nome del tuo repository Dataform.
- REGION: la regione in cui si trova il repository Dataform.
- COMPILATION_RESULT: il nome del risultato della compilazione che vuoi utilizzare per l'invocazione di questo flusso di lavoro.
- GIT_COMMITISH: il commitish Git nel repository Git remoto della versione del codice che vuoi utilizzare, ad esempio un ramo o un SHA Git.
- COMPILATION_RESULT: il nome del risultato della compilazione che vuoi utilizzare per l'invocazione di questo flusso di lavoro.
Aggiungere i parametri di configurazione della compilazione
Puoi aggiungere altri parametri di configurazione della compilazione all'create_compilation_result
oggetto DAG di Airflow. Per ulteriori informazioni sui parametri disponibili, consulta il CodeCompilationConfig
riferimento all'API Dataform.
Per aggiungere parametri di configurazione della compilazione all'oggetto DAG
create_compilation_result
Airflow, aggiungi i parametri selezionati al campocode_compilation_config
nel seguente formato:create_compilation_result = DataformCreateCompilationResultOperator( task_id="create_compilation_result", project_id=PROJECT_ID, region=REGION, repository_id=REPOSITORY_ID, compilation_result={ "git_commitish": GIT_COMMITISH, "code_compilation_config": { "PARAMETER": "PARAMETER_VALUE"} }, )
Sostituisci quanto segue:
- PROJECT_ID: l'ID del tuo progetto Google Cloud Dataform.
- REPOSITORY_ID: il nome del tuo repository Dataform.
- REGION: la regione in cui si trova il repository Dataform.
- GIT_COMMITISH: il commitish Git nel repository Git remoto della versione del codice che vuoi utilizzare, ad esempio un ramo o un SHA Git.
- PARAMETER: il parametro
CodeCompilationConfig
selezionato. Puoi aggiungere più parametri. - PARAMETER_VALUE: il valore del parametro selezionato.
Il seguente esempio di codice mostra il parametro defaultDatabase
aggiunto all'oggetto DAG Airflow create_compilation_result
:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": REMOTE_BRANCH,
"code_compilation_config": { "default_database": "my-custom-gcp-project"}
},
)
Aggiungi i parametri di configurazione dell'invocazione del workflow
Puoi aggiungere altri parametri di configurazione dell'invocazione del flusso di lavoro all'create_workflow_invocation
oggetto DAG di Airflow. Per ulteriori informazioni sui parametri disponibili, consulta il InvocationConfig
riferimento all'API Dataform.
Per aggiungere i parametri di configurazione dell'invocazione del flusso di lavoro all'
create_workflow_invocation
oggetto DAG Airflow, aggiungi i parametri selezionati al campoinvocation_config
nel seguente formato:create_workflow_invocation = DataformCreateWorkflowInvocationOperator( task_id='create_workflow_invocation', project_id=PROJECT_ID, region=REGION, repository_id=REPOSITORY_ID, workflow_invocation={ "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}", "invocation_config": { "PARAMETER": PARAMETER_VALUE } }, )
Sostituisci quanto segue:
- PROJECT_ID: l'ID del tuo progetto Google Cloud Dataform.
- REPOSITORY_ID: il nome del tuo repository Dataform.
- REGION: la regione in cui si trova il repository Dataform.
- PARAMETER: il parametro
InvocationConfig
selezionato. Puoi aggiungere più parametri. - PARAMETER_VALUE: il valore del parametro selezionato.
Il seguente esempio di codice mostra i parametri includedTags[]
e
transitiveDependenciesIncluded
aggiunti all'oggetto DAG create_workflow_invocation
Airflow:
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
"invocation_config": { "included_tags": ["daily"], "transitive_dependencies_included": true }
},
)
Passaggi successivi
- Per scoprire come configurare le configurazioni della release di compilazione di Dataform, consulta Creare una configurazione della release.
- Per scoprire di più sul ciclo di vita del codice Dataform, consulta Introduzione al ciclo di vita del codice in Dataform.
- Per scoprire di più sull'API Dataform, consulta API Dataform.
- Per scoprire di più sugli ambienti Cloud Composer, consulta la Panoramica di Cloud Composer.
- Per scoprire di più sui prezzi di Workflows, consulta la pagina relativa ai prezzi di Workflows.