Dataform-Ausführungen können mit Cloud Composer 2 geplant werden. Cloud Composer 1 wird von Dataform nicht unterstützt.
Zum Verwalten von Zeitplänen für Dataform-Ausführungen mit Cloud Composer 2 können Sie Dataform-Operatoren in Airflow-Directed Acyclic Graphs (DAGs) verwenden. Sie können einen Airflow-DAG erstellen, der Dataform-Workflowaufrufe plant.
Dataform bietet verschiedene Airflow-Operatoren. Dazu gehören Operatoren zum Abrufen eines Kompilierungsergebnisses, zum Abrufen eines Workflowaufrufs und zum Abbrechen eines Workflowaufrufs. Eine vollständige Liste der verfügbaren Dataform Airflow-Operatoren finden Sie unter Google Dataform-Operatoren.
Hinweise
- Wählen oder erstellen Sie ein Dataform-Repository.
- Dataform Zugriff auf BigQuery gewähren
- Wählen Sie einen Dataform-Arbeitsbereich aus oder erstellen Sie einen.
- Erstellen Sie mindestens eine Tabelle.
- Erstellen Sie eine Cloud Composer 2-Umgebung.
- Weisen Sie dem Dienstkonto Ihrer Cloud Composer-Umgebung die Rollen roles/composer.worker und roles/dataform.editor zu.
PyPi-Paket google-cloud-dataform
installieren
Wenn Sie die Cloud Composer 2-Version 2.0.25
und höher verwenden, ist dieses Paket in Ihrer Umgebung vorinstalliert. Sie müssen es nicht installieren.
Wenn Sie frühere Versionen von Cloud Composer 2 verwenden, installieren Sie das PyPi-Paket google-cloud-dataform
.
Geben Sie im Abschnitt „PyPI-Pakete“ die Version ==0.2.0
an.
Airflow-DAG zum Planen von Dataform-Workflowaufrufen erstellen
Wenn Sie geplante Ausführungen von Dataform SQL-Workflows mit Cloud Composer 2 verwalten möchten, schreiben Sie den DAG mit Dataform Airflow-Operatoren und laden Sie ihn dann in den Bucket Ihrer Umgebung hoch.
Das folgende Codebeispiel zeigt einen Airflow-DAG, der ein Dataform-Kompilierungsergebnis erstellt und einen Dataform-Workflowaufruf startet:
from datetime import datetime
from google.cloud.dataform_v1beta1 import WorkflowInvocation
from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
DataformCancelWorkflowInvocationOperator,
DataformCreateCompilationResultOperator,
DataformCreateWorkflowInvocationOperator,
DataformGetCompilationResultOperator,
DataformGetWorkflowInvocationOperator,
)
from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor
DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"
with models.DAG(
DAG_ID,
schedule_interval='@once', # Override to match your needs
start_date=datetime(2022, 1, 1),
catchup=False, # Override to match your needs
tags=['dataform'],
) as dag:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
},
)
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
},
)
create_compilation_result >> create_workflow_invocation
Ersetzen Sie Folgendes:
- PROJECT_ID: Ihre Dataform-Google Cloud-Projekt-ID
- REPOSITORY_ID: der Name Ihres Dataform-Repositorys
- REGION: Region, in der sich das Dataform-Repository befindet
- COMPILATION_RESULT: der Name des Kompilierungsergebnisses, das für diesen Workflowaufruf verwendet werden soll
- GIT_COMMITISH: das Git-Commitish, z. B. ein Zweig oder ein Git-SHA in Ihrem Remote-Git-Repository der Version Ihres Codes, die Sie verwenden möchten
Das folgende Codebeispiel zeigt einen Airflow-DAG, der:
- Erstellt ein Dataform-Kompilierungsergebnis.
- Startet einen asynchronen Dataform-Workflowaufruf.
- Fragt den Status Ihres Workflows ab, bis er mit
DataformWorkflowInvocationStateSensor
in den erwarteten Status wechselt.
from datetime import datetime
from google.cloud.dataform_v1beta1 import WorkflowInvocation
from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
DataformCancelWorkflowInvocationOperator,
DataformCreateCompilationResultOperator,
DataformCreateWorkflowInvocationOperator,
DataformGetCompilationResultOperator,
DataformGetWorkflowInvocationOperator,
)
DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"
with models.DAG(
DAG_ID,
schedule_interval='@once', # Override to match your needs
start_date=datetime(2022, 1, 1),
catchup=False, # Override to match your needs
tags=['dataform'],
) as dag:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
},
)
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
asynchronous=True,
workflow_invocation={
"compilation_result": COMPILATION_RESULT
}
)
is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
task_id="is_workflow_invocation_done",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation_id=("{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"),
expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)
create_compilation_result >> create_workflow_invocation
Ersetzen Sie Folgendes:
- PROJECT_ID: Ihre Dataform-Google Cloud-Projekt-ID
- REPOSITORY_ID: der Name Ihres Dataform-Repositorys
- REGION: Region, in der sich das Dataform-Repository befindet
- COMPILATION_RESULT: der Name des Kompilierungsergebnisses, das für diesen Workflowaufruf verwendet werden soll
- GIT_COMMITISH: das Git-Commitish, z. B. ein Zweig oder ein Git-SHA in Ihrem Remote-Git-Repository der Version Ihres Codes, die Sie verwenden möchten
- COMPILATION_RESULT: der Name des Kompilierungsergebnisses, das für diesen Workflowaufruf verwendet werden soll
Kompilierungskonfigurationsparameter hinzufügen
Sie können dem Airflow-DAG-Objekt create_compilation_result
zusätzliche Kompilierungskonfigurationsparameter hinzufügen. Weitere Informationen zu verfügbaren Parametern finden Sie in der Referenz zur CodeCompilationConfig
Dataform API.
- Wenn Sie dem Airflow-DAG-Objekt
create_compilation_result
Kompilierungskonfigurationsparameter hinzufügen möchten, fügen Sie die ausgewählten Parameter im folgenden Format zucode_compilation_config
hinzu:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
"code_compilation_config": { "PARAMETER": "PARAMETER_VALUE"}
},
)
Ersetzen Sie Folgendes:
- PROJECT_ID: Ihre Dataform-Google Cloud-Projekt-ID
- REPOSITORY_ID: der Name Ihres Dataform-Repositorys
- REGION durch die Region, in der sich das Dataform-Repository befindet
- GIT_COMMITISH: das Git-Commitish, z. B. ein Zweig oder ein Git-SHA in Ihrem Remote-Git-Repository der Version Ihres Codes, die Sie verwenden möchten
- PARAMETER: ausgewählter Parameter
CodeCompilationConfig
. Sie können mehrere Parameter hinzufügen. - PARAMETER_VALUE: Wert des ausgewählten Parameters
Im folgenden Codebeispiel wird der Parameter defaultDatabase
dem Airflow-DAG-Objekt create_compilation_result
hinzugefügt:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": REMOTE_BRANCH,
"code_compilation_config": { "default_database": "my-custom-gcp-project"}
},
)
Konfigurationsparameter für Workflowaufruf hinzufügen
Sie können dem Airflow-DAG-Objekt create_workflow_invocation
zusätzliche Konfigurationsparameter für Workflowaufrufe hinzufügen. Weitere Informationen zu verfügbaren Parametern finden Sie in der Referenz zur InvocationConfig
Dataform API.
- Wenn Sie dem Airflow-DAG-Objekt
create_workflow_invocation
Workflowaufrufkonfigurationsparameter hinzufügen möchten, fügen Sie die ausgewählten Parameter im folgenden Format zuinvocation_config
hinzu:
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
"invocation_config": { "PARAMETER": PARAMETER_VALUE }
},
)
Ersetzen Sie Folgendes:
- PROJECT_ID: Ihre Dataform-Google Cloud-Projekt-ID
- REPOSITORY_ID: der Name Ihres Dataform-Repositorys
- REGION: Region, in der sich das Dataform-Repository befindet
- PARAMETER: ausgewählter Parameter
InvocationConfig
. Sie können mehrere Parameter hinzufügen. - PARAMETER_VALUE: Wert des ausgewählten Parameters
Im folgenden Codebeispiel werden die Parameter includedTags[]
und transitiveDependenciesIncluded
dem Airflow-DAG-Objekt create_workflow_invocation
hinzugefügt:
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
"invocation_config": { "included_Tags": ["daily"], "transitive_dependencies_included": true }
},
)
Nächste Schritte
- Informationen zum Konfigurieren von Kompilierungsüberschreibungen für Workflowausführungen finden Sie unter Kompilierungsüberschreibungen konfigurieren.
- Weitere Informationen zur Dataform API finden Sie unter Dataform API.
- Weitere Informationen zu Cloud Composer-Umgebungen finden Sie unter Übersicht über Cloud Composer.
- Informationen zum Planen von Ausführungen mit Workflows und Cloud Scheduler finden Sie unter Ausführungen mit Workflows und Cloud Scheduler planen.