Pipelinestatus überwachen

Auf dieser Seite wird beschrieben, wie Sie Cloud Data Fusion-Pipelineereignisse wie z. B. zu Pub/Sub-Themen. Außerdem wird beschrieben, wie Sie Cloud Run-Funktionen, die die Pub/Sub-Nachrichten verarbeiten, Maßnahmen ergreifen, z. B. fehlgeschlagene Pipelines identifizieren und wiederholen.

Hinweis

Erforderliche Rollen

Damit das Cloud Data Fusion-Dienstkonto die erforderlichen Berechtigungen zum Veröffentlichen von Pipeline-Ereignissen in einem Pub/Sub-Thema hat, bitten Sie Ihren Administrator, dem Cloud Data Fusion-Dienstkonto die IAM-Rolle Pub/Sub-Publisher (roles/pubsub.publisher) für das Projekt zuzuweisen, in dem Sie das Pub/Sub-Thema erstellen. Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff auf Projekte, Ordner und Organisationen verwalten.

Ihr Administrator kann dem Cloud Data Fusion-Dienstkonto möglicherweise auch die erforderlichen Berechtigungen über benutzerdefinierte Rollen oder andere vordefinierte Rollen erteilen.

Ereignisveröffentlichung in einer Cloud Data Fusion-Instanz verwalten

Sie können die Ereignisveröffentlichung in neuen und vorhandenen Cloud Data Fusion-Instanzen mit der REST API in Version 6.7.0 und höher verwalten.

Ereignisse in einer neuen Instanz veröffentlichen

Erstellen Sie eine neue Instanz und fügen Sie das Feld EventPublishConfig ein. Weitere Informationen Informationen zu Pflichtfeldern für neue Instanzen finden Sie in der Instanzressource Referenz.

curl -X POST \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  "https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances?instanceId=INSTANCE_ID" \
  -d '{
    "version": "VERSION_NUMBER",
    "event_publish_config": {
      "enabled": true,
      "topic": "projects/PROJECT_ID/topics/TOPIC_ID"
    }
  }'

Ersetzen Sie Folgendes:

  • PROJECT_ID: die Google Cloud-Projekt-ID
  • LOCATION: Standort Ihres Projekts
  • INSTANCE_ID: die ID Ihrer Cloud Data Fusion-Instanz
  • VERSION_NUMBER: Die Version von Cloud Data Fusion in dem Sie die Instanz erstellen, z. B. 6.10.1
  • TOPIC_ID: die ID des Pub/Sub-Themas

Ereignisveröffentlichung in einer vorhandenen Cloud Data Fusion-Instanz aktivieren

Aktualisieren Sie die EventPublishConfig in einer vorhandenen Cloud Data Fusion-Instanz:

curl -X PATCH \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config \
  -d '{
    "event_publish_config": {
      "enabled": true,
      "topic": "projects/PROJECT_ID/topics/TOPIC_ID"
    }
}'

Ersetzen Sie Folgendes:

  • PROJECT_ID: die Google Cloud-Projekt-ID
  • LOCATION: den Standort Ihres Projekts
  • INSTANCE_ID: die ID Ihrer Cloud Data Fusion-Instanz
  • TOPIC_ID: die ID des Pub/Sub-Themas

Ereignisveröffentlichung aus einer Instanz entfernen

Wenn Sie die Ereignisveröffentlichung aus einer Instanz entfernen möchten, aktualisieren Sie den Wert enabled für die Ereignisveröffentlichung auf false:

curl -X PATCH \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \ "https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config" \
  -d '{ "event_publish_config": { "enabled": false } }'

Funktionen zum Lesen von Pub/Sub-Nachrichten erstellen

Cloud Run-Funktionen können Pub/Sub-Nachrichten lesen und Aktionen ausführen z. B. durch das Wiederholen fehlgeschlagener Pipelines. Um Cloud Run-Funktionen zu erstellen, Gehen Sie so vor:

  1. Wechseln Sie in der Google Cloud Console zur Seite Cloud Run-Funktionen.

    Zu den Cloud Run-Funktionen

  2. Klicken Sie auf Funktion erstellen.

  3. Geben Sie einen Funktionsnamen und eine Region ein.

  4. Wählen Sie im Feld Triggertyp die Option Cloud Pub/Sub aus.

  5. Geben Sie die ID des Pub/Sub-Themas ein.

  6. Klicken Sie auf Weiter.

  7. Fügen Sie Funktionen hinzu, um die Pub/Sub-Nachrichten zu lesen und andere Aktionen auszuführen. Sie können beispielsweise Funktionen für die folgenden Anwendungsfälle hinzufügen:

    • Benachrichtigungen bei Pipelinefehlern senden
    • Benachrichtigungen für KPIs wie die Anzahl der Datensätze oder Ausführungsinformationen senden.
    • Eine fehlgeschlagene Pipeline, die noch nicht noch einmal ausgeführt wurde, neu starten.

    Beispiele für Cloud Run-Funktionen finden Sie in der Anwendungsfall.

  8. Klicken Sie auf Bereitstellen. Weitere Informationen finden Sie unter Cloud Run-Funktion bereitstellen.

Anwendungsfall: Pipelinestatus dokumentieren und fehlgeschlagene Pipelines noch einmal versuchen

In den folgenden Beispielen für Cloud Run-Funktionen werden Pub/Sub-Nachrichten zum Status der Pipelineausführung gelesen und die fehlgeschlagenen Pipelines in Cloud Data Fusion noch einmal ausgeführt.

Diese Beispielfunktionen beziehen sich auf die folgenden Google Cloud-Komponenten:

  • Google Cloud-Projekt: Das Projekt, in dem Cloud Run-Funktionen und Pub/Sub-Themen erstellt werden.
  • Pub/Sub-Thema: das mit Ihrem Netzwerk verknüpfte Pub/Sub-Thema Ihre Cloud Data Fusion-Instanz
  • Cloud Data Fusion-Instanz: Cloud Data Fusion Instanz, in der Sie Pipelines entwerfen und ausführen
  • BigQuery-Tabelle: Die BigQuery-Tabelle, in der der Pipelinestatus und die Details zur Ausführung und Wiederholung erfasst werden
  • Cloud Run-Funktion: die Cloud Run-Funktion, in der Sie den Code bereitstellen, mit dem fehlgeschlagene Pipelines noch einmal ausgeführt werden
  1. Im folgenden Beispiel für eine Cloud Run-Funktion wird die Pub/Sub-Nachrichten zum Status von Cloud Data Fusion Ereignisse.

    # Triggered from a message on a Pub/Sub topic.
    @functions_framework.cloud_event
    def cdf_event_trigger(cloud_event):
    
    decoded_message = base64.b64decode(cloud_event.data["message"]["data"]).decode('utf-8') # Decode Pub/Sub message.
    pubsub_message = json.loads(decoded_message)
    
    # Extract pipeline run details.
    projectName = pubsub_message["projectName"]
    publishTime = pubsub_message["publishTime"]
    instanceName = pubsub_message["instanceName"]
    namespace = pubsub_message["programStatusEventDetails"]["namespace"]
    applicationName = pubsub_message["programStatusEventDetails"]["applicationName"]
    status = pubsub_message["programStatusEventDetails"]["status"]
    event_timestamp = pd.to_datetime(pubsub_message["programStatusEventDetails"]["eventTime"], unit = 'ms')
    
    print(f"projectName: {projectName}")
    print(f"publishTime: {publishTime}")
    print(f"instanceName: {instanceName}")
    print(f"namespace: {namespace}")
    print(f"applicationName: {applicationName}")
    print(f"status: {status}")
    print(f"event timestamp: {event_timestamp}")
    try:
        error = pubsub_message["programStatusEventDetails"]["error"]
        print(f"error: {error}")
    except:
        print(f"Pipeline: {applicationName}'s current status: {status}")
    
  2. Die folgende Beispielfunktion erstellt und speichert eine BigQuery-Tabelle und fragt die Details zum Pipelinelauf ab.

    # Global variables.
    pipeline_rerun_count = 0
    has_pipeline_failed_and_rerun_recently = False # Timeframe: within last 60 minutes.
    table_id = "bigquery-table-1" # The BigQuery target table for storing pipeline run information.
    
    # Update BigQuery table with the pipeline status and rerun details.
    schema=[
        bigquery.SchemaField("Project_Name", "STRING"),
        bigquery.SchemaField("Instance_Name", "STRING"),
        bigquery.SchemaField("Namespace", "STRING"),
        bigquery.SchemaField("Pipeline_Name", "STRING"),
        bigquery.SchemaField("Pipeline_Status", "STRING"),
        bigquery.SchemaField("Event_Timestamp", "TIMESTAMP"),
        bigquery.SchemaField("Pipeline_Rerun_Count", "INTEGER"),
    ]
    
    # Prepare DataFrame to load the data in BigQuery.
    data = {'Project_Name':[projectName], 'Instance_Name':[instanceName], 'Namespace':[namespace], 'Pipeline_Name':[applicationName], 'Pipeline_Status':[status], 'Event_Timestamp':[event_timestamp], 'Pipeline_Rerun_Count':[pipeline_rerun_count]}
    dataframe = pd.DataFrame(data)
    
    # Prepare BigQuery data load job configuration.
    job_config = bigquery.LoadJobConfig(schema=schema)
    
    job = bq_client.load_table_from_dataframe(dataframe, table_id, job_config=job_config)
    job.result()  # Wait for the job to complete.
    
    table = bq_client.get_table(table_id)  # Make an API request.
    print("BigQuery table: {} updated.".format(table_id))
    
  3. Die folgende Beispielfunktion sucht nach Pipelines, die fehlgeschlagen sind und ob sie in der letzten Stunde noch einmal ausgeführt wurden.

    bq_client = bigquery.Client()
    
    if status == "FAILED":
        print(f"ALERT -- Pipeline: {applicationName} has failed. Checking for rerun: pipeline hasn't failed and rerun in the last 60 minutes.")
    
        QUERY = f"""
            SELECT * FROM `{table_id}`
            WHERE Pipeline_Name = "{applicationName}" AND Pipeline_Status = "FAILED"
            AND "{event_timestamp}" < DATETIME_ADD(Event_Timestamp, INTERVAL 60 MINUTE)
            AND Pipeline_Rerun_Count > 0
            """
    
        query_job = bq_client.query_and_wait(QUERY)  # API request.
        row_count = query_job.total_rows  # Waits for query to finish.
        print(f"Query job result row count: {row_count}")
    
        if (row_count > 0):
            print("Pipeline has FAILED and rerun recently...")
            global has_pipeline_failed_and_rerun_recently
            has_pipeline_failed_and_rerun_recently = True
    
  4. Wenn die fehlgeschlagene Pipeline in letzter Zeit nicht ausgeführt wurde, funktioniert die folgende Beispielfunktion: führt die fehlgeschlagene Pipeline noch einmal aus.

    if not has_pipeline_failed_and_rerun_recently:
        applicationName = applicationName
        auth_token = get_access_token()
        post_headers = {"Authorization": "Bearer " + auth_token,"Accept": "application/json"}
        cdap_endpoint = "https://instance1-project1-dot-location1.datafusion.googleusercontent.com/api"
        run_pipeline_endpoint = cdap_endpoint + "/v3/namespaces/{}/apps/{}/workflows/DataPipelineWorkflow/start".format(namespace, applicationName)
    
        # Start the job.
        response = requests.post(run_pipeline_endpoint,headers=post_headers)
        print(f"Response for restarting the failed pipeline: {response}")
        global pipeline_rerun_count
        pipeline_rerun_count = 1
    

Nächste Schritte