Planifier des exécutions

Ce document vous explique comment effectuer les opérations suivantes dans Dataform:

Avant de commencer

Pour planifier des exécutions avec des configurations de workflow ou planifier des exécutions avec des workflows et Cloud Scheduler, procédez comme suit:

  1. Dans la console Google Cloud, accédez à la page Dataform page.

    Accéder à Dataform

  2. Sélectionnez ou créez un dépôt.

  3. Créez une configuration de version.

Pour planifier des exécutions avec Cloud Composer, procédez comme suit:

  1. Sélectionnez ou créez un dépôt Dataform.
  2. Accordez à Dataform l'accès à BigQuery.
  3. Sélectionnez ou créez un espace de travail Dataform.
  4. Créez au moins une table.
  5. Créez un environnement Cloud Composer 2.

Rôles requis

Pour obtenir les autorisations nécessaires pour effectuer les tâches décrites dans ce document, demandez à votre administrateur de vous accorder les rôles IAM suivants:

Pour en savoir plus sur l'attribution de rôles, consultez la page Gérer l'accès aux projets, aux dossiers et aux organisations.

Vous pouvez également obtenir les autorisations requises via des rôles personnalisés ou d'autres rôles prédéfinis.

Pour utiliser un compte de service autre que le compte de service Dataform par défaut, accordez l'accès au compte de service personnalisé.

Planifier des exécutions avec des configurations de workflow

Cette section explique comment créer une configuration de workflow dans Dataform pour planifier et configurer les exécutions de workflow. Vous pouvez utiliser des configurations de workflow pour exécuter des workflows Dataform de manière planifiée.

À propos des configurations de workflow

Pour planifier des exécutions Dataform de toutes les actions de workflow ou de certaines d'entre elles dans BigQuery, vous pouvez créer des configurations de workflow. Dans une configuration de workflow, vous sélectionnez une configuration de version de compilation, les actions de workflow à exécuter et la planification d'exécution.

Ensuite, lors d'une exécution planifiée de la configuration de votre workflow, Dataform déploie votre sélection d'actions à partir du dernier résultat de compilation dans votre configuration de version dans BigQuery. Vous pouvez également déclencher manuellement l'exécution d'une configuration de workflow avec les workflowConfigs de l'API Dataform.

Une configuration de workflow Dataform contient les paramètres d'exécution suivants:

  • ID de la configuration du workflow.
  • Configuration de la version.
  • Compte de service.

    Il s'agit du compte de service associé à la configuration du workflow. Vous pouvez sélectionner le compte de service Dataform par défaut ou un compte de service associé à votre projet Google Cloud, ou saisir manuellement un autre compte de service. Par défaut, les configurations de workflow utilisent les mêmes comptes de service que leurs dépôts.

  • Actions de workflow à exécuter:

    • Toutes les actions.
    • Sélection d'actions.
    • Sélection de balises.
  • Calendrier d'exécution et fuseau horaire

Créer une configuration de workflow

Pour créer une configuration de workflow Dataform, procédez comme suit:

  1. Dans votre dépôt, accédez à Publications et planification.
  2. Dans la section Configurations de workflow, cliquez sur Créer.
  3. Dans le volet Create workflow configuration (Créer une configuration de workflow), dans le champ Configuration ID (ID de configuration), saisissez un ID unique pour la configuration de workflow.

    Les ID ne peuvent contenir que des chiffres, des lettres, des traits d'union et des traits de soulignement.

  4. Dans le menu Configuration de version, sélectionnez une configuration de version de compilation.

  5. Facultatif: dans le champ Fréquence, saisissez la fréquence d'exécution au format unix-cron.

    Pour vous assurer que Dataform exécute le dernier résultat de compilation dans la configuration de version correspondante, laissez un temps de pause d'au moins une heure entre le moment de la création du résultat de compilation et l'heure de l'exécution planifiée.

  6. Dans le menu Compte de service, sélectionnez un compte de service pour la configuration du workflow.

    Dans le menu, vous pouvez sélectionner le compte de service Dataform par défaut ou tout compte de service associé à votre projet Google Cloud auquel vous avez accès. Si vous ne sélectionnez pas de compte de service, la configuration du workflow utilise le compte de service du dépôt.

  7. Facultatif: dans le menu Fuseau horaire, sélectionnez le fuseau horaire pour les exécutions.

    Le fuseau horaire par défaut est UTC.

  8. Sélectionnez les actions de workflow à exécuter:

    • Pour exécuter l'ensemble du workflow, cliquez sur Toutes les actions.
    • Pour exécuter des actions sélectionnées dans le workflow, cliquez sur Sélection d'actions, puis sélectionnez les actions.
    • Pour exécuter des actions avec des tags sélectionnés, cliquez sur Sélection de tags, puis sélectionnez des tags.
    • Facultatif: Pour exécuter les actions ou balises sélectionnées et leurs dépendances, sélectionnez l'option Inclure les dépendances.
    • Facultatif: Pour exécuter les actions ou tags sélectionnés et leurs dépendances, sélectionnez l'option Inclure les dépendances.
    • Facultatif: Pour reconstruire toutes les tables à partir de zéro, sélectionnez l'option Run with full refresh (Exécuter avec actualisation complète).

    Sans cette option, Dataform met à jour les tables incrémentielles sans les recréer à partir de zéro.

  9. Cliquez sur Créer.

Par exemple, la configuration de workflow suivante exécute des actions avec la balise hourly toutes les heures dans le fuseau horaire CEST:

  • ID de configuration: production-hourly
  • Configuration de la version: -
  • Fréquence: 0 * * * *
  • Fuseau horaire: Central European Summer Time (CEST)
  • Sélection d'actions de workflow: sélection de balises, balise hourly

Modifier la configuration d'un workflow

Pour modifier la configuration d'un workflow, procédez comme suit:

  1. Dans votre dépôt, accédez à Versions et planification.
  2. À côté de la configuration de workflow que vous souhaitez modifier, cliquez sur le menu Plus, puis sur Modifier.
  3. Dans le volet Modifier la configuration du workflow, modifiez les paramètres de configuration de la version, puis cliquez sur Enregistrer.

Supprimer une configuration de workflow

Pour supprimer une configuration de workflow, procédez comme suit:

  1. Dans votre dépôt, accédez à Publications et planification.
  2. À côté de la configuration de workflow que vous souhaitez supprimer, cliquez sur le menu Plus, puis sur Supprimer.
  3. Dans la boîte de dialogue Supprimer la configuration de version, cliquez sur Supprimer.

Planifier des exécutions avec Workflows et Cloud Scheduler

Cette section explique comment planifier l'exécution de workflows Dataform à l'aide de Workflows et de Cloud Scheduler.

À propos des exécutions de workflow planifiées

Vous pouvez définir la fréquence d'exécution de vos workflows Dataform en créant une tâche Cloud Scheduler qui déclenche un workflow Workflows. Workflows exécute les services dans un workflow d'orchestration que vous définissez.

Workflows exécute votre workflow Dataform en deux étapes. Tout d'abord, il extrait le code de votre dépôt Dataform de votre fournisseur Git et le compile en résultat de compilation. Il utilise ensuite le résultat de la compilation pour créer un workflow Dataform et l'exécute à la fréquence que vous définissez.

Créer un workflow d'orchestration planifié

Pour planifier l'exécution de votre workflow Dataform, utilisez Workflows pour créer un workflow d'orchestration et ajouter un job Cloud Scheduler comme déclencheur.

  1. Workflows utilise des comptes de service pour accorder aux workflows un accès aux ressourcesGoogle Cloud . Créez un compte de service et accordez-lui le rôle Éditeur Dataform (roles/dataform.editor) pour la gestion des identités et des accès, ainsi que les autorisations minimales requises pour gérer votre workflow d'orchestration. Pour en savoir plus, consultez la section Accorder une autorisation à un workflow pour accéder aux ressources Google Cloud .

  2. Créez un workflow d'orchestration et utilisez le code source YAML suivant comme définition de workflow:

    main:
        steps:
        - init:
            assign:
            - repository: projects/PROJECT_ID/locations/REPOSITORY_LOCATION/repositories/REPOSITORY_ID
        - createCompilationResult:
            call: http.post
            args:
                url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"}
                auth:
                    type: OAuth2
                body:
                    gitCommitish: GIT_COMMITISH
            result: compilationResult
        - createWorkflowInvocation:
            call: http.post
            args:
                url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"}
                auth:
                    type: OAuth2
                body:
                    compilationResult: ${compilationResult.body.name}
            result: workflowInvocation
        - complete:
            return: ${workflowInvocation.body.name}
    

    Remplacez les éléments suivants :

    • PROJECT_ID: ID de votre Google Cloud projet.
    • REPOSITORY_LOCATION: emplacement de votre dépôt Dataform.
    • REPOSITORY_ID: nom de votre dépôt Dataform.
    • GIT_COMMITISH: branche Git à partir de laquelle vous souhaitez exécuter le code Dataform. Pour un dépôt nouvellement créé, remplacez-le par main.
  3. Planifiez le workflow d'orchestration à l'aide de Cloud Scheduler.

Personnaliser la requête de création de résultat de compilation du workflow Dataform

Vous pouvez mettre à jour le workflow d'orchestration existant et définir les paramètres de la requête de création de résultats de compilation du workflow Dataform au format YAML. Pour en savoir plus sur les paramètres, consultez la documentation de référence sur les ressources REST projects.locations.repositories.compilationResults.

Par exemple, pour ajouter un paramètre schemaSuffix _dev à toutes les actions lors de la compilation, remplacez le corps de l'étape createCompilationResult par l'extrait de code suivant:

    - createCompilationResult:
        call: http.post
        args:
            url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"}
            auth:
                type: OAuth2
            body:
                gitCommitish: GIT_COMMITISH
                codeCompilationConfig:
                    schemaSuffix: dev

Vous pouvez également transmettre des paramètres supplémentaires en tant qu'arguments d'exécution dans une requête d'exécution de workflow et y accéder à l'aide de variables. Pour en savoir plus, consultez la section Transmettre des arguments d'environnement d'exécution dans une requête d'exécution.

Personnaliser la requête d'appel de workflow Dataform

Vous pouvez mettre à jour le workflow d'orchestration existant et définir les paramètres de la requête d'appel du workflow Dataform au format YAML. Pour en savoir plus sur les paramètres de la requête d'appel, consultez la documentation de référence sur les ressources REST projects.locations.repositories.workflowInvocations.

Par exemple, pour n'exécuter que des actions avec la balise hourly avec toutes les dépendances transitives incluses, remplacez le corps createWorkflowInvocation par l'extrait de code suivant:

    - createWorkflowInvocation:
        call: http.post
        args:
            url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"}
            auth:
                type: OAuth2
            body:
                compilationResult: ${compilationResult.body.name}
                invocationConfig:
                    includedTags:
                    - hourly
                    transitiveDependenciesIncluded: true
                

Vous pouvez également transmettre des paramètres supplémentaires en tant qu'arguments d'exécution dans une requête d'exécution de workflow et y accéder à l'aide de variables. Pour en savoir plus, consultez la section Transmettre des arguments d'environnement d'exécution dans une requête d'exécution.

Planifier des exécutions avec Cloud Composer

Vous pouvez utiliser Cloud Composer 2 pour planifier des exécutions Dataform. Dataform n'est pas compatible avec Cloud Composer 1.

Pour gérer les planifications des exécutions Dataform avec Cloud Composer 2, vous pouvez utiliser des opérateurs Dataform dans les graphes orientés acycliques (DAG) Airflow. Vous pouvez créer un DAG Airflow qui planifie les invocations de workflow Dataform.

Dataform fournit différents opérateurs Airflow. Il s'agit d'opérateurs permettant d'obtenir un résultat de compilation, d'obtenir une invocation de workflow et d'annuler une invocation de workflow. Pour obtenir la liste complète des opérateurs Dataform Airflow disponibles, consultez les opérateurs Google Dataform.

Installer le package PyPI google-cloud-dataform

Si vous utilisez les versions 2.0.25 et ultérieures de Cloud Composer 2, ce package est préinstallé dans votre environnement. Vous n'avez pas besoin de l'installer.

Si vous utilisez des versions antérieures de Cloud Composer 2, installez le package PyPI google-cloud-dataform.

Dans la section "Packages PyPI", spécifiez la version ==0.2.0.

Créer un DAG Airflow qui planifie les invocations de workflow Dataform

Pour gérer les exécutions planifiées des workflows Dataform avec Cloud Composer 2, écrivez le DAG à l'aide des opérateurs Airflow Dataform, puis importez-le dans le bucket de votre environnement.

L'exemple de code suivant montre un DAG Airflow qui crée un résultat de compilation Dataform et lance une invocation de workflow Dataform:

from datetime import datetime

from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
)

DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"

with models.DAG(
    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2022, 1, 1),
    catchup=False,  # Override to match your needs
    tags=['dataform'],
) as dag:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
        },
    )
    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
         workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
        },
    )


create_compilation_result >> create_workflow_invocation

Remplacez les éléments suivants :

  • PROJECT_ID: ID de votre projet Google Cloud Dataform.
  • REPOSITORY_ID: nom de votre dépôt Dataform.
  • REGION: région dans laquelle se trouve le dépôt Dataform.
  • COMPILATION_RESULT: nom du résultat de compilation que vous souhaitez utiliser pour cette invocation de workflow.
  • GIT_COMMITISH: commit Git dans le dépôt Git distant de la version de votre code que vous souhaitez utiliser (par exemple, une branche ou un SHA Git).

L'exemple de code suivant montre un DAG Airflow qui effectue les opérations suivantes:

  1. Crée un résultat de compilation Dataform.
  2. Démarre une invocation de workflow Dataform asynchrone.
  3. Interroge l'état de votre workflow jusqu'à ce qu'il passe à l'état attendu à l'aide de DataformWorkflowInvocationStateSensor.
from datetime import datetime

from google.cloud.dataform_v1beta1 import WorkflowInvocation

from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
)
from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor

DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"

with models.DAG(
    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2022, 1, 1),
    catchup=False,  # Override to match your needs
    tags=['dataform'],
) as dag:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
        },
    )

create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
    task_id='create_workflow_invocation',
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    asynchronous=True,
    workflow_invocation={
        "compilation_result": COMPILATION_RESULT
    }
)

is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
    task_id="is_workflow_invocation_done",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id=("{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"),
    expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)


create_compilation_result >> create_workflow_invocation

Remplacez les éléments suivants :

  • PROJECT_ID: ID de votre projet Google Cloud Dataform.
  • REPOSITORY_ID: nom de votre dépôt Dataform.
  • REGION: région dans laquelle se trouve le dépôt Dataform.
  • COMPILATION_RESULT: nom du résultat de compilation que vous souhaitez utiliser pour cette invocation de workflow.
  • GIT_COMMITISH: commit Git dans le dépôt Git distant de la version de votre code que vous souhaitez utiliser (par exemple, une branche ou un SHA Git).
  • COMPILATION_RESULT: nom du résultat de compilation que vous souhaitez utiliser pour cette invocation de workflow.

Ajouter des paramètres de configuration de compilation

Vous pouvez ajouter des paramètres de configuration de compilation supplémentaires à l'objet DAG Airflow create_compilation_result. Pour en savoir plus sur les paramètres disponibles, consultez la documentation de référence de l'API Dataform CodeCompilationConfig.

  • Pour ajouter des paramètres de configuration de compilation à l'objet DAG Airflow create_compilation_result, ajoutez les paramètres sélectionnés au champ code_compilation_config au format suivant:

        create_compilation_result = DataformCreateCompilationResultOperator(
            task_id="create_compilation_result",
            project_id=PROJECT_ID,
            region=REGION,
            repository_id=REPOSITORY_ID,
            compilation_result={
                "git_commitish": GIT_COMMITISH,
                "code_compilation_config": { "PARAMETER": "PARAMETER_VALUE"}
            },
        )
    

    Remplacez les éléments suivants :

    • PROJECT_ID: ID de votre projet Google Cloud Dataform.
    • REPOSITORY_ID: nom de votre dépôt Dataform.
    • REGION: région dans laquelle se trouve le dépôt Dataform.
    • GIT_COMMITISH: commit Git dans le dépôt Git distant de la version de votre code que vous souhaitez utiliser (par exemple, une branche ou un SHA Git).
    • PARAMETER: paramètre CodeCompilationConfig sélectionné. Vous pouvez ajouter plusieurs paramètres.
    • PARAMETER_VALUE: valeur du paramètre sélectionné.

L'exemple de code suivant montre le paramètre defaultDatabase ajouté à l'objet DAG Airflow create_compilation_result:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": REMOTE_BRANCH,
            "code_compilation_config": { "default_database": "my-custom-gcp-project"}
        },
    )

Ajouter des paramètres de configuration d'appel de workflow

Vous pouvez ajouter des paramètres de configuration d'appel de workflow supplémentaires à l'objet DAG Airflow create_workflow_invocation. Pour en savoir plus sur les paramètres disponibles, consultez la documentation de référence de l'API Dataform InvocationConfig.

  • Pour ajouter des paramètres de configuration d'appel de workflow à l'objet DAG Airflow create_workflow_invocation, ajoutez les paramètres sélectionnés au champ invocation_config au format suivant:

        create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
            task_id='create_workflow_invocation',
            project_id=PROJECT_ID,
            region=REGION,
            repository_id=REPOSITORY_ID,
            workflow_invocation={
                "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
                "invocation_config": { "PARAMETER": PARAMETER_VALUE }
            },
        )
    
    

    Remplacez les éléments suivants :

    • PROJECT_ID: ID de votre projet Google Cloud Dataform.
    • REPOSITORY_ID: nom de votre dépôt Dataform.
    • REGION: région dans laquelle se trouve le dépôt Dataform.
    • PARAMETER: paramètre InvocationConfig sélectionné. Vous pouvez ajouter plusieurs paramètres.
    • PARAMETER_VALUE: valeur du paramètre sélectionné.

L'exemple de code suivant montre les paramètres includedTags[] et transitiveDependenciesIncluded ajoutés à l'objet DAG Airflow create_workflow_invocation:

    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
            "invocation_config": { "included_tags": ["daily"], "transitive_dependencies_included": true }
        },
    )

Étape suivante