Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Auf dieser Seite wird beschrieben, wie Sie die Integration der Datenherkunft in Cloud Composer aktivieren.
Einbindung von Data Lineage
Die Datenherkunft ist ein Dataplex Universal Catalog-Feature, mit dem Sie verfolgen können, wie sich Daten durch Ihre Systeme bewegen – woher sie kommen, wohin sie übergeben werden und welche Transformationen auf sie angewendet werden.
Cloud Composer verwendet das apache-airflow-providers-openlineage
-Paket, um die Lineage-Ereignisse zu generieren, die an die Data Lineage API gesendet werden.
Dieses Paket ist bereits in Cloud Composer-Umgebungen installiert. Wenn Sie eine andere Version dieses Pakets installieren, kann sich die Liste der unterstützten Operatoren ändern. Wir empfehlen, dies nur bei Bedarf zu tun und ansonsten die vorinstallierte Version des Pakets beizubehalten.
Data Lineage ist für Umgebungen in denselben Regionen wie Dataplex Universal Catalog-Regionen verfügbar, die Data Lineage unterstützen.
Wenn die Datenherkunft in Ihrer Cloud Composer-Umgebung aktiviert ist, meldet Cloud Composer Herkunftsinformationen an die Data Lineage API für DAGs, in denen einer der unterstützten Operatoren verwendet wird. Sie können auch benutzerdefinierte Lineage-Ereignisse senden, wenn Sie die Lineage für einen nicht unterstützten Operator melden möchten.
Sie können mit folgenden Tools auf Informationen zur Datenherkunft zugreifen:
- Data Lineage API
- Abstammungsdiagramme für unterstützte Einträge in Dataplex Universal Catalog. Weitere Informationen finden Sie in der Dataplex Universal Catalog-Dokumentation unter Abstammungsdiagramme.
Wenn Sie eine Umgebung erstellen, wird die Integration der Datenherkunft automatisch aktiviert, wenn die folgenden Bedingungen erfüllt sind:
Die Data Lineage API ist in Ihrem Projekt aktiviert. Weitere Informationen finden Sie in der Dataplex Universal Catalog-Dokumentation unter Data Lineage API aktivieren.
In Airflow ist kein benutzerdefiniertes Lineage-Backend konfiguriert.
Sie können die Integration der Datenherkunft deaktivieren, wenn Sie eine Umgebung erstellen.
Für eine vorhandene Umgebung können Sie die Einbindung der Datenherkunft jederzeit aktivieren oder deaktivieren.
Überlegungen zu Funktionen in Cloud Composer
Cloud Composer führt in den folgenden Fällen einen RPC-Aufruf aus, um Lineage-Ereignisse zu erstellen:
- Wenn eine Airflow-Aufgabe beginnt oder endet
- Wenn eine DAG-Ausführung beginnt oder endet
Weitere Informationen zu diesen Einheiten finden Sie in der Dataplex Universal Catalog-Dokumentation unter Lineage-Informationsmodell und Lineage API-Referenz.
Für den ausgegebenen Lineage-Traffic gelten Kontingente in der Data Lineage API. Cloud Composer verbraucht Schreibkontingent.
Die Preise für die Verarbeitung von Herkunftsdaten unterliegen der Herkunftspreisgestaltung. Hinweise zur Datenherkunft
Leistungsaspekte in Cloud Composer
Die Data Lineage wird am Ende der Ausführung der Airflow-Aufgabe gemeldet. Im Durchschnitt dauert die Berichterstellung zur Datenherkunft etwa 1 bis 2 Sekunden.
Dies wirkt sich nicht auf die Leistung der Aufgabe selbst aus: Airflow-Aufgaben schlagen nicht fehl, wenn die Herkunft nicht erfolgreich an die Lineage API gemeldet wird. Die Hauptoperatorlogik ist davon nicht betroffen, aber die gesamte Task-Instanz wird etwas länger ausgeführt, um die Daten zur Berichterstellungs-Lineage zu berücksichtigen.
In einer Umgebung, in der die Datenherkunft angegeben wird, fallen geringfügig höhere Kosten an, da für die Angabe der Datenherkunft zusätzliche Zeit benötigt wird.
Compliance
Die Datenherkunft bietet unterschiedliche Supportstufen für Funktionen wie VPC Service Controls. Sehen Sie sich die Überlegungen zur Datenherkunft an, um sicherzustellen, dass die Supportstufen Ihren Umgebungsanforderungen entsprechen.
Hinweise
Diese Funktion bietet unterschiedliche Compliance-Unterstützung. Sehen Sie sich zuerst die Funktionsüberlegungen für Cloud Composer und die Überlegungen zur Datenherkunft an.
Alle erforderlichen IAM-Berechtigungen für den Datenursprung sind bereits in der Rolle „Composer Worker“ (
roles/composer.worker
) enthalten. Diese Rolle ist die erforderliche Rolle für Dienstkonten der Umgebung.Weitere Informationen zu Berechtigungen für die Datenherkunft finden Sie in der Dataplex Universal Catalog-Dokumentation unter Rollen und Berechtigungen für die Datenherkunft.
Prüfen, ob ein Operator unterstützt wird
Die Unterstützung der Datenherkunft wird vom Anbieterpaket bereitgestellt, in dem sich der Operator befindet:
Prüfen Sie die Changelogs des Anbieterpakets, in dem sich der Operator befindet, auf Einträge, die OpenLineage-Unterstützung hinzufügen.
BigQueryToBigQueryOperator unterstützt OpenLineage beispielsweise ab
apache-airflow-providers-google
Version 11.0.0.Prüfen Sie die Version des Anbieterpakets, das von Ihrer Umgebung verwendet wird. Eine Liste der vorinstallierten Pakete für den in Ihrer Umgebung verwendeten Airflow-Build finden Sie hier. Sie können auch eine andere Version des Pakets in Ihrer Umgebung installieren.
Außerdem finden Sie auf der Seite Unterstützte Klassen in der apache-airflow-providers-openlineage
-Dokumentation die neuesten unterstützten Operatoren.
Einbindung von Data Lineage konfigurieren
Die Einbindung von Data Lineage für Cloud Composer wird pro Umgebung verwaltet. Das bedeutet, dass die Aktivierung des Features zwei Schritte erfordert:
- Aktivieren Sie die Data Lineage API in Ihrem Projekt.
- Aktivieren Sie die Data Lineage-Integration in einer bestimmten Cloud Composer-Umgebung.
Data Lineage in Cloud Composer aktivieren
Console
Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.
Wählen Sie den Tab Umgebungskonfiguration aus.
Klicken Sie im Abschnitt Einbindung von Dataplex Data Lineage auf Bearbeiten.
Wählen Sie im Bereich Einbindung von Dataplex Data Lineage die Option Einbindung von Dataplex Data Lineage aktivieren aus und klicken Sie auf Speichern.
gcloud
Verwenden Sie das Argument --enable-cloud-data-lineage-integration
.
gcloud composer environments update ENVIRONMENT_NAME \
--location LOCATION \
--enable-cloud-data-lineage-integration
Ersetzen Sie Folgendes:
ENVIRONMENT_NAME
: der Name Ihrer UmgebungLOCATION
: die Region, in der sich die Umgebung befindet.
Beispiel:
gcloud composer environments update example-environment \
--location us-central1 \
--enable-cloud-data-lineage-integration
Data Lineage in Cloud Composer deaktivieren
Wenn Sie die Lineage-Einbindung in einer Cloud Composer-Umgebung deaktivieren, wird die Data Lineage API nicht deaktiviert. Wenn Sie die Herkunftsberichterstellung für Ihr Projekt vollständig deaktivieren möchten, deaktivieren Sie auch die Data Lineage API. Weitere Informationen finden Sie unter Dienste deaktivieren.
Console
Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.
Wählen Sie den Tab Umgebungskonfiguration aus.
Klicken Sie im Abschnitt Einbindung von Dataplex Data Lineage auf Bearbeiten.
Wählen Sie im Bereich Einbindung von Dataplex Data Lineage die Option Einbindung in Dataplex Data Lineage deaktivieren aus und klicken Sie auf Speichern.
gcloud
Verwenden Sie das Argument --disable-cloud-data-lineage-integration
.
gcloud composer environments update ENVIRONMENT_NAME \
--location LOCATION \
--disable-cloud-data-lineage-integration
Ersetzen Sie Folgendes:
ENVIRONMENT_NAME
: der Name Ihrer UmgebungLOCATION
: die Region, in der sich die Umgebung befindet.
Beispiel:
gcloud composer environments update example-environment \
--location us-central1 \
--disable-cloud-data-lineage-integration
Lineage-Ereignisse in unterstützten Operatoren senden
Wenn die Data Lineage aktiviert ist, senden unterstützte Operatoren automatisch Lineage-Ereignisse. Sie müssen Ihren DAG-Code nicht ändern.
Führen Sie beispielsweise die folgende Aufgabe aus:
task = BigQueryInsertJobOperator(
task_id='snapshot_task',
dag=dag,
location='<dataset-location>',
configuration={
'query': {
'query': 'SELECT * FROM dataset.tableA',
'useLegacySql': False,
'destinationTable': {
'project_id': 'example-project',
'dataset_id': 'dataset',
'table_id': 'tableB',
},
}
},
)
Dadurch wird das folgende Lineage-Diagramm in der Dataplex Universal Catalog-UI erstellt:

Benutzerdefinierte Herkunftsereignisse senden
Sie können benutzerdefinierte Lineage-Ereignisse senden, wenn Sie die Lineage für einen Operator melden möchten, der nicht für die automatische Lineage-Berichterstellung unterstützt wird.
Beispiel: So senden Sie benutzerdefinierte Ereignisse mit:
- BashOperator: Ändern Sie den Parameter
inlets
oderoutlets
in der Aufgabendefinition. - PythonOperator: Ändern Sie den Parameter
task.inlets
odertask.outlets
in der Aufgabendefinition. - Sie können
AUTO
für den Parameterinlets
verwenden. Dadurch wird der Wert auf denoutlets
der Upstream-Aufgabe festgelegt.
Im folgenden Beispiel wird die Verwendung von Ein- und Ausgängen demonstriert:
from airflow.composer.data_lineage.entities import BigQueryTable
from airflow.lineage import AUTO
...
bash_task = BashOperator(
task_id="bash_task",
dag=dag,
bash_command="sleep 0",
inlets=[
BigQueryTable(
project_id="example-project",
dataset_id="dataset",
table_id="table1",
)
],
outlets=[
BigQueryTable(
project_id="example-project",
dataset_id="dataset",
table_id="table2",
)
],
)
def _python_task(task):
print("Python task")
python_task = PythonOperator(
task_id="python_task",
dag=dag,
python_callable=_python_task,
inlets=[
AUTO,
BigQueryTable(
project_id="example-project",
dataset_id="dataset",
table_id="table3",
),
],
outlets=[
BigQueryTable(
project_id="example-project",
dataset_id="dataset",
table_id="table4",
)
],
)
bash_task >> python_task
In der Dataplex Universal Catalog-UI wird das folgende Lineage-Diagramm erstellt:

Lineage-Logs in Cloud Composer ansehen
Sie können Logs zur Datenherkunft über den Link auf der Seite Umgebungskonfiguration im Abschnitt Dataplex Universal Catalog-Integration für Datenherkunft ansehen.
Fehlerbehebung
Wenn keine Herkunftsdaten an die Lineage API gemeldet werden oder Sie sie nicht in Dataplex Universal Catalog sehen können, führen Sie die folgenden Schritte zur Fehlerbehebung aus:
- Achten Sie darauf, dass die Data Lineage API im Projekt Ihrer Cloud Composer-Umgebung aktiviert ist.
- Prüfen Sie, ob die Einbindung von Data Lineage in der Cloud Composer-Umgebung aktiviert ist.
- Prüfen Sie, ob der von Ihnen verwendete Operator in der automatischen Unterstützung für Lineage-Berichte enthalten ist. Unterstützte Airflow-Operatoren
- Prüfen Sie die Lineage-Logs in Cloud Composer auf mögliche Probleme.