Questo documento mostra come eseguire le seguenti operazioni in Dataform:
- Pianificare le esecuzioni con le configurazioni del workflow.
- Pianifica le esecuzioni con Workflows e Cloud Scheduler.
- Pianifica le esecuzioni con Cloud Composer.
Prima di iniziare
Per pianificare le esecuzioni con le configurazioni del flusso di lavoro o pianificare le esecuzioni con i flussi di lavoro e Cloud Scheduler, procedi nel seguente modo:
Nella console Google Cloud , vai alla pagina Dataform.
Seleziona o crea un repository.
Crea una configurazione della release.
Per pianificare le esecuzioni con Cloud Composer:
- Seleziona o crea un repository Dataform.
- Concedere a Dataform l'accesso a BigQuery.
- Seleziona o crea un workspace Dataform.
- Crea almeno una tabella.
- Crea un ambiente Cloud Composer 2.
Ruoli obbligatori
Per ottenere le autorizzazioni necessarie per completare le attività descritte in questo documento, chiedi all'amministratore di concederti i seguenti ruoli IAM:
-
Amministratore Dataform (
roles/dataform.admin
) sui repository -
Worker Composer (
roles/composer.worker
) nell'account di servizio dell'ambiente Cloud Composer
Per ulteriori informazioni sulla concessione dei ruoli, consulta Gestisci l'accesso a progetti, cartelle e organizzazioni.
Potresti anche riuscire a ottenere le autorizzazioni richieste tramite i ruoli personalizzati o altri ruoli predefiniti.
Per utilizzare un account di servizio diverso da quello predefinito di Dataform, concedi l'accesso al service account personalizzato.
Per abilitare le esecuzioni pianificate per una configurazione del workflow quando è abilitata la modalità act-as rigorosa, devi concedere l'autorizzazione iam.serviceAccounts.actAs
al service account Dataform per il account di servizio utilizzato nella configurazione del workflow. Questa autorizzazione è
disponibile nel ruolo
Utente service account (roles/iam.serviceAccountUser
).
Per utilizzare le credenziali utente dell'Account Google durante la creazione di una configurazione del flusso di lavoro (anteprima), concedi l'accesso all'Account Google.
Pianificare le esecuzioni con le configurazioni del workflow
Questa sezione mostra come creare una configurazione del workflow in Dataform per pianificare e configurare le esecuzioni del workflow. Puoi utilizzare le configurazioni del workflow per eseguire i workflow Dataform in base a una pianificazione.
Informazioni sulle configurazioni dei workflow
Per pianificare le esecuzioni di Dataform di tutte le azioni del workflow o di quelle selezionate in BigQuery, puoi creare configurazioni del workflow. In una configurazione del workflow, seleziona una configurazione della release di compilazione, seleziona le azioni del workflow da eseguire e imposta la pianificazione dell'esecuzione.
Poi, durante un'esecuzione pianificata della configurazione del workflow, Dataform esegue il deployment della selezione di azioni dall'ultimo risultato di compilazione nella configurazione di rilascio in BigQuery. Puoi anche attivare manualmente l'esecuzione di una configurazione del flusso di lavoro con l'API Dataform workflowConfigs.
Una configurazione del flusso di lavoro Dataform contiene le seguenti impostazioni di esecuzione:
- ID della configurazione del workflow.
- Configurazione della release.
Service account.
Questo è il account di servizio associato alla configurazione del workflow. Puoi selezionare il service account Dataform predefinito o un account di servizio associato al tuo progetto Google Cloud oppure puoi inserire manualmente un account di servizio diverso. Per impostazione predefinita, le configurazioni del flusso di lavoro utilizzano gli stessi service account dei relativi repository.
Le credenziali del service account sono il metodo di autorizzazione predefinito per la creazione e le esecuzioni della configurazione del workflow pianificato.
Credenziali utente dell'Account Google (anteprima)
Le credenziali utente dell'account Google sono il metodo di autorizzazione predefinito per la creazione e l'esecuzione manuale e non pianificata della configurazione del workflow. Per maggiori informazioni, vedi Autorizzare l'Account Google.
Azioni del workflow da eseguire:
- Tutte le azioni.
- Selezione di azioni.
- Selezione di tag.
Pianificazione dell'esecuzione e fuso orario.
Crea una configurazione di flusso di lavoro
Per creare una configurazione del flusso di lavoro Dataform:
- Nel repository, vai a Releases & Scheduling (Uscite e programmazione).
- Nella sezione Configurazioni dei flussi di lavoro, fai clic su Crea.
Nel riquadro Crea configurazione del flusso di lavoro, inserisci un ID univoco per la configurazione del flusso di lavoro nel campo ID configurazione.
Gli ID possono includere solo numeri, lettere, trattini e trattini bassi.
Nel menu Configurazione della release, seleziona una configurazione della release di compilazione.
Nella sezione Autenticazione, autorizza la configurazione del flusso di lavoro con le credenziali utente del tuo Account Google o un account di servizio.
- Per utilizzare le credenziali utente del tuo Account Google (Anteprima), seleziona Esegui con le mie credenziali utente.
- Per utilizzare un account di servizio, seleziona Esegui con il service account selezionato, quindi seleziona il account di servizio Dataform predefinito o qualsiasi account di servizio associato al tuo progettoGoogle Cloud a cui hai accesso. Se non selezioni un service account, la configurazione del flusso di lavoro utilizza il account di servizio del repository.
(Facoltativo) Nel campo Frequenza di pianificazione, inserisci la frequenza di esecuzione nel formato unix-cron.
Per verificare che Dataform esegua l'ultimo risultato di compilazione nella configurazione della release corrispondente, mantieni una pausa di almeno un'ora tra il momento della creazione del risultato di compilazione e il momento dell'esecuzione pianificata.
(Facoltativo) Nel menu Fuso orario, seleziona il fuso orario per le esecuzioni.
Il fuso orario predefinito è UTC.
Seleziona le azioni del workflow da eseguire:
- Per eseguire l'intero flusso di lavoro, fai clic su Tutte le azioni.
- Per eseguire le azioni selezionate nel flusso di lavoro, fai clic su Selezione delle azioni e poi seleziona le azioni.
- Per eseguire azioni con i tag selezionati, fai clic su Selezione dei tag e poi seleziona i tag.
- (Facoltativo) Per eseguire le azioni o i tag selezionati e le relative dipendenze, seleziona l'opzione Includi dipendenze.
- (Facoltativo) Per eseguire le azioni o i tag selezionati e i relativi elementi dipendenti, seleziona l'opzione Includi elementi dipendenti.
- (Facoltativo) Per ricreare tutte le tabelle da zero, seleziona l'opzione Esegui con aggiornamento completo.
Senza questa opzione, Dataform aggiorna le tabelle incrementali senza ricrearle da zero.
Fai clic su Crea. Se hai selezionato Esegui con le mie credenziali utente come metodo di autenticazione, devi autorizzare il tuo Account Google (Anteprima).
Ad esempio, la seguente configurazione del flusso di lavoro esegue azioni con il tag
hourly
ogni ora nel fuso orario CEST:
- ID configurazione:
production-hourly
- Configurazione della release: -
- Frequenza:
0 * * * *
- Fuso orario:
Central European Summer Time (CEST)
- Selezione delle azioni del workflow: selezione dei tag, tag
hourly
Autorizzare l'Account Google
Per autenticare la risorsa con le credenziali utente del tuo Account Google, devi concedere manualmente l'autorizzazione alle pipeline BigQuery per ottenere il token di accesso per il tuo Account Google e accedere ai dati di origine per tuo conto. Puoi concedere l'approvazione manuale con l'interfaccia della finestra di dialogo OAuth.
Devi concedere l'autorizzazione alle pipeline BigQuery una sola volta.
Per revocare l'autorizzazione che hai concesso:
- Vai alla pagina del tuo Account Google.
- Fai clic su BigQuery Pipelines.
- Fai clic su Rimuovi accesso.
La modifica del proprietario della configurazione del workflow tramite l'aggiornamento delle credenziali richiede anche l'approvazione manuale se il nuovo proprietario dell'Account Google non ha mai creato una configurazione del workflow.
Modificare una configurazione del workflow
Per modificare una configurazione del flusso di lavoro:
- Nel repository, vai a Releases & Scheduling (Uscite e programmazione).
- Accanto alla configurazione del flusso di lavoro da modificare, fai clic sul menu Altro e poi su Modifica.
- Nel riquadro Modifica configurazione del flusso di lavoro, modifica le impostazioni della configurazione della release e poi fai clic su Salva.
Elimina una configurazione del workflow
Per eliminare una configurazione del flusso di lavoro:
- Nel repository, vai a Releases & Scheduling (Uscite e programmazione).
- Accanto alla configurazione del flusso di lavoro che vuoi eliminare, fai clic sul menu Altro e poi su Elimina.
- Nella finestra di dialogo Elimina configurazione della release, fai clic su Elimina.
Pianificare le esecuzioni con Workflows e Cloud Scheduler
Questa sezione mostra come pianificare le esecuzioni dei flussi di lavoro Dataform utilizzando Workflows e Cloud Scheduler.
Informazioni sulle esecuzioni del workflow pianificate
Puoi impostare la frequenza di esecuzione del flusso di lavoro Dataform creando un job Cloud Scheduler che attiva un flusso di lavoro Workflows. Workflows esegue i servizi in un flusso di lavoro di orchestrazione definito da te.
Workflows esegue il workflow Dataform in un processo in due passaggi. Innanzitutto, estrae il codice del repository Dataform dal tuo provider Git e lo compila in un risultato di compilazione. Poi utilizza il risultato della compilazione per creare un workflow Dataform e lo esegue con la frequenza che hai impostato.
Crea un workflow di orchestrazione pianificato
Per pianificare le esecuzioni del flusso di lavoro Dataform, utilizza Workflows per creare un flusso di lavoro di orchestrazione e aggiungere un job Cloud Scheduler come trigger.
Workflows utilizza i service account per concedere ai workflow l'accesso alle risorseGoogle Cloud . Crea un service account e concedigli il ruolo Identity and Access Management Editor Dataform (
roles/dataform.editor
), nonché le autorizzazioni minime necessarie per gestire il flusso di lavoro di orchestrazione. Per ulteriori informazioni, vedi Concedi l'autorizzazione dei workflow per l'accesso alle Google Cloud risorse.Crea un workflow di orchestrazione e utilizza il seguente codice sorgente YAML come definizione del workflow:
main: steps: - init: assign: - repository: projects/PROJECT_ID/locations/REPOSITORY_LOCATION/repositories/REPOSITORY_ID - createCompilationResult: call: http.post args: url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"} auth: type: OAuth2 body: gitCommitish: GIT_COMMITISH result: compilationResult - createWorkflowInvocation: call: http.post args: url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"} auth: type: OAuth2 body: compilationResult: ${compilationResult.body.name} result: workflowInvocation - complete: return: ${workflowInvocation.body.name}
Sostituisci quanto segue:
- PROJECT_ID: l'ID del tuo Google Cloud progetto.
- REPOSITORY_LOCATION: la posizione del repository Dataform.
- REPOSITORY_ID: il nome del tuo repository Dataform.
- GIT_COMMITISH: il ramo Git da cui vuoi eseguire il codice Dataform. Per un repository appena creato, sostituisci con
main
.
Pianifica il flusso di lavoro di orchestrazione utilizzando Cloud Scheduler.
Personalizza la richiesta di creazione del risultato di compilazione del workflow Dataform
Puoi
aggiornare il flusso di lavoro di orchestrazione esistente
e definire le impostazioni della richiesta di creazione del risultato di compilazione del workflow Dataform in formato YAML. Per saperne di più sulle impostazioni,
consulta la
guida di riferimento alle risorse REST projects.locations.repositories.compilationResults
.
Ad esempio, per aggiungere un'impostazione _dev
schemaSuffix
a tutte le azioni durante la compilazione,
sostituisci il corpo del passaggio createCompilationResult
con il seguente snippet di codice:
- createCompilationResult:
call: http.post
args:
url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"}
auth:
type: OAuth2
body:
gitCommitish: GIT_COMMITISH
codeCompilationConfig:
schemaSuffix: dev
Puoi anche passare impostazioni aggiuntive come argomenti di runtime in una richiesta di esecuzione di Workflows e accedere a questi argomenti utilizzando le variabili. Per ulteriori informazioni, consulta Passare argomenti di runtime in una richiesta di esecuzione.
Personalizza la richiesta di invocazione del workflow Dataform
Puoi
aggiornare il flusso di lavoro di orchestrazione esistente
e definire le impostazioni della richiesta di chiamata del flusso di lavoro Dataform in
formato YAML. Per saperne di più sulle impostazioni della richiesta di chiamata,
consulta il
riferimento alla risorsa REST projects.locations.repositories.workflowInvocations
.
Ad esempio, per eseguire solo le azioni con il tag hourly
con tutte le
dipendenze transitive incluse, sostituisci il corpo createWorkflowInvocation
con il seguente snippet di codice:
- createWorkflowInvocation:
call: http.post
args:
url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"}
auth:
type: OAuth2
body:
compilationResult: ${compilationResult.body.name}
invocationConfig:
includedTags:
- hourly
transitiveDependenciesIncluded: true
Puoi anche passare impostazioni aggiuntive come argomenti di runtime in una richiesta di esecuzione di Workflows e accedere a questi argomenti utilizzando le variabili. Per ulteriori informazioni, consulta Passare argomenti di runtime in una richiesta di esecuzione.
Pianificare le esecuzioni con Cloud Composer
Puoi utilizzare Cloud Composer 2 per pianificare le esecuzioni di Dataform. Dataform non supporta Cloud Composer 1.
Per gestire le pianificazioni per le esecuzioni di Dataform con Cloud Composer 2, puoi utilizzare gli operatori Dataform nei grafi aciclici diretti (DAG) di Airflow. Puoi creare un DAG Airflow che pianifica le chiamate del workflow Dataform.
Dataform fornisce vari operatori Airflow. Sono inclusi operatori per ottenere un risultato di compilazione, ottenere una chiamata del workflow e annullare una chiamata del workflow. Per visualizzare l'elenco completo degli operatori Airflow di Dataform disponibili, consulta Operatori Google Dataform.
Installa il pacchetto PyPi google-cloud-dataform
Se utilizzi Cloud Composer 2 versioni 2.0.25
e successive, questo pacchetto
è preinstallato nel tuo ambiente. Non è necessario installarlo.
Se utilizzi versioni precedenti di Cloud Composer 2,
installa il pacchetto PyPi google-cloud-dataform
.
Nella sezione Pacchetti PyPI, specifica la versione ==0.2.0
.
Crea un DAG Airflow che pianifica le chiamate del workflow Dataform
Per gestire le esecuzioni pianificate dei workflow Dataform con Cloud Composer 2, scrivi il DAG utilizzando gli operatori Dataform Airflow, poi caricalo nel bucket del tuo ambiente.
Il seguente esempio di codice mostra un DAG Airflow che crea un risultato di compilazione Dataform e avvia una chiamata del flusso di lavoro Dataform:
from datetime import datetime
from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
DataformCreateCompilationResultOperator,
DataformCreateWorkflowInvocationOperator,
)
DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"
with models.DAG(
DAG_ID,
schedule_interval='@once', # Override to match your needs
start_date=datetime(2022, 1, 1),
catchup=False, # Override to match your needs
tags=['dataform'],
) as dag:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
},
)
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
},
)
create_compilation_result >> create_workflow_invocation
Sostituisci quanto segue:
- PROJECT_ID: il tuo ID progetto Google Cloud Dataform.
- REPOSITORY_ID: il nome del tuo repository Dataform.
- REGION: la regione in cui si trova il repository Dataform.
- COMPILATION_RESULT: il nome del risultato della compilazione che vuoi utilizzare per questa chiamata del flusso di lavoro.
- GIT_COMMITISH: il commitish Git nel repository Git remoto della versione del codice che vuoi utilizzare, ad esempio un ramo o uno SHA Git.
Il seguente esempio di codice mostra un DAG Airflow che esegue le seguenti operazioni:
- Crea un risultato di compilazione Dataform.
- Avvia una chiamata asincrona del workflow Dataform.
- Esegue il polling dello stato del flusso di lavoro finché non raggiunge lo stato previsto
utilizzando
DataformWorkflowInvocationStateSensor
.
from datetime import datetime
from google.cloud.dataform_v1beta1 import WorkflowInvocation
from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
DataformCreateCompilationResultOperator,
DataformCreateWorkflowInvocationOperator,
)
from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor
DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"
with models.DAG(
DAG_ID,
schedule_interval='@once', # Override to match your needs
start_date=datetime(2022, 1, 1),
catchup=False, # Override to match your needs
tags=['dataform'],
) as dag:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
},
)
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
asynchronous=True,
workflow_invocation={
"compilation_result": COMPILATION_RESULT
}
)
is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
task_id="is_workflow_invocation_done",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation_id=("{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"),
expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)
create_compilation_result >> create_workflow_invocation
Sostituisci quanto segue:
- PROJECT_ID: il tuo ID progetto Google Cloud Dataform.
- REPOSITORY_ID: il nome del tuo repository Dataform.
- REGION: la regione in cui si trova il repository Dataform.
- COMPILATION_RESULT: il nome del risultato della compilazione che vuoi utilizzare per questa chiamata del flusso di lavoro.
- GIT_COMMITISH: il commitish Git nel repository Git remoto della versione del codice che vuoi utilizzare, ad esempio un ramo o uno SHA Git.
- COMPILATION_RESULT: il nome del risultato della compilazione che vuoi utilizzare per questa chiamata del flusso di lavoro.
Aggiungi parametri di configurazione della compilazione
Puoi aggiungere altri parametri di configurazione della compilazione all'oggetto
DAG Airflow create_compilation_result
. Per saperne di più sui parametri disponibili, consulta il riferimento API Dataform CodeCompilationConfig
.
Per aggiungere parametri di configurazione della compilazione all'oggetto DAG di
create_compilation_result
Airflow, aggiungi i parametri selezionati al campocode_compilation_config
nel seguente formato:create_compilation_result = DataformCreateCompilationResultOperator( task_id="create_compilation_result", project_id=PROJECT_ID, region=REGION, repository_id=REPOSITORY_ID, compilation_result={ "git_commitish": GIT_COMMITISH, "code_compilation_config": { "PARAMETER": "PARAMETER_VALUE"} }, )
Sostituisci quanto segue:
- PROJECT_ID: il tuo ID progetto Google Cloud Dataform.
- REPOSITORY_ID: il nome del tuo repository Dataform.
- REGION: la regione in cui si trova il repository Dataform.
- GIT_COMMITISH: il commitish Git nel repository Git remoto della versione del codice che vuoi utilizzare, ad esempio un ramo o uno SHA Git.
- PARAMETER: il parametro
CodeCompilationConfig
selezionato. Puoi aggiungere più parametri. - PARAMETER_VALUE: il valore del parametro selezionato.
Il seguente esempio di codice mostra il parametro defaultDatabase
aggiunto all'oggetto
DAG Airflow create_compilation_result
:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": REMOTE_BRANCH,
"code_compilation_config": { "default_database": "my-custom-gcp-project"}
},
)
Aggiungere parametri di configurazione di chiamata del workflow
Puoi aggiungere parametri di configurazione aggiuntivi per l'invocazione del workflow all'oggetto DAG di Airflow create_workflow_invocation
. Per saperne di più sui parametri disponibili, consulta il riferimento API Dataform InvocationConfig
.
Per aggiungere i parametri di configurazione dell'invocazione del workflow all'oggetto DAG di Airflow
create_workflow_invocation
, aggiungi i parametri selezionati al campoinvocation_config
nel seguente formato:create_workflow_invocation = DataformCreateWorkflowInvocationOperator( task_id='create_workflow_invocation', project_id=PROJECT_ID, region=REGION, repository_id=REPOSITORY_ID, workflow_invocation={ "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}", "invocation_config": { "PARAMETER": PARAMETER_VALUE } }, )
Sostituisci quanto segue:
- PROJECT_ID: il tuo ID progetto Google Cloud Dataform.
- REPOSITORY_ID: il nome del tuo repository Dataform.
- REGION: la regione in cui si trova il repository Dataform.
- PARAMETER: il parametro
InvocationConfig
selezionato. Puoi aggiungere più parametri. - PARAMETER_VALUE: il valore del parametro selezionato.
Il seguente esempio di codice mostra i parametri includedTags[]
e
transitiveDependenciesIncluded
aggiunti all'oggetto
create_workflow_invocation
DAG di Airflow:
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
"invocation_config": { "included_tags": ["daily"], "transitive_dependencies_included": true }
},
)
Passaggi successivi
- Per scoprire come configurare le configurazioni di rilascio della compilazione di Dataform, consulta Creare una configurazione di rilascio.
- Per scoprire di più sul ciclo di vita del codice Dataform, consulta Introduzione al ciclo di vita del codice in Dataform.
- Per scoprire di più sull'API Dataform, consulta API Dataform.
- Per saperne di più sugli ambienti Cloud Composer, consulta la Panoramica di Cloud Composer.
- Per scoprire di più sui prezzi di Workflows, consulta la pagina Prezzi di Workflows.