Daten mit Cloud Functions aus Cloud Storage in BigQuery streamen

In dieser Anleitung wird gezeigt, wie Sie mit Cloud Functions neue Objekte aus einem Cloud Storage-Bucket in BigQuery streamen können. Cloud Functions ist eine ereignisgesteuerte serverlose Computing-Plattform von Google Cloud und ermöglicht automatische Skalierung, hohe Verfügbarkeit und Fehlertoleranz, ohne dass Server bereitgestellt, verwaltet, aktualisiert oder gepatcht werden müssen. Sie streamen Daten über Cloud Functions, um andere Google Cloud-Dienste zu verbinden und zu erweitern, bezahlen jedoch nur, wenn die Anwendung ausgeführt wird.

Dieser Artikel richtet sich an Datenanalysten, Entwickler oder Betreiber, die Berichte nahezu in Echtzeit für Dateien in Cloud Storage ausführen müssen. Dabei wird davon ausgegangen, dass Sie mit Linux, Cloud Storage und BigQuery vertraut sind.

Architektur

Im folgenden Architekturdiagramm werden alle Komponenten und der gesamte Ablauf der Streamingpipeline in dieser Anleitung gezeigt. Bei dieser Pipeline wird erwartet, dass JSON-Dateien in Cloud Storage hochgeladen werden, zur Unterstützung anderer Dateiformate sind jedoch kleinere Änderungen des Codes erforderlich. Die Aufnahme anderer Dateiformate wird in diesem Artikel nicht behandelt.

Architekturdiagramm der Pipeline

Im obigen Diagramm besteht die Pipeline aus folgenden Schritten:

  1. JSON-Dateien werden in den Cloud Storage-Bucket FILES_SOURCE hochgeladen.
  2. Durch dieses Ereignis wird die Cloud Functions-Funktion streamingausgelöst.
  3. Die Daten werden geparst und in BigQuery eingefügt.
  4. Der Aufnahmestatus wird in Firestore und Cloud Logging erfasst.
  5. In einem der folgenden Pub/Sub-Themen wird eine Nachricht veröffentlicht:
    • streaming_success_topic
    • streaming_error_topic
  6. Abhängig von den Ergebnissen wird die JSON-Datei von Cloud Functions aus dem Bucket FILES_SOURCE in einen der folgenden Buckets verschoben:
    • FILES_ERROR
    • FILES_SUCCESS

Ziele

  • Cloud Storage-Bucket zum Speichern der JSON-Dateien erstellen
  • BigQuery-Dataset und -Tabelle erstellen, in das bzw. die die Daten gestreamt werden sollen
  • Cloud Functions-Funktion konfigurieren, die ausgelöst wird, wenn dem Bucket Dateien hinzugefügt werden
  • Pub/Sub-Themen einrichten
  • Weitere Funktionen für die Verarbeitung der Funktionsausgabe konfigurieren
  • Streamingpipeline testen
  • Cloud Monitoring so konfigurieren, dass bei unerwartetem Verhalten eine Warnung ausgegeben wird

Kosten

In dieser Anleitung werden die folgenden kostenpflichtigen Komponenten von Google Cloud verwendet:

  • Cloud Storage
  • Cloud Functions
  • Firestore
  • BigQuery
  • Logging
  • Überwachung
  • Container Registry
  • Cloud Build

Mit dem Preisrechner können Sie eine Kostenschätzung für Ihre voraussichtliche Nutzung vornehmen. Neuen Google Cloud-Nutzern steht möglicherweise eine kostenlose Testversion zur Verfügung.

Hinweis

  1. Melden Sie sich bei Ihrem Google-Konto an.

    Wenn Sie noch kein Konto haben, melden Sie sich hier für ein neues Konto an.

  2. Wählen Sie in der Google Cloud Console auf der Seite der Projektauswahl ein Google Cloud-Projekt aus oder erstellen Sie eines.

    Zur Projektauswahl

  3. Die Abrechnung für das Cloud-Projekt muss aktiviert sein. So prüfen Sie, ob die Abrechnung für Ihr Projekt aktiviert ist.

  4. Cloud Functions and Cloud Build APIs aktivieren.

    Aktivieren Sie die APIs

  5. Wechseln Sie in der Cloud Console zu "Monitoring":

    Zu Cloud Monitoring

    Wenn Sie Cloud Monitoring noch nie verwendet haben, wird beim ersten Zugriff auf Monitoring in der Google Cloud Console automatisch ein Arbeitsbereich erstellt und Ihr Projekt mit diesem Arbeitsbereich verknüpft. Wenn Ihr Projekt nicht mit einem Arbeitsbereich verknüpft ist, wird ein Dialogfeld angezeigt, in dem Sie entweder einen Arbeitsbereich erstellen oder dieses Projekt zu einem vorhandenen Arbeitsbereich hinzufügen können. Wir empfehlen, einen Arbeitsbereich zu erstellen. Klicken Sie nach der Auswahl auf Hinzufügen.

Nach Abschluss dieser Anleitung können Sie weitere Kosten vermeiden, indem Sie die erstellten Ressourcen löschen. Weitere Informationen finden Sie unter Bereinigen.

Umgebung einrichten

In dieser Anleitung verwenden Sie Cloud Shell zum Eingeben von Befehlen. Cloud Shell bietet Zugriff auf die Befehlszeile in der Cloud Console und umfasst das Cloud SDK und andere Tools, die Sie in Google Cloud entwickeln. Cloud Shell wird als Fenster unten in der Cloud Console angezeigt. Die Initialisierung kann einige Minuten dauern, aber das Fenster ist sofort sichtbar.

So richten Sie mit Cloud Shell die Umgebung ein und klonen das Git-Repository, das in dieser Anleitung verwendet wird:

  1. Öffnen Sie Cloud Shell in der Cloud Console.

    Zu Cloud Shell

  2. Sie müssen in dem Projekt arbeiten, das Sie gerade erstellt haben. Ersetzen Sie [YOUR_PROJECT_ID] durch Ihr neu erstelltes Google Cloud-Projekt.

    gcloud config set project [YOUR_PROJECT_ID]
    
  3. Stellen Sie die Standardzone für die Verarbeitung ein. Für diese Anleitung ist dies us-east1. Wenn die Bereitstellung in einer Produktionsumgebung erfolgt, wählen Sie hierzu eine Region Ihrer Wahl aus.

    REGION=us-east1
    
  4. Klonen Sie das Repository mit den Funktionen, die in dieser Anleitung verwendet werden.

    git clone https://github.com/GoogleCloudPlatform/solutions-gcs-bq-streaming-functions-python
    cd solutions-gcs-bq-streaming-functions-python
    

Quell- und Zielsenken für Streaming erstellen

Sie benötigen einen Cloud Storage-Bucket des Typs FILES_SOURCE und eine Zieltabelle in BigQuery, um Inhalte nach BigQuery zu streamen.

Cloud Storage-Bucket erstellen

Sie erstellen einen Cloud Storage-Bucket, der die Quelle der in dieser Anleitung vorgestellten Streamingpipeline darstellt. Der Hauptzweck dieses Buckets besteht darin, die in BigQuery gestreamten JSON-Dateien vorübergehend zu speichern.

  • Erstellen Sie den Cloud Storage-Bucket FILES_SOURCE, wobei FILES_SOURCE als Umgebungsvariable mit einem eindeutigen Namen eingerichtet wird.

    FILES_SOURCE=${DEVSHELL_PROJECT_ID}-files-source-$(date +%s)
    gsutil mb -c regional -l ${REGION} gs://${FILES_SOURCE}
    

BigQuery-Tabelle erstellen

In diesem Abschnitt wird eine BigQuery-Tabelle erstellt, die als Inhaltsziel für die Dateien verwendet wird. In BigQuery können Sie das Tabellenschema angeben, wenn Sie Daten in die Tabelle laden oder eine neue Tabelle erstellen. In diesem Abschnitt erstellen Sie die Tabelle und geben gleichzeitig das zugehörige Schema an.

  1. Erstellen Sie ein BigQuery-Dataset und eine Tabelle. Das in der Datei schema.json definierte Schema muss mit dem Schema der Dateien aus dem Bucket FILES_SOURCE übereinstimmen.

    bq mk mydataset
    bq mk mydataset.mytable schema.json
    
  2. Überprüfen Sie, ob die Tabelle erstellt wurde.

    bq ls --format=pretty mydataset
    

    Die Ausgabe sieht so aus:

    +---------+-------+--------+-------------------+
    | tableId | Type  | Labels | Time Partitioning |
    +---------+-------+--------+-------------------+
    | mytable | TABLE |        |                   |
    +---------+-------+--------+-------------------+
    

Daten nach BigQuery streamen

Nachdem Sie jetzt die Quell- und Zielsenken erstellt haben, generieren Sie nun die Cloud Functions-Funktion, um Daten aus Cloud Storage nach BigQuery zu streamen.

Cloud Functions-Funktion für das Streaming einrichten

Die Streaming-Funktion überwacht neue Dateien, die dem FILES_SOURCE-Bucket hinzugefügt werden, und löst dann einen Prozess aus, mit dem folgende Aktionen ausgeführt werden:

  • Die Datei wird geparst und validiert.
  • Es wird überprüft, ob es Duplikate gibt.
  • Der Inhalt der Datei wird in BigQuery eingefügt.
  • Der Aufnahmestatus wird in Firestore und Logging protokolliert.
  • Es wird eine Nachricht in einem Fehler- oder Erfolgsthema in Pub/Sub veröffentlicht.

So stellen Sie die Funktion bereit:

  1. Erstellen Sie einen Cloud Storage-Bucket für das Staging der Funktionen bei der Bereitstellung, wobei FUNCTIONS_BUCKET als Umgebungsvariable mit einem eindeutigen Namen eingerichtet wird.

    FUNCTIONS_BUCKET=${DEVSHELL_PROJECT_ID}-functions-$(date +%s)
    gsutil mb -c regional -l ${REGION} gs://${FUNCTIONS_BUCKET}
    
  2. Stellen Sie die Funktion streaming bereit. Der Implementierungscode ist im Ordner ./functions/streaming. Dieser Vorgang kann einige Minuten dauern.

    gcloud functions deploy streaming --region=${REGION} \
        --source=./functions/streaming --runtime=python37 \
        --stage-bucket=${FUNCTIONS_BUCKET} \
        --trigger-bucket=${FILES_SOURCE}
    

    Mit diesem Code wird eine in Python geschriebene Cloud Functions-Funktion bereitgestellt, die den Namen streaming trägt. Sie wird ausgelöst, wenn eine Datei zum Bucket FILES_SOURCE hinzugefügt wird.

  3. Überprüfen Sie, ob die Funktion bereitgestellt wurde.

    gcloud functions describe streaming  --region=${REGION} \
        --format="table[box](entryPoint, status, eventTrigger.eventType)"
    

    Die Ausgabe sieht so aus:

    ┌────────────────┬────────┬────────────────────────────────┐
    │  ENTRY_POINT   │ STATUS │           EVENT_TYPE           │
    ├────────────────┼────────┼────────────────────────────────┤
    │ streaming      │ ACTIVE │ google.storage.object.finalize │
    └────────────────┴────────┴────────────────────────────────┘
    
  4. Stellen Sie ein Pub/Sub-Thema namens streaming_error_topic als Fehlerpfad bereit.

    STREAMING_ERROR_TOPIC=streaming_error_topic
    gcloud pubsub topics create ${STREAMING_ERROR_TOPIC}
    
  5. Stellen Sie ein Pub/Sub-Thema namens streaming_success_topic als Erfolgspfad bereit.

    STREAMING_SUCCESS_TOPIC=streaming_success_topic
    gcloud pubsub topics create ${STREAMING_SUCCESS_TOPIC}
    

Firestore-Datenbank einrichten

Es ist wichtig zu verstehen, was beim Streamen von Daten in BigQuery bei den einzelnen Dateiaufnahmen geschieht. Beispiel: Sie haben nicht ordnungsgemäß importierte Dateien. In diesem Fall müssen Sie die Ursache des Problems ermitteln und es beheben, damit Sie am Ende der Pipeline keine fehlerhaften Daten und ungenauen Berichte erhalten. Mit der Funktion streaming, die im vorhergehenden Abschnitt bereitgestellt wurde, wird der Dateiaufnahmestatus in Firestore-Dokumenten gespeichert, damit Sie aktuelle Fehler abfragen und so Probleme beheben können.

Führen Sie die folgenden Schritte aus, um die Firestore-Instanz zu erstellen:

  1. Wechseln Sie in der Google Cloud Console zu Firestore.

    Firestore aufrufen

  2. Klicken Sie im Fenster Cloud Firestore-Modus auswählen auf Nativen Modus auswählen.

  3. Wählen Sie in der Liste Standort auswählen den Eintrag nam5 (United States) aus und klicken Sie auf Datenbank erstellen. Warten Sie, bis die Firestore-Initialisierung abgeschlossen ist. Dieser Vorgang dauert in der Regel einige Minuten.

Streamingfehler behandeln

Sie erstellen eine weitere Cloud Functions-Funktion, die in streaming_error_topic veröffentlichte Nachrichten überwacht, um einen Pfad zum Behandeln von Fehlerdateien bereitzustellen. Sie müssen entscheiden, wie solche Fehler in einer Produktionsumgebung in Ihrem Unternehmen behandelt werden. In dieser Anleitung werden problematische Dateien in einen anderen Cloud Storage-Bucket verschoben, um die Fehlerbehebung zu vereinfachen.

  1. Erstellen Sie einen Cloud Storage-Bucket, in dem problematische Dateien gespeichert werden. FILES_ERROR wird als Umgebungsvariable mit einem eindeutigen Namen für den Bucket eingerichtet, in dem Fehlerdateien gespeichert werden.

    FILES_ERROR=${DEVSHELL_PROJECT_ID}-files-error-$(date +%s)
    gsutil mb -c regional -l ${REGION} gs://${FILES_ERROR}
    
  2. Erstellen Sie die Funktion streaming_error zum Behandeln von Fehlern. Dieser Vorgang kann einige Minuten dauern.

    gcloud functions deploy streaming_error --region=${REGION} \
        --source=./functions/move_file \
        --entry-point=move_file --runtime=python37 \
        --stage-bucket=${FUNCTIONS_BUCKET} \
        --trigger-topic=${STREAMING_ERROR_TOPIC} \
        --set-env-vars SOURCE_BUCKET=${FILES_SOURCE},DESTINATION_BUCKET=${FILES_ERROR}
    

    Dieser Befehl sieht ähnlich wie der Befehl zum Bereitstellen der Funktion streaming aus. Der Hauptunterschied besteht darin, dass die Funktion in diesem Befehl von einer in einem Thema veröffentlichten Nachricht ausgelöst wird und zwei Umgebungsvariablen empfängt: die Variable SOURCE_BUCKET, aus der Dateien kopiert werden, und die Variable DESTINATION_BUCKET, in die Dateien kopiert werden.

  3. Prüfen Sie, ob die Funktion streaming_error erstellt wurde.

    gcloud functions describe streaming_error --region=${REGION} \
        --format="table[box](entryPoint, status, eventTrigger.eventType)"
    

    Die Ausgabe sieht so aus:

    ┌─────────────┬────────┬─────────────────────────────┐
    │ ENTRY_POINT │ STATUS │          EVENT_TYPE         │
    ├─────────────┼────────┼─────────────────────────────┤
    │ move_file   │ ACTIVE │ google.pubsub.topic.publish │
    └─────────────┴────────┴─────────────────────────────┘
    

Erfolgreiches Streaming behandeln

Sie erstellen eine dritte Cloud Functions-Funktion, die in streaming_success_topic veröffentlichte Nachrichten überwacht, um einen Pfad zum Behandeln von Erfolgsdateien bereitzustellen. In dieser Anleitung werden erfolgreich aufgenommene Dateien in einem Coldline Cloud Storage-Bucket archiviert.

  1. Erstellen Sie den Coldline Cloud Storage-Bucket. FILES_SUCCESS wird als Umgebungsvariable mit einem eindeutigen Namen für den Bucket eingerichtet, in dem Erfolgsdateien gespeichert werden.

    FILES_SUCCESS=${DEVSHELL_PROJECT_ID}-files-success-$(date +%s)
    gsutil mb -c coldline -l ${REGION} gs://${FILES_SUCCESS}
    
  2. Erstellen Sie die Funktion streaming_success für den Erfolgsfall. Dieser Vorgang kann einige Minuten dauern.

    gcloud functions deploy streaming_success --region=${REGION} \
        --source=./functions/move_file \
        --entry-point=move_file --runtime=python37 \
        --stage-bucket=${FUNCTIONS_BUCKET} \
        --trigger-topic=${STREAMING_SUCCESS_TOPIC} \
        --set-env-vars SOURCE_BUCKET=${FILES_SOURCE},DESTINATION_BUCKET=${FILES_SUCCESS}
    
  3. Überprüfen Sie, ob die Funktion erstellt wurde.

    gcloud functions describe streaming_success  --region=${REGION} \
        --format="table[box](entryPoint, status, eventTrigger.eventType)"
    

    Die Ausgabe sieht so aus:

    ┌─────────────┬────────┬─────────────────────────────┐
    │ ENTRY_POINT │ STATUS │          EVENT_TYPE         │
    ├─────────────┼────────┼─────────────────────────────┤
    │ move_file   │ ACTIVE │ google.pubsub.topic.publish │
    └─────────────┴────────┴─────────────────────────────┘
    

Streamingpipeline testen

An diesem Punkt haben Sie die Erstellung der Streamingpipeline abgeschlossen. Nun müssen die verschiedenen Pfade getestet werden. Sie beginnen mit dem Test der Aufnahme neuer Dateien. Dann folgt die Aufnahme der Duplikatdateien und schließlich die Aufnahme problematischer Dateien.

Neue Dateien aufnehmen

Sie laden eine Datei hoch, die die gesamte Pipeline erfolgreich durchlaufen muss, um die Aufnahme neuer Dateien zu testen. Damit das Verhalten wie gewünscht ausfällt, müssen Sie alle Speicheroptionen prüfen: BigQuery, Firestore und die Cloud Storage-Buckets.

  1. Laden Sie die Datei data.json in den Bucket FILES_SOURCE hoch.

    gsutil cp test_files/data.json gs://${FILES_SOURCE}
    

    Die Ausgabe:

    Operation completed over 1 objects/312.0 B.
    
  2. Fragen Sie die Daten in BigQuery ab.

    bq query 'select first_name, last_name, dob from mydataset.mytable'
    

    Mit diesem Befehl wird der Inhalt der Datei data.json ausgegeben:

    +------------+-----------+------------+
    | first_name | last_name |    dob     |
    +------------+-----------+------------+
    | John       | Doe       | 1968-01-22 |
    +------------+-----------+------------+
    
  3. Rufen Sie in der Cloud Console die Seite "Firestore" auf.

    Firestore aufrufen

  4. Wechseln Sie zum Dokument / > streaming_files > data.json, um zu überprüfen, ob sich das Feld success: true dort befindet. Die Funktion streaming speichert den Dateistatus in einer Sammlung mit dem Namenstreaming_files und verwendet den Dateinamen als Dokument-ID.

    Gewährleisten, dass der Erfolgsstatus der Datei von der Funktion "streaming" gespeichert wird

  5. Rufen Sie Cloud Shell wieder auf.

    Zu Cloud Shell

  6. Prüfen Sie, ob die aufgenommene Datei von der Funktion streaming_success aus dem Bucket FILES_SOURCE entfernt wurde.

    gsutil ls -l gs://${FILES_SOURCE}/data.json
    

    Die Ausgabe ist eine CommandException, da die Datei nicht mehr im Bucket FILES_SOURCE vorhanden ist.

  7. Überprüfen Sie, ob sich die aufgenommene Datei nun im Bucket FILES_SUCCESS befindet.

    gsutil ls -l gs://${FILES_SUCCESS}/data.json
    

    Die Ausgabe sieht so aus:

    TOTAL: 1 objects, 312 bytes.
    

Bereits verarbeitete Dateien aufnehmen

Der Dateiname wird in Firestore als Dokument-ID verwendet. So kann die Funktion streaming einfach abfragen, ob eine bestimmte Datei verarbeitet wurde. Dateien, die zuvor bereits erfolgreich aufgenommen wurden, können nicht noch einmal hinzugefügt werden. Entsprechende Versuche werden ignoriert. Damit soll verhindert werden, dass Informationen in BigQuery doppelt erfasst werden und zu falschen Berichten führen.

In diesem Abschnitt sorgen Sie dafür, dass die Pipeline wie erwartet funktioniert, wenn doppelt vorhandene Dateien in den Bucket FILES_SOURCE hochgeladen werden.

  1. Laden Sie dieselbe Datei data.json noch einmal in den Bucket FILES_SOURCE hoch.

    gsutil cp test_files/data.json gs://${FILES_SOURCE}
    

    Die Ausgabe sieht so aus:

    Operation completed over 1 objects/312.0 B.
    
  2. Bei der Abfrage von BigQuery wird dasselbe Ergebnis wie zuvor zurückgegeben. Das bedeutet, dass die Datei in der Pipeline verarbeitet, ihr Inhalt jedoch nicht in BigQuery eingefügt wurde, weil er bereits zuvor aufgenommen wurde.

    bq query 'select first_name, last_name, dob from mydataset.mytable'
    

    Die Ausgabe sieht so aus:

    +------------+-----------+------------+
    | first_name | last_name |    dob     |
    +------------+-----------+------------+
    | John       | Doe       | 1968-01-22 |
    +------------+-----------+------------+
    
  3. Rufen Sie in der Cloud Console die Seite "Firestore" auf.

    Firestore aufrufen

  4. Prüfen Sie im Dokument / streaming_files data.json, ob das neue Feld **duplication_attempts** hinzugefügt wurde.

    Gewährleisten, dass "duplication_attempts" von der Funktion "streaming" gespeichert werden

    Jedes Mal, wenn eine Datei mit demselben Namen wie eine vorher erfolgreich verarbeitete Datei dem Bucket FILES_SOURCE hinzugefügt wird, wird der Inhalt der Datei ignoriert und ein neuer Duplikationsversuch an das Feld **duplication_attempts** in Firestore angehängt.

  5. Rufen Sie Cloud Shell wieder auf.

    Zu Cloud Shell

  6. Prüfen Sie, ob sich die Duplikatdatei noch im Bucket FILES_SOURCE befindet.

    gsutil ls -l gs://${FILES_SOURCE}/data.json
    

    Die Ausgabe sieht so aus:

    TOTAL: 1 objects, 312 bytes.
    

    Im Duplikationsszenario protokolliert die Funktion streaming das unerwartete Verhalten in Logging, ignoriert die Aufnahme und belässt die Datei für spätere Analysen im Bucket FILES_SOURCE.

Dateien mit Fehlern aufnehmen

Sie haben nun festgestellt, dass die Streamingpipeline funktioniert und Duplikate nicht in BigQuery aufgenommen werden. Jetzt müssen Sie den Fehlerpfad überprüfen.

  1. Laden Sie data_error.json in den Bucket FILES_SOURCE hoch.

    gsutil cp test_files/data_error.json gs://${FILES_SOURCE}
    

    Die Ausgabe sieht so aus:

    Operation completed over 1 objects/311.0 B.
    
  2. Bei der Abfrage von BigQuery wird dasselbe Ergebnis wie zuvor zurückgegeben. Das bedeutet, dass die Datei in der Pipeline verarbeitet, der Inhalt jedoch nicht in BigQuery eingefügt wurde, weil er nicht dem erwarteten Schema entspricht.

    bq query 'select first_name, last_name, dob from mydataset.mytable'
    

    Die Ausgabe sieht so aus:

    +------------+-----------+------------+
    | first_name | last_name |    dob     |
    +------------+-----------+------------+
    | John       | Doe       | 1968-01-22 |
    +------------+-----------+------------+
    
  3. Rufen Sie in der Cloud Console die Seite "Firestore" auf.

    Firestore aufrufen

  4. Überprüfen Sie im Dokument / > streaming_files > data.json, ob das Feld success: false hinzugefügt wurde.

    Gewährleisten, dass der Fehlerstatus der Datei von der Funktion "streaming" gespeichert wird

    Bei fehlerhaften Dateien wird von der Funktion streaming auch ein error_message-Feld gespeichert, das detaillierte Informationen darüber enthält, warum die Datei nicht aufgenommen wurde.

  5. Rufen Sie Cloud Shell wieder auf.

    Zu Cloud Shell

  6. Prüfen Sie, ob die Datei von der Funktion streaming_error aus dem Bucket FILES_SOURCE entfernt wurde.

    gsutil ls -l gs://${FILES_SOURCE}/data_error.json
    

    Die Ausgabe ist eine CommandException, da die Datei nicht mehr im Bucket FILES_SOURCE vorhanden ist.

  7. Überprüfen Sie, ob sich die Datei jetzt wie erwartet im Bucket FILES_ERROR befindet.

    gsutil ls -l gs://${FILES_ERROR}/data_error.json
    

    Die Ausgabe sieht so aus:

    TOTAL: 1 objects, 311 bytes.
    

Datenaufnahmeprobleme ermitteln und beheben

Mit Abfragen der Sammlung streaming_files in Firestore können Sie schnell Probleme diagnostizieren und beheben. In diesem Abschnitt filtern Sie alle Fehlerdateien mit der standardmäßigen Python API für Firestore.

db = firestore.Client()
docs = db.collection(u'streaming_files')\
    .where(u'success', u'==', False)\
    .get()

So rufen Sie die Ergebnisse der Abfrage in Ihrer Umgebung auf:

  1. Erstellen Sie eine virtuelle Umgebung im Ordner firestore.

    pip install virtualenv
    virtualenv firestore
    source firestore/bin/activate
    
  2. Installieren Sie das Python Firestore-Modul in der virtuellen Umgebung.

    pip install google-cloud-firestore
    
  3. Visualisieren Sie die vorhandenen Pipelineprobleme.

    python firestore/show_streaming_errors.py
    

    Die Datei show_streaming_errors.py enthält die Firestore-Abfrage und anderen Standardcode für Ergebnisschleifen und die Ausgabeformatierung. Nach der Ausführung des vorhergehenden Befehls sieht die Ausgabe etwa so aus:

    +-----------------+-------------------------+----------------------------------------------------------------------------------+
    | File Name       | When                    | Error Message                                                                    |
    +-----------------+-------------------------+----------------------------------------------------------------------------------+
    | data_error.json | 2019-01-22 11:31:58 UTC | Error streaming file 'data_error.json'. Cause: Traceback (most recent call las.. |
    +-----------------+-------------------------+----------------------------------------------------------------------------------+
    
  4. Deaktivieren Sie die virtuelle Umgebung, wenn Sie die Analyse abschließen.

    deactivate
    

    Ermitteln und reparieren Sie die problematischen Dateien und laden Sie sie dann noch einmal mit demselben Dateinamen in den Bucket FILES_SOURCE hoch. Bei diesem Prozess werden die Dateien durch die gesamte Streamingpipeline geführt, um ihren Inhalt in BigQuery einzufügen.

Benachrichtigung zu unerwartetem Verhalten

In Produktionsumgebungen ist es wichtig, Vorgänge zu überwachen und Benachrichtigungen zu senden, wenn unerwartetes Verhalten auftritt. Eine von vielen Logging-Funktionen sind benutzerdefinierte Messwerte. Mit benutzerdefinierten Messwerten können Sie Benachrichtigungsrichtlinien erstellen, damit Sie und Ihr Team informiert werden, wenn der Messwert mit angegebenen Kriterien übereinstimmt.

In diesem Abschnitt konfigurieren Sie Monitoring, um E-Mail-Benachrichtigungen zu senden, wenn eine Dateiaufnahme nicht erfolgreich ist. Die folgende Konfiguration verwendet standardmäßige Python-Meldungen des Typs logging.error(..), um fehlerhafte Aufnahmen zu identifizieren.

  1. Wechseln Sie in der Cloud Console zur Seite "Logbasierte Messwerte".

    Zur Seite "Logbasierte Messwerte"

  2. Klicken Sie auf Messwert erstellen.

  3. Wählen Sie in der Liste Filter die Option In erweiterten Filter umwandeln aus.

    Erweitertes Filtermenü

  4. Fügen Sie die folgende Konfiguration in den erweiterten Filtern ein.

    resource.type="cloud_function"
    resource.labels.function_name="streaming"
    resource.labels.region="us-east1"
    "Error streaming file "
    

    Konfiguration zum Einfügen in den erweiterten Filter

  5. Füllen Sie im Messwerteditor die folgenden Felder aus und klicken Sie dann auf Messwert erstellen.

    • Geben Sie im Feld Name streaming-error ein.
    • Geben Sie im Abschnitt Label im Feld Name den Namen payload_error ein.
    • Wählen Sie in der Liste Labeltyp die Option String aus.
    • Wählen Sie in der Liste Feldname den Eintrag textPayload aus.
    • Geben Sie im Feld Regulärer Ausdruck zur Extraktion den Ausdruck (Error streaming file '.*'.) ein.
    • Wählen Sie in der Liste Typ den Typ Zähler aus.

      Felder des Messwerteditors

  6. Wählen Sie in der Cloud Console Monitoring aus oder verwenden Sie die folgende Schaltfläche:

    Zu Monitoring

  7. Wählen Sie im Navigationsbereich für das Monitoring Benachrichtigungen und dann Richtlinie erstellen aus.

  8. Geben Sie im Feld Richtlinie benennen den Wert streaming-error-alert ein.

  9. Klicken Sie auf Bedingung hinzufügen.

    • Geben Sie im Feld Titel streaming-error-condition ein.
    • Geben Sie im Feld Messwert den Wert logging/user/streaming-error ein.
    • Wählen Sie in der Liste Bedingung erfüllt, wenn die Option Bei jedem Verstoß aus.
    • Wählen Sie in der Liste Bedingung die Option über aus.
    • Geben Sie im Feld Schwellenwert den Wert 0 ein.
    • Wählen Sie in der Liste Für die Option 1 Minute aus.
  10. Wählen Sie in der Liste Typ Benachrichtigungskanal die Option E-Mail aus, geben Sie Ihre E-Mail-Adresse ein und klicken Sie auf Benachrichtigungskanal hinzufügen.

  11. (Optional) Klicken Sie auf Dokumentation und tragen Sie alle Informationen ein, die in einer Benachrichtigung angezeigt werden sollen.

  12. Klicken Sie auf Speichern.

Nach dem Speichern der Benachrichtigungsrichtlinie überwacht Monitoring die Fehlerlogs der Funktion streaming und sendet eine E-Mail-Benachrichtigung, sobald es innerhalb eines Minutenintervalls zu Streamingfehlern kommt.

Bereinigen

Damit Ihrem Google Cloud-Konto die in dieser Anleitung verwendeten Ressourcen nicht in Rechnung gestellt werden, löschen Sie entweder das Projekt, das die Ressourcen enthält, oder behalten Sie das Projekt und löschen Sie die einzelnen Ressourcen.

Projekt löschen

  1. Wechseln Sie in der Cloud Console zur Seite Ressourcen verwalten.

    Zur Seite „Ressourcen verwalten“

  2. Wählen Sie in der Projektliste das Projekt aus, das Sie löschen möchten, und klicken Sie dann auf Löschen.
  3. Geben Sie im Dialogfeld die Projekt-ID ein und klicken Sie auf Shut down (Beenden), um das Projekt zu löschen.

Nächste Schritte