Auf dieser Seite wird beschrieben, wie Sie Cloud Data Fusion-Pipelineereignisse wie den Pipelinestatus in Pub/Sub-Themen veröffentlichen. Außerdem wird beschrieben, wie Sie Cloud Run-Funktionen erstellen, die die Pub/Sub-Nachrichten verarbeiten und Aktionen ausführen, z. B. fehlgeschlagene Pipelines identifizieren und wiederholen.
Hinweise
- Erstellen Sie ein Thema, in dem Pub/Sub Cloud Data Fusion-Pipelineereignisse veröffentlichen kann.
Erforderliche Rollen
Damit das Cloud Data Fusion-Dienstkonto die erforderlichen Berechtigungen zum Veröffentlichen von Pipelineereignissen in einem Pub/Sub-Thema hat, bitten Sie Ihren Administrator, dem Cloud Data Fusion-Dienstkonto die IAM-Rolle Pub/Sub-Publisher (roles/pubsub.publisher
) für das Projekt zuzuweisen, in dem Sie das Pub/Sub-Thema erstellen.
Ihr Administrator kann dem Cloud Data Fusion-Dienstkonto möglicherweise auch die erforderlichen Berechtigungen über benutzerdefinierte Rollen oder andere vordefinierte Rollen erteilen.
Ereignisveröffentlichung in einer Cloud Data Fusion-Instanz verwalten
Sie können die Veröffentlichung von Ereignissen in neuen und vorhandenen Cloud Data Fusion-Instanzen mit der REST API in Version 6.7.0 und höher verwalten.
Termine in einer neuen Instanz veröffentlichen
Erstellen Sie eine neue Instanz und fügen Sie das Feld EventPublishConfig
ein. Weitere Informationen zu Pflichtfeldern für neue Instanzen finden Sie in der Referenz zur Instanzressource.
curl -X POST \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \
"https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances?instanceId=INSTANCE_ID" \
-d '{
"version": "VERSION_NUMBER",
"event_publish_config": {
"enabled": true,
"topic": "projects/PROJECT_ID/topics/TOPIC_ID"
}
}'
Ersetzen Sie Folgendes:
PROJECT_ID
: die Google Cloud Projekt-IDLOCATION
: Der Standort Ihres Projekts.INSTANCE_ID
: die ID Ihrer Cloud Data Fusion-InstanzVERSION_NUMBER
: Die Version von Cloud Data Fusion, in der Sie die Instanz erstellen, z. B.6.10.1
TOPIC_ID
: die ID des Pub/Sub-Themas
Ereignisveröffentlichung in einer vorhandenen Cloud Data Fusion-Instanz aktivieren
So aktualisieren Sie das Feld EventPublishConfig
in einer vorhandenen Cloud Data Fusion-Instanz:
curl -X PATCH \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \
https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config \
-d '{
"event_publish_config": {
"enabled": true,
"topic": "projects/PROJECT_ID/topics/TOPIC_ID"
}
}'
Ersetzen Sie Folgendes:
PROJECT_ID
: die Google Cloud Projekt-IDLOCATION
: der Standort Ihres ProjektsINSTANCE_ID
: die ID Ihrer Cloud Data Fusion-InstanzTOPIC_ID
: die ID des Pub/Sub-Themas
Veröffentlichung von Ereignissen aus einer Instanz entfernen
Wenn Sie die Ereignisveröffentlichung aus einer Instanz entfernen möchten, aktualisieren Sie den Wert enabled
für die Ereignisveröffentlichung auf false
:
curl -X PATCH \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \ "https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config" \
-d '{ "event_publish_config": { "enabled": false } }'
Funktionen zum Lesen von Pub/Sub-Nachrichten erstellen
Cloud Run Functions können Pub/Sub-Nachrichten lesen und darauf reagieren, z. B. fehlgeschlagene Pipelines noch einmal ausführen. So erstellen Sie eine Cloud Run Functions-Funktion:
Wechseln Sie in der Google Cloud Console zur Seite Cloud Run-Funktionen.
Klicken Sie auf Funktion erstellen.
Geben Sie einen Funktionsnamen und eine Region ein.
Wählen Sie im Feld Triggertyp die Option Cloud Pub/Sub aus.
Geben Sie die Pub/Sub-Themen-ID ein.
Klicken Sie auf Weiter.
Fügen Sie Funktionen hinzu, um die Pub/Sub-Nachrichten zu lesen und andere Aktionen auszuführen. Sie können beispielsweise Funktionen für die folgenden Anwendungsfälle hinzufügen:
- Benachrichtigungen für Pipelinefehler senden
- Benachrichtigungen für KPIs wie die Anzahl der Datensätze oder Informationen zum Lauf senden.
- Eine fehlgeschlagene Pipeline neu starten, die noch nicht neu ausgeführt wurde
Beispiele für Cloud Run-Funktionen finden Sie im Abschnitt Anwendungsfall.
Klicken Sie auf Bereitstellen. Weitere Informationen finden Sie unter Cloud Run Functions bereitstellen.
Anwendungsfall: Pipelinestatus dokumentieren und fehlgeschlagene Pipelines noch einmal ausführen
Die folgenden Cloud Run-Funktionen lesen Pub/Sub-Nachrichten zum Status von Pipelineausführungen und versuchen dann, die fehlgeschlagenen Pipelines in Cloud Data Fusion noch einmal auszuführen.
Diese Beispielfunktionen beziehen sich auf die folgenden Google Cloud Komponenten:
- Google Cloud project: Das Projekt, in dem Cloud Run-Funktionen und Pub/Sub-Themen erstellt werden.
- Pub/Sub-Thema: Das Pub/Sub-Thema, das mit Ihrer Cloud Data Fusion-Instanz verknüpft ist
- Cloud Data Fusion-Instanz: Die Cloud Data Fusion-Instanz, in der Sie Pipelines entwerfen und ausführen
- BigQuery-Tabelle: Die BigQuery-Tabelle, in der der Pipelinestatus sowie die Details zu Ausführungen und erneuten Ausführungen erfasst werden.
- Cloud Run-Funktion: Die Cloud Run-Funktion, in der Sie den Code bereitstellen, mit dem fehlgeschlagene Pipelines noch einmal ausgeführt werden.
Im folgenden Beispiel für eine Cloud Run-Funktion werden die Pub/Sub-Nachrichten zu Cloud Data Fusion-Statusereignissen gelesen.
# Triggered from a message on a Pub/Sub topic. @functions_framework.cloud_event def cdf_event_trigger(cloud_event): decoded_message = base64.b64decode(cloud_event.data["message"]["data"]).decode('utf-8') # Decode Pub/Sub message. pubsub_message = json.loads(decoded_message) # Extract pipeline run details. projectName = pubsub_message["projectName"] publishTime = pubsub_message["publishTime"] instanceName = pubsub_message["instanceName"] namespace = pubsub_message["programStatusEventDetails"]["namespace"] applicationName = pubsub_message["programStatusEventDetails"]["applicationName"] status = pubsub_message["programStatusEventDetails"]["status"] event_timestamp = pd.to_datetime(pubsub_message["programStatusEventDetails"]["eventTime"], unit = 'ms') print(f"projectName: {projectName}") print(f"publishTime: {publishTime}") print(f"instanceName: {instanceName}") print(f"namespace: {namespace}") print(f"applicationName: {applicationName}") print(f"status: {status}") print(f"event timestamp: {event_timestamp}") try: error = pubsub_message["programStatusEventDetails"]["error"] print(f"error: {error}") except: print(f"Pipeline: {applicationName}'s current status: {status}")
Die folgende Beispielfunktion erstellt und speichert eine BigQuery-Tabelle und fragt die Details des Pipeline-Laufs ab.
# Global variables. pipeline_rerun_count = 0 has_pipeline_failed_and_rerun_recently = False # Timeframe: within last 60 minutes. table_id = "bigquery-table-1" # The BigQuery target table for storing pipeline run information. # Update BigQuery table with the pipeline status and rerun details. schema=[ bigquery.SchemaField("Project_Name", "STRING"), bigquery.SchemaField("Instance_Name", "STRING"), bigquery.SchemaField("Namespace", "STRING"), bigquery.SchemaField("Pipeline_Name", "STRING"), bigquery.SchemaField("Pipeline_Status", "STRING"), bigquery.SchemaField("Event_Timestamp", "TIMESTAMP"), bigquery.SchemaField("Pipeline_Rerun_Count", "INTEGER"), ] # Prepare DataFrame to load the data in BigQuery. data = {'Project_Name':[projectName], 'Instance_Name':[instanceName], 'Namespace':[namespace], 'Pipeline_Name':[applicationName], 'Pipeline_Status':[status], 'Event_Timestamp':[event_timestamp], 'Pipeline_Rerun_Count':[pipeline_rerun_count]} dataframe = pd.DataFrame(data) # Prepare BigQuery data load job configuration. job_config = bigquery.LoadJobConfig(schema=schema) job = bq_client.load_table_from_dataframe(dataframe, table_id, job_config=job_config) job.result() # Wait for the job to complete. table = bq_client.get_table(table_id) # Make an API request. print("BigQuery table: {} updated.".format(table_id))
Die folgende Beispielfunktion prüft, ob Pipelines fehlgeschlagen sind und ob sie in der letzten Stunde noch einmal ausgeführt wurden.
bq_client = bigquery.Client() if status == "FAILED": print(f"ALERT -- Pipeline: {applicationName} has failed. Checking for rerun: pipeline hasn't failed and rerun in the last 60 minutes.") QUERY = f""" SELECT * FROM `{table_id}` WHERE Pipeline_Name = "{applicationName}" AND Pipeline_Status = "FAILED" AND "{event_timestamp}" < DATETIME_ADD(Event_Timestamp, INTERVAL 60 MINUTE) AND Pipeline_Rerun_Count > 0 """ query_job = bq_client.query_and_wait(QUERY) # API request. row_count = query_job.total_rows # Waits for query to finish. print(f"Query job result row count: {row_count}") if (row_count > 0): print("Pipeline has FAILED and rerun recently...") global has_pipeline_failed_and_rerun_recently has_pipeline_failed_and_rerun_recently = True
Wenn die fehlgeschlagene Pipeline nicht vor Kurzem ausgeführt wurde, wird sie mit der folgenden Beispielfunktion noch einmal ausgeführt.
if not has_pipeline_failed_and_rerun_recently: applicationName = applicationName auth_token = get_access_token() post_headers = {"Authorization": "Bearer " + auth_token,"Accept": "application/json"} cdap_endpoint = "https://instance1-project1-dot-location1.datafusion.googleusercontent.com/api" run_pipeline_endpoint = cdap_endpoint + "/v3/namespaces/{}/apps/{}/workflows/DataPipelineWorkflow/start".format(namespace, applicationName) # Start the job. response = requests.post(run_pipeline_endpoint,headers=post_headers) print(f"Response for restarting the failed pipeline: {response}") global pipeline_rerun_count pipeline_rerun_count = 1