Planifier des exécutions avec Cloud Composer

Ce document explique comment exécuter des exécutions planifiées de Dataform. Workflows SQL utilisant Cloud Composer 2.

Vous pouvez utiliser Cloud Composer 2 pour planifier des exécutions Dataform. Dataform n'est pas compatible avec Cloud Composer 1.

Pour gérer les planifications des exécutions Dataform avec Cloud Composer 2, procédez comme suit : vous pouvez utiliser des opérateurs Dataform dans les graphes orientés acycliques (DAG) d'Airflow. Vous pouvez créer un DAG Airflow qui planifie Appels du workflow Dataform.

Dataform propose divers opérateurs Airflow. Cela inclut les opérateurs pour obtenir un résultat de compilation, un appel de workflow et annuler un appel de workflow. Afficher la liste complète des Dataform disponibles Opérateurs Airflow, consultez la page Opérateurs Google Dataform.

Avant de commencer

  1. Sélectionner ou Créez un dépôt Dataform.
  2. Autorisez Dataform à accéder à BigQuery.
  3. Sélectionnez ou créez un espace de travail Dataform.
  4. Créez au moins une table.
  5. Créez un environnement Cloud Composer 2.
    1. Accordez le rôle roles/composer.worker. et roles/dataform.editor au compte de service de votre environnement Cloud Composer.

Installer le package PyPi google-cloud-dataform

Si vous utilisez les versions 2.0.25 et ultérieures de Cloud Composer, ce package est préinstallé dans votre environnement. Vous n'avez pas besoin de l'installer.

Si vous utilisez des versions antérieures de Cloud Composer 2, Installez le package PyPi google-cloud-dataform.

Dans la section "Packages PyPI", spécifiez la version ==0.2.0.

Créer un DAG Airflow qui planifie les appels du workflow Dataform

Pour gérer les exécutions planifiées de workflows Dataform SQL avec Cloud Composer 2, écrire le DAG à l'aide des opérateurs Dataform Airflow, importez-le dans le bucket de votre environnement.

L'exemple de code suivant montre un DAG Airflow qui crée un objet Dataform résultat de la compilation et lance un appel de workflow Dataform:

from datetime import datetime

from google.cloud.dataform_v1beta1 import WorkflowInvocation

from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
    DataformCancelWorkflowInvocationOperator,
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
    DataformGetCompilationResultOperator,
    DataformGetWorkflowInvocationOperator,
)
from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor

DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"

with models.DAG(
    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2022, 1, 1),
    catchup=False,  # Override to match your needs
    tags=['dataform'],
) as dag:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
        },
    )

    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
         workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
        },
    )

create_compilation_result >> create_workflow_invocation

Remplacez les éléments suivants :

  • PROJECT_ID: ID de votre projet Google Cloud Dataform
  • REPOSITORY_ID: nom de votre dépôt Dataform
  • REGION: région dans laquelle Dataform se trouve
  • COMPILATION_RESULT: nom du résultat de la compilation que vous souhaitez utiliser pour cet appel de workflow
  • GIT_COMMITISH: commitish Git. Par exemple, une branche ou un SHA Git, dans votre dépôt Git distant le code que vous souhaitez utiliser

L'exemple de code suivant montre un DAG Airflow qui:

  1. Crée un résultat de compilation Dataform.
  2. Démarre un appel de workflow Dataform asynchrone.
  3. Interroge l'état de votre workflow jusqu'à ce qu'il passe à l'état attendu à l'aide de DataformWorkflowInvocationStateSensor.
from datetime import datetime

from google.cloud.dataform_v1beta1 import WorkflowInvocation

from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
    DataformCancelWorkflowInvocationOperator,
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
    DataformGetCompilationResultOperator,
    DataformGetWorkflowInvocationOperator,
)

DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"

with models.DAG(
    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2022, 1, 1),
    catchup=False,  # Override to match your needs
    tags=['dataform'],
) as dag:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
        },
    )

create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
    task_id='create_workflow_invocation',
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    asynchronous=True,
    workflow_invocation={
        "compilation_result": COMPILATION_RESULT
    }
)

is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
    task_id="is_workflow_invocation_done",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id=("{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"),
    expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)


create_compilation_result >> create_workflow_invocation

Remplacez les éléments suivants :

  • PROJECT_ID: ID de votre projet Google Cloud Dataform
  • REPOSITORY_ID: nom de votre dépôt Dataform
  • REGION: région dans laquelle Dataform se trouve
  • COMPILATION_RESULT: nom du résultat de la compilation que vous souhaitez utiliser pour cet appel de workflow
  • GIT_COMMITISH: commitish Git. Par exemple, une branche ou un SHA Git, dans le dépôt Git distant de la version du code que vous souhaitez utiliser
  • COMPILATION_RESULT: nom du résultat de la compilation que vous souhaitez utiliser pour cet appel de workflow

Ajouter des paramètres de configuration de compilation

Vous pouvez ajouter des paramètres de configuration de compilation supplémentaires au create_compilation_result objet DAG Airflow. Pour en savoir plus sur disponibles, consultez la documentation de référence de l'API Dataform CodeCompilationConfig.

  • Pour ajouter des paramètres de configuration de compilation à create_compilation_result Objet DAG Airflow, ajoutez les paramètres sélectionnés à code_compilation_config au format suivant:
    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
            "code_compilation_config": { "PARAMETER": "PARAMETER_VALUE"}
        },
    )

Remplacez les éléments suivants :

  • PROJECT_ID: ID de votre projet Google Cloud Dataform
  • REPOSITORY_ID: nom de votre dépôt Dataform
  • REGION par la région dans laquelle le dépôt Dataform ; se trouve
  • GIT_COMMITISH: commitish Git. Par exemple, une branche ou un SHA Git, dans le dépôt Git distant de la version du code que vous souhaitez utiliser
  • PARAMETER: paramètre CodeCompilationConfig sélectionné. Vous pouvez ajouter plusieurs paramètres.
  • PARAMETER_VALUE: valeur du paramètre sélectionné

L'exemple de code suivant montre le paramètre defaultDatabase ajouté au create_compilation_result Objet DAG Airflow:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": REMOTE_BRANCH,
            "code_compilation_config": { "default_database": "my-custom-gcp-project"}
        },
    )

Ajouter des paramètres de configuration d'appel de workflow

Vous pouvez ajouter des paramètres de configuration d'appel de workflow supplémentaires create_workflow_invocation objet DAG Airflow. Pour en savoir plus sur disponibles, consultez la documentation de référence de l'API Dataform InvocationConfig.

  • Pour ajouter des paramètres de configuration d'appel de workflow à create_workflow_invocation objet DAG Airflow, ajouter les paramètres sélectionnés par invocation_config, au format suivant:
    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation={
            
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
            
            "invocation_config": { "PARAMETER": PARAMETER_VALUE }
        },

    )

Remplacez les éléments suivants :

  • PROJECT_ID: ID de votre projet Google Cloud Dataform
  • REPOSITORY_ID: nom de votre dépôt Dataform
  • REGION: région dans laquelle le dépôt Dataform se trouve
  • PARAMETER: paramètre InvocationConfig sélectionné. Vous pouvez ajouter plusieurs paramètres.
  • PARAMETER_VALUE: valeur du paramètre sélectionné

L'exemple de code suivant montre les classes includedTags[] et Paramètres transitiveDependenciesIncluded ajoutés à create_workflow_invocation Objet DAG Airflow:

    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
            "invocation_config": { "included_Tags": ["daily"], "transitive_dependencies_included": true }
        },
    )

Étape suivante