Pianificare le esecuzioni

Questo documento mostra come eseguire le seguenti operazioni in Dataform:

Prima di iniziare

Per pianificare le esecuzioni con le configurazioni dei flussi di lavoro o pianificare le esecuzioni con i flussi di lavoro e Cloud Scheduler, assicurati di svolgere le seguenti operazioni:

  1. Nella console Google Cloud, vai alla pagina Dataform.

    Vai a Dataform

  2. Seleziona o crea un repository.

  3. Crea una configurazione della release.

Per pianificare le esecuzioni con Cloud Composer, assicurati di svolgere quanto segue:

  1. Seleziona o crea un repository Dataform.
  2. Concedi a Dataform l'accesso a BigQuery.
  3. Seleziona o crea uno spazio di lavoro Dataform.
  4. Crea almeno una tabella.
  5. Crea un ambiente Cloud Composer 2.

Ruoli obbligatori

Per ottenere le autorizzazioni necessarie per completare le attività descritte in questo documento, chiedi all'amministratore di concederti i seguenti ruoli IAM:

Per ulteriori informazioni sulla concessione dei ruoli, consulta Gestire l'accesso a progetti, cartelle e organizzazioni.

Potresti anche riuscire a ottenere le autorizzazioni richieste tramite i ruoli personalizzati o altri ruoli predefiniti.

Per utilizzare un account di servizio diverso dall'account di servizio Dataform predefinito, concedi l'accesso all'account di servizio personalizzato.

Pianificare le esecuzioni con le configurazioni del flusso di lavoro

Questa sezione mostra come creare una configurazione del flusso di lavoro in Dataform per pianificare e configurare le esecuzioni del flusso di lavoro. Puoi utilizzare le configurazioni del workflow per eseguire i flussi di lavoro Dataform in base a una pianificazione.

Informazioni sulle configurazioni dei workflow

Per pianificare le esecuzioni di Dataform di tutte o di alcune azioni di workflow in BigQuery, puoi creare configurazioni di workflow. In una configurazione del flusso di lavoro, seleziona una configurazione della release di compilazione, le azioni del flusso di lavoro per l'esecuzione e imposta la pianificazione dell'esecuzione.

Poi, durante un'esecuzione pianificata della configurazione del flusso di lavoro, Dataform esegue il deployment della selezione di azioni dall'ultimo risultato di compilazione nella configurazione della release in BigQuery. Puoi anche attivare manualmente l'esecuzione di una configurazione del flusso di lavoro con workflowConfigs dell'API Dataform.

Una configurazione del flusso di lavoro Dataform contiene le seguenti impostazioni di esecuzione:

  • ID della configurazione del flusso di lavoro.
  • Configurazione della release.
  • Account di servizio.

    Si tratta dell'account di servizio associato alla configurazione del flusso di lavoro. Puoi selezionare l'account di servizio Dataform predefinito o un account di servizio associato al tuo progetto Google Cloud oppure inserire manualmente un altro account di servizio. Per impostazione predefinita, le configurazioni dei flussi di lavoro utilizzano gli stessi account di servizio dei relativi repository.

  • Azioni del flusso di lavoro da eseguire:

    • Tutte le azioni.
    • Selezione di azioni.
    • Selezione di tag.
  • Pianificazione delle esecuzioni e fuso orario.

Crea una configurazione di flusso di lavoro

Per creare una configurazione del flusso di lavoro di Dataform:

  1. Nel repository, vai a Uscite e pianificazione.
  2. Nella sezione Configurazioni dei flussi di lavoro, fai clic su Crea.
  3. Nel riquadro Crea configurazione del flusso di lavoro, inserisci un ID univoco per la configurazione del flusso di lavoro nel campo ID configurazione.

    Gli ID possono includere solo numeri, lettere, trattini e trattini bassi.

  4. Nel menu Configurazione della release, seleziona una configurazione della release della compilazione.

  5. (Facoltativo) Nel campo Frequenza, inserisci la frequenza delle esecuzioni nel formato unix-cron.

    Per assicurarti che Dataform esegua il risultato di compilazione più recente nella configurazione della release corrispondente, mantieni un'interruzione minima di un'ora tra l'ora di creazione del risultato di compilazione e l'ora di esecuzione pianificata.

  6. Nel menu Service account, seleziona un service account per la configurazione del flusso di lavoro.

    Nel menu, puoi selezionare l'account di servizio Dataform predefinito o qualsiasi account di servizio associato al tuo progetto Google Cloud a cui hai accesso. Se non selezioni un account di servizio, la configurazione del flusso di lavoro utilizza l'account di servizio del repository.

  7. (Facoltativo) Nel menu Fuso orario, seleziona il fuso orario per le esecuzioni.

    Il fuso orario predefinito è UTC.

  8. Seleziona le azioni del flusso di lavoro da eseguire:

    • Per eseguire l'intero flusso di lavoro, fai clic su Tutte le azioni.
    • Per eseguire le azioni selezionate nel flusso di lavoro, fai clic su Selezione di azioni e poi seleziona le azioni.
    • Per eseguire azioni con i tag selezionati, fai clic su Selezione di tag e seleziona i tag.
    • (Facoltativo) Per eseguire le azioni o i tag selezionati e le relative dipendenze, seleziona l'opzione Includi dipendenze.
    • (Facoltativo) Per eseguire le azioni o i tag selezionati e i relativi elementi dipendenti, seleziona l'opzione Includi elementi dipendenti.
    • (Facoltativo) Per ricostruire tutte le tabelle da zero, seleziona l'opzione Esegui con aggiornamento completo.

    Senza questa opzione, Dataform aggiorna le tabelle incrementali senza ricostruirle da zero.

  9. Fai clic su Crea.

Ad esempio, la seguente configurazione del flusso di lavoro esegue azioni con il hourly tag ogni ora nel fuso orario CEST:

  • ID configurazione: production-hourly
  • Configurazione della release: -
  • Frequenza: 0 * * * *
  • Fuso orario: Central European Summer Time (CEST)
  • Selezione di azioni di flusso di lavoro: selezione di tag, tag hourly

Modificare una configurazione del flusso di lavoro

Per modificare una configurazione del flusso di lavoro:

  1. Nel repository, vai a Uscite e pianificazione.
  2. Accanto alla configurazione del flusso di lavoro che vuoi modificare, fai clic sul menu Altro e poi su Modifica.
  3. Nel riquadro Modifica configurazione del flusso di lavoro, modifica le impostazioni della configurazione della release e poi fai clic su Salva.

Eliminare una configurazione del flusso di lavoro

Per eliminare una configurazione del flusso di lavoro:

  1. Nel repository, vai a Uscite e pianificazione.
  2. Accanto alla configurazione del flusso di lavoro che vuoi eliminare, fai clic sul menu Altro e poi su Elimina.
  3. Nella finestra di dialogo Elimina configurazione della release, fai clic su Elimina.

Pianificare le esecuzioni con Workflows e Cloud Scheduler

Questa sezione mostra come pianificare le esecuzioni dei flussi di lavoro Dataform utilizzando Workflows e Cloud Scheduler.

Informazioni sulle esecuzioni pianificate del flusso di lavoro

Puoi impostare la frequenza di esecuzione dei flussi di lavoro di Dataform creando un job Cloud Scheduler che attivi un flusso di lavoro Workflows. Workflows esegue i servizi in un flusso di lavoro di orchestrazione definito da te.

Workflows esegue il flusso di lavoro Dataform in un procedura in due fasi. Innanzitutto, estrae il codice del repository Dataform dal provider Git e lo compila in un risultato di compilazione. Quindi, utilizza il risultato della compilazione per creare un flusso di lavoro Dataform e lo esegue con la frequenza impostata.

Creare un flusso di lavoro di orchestrazione pianificato

Per pianificare le esecuzioni del flusso di lavoro Dataform, utilizza Workflows per creare un flusso di lavoro di orchestrazione e aggiungi un job Cloud Scheduler come attivatore.

  1. Workflows utilizza gli account di servizio per concedere ai flussi di lavoro l'accesso alle Google Cloud risorse. Crea un account di servizio e concedigli il ruolo di Gestione di identità e accessi Editor di Dataform (roles/dataform.editor) nonché le autorizzazioni minime necessarie per gestire il flusso di lavoro di orchestrazione. Per ulteriori informazioni, vedi Concedere l'autorizzazione dei workflow per l'accesso alle Google Cloud risorse.

  2. Crea un flusso di lavoro di orchestrazione e utilizza il seguente codice sorgente YAML come definizione del flusso di lavoro:

    main:
        steps:
        - init:
            assign:
            - repository: projects/PROJECT_ID/locations/REPOSITORY_LOCATION/repositories/REPOSITORY_ID
        - createCompilationResult:
            call: http.post
            args:
                url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"}
                auth:
                    type: OAuth2
                body:
                    gitCommitish: GIT_COMMITISH
            result: compilationResult
        - createWorkflowInvocation:
            call: http.post
            args:
                url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"}
                auth:
                    type: OAuth2
                body:
                    compilationResult: ${compilationResult.body.name}
            result: workflowInvocation
        - complete:
            return: ${workflowInvocation.body.name}
    

    Sostituisci quanto segue:

    • PROJECT_ID: l'ID del tuo Google Cloud progetto.
    • REPOSITORY_LOCATION: la posizione del repository Dataform.
    • REPOSITORY_ID: il nome del tuo repository Dataform.
    • GIT_COMMITISH: il ramo Git da cui vuoi eseguire il codice Dataform. Per un repository appena creato, sostituisci con main.
  3. Pianifica il flusso di lavoro di orchestrazione utilizzando Cloud Scheduler.

Personalizzare la richiesta di risultato della compilazione della creazione del flusso di lavoro Dataform

Puoi aggiornare il flusso di lavoro di orchestrazione esistente e definire le impostazioni della richiesta di creazione del risultato della compilazione del flusso di lavoro Dataform in formato YAML. Per ulteriori informazioni sulle impostazioni, consulta la projects.locations.repositories.compilationResults guida di riferimento alle risorse REST.

Ad esempio, per aggiungere un'impostazione _dev schemaSuffix a tutte le azioni durante la compilazione, sostituire il corpo del passaggio createCompilationResult con il seguente snippet di codice:

    - createCompilationResult:
        call: http.post
        args:
            url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"}
            auth:
                type: OAuth2
            body:
                gitCommitish: GIT_COMMITISH
                codeCompilationConfig:
                    schemaSuffix: dev

Puoi anche passare impostazioni aggiuntive come argomenti di runtime in una richiesta di esecuzione di Workflows e accedere a questi argomenti utilizzando le variabili. Per ulteriori informazioni, consulta Passare gli argomenti di runtime in una richiesta di esecuzione.

Personalizzare la richiesta di chiamata del flusso di lavoro Dataform

Puoi aggiornare il flusso di lavoro di orchestrazione esistente e definire le impostazioni della richiesta di chiamata del flusso di lavoro Dataform in formato YAML. Per ulteriori informazioni sulle impostazioni della richiesta di invocazione, consulta la projects.locations.repositories.workflowInvocationsguida di riferimento delle risorse REST.

Ad esempio, per eseguire solo azioni con il tag hourly con tutte le dipendenze trascendenti incluse, sostituisci il corpo createWorkflowInvocation con lo snippet di codice seguente:

    - createWorkflowInvocation:
        call: http.post
        args:
            url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"}
            auth:
                type: OAuth2
            body:
                compilationResult: ${compilationResult.body.name}
                invocationConfig:
                    includedTags:
                    - hourly
                    transitiveDependenciesIncluded: true
                

Puoi anche passare impostazioni aggiuntive come argomenti di runtime in una richiesta di esecuzione di Workflows e accedere a questi argomenti utilizzando le variabili. Per ulteriori informazioni, consulta Passare gli argomenti di runtime in una richiesta di esecuzione.

Pianificare le esecuzioni con Cloud Composer

Puoi utilizzare Cloud Composer 2 per pianificare le esecuzioni di Dataform. Dataform non supporta Cloud Composer 1.

Per gestire le pianificazioni delle esecuzioni di Dataform con Cloud Composer 2, puoi utilizzare operatori Dataform nei grafici diretti aciclici (DAG) di Airflow. Puoi creare un DAG Airflow che pianifica le invocazioni del flusso di lavoro Dataform.

Dataform fornisce vari operatori Airflow. Sono inclusi gli operatori per ottenere un risultato di compilazione, un'invocazione di flusso di lavoro e l'annullamento di un'invocazione di flusso di lavoro. Per visualizzare l'elenco completo degli operatori Dataform Airflow disponibili, consulta Operatori Google Dataform.

Installa il pacchetto PyPi google-cloud-dataform

Se utilizzi le versioni 2.0.25 e successive di Cloud Composer 2, questo pacchetto è preinstallato nel tuo ambiente. Non è necessario installarlo.

Se utilizzi versioni precedenti di Cloud Composer 2, installa il pacchetto PyPi google-cloud-dataform.

Nella sezione dei pacchetti PyPI, specifica la versione ==0.2.0.

Crea un DAG Airflow che pianifica le invocazioni del flusso di lavoro Dataform

Per gestire le esecuzioni pianificate dei flussi di lavoro Dataform con Cloud Composer 2, scrivi il DAG utilizzando gli operatori Airflow di Dataform, quindi caricalo nel bucket del tuo ambiente.

Il seguente esempio di codice mostra un DAG Airflow che crea un risultato di compilazione Dataform e avvia un'invocazione del flusso di lavoro Dataform:

from datetime import datetime

from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
)

DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"

with models.DAG(
    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2022, 1, 1),
    catchup=False,  # Override to match your needs
    tags=['dataform'],
) as dag:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
        },
    )
    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
         workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
        },
    )


create_compilation_result >> create_workflow_invocation

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del tuo progetto Google Cloud Dataform.
  • REPOSITORY_ID: il nome del tuo repository Dataform.
  • REGION: la regione in cui si trova il repository Dataform.
  • COMPILATION_RESULT: il nome del risultato della compilazione che vuoi utilizzare per l'invocazione di questo flusso di lavoro.
  • GIT_COMMITISH: il commitish Git nel repository Git remoto della versione del codice che vuoi utilizzare, ad esempio un ramo o un SHA Git.

Il seguente esempio di codice mostra un DAG Airflow che esegue le seguenti operazioni:

  1. Crea un risultato di compilazione di Dataform.
  2. Avvia un'invocazione asincrona del flusso di lavoro Dataform.
  3. Esegue il polling dello stato del flusso di lavoro finché non entra nello stato previsto utilizzando DataformWorkflowInvocationStateSensor.
from datetime import datetime

from google.cloud.dataform_v1beta1 import WorkflowInvocation

from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
)
from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor

DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"

with models.DAG(
    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2022, 1, 1),
    catchup=False,  # Override to match your needs
    tags=['dataform'],
) as dag:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
        },
    )

create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
    task_id='create_workflow_invocation',
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    asynchronous=True,
    workflow_invocation={
        "compilation_result": COMPILATION_RESULT
    }
)

is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
    task_id="is_workflow_invocation_done",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id=("{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"),
    expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)


create_compilation_result >> create_workflow_invocation

Sostituisci quanto segue:

  • PROJECT_ID: il tuo ID progetto Google Cloud Dataform.
  • REPOSITORY_ID: il nome del tuo repository Dataform.
  • REGION: la regione in cui si trova il repository Dataform.
  • COMPILATION_RESULT: il nome del risultato della compilazione che vuoi utilizzare per l'invocazione di questo flusso di lavoro.
  • GIT_COMMITISH: il commitish Git nel repository Git remoto della versione del codice che vuoi utilizzare, ad esempio un ramo o un SHA Git.
  • COMPILATION_RESULT: il nome del risultato della compilazione che vuoi utilizzare per l'invocazione di questo flusso di lavoro.

Aggiungere i parametri di configurazione della compilazione

Puoi aggiungere altri parametri di configurazione della compilazione all'create_compilation_resultoggetto DAG di Airflow. Per ulteriori informazioni sui parametri disponibili, consulta il CodeCompilationConfig riferimento all'API Dataform.

  • Per aggiungere parametri di configurazione della compilazione all'oggetto DAG create_compilation_result Airflow, aggiungi i parametri selezionati al campo code_compilation_config nel seguente formato:

        create_compilation_result = DataformCreateCompilationResultOperator(
            task_id="create_compilation_result",
            project_id=PROJECT_ID,
            region=REGION,
            repository_id=REPOSITORY_ID,
            compilation_result={
                "git_commitish": GIT_COMMITISH,
                "code_compilation_config": { "PARAMETER": "PARAMETER_VALUE"}
            },
        )
    

    Sostituisci quanto segue:

    • PROJECT_ID: l'ID del tuo progetto Google Cloud Dataform.
    • REPOSITORY_ID: il nome del tuo repository Dataform.
    • REGION: la regione in cui si trova il repository Dataform.
    • GIT_COMMITISH: il commitish Git nel repository Git remoto della versione del codice che vuoi utilizzare, ad esempio un ramo o un SHA Git.
    • PARAMETER: il parametro CodeCompilationConfig selezionato. Puoi aggiungere più parametri.
    • PARAMETER_VALUE: il valore del parametro selezionato.

Il seguente esempio di codice mostra il parametro defaultDatabase aggiunto all'oggetto DAG Airflow create_compilation_result:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": REMOTE_BRANCH,
            "code_compilation_config": { "default_database": "my-custom-gcp-project"}
        },
    )

Aggiungi i parametri di configurazione dell'invocazione del workflow

Puoi aggiungere altri parametri di configurazione dell'invocazione del flusso di lavoro all'create_workflow_invocationoggetto DAG di Airflow. Per ulteriori informazioni sui parametri disponibili, consulta il InvocationConfig riferimento all'API Dataform.

  • Per aggiungere i parametri di configurazione dell'invocazione del flusso di lavoro all'create_workflow_invocationoggetto DAG Airflow, aggiungi i parametri selezionati al campo invocation_config nel seguente formato:

        create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
            task_id='create_workflow_invocation',
            project_id=PROJECT_ID,
            region=REGION,
            repository_id=REPOSITORY_ID,
            workflow_invocation={
                "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
                "invocation_config": { "PARAMETER": PARAMETER_VALUE }
            },
        )
    
    

    Sostituisci quanto segue:

    • PROJECT_ID: l'ID del tuo progetto Google Cloud Dataform.
    • REPOSITORY_ID: il nome del tuo repository Dataform.
    • REGION: la regione in cui si trova il repository Dataform.
    • PARAMETER: il parametro InvocationConfig selezionato. Puoi aggiungere più parametri.
    • PARAMETER_VALUE: il valore del parametro selezionato.

Il seguente esempio di codice mostra i parametri includedTags[] e transitiveDependenciesIncluded aggiunti all'oggetto DAG create_workflow_invocation Airflow:

    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
            "invocation_config": { "included_tags": ["daily"], "transitive_dependencies_included": true }
        },
    )

Passaggi successivi