Dataform-Ausführungen können mit Cloud Composer 2 geplant werden. Cloud Composer 1 wird von Dataform nicht unterstützt.
Um Zeitpläne für Dataform-Ausführungen mit Cloud Composer 2 zu verwalten, Sie können Dataform-Operatoren in Airflow-Directed Acyclic Graphs (DAGs) verwenden. Sie können einen Airflow-DAG erstellen, der Dataform-Workflowaufrufe.
Dataform bietet verschiedene Airflow-Operatoren. Dazu gehören Operatoren zum Abrufen eines Kompilierungsergebnisses, zum Abrufen eines Workflow-Aufrufs und zum Abbrechen einen Workflowaufruf. So rufen Sie die vollständige Liste der verfügbaren Dataform-Tools auf: Airflow-Operatoren finden Sie unter Google Dataform-Operatoren.
Hinweise
- Auswählen oder Erstellen Sie ein Dataform-Repository.
- Dataform Zugriff auf BigQuery gewähren
- Wählen Sie einen Dataform-Arbeitsbereich aus oder erstellen Sie einen.
- Erstellen Sie mindestens eine Tabelle.
- Erstellen Sie eine Cloud Composer 2-Umgebung.
- Die Rolle roles/composer.worker zuweisen und roles/dataform.editor Rollen für das Dienstkonto Ihrer Cloud Composer-Umgebung.
PyPi-Paket google-cloud-dataform
installieren
Wenn Sie die Cloud Composer 2-Version 2.0.25
und höher verwenden, hat dieses Paket
ist in Ihrer Umgebung vorinstalliert. Sie müssen es nicht installieren.
Wenn Sie frühere Versionen von Cloud Composer 2 verwenden,
Installieren Sie das PyPi-Paket google-cloud-dataform
.
Geben Sie im Abschnitt „PyPI-Pakete“ die Version ==0.2.0
an.
Airflow-DAG zum Planen von Dataform-Workflowaufrufen erstellen
Um geplante Ausführungen von Dataform SQL-Workflows mit Cloud Composer 2, DAG schreiben mit Dataform-Airflow-Operatoren, dann laden Sie sie in den Bucket Ihrer Umgebung hoch.
Das folgende Codebeispiel zeigt einen Airflow-DAG, der ein Dataform-Objekt erstellt Kompilierungsergebnis und startet einen Dataform-Workflowaufruf:
from datetime import datetime
from google.cloud.dataform_v1beta1 import WorkflowInvocation
from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
DataformCancelWorkflowInvocationOperator,
DataformCreateCompilationResultOperator,
DataformCreateWorkflowInvocationOperator,
DataformGetCompilationResultOperator,
DataformGetWorkflowInvocationOperator,
)
from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor
DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"
with models.DAG(
DAG_ID,
schedule_interval='@once', # Override to match your needs
start_date=datetime(2022, 1, 1),
catchup=False, # Override to match your needs
tags=['dataform'],
) as dag:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
},
)
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
},
)
create_compilation_result >> create_workflow_invocation
Ersetzen Sie Folgendes:
- PROJECT_ID: Ihre Dataform-Google Cloud-Projekt-ID
- REPOSITORY_ID: der Name Ihres Dataform-Repositorys
- REGION: die Region, in der das Dataform-Objekt verwendet wird befindet sich
- COMPILATION_RESULT: der Name des Kompilierungsergebnisses Sie für diesen Workflowaufruf verwenden möchten
- GIT_COMMITISH: das Git-Commitish, z. B. einem Zweig oder Git-SHA in Ihrem Remote-Git-Repository der Version von den Code, den Sie verwenden möchten,
Das folgende Codebeispiel zeigt einen Airflow-DAG, der:
- Erstellt ein Dataform-Kompilierungsergebnis.
- Startet einen asynchronen Dataform-Workflowaufruf.
- Fragt den Status Ihres Workflows ab, bis er den erwarteten Status erreicht
mit
DataformWorkflowInvocationStateSensor
.
from datetime import datetime
from google.cloud.dataform_v1beta1 import WorkflowInvocation
from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
DataformCancelWorkflowInvocationOperator,
DataformCreateCompilationResultOperator,
DataformCreateWorkflowInvocationOperator,
DataformGetCompilationResultOperator,
DataformGetWorkflowInvocationOperator,
)
DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"
with models.DAG(
DAG_ID,
schedule_interval='@once', # Override to match your needs
start_date=datetime(2022, 1, 1),
catchup=False, # Override to match your needs
tags=['dataform'],
) as dag:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
},
)
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
asynchronous=True,
workflow_invocation={
"compilation_result": COMPILATION_RESULT
}
)
is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
task_id="is_workflow_invocation_done",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation_id=("{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"),
expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)
create_compilation_result >> create_workflow_invocation
Ersetzen Sie Folgendes:
- PROJECT_ID: Ihre Dataform-Google Cloud-Projekt-ID
- REPOSITORY_ID: der Name Ihres Dataform-Repositorys
- REGION: die Region, in der das Dataform-Objekt verwendet wird befindet sich
- COMPILATION_RESULT: der Name des Kompilierungsergebnisses Sie für diesen Workflowaufruf verwenden möchten
- GIT_COMMITISH: das Git-Commitish, z. B. einem Zweig oder Git-SHA in Ihrem Remote-Git-Repository der Version Ihres Code, den Sie verwenden möchten,
- COMPILATION_RESULT: der Name des Kompilierungsergebnisses Sie für diesen Workflowaufruf verwenden möchten
Kompilierungskonfigurationsparameter hinzufügen
Sie können zusätzliche Kompilierungskonfigurationsparameter zur
create_compilation_result
Airflow-DAG-Objekt. Weitere Informationen zu
verfügbaren Parametern finden Sie in der Referenz zur CodeCompilationConfig
Dataform API.
- So fügen Sie dem
create_compilation_result
Kompilierungskonfigurationsparameter hinzu: Airflow-DAG-Objekt, fügen Sie die ausgewählten Parameter zucode_compilation_config
hinzu im folgenden Format:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
"code_compilation_config": { "PARAMETER": "PARAMETER_VALUE"}
},
)
Ersetzen Sie Folgendes:
- PROJECT_ID: Ihre Dataform-Google Cloud-Projekt-ID
- REPOSITORY_ID: der Name Ihres Dataform-Repositorys
- REGION durch die Region, in der das Dataform-Repository gespeichert ist befindet sich
- GIT_COMMITISH: das Git-Commitish, z. B. einem Zweig oder Git-SHA in Ihrem Remote-Git-Repository der Version Ihres Code, den Sie verwenden möchten,
- PARAMETER: ausgewählter Parameter
CodeCompilationConfig
. Sie können mehrere Parameter hinzufügen. - PARAMETER_VALUE: Wert des ausgewählten Parameters
Im folgenden Codebeispiel wird der Parameter defaultDatabase
dem Parameter
create_compilation_result
Airflow-DAG-Objekt:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": REMOTE_BRANCH,
"code_compilation_config": { "default_database": "my-custom-gcp-project"}
},
)
Konfigurationsparameter für Workflowaufruf hinzufügen
Sie können zusätzliche Parameter für die Workflowaufruf-Konfiguration
create_workflow_invocation
Airflow-DAG-Objekt. Weitere Informationen zu
verfügbaren Parametern finden Sie in der Referenz zur InvocationConfig
Dataform API.
- Um dem Parameter
create_workflow_invocation
Airflow-DAG-Objekt, ausgewählte Parameter hinzufügen ininvocation_config
im folgenden Format:
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
"invocation_config": { "PARAMETER": PARAMETER_VALUE }
},
)
Ersetzen Sie Folgendes:
- PROJECT_ID: Ihre Dataform-Google Cloud-Projekt-ID
- REPOSITORY_ID: der Name Ihres Dataform-Repositorys
- REGION: die Region, in der das Dataform-Repository verwendet wird befindet sich
- PARAMETER: ausgewählter Parameter
InvocationConfig
. Sie können mehrere Parameter hinzufügen. - PARAMETER_VALUE: Wert des ausgewählten Parameters
Das folgende Codebeispiel zeigt die includedTags[]
und
transitiveDependenciesIncluded
Parameter hinzugefügt zu
create_workflow_invocation
Airflow-DAG-Objekt:
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
"invocation_config": { "included_Tags": ["daily"], "transitive_dependencies_included": true }
},
)
Nächste Schritte
- Informationen zum Konfigurieren von Kompilierungsüberschreibungen für Workflowausführungen finden Sie unter Konfigurieren Sie Kompilierungsüberschreibungen.
- Weitere Informationen zur Dataform API finden Sie unter Dataform API.
- Weitere Informationen zu Cloud Composer-Umgebungen finden Sie unter Übersicht über Cloud Composer.
- Um zu erfahren, wie Sie mit Workflows und Cloud Scheduler, siehe Ausführungen mit Workflows und Cloud Scheduler planen