Utilizza i log della pipeline

Puoi utilizzare l'infrastruttura di logging integrata dell'SDK Apache Beam per registrare le informazioni durante l'esecuzione della pipeline. Puoi utilizzare la console Google Cloud per monitorare le informazioni di logging durante e dopo l'esecuzione della pipeline.

Aggiungi messaggi di log alla pipeline

Java

L'SDK Apache Beam per Java consiglia di registrare i messaggi worker tramite la libreria open source Simple Logging Facade for Java (SLF4J). L'SDK Apache Beam per Java implementa l'infrastruttura di logging richiesta in modo che il codice Java debba importare solo l'API SLF4J. Quindi crea un'istanza di un Logger per abilitare il logging dei messaggi all'interno del codice della pipeline.

Per il codice e/o le librerie preesistenti, l'SDK Apache Beam per Java configura un'infrastruttura di logging aggiuntiva. I messaggi di log generati dalle seguenti librerie di logging per Java vengono acquisiti:

Python

L'SDK Apache Beam per Python fornisce il pacchetto della libreria logging, che consente ai worker della pipeline di restituire i messaggi di log. Per utilizzare le funzioni di libreria, devi importare la libreria:

import logging

Go

L'SDK Apache Beam per Go fornisce il pacchetto della libreria log, che consente ai worker della pipeline di restituire i messaggi di log. Per utilizzare le funzioni di libreria, devi importare la libreria:

import "github.com/apache/beam/sdks/v2/go/pkg/beam/log"

Esempio di codice del messaggio di log del worker

Java

L'esempio seguente utilizza SLF4J per il logging di Dataflow. Per saperne di più sulla configurazione di SLF4J per il logging di Dataflow, consulta l'articolo Suggerimenti per Java.

L'esempio WordCount di Apache Beam può essere modificato in modo da generare un messaggio di log quando la parola "love" viene trovata in una riga del testo elaborato. Il codice aggiunto è indicato in grassetto nell'esempio seguente (il codice di delimitazione è incluso per il contesto).

 package org.apache.beam.examples;
 // Import SLF4J packages.
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 ...
 public class WordCount {
   ...
   static class ExtractWordsFn extends DoFn<String, String> {
     // Instantiate Logger.
     // Suggestion: As shown, specify the class name of the containing class
     // (WordCount).
     private static final Logger LOG = LoggerFactory.getLogger(WordCount.class);
     ...
     @ProcessElement
     public void processElement(ProcessContext c) {
       ...
       // Output each word encountered into the output PCollection.
       for (String word : words) {
         if (!word.isEmpty()) {
           c.output(word);
         }
         // Log INFO messages when the word "love" is found.
         if(word.toLowerCase().equals("love")) {
           LOG.info("Found " + word.toLowerCase());
         }
       }
     }
   }
 ... // Remaining WordCount example code ...

Python

L'esempio wordcount.py di Apache Beam può essere modificato in modo da generare un messaggio di log quando viene trovata la parola "love" in una riga del testo elaborato.

# import Python logging module.
import logging

class ExtractWordsFn(beam.DoFn):
  def process(self, element):
    words = re.findall(r'[A-Za-z\']+', element)
    for word in words:
      yield word

      if word.lower() == 'love':
        # Log using the root logger at info or higher levels
        logging.info('Found : %s', word.lower())

# Remaining WordCount example code ...

Go

L'esempio wordcount.go di Apache Beam può essere modificato in modo da generare un messaggio di log quando viene trovata la parola "love" in una riga del testo elaborato.

func (f *extractFn) ProcessElement(ctx context.Context, line string, emit func(string)) {
    for _, word := range wordRE.FindAllString(line, -1) {
        // increment the counter for small words if length of words is
        // less than small_word_length
        if strings.ToLower(word) == "love" {
            log.Infof(ctx, "Found : %s", strings.ToLower(word))
        }

        emit(word)
    }
}

// Remaining Wordcount example

Java

Se la pipeline WordCount modificata viene eseguita localmente utilizzando DirectRunner predefinito con l'output inviato a un file locale (--output=./local-wordcounts), l'output della console include i messaggi di log aggiunti:

INFO: Executing pipeline using the DirectRunner.
...
Feb 11, 2015 1:13:22 PM org.apache.beam.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
Feb 11, 2015 1:13:22 PM org.apache.beam.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
Feb 11, 2015 1:13:22 PM org.apache.beam.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
...
INFO: Pipeline execution complete.

Per impostazione predefinita, solo le righe di log contrassegnate con INFO e successive verranno inviate a Cloud Logging. Se vuoi modificare questo comportamento, consulta Impostare i livelli di log del worker di pipeline.

Python

Se la pipeline WordCount modificata viene eseguita localmente utilizzando DirectRunner predefinito con l'output inviato a un file locale (--output=./local-wordcounts), l'output della console include i messaggi di log aggiunti:

INFO:root:Found : love
INFO:root:Found : love
INFO:root:Found : love

Per impostazione predefinita, solo le righe di log contrassegnate con INFO e successive verranno inviate a Cloud Logging.

Go

Se la pipeline WordCount modificata viene eseguita localmente utilizzando DirectRunner predefinito con l'output inviato a un file locale (--output=./local-wordcounts), l'output della console include i messaggi di log aggiunti:

2022/05/26 11:36:44 Found : love
2022/05/26 11:36:44 Found : love
2022/05/26 11:36:44 Found : love

Per impostazione predefinita, solo le righe di log contrassegnate con INFO e successive verranno inviate a Cloud Logging.

Regolare il volume log

Potresti anche ridurre il volume di log generati modificando i livelli di log della pipeline. Se non vuoi continuare a importare alcuni o tutti i log di Dataflow, aggiungi un'esclusione di Logging per escludere i log di Dataflow. Quindi, esporta i log in una destinazione diversa, come BigQuery, Cloud Storage o Pub/Sub. Per ulteriori informazioni, consulta Controllare l'importazione dei log di Dataflow.

Limite di logging e limitazione

I messaggi di log del worker sono limitati a 15.000 messaggi ogni 30 secondi per worker. Se questo limite viene raggiunto, viene aggiunto un singolo messaggio di log worker che indica che il logging è limitato:

Throttling logger worker. It used up its 30s quota for logs in only 12.345s
Non vengono registrati altri messaggi fino al termine dell'intervallo di 30 secondi. Questo limite è condiviso dai messaggi di log generati dall'SDK Apache Beam e dal codice utente.

Archiviazione e conservazione dei log

I log operativi vengono archiviati nel bucket di log _Default. Il nome del servizio API di logging è dataflow.googleapis.com. Per saperne di più sui tipi di risorse e sui servizi monitorati di Google Cloud utilizzati in Cloud Logging, consulta Risorse e servizi monitorati.

Per maggiori dettagli sul periodo di tempo in cui le voci di log vengono conservate da Logging, consulta le informazioni sulla conservazione in Quote e limiti: periodi di conservazione dei log.

Per informazioni sulla visualizzazione dei log operativi, consulta Monitorare e visualizzare i log della pipeline.

Monitora e visualizza i log della pipeline

Quando esegui la pipeline sul servizio Dataflow, puoi utilizzare l'interfaccia di monitoraggio di Dataflow per visualizzare i log emessi dalla pipeline.

Esempio di log del worker Dataflow

La pipeline WordCount modificata può essere eseguita nel cloud con le seguenti opzioni:

Java

--project=WordCountExample
--output=gs://<bucket-name>/counts
--runner=DataflowRunner
--tempLocation=gs://<bucket-name>/temp
--stagingLocation=gs://<bucket-name>/binaries

Python

--project=WordCountExample
--output=gs://<bucket-name>/counts
--runner=DataflowRunner
--staging_location=gs://<bucket-name>/binaries

Go

--project=WordCountExample
--output=gs://<bucket-name>/counts
--runner=DataflowRunner
--staging_location=gs://<bucket-name>/binaries

Visualizza i log

Poiché la pipeline cloud WordCount utilizza l'esecuzione di blocco, i messaggi della console vengono generati durante l'esecuzione della pipeline. Dopo l'avvio del job, nella console viene generato un link alla pagina della console Google Cloud, seguito dall'ID job della pipeline:

INFO: To access the Dataflow monitoring console, please navigate to
https://console.developers.google.com/dataflow/job/2017-04-13_13_58_10-6217777367720337669
Submitted job: 2017-04-13_13_58_10-6217777367720337669

L'URL della console porta all'interfaccia di monitoraggio di Dataflow con una pagina di riepilogo per il job inviato. Mostra un grafico di esecuzione dinamica a sinistra, con informazioni di riepilogo a destra. Fai clic su nel riquadro in basso per espandere quello dei log.

Per impostazione predefinita, il riquadro dei log mostra i log del job che segnalano lo stato del job nel suo complesso. Per filtrare i messaggi visualizzati nel riquadro dei log, fai clic su Informazioni e Filtra log.

Se selezioni un passaggio della pipeline nel grafico, la visualizzazione cambia in Log dei passaggi generati dal codice e dal codice generato in esecuzione nel passaggio della pipeline.

Per tornare a Log del job, cancella il passaggio facendo clic all'esterno del grafico o utilizzando il pulsante Deseleziona passaggio nel riquadro laterale a destra.

Se fai clic sul pulsante del link esterno dal riquadro dei log, accedi a Logging con un menu che consente di selezionare diversi tipi di log.

Logging include anche altri log dell'infrastruttura per la pipeline. Per ulteriori dettagli su come esplorare i log, consulta la guida Esplora log.

Tipi di log

Ecco un riepilogo dei diversi tipi di log disponibili per la visualizzazione dalla pagina Logging:

  • I log job-message contengono messaggi a livello di job generati da vari componenti di Dataflow. Alcuni esempi includono la configurazione della scalabilità automatica, l'avvio o l'arresto dei worker, l'avanzamento nel passaggio del job e gli errori del job. Gli errori a livello di worker derivanti dall'arresto anomalo del codice utente e presenti nei log worker si propagano anche ai log di job-message.
  • I log worker vengono generati dai worker di Dataflow. I worker svolgono la maggior parte del lavoro della pipeline (ad esempio, applicano i ParDo ai dati). I log Worker contengono messaggi registrati dal tuo codice e da Dataflow.
  • I log worker-startup sono presenti nella maggior parte dei job Dataflow e possono acquisire messaggi relativi al processo di avvio. Il processo di avvio include il download dei jar del job da Cloud Storage, quindi l'avvio dei worker. Se si verifica un problema durante l'avvio dei worker, questi log sono un ottimo punto di riferimento.
  • I log di shuffler contengono i messaggi dei worker che consolidano i risultati delle operazioni delle pipeline parallele.
  • I log docker e kubelet contengono messaggi relativi a queste tecnologie pubbliche, utilizzati sui worker Dataflow.
  • I log nvidia-mps contengono messaggi sulle operazioni di NVIDIA Multi-Process Service (MPS).

Imposta livelli di log dei worker della pipeline

Java

Il livello di logging SLF4J predefinito impostato sui worker dall'SDK Apache Beam per Java è INFO. Tutti i messaggi di log di tipo INFO o superiore (INFO, WARN, ERROR) verranno emessi. Puoi impostare un livello di log predefinito diverso per supportare livelli di logging SLF4J inferiori (TRACE o DEBUG) o impostare livelli di log diversi per pacchetti di classi diversi nel codice.

Le seguenti opzioni della pipeline consentono di impostare i livelli di log dei worker dalla riga di comando o in modo programmatico:

  • --defaultSdkHarnessLogLevel=<level>: utilizza questa opzione per impostare tutti i logger al livello predefinito specificato. Ad esempio, la seguente opzione della riga di comando sostituirà il livello di log predefinito di Dataflow INFO e lo imposterà su DEBUG:
    --defaultSdkHarnessLogLevel=DEBUG
  • --sdkHarnessLogLevelOverrides={"<package or class>":"<level>"}: utilizza questa opzione per impostare il livello di logging per pacchetti o classi specificati. Ad esempio, per eseguire l'override del livello predefinito di log della pipeline per il pacchetto org.apache.beam.runners.dataflow e impostarlo su TRACE:
    --sdkHarnessLogLevelOverrides='{"org.apache.beam.runners.dataflow":"TRACE"}'
    Per eseguire più override, fornisci una mappa JSON:
    (--sdkHarnessLogLevelOverrides={"<package/class>":"<level>","<package/class>":"<level>",...}).
  • Le opzioni delle pipeline defaultSdkHarnessLogLevel e sdkHarnessLogLevelOverrides non sono supportate con le pipeline che utilizzano l'SDK Apache Beam 2.50.0 e versioni precedenti senza Runner v2. In questo caso, utilizza le opzioni della pipeline --defaultWorkerLogLevel=<level> e --workerLogLevelOverrides={"<package or class>":"<level>"}. Per eseguire più override, fornisci una mappa JSON:
    (--workerLogLevelOverrides={"<package/class>":"<level>","<package/class>":"<level>",...})

L'esempio seguente imposta in modo programmatico le opzioni di logging della pipeline con valori predefiniti che possono essere sostituiti dalla riga di comando:

 PipelineOptions options = ...
 SdkHarnessOptions loggingOptions = options.as(SdkHarnessOptions.class);
 // Overrides the default log level on the worker to emit logs at TRACE or higher.
 loggingOptions.setDefaultSdkHarnessLogLevel(LogLevel.TRACE);
 // Overrides the Foo class and "org.apache.beam.runners.dataflow" package to emit logs at WARN or higher.
 loggingOptions.getSdkHarnessLogLevelOverrides()
     .addOverrideForClass(Foo.class, LogLevel.WARN)
     .addOverrideForPackage(Package.getPackage("org.apache.beam.runners.dataflow"), LogLevel.WARN);

Python

Il livello di logging predefinito impostato sui worker dall'SDK Apache Beam per Python è INFO. Tutti i messaggi di log di tipo INFO o superiore (INFO, WARNING, ERROR, CRITICAL) verranno emessi. Puoi impostare un livello di log predefinito diverso per supportare livelli di logging inferiori (DEBUG) o impostare livelli di log diversi per moduli diversi nel codice.

Sono disponibili due opzioni della pipeline per consentirti di impostare i livelli di log dei worker dalla riga di comando o in modo programmatico:

  • --default_sdk_harness_log_level=<level>: utilizza questa opzione per impostare tutti i logger al livello predefinito specificato. Ad esempio, la seguente opzione della riga di comando sostituisce il livello di log predefinito di Dataflow INFO e lo imposta su DEBUG:
    --default_sdk_harness_log_level=DEBUG
  • --sdk_harness_log_level_overrides={\"<module>\":\"<level>\"}: utilizza questa opzione per impostare il livello di logging per i moduli specificati. Ad esempio, per eseguire l'override del livello predefinito di log della pipeline per il modulo apache_beam.runners.dataflow e impostarlo su DEBUG:
    --sdk_harness_log_level_overrides={\"apache_beam.runners.dataflow\":\"DEBUG\"}
    Per eseguire più override, fornisci una mappa JSON:
    (--sdk_harness_log_level_overrides={\"<module>\":\"<level>\",\"<module>\":\"<level>\",...}).

L'esempio seguente utilizza la classe WorkerOptions per impostare in modo programmatico le opzioni di logging della pipeline che possono essere sostituite dalla riga di comando:

  from apache_beam.options.pipeline_options import PipelineOptions, WorkerOptions

  pipeline_args = [
    '--project=PROJECT_NAME',
    '--job_name=JOB_NAME',
    '--staging_location=gs://STORAGE_BUCKET/staging/',
    '--temp_location=gs://STORAGE_BUCKET/tmp/',
    '--region=DATAFLOW_REGION',
    '--runner=DataflowRunner'
  ]

  pipeline_options = PipelineOptions(pipeline_args)
  worker_options = pipeline_options.view_as(WorkerOptions)
  worker_options.default_sdk_harness_log_level = 'WARNING'

  # Note: In Apache Beam SDK 2.42.0 and earlier versions, use ['{"apache_beam.runners.dataflow":"WARNING"}']
  worker_options.sdk_harness_log_level_overrides = {"apache_beam.runners.dataflow":"WARNING"}

  # Pass in pipeline options during pipeline creation.
  with beam.Pipeline(options=pipeline_options) as pipeline:

Sostituisci quanto segue:

  • PROJECT_NAME: il nome del progetto
  • JOB_NAME: il nome del job
  • STORAGE_BUCKET: il nome di Cloud Storage
  • DATAFLOW_REGION: la regione in cui vuoi eseguire il deployment del job Dataflow

    Il flag --region sostituisce la regione predefinita impostata nel server di metadati, nel client locale o nelle variabili di ambiente.

Go

Questa funzionalità non è disponibile nell'SDK Apache Beam per Go.

Visualizza il log dei job BigQuery avviati

Quando utilizzi BigQuery nella tua pipeline Dataflow, vengono avviati i job BigQuery in grado di eseguire varie azioni per tuo conto. Queste azioni possono includere il caricamento, l'esportazione dei dati e così via. Per la risoluzione dei problemi e il monitoraggio, l'interfaccia di monitoraggio di Dataflow offre ulteriori informazioni su questi job BigQuery disponibili nel riquadro Log.

Le informazioni sui job BigQuery visualizzate nel riquadro Log vengono archiviate e caricate da una tabella di sistema BigQuery. Viene addebitato un costo di fatturazione quando viene eseguita una query sulla tabella BigQuery sottostante.

Visualizzare i dettagli del job BigQuery

Per visualizzare le informazioni sui job BigQuery, la pipeline deve utilizzare Apache Beam 2.24.0 o versioni successive.

Per elencare i job BigQuery, apri la scheda Job BigQuery e seleziona la posizione dei job BigQuery. Quindi, fai clic su Carica job BigQuery e conferma la finestra di dialogo. Al termine della query, viene visualizzato l'elenco dei job.

Il pulsante Carica job BigQuery nella tabella delle informazioni dei job BigQuery

Vengono fornite informazioni di base su ogni job, tra cui ID job, tipo, durata e così via.

Una tabella che mostra i job BigQuery eseguiti durante l'esecuzione attuale del job della pipeline.

Per informazioni più dettagliate su un job specifico, fai clic su Riga di comando nella colonna Ulteriori informazioni.

Nella finestra modale della riga di comando, copia il comando bq Job describe ed eseguilo localmente o in Cloud Shell.

gcloud alpha bq jobs describe BIGQUERY_JOB_ID

Il comando bq jobs describe restituisce JobStatistics, che fornisce ulteriori dettagli utili per la diagnosi di un job BigQuery lento o bloccato.

In alternativa, quando utilizzi BigQueryIO con una query SQL, viene eseguito un job di query. Per visualizzare la query SQL utilizzata dal job, fai clic su Visualizza query nella colonna Ulteriori informazioni.