Vous pouvez utiliser Cloud Composer 2 pour planifier des exécutions Dataform. Dataform n'est pas compatible avec Cloud Composer 1.
Pour gérer les planifications des exécutions Dataform avec Cloud Composer 2, procédez comme suit : vous pouvez utiliser des opérateurs Dataform dans les graphes orientés acycliques (DAG) d'Airflow. Vous pouvez créer un DAG Airflow qui planifie Appels du workflow Dataform.
Dataform propose divers opérateurs Airflow. Cela inclut les opérateurs pour obtenir un résultat de compilation, un appel de workflow et annuler un appel de workflow. Afficher la liste complète des Dataform disponibles Opérateurs Airflow, consultez la page Opérateurs Google Dataform.
Avant de commencer
- Sélectionner ou Créez un dépôt Dataform.
- Autorisez Dataform à accéder à BigQuery.
- Sélectionnez ou créez un espace de travail Dataform.
- Créez au moins une table.
- Créez un environnement Cloud Composer 2.
- Accordez le rôle roles/composer.worker. et roles/dataform.editor au compte de service de votre environnement Cloud Composer.
Installer le package PyPi google-cloud-dataform
Si vous utilisez les versions 2.0.25
et ultérieures de Cloud Composer, ce package
est préinstallé dans votre environnement. Vous n'avez pas besoin de l'installer.
Si vous utilisez des versions antérieures de Cloud Composer 2,
Installez le package PyPi google-cloud-dataform
.
Dans la section "Packages PyPI", spécifiez la version ==0.2.0
.
Créer un DAG Airflow qui planifie les appels du workflow Dataform
Pour gérer les exécutions planifiées de workflows Dataform SQL avec Cloud Composer 2, écrire le DAG à l'aide des opérateurs Dataform Airflow, importez-le dans le bucket de votre environnement.
L'exemple de code suivant montre un DAG Airflow qui crée un objet Dataform résultat de la compilation et lance un appel de workflow Dataform:
from datetime import datetime
from google.cloud.dataform_v1beta1 import WorkflowInvocation
from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
DataformCancelWorkflowInvocationOperator,
DataformCreateCompilationResultOperator,
DataformCreateWorkflowInvocationOperator,
DataformGetCompilationResultOperator,
DataformGetWorkflowInvocationOperator,
)
from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor
DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"
with models.DAG(
DAG_ID,
schedule_interval='@once', # Override to match your needs
start_date=datetime(2022, 1, 1),
catchup=False, # Override to match your needs
tags=['dataform'],
) as dag:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
},
)
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
},
)
create_compilation_result >> create_workflow_invocation
Remplacez les éléments suivants :
- PROJECT_ID: ID de votre projet Google Cloud Dataform
- REPOSITORY_ID: nom de votre dépôt Dataform
- REGION: région dans laquelle Dataform se trouve
- COMPILATION_RESULT: nom du résultat de la compilation que vous souhaitez utiliser pour cet appel de workflow
- GIT_COMMITISH: commitish Git. Par exemple, une branche ou un SHA Git, dans votre dépôt Git distant le code que vous souhaitez utiliser
L'exemple de code suivant montre un DAG Airflow qui:
- Crée un résultat de compilation Dataform.
- Démarre un appel de workflow Dataform asynchrone.
- Interroge l'état de votre workflow jusqu'à ce qu'il passe à l'état attendu
à l'aide de
DataformWorkflowInvocationStateSensor
.
from datetime import datetime
from google.cloud.dataform_v1beta1 import WorkflowInvocation
from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
DataformCancelWorkflowInvocationOperator,
DataformCreateCompilationResultOperator,
DataformCreateWorkflowInvocationOperator,
DataformGetCompilationResultOperator,
DataformGetWorkflowInvocationOperator,
)
DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"
with models.DAG(
DAG_ID,
schedule_interval='@once', # Override to match your needs
start_date=datetime(2022, 1, 1),
catchup=False, # Override to match your needs
tags=['dataform'],
) as dag:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
},
)
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
asynchronous=True,
workflow_invocation={
"compilation_result": COMPILATION_RESULT
}
)
is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
task_id="is_workflow_invocation_done",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation_id=("{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"),
expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)
create_compilation_result >> create_workflow_invocation
Remplacez les éléments suivants :
- PROJECT_ID: ID de votre projet Google Cloud Dataform
- REPOSITORY_ID: nom de votre dépôt Dataform
- REGION: région dans laquelle Dataform se trouve
- COMPILATION_RESULT: nom du résultat de la compilation que vous souhaitez utiliser pour cet appel de workflow
- GIT_COMMITISH: commitish Git. Par exemple, une branche ou un SHA Git, dans le dépôt Git distant de la version du code que vous souhaitez utiliser
- COMPILATION_RESULT: nom du résultat de la compilation que vous souhaitez utiliser pour cet appel de workflow
Ajouter des paramètres de configuration de compilation
Vous pouvez ajouter des paramètres de configuration de compilation supplémentaires au
create_compilation_result
objet DAG Airflow. Pour en savoir plus sur
disponibles, consultez la documentation de référence de l'API Dataform CodeCompilationConfig
.
- Pour ajouter des paramètres de configuration de compilation à
create_compilation_result
Objet DAG Airflow, ajoutez les paramètres sélectionnés àcode_compilation_config
au format suivant:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
"code_compilation_config": { "PARAMETER": "PARAMETER_VALUE"}
},
)
Remplacez les éléments suivants :
- PROJECT_ID: ID de votre projet Google Cloud Dataform
- REPOSITORY_ID: nom de votre dépôt Dataform
- REGION par la région dans laquelle le dépôt Dataform ; se trouve
- GIT_COMMITISH: commitish Git. Par exemple, une branche ou un SHA Git, dans le dépôt Git distant de la version du code que vous souhaitez utiliser
- PARAMETER: paramètre
CodeCompilationConfig
sélectionné. Vous pouvez ajouter plusieurs paramètres. - PARAMETER_VALUE: valeur du paramètre sélectionné
L'exemple de code suivant montre le paramètre defaultDatabase
ajouté au
create_compilation_result
Objet DAG Airflow:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": REMOTE_BRANCH,
"code_compilation_config": { "default_database": "my-custom-gcp-project"}
},
)
Ajouter des paramètres de configuration d'appel de workflow
Vous pouvez ajouter des paramètres de configuration d'appel de workflow supplémentaires
create_workflow_invocation
objet DAG Airflow. Pour en savoir plus sur
disponibles, consultez la documentation de référence de l'API Dataform InvocationConfig
.
- Pour ajouter des paramètres de configuration d'appel de workflow à
create_workflow_invocation
objet DAG Airflow, ajouter les paramètres sélectionnés parinvocation_config
, au format suivant:
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
"invocation_config": { "PARAMETER": PARAMETER_VALUE }
},
)
Remplacez les éléments suivants :
- PROJECT_ID: ID de votre projet Google Cloud Dataform
- REPOSITORY_ID: nom de votre dépôt Dataform
- REGION: région dans laquelle le dépôt Dataform se trouve
- PARAMETER: paramètre
InvocationConfig
sélectionné. Vous pouvez ajouter plusieurs paramètres. - PARAMETER_VALUE: valeur du paramètre sélectionné
L'exemple de code suivant montre les classes includedTags[]
et
Paramètres transitiveDependenciesIncluded
ajoutés à
create_workflow_invocation
Objet DAG Airflow:
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
"invocation_config": { "included_Tags": ["daily"], "transitive_dependencies_included": true }
},
)
Étape suivante
- Pour savoir comment configurer des remplacements de compilation pour les exécutions de workflow, consultez Configurez des remplacements de compilation.
- Pour en savoir plus sur l'API Dataform, consultez API Dataform :
- Pour en savoir plus sur les environnements Cloud Composer, consultez Présentation de Cloud Composer
- Pour savoir comment planifier des exécutions avec Workflows et Cloud Scheduler, consultez Planifiez des exécutions avec Workflows et Cloud Scheduler.