Lavora con i log della pipeline

Puoi utilizzare l'infrastruttura di logging integrata dell'SDK Apache Beam per registrare durante l'esecuzione della pipeline. Puoi utilizzare lo Console Google Cloud per monitorare le informazioni di logging durante e dopo l'esecuzione della pipeline.

Aggiungi messaggi di log alla pipeline

Java

L'SDK Apache Beam per Java consiglia di registrare i messaggi worker tramite il modello open source Semplice Libreria Facade per Java (SLF4J) Logging. L'SDK Apache Beam per Java implementa l'infrastruttura di logging richiesta, in modo che il codice Java debba importare solo API SLF4J. Quindi, crea un’istanza Logger per abilitare la registrazione dei messaggi all’interno del codice della pipeline.

Per il codice e/o le librerie preesistenti, l'SDK Apache Beam per Java imposta un'ulteriore infrastruttura di logging. Messaggi di log prodotti dalle seguenti librerie di logging per Java vengono acquisite:

Python

L'SDK Apache Beam per Python fornisce il pacchetto di libreria logging, che consente ai worker della pipeline di generare messaggi di log. Per utilizzare funzioni libreria, devi importare la libreria:

import logging

Vai

L'SDK Apache Beam per Go fornisce il pacchetto di libreria log, che consente ai worker della pipeline di generare messaggi di log. Per utilizzare funzioni libreria, devi importare la libreria:

import "github.com/apache/beam/sdks/v2/go/pkg/beam/log"

Esempio di codice messaggio di log del worker

Java

L'esempio seguente utilizza SLF4J per il logging di Dataflow. Per ulteriori informazioni sulla configurazione di SLF4J per il logging di Dataflow, consulta l'articolo dei suggerimenti per Java.

Apache Beam L'esempio WordCount può essere modificato per generare un messaggio di log quando la parola "amore" si trova in una riga del testo elaborato. Il codice aggiunto è indicato in grassetto nell'esempio seguente (il codice circostante è incluso per contesto).

 package org.apache.beam.examples;
 // Import SLF4J packages.
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 ...
 public class WordCount {
   ...
   static class ExtractWordsFn extends DoFn<String, String> {
     // Instantiate Logger.
     // Suggestion: As shown, specify the class name of the containing class
     // (WordCount).
     private static final Logger LOG = LoggerFactory.getLogger(WordCount.class);
     ...
     @ProcessElement
     public void processElement(ProcessContext c) {
       ...
       // Output each word encountered into the output PCollection.
       for (String word : words) {
         if (!word.isEmpty()) {
           c.output(word);
         }
         // Log INFO messages when the word "love" is found.
         if(word.toLowerCase().equals("love")) {
           LOG.info("Found " + word.toLowerCase());
         }
       }
     }
   }
 ... // Remaining WordCount example code ...

Python

Apache Beam wordcount.py di esempio può essere modificato per restituire un messaggio di log quando parola "amore" si trova in una riga del testo elaborato.

# import Python logging module.
import logging

class ExtractWordsFn(beam.DoFn):
  def process(self, element):
    words = re.findall(r'[A-Za-z\']+', element)
    for word in words:
      yield word

      if word.lower() == 'love':
        # Log using the root logger at info or higher levels
        logging.info('Found : %s', word.lower())

# Remaining WordCount example code ...

Vai

Apache Beam wordcount.go di esempio può essere modificato per restituire un messaggio di log quando parola "amore" si trova in una riga del testo elaborato.

func (f *extractFn) ProcessElement(ctx context.Context, line string, emit func(string)) {
    for _, word := range wordRE.FindAllString(line, -1) {
        // increment the counter for small words if length of words is
        // less than small_word_length
        if strings.ToLower(word) == "love" {
            log.Infof(ctx, "Found : %s", strings.ToLower(word))
        }

        emit(word)
    }
}

// Remaining Wordcount example

Java

Se la pipeline WordCount modificata viene eseguita localmente utilizzando il valore predefinito DirectRunner con l'output inviato a un file locale (--output=./local-wordcounts), output della console include i messaggi di log aggiunti:

INFO: Executing pipeline using the DirectRunner.
...
Feb 11, 2015 1:13:22 PM org.apache.beam.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
Feb 11, 2015 1:13:22 PM org.apache.beam.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
Feb 11, 2015 1:13:22 PM org.apache.beam.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
...
INFO: Pipeline execution complete.

Per impostazione predefinita, solo le righe di log contrassegnate con INFO o superiore verranno inviate a Cloud Logging. Se modificare questo comportamento, vedi Impostazione dei livelli di log del worker della pipeline.

Python

Se la pipeline WordCount modificata viene eseguita localmente utilizzando il valore predefinito DirectRunner con l'output inviato a un file locale (--output=./local-wordcounts), output della console include i messaggi di log aggiunti:

INFO:root:Found : love
INFO:root:Found : love
INFO:root:Found : love

Per impostazione predefinita, solo le righe di log contrassegnate con INFO o superiore verranno inviate a Cloud Logging.

Vai

Se la pipeline WordCount modificata viene eseguita localmente utilizzando il valore predefinito DirectRunner con l'output inviato a un file locale (--output=./local-wordcounts), output della console include i messaggi di log aggiunti:

2022/05/26 11:36:44 Found : love
2022/05/26 11:36:44 Found : love
2022/05/26 11:36:44 Found : love

Per impostazione predefinita, solo le righe di log contrassegnate con INFO o superiore verranno inviate a Cloud Logging.

Regolare il volume dei log

Potresti anche ridurre il volume dei log generati modificando i livelli di log della pipeline. Se non vuoi continuare a importare alcuni o tutti i tuoi log Dataflow, aggiungi un'esclusione di Logging per escludere i log di Dataflow. Quindi, esporta i log in una destinazione diversa, come BigQuery, Cloud Storage, o Pub/Sub. Per ulteriori informazioni, vedi Controllare l'importazione dei log di Dataflow.

Limite di logging e limitazione

I messaggi di log dei worker sono limitati a 15.000 messaggi ogni 30 secondi per worker. Se viene raggiunto questo limite, viene aggiunto un singolo messaggio di log del worker che comunica che il logging è limitato:

Throttling logger worker. It used up its 30s quota for logs in only 12.345s
Non vengono registrati altri messaggi fino al L'intervallo di 30 secondi è terminato. Questo limite viene condiviso dai messaggi di log generati dall'SDK Apache Beam. codice utente.

Archiviazione e conservazione dei log

I log operativi vengono archiviati nel bucket di log _Default. Il nome del servizio API di logging è dataflow.googleapis.com. Per ulteriori informazioni i tipi di risorse e i servizi monitorati da Google Cloud utilizzati in Cloud Logging, consulta Risorse e servizi monitorati.

Per maggiori dettagli su per quanto tempo le voci di log vengono conservate da Logging: vedere le informazioni sulla conservazione Quote e limiti: periodi di conservazione dei log.

Per informazioni sulla visualizzazione dei log operativi, consulta Monitorare e visualizzare i log della pipeline.

Monitora e visualizza i log della pipeline

Quando esegui la pipeline Servizio Dataflow, puoi utilizzare Dataflow interfaccia di monitoraggio per visualizzare i log emessi dalla pipeline.

Esempio di log worker Dataflow

La pipeline WordCount modificata può essere eseguita nel cloud con le seguenti opzioni:

Java

--project=WordCountExample
--output=gs://<bucket-name>/counts
--runner=DataflowRunner
--tempLocation=gs://<bucket-name>/temp
--stagingLocation=gs://<bucket-name>/binaries

Python

--project=WordCountExample
--output=gs://<bucket-name>/counts
--runner=DataflowRunner
--staging_location=gs://<bucket-name>/binaries

Vai

--project=WordCountExample
--output=gs://<bucket-name>/counts
--runner=DataflowRunner
--staging_location=gs://<bucket-name>/binaries

Visualizza i log

Poiché la pipeline cloud WordCount utilizza il blocco dell'esecuzione, i messaggi della console vengono generati durante l'esecuzione della pipeline. Dopo l'avvio del job, viene fornito un link La pagina della console Google Cloud viene visualizzata nella console, seguita dalla pipeline ID job:

INFO: To access the Dataflow monitoring console, please navigate to
https://console.developers.google.com/dataflow/job/2017-04-13_13_58_10-6217777367720337669
Submitted job: 2017-04-13_13_58_10-6217777367720337669

L'URL della console indirizza interfaccia di monitoraggio con un pagina di riepilogo per il job inviato. Mostra un grafico di esecuzione dinamico a sinistra, con le informazioni di riepilogo a destra. Fai clic su nel riquadro inferiore per espandere il riquadro dei log.

Per impostazione predefinita, il riquadro dei log mostra i log dei job che segnalano lo stato sul lavoro nel suo complesso. Puoi filtrare i messaggi visualizzati nel riquadro dei log in base a facendo clic su Informazioni e Filtra i log.

Se selezioni un passaggio della pipeline nel grafico, la vista cambia in Log dei passaggi generato dal codice e dal codice generato in esecuzione nel passaggio della pipeline.

Per tornare a Log dei job, cancella il passaggio facendo clic all'esterno del grafico oppure utilizzando il pulsante Deseleziona passaggio nel riquadro laterale a destra.

Vai a Esplora log

Per aprire Esplora log e selezionare diversi tipi di log, nel riquadro dei log, Fai clic su Visualizza in Esplora log (il pulsante del link esterno).

In Esplora log, per vedere il riquadro con diversi tipi di log, fai clic sull'opzione di attivazione/disattivazione Campi log.

Nella pagina Esplora log, la query potrebbe filtrare i log in base al passaggio del job o per tipo di log. Per rimuovere i filtri, fai clic sul pulsante di attivazione/disattivazione Mostra query e modifica la query.

Per visualizzare tutti i log disponibili per un job, segui questi passaggi:

  1. Nel campo Query, inserisci la seguente query:

    resource.type="dataflow_step"
    resource.labels.job_id="JOB_ID"
    

    Sostituisci JOB_ID con l'ID del job.

  2. Fai clic su Esegui query.

  3. Se utilizzi questa query e non vedi i log per il tuo job, fai clic su Modifica ora.

  4. Modifica i valori di inizio e fine, poi fai clic su Applica.

Tipi di log

Esplora log include anche i log dell'infrastruttura una pipeline o un blocco note personalizzato.

Ecco un riepilogo dei diversi tipi di log disponibili per la visualizzazione nel Pagina Esplora log:

  • I log job-message contengono messaggi a livello di job che vari componenti generate da Dataflow. Gli esempi includono la scalabilità automatica dell'avvio o dell'arresto dei worker, l'avanzamento del passaggio del job e gli errori dei job. Errori a livello di worker che hanno origine dall'arresto anomalo del codice utente presenti nei log di worker si propagano anche fino job-message.
  • I log worker vengono prodotti dai worker Dataflow. I lavoratori sanno gran parte del lavoro della pipeline (ad esempio l'applicazione di ParDo ai dati). I log dei lavoratori contengono messaggi registrati dal tuo codice e e Dataflow.
  • I log worker-startup sono presenti nella maggior parte dei job Dataflow può acquisire messaggi relativi al processo di avvio. Il processo di avvio include il download dei jar del job da Cloud Storage, quindi l'avvio worker. Se si verifica un problema durante l'avvio dei worker, questi log sono una buona posizione. per dare un'occhiata.
  • I log shuffler contengono messaggi dei worker che consolidano i risultati delle operazioni parallele della pipeline.
  • I log di system contengono messaggi dai sistemi operativi host delle VM worker. In alcuni scenari, potrebbero acquisire di arresti anomali dell'elaborazione o eventi di esaurimento della memoria.
  • docker e I log di kubelet contengono messaggi relativi a queste tecnologie pubbliche, che vengono utilizzati Worker Dataflow.
  • I log nvidia-mps contengono messaggi su Operazioni di NVIDIA Multi-Process Service (MPS).

Imposta livelli di log del worker della pipeline

Java

Il livello di logging SLF4J predefinito impostato sui worker dall'SDK Apache Beam per Java è INFO. Tutti i messaggi di log di INFO o superiore (INFO, WARN, ERROR). Puoi impostare un livello di log predefinito diverso per supportare livelli di logging SLF4J più bassi (TRACE o DEBUG) o impostare diversi di log per i diversi pacchetti di classi nel codice.

Vengono fornite le seguenti opzioni di pipeline per consentirti di impostare i livelli di log dei worker dalla riga di comando in modo programmatico:

  • --defaultSdkHarnessLogLevel=<level>: utilizza questa opzione per impostare tutti i logger nella al livello predefinito specificato. Ad esempio, la seguente opzione della riga di comando sostituirà la il livello di log predefinito di Dataflow INFO e impostalo su DEBUG:
    --defaultSdkHarnessLogLevel=DEBUG
  • --sdkHarnessLogLevelOverrides={"<package or class>":"<level>"}: usa questa opzione per impostare il livello di logging per pacchetti o classi specifici. Ad esempio, per eseguire l'override livello di log predefinito della pipeline per il pacchetto org.apache.beam.runners.dataflow, e impostala su TRACE:
    --sdkHarnessLogLevelOverrides='{"org.apache.beam.runners.dataflow":"TRACE"}'
    Per eseguire più override, fornisci una mappa JSON:
    (--sdkHarnessLogLevelOverrides={"<package/class>":"<level>","<package/class>":"<level>",...}).
  • Le opzioni della pipeline defaultSdkHarnessLogLevel e sdkHarnessLogLevelOverrides non sono supportato con pipeline che usano Apache Beam SDK versioni 2.50.0 e precedenti senza Runner v2. In questo caso, utilizza --defaultWorkerLogLevel=<level> e --workerLogLevelOverrides={"<package or class>":"<level>"} le opzioni della pipeline. Per eseguire più override, fornisci una mappa JSON:
    (--workerLogLevelOverrides={"<package/class>":"<level>","<package/class>":"<level>",...})

L'esempio seguente imposta in modo programmatico le opzioni di logging della pipeline con valori predefiniti che possono essere sostituite dalla riga di comando:

 PipelineOptions options = ...
 SdkHarnessOptions loggingOptions = options.as(SdkHarnessOptions.class);
 // Overrides the default log level on the worker to emit logs at TRACE or higher.
 loggingOptions.setDefaultSdkHarnessLogLevel(LogLevel.TRACE);
 // Overrides the Foo class and "org.apache.beam.runners.dataflow" package to emit logs at WARN or higher.
 loggingOptions.getSdkHarnessLogLevelOverrides()
     .addOverrideForClass(Foo.class, LogLevel.WARN)
     .addOverrideForPackage(Package.getPackage("org.apache.beam.runners.dataflow"), LogLevel.WARN);

Python

Il livello di logging predefinito impostato sui worker dall'SDK Apache Beam per Python è INFO. Tutti i messaggi di log di INFO o superiore (INFO, WARNING, ERROR, CRITICAL). Puoi impostare un livello di log predefinito diverso per supportare livelli di logging più bassi (DEBUG) o impostare livelli di log diversi per moduli diversi nel codice.

Sono disponibili due opzioni della pipeline per consentirti di impostare i livelli di log dei worker dalla riga di comando in modo programmatico:

  • --default_sdk_harness_log_level=<level>: utilizza questa opzione per impostare tutti i logger nella al livello predefinito specificato. Ad esempio, la seguente opzione della riga di comando esegue l'override il livello di log predefinito di Dataflow INFO e lo imposta su DEBUG:
    --default_sdk_harness_log_level=DEBUG
  • --sdk_harness_log_level_overrides={\"<module>\":\"<level>\"}: usa questa opzione per impostare il livello di logging per i moduli specificati. Ad esempio, per eseguire l'override livello di log predefinito della pipeline per il modulo apache_beam.runners.dataflow, e impostala su DEBUG:
    --sdk_harness_log_level_overrides={\"apache_beam.runners.dataflow\":\"DEBUG\"}
    Per eseguire più override, fornisci una mappa JSON:
    (--sdk_harness_log_level_overrides={\"<module>\":\"<level>\",\"<module>\":\"<level>\",...}).

L'esempio seguente utilizza il metodo WorkerOptions per impostare in modo programmatico le opzioni di logging della pipeline che possono essere sostituite dalla riga di comando:

  from apache_beam.options.pipeline_options import PipelineOptions, WorkerOptions

  pipeline_args = [
    '--project=PROJECT_NAME',
    '--job_name=JOB_NAME',
    '--staging_location=gs://STORAGE_BUCKET/staging/',
    '--temp_location=gs://STORAGE_BUCKET/tmp/',
    '--region=DATAFLOW_REGION',
    '--runner=DataflowRunner'
  ]

  pipeline_options = PipelineOptions(pipeline_args)
  worker_options = pipeline_options.view_as(WorkerOptions)
  worker_options.default_sdk_harness_log_level = 'WARNING'

  # Note: In Apache Beam SDK 2.42.0 and earlier versions, use ['{"apache_beam.runners.dataflow":"WARNING"}']
  worker_options.sdk_harness_log_level_overrides = {"apache_beam.runners.dataflow":"WARNING"}

  # Pass in pipeline options during pipeline creation.
  with beam.Pipeline(options=pipeline_options) as pipeline:

Sostituisci quanto segue:

  • PROJECT_NAME: il nome del progetto
  • JOB_NAME: il nome del lavoro
  • STORAGE_BUCKET: il nome di Cloud Storage
  • DATAFLOW_REGION: il valore regione in cui vuoi eseguire il deployment del job Dataflow

    Il flag --region esegue l'override della regione predefinita impostata nel server dei metadati, nel client locale o nelle variabili di ambiente.

Vai

Questa funzionalità non è disponibile nell'SDK Apache Beam per Go.

Visualizza il log dei job BigQuery avviati

Quando usi BigQuery nella tua pipeline Dataflow, I job BigQuery sono eseguire varie azioni per tuo conto. Queste azioni possono includere caricamento dei dati, esportazione dei dati e così via. Per la risoluzione dei problemi e il monitoraggio, l'interfaccia di monitoraggio di Dataflow dispone di informazioni aggiuntive su questi job BigQuery disponibili nel riquadro Log.

Le informazioni sui job BigQuery visualizzate nel riquadro Log vengono archiviate e caricate da una tabella di sistema BigQuery. Quando viene eseguita una query sulla tabella BigQuery sottostante, viene addebitato un costo di fatturazione.

Visualizza i dettagli del job BigQuery

Per visualizzare le informazioni sui job BigQuery, la pipeline deve utilizzare Apache Beam 2.24.0 o versioni successive.

Per elencare i job BigQuery, apri la scheda Job BigQuery e seleziona la posizione dei job BigQuery. Quindi, fai clic su Carica job BigQuery e conferma la finestra di dialogo. Al termine della query, viene visualizzato l'elenco dei job.

Pulsante Carica job BigQuery nella tabella di informazioni sui job BigQuery

Vengono fornite le informazioni di base su ogni job, tra cui ID, tipo, durata e così via.

Una tabella che mostra i job BigQuery eseguiti durante l&#39;esecuzione attuale del job della pipeline.

Per informazioni più dettagliate su un job specifico, fai clic su Riga di comando nella colonna Ulteriori informazioni.

Nella finestra modale della riga di comando, copia il comando bq job describe ed eseguilo localmente o in Cloud Shell.

gcloud alpha bq jobs describe BIGQUERY_JOB_ID

Il comando bq jobs describe restituisce JobStatistics, che forniscono ulteriori dettagli utili per diagnosticare un job BigQuery lento o bloccato.

In alternativa, quando utilizzi BigQueryIO con una query SQL, viene eseguito un job di query. Per vedere la query SQL utilizzata dal job, Fai clic su Visualizza query nella colonna Ulteriori informazioni.