Pipelinenachrichten protokollieren

Sie können die integrierte Logging-Infrastruktur des Apache Beam SDK verwenden, um Informationen während der Ausführung Ihrer Pipeline zu protokollieren. Mit der Google Cloud Console können Sie Logging-Informationen während und nach der Ausführung Ihrer Pipeline überwachen.

Einer Pipeline Lognachrichten hinzufügen

Java: SDK 2.x

Beim Apache Beam SDK for Java wird empfohlen, für das Logging von Worker-Nachrichten die Open-Source-Bibliothek SLF4J (Simple Logging Facade for Java) einzusetzen. Das Apache Beam SDK for Java implementiert die erforderliche Logging-Infrastruktur. Daher muss Ihr Java-Code nur die SLF4J API importieren Anschließend wird ein Logger instanziiert, um das Nachrichtenlogging in Ihrem Pipeline-Code zu aktivieren.

Bei bereits vorhandenem Code und/oder Bibliotheken richtet das Apache Beam SDK for Java eine zusätzliche Logging-Infrastruktur ein. Dies erfolgt bei Ausführung auf dem Worker, um Lognachrichten zu erfassen, die von den folgenden Logging-Bibliotheken für Java erstellt wurden:

Python

Das Apache Beam SDK for Python stellt das Bibliothekspaket logging bereit, damit der Code auf den Workern einer Pipeline Lognachrichten ausgeben kann. Damit Sie die Bibliotheksfunktionen nutzen können, müssen Sie die Bibliothek importieren:

import logging

Java: SDK 1.x

Codebeispiel für Worker-Lognachrichten

Java: SDK 2.x

Das WordCount-Beispiel in Apache Beam kann so geändert werden, dass eine Lognachricht ausgegeben wird, wenn das Wort "love" in einer Zeile des verarbeiteten Texts gefunden wird. Der ergänzte Code ist unten fett dargestellt (der Code davor und danach zeigt den Kontext).

 package org.apache.beam.examples;
 // Import SLF4J packages.
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 ...
 public class WordCount {
   ...
   static class ExtractWordsFn extends DoFn<String, String> {
     // Instantiate Logger.
     // Suggestion: As shown, specify the class name of the containing class
     // (WordCount).
     private static final Logger LOG = LoggerFactory.getLogger(WordCount.class);
     ...
     @ProcessElement
     public void processElement(ProcessContext c) {
       ...
       // Output each word encountered into the output PCollection.
       for (String word : words) {
         if (!word.isEmpty()) {
           c.output(word);
         }
         // Log INFO messages when the word "love" is found.
         if(word.toLowerCase().equals("love")) {
           LOG.info("Found " + word.toLowerCase());
         }
       }
     }
   }
 ... // Remaining WordCount example code ...

Python

Das Beispiel wordcount.py in Apache Beam kann so geändert werden, dass eine Lognachricht ausgegeben wird, wenn das Wort "love" in einer Zeile des verarbeiteten Texts gefunden wird.

# import Python logging module.
import logging

class ExtractWordsFn(beam.DoFn):
  def process(self, element):
    words = re.findall(r'[A-Za-z\']+', element)
    for word in words:
      yield word

      if word.lower() == 'love':
        # Log using the root logger at info or higher levels
        logging.info('Found : %s', word.lower())

# Remaining WordCount example code ...

Java: SDK 1.x

Java: SDK 2.x

Wenn die geänderte WordCount-Pipeline lokal mit dem standardmäßigen DirectRunner ausgeführt und die Ausgabe an eine lokale Datei (--output=./local-wordcounts) gesendet wird, enthält die Konsolenausgabe die hinzugefügten Lognachrichten:

INFO: Executing pipeline using the DirectRunner.
...
Feb 11, 2015 1:13:22 PM org.apache.beam.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
Feb 11, 2015 1:13:22 PM org.apache.beam.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
Feb 11, 2015 1:13:22 PM org.apache.beam.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
...
INFO: Pipeline execution complete.

Standardmäßig werden nur Log-Zeilen mit der Kennzeichnung INFO und höher an Cloud Logging gesendet. Wenn Sie dies ändern möchten, finden Sie weitere Informationen unter Log-Ebenen für Pipeline-Worker festlegen.

Python

Wenn die geänderte WordCount-Pipeline lokal mit dem standardmäßigen DirectRunner ausgeführt und die Ausgabe an eine lokale Datei (--output=./local-wordcounts) gesendet wird, enthält die Konsolenausgabe die hinzugefügten Lognachrichten:

INFO:root:Found : love
INFO:root:Found : love
INFO:root:Found : love

Standardmäßig werden nur Logzeilen mit der Kennzeichnung INFO und höher an Cloud Logging gesendet.

Java: SDK 1.x

Logging-Limit und Drosselung

Worker-Log-Nachrichten sind auf 15.000 Nachrichten alle 30 Sekunden pro Worker beschränkt. Wenn dieses Limit erreicht wird, wird eine einzelne Worker-Lognachricht hinzugefügt, die besagt, dass das Logging gedrosselt wird:

Throttling logger worker. It used up its 30s quota for logs in only 12.345s
Es werden keine weiteren Nachrichten protokolliert, bevor das 30-Sekunden-Intervall vorbei ist. Diese Beschränkung wird durch Log-Nachrichten geteilt, die vom Apache Beam SDK und Nutzercode generiert werden.

Pipeline-Logs überwachen

Wenn Sie Ihre Pipeline mit dem Dataflow-Dienst ausführen, können Sie mit der Dataflow-Monitoring-Oberfläche Logs ansehen, die von der Pipeline ausgegeben werden.

Beispiel für ein Dataflow-Worker-Log

Die geänderte WordCount-Pipeline kann in der Cloud mit den folgenden Optionen ausgeführt werden:

Java: SDK 2.x

--project=WordCountExample
--output=gs://<bucket-name>/counts
--runner=DataflowRunner
--tempLocation=gs://<bucket-name>/temp
--stagingLocation=gs://<bucket-name>/binaries

Python

--project=WordCountExample
--output=gs://<bucket-name>/counts
--runner=DataflowRunner
--staging_location=gs://<bucket-name>/binaries

Java: SDK 1.x

Logs ansehen

Weil die WordCount-Cloud-Pipeline Blocking-Ausführung verwendet, werden Konsolennachrichten während der Pipelineausführung ausgegeben. Sobald der Job gestartet wird, wird ein Link zur Cloud Console-Seite an die Konsole ausgegeben, gefolgt von der Pipeline-Job-ID:

INFO: To access the Dataflow monitoring console, please navigate to
https://console.developers.google.com/dataflow/job/2017-04-13_13_58_10-6217777367720337669
Submitted job: 2017-04-13_13_58_10-6217777367720337669

Die Console-URL führt zur Monitoring-Oberfläche von Dataflow. Dort wird eine Zusammenfassung des übermittelten Jobs angezeigt. Links ist eine dynamische Ausführungsgrafik dargestellt und rechts sind die zusammengefassten Informationen: Klicken Sie im unteren Bereich auf , um das Logfeld zu maximieren.

Im Logbereich werden standardmäßig Joblogs angezeigt, die den Status des Jobs insgesamt angeben. Sie können die im Logbereich angezeigten Nachrichten filtern. Klicken Sie dazu auf Info und Logs filtern.

Durch Auswählen eines Pipelineschritts in der Grafik wird die Ansicht so geändert, dass sie die durch den Code generierten Logs für diesen Schritt und den generierten Code anzeigt, der in dem Pipelineschritt ausgeführt wird.

Wenn Sie wieder zu den Joblogs wechseln möchten, schließen Sie die Ansicht des Schritts. Klicken Sie dazu außerhalb der Grafik oder verwenden Sie die Schaltfläche Ausgewählten Schritt aufheben im rechten Fensterbereich.

Durch Klicken auf die Schaltfläche des externen Links im Logbereich wird Logging mit einem Menü zum Auswählen verschiedener Logtypen aufgerufen.

Logging enthält auch andere Infrastrukturlogs für eine Pipeline. Weitere Informationen zum Prüfen Ihrer Logs finden Sie in der Anleitung zum Log-Explorer.

Hier finden Sie eine Zusammenfassung der verschiedenen Logtypen, die auf der Seite Logging angezeigt werden können:

  • Job-Message-Logs enthalten Jobnachrichten, die von verschiedenen Komponenten von Dataflow generiert werden. Beispiele hierfür sind die automatische Skalierungskonfiguration beim Starten oder Herunterfahren von Workern, der Fortschritt des Jobschritts und Jobfehler. Fehler auf Worker-Ebene, die durch einen Absturz des Nutzercodes verursacht wurden und in Worker-Logs vorhanden sind, werden auch in die job-message-Logs übertragen.
  • Worker-Logs werden von Dataflow-Workern erstellt. Worker erledigen die meisten Pipelineaufgaben. Sie wenden z. B. ParDo auf Daten an. Worker-Logs enthalten von Ihrem Code und von Dataflow erfasste Nachrichten.
  • Worker-Startup-Logs sind in den meisten Dataflow-Jobs vorhanden und erfassen mit dem Startvorgang zusammenhängende Nachrichten. Dieser umfasst das Herunterladen der JAR-Dateien eines Jobs aus Cloud Storage und das anschließende Starten der Worker. Bei Problemen mit dem Worker-Start können diese Logs aufschlussreich sein.
  • Shuffler-Logs enthalten Nachrichten von Workern, die die Ergebnisse paralleler Pipelineoperationen konsolidieren.
  • Docker- und Kubelet-Logs enthalten Nachrichten, die mit diesen öffentlichen Technologien in Zusammenhang stehen und für Dataflow-Worker genutzt werden.

Log-Ebenen für Pipeline-Worker festlegen

Java: SDK 2.x

Als standardmäßige SLF4J-Log-Ebene für Worker wird vom Apache Beam SDK for Java INFO festgelegt. Daher werden alle Lognachrichten mit der Kennzeichnung INFO oder höher (INFO, WARN, ERROR) ausgegeben. Sie können eine andere Standard-Log-Ebene festlegen, um niedrigere SLF4J-Log-Ebenen (TRACE oder DEBUG) zu unterstützen, oder verschiedene Log-Ebenen für unterschiedliche Klassenpakete in Ihrem Code einrichten.

Es gibt zwei Pipelineoptionen, um Worker-Log-Ebenen über die Befehlszeile oder programmatisch festzulegen:

  • --defaultWorkerLogLevel=<level>: Verwenden Sie diese Option, um alle Logger auf die angegebene Standardebene festzulegen. Die folgende Befehlszeilenoption überschreibt beispielsweise die Standard-Log-Ebenen von Dataflow INFO und legt sie auf DEBUG:
    --defaultWorkerLogLevel=DEBUG fest.
  • --workerLogLevelOverrides={"<package or class>":"<level>"}: Verwenden Sie diese Option, um die Log-Ebene für bestimmte Pakete oder Klassen vorzugeben. Die Standard-Log-Ebene der Pipeline für das Paket com.google.cloud.dataflow können Sie beispielsweise so überschreiben und auf TRACE festlegen:
    --workerLogLevelOverrides={"com.google.cloud.dataflow":"TRACE"}
    Oder Sie überschreiben die Standard-Log-Ebene der Pipeline für die Klasse com.google.cloud.Foo und legen sie auf DEBUG fest:
    --workerLogLevelOverrides={"com.google.cloud.Foo":"DEBUG"}
    Verwenden Sie für mehrere Überschreibungen eine JSON-Zuordnung:
    (--workerLogLevelOverrides={"<package/class>":"<level>","<package/class>":"<level>",...}).

Im folgenden Beispiel werden Pipeline-Logging-Optionen programmatisch mit Standardwerten eingerichtet, die über die Befehlszeile überschrieben werden können:

 PipelineOptions options = ...
 DataflowWorkerLoggingOptions loggingOptions = options.as(DataflowWorkerLoggingOptions.class);
 // Overrides the default log level on the worker to emit logs at TRACE or higher.
 loggingOptions.setDefaultWorkerLogLevel(Level.TRACE);
 // Overrides the Foo class and "com.google.cloud.dataflow" package to emit logs at WARN or higher.
 loggingOptions.setWorkerLogLevelOverrides(
     WorkerLogLevelOverride.forClass(Foo.class, Level.WARN),
     WorkerLogLevelOverride.forPackage(Package.getPackage("com.google.cloud.dataflow"), Level.WARN));

Python

Diese Funktion ist derzeit im Apache Beam SDK for Python noch nicht verfügbar.

Java: SDK 1.x

Log von gestarteten BigQuery-Jobs aufrufen

Wenn Sie BigQuery in Ihrer Dataflow-Pipeline verwenden, werden BigQuery-Jobs gestartet, um verschiedene Aktionen für Sie auszuführen, z. B. das Laden von Daten oder das Exportieren von Daten. Weitere Informationen zu Fehlerbehebung und Monitoring dieser BigQuery-Jobs finden Sie in der Dataflow-Monitoring-UI unter Logs.

Die BigQuery-Jobinformationen, die im Bereich Logs angezeigt werden, werden in einer BigQuery-Systemtabelle gespeichert und geladen, sodass Abrechnungskosten anfallen, wenn die zugrunde liegende BigQuery-Tabelle abgefragt wird.

Projekt einrichten

Zur Anzeige der BigQuery-Jobinformationen muss Ihre Pipeline Apache Beam 2.24.0 oder höher verwenden. Bis diese Version veröffentlicht ist, müssen Sie eine Entwicklungsversion des Apache Beam SDK verwenden, die aus dem Hauptzweig erstellt wurde.

Java: SDK 2.x

  1. Fügen Sie der Datei pom.xml für Ihr Projekt das folgende Profil hinzu:

    <profiles>
      <!-- Additional profiles listed here. -->
      <profile>
        <id>snapshot</id>
        <repositories>
          <repository>
            <id>apache.snapshots</id>
            <url>https://repository.apache.org/content/repositories/snapshots</url>
          </repository>
        </repositories>
      </profile>
    </profiles>
    
  2. Setzen Sie beim Testen oder Ausführen des Projekts die Profiloption auf den Wert id in der Datei pom.xml und setzen Sie das Attribut beam.version auf 2.24.0-SNAPSHOT oder höher. Beispiel:

    mvn test -Psnapshot -Dbeam.version=2.24.0-SNAPSHOT
    

    Weitere Snapshot-Werte finden Sie im Snapshot-Index.

Python

  1. Melden Sie sich bei GitHub an.

  2. Gehen Sie zur Ergebnisliste, in der erfolgreich abgeschlossene Apache Beam Python SDK-Builds aufgeführt werden.

  3. Klicken Sie auf einen kürzlich abgeschlossenen Job, der aus dem Hauptzweig (Masterzweig) erstellt wurde.

  4. Klicken Sie in der Seitenleiste auf Dateien in Google Cloud Storage-Bucket auflisten.

  5. Erweitern Sie im Hauptbereich die Option Datei in Google Cloud Storage-Bucket auflisten.

  6. Laden Sie die ZIP-Datei aus der Dateiliste auf einen lokalen Computer oder einen Speicherort herunter, an dem Sie Ihr Python-Projekt ausführen.

    Der Name des Cloud Storage-Buckets lautet beam-wheels-staging. Daher müssen Sie diesen beim Erstellen Ihrer Download-URL angeben. Beispiel:

    gsutil cp gs://beam-wheels-staging/master/02bf081d0e86f16395af415cebee2812620aff4b-207975627/apache-beam-2.25.0.dev0.zip <var>SAVE_TO_LOCATION</var>
    
  7. Installieren Sie die heruntergeladene ZIP-Datei.

    pip install apache-beam-2.25.0.dev0.zip
    
  8. Wenn Sie Ihre Apache Beam-Pipeline ausführen, übergeben Sie das Flag --sdk_location und verweisen auf die ZIP-Datei mit dem SDK.

    --sdk_location=apache-beam-2.25.0.dev0.zip
    

Java: SDK 1.x

BigQuery-Jobdetails aufrufen

Zum Auflisten der BigQuery-Jobs öffnen Sie den Tab BigQuery-Jobs und wählen den Speicherort der BigQuery-Jobs aus. Klicken Sie dann auf BigQuery-Jobs laden und bestätigen Sie das Dialogfeld. Nach Abschluss der Abfrage wird die Jobliste angezeigt.

Schaltfläche &quot;BigQuery-Jobs laden&quot; in der BigQuery-Tabelle mit Jobinformationen

Es werden grundlegende Informationen zu jedem Job bereitgestellt, einschließlich der Job-ID, des Typs, der Dauer usw.

Eine Tabelle zeigt die BigQuery-Jobs, die während der aktuellen Ausführung des Pipeline-Jobs ausgeführt wurden.

Weitere Informationen zu einem bestimmten Job erhalten Sie, wenn Sie in der Spalte Weitere Informationen auf Befehlszeile klicken.

Kopieren Sie im modalen Fenster für die Befehlszeile den Befehl bq jobs describe und führen Sie ihn lokal oder in Cloud Shell aus.

gcloud alpha bq jobs describe BIGQUERY_JOB_ID

Der Befehl bq jobs describe gibt JobStatistics aus. Sie erhalten damit weitere Details, die bei der Diagnose eines langsamen oder hängenden BigQuery-Jobs hilfreich sind.

Wenn Sie alternativ BigQueryIO mit einer SQL-Abfrage verwenden, wird ein Abfragejob ausgegeben. Klicken Sie in der Spalte Weitere Informationen auf Suchanfrage anzeigen, um die vom Job verwendete SQL-Abfrage zu sehen.