Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Datenreihenintegration
Die Datenherkunft ist eine Dataplex-Funktion, mit der Sie verfolgen können, wie sich Daten durch Ihre Systeme bewegen – woher sie kommen, wohin sie übergeben werden und welche Transformationen auf sie angewendet werden. Die Datenherkunft ist für Folgendes verfügbar:
Cloud Composer 2-Umgebungen ab Version 2.1.2 mit Airflow-Versionen 2.2.5 und höher.
Cloud Composer 2-Umgebungen in denselben Regionen wie Data Catalog-Regionen, die Data Lineage unterstützen
Sobald das Feature in Ihrer Cloud Composer-Umgebung aktiviert ist, wird der folgende Befehl ausgeführt: DAGs, die einen der unterstützten Operatoren verwenden, führen dazu, Cloud Composer, um Herkunftsinformationen an die Data Lineage API zu melden.
Sie können dann folgendermaßen auf diese Informationen zugreifen:
- Data Lineage API
- Lineage-Visualisierungsdiagramme für unterstützte Data Catalog-Einträge in Dataplex aus. Weitere Informationen finden Sie unter Visualisierungsdiagramme für Herkunft in der Dataplex-Dokumentation.
Unterstützte Operatoren
Die folgenden Betreiber unterstützen automatische Abstammungsberichte in Cloud Composer:
airflow.providers.google.cloud.operators.bigquery.BigQueryExecuteQueryOperator
airflow.providers.google.cloud.operators.bigquery.BigQueryInsertJobOperator
airflow.providers.google.cloud.transfers.bigquery_to_bigquery.BigQueryToBigQueryOperator
airflow.contrib.operators.bigquery_to_gcs.BigQueryToCloudStorageOperator
airflow.providers.google.cloud.transfers.bigquery_to_gcs.BigQueryToGCSOperator
airflow.providers.google.cloud.transfers.gcs_to_bigquery.GCSToBigQueryOperator
airflow.contrib.operators.gcs_to_bq.GoogleCloudStorageToBigQueryOperator
airflow.providers.google.cloud.operators.dataproc.DataprocSubmitJobOperator
Führen Sie beispielsweise die folgende Aufgabe aus:
task = BigQueryInsertJobOperator(
task_id='snapshot_task',
dag=dag,
location='<dataset-location>',
configuration={
'query': {
'query': 'SELECT * FROM dataset.tableA',
'useLegacySql': False,
'destinationTable': {
'project_id': GCP_PROJECT,
'dataset_id': 'dataset',
'table_id': 'tableB',
},
}
},
)
Zum Erstellen des folgenden Lineage-Diagramms in der Dataplex-UI:
Überlegungen zu Cloud Composer-Features
Bei jeder Ausführung einer Airflow-Aufgabe, die die Data Lineage meldet, wird Folgendes ausgeführt:
- Eine RPC-Anfrage zum Erstellen oder Aktualisieren für einen Herkunftsprozess
- Eine RPC-Anfrage zum Erstellen oder Aktualisieren für eine Herkunftsausführung
- Eine oder mehrere RPC-Anfragen zum Erstellen von Abfolgeereignissen (meistens 0 oder 1)
Weitere Informationen zu diesen Entitäten finden Sie unter Lineage-Informationsmodell und Lineage API-Referenz in Dataplex Dokumentation.
Für den ausgegebenen Lineage-Traffic gelten Kontingente in der Data Lineage API. Cloud Composer verbraucht das Schreibkontingent.
Die Preise für die Verarbeitung von Lineage-Daten unterliegen den Lineage-Preisen. Weitere Informationen finden Sie unter Anforderungen an die Datenableitung.
Auswirkungen auf die Leistung
Die Datenherkunft wird am Ende der Airflow-Taskausführung gemeldet. Im Durchschnitt dauert die Erstellung von Datenabfolgenberichten etwa 1 bis 2 Sekunden.
Dies hat keinen Einfluss auf die Leistung der Aufgabe selbst: Airflow-Aufgaben haben keinen schlägt fehl, wenn die Herkunft nicht erfolgreich an die Lineage API gemeldet wird. Dies hat keine Auswirkungen auf die Logik des Hauptoperators, aber die gesamte Aufgabeninstanz. um die Herkunftsdaten in der Berichterstellung zu berücksichtigen.
Bei einer Umgebung, in der die Datenherkunft erfasst wird, steigen die damit verbundenen Kosten geringfügig, da für die Erfassung der Datenherkunft zusätzliche Zeit benötigt wird.
Compliance
Data Lineage bietet verschiedene Unterstützungsstufen für Funktionen wie VPC Service Controls Sehen Sie sich die Anforderungen an die Datenableitung an, um sicherzustellen, dass die Supportebenen den Anforderungen Ihrer Umgebung entsprechen.
Mit Data Lineage-Integration arbeiten
Data Lineage-Einbindung für Cloud Composer wird auf einer pro Umgebung. Das bedeutet, dass für die Aktivierung der Funktion zwei Schritte erforderlich sind:
- Aktivieren Sie die Data Lineage API in Ihrem Projekt.
- Data Lineage-Einbindung in einen bestimmten Cloud Composer aktivieren zu verbessern.
Hinweise
Wenn Sie eine Umgebung erstellen, ist die Einbindung von Data Lineage automatisch aktiviert, wenn die folgenden Bedingungen erfüllt sind:
Die Data Lineage API ist in Ihrem Projekt aktiviert. Weitere Informationen finden Sie in der Dataplex-Dokumentation unter Data Lineage API aktivieren.
Ein benutzerdefinierter Lineage-Back-End nicht in Airflow konfiguriert ist.
Bei einer vorhandenen Umgebung können Sie die Datenabstammungsintegration jederzeit aktivieren oder deaktivieren.
Erforderliche Rollen
Für die Einbindung in Data Lineage müssen die folgenden Berechtigungen für Ihre Dienstkonto der Cloud Composer-Umgebung:
- Für die Standarddienstkonten: keine Änderungen erforderlich. Standarddienstkonten enthalten die erforderlichen Berechtigungen.
- Für vom Nutzer verwaltete Dienstkonten: Weisen Sie Ihrem Dienstkonto die Rolle „Composer-Arbeiter“ (
roles/composer.worker
) zu. Diese Rolle umfasst alle erforderlichen Berechtigungen für die Datenableitung.
Weitere Informationen finden Sie in der Dataplex-Dokumentation unter Abstammungsrollen und ‑berechtigungen.
Data Lineage in Cloud Composer aktivieren
Console
Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.
Wählen Sie den Tab Umgebungskonfiguration aus.
Klicken Sie im Abschnitt Dataplex Data Lineage Integration auf Bearbeiten.
Wählen Sie im Bereich Integration von Dataplex Data Lineage die Option Integration mit Dataplex Data Lineage aktivieren aus und klicken Sie auf Speichern.
gcloud
Verwenden Sie das Argument --enable-cloud-data-lineage-integration
.
gcloud composer environments update ENVIRONMENT_NAME \
--location LOCATION \
--enable-cloud-data-lineage-integration
Ersetzen Sie:
ENVIRONMENT_NAME
durch den Namen der Umgebung.Der Name muss mit einem Kleinbuchstaben beginnen, gefolgt von bis zu 62 Kleinbuchstaben, Ziffern oder Bindestrichen. Das letzte Zeichen darf kein Bindestrich sein. Anhand des Umgebungsnamens werden Unterkomponenten für die Umgebung erstellt. Daher müssen Sie einen Namen angeben, der auch als Cloud Storage-Bucket-Name gültig ist. Eine Liste der Einschränkungen finden Sie in den Benennungsrichtlinien für Buckets.
LOCATION
durch die Region für die Umgebung.Ein Speicherort ist die Region, in der sich der GKE-Cluster der Umgebung befindet.
Beispiel:
gcloud composer environments update example-environment \
--location us-central1 \
--enable-cloud-data-lineage-integration
Benutzerdefinierte Herkunftsereignisse senden
Sie können benutzerdefinierte Herkunftsereignisse senden, wenn Sie die Herkunft für einen Operator melden möchten die für automatische Herkunftsberichte nicht unterstützt wird.
So senden Sie benutzerdefinierte Ereignisse beispielsweise mit:
BashOperator
, ändern Sie den Parameterinlets
oderoutlets
in der Aufgabe Definition.PythonOperator
, ändern Sie den Parametertask.inlets
odertask.outlets
in der Aufgabendefinition. Wenn SieAUTO
für den Parameterinlets
verwenden, wird sein Wert auf denoutlets
der vorgeschalteten Aufgabe festgelegt.
Führen Sie beispielsweise diese Aufgabe aus:
from airflow.composer.data_lineage.entities import BigQueryTable
from airflow.lineage import AUTO
…
bash_task = BashOperator(
task_id='bash_task',
dag=dag,
bash_command='sleep 0',
inlets=[BigQueryTable(
project_id=GCP_PROJECT,
dataset_id='dataset',
table_id='table1',
)],
outlets=[BigQueryTable(
project_id=GCP_PROJECT,
dataset_id='dataset',
table_id='table2',
)]
)
def _python_task(task):
task.inlets.append(BigQueryTable(
project_id=GCP_PROJECT,
dataset_id='dataset',
table_id='table3',
))
task.outlets.append(BigQueryTable(
project_id=GCP_PROJECT,
dataset_id='dataset',
table_id='table4',
))
python_task = PythonOperator(
task_id='python_task',
dag=dag,
python_callable=_python_task,
inlets=[AUTO],
)
bash_task >> python_task
Zum Erstellen des folgenden Lineage-Diagramms in der Dataplex-UI:
Data Lineage in Cloud Composer deaktivieren
Das Deaktivieren der Lineage-Einbindung in einer Cloud Composer-Umgebung Data Lineage API deaktivieren. Wenn Sie Lineage-Berichte für Ihr Projekt vollständig deaktivieren möchten, deaktivieren Sie auch die Data Lineage API. Weitere Informationen finden Sie unter Dienste deaktivieren.
Console
Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.
Wählen Sie den Tab Umgebungskonfiguration aus.
Klicken Sie im Abschnitt Einbindung von Dataplex Data Lineage auf Bearbeiten.
Wählen Sie im Bereich Integration von Dataplex Data Lineage die Option Integration von Dataplex Data Lineage deaktivieren aus und klicken Sie auf Speichern.
gcloud
Verwenden Sie das Argument --disable-cloud-data-lineage-integration
.
gcloud composer environments update ENVIRONMENT_NAME \
--location LOCATION \
--disable-cloud-data-lineage-integration
Ersetzen Sie:
ENVIRONMENT_NAME
durch den Namen der Umgebung.Der Name muss mit einem Kleinbuchstaben beginnen, gefolgt von bis zu 62 Kleinbuchstaben, Ziffern oder Bindestrichen. Das letzte Zeichen darf kein Bindestrich sein. Anhand des Umgebungsnamens werden Unterkomponenten für die Umgebung erstellt. Daher müssen Sie einen Namen angeben, der auch als Cloud Storage-Bucket-Name gültig ist. Eine Liste der Einschränkungen finden Sie in den Benennungsrichtlinien für Buckets.
LOCATION
durch die Region für die Umgebung.Ein Speicherort ist die Region, in der sich der GKE-Cluster der Umgebung befindet.
Beispiel:
gcloud composer environments update example-environment \
--location us-central1 \
--disable-cloud-data-lineage-integration
Abstammungslogs in Cloud Composer ansehen
Sie können Logs im Zusammenhang mit Data Lineage über den Link auf der Seite Umgebungskonfiguration in Dataplex Data Lineage-Integration.
Fehlerbehebung
Wenn die Herkunftsdaten nicht an die Lineage API gemeldet werden oder Sie sie in Führen Sie in Dataplex die folgenden Schritte zur Fehlerbehebung aus:
- Achten Sie darauf, dass die Data Lineage API im Projekt Ihres Cloud Composer-Umgebung.
- Prüfen Sie, ob die Einbindung der Datenabfolge in der Cloud Composer-Umgebung aktiviert ist.
- Prüfen, ob der von Ihnen verwendete Bediener in der automatisierten Herkunft enthalten ist Berichterstellung unterstützt. Siehe Unterstützte Airflow-Operatoren.
- Prüfen Sie die Lineage-Logs in Cloud Composer auf mögliche Probleme.