Vous pouvez utiliser Cloud Composer 2 pour planifier des exécutions Dataform. Dataform n'est pas compatible avec Cloud Composer 1.
Pour gérer les planifications des exécutions Dataform avec Cloud Composer 2, vous pouvez utiliser des opérateurs Dataform dans les graphes orientés acycliques (DAG) d'Airflow. Vous pouvez créer un DAG Airflow qui planifie les appels du workflow Dataform.
Dataform propose divers opérateurs Airflow. Ceux-ci incluent les opérateurs permettant d'obtenir un résultat de compilation, d'obtenir un appel de workflow et d'annuler un appel de workflow. Pour afficher la liste complète des opérateurs Dataform Airflow disponibles, consultez la page Opérateurs Google Dataform.
Avant de commencer
- Sélectionnez ou créez un dépôt Dataform.
- Autorisez Dataform à accéder à BigQuery.
- Sélectionnez ou créez un espace de travail Dataform.
- Créez au moins une table.
- Créez un environnement Cloud Composer 2.
- Attribuez les rôles roles/composer.worker et roles/dataform.editor au compte de service de votre environnement Cloud Composer.
Installer le package PyPi google-cloud-dataform
Si vous utilisez les versions 2.0.25
et ultérieures de Cloud Composer, ce package est préinstallé dans votre environnement. Vous n'avez pas besoin de l'installer.
Si vous utilisez des versions antérieures de Cloud Composer 2, installez le package PyPi google-cloud-dataform
.
Dans la section "Packages PyPI", spécifiez la version ==0.2.0
.
Créer un DAG Airflow qui planifie les appels du workflow Dataform
Pour gérer les exécutions planifiées de workflows Dataform SQL avec Cloud Composer 2, rédigez le DAG à l'aide des opérateurs Dataform Airflow, puis importez-le dans le bucket de votre environnement.
L'exemple de code suivant montre un DAG Airflow qui crée un résultat de compilation Dataform et lance un appel de workflow Dataform:
from datetime import datetime
from google.cloud.dataform_v1beta1 import WorkflowInvocation
from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
DataformCancelWorkflowInvocationOperator,
DataformCreateCompilationResultOperator,
DataformCreateWorkflowInvocationOperator,
DataformGetCompilationResultOperator,
DataformGetWorkflowInvocationOperator,
)
from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor
DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"
with models.DAG(
DAG_ID,
schedule_interval='@once', # Override to match your needs
start_date=datetime(2022, 1, 1),
catchup=False, # Override to match your needs
tags=['dataform'],
) as dag:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
},
)
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
},
)
create_compilation_result >> create_workflow_invocation
Remplacez les éléments suivants :
- PROJECT_ID: ID de votre projet Google Cloud Dataform
- REPOSITORY_ID: nom de votre dépôt Dataform
- REGION: région dans laquelle se trouve le dépôt Dataform
- COMPILATION_RESULT: nom du résultat de compilation que vous souhaitez utiliser pour cet appel de workflow
- GIT_COMMITISH: le commitish Git, par exemple une branche ou un SHA Git, stocké dans votre dépôt Git distant de la version de votre code que vous souhaitez utiliser.
L'exemple de code suivant montre un DAG Airflow qui:
- Crée un résultat de compilation Dataform.
- Démarre un appel de workflow Dataform asynchrone.
- Interroge l'état de votre workflow jusqu'à ce qu'il passe à l'état attendu à l'aide de
DataformWorkflowInvocationStateSensor
.
from datetime import datetime
from google.cloud.dataform_v1beta1 import WorkflowInvocation
from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
DataformCancelWorkflowInvocationOperator,
DataformCreateCompilationResultOperator,
DataformCreateWorkflowInvocationOperator,
DataformGetCompilationResultOperator,
DataformGetWorkflowInvocationOperator,
)
DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"
with models.DAG(
DAG_ID,
schedule_interval='@once', # Override to match your needs
start_date=datetime(2022, 1, 1),
catchup=False, # Override to match your needs
tags=['dataform'],
) as dag:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
},
)
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
asynchronous=True,
workflow_invocation={
"compilation_result": COMPILATION_RESULT
}
)
is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
task_id="is_workflow_invocation_done",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation_id=("{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"),
expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)
create_compilation_result >> create_workflow_invocation
Remplacez les éléments suivants :
- PROJECT_ID: ID de votre projet Google Cloud Dataform
- REPOSITORY_ID: nom de votre dépôt Dataform
- REGION: région dans laquelle se trouve le dépôt Dataform
- COMPILATION_RESULT: nom du résultat de compilation que vous souhaitez utiliser pour cet appel de workflow
- GIT_COMMITISH: commitish Git, par exemple une branche ou un SHA Git, stocké dans votre dépôt Git distant de la version de votre code que vous souhaitez utiliser.
- COMPILATION_RESULT: nom du résultat de compilation que vous souhaitez utiliser pour cet appel de workflow
Ajouter des paramètres de configuration de compilation
Vous pouvez ajouter des paramètres de configuration de compilation supplémentaires à l'objet DAG Airflow create_compilation_result
. Pour en savoir plus sur les paramètres disponibles, consultez la documentation de référence de l'API Dataform CodeCompilationConfig
.
- Pour ajouter des paramètres de configuration de compilation à l'objet DAG Airflow
create_compilation_result
, ajoutez les paramètres sélectionnés àcode_compilation_config
au format suivant:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
"code_compilation_config": { "PARAMETER": "PARAMETER_VALUE"}
},
)
Remplacez les éléments suivants :
- PROJECT_ID: ID de votre projet Google Cloud Dataform
- REPOSITORY_ID: nom de votre dépôt Dataform
- REGION par la région dans laquelle se trouve le dépôt Dataform.
- GIT_COMMITISH: commitish Git, par exemple une branche ou un SHA Git, stocké dans votre dépôt Git distant de la version de votre code que vous souhaitez utiliser.
- PARAMETER: paramètre
CodeCompilationConfig
sélectionné. Vous pouvez ajouter plusieurs paramètres. - PARAMETER_VALUE: valeur du paramètre sélectionné
L'exemple de code suivant montre le paramètre defaultDatabase
ajouté à l'objet DAG Airflow create_compilation_result
:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": REMOTE_BRANCH,
"code_compilation_config": { "default_database": "my-custom-gcp-project"}
},
)
Ajouter des paramètres de configuration d'appel de workflow
Vous pouvez ajouter des paramètres de configuration d'appel de workflow supplémentaires à l'objet DAG Airflow create_workflow_invocation
. Pour en savoir plus sur les paramètres disponibles, consultez la documentation de référence de l'API Dataform InvocationConfig
.
- Pour ajouter des paramètres de configuration d'appel de workflow à l'objet DAG Airflow
create_workflow_invocation
, ajoutez les paramètres sélectionnés àinvocation_config
au format suivant:
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
"invocation_config": { "PARAMETER": PARAMETER_VALUE }
},
)
Remplacez les éléments suivants :
- PROJECT_ID: ID de votre projet Google Cloud Dataform
- REPOSITORY_ID: nom de votre dépôt Dataform
- REGION: région dans laquelle se trouve le dépôt Dataform
- PARAMETER: paramètre
InvocationConfig
sélectionné. Vous pouvez ajouter plusieurs paramètres. - PARAMETER_VALUE: valeur du paramètre sélectionné
L'exemple de code suivant montre les paramètres includedTags[]
et transitiveDependenciesIncluded
ajoutés à l'objet DAG Airflow create_workflow_invocation
:
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
"invocation_config": { "included_Tags": ["daily"], "transitive_dependencies_included": true }
},
)
Étapes suivantes
- Pour savoir comment configurer des remplacements de compilation pour les exécutions de workflow, consultez Configurer des remplacements de compilation.
- Pour en savoir plus sur l'API Dataform, consultez la page API Dataform.
- Pour en savoir plus sur les environnements Cloud Composer, consultez la page Présentation de Cloud Composer.
- Pour savoir comment planifier des exécutions avec Workflows et Cloud Scheduler, consultez la section Planifier des exécutions avec Workflows et Cloud Scheduler.