Ce document vous explique comment effectuer les opérations suivantes dans Dataform:
- Planifier des exécutions avec des configurations de workflow
- Planifiez des exécutions avec Workflows et Cloud Scheduler.
- Planifier des exécutions avec Cloud Composer
Avant de commencer
Pour planifier des exécutions avec des configurations de workflow ou planifier des exécutions avec des workflows et Cloud Scheduler, procédez comme suit:
Dans la console Google Cloud, accédez à la page Dataform page.
Sélectionnez ou créez un dépôt.
Créez une configuration de version.
Pour planifier des exécutions avec Cloud Composer, procédez comme suit:
- Sélectionnez ou créez un dépôt Dataform.
- Accordez à Dataform l'accès à BigQuery.
- Sélectionnez ou créez un espace de travail Dataform.
- Créez au moins une table.
- Créez un environnement Cloud Composer 2.
Rôles requis
Pour obtenir les autorisations nécessaires pour effectuer les tâches décrites dans ce document, demandez à votre administrateur de vous accorder les rôles IAM suivants:
-
Administrateur Dataform (
roles/dataform.admin
) sur les dépôts -
Éditeur Dataform (
roles/dataform.editor
) sur les dépôts et le compte de service de votre environnement Cloud Composer -
Nœud de calcul Composer (
roles/composer.worker
) sur le compte de service de l'environnement Cloud Composer
Pour en savoir plus sur l'attribution de rôles, consultez la page Gérer l'accès aux projets, aux dossiers et aux organisations.
Vous pouvez également obtenir les autorisations requises via des rôles personnalisés ou d'autres rôles prédéfinis.
Pour utiliser un compte de service autre que le compte de service Dataform par défaut, accordez l'accès au compte de service personnalisé.
Planifier des exécutions avec des configurations de workflow
Cette section explique comment créer une configuration de workflow dans Dataform pour planifier et configurer les exécutions de workflow. Vous pouvez utiliser des configurations de workflow pour exécuter des workflows Dataform de manière planifiée.
À propos des configurations de workflow
Pour planifier des exécutions Dataform de toutes les actions de workflow ou de certaines d'entre elles dans BigQuery, vous pouvez créer des configurations de workflow. Dans une configuration de workflow, vous sélectionnez une configuration de version de compilation, les actions de workflow à exécuter et la planification d'exécution.
Ensuite, lors d'une exécution planifiée de la configuration de votre workflow, Dataform déploie votre sélection d'actions à partir du dernier résultat de compilation dans votre configuration de version dans BigQuery. Vous pouvez également déclencher manuellement l'exécution d'une configuration de workflow avec les workflowConfigs de l'API Dataform.
Une configuration de workflow Dataform contient les paramètres d'exécution suivants:
- ID de la configuration du workflow.
- Configuration de la version.
Compte de service.
Il s'agit du compte de service associé à la configuration du workflow. Vous pouvez sélectionner le compte de service Dataform par défaut ou un compte de service associé à votre projet Google Cloud, ou saisir manuellement un autre compte de service. Par défaut, les configurations de workflow utilisent les mêmes comptes de service que leurs dépôts.
Actions de workflow à exécuter:
- Toutes les actions.
- Sélection d'actions.
- Sélection de balises.
Calendrier d'exécution et fuseau horaire
Créer une configuration de workflow
Pour créer une configuration de workflow Dataform, procédez comme suit:
- Dans votre dépôt, accédez à Publications et planification.
- Dans la section Configurations de workflow, cliquez sur Créer.
Dans le volet Create workflow configuration (Créer une configuration de workflow), dans le champ Configuration ID (ID de configuration), saisissez un ID unique pour la configuration de workflow.
Les ID ne peuvent contenir que des chiffres, des lettres, des traits d'union et des traits de soulignement.
Dans le menu Configuration de version, sélectionnez une configuration de version de compilation.
Facultatif: dans le champ Fréquence, saisissez la fréquence d'exécution au format unix-cron.
Pour vous assurer que Dataform exécute le dernier résultat de compilation dans la configuration de version correspondante, laissez un temps de pause d'au moins une heure entre le moment de la création du résultat de compilation et l'heure de l'exécution planifiée.
Dans le menu Compte de service, sélectionnez un compte de service pour la configuration du workflow.
Dans le menu, vous pouvez sélectionner le compte de service Dataform par défaut ou tout compte de service associé à votre projet Google Cloud auquel vous avez accès. Si vous ne sélectionnez pas de compte de service, la configuration du workflow utilise le compte de service du dépôt.
Facultatif: dans le menu Fuseau horaire, sélectionnez le fuseau horaire pour les exécutions.
Le fuseau horaire par défaut est UTC.
Sélectionnez les actions de workflow à exécuter:
- Pour exécuter l'ensemble du workflow, cliquez sur Toutes les actions.
- Pour exécuter des actions sélectionnées dans le workflow, cliquez sur Sélection d'actions, puis sélectionnez les actions.
- Pour exécuter des actions avec des tags sélectionnés, cliquez sur Sélection de tags, puis sélectionnez des tags.
- Facultatif: Pour exécuter les actions ou balises sélectionnées et leurs dépendances, sélectionnez l'option Inclure les dépendances.
- Facultatif: Pour exécuter les actions ou tags sélectionnés et leurs dépendances, sélectionnez l'option Inclure les dépendances.
- Facultatif: Pour reconstruire toutes les tables à partir de zéro, sélectionnez l'option Run with full refresh (Exécuter avec actualisation complète).
Sans cette option, Dataform met à jour les tables incrémentielles sans les recréer à partir de zéro.
Cliquez sur Créer.
Par exemple, la configuration de workflow suivante exécute des actions avec la balise hourly
toutes les heures dans le fuseau horaire CEST:
- ID de configuration:
production-hourly
- Configuration de la version: -
- Fréquence:
0 * * * *
- Fuseau horaire:
Central European Summer Time (CEST)
- Sélection d'actions de workflow: sélection de balises, balise
hourly
Modifier la configuration d'un workflow
Pour modifier la configuration d'un workflow, procédez comme suit:
- Dans votre dépôt, accédez à Versions et planification.
- À côté de la configuration de workflow que vous souhaitez modifier, cliquez sur le menu Plus, puis sur Modifier.
- Dans le volet Modifier la configuration du workflow, modifiez les paramètres de configuration de la version, puis cliquez sur Enregistrer.
Supprimer une configuration de workflow
Pour supprimer une configuration de workflow, procédez comme suit:
- Dans votre dépôt, accédez à Publications et planification.
- À côté de la configuration de workflow que vous souhaitez supprimer, cliquez sur le menu Plus, puis sur Supprimer.
- Dans la boîte de dialogue Supprimer la configuration de version, cliquez sur Supprimer.
Planifier des exécutions avec Workflows et Cloud Scheduler
Cette section explique comment planifier l'exécution de workflows Dataform à l'aide de Workflows et de Cloud Scheduler.
À propos des exécutions de workflow planifiées
Vous pouvez définir la fréquence d'exécution de vos workflows Dataform en créant une tâche Cloud Scheduler qui déclenche un workflow Workflows. Workflows exécute les services dans un workflow d'orchestration que vous définissez.
Workflows exécute votre workflow Dataform en deux étapes. Tout d'abord, il extrait le code de votre dépôt Dataform de votre fournisseur Git et le compile en résultat de compilation. Il utilise ensuite le résultat de la compilation pour créer un workflow Dataform et l'exécute à la fréquence que vous définissez.
Créer un workflow d'orchestration planifié
Pour planifier l'exécution de votre workflow Dataform, utilisez Workflows pour créer un workflow d'orchestration et ajouter un job Cloud Scheduler comme déclencheur.
Workflows utilise des comptes de service pour accorder aux workflows un accès aux ressourcesGoogle Cloud . Créez un compte de service et accordez-lui le rôle Éditeur Dataform (
roles/dataform.editor
) pour la gestion des identités et des accès, ainsi que les autorisations minimales requises pour gérer votre workflow d'orchestration. Pour en savoir plus, consultez la section Accorder une autorisation à un workflow pour accéder aux ressources Google Cloud .Créez un workflow d'orchestration et utilisez le code source YAML suivant comme définition de workflow:
main: steps: - init: assign: - repository: projects/PROJECT_ID/locations/REPOSITORY_LOCATION/repositories/REPOSITORY_ID - createCompilationResult: call: http.post args: url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"} auth: type: OAuth2 body: gitCommitish: GIT_COMMITISH result: compilationResult - createWorkflowInvocation: call: http.post args: url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"} auth: type: OAuth2 body: compilationResult: ${compilationResult.body.name} result: workflowInvocation - complete: return: ${workflowInvocation.body.name}
Remplacez les éléments suivants :
- PROJECT_ID: ID de votre Google Cloud projet.
- REPOSITORY_LOCATION: emplacement de votre dépôt Dataform.
- REPOSITORY_ID: nom de votre dépôt Dataform.
- GIT_COMMITISH: branche Git à partir de laquelle vous souhaitez exécuter le code Dataform. Pour un dépôt nouvellement créé, remplacez-le par
main
.
Planifiez le workflow d'orchestration à l'aide de Cloud Scheduler.
Personnaliser la requête de création de résultat de compilation du workflow Dataform
Vous pouvez mettre à jour le workflow d'orchestration existant et définir les paramètres de la requête de création de résultats de compilation du workflow Dataform au format YAML. Pour en savoir plus sur les paramètres, consultez la documentation de référence sur les ressources REST projects.locations.repositories.compilationResults
.
Par exemple, pour ajouter un paramètre schemaSuffix
_dev
à toutes les actions lors de la compilation, remplacez le corps de l'étape createCompilationResult
par l'extrait de code suivant:
- createCompilationResult:
call: http.post
args:
url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"}
auth:
type: OAuth2
body:
gitCommitish: GIT_COMMITISH
codeCompilationConfig:
schemaSuffix: dev
Vous pouvez également transmettre des paramètres supplémentaires en tant qu'arguments d'exécution dans une requête d'exécution de workflow et y accéder à l'aide de variables. Pour en savoir plus, consultez la section Transmettre des arguments d'environnement d'exécution dans une requête d'exécution.
Personnaliser la requête d'appel de workflow Dataform
Vous pouvez mettre à jour le workflow d'orchestration existant et définir les paramètres de la requête d'appel du workflow Dataform au format YAML. Pour en savoir plus sur les paramètres de la requête d'appel, consultez la documentation de référence sur les ressources REST projects.locations.repositories.workflowInvocations
.
Par exemple, pour n'exécuter que des actions avec la balise hourly
avec toutes les dépendances transitives incluses, remplacez le corps createWorkflowInvocation
par l'extrait de code suivant:
- createWorkflowInvocation:
call: http.post
args:
url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"}
auth:
type: OAuth2
body:
compilationResult: ${compilationResult.body.name}
invocationConfig:
includedTags:
- hourly
transitiveDependenciesIncluded: true
Vous pouvez également transmettre des paramètres supplémentaires en tant qu'arguments d'exécution dans une requête d'exécution de workflow et y accéder à l'aide de variables. Pour en savoir plus, consultez la section Transmettre des arguments d'environnement d'exécution dans une requête d'exécution.
Planifier des exécutions avec Cloud Composer
Vous pouvez utiliser Cloud Composer 2 pour planifier des exécutions Dataform. Dataform n'est pas compatible avec Cloud Composer 1.
Pour gérer les planifications des exécutions Dataform avec Cloud Composer 2, vous pouvez utiliser des opérateurs Dataform dans les graphes orientés acycliques (DAG) Airflow. Vous pouvez créer un DAG Airflow qui planifie les invocations de workflow Dataform.
Dataform fournit différents opérateurs Airflow. Il s'agit d'opérateurs permettant d'obtenir un résultat de compilation, d'obtenir une invocation de workflow et d'annuler une invocation de workflow. Pour obtenir la liste complète des opérateurs Dataform Airflow disponibles, consultez les opérateurs Google Dataform.
Installer le package PyPI google-cloud-dataform
Si vous utilisez les versions 2.0.25
et ultérieures de Cloud Composer 2, ce package est préinstallé dans votre environnement. Vous n'avez pas besoin de l'installer.
Si vous utilisez des versions antérieures de Cloud Composer 2, installez le package PyPI google-cloud-dataform
.
Dans la section "Packages PyPI", spécifiez la version ==0.2.0
.
Créer un DAG Airflow qui planifie les invocations de workflow Dataform
Pour gérer les exécutions planifiées des workflows Dataform avec Cloud Composer 2, écrivez le DAG à l'aide des opérateurs Airflow Dataform, puis importez-le dans le bucket de votre environnement.
L'exemple de code suivant montre un DAG Airflow qui crée un résultat de compilation Dataform et lance une invocation de workflow Dataform:
from datetime import datetime
from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
DataformCreateCompilationResultOperator,
DataformCreateWorkflowInvocationOperator,
)
DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"
with models.DAG(
DAG_ID,
schedule_interval='@once', # Override to match your needs
start_date=datetime(2022, 1, 1),
catchup=False, # Override to match your needs
tags=['dataform'],
) as dag:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
},
)
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
},
)
create_compilation_result >> create_workflow_invocation
Remplacez les éléments suivants :
- PROJECT_ID: ID de votre projet Google Cloud Dataform.
- REPOSITORY_ID: nom de votre dépôt Dataform.
- REGION: région dans laquelle se trouve le dépôt Dataform.
- COMPILATION_RESULT: nom du résultat de compilation que vous souhaitez utiliser pour cette invocation de workflow.
- GIT_COMMITISH: commit Git dans le dépôt Git distant de la version de votre code que vous souhaitez utiliser (par exemple, une branche ou un SHA Git).
L'exemple de code suivant montre un DAG Airflow qui effectue les opérations suivantes:
- Crée un résultat de compilation Dataform.
- Démarre une invocation de workflow Dataform asynchrone.
- Interroge l'état de votre workflow jusqu'à ce qu'il passe à l'état attendu à l'aide de
DataformWorkflowInvocationStateSensor
.
from datetime import datetime
from google.cloud.dataform_v1beta1 import WorkflowInvocation
from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
DataformCreateCompilationResultOperator,
DataformCreateWorkflowInvocationOperator,
)
from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor
DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"
with models.DAG(
DAG_ID,
schedule_interval='@once', # Override to match your needs
start_date=datetime(2022, 1, 1),
catchup=False, # Override to match your needs
tags=['dataform'],
) as dag:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
},
)
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
asynchronous=True,
workflow_invocation={
"compilation_result": COMPILATION_RESULT
}
)
is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
task_id="is_workflow_invocation_done",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation_id=("{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"),
expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)
create_compilation_result >> create_workflow_invocation
Remplacez les éléments suivants :
- PROJECT_ID: ID de votre projet Google Cloud Dataform.
- REPOSITORY_ID: nom de votre dépôt Dataform.
- REGION: région dans laquelle se trouve le dépôt Dataform.
- COMPILATION_RESULT: nom du résultat de compilation que vous souhaitez utiliser pour cette invocation de workflow.
- GIT_COMMITISH: commit Git dans le dépôt Git distant de la version de votre code que vous souhaitez utiliser (par exemple, une branche ou un SHA Git).
- COMPILATION_RESULT: nom du résultat de compilation que vous souhaitez utiliser pour cette invocation de workflow.
Ajouter des paramètres de configuration de compilation
Vous pouvez ajouter des paramètres de configuration de compilation supplémentaires à l'objet DAG Airflow create_compilation_result
. Pour en savoir plus sur les paramètres disponibles, consultez la documentation de référence de l'API Dataform CodeCompilationConfig
.
Pour ajouter des paramètres de configuration de compilation à l'objet DAG Airflow
create_compilation_result
, ajoutez les paramètres sélectionnés au champcode_compilation_config
au format suivant:create_compilation_result = DataformCreateCompilationResultOperator( task_id="create_compilation_result", project_id=PROJECT_ID, region=REGION, repository_id=REPOSITORY_ID, compilation_result={ "git_commitish": GIT_COMMITISH, "code_compilation_config": { "PARAMETER": "PARAMETER_VALUE"} }, )
Remplacez les éléments suivants :
- PROJECT_ID: ID de votre projet Google Cloud Dataform.
- REPOSITORY_ID: nom de votre dépôt Dataform.
- REGION: région dans laquelle se trouve le dépôt Dataform.
- GIT_COMMITISH: commit Git dans le dépôt Git distant de la version de votre code que vous souhaitez utiliser (par exemple, une branche ou un SHA Git).
- PARAMETER: paramètre
CodeCompilationConfig
sélectionné. Vous pouvez ajouter plusieurs paramètres. - PARAMETER_VALUE: valeur du paramètre sélectionné.
L'exemple de code suivant montre le paramètre defaultDatabase
ajouté à l'objet DAG Airflow create_compilation_result
:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": REMOTE_BRANCH,
"code_compilation_config": { "default_database": "my-custom-gcp-project"}
},
)
Ajouter des paramètres de configuration d'appel de workflow
Vous pouvez ajouter des paramètres de configuration d'appel de workflow supplémentaires à l'objet DAG Airflow create_workflow_invocation
. Pour en savoir plus sur les paramètres disponibles, consultez la documentation de référence de l'API Dataform InvocationConfig
.
Pour ajouter des paramètres de configuration d'appel de workflow à l'objet DAG Airflow
create_workflow_invocation
, ajoutez les paramètres sélectionnés au champinvocation_config
au format suivant:create_workflow_invocation = DataformCreateWorkflowInvocationOperator( task_id='create_workflow_invocation', project_id=PROJECT_ID, region=REGION, repository_id=REPOSITORY_ID, workflow_invocation={ "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}", "invocation_config": { "PARAMETER": PARAMETER_VALUE } }, )
Remplacez les éléments suivants :
- PROJECT_ID: ID de votre projet Google Cloud Dataform.
- REPOSITORY_ID: nom de votre dépôt Dataform.
- REGION: région dans laquelle se trouve le dépôt Dataform.
- PARAMETER: paramètre
InvocationConfig
sélectionné. Vous pouvez ajouter plusieurs paramètres. - PARAMETER_VALUE: valeur du paramètre sélectionné.
L'exemple de code suivant montre les paramètres includedTags[]
et transitiveDependenciesIncluded
ajoutés à l'objet DAG Airflow create_workflow_invocation
:
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
"invocation_config": { "included_tags": ["daily"], "transitive_dependencies_included": true }
},
)
Étape suivante
- Pour savoir comment configurer les configurations de version de compilation Dataform, consultez la section Créer une configuration de version.
- Pour en savoir plus sur le cycle de vie du code dans Dataform, consultez la section Présentation du cycle de vie du code dans Dataform.
- Pour en savoir plus sur l'API Dataform, consultez la section API Dataform.
- Pour en savoir plus sur les environnements Cloud Composer, consultez la section Présentation de Cloud Composer.
- Pour en savoir plus sur les tarifs de Workflows, consultez la page Tarifs de Workflows.