Logging Pipeline Messages

For additional visibility into how your pipeline is running, you can use the Dataflow SDK's built-in logging infrastructure to log information during your pipeline's execution. You can use the Google Cloud Platform Console to monitor logging information during and after your pipeline runs.

Adding Log Messages to Your Pipeline

Java

The Dataflow SDK for Java recommends logging worker messages through the open-source SLF4J (Simple Logging Facade for Java) library. The Dataflow SDK for Java implements the required logging infrastructure, so your Java code need only import the SLF4J API and then instantiate a Logger to enable message logging within your pipeline code.
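The sketch below condenses those two steps into a single hypothetical DoFn (the LogElementsFn class name is illustrative and is not part of the SDK or of WordCount); the full WordCount-based example in the next section shows the same pattern in context.

 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import com.google.cloud.dataflow.sdk.transforms.DoFn;

 public class LogElementsFn extends DoFn<String, String> {
   // Instantiate a Logger, keyed to the containing class.
   private static final Logger LOG = LoggerFactory.getLogger(LogElementsFn.class);

   @Override
   public void processElement(ProcessContext c) {
     // Any SLF4J severity can be used; see "Setting Pipeline Worker Log Levels"
     // below for which levels are emitted by default.
     LOG.info("Processing element: {}", c.element());
     c.output(c.element());
   }
 }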

For pre-existing code and libraries, the Dataflow SDK for Java also sets up additional logging infrastructure when executing on the worker, so that log messages produced by other popular Java logging libraries are captured as well.
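For example, assuming java.util.logging is among the captured libraries, messages from a pre-existing helper class that logs through it (the TextUtils class below is a hypothetical stand-in for such code) appear in the worker logs alongside your SLF4J messages when the helper is called from pipeline code:

 import java.util.logging.Logger;

 // A stand-in for pre-existing library code that knows nothing about SLF4J.
 public class TextUtils {
   private static final Logger JUL_LOG = Logger.getLogger(TextUtils.class.getName());

   public static String normalize(String word) {
     // Logged via java.util.logging, yet still captured on Dataflow workers.
     JUL_LOG.info("Normalizing: " + word);
     return word.toLowerCase();
   }
 }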

Python

The Dataflow SDK for Python supports the standard Python logging package, allowing code on your pipeline's workers to output log messages. To use the library's functions, you'll need to import the package:

import logging

Worker Log Message Code Example

Java

The WordCount example can be modified to output a log message when the word “love” is found in a line of the processed text. The added code is shown below, with surrounding code included for context.

 package com.google.cloud.dataflow.examples;
 // Import SLF4J packages.
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 ...
 public class WordCount {
   ...
   static class ExtractWordsFn extends DoFn<String, String> {
     // Instantiate Logger.
     // Suggestion: As shown, specify the class name of the containing class
     // (WordCount).
     private static final Logger LOG = LoggerFactory.getLogger(WordCount.class);
     ...
     @Override
     public void processElement(ProcessContext c) {
       ...
       // Output each word encountered into the output PCollection.
       for (String word : words) {
         if (!word.isEmpty()) {
           c.output(word);
         }
         // Log INFO messages when the word "love" is found.
         if (word.toLowerCase().equals("love")) {
           LOG.info("Found " + word.toLowerCase());
         }
       }
     }
   }
 ... // Remaining WordCount example code ...

Python

The wordcount.py example, located in the google.cloud.dataflow.examples package, can be modified to output a log message when the word “love” is found in a line of the processed text. The added code is shown below, with surrounding code included for context.

# Import the Python logging module.
import logging

class ExtractWordsFn(beam.DoFn):

  def process(self, element):
    words = re.findall(r'[A-Za-z\']+', element)
    for word in words:
      yield word

      if word.lower() == 'love':
        # Log using the root logger at info or higher levels
        logging.info('Found : %s', word.lower())

# Remaining WordCount example code ...

Java

If the modified WordCount pipeline is run locally using the default DirectPipelineRunner with the output sent to a local file (--output=./local-wordcounts), console output includes the added log messages:

INFO: Executing pipeline using the DirectPipelineRunner.
...
Feb 11, 2015 1:13:22 PM com.google.cloud.dataflow.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
Feb 11, 2015 1:13:22 PM com.google.cloud.dataflow.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
Feb 11, 2015 1:13:22 PM com.google.cloud.dataflow.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
...
INFO: Pipeline execution complete.

By default, only log lines marked INFO and higher will be sent to Cloud Logging. If you wish to change this behavior, see Setting Pipeline Worker Log Levels.

Python

If the modified WordCount pipeline is run locally using the default DirectRunner with the output sent to a local file (--output=./local-wordcounts), console output includes the added log messages:

INFO:root:Found : love
INFO:root:Found : love
INFO:root:Found : love

By default, only log lines marked INFO and higher will be sent to Cloud Logging.

Monitoring Pipeline Logs

When you run your pipeline on the Dataflow service, you can use Cloud Logging to view logs emitted by the Compute Engine instances that run your pipeline.

Note: Cloud Logging is currently in Beta.

Cloud Dataflow Worker Log Example

The modified WordCount pipeline can be run in the cloud with the following options:

Java

--project=WordCountExample
--output=gs://<bucket-name>/counts
--runner=BlockingDataflowPipelineRunner
--stagingLocation=gs://<bucket-name>/binaries
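These flags are typically parsed into pipeline options in the example's main() method. The following is a minimal sketch of that step; it assumes an options interface named WordCountOptions that declares the example-specific --output property (the interface name is illustrative, and the real example defines its own):

 // Parse and validate the command-line flags, then create the pipeline from them.
 WordCountOptions options = PipelineOptionsFactory.fromArgs(args)
     .withValidation()
     .as(WordCountOptions.class);
 Pipeline p = Pipeline.create(options);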

Python

--project=WordCountExample
--output=gs://<bucket-name>/counts
--runner=BlockingDataflowPipelineRunner
--staging_location=gs://<bucket-name>/binaries

Viewing Job Summary and Status

Because the WordCount cloud pipeline uses a BlockingDataflowPipelineRunner, console messages are output during pipeline execution. Once the job starts, a link to the Cloud Platform Console page is output to the console, followed by the pipeline job ID:

INFO: To access the Dataflow monitoring console, please navigate to
https://console.developers.google.com/project/project-name/dataflow/job/2015-04-02_11_02_04-5851995174035386688
Submitted job: 2015-04-02_11_02_04-5851995174035386688

The console URL leads to the Cloud Dataflow Monitoring Interface with a summary page for the submitted job, showing a dynamic execution graph on the left and summary information on the right.

The Logs tab lists job messages that report the status of the job as a whole. You can use the Minimum severity selector to select and view job progress and status messages.

You can use the Worker Logs button in the Logs tab to view worker logs for the Compute Engine instances that run your pipeline. Worker logs consist of log lines generated by your code and by the Dataflow-generated code that runs it.

Tip: Instead of navigating to the job summary page from the URL listed in the console, you can select Big Data→Cloud Dataflow from your project page, then select the job name associated with your job ID from the Jobs list.

Viewing Worker Logs

The Cloud Platform Console Monitoring→Logs page shows the worker logs for your pipeline. When you navigate to this page using Worker Logs from the Cloud Dataflow pipeline Logs tab, the job ID selector is automatically populated with the pipeline job ID.

Once the job ID selector is populated with a job ID, all logs for that pipeline are listed on the page. There are many log types available to view. The worker log is selected by default and contains log lines emitted by your code and the Dataflow SDK.

Here is a summary of the different log types available for viewing from the Monitoring→Logs page:

  • worker logs are produced by Cloud Dataflow workers. Workers do most of the pipeline work (e.g. applying your ParDos to data). worker logs contain messages logged by your code and by Cloud Dataflow.
  • worker-startup logs are present on most Cloud Dataflow jobs and can capture messages related to the startup process. The startup process includes downloading a job's jars from Google Cloud Storage, then starting the workers. If there is a problem starting workers, this is a good place to look.
  • shuffler logs contain messages from workers that consolidate the results of parallel pipeline operations.
  • docker and kubelet logs contain messages related to these public technologies, which are used on Cloud Dataflow workers. keep-docker-running logs contain messages from a process responsible for keeping Docker running on the worker.

Filtering Log Results

The worker logs view includes log messages output via the logging library from all pipeline workers. Logs are displayed according to the timestamp of each message; thus messages from different workers are interleaved.

Often you will want to filter the worker log view down to the messages output while a Transformation, CombineFn, or other code you’ve written was processing a particular element. This is useful when diagnosing a pipeline problem, for example to see what happened immediately before an exception was thrown. To do this, browse through the worker log list to find a log message associated with the task you wish to examine, then copy the four-part identifier associated with that task into the filter text box at the top of the page. The four-part identifier starts with the pipeline job ID, followed by the worker ID, the work ID, and then a thread ID. Each part is separated by a space, and the entire string is enclosed in brackets: [pipelinejobID workerID workID threadID].

Note: Copy the four-part identifier within brackets (don't copy the brackets) into the filter text box, then surround the entire filter text string with quotes. The quotes are necessary to have the filter text treated as one string rather than four space-separated filter-criteria strings. Multiple filter criteria are OR’ed, and we want the four-part criteria to be AND’ed, so the surrounding quotes are necessary.

This example shows a four-part worker task identifier being used to filter the worker log messages.
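As a concrete illustration of that format, using the job ID from the earlier run and placeholder tokens (not real values) for the remaining three parts, the quoted filter string has this form:

"2015-04-02_11_02_04-5851995174035386688 <workerID> <workID> <threadID>"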

Setting Pipeline Worker Log Levels

Java

The default SLF4J logging level set on workers by the Dataflow SDK for Java is INFO, so all log messages of INFO or higher (INFO, WARN, ERROR) will be emitted. You can set a different default log level to support lower SLF4J logging levels (TRACE or DEBUG), or set different log levels for different packages or classes in your code.

Two pipeline options are provided to allow you to set worker log levels from the command line or programmatically:

  • --defaultWorkerLogLevel=<level>: use this option to set all loggers at the specified default level. For example, the following command-line option will override the default Cloud Dataflow INFO log level, and set it to DEBUG:
    --defaultWorkerLogLevel=DEBUG
  • --workerLogLevelOverrides={"<package or class>":"<level>"}: use this option to set the logging level for specified package(s) or class(es). For example, to override the default pipeline log level for the com.google.cloud.dataflow package, and set it to TRACE:
    --workerLogLevelOverrides={"com.google.cloud.dataflow":"TRACE"}
    or to override the default pipeline logging level for the com.google.cloud.Foo class, and set it to DEBUG:
    --workerLogLevelOverrides={"com.google.cloud.Foo":"DEBUG"}
    Multiple overrides can be specified by providing a JSON map
    (--workerLogLevelOverrides={"<package/class>":"<level>","<package/class>":"<level>",...}),
    as shown in the combined example after this list.
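For example, to lower the overall default to DEBUG while holding the com.google.cloud.dataflow package at ERROR and the com.google.cloud.Foo class at TRACE (a purely illustrative combination), the two options can be supplied together:

    --defaultWorkerLogLevel=DEBUG --workerLogLevelOverrides={"com.google.cloud.dataflow":"ERROR","com.google.cloud.Foo":"TRACE"}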

The following example programmatically sets pipeline logging options with default values that can be overridden from the command line:

 PipelineOptions options = ...
 DataflowWorkerLoggingOptions loggingOptions = options.as(DataflowWorkerLoggingOptions.class);
 // Overrides the default log level on the worker to emit logs at TRACE or higher.
 loggingOptions.setDefaultWorkerLogLevel(Level.TRACE);
 // Overrides the Foo class and "com.google.cloud.dataflow" package to emit logs at WARN or higher.
 loggingOptions.setWorkerLogLevelOverrides(
     WorkerLogLevelOverride.forClass(Foo.class, Level.WARN),
     WorkerLogLevelOverride.forPackage(Package.getPackage("com.google.cloud.dataflow"), Level.WARN));
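Because as() exposes a view of the same underlying options object, a pipeline created afterward from the original options runs with these worker log levels; for example:

 // The pipeline created from the original options object picks up the
 // worker log levels configured above.
 Pipeline p = Pipeline.create(options);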

Python

This feature is not yet available in the Dataflow SDK for Python.
