Pianificare le esecuzioni

Questo documento mostra come eseguire le seguenti operazioni in Dataform:

Prima di iniziare

Per pianificare le esecuzioni con le configurazioni del flusso di lavoro o pianificare le esecuzioni con i flussi di lavoro e Cloud Scheduler, procedi nel seguente modo:

  1. Nella console Google Cloud , vai alla pagina Dataform.

    Vai a Dataform

  2. Seleziona o crea un repository.

  3. Crea una configurazione della release.

Per pianificare le esecuzioni con Cloud Composer:

  1. Seleziona o crea un repository Dataform.
  2. Concedere a Dataform l'accesso a BigQuery.
  3. Seleziona o crea un workspace Dataform.
  4. Crea almeno una tabella.
  5. Crea un ambiente Cloud Composer 2.

Ruoli obbligatori

Per ottenere le autorizzazioni necessarie per completare le attività descritte in questo documento, chiedi all'amministratore di concederti i seguenti ruoli IAM:

Per ulteriori informazioni sulla concessione dei ruoli, consulta Gestisci l'accesso a progetti, cartelle e organizzazioni.

Potresti anche riuscire a ottenere le autorizzazioni richieste tramite i ruoli personalizzati o altri ruoli predefiniti.

Per utilizzare un account di servizio diverso da quello predefinito di Dataform, concedi l'accesso al service account personalizzato.

Per abilitare le esecuzioni pianificate per una configurazione del workflow quando è abilitata la modalità act-as rigorosa, devi concedere l'autorizzazione iam.serviceAccounts.actAs al service account Dataform per il account di servizio utilizzato nella configurazione del workflow. Questa autorizzazione è disponibile nel ruolo Utente service account (roles/iam.serviceAccountUser).

Per utilizzare le credenziali utente dell'Account Google durante la creazione di una configurazione del flusso di lavoro (anteprima), concedi l'accesso all'Account Google.

Pianificare le esecuzioni con le configurazioni del workflow

Questa sezione mostra come creare una configurazione del workflow in Dataform per pianificare e configurare le esecuzioni del workflow. Puoi utilizzare le configurazioni del workflow per eseguire i workflow Dataform in base a una pianificazione.

Informazioni sulle configurazioni dei workflow

Per pianificare le esecuzioni di Dataform di tutte le azioni del workflow o di quelle selezionate in BigQuery, puoi creare configurazioni del workflow. In una configurazione del workflow, seleziona una configurazione della release di compilazione, seleziona le azioni del workflow da eseguire e imposta la pianificazione dell'esecuzione.

Poi, durante un'esecuzione pianificata della configurazione del workflow, Dataform esegue il deployment della selezione di azioni dall'ultimo risultato di compilazione nella configurazione di rilascio in BigQuery. Puoi anche attivare manualmente l'esecuzione di una configurazione del flusso di lavoro con l'API Dataform workflowConfigs.

Una configurazione del flusso di lavoro Dataform contiene le seguenti impostazioni di esecuzione:

  • ID della configurazione del workflow.
  • Configurazione della release.
  • Service account.

    Questo è il account di servizio associato alla configurazione del workflow. Puoi selezionare il service account Dataform predefinito o un account di servizio associato al tuo progetto Google Cloud oppure puoi inserire manualmente un account di servizio diverso. Per impostazione predefinita, le configurazioni del flusso di lavoro utilizzano gli stessi service account dei relativi repository.

    Le credenziali del service account sono il metodo di autorizzazione predefinito per la creazione e le esecuzioni della configurazione del workflow pianificato.

  • Credenziali utente dell'Account Google (anteprima)

    Le credenziali utente dell'account Google sono il metodo di autorizzazione predefinito per la creazione e l'esecuzione manuale e non pianificata della configurazione del workflow. Per maggiori informazioni, vedi Autorizzare l'Account Google.

  • Azioni del workflow da eseguire:

    • Tutte le azioni.
    • Selezione di azioni.
    • Selezione di tag.
  • Pianificazione dell'esecuzione e fuso orario.

Crea una configurazione di flusso di lavoro

Per creare una configurazione del flusso di lavoro Dataform:

  1. Nel repository, vai a Releases & Scheduling (Uscite e programmazione).
  2. Nella sezione Configurazioni dei flussi di lavoro, fai clic su Crea.
  3. Nel riquadro Crea configurazione del flusso di lavoro, inserisci un ID univoco per la configurazione del flusso di lavoro nel campo ID configurazione.

    Gli ID possono includere solo numeri, lettere, trattini e trattini bassi.

  4. Nel menu Configurazione della release, seleziona una configurazione della release di compilazione.

  5. Nella sezione Autenticazione, autorizza la configurazione del flusso di lavoro con le credenziali utente del tuo Account Google o un account di servizio.

    • Per utilizzare le credenziali utente del tuo Account Google (Anteprima), seleziona Esegui con le mie credenziali utente.
    • Per utilizzare un account di servizio, seleziona Esegui con il service account selezionato, quindi seleziona il account di servizio Dataform predefinito o qualsiasi account di servizio associato al tuo progettoGoogle Cloud a cui hai accesso. Se non selezioni un service account, la configurazione del flusso di lavoro utilizza il account di servizio del repository.
  6. (Facoltativo) Nel campo Frequenza di pianificazione, inserisci la frequenza di esecuzione nel formato unix-cron.

    Per verificare che Dataform esegua l'ultimo risultato di compilazione nella configurazione della release corrispondente, mantieni una pausa di almeno un'ora tra il momento della creazione del risultato di compilazione e il momento dell'esecuzione pianificata.

  7. (Facoltativo) Nel menu Fuso orario, seleziona il fuso orario per le esecuzioni.

    Il fuso orario predefinito è UTC.

  8. Seleziona le azioni del workflow da eseguire:

    • Per eseguire l'intero flusso di lavoro, fai clic su Tutte le azioni.
    • Per eseguire le azioni selezionate nel flusso di lavoro, fai clic su Selezione delle azioni e poi seleziona le azioni.
    • Per eseguire azioni con i tag selezionati, fai clic su Selezione dei tag e poi seleziona i tag.
    • (Facoltativo) Per eseguire le azioni o i tag selezionati e le relative dipendenze, seleziona l'opzione Includi dipendenze.
    • (Facoltativo) Per eseguire le azioni o i tag selezionati e i relativi elementi dipendenti, seleziona l'opzione Includi elementi dipendenti.
    • (Facoltativo) Per ricreare tutte le tabelle da zero, seleziona l'opzione Esegui con aggiornamento completo.

    Senza questa opzione, Dataform aggiorna le tabelle incrementali senza ricrearle da zero.

  9. Fai clic su Crea. Se hai selezionato Esegui con le mie credenziali utente come metodo di autenticazione, devi autorizzare il tuo Account Google (Anteprima).

Ad esempio, la seguente configurazione del flusso di lavoro esegue azioni con il tag hourly ogni ora nel fuso orario CEST:

  • ID configurazione: production-hourly
  • Configurazione della release: -
  • Frequenza: 0 * * * *
  • Fuso orario: Central European Summer Time (CEST)
  • Selezione delle azioni del workflow: selezione dei tag, tag hourly

Autorizzare l'Account Google

Per autenticare la risorsa con le credenziali utente del tuo Account Google, devi concedere manualmente l'autorizzazione alle pipeline BigQuery per ottenere il token di accesso per il tuo Account Google e accedere ai dati di origine per tuo conto. Puoi concedere l'approvazione manuale con l'interfaccia della finestra di dialogo OAuth.

Devi concedere l'autorizzazione alle pipeline BigQuery una sola volta.

Per revocare l'autorizzazione che hai concesso:

  1. Vai alla pagina del tuo Account Google.
  2. Fai clic su BigQuery Pipelines.
  3. Fai clic su Rimuovi accesso.

La modifica del proprietario della configurazione del workflow tramite l'aggiornamento delle credenziali richiede anche l'approvazione manuale se il nuovo proprietario dell'Account Google non ha mai creato una configurazione del workflow.

Modificare una configurazione del workflow

Per modificare una configurazione del flusso di lavoro:

  1. Nel repository, vai a Releases & Scheduling (Uscite e programmazione).
  2. Accanto alla configurazione del flusso di lavoro da modificare, fai clic sul menu Altro e poi su Modifica.
  3. Nel riquadro Modifica configurazione del flusso di lavoro, modifica le impostazioni della configurazione della release e poi fai clic su Salva.

Elimina una configurazione del workflow

Per eliminare una configurazione del flusso di lavoro:

  1. Nel repository, vai a Releases & Scheduling (Uscite e programmazione).
  2. Accanto alla configurazione del flusso di lavoro che vuoi eliminare, fai clic sul menu Altro e poi su Elimina.
  3. Nella finestra di dialogo Elimina configurazione della release, fai clic su Elimina.

Pianificare le esecuzioni con Workflows e Cloud Scheduler

Questa sezione mostra come pianificare le esecuzioni dei flussi di lavoro Dataform utilizzando Workflows e Cloud Scheduler.

Informazioni sulle esecuzioni del workflow pianificate

Puoi impostare la frequenza di esecuzione del flusso di lavoro Dataform creando un job Cloud Scheduler che attiva un flusso di lavoro Workflows. Workflows esegue i servizi in un flusso di lavoro di orchestrazione definito da te.

Workflows esegue il workflow Dataform in un processo in due passaggi. Innanzitutto, estrae il codice del repository Dataform dal tuo provider Git e lo compila in un risultato di compilazione. Poi utilizza il risultato della compilazione per creare un workflow Dataform e lo esegue con la frequenza che hai impostato.

Crea un workflow di orchestrazione pianificato

Per pianificare le esecuzioni del flusso di lavoro Dataform, utilizza Workflows per creare un flusso di lavoro di orchestrazione e aggiungere un job Cloud Scheduler come trigger.

  1. Workflows utilizza i service account per concedere ai workflow l'accesso alle risorseGoogle Cloud . Crea un service account e concedigli il ruolo Identity and Access Management Editor Dataform (roles/dataform.editor), nonché le autorizzazioni minime necessarie per gestire il flusso di lavoro di orchestrazione. Per ulteriori informazioni, vedi Concedi l'autorizzazione dei workflow per l'accesso alle Google Cloud risorse.

  2. Crea un workflow di orchestrazione e utilizza il seguente codice sorgente YAML come definizione del workflow:

    main:
        steps:
        - init:
            assign:
            - repository: projects/PROJECT_ID/locations/REPOSITORY_LOCATION/repositories/REPOSITORY_ID
        - createCompilationResult:
            call: http.post
            args:
                url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"}
                auth:
                    type: OAuth2
                body:
                    gitCommitish: GIT_COMMITISH
            result: compilationResult
        - createWorkflowInvocation:
            call: http.post
            args:
                url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"}
                auth:
                    type: OAuth2
                body:
                    compilationResult: ${compilationResult.body.name}
            result: workflowInvocation
        - complete:
            return: ${workflowInvocation.body.name}
    

    Sostituisci quanto segue:

    • PROJECT_ID: l'ID del tuo Google Cloud progetto.
    • REPOSITORY_LOCATION: la posizione del repository Dataform.
    • REPOSITORY_ID: il nome del tuo repository Dataform.
    • GIT_COMMITISH: il ramo Git da cui vuoi eseguire il codice Dataform. Per un repository appena creato, sostituisci con main.
  3. Pianifica il flusso di lavoro di orchestrazione utilizzando Cloud Scheduler.

Personalizza la richiesta di creazione del risultato di compilazione del workflow Dataform

Puoi aggiornare il flusso di lavoro di orchestrazione esistente e definire le impostazioni della richiesta di creazione del risultato di compilazione del workflow Dataform in formato YAML. Per saperne di più sulle impostazioni, consulta la guida di riferimento alle risorse REST projects.locations.repositories.compilationResults.

Ad esempio, per aggiungere un'impostazione _dev schemaSuffix a tutte le azioni durante la compilazione, sostituisci il corpo del passaggio createCompilationResult con il seguente snippet di codice:

    - createCompilationResult:
        call: http.post
        args:
            url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"}
            auth:
                type: OAuth2
            body:
                gitCommitish: GIT_COMMITISH
                codeCompilationConfig:
                    schemaSuffix: dev

Puoi anche passare impostazioni aggiuntive come argomenti di runtime in una richiesta di esecuzione di Workflows e accedere a questi argomenti utilizzando le variabili. Per ulteriori informazioni, consulta Passare argomenti di runtime in una richiesta di esecuzione.

Personalizza la richiesta di invocazione del workflow Dataform

Puoi aggiornare il flusso di lavoro di orchestrazione esistente e definire le impostazioni della richiesta di chiamata del flusso di lavoro Dataform in formato YAML. Per saperne di più sulle impostazioni della richiesta di chiamata, consulta il riferimento alla risorsa REST projects.locations.repositories.workflowInvocations.

Ad esempio, per eseguire solo le azioni con il tag hourly con tutte le dipendenze transitive incluse, sostituisci il corpo createWorkflowInvocation con il seguente snippet di codice:

    - createWorkflowInvocation:
        call: http.post
        args:
            url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"}
            auth:
                type: OAuth2
            body:
                compilationResult: ${compilationResult.body.name}
                invocationConfig:
                    includedTags:
                    - hourly
                    transitiveDependenciesIncluded: true
                

Puoi anche passare impostazioni aggiuntive come argomenti di runtime in una richiesta di esecuzione di Workflows e accedere a questi argomenti utilizzando le variabili. Per ulteriori informazioni, consulta Passare argomenti di runtime in una richiesta di esecuzione.

Pianificare le esecuzioni con Cloud Composer

Puoi utilizzare Cloud Composer 2 per pianificare le esecuzioni di Dataform. Dataform non supporta Cloud Composer 1.

Per gestire le pianificazioni per le esecuzioni di Dataform con Cloud Composer 2, puoi utilizzare gli operatori Dataform nei grafi aciclici diretti (DAG) di Airflow. Puoi creare un DAG Airflow che pianifica le chiamate del workflow Dataform.

Dataform fornisce vari operatori Airflow. Sono inclusi operatori per ottenere un risultato di compilazione, ottenere una chiamata del workflow e annullare una chiamata del workflow. Per visualizzare l'elenco completo degli operatori Airflow di Dataform disponibili, consulta Operatori Google Dataform.

Installa il pacchetto PyPi google-cloud-dataform

Se utilizzi Cloud Composer 2 versioni 2.0.25 e successive, questo pacchetto è preinstallato nel tuo ambiente. Non è necessario installarlo.

Se utilizzi versioni precedenti di Cloud Composer 2, installa il pacchetto PyPi google-cloud-dataform.

Nella sezione Pacchetti PyPI, specifica la versione ==0.2.0.

Crea un DAG Airflow che pianifica le chiamate del workflow Dataform

Per gestire le esecuzioni pianificate dei workflow Dataform con Cloud Composer 2, scrivi il DAG utilizzando gli operatori Dataform Airflow, poi caricalo nel bucket del tuo ambiente.

Il seguente esempio di codice mostra un DAG Airflow che crea un risultato di compilazione Dataform e avvia una chiamata del flusso di lavoro Dataform:

from datetime import datetime

from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
)

DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"

with models.DAG(
    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2022, 1, 1),
    catchup=False,  # Override to match your needs
    tags=['dataform'],
) as dag:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
        },
    )
    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
         workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
        },
    )


create_compilation_result >> create_workflow_invocation

Sostituisci quanto segue:

  • PROJECT_ID: il tuo ID progetto Google Cloud Dataform.
  • REPOSITORY_ID: il nome del tuo repository Dataform.
  • REGION: la regione in cui si trova il repository Dataform.
  • COMPILATION_RESULT: il nome del risultato della compilazione che vuoi utilizzare per questa chiamata del flusso di lavoro.
  • GIT_COMMITISH: il commitish Git nel repository Git remoto della versione del codice che vuoi utilizzare, ad esempio un ramo o uno SHA Git.

Il seguente esempio di codice mostra un DAG Airflow che esegue le seguenti operazioni:

  1. Crea un risultato di compilazione Dataform.
  2. Avvia una chiamata asincrona del workflow Dataform.
  3. Esegue il polling dello stato del flusso di lavoro finché non raggiunge lo stato previsto utilizzando DataformWorkflowInvocationStateSensor.
from datetime import datetime

from google.cloud.dataform_v1beta1 import WorkflowInvocation

from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
)
from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor

DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"

with models.DAG(
    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2022, 1, 1),
    catchup=False,  # Override to match your needs
    tags=['dataform'],
) as dag:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
        },
    )

create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
    task_id='create_workflow_invocation',
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    asynchronous=True,
    workflow_invocation={
        "compilation_result": COMPILATION_RESULT
    }
)

is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
    task_id="is_workflow_invocation_done",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id=("{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"),
    expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)


create_compilation_result >> create_workflow_invocation

Sostituisci quanto segue:

  • PROJECT_ID: il tuo ID progetto Google Cloud Dataform.
  • REPOSITORY_ID: il nome del tuo repository Dataform.
  • REGION: la regione in cui si trova il repository Dataform.
  • COMPILATION_RESULT: il nome del risultato della compilazione che vuoi utilizzare per questa chiamata del flusso di lavoro.
  • GIT_COMMITISH: il commitish Git nel repository Git remoto della versione del codice che vuoi utilizzare, ad esempio un ramo o uno SHA Git.
  • COMPILATION_RESULT: il nome del risultato della compilazione che vuoi utilizzare per questa chiamata del flusso di lavoro.

Aggiungi parametri di configurazione della compilazione

Puoi aggiungere altri parametri di configurazione della compilazione all'oggetto DAG Airflow create_compilation_result. Per saperne di più sui parametri disponibili, consulta il riferimento API Dataform CodeCompilationConfig.

  • Per aggiungere parametri di configurazione della compilazione all'oggetto DAG di create_compilation_result Airflow, aggiungi i parametri selezionati al campo code_compilation_config nel seguente formato:

        create_compilation_result = DataformCreateCompilationResultOperator(
            task_id="create_compilation_result",
            project_id=PROJECT_ID,
            region=REGION,
            repository_id=REPOSITORY_ID,
            compilation_result={
                "git_commitish": GIT_COMMITISH,
                "code_compilation_config": { "PARAMETER": "PARAMETER_VALUE"}
            },
        )
    

    Sostituisci quanto segue:

    • PROJECT_ID: il tuo ID progetto Google Cloud Dataform.
    • REPOSITORY_ID: il nome del tuo repository Dataform.
    • REGION: la regione in cui si trova il repository Dataform.
    • GIT_COMMITISH: il commitish Git nel repository Git remoto della versione del codice che vuoi utilizzare, ad esempio un ramo o uno SHA Git.
    • PARAMETER: il parametro CodeCompilationConfig selezionato. Puoi aggiungere più parametri.
    • PARAMETER_VALUE: il valore del parametro selezionato.

Il seguente esempio di codice mostra il parametro defaultDatabase aggiunto all'oggetto DAG Airflow create_compilation_result:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": REMOTE_BRANCH,
            "code_compilation_config": { "default_database": "my-custom-gcp-project"}
        },
    )

Aggiungere parametri di configurazione di chiamata del workflow

Puoi aggiungere parametri di configurazione aggiuntivi per l'invocazione del workflow all'oggetto DAG di Airflow create_workflow_invocation. Per saperne di più sui parametri disponibili, consulta il riferimento API Dataform InvocationConfig.

  • Per aggiungere i parametri di configurazione dell'invocazione del workflow all'oggetto DAG di Airflow create_workflow_invocation, aggiungi i parametri selezionati al campo invocation_config nel seguente formato:

        create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
            task_id='create_workflow_invocation',
            project_id=PROJECT_ID,
            region=REGION,
            repository_id=REPOSITORY_ID,
            workflow_invocation={
                "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
                "invocation_config": { "PARAMETER": PARAMETER_VALUE }
            },
        )
    
    

    Sostituisci quanto segue:

    • PROJECT_ID: il tuo ID progetto Google Cloud Dataform.
    • REPOSITORY_ID: il nome del tuo repository Dataform.
    • REGION: la regione in cui si trova il repository Dataform.
    • PARAMETER: il parametro InvocationConfig selezionato. Puoi aggiungere più parametri.
    • PARAMETER_VALUE: il valore del parametro selezionato.

Il seguente esempio di codice mostra i parametri includedTags[] e transitiveDependenciesIncluded aggiunti all'oggetto create_workflow_invocation DAG di Airflow:

    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
            "invocation_config": { "included_tags": ["daily"], "transitive_dependencies_included": true }
        },
    )

Passaggi successivi