Planifier des exécutions avec Cloud Composer

Ce document explique comment exécuter des exécutions planifiées de workflows Dataform SQL à l'aide de Cloud Composer 2.

Vous pouvez utiliser Cloud Composer 2 pour planifier des exécutions Dataform. Dataform n'est pas compatible avec Cloud Composer 1.

Pour gérer les planifications des exécutions Dataform avec Cloud Composer 2, vous pouvez utiliser des opérateurs Dataform dans les graphes orientés acycliques (DAG) d'Airflow. Vous pouvez créer un DAG Airflow qui planifie les appels du workflow Dataform.

Dataform propose divers opérateurs Airflow. Ceux-ci incluent les opérateurs permettant d'obtenir un résultat de compilation, d'obtenir un appel de workflow et d'annuler un appel de workflow. Pour afficher la liste complète des opérateurs Dataform Airflow disponibles, consultez la page Opérateurs Google Dataform.

Avant de commencer

  1. Sélectionnez ou créez un dépôt Dataform.
  2. Autorisez Dataform à accéder à BigQuery.
  3. Sélectionnez ou créez un espace de travail Dataform.
  4. Créez au moins une table.
  5. Créez un environnement Cloud Composer 2.
    1. Attribuez les rôles roles/composer.worker et roles/dataform.editor au compte de service de votre environnement Cloud Composer.

Installer le package PyPi google-cloud-dataform

Si vous utilisez les versions 2.0.25 et ultérieures de Cloud Composer, ce package est préinstallé dans votre environnement. Vous n'avez pas besoin de l'installer.

Si vous utilisez des versions antérieures de Cloud Composer 2, installez le package PyPi google-cloud-dataform.

Dans la section "Packages PyPI", spécifiez la version ==0.2.0.

Créer un DAG Airflow qui planifie les appels du workflow Dataform

Pour gérer les exécutions planifiées de workflows Dataform SQL avec Cloud Composer 2, rédigez le DAG à l'aide des opérateurs Dataform Airflow, puis importez-le dans le bucket de votre environnement.

L'exemple de code suivant montre un DAG Airflow qui crée un résultat de compilation Dataform et lance un appel de workflow Dataform:

from datetime import datetime

from google.cloud.dataform_v1beta1 import WorkflowInvocation

from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
    DataformCancelWorkflowInvocationOperator,
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
    DataformGetCompilationResultOperator,
    DataformGetWorkflowInvocationOperator,
)
from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor

DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"

with models.DAG(
    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2022, 1, 1),
    catchup=False,  # Override to match your needs
    tags=['dataform'],
) as dag:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
        },
    )

    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
         workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
        },
    )

create_compilation_result >> create_workflow_invocation

Remplacez les éléments suivants :

  • PROJECT_ID: ID de votre projet Google Cloud Dataform
  • REPOSITORY_ID: nom de votre dépôt Dataform
  • REGION: région dans laquelle se trouve le dépôt Dataform
  • COMPILATION_RESULT: nom du résultat de compilation que vous souhaitez utiliser pour cet appel de workflow
  • GIT_COMMITISH: le commitish Git, par exemple une branche ou un SHA Git, stocké dans votre dépôt Git distant de la version de votre code que vous souhaitez utiliser.

L'exemple de code suivant montre un DAG Airflow qui:

  1. Crée un résultat de compilation Dataform.
  2. Démarre un appel de workflow Dataform asynchrone.
  3. Interroge l'état de votre workflow jusqu'à ce qu'il passe à l'état attendu à l'aide de DataformWorkflowInvocationStateSensor.
from datetime import datetime

from google.cloud.dataform_v1beta1 import WorkflowInvocation

from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
    DataformCancelWorkflowInvocationOperator,
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
    DataformGetCompilationResultOperator,
    DataformGetWorkflowInvocationOperator,
)

DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"

with models.DAG(
    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2022, 1, 1),
    catchup=False,  # Override to match your needs
    tags=['dataform'],
) as dag:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
        },
    )

create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
    task_id='create_workflow_invocation',
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    asynchronous=True,
    workflow_invocation={
        "compilation_result": COMPILATION_RESULT
    }
)

is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
    task_id="is_workflow_invocation_done",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id=("{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"),
    expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)


create_compilation_result >> create_workflow_invocation

Remplacez les éléments suivants :

  • PROJECT_ID: ID de votre projet Google Cloud Dataform
  • REPOSITORY_ID: nom de votre dépôt Dataform
  • REGION: région dans laquelle se trouve le dépôt Dataform
  • COMPILATION_RESULT: nom du résultat de compilation que vous souhaitez utiliser pour cet appel de workflow
  • GIT_COMMITISH: commitish Git, par exemple une branche ou un SHA Git, stocké dans votre dépôt Git distant de la version de votre code que vous souhaitez utiliser.
  • COMPILATION_RESULT: nom du résultat de compilation que vous souhaitez utiliser pour cet appel de workflow

Ajouter des paramètres de configuration de compilation

Vous pouvez ajouter des paramètres de configuration de compilation supplémentaires à l'objet DAG Airflow create_compilation_result. Pour en savoir plus sur les paramètres disponibles, consultez la documentation de référence de l'API Dataform CodeCompilationConfig.

  • Pour ajouter des paramètres de configuration de compilation à l'objet DAG Airflow create_compilation_result, ajoutez les paramètres sélectionnés à code_compilation_config au format suivant:
    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
            "code_compilation_config": { "PARAMETER": "PARAMETER_VALUE"}
        },
    )

Remplacez les éléments suivants :

  • PROJECT_ID: ID de votre projet Google Cloud Dataform
  • REPOSITORY_ID: nom de votre dépôt Dataform
  • REGION par la région dans laquelle se trouve le dépôt Dataform.
  • GIT_COMMITISH: commitish Git, par exemple une branche ou un SHA Git, stocké dans votre dépôt Git distant de la version de votre code que vous souhaitez utiliser.
  • PARAMETER: paramètre CodeCompilationConfig sélectionné. Vous pouvez ajouter plusieurs paramètres.
  • PARAMETER_VALUE: valeur du paramètre sélectionné

L'exemple de code suivant montre le paramètre defaultDatabase ajouté à l'objet DAG Airflow create_compilation_result:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": REMOTE_BRANCH,
            "code_compilation_config": { "default_database": "my-custom-gcp-project"}
        },
    )

Ajouter des paramètres de configuration d'appel de workflow

Vous pouvez ajouter des paramètres de configuration d'appel de workflow supplémentaires à l'objet DAG Airflow create_workflow_invocation. Pour en savoir plus sur les paramètres disponibles, consultez la documentation de référence de l'API Dataform InvocationConfig.

  • Pour ajouter des paramètres de configuration d'appel de workflow à l'objet DAG Airflow create_workflow_invocation, ajoutez les paramètres sélectionnés à invocation_config au format suivant:
    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation={
            
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
            
            "invocation_config": { "PARAMETER": PARAMETER_VALUE }
        },

    )

Remplacez les éléments suivants :

  • PROJECT_ID: ID de votre projet Google Cloud Dataform
  • REPOSITORY_ID: nom de votre dépôt Dataform
  • REGION: région dans laquelle se trouve le dépôt Dataform
  • PARAMETER: paramètre InvocationConfig sélectionné. Vous pouvez ajouter plusieurs paramètres.
  • PARAMETER_VALUE: valeur du paramètre sélectionné

L'exemple de code suivant montre les paramètres includedTags[] et transitiveDependenciesIncluded ajoutés à l'objet DAG Airflow create_workflow_invocation:

    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
            "invocation_config": { "included_Tags": ["daily"], "transitive_dependencies_included": true }
        },
    )

Étapes suivantes