Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Einbindung von Datenreihen
Die Datenherkunft ist eine Dataplex-Funktion, mit der Sie verfolgen können, wie sich Daten durch Ihre Systeme bewegen – woher sie kommen, wohin sie übergeben werden und welche Transformationen auf sie angewendet werden. Die Datenherkunft ist für Folgendes verfügbar:
Cloud Composer 2-Umgebungen mit Version 2.1.2 und höher mit Airflow-Versionen 2.2.5 und höher
Cloud Composer 2-Umgebungen in denselben Regionen wie Dataplex-Regionen, die die Datenabfolge unterstützen
Sobald die Funktion in Ihrer Cloud Composer-Umgebung aktiviert ist, meldet Cloud Composer bei der Ausführung von DAGs, die einen der unterstützten Operatoren verwenden, Informationen zur Datenabfolge an die Data Lineage API.
Sie können dann auf diese Informationen zugreifen:
- Data Lineage API
- Herkunftsdiagramme für unterstützte Einträge in Dataplex. Weitere Informationen finden Sie in der Dataplex-Dokumentation unter Abstammungsdiagramme.
Unterstützte Operatoren
Die folgenden Betreiber unterstützen automatische Abstammungsberichte in Cloud Composer:
airflow.providers.google.cloud.operators.bigquery.BigQueryExecuteQueryOperator
airflow.providers.google.cloud.operators.bigquery.BigQueryInsertJobOperator
airflow.providers.google.cloud.transfers.bigquery_to_bigquery.BigQueryToBigQueryOperator
airflow.contrib.operators.bigquery_to_gcs.BigQueryToCloudStorageOperator
airflow.providers.google.cloud.transfers.bigquery_to_gcs.BigQueryToGCSOperator
airflow.providers.google.cloud.transfers.gcs_to_bigquery.GCSToBigQueryOperator
airflow.contrib.operators.gcs_to_bq.GoogleCloudStorageToBigQueryOperator
airflow.providers.google.cloud.operators.dataproc.DataprocSubmitJobOperator
Führen Sie beispielsweise die folgende Aufgabe aus:
task = BigQueryInsertJobOperator(
task_id='snapshot_task',
dag=dag,
location='<dataset-location>',
configuration={
'query': {
'query': 'SELECT * FROM dataset.tableA',
'useLegacySql': False,
'destinationTable': {
'project_id': GCP_PROJECT,
'dataset_id': 'dataset',
'table_id': 'tableB',
},
}
},
)
Das führt dazu, dass in der Dataplex-Benutzeroberfläche das folgende Herkunftsdiagramm erstellt wird:
![Beispiel für einen Stammbaum in der Dataplex-Benutzeroberfläche](https://cloud.google.com/static/composer/docs/images/lineage-auto-sample.png?authuser=19&hl=de)
Funktionsüberlegungen für Cloud Composer
Bei jeder Airflow-Aufgabenausführung, bei der die Datenherkunft erfasst wird, geschieht Folgendes:
- Eine RPC-Anfrage zum Erstellen oder Aktualisieren eines Abstammungsvorgangs
- Eine RPC-Anfrage zum Erstellen oder Aktualisieren einer Abstammungsausführung
- Eine oder mehrere RPC-Anfragen zum Erstellen von Abfolgeereignissen (meistens 0 oder 1)
Weitere Informationen zu diesen Entitäten finden Sie in der Dataplex-Dokumentation unter Informationsmodell für die Datenherkunft und Referenz für die Datenherkunft API.
Für gesendeten Lineage-Traffic gelten die Kontingente der Data Lineage API. Cloud Composer verbraucht das Schreibkontingent.
Die Preise für die Verarbeitung von Daten zu Herkunftsinformationen unterliegen den Preisen für Herkunftsinformationen. Weitere Informationen finden Sie unter Anforderungen an die Datenableitung.
Auswirkungen auf die Leistung
Die Datenabfolge wird am Ende der Ausführung der Airflow-Aufgabe erfasst. Im Durchschnitt dauert die Erstellung von Datenabfolgenberichten etwa 1 bis 2 Sekunden.
Dies hat keine Auswirkungen auf die Leistung der Aufgabe selbst: Airflow-Aufgaben schlagen nicht fehl, wenn die Abstammung nicht erfolgreich an die Lineage API gemeldet wurde. Die Hauptoperatorlogik ist davon nicht betroffen, aber die gesamte Aufgabeninstanz wird etwas länger ausgeführt, um Daten zur Berichtsabstammung zu berücksichtigen.
Bei einer Umgebung, in der die Datenherkunft erfasst wird, steigen die damit verbundenen Kosten geringfügig, da für die Erfassung der Datenherkunft zusätzliche Zeit benötigt wird.
Compliance
Die Datenableitung bietet unterschiedliche Supportebenen für Funktionen wie VPC Service Controls. Sehen Sie sich die Anforderungen an die Datenableitung an, um sicherzustellen, dass die Supportebenen den Anforderungen Ihrer Umgebung entsprechen.
Datenabstammungsintegration verwenden
Die Einbindung der Datenabfolge für Cloud Composer wird pro Umgebung verwaltet. Das bedeutet, dass die Aktivierung der Funktion zwei Schritte umfasst:
- Aktivieren Sie die Data Lineage API in Ihrem Projekt.
- Datenabstammungsintegration in einer bestimmten Cloud Composer-Umgebung aktivieren
Hinweise
Wenn Sie eine Umgebung erstellen, wird die Datenabstammungsintegration automatisch aktiviert, wenn die folgenden Bedingungen erfüllt sind:
Die Data Lineage API ist in Ihrem Projekt aktiviert. Weitere Informationen finden Sie in der Dataplex-Dokumentation unter Data Lineage API aktivieren.
Ein benutzerdefiniertes Abstammungs-Back-End ist in Airflow nicht konfiguriert.
Bei einer vorhandenen Umgebung können Sie die Datenabstammungsintegration jederzeit aktivieren oder deaktivieren.
Erforderliche Rollen
Für die Einbindung in die Datenabfolge müssen dem Dienstkonto Ihrer Cloud Composer-Umgebung die folgenden Berechtigungen hinzugefügt werden:
- Für die Standarddienstkonten sind keine Änderungen erforderlich. Standarddienstkonten enthalten die erforderlichen Berechtigungen.
- Für vom Nutzer verwaltete Dienstkonten: Weisen Sie Ihrem Dienstkonto die Rolle „Composer-Arbeiter“ (
roles/composer.worker
) zu. Diese Rolle umfasst alle erforderlichen Berechtigungen für die Datenableitung.
Weitere Informationen finden Sie in der Dataplex-Dokumentation unter Abstammungsrollen und ‑berechtigungen.
Datenabfolge in Cloud Composer aktivieren
Console
Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.
Wählen Sie den Tab Umgebungskonfiguration aus.
Klicken Sie im Abschnitt Einbindung von Dataplex Data Lineage auf Bearbeiten.
Wählen Sie im Bereich Einbindung von Dataplex Data Lineage die Option Einbindung in Dataplex Data Lineage aktivieren aus und klicken Sie auf Speichern.
gcloud
Verwenden Sie das Argument --enable-cloud-data-lineage-integration
.
gcloud composer environments update ENVIRONMENT_NAME \
--location LOCATION \
--enable-cloud-data-lineage-integration
Ersetzen Sie:
ENVIRONMENT_NAME
durch den Namen der Umgebung.Der Name muss mit einem Kleinbuchstaben beginnen, gefolgt von bis zu 62 Kleinbuchstaben, Ziffern oder Bindestrichen. Das letzte Zeichen darf kein Bindestrich sein. Anhand des Umgebungsnamens werden Unterkomponenten für die Umgebung erstellt. Daher müssen Sie einen Namen angeben, der auch als Cloud Storage-Bucket-Name gültig ist. Eine Liste der Einschränkungen finden Sie in den Benennungsrichtlinien für Buckets.
LOCATION
durch die Region für die Umgebung.Ein Speicherort ist die Region, in der sich der GKE-Cluster der Umgebung befindet.
Beispiel:
gcloud composer environments update example-environment \
--location us-central1 \
--enable-cloud-data-lineage-integration
Benutzerdefinierte Herkunftsereignisse senden
Sie können benutzerdefinierte Abfolgeereignisse senden, wenn Sie die Abfolge für einen Betreiber erfassen möchten, der für automatisierte Abfolgeberichte nicht unterstützt wird.
Beispielsweise können Sie benutzerdefinierte Ereignisse mit folgenden Methoden senden:
BashOperator
, ändern Sie den Parameterinlets
oderoutlets
in der Aufgabendefinition.PythonOperator
, ändern Sie den Parametertask.inlets
odertask.outlets
in der Aufgabendefinition. Wenn SieAUTO
für den Parameterinlets
verwenden, wird sein Wert auf denoutlets
der vorgeschalteten Aufgabe festgelegt.
Führen Sie beispielsweise diese Aufgabe aus:
from airflow.composer.data_lineage.entities import BigQueryTable
from airflow.lineage import AUTO
…
bash_task = BashOperator(
task_id='bash_task',
dag=dag,
bash_command='sleep 0',
inlets=[BigQueryTable(
project_id=GCP_PROJECT,
dataset_id='dataset',
table_id='table1',
)],
outlets=[BigQueryTable(
project_id=GCP_PROJECT,
dataset_id='dataset',
table_id='table2',
)]
)
def _python_task(task):
task.inlets.append(BigQueryTable(
project_id=GCP_PROJECT,
dataset_id='dataset',
table_id='table3',
))
task.outlets.append(BigQueryTable(
project_id=GCP_PROJECT,
dataset_id='dataset',
table_id='table4',
))
python_task = PythonOperator(
task_id='python_task',
dag=dag,
python_callable=_python_task,
inlets=[AUTO],
)
bash_task >> python_task
Das führt dazu, dass in der Dataplex-Benutzeroberfläche das folgende Herkunftsdiagramm erstellt wird:
![Beispiel für eine Stammbaumgrafik für benutzerdefinierte Ereignisse in der Dataplex-Benutzeroberfläche](https://cloud.google.com/static/composer/docs/images/lineage-custom-sample.png?authuser=19&hl=de)
Datenabfolge in Cloud Composer deaktivieren
Wenn Sie die Lineage-Integration in einer Cloud Composer-Umgebung deaktivieren, wird die Data Lineage API nicht deaktiviert. Wenn Sie Lineage-Berichte für Ihr Projekt vollständig deaktivieren möchten, deaktivieren Sie auch die Data Lineage API. Weitere Informationen finden Sie unter Dienste deaktivieren.
Console
Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.
Wählen Sie den Tab Umgebungskonfiguration aus.
Klicken Sie im Abschnitt Einbindung von Dataplex Data Lineage auf Bearbeiten.
Wählen Sie im Bereich Einbindung von Dataplex Data Lineage die Option Einbindung in Dataplex Data Lineage deaktivieren aus und klicken Sie auf Speichern.
gcloud
Verwenden Sie das Argument --disable-cloud-data-lineage-integration
.
gcloud composer environments update ENVIRONMENT_NAME \
--location LOCATION \
--disable-cloud-data-lineage-integration
Ersetzen Sie:
ENVIRONMENT_NAME
durch den Namen der Umgebung.Der Name muss mit einem Kleinbuchstaben beginnen, gefolgt von bis zu 62 Kleinbuchstaben, Ziffern oder Bindestrichen. Das letzte Zeichen darf kein Bindestrich sein. Anhand des Umgebungsnamens werden Unterkomponenten für die Umgebung erstellt. Daher müssen Sie einen Namen angeben, der auch als Cloud Storage-Bucket-Name gültig ist. Eine Liste der Einschränkungen finden Sie in den Benennungsrichtlinien für Buckets.
LOCATION
durch die Region für die Umgebung.Ein Speicherort ist die Region, in der sich der GKE-Cluster der Umgebung befindet.
Beispiel:
gcloud composer environments update example-environment \
--location us-central1 \
--disable-cloud-data-lineage-integration
Abstammungslogs in Cloud Composer ansehen
Sie können Logs zur Datenableitung über den Link auf der Seite Umgebungskonfiguration im Abschnitt Dataplex-Integration für die Datenableitung prüfen.
Fehlerbehebung
Wenn Stammbaumdaten nicht an die Lineage API gesendet werden oder Sie sie nicht in Dataplex sehen können, führen Sie die folgenden Schritte zur Fehlerbehebung aus:
- Die Data Lineage API muss im Projekt Ihrer Cloud Composer-Umgebung aktiviert sein.
- Prüfen Sie, ob die Einbindung der Datenabfolge in der Cloud Composer-Umgebung aktiviert ist.
- Prüfen Sie, ob der von Ihnen verwendete Operator in der automatischen Unterstützung für Abstammungsberichte enthalten ist. Weitere Informationen finden Sie unter Unterstützte Airflow-Operatoren.
- Prüfen Sie in Cloud Composer die Abstammungsprotokolle auf mögliche Probleme.