Bekerja dengan log pipeline

Anda dapat menggunakan infrastruktur logging bawaan Apache Beam SDK untuk mencatat informasi ke dalam log saat menjalankan pipeline. Anda dapat menggunakan Konsol Google Cloud untuk memantau informasi logging selama dan setelah pipeline berjalan.

Menambahkan pesan log ke pipeline Anda

Java

Apache Beam SDK untuk Java merekomendasikan agar Anda mencatat pesan pekerja ke dalam log melalui library Simple Logging Facade for Java (SLF4J) open source. Apache Beam SDK untuk Java mengimplementasikan infrastruktur logging yang diperlukan sehingga kode Java Anda hanya perlu mengimpor SLF4J API. Kemudian, akan dibuat instance Logger untuk mengaktifkan logging pesan dalam kode pipeline Anda.

Untuk kode dan/atau library yang sudah ada, Apache Beam SDK untuk Java akan menyiapkan infrastruktur logging tambahan. Pesan log yang dihasilkan oleh library logging untuk Java berikut ditangkap:

Python

Apache Beam SDK untuk Python menyediakan paket library logging, yang memungkinkan pekerja pipeline menghasilkan pesan log. Untuk menggunakan fungsi library, Anda harus mengimpor library:

import logging

Go

Apache Beam SDK untuk Go menyediakan paket library log, yang memungkinkan pekerja pipeline menghasilkan pesan log. Untuk menggunakan fungsi library, Anda harus mengimpor library:

import "github.com/apache/beam/sdks/v2/go/pkg/beam/log"

Contoh kode pesan log pekerja

Java

Contoh berikut menggunakan SLF4J untuk logging Dataflow. Untuk mempelajari lebih lanjut cara mengonfigurasi SLF4J untuk logging Dataflow, lihat artikel Tips Java.

Contoh WordCount Apache Beam dapat diubah untuk menghasilkan pesan log jika kata "love" ditemukan dalam baris teks yang diproses. Kode yang ditambahkan ditunjukkan dengan huruf tebal dalam contoh berikut (kode di sekitarnya disertakan untuk konteks).

 package org.apache.beam.examples;
 // Import SLF4J packages.
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 ...
 public class WordCount {
   ...
   static class ExtractWordsFn extends DoFn<String, String> {
     // Instantiate Logger.
     // Suggestion: As shown, specify the class name of the containing class
     // (WordCount).
     private static final Logger LOG = LoggerFactory.getLogger(WordCount.class);
     ...
     @ProcessElement
     public void processElement(ProcessContext c) {
       ...
       // Output each word encountered into the output PCollection.
       for (String word : words) {
         if (!word.isEmpty()) {
           c.output(word);
         }
         // Log INFO messages when the word "love" is found.
         if(word.toLowerCase().equals("love")) {
           LOG.info("Found " + word.toLowerCase());
         }
       }
     }
   }
 ... // Remaining WordCount example code ...

Python

Contoh wordcount.py Apache Beam dapat diubah untuk menghasilkan pesan log saat kata "love" ditemukan dalam baris teks yang diproses.

# import Python logging module.
import logging

class ExtractWordsFn(beam.DoFn):
  def process(self, element):
    words = re.findall(r'[A-Za-z\']+', element)
    for word in words:
      yield word

      if word.lower() == 'love':
        # Log using the root logger at info or higher levels
        logging.info('Found : %s', word.lower())

# Remaining WordCount example code ...

Go

Contoh wordcount.go Apache Beam dapat diubah untuk menghasilkan pesan log saat kata "love" ditemukan dalam baris teks yang diproses.

func (f *extractFn) ProcessElement(ctx context.Context, line string, emit func(string)) {
    for _, word := range wordRE.FindAllString(line, -1) {
        // increment the counter for small words if length of words is
        // less than small_word_length
        if strings.ToLower(word) == "love" {
            log.Infof(ctx, "Found : %s", strings.ToLower(word))
        }

        emit(word)
    }
}

// Remaining Wordcount example

Java

Jika pipeline WordCount yang dimodifikasi dijalankan secara lokal menggunakan DirectRunner default dengan output yang dikirim ke file lokal (--output=./local-wordcounts), output konsol akan menyertakan pesan log yang ditambahkan:

INFO: Executing pipeline using the DirectRunner.
...
Feb 11, 2015 1:13:22 PM org.apache.beam.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
Feb 11, 2015 1:13:22 PM org.apache.beam.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
Feb 11, 2015 1:13:22 PM org.apache.beam.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
...
INFO: Pipeline execution complete.

Secara default, hanya baris log yang ditandai sebagai INFO dan lebih tinggi yang akan dikirim ke Cloud Logging. Jika Anda ingin mengubah perilaku ini, lihat Menetapkan Level Log Pekerja Pipeline.

Python

Jika pipeline WordCount yang dimodifikasi dijalankan secara lokal menggunakan DirectRunner default dengan output yang dikirim ke file lokal (--output=./local-wordcounts), output konsol akan menyertakan pesan log yang ditambahkan:

INFO:root:Found : love
INFO:root:Found : love
INFO:root:Found : love

Secara default, hanya baris log yang ditandai sebagai INFO dan lebih tinggi yang akan dikirim ke Cloud Logging.

Go

Jika pipeline WordCount yang dimodifikasi dijalankan secara lokal menggunakan DirectRunner default dengan output yang dikirim ke file lokal (--output=./local-wordcounts), output konsol akan menyertakan pesan log yang ditambahkan:

2022/05/26 11:36:44 Found : love
2022/05/26 11:36:44 Found : love
2022/05/26 11:36:44 Found : love

Secara default, hanya baris log yang ditandai sebagai INFO dan lebih tinggi yang akan dikirim ke Cloud Logging.

Mengontrol volume log

Anda juga dapat mengurangi volume log yang dihasilkan dengan mengubah level log pipeline. Jika Anda tidak ingin melanjutkan penyerapan sebagian atau semua log Dataflow, tambahkan pengecualian Logging untuk mengecualikan log Dataflow. Kemudian, ekspor log ke tujuan lain seperti BigQuery, Cloud Storage, atau Pub/Sub. Untuk mengetahui informasi selengkapnya, lihat Mengontrol penyerapan log Dataflow.

Batas logging dan throttling

Pesan log pekerja dibatasi hingga 15.000 pesan setiap 30 detik, per pekerja. Jika batas ini tercapai, satu pesan log pekerja akan ditambahkan yang menyatakan bahwa logging dibatasi:

Throttling logger worker. It used up its 30s quota for logs in only 12.345s
Tidak ada pesan lagi yang dicatat ke dalam log sampai interval 30 detik berakhir. Batas ini dibagikan oleh pesan log yang dihasilkan oleh Apache Beam SDK dan kode pengguna.

Penyimpanan dan retensi log

Log operasional disimpan di bucket log _Default. Nama layanan API logging adalah dataflow.googleapis.com. Untuk mengetahui informasi lebih lanjut tentang layanan dan jenis resource yang dimonitor Google Cloud yang digunakan dalam Cloud Logging, lihat Resource dan layanan yang dimonitor.

Untuk mengetahui detail tentang durasi entri log dipertahankan oleh Logging, baca informasi retensi di bagian Kuota dan batas: Periode retensi log.

Untuk mengetahui informasi tentang cara melihat log operasional, lihat Memantau dan melihat log pipeline.

Memantau dan melihat log pipeline

Saat menjalankan pipeline di layanan Dataflow, Anda dapat menggunakan antarmuka pemantauan Dataflow untuk melihat log yang dikeluarkan oleh pipeline Anda.

Contoh log pekerja Dataflow

Pipeline WordCount yang dimodifikasi dapat dijalankan di cloud dengan opsi berikut:

Java

--project=WordCountExample
--output=gs://<bucket-name>/counts
--runner=DataflowRunner
--tempLocation=gs://<bucket-name>/temp
--stagingLocation=gs://<bucket-name>/binaries

Python

--project=WordCountExample
--output=gs://<bucket-name>/counts
--runner=DataflowRunner
--staging_location=gs://<bucket-name>/binaries

Go

--project=WordCountExample
--output=gs://<bucket-name>/counts
--runner=DataflowRunner
--staging_location=gs://<bucket-name>/binaries

Lihat log

Karena pipeline cloud WordCount menggunakan eksekusi pemblokiran, pesan konsol dikeluarkan selama eksekusi pipeline. Setelah tugas dimulai, link ke halaman Google Cloud Console dihasilkan ke konsol, diikuti dengan ID tugas pipeline:

INFO: To access the Dataflow monitoring console, please navigate to
https://console.developers.google.com/dataflow/job/2017-04-13_13_58_10-6217777367720337669
Submitted job: 2017-04-13_13_58_10-6217777367720337669

URL konsol mengarah ke antarmuka pemantauan Dataflow dengan halaman ringkasan untuk tugas yang dikirim. Diagram ini menampilkan grafik eksekusi dinamis di sebelah kiri, dengan informasi ringkasan di sebelah kanan. Klik di panel bawah untuk meluaskan panel log.

Secara default, panel log menampilkan Log Tugas yang melaporkan status tugas secara keseluruhan. Anda dapat memfilter pesan yang muncul di panel log dengan mengklik Info dan Filter logs.

Memilih langkah pipeline dalam grafik akan mengubah tampilan menjadi Log Langkah yang dihasilkan oleh kode Anda dan kode yang dihasilkan yang berjalan di langkah pipeline.

Untuk kembali ke Job Logs, hapus langkah dengan mengklik di luar grafik atau menggunakan tombol Deselect step di panel samping kanan.

Mengklik tombol link eksternal dari panel log akan membuka Logging dengan menu untuk memilih berbagai jenis log.

Logging juga mencakup log infrastruktur lain untuk pipeline Anda. Untuk mengetahui detail selengkapnya tentang cara menjelajahi log, lihat panduan Logs explorer.

Jenis log

Berikut ini ringkasan berbagai jenis log yang tersedia untuk dilihat dari halaman Logging:

  • Log job-message berisi pesan tingkat tugas yang dihasilkan oleh berbagai komponen Dataflow. Contohnya mencakup konfigurasi penskalaan otomatis, kapan pekerja memulai atau menonaktifkan, progres pada langkah tugas, dan error tugas. Error tingkat pekerja yang berasal dari kode pengguna yang error dan yang ada di log worker juga menyebar ke log pesan tugas.
  • log worker dihasilkan oleh pekerja Dataflow. Pekerja melakukan sebagian besar pekerjaan pipeline (misalnya, menerapkan ParDo Anda ke data). Log Worker berisi pesan yang dicatat ke dalam log oleh kode dan Dataflow Anda.
  • Log worker-startup ada di sebagian besar tugas Dataflow dan dapat menangkap pesan yang terkait dengan proses startup. Proses startup termasuk mendownload stoples tugas dari Cloud Storage, lalu memulai pekerja. Jika ada masalah saat memulai pekerja, log ini adalah tempat yang tepat untuk dilihat.
  • Log shuffler berisi pesan dari pekerja yang menggabungkan hasil operasi pipeline paralel.
  • Log docker dan kubelet berisi pesan yang terkait dengan teknologi publik ini, yang digunakan pada pekerja Dataflow.
  • Log nvidia-mps berisi pesan tentang operasi NVIDIA Multi-Process Service (MPS).

Menetapkan level log pekerja pipeline

Java

Level logging SLF4J default yang ditetapkan pada pekerja oleh Apache Beam SDK untuk Java adalah INFO. Semua pesan log INFO atau yang lebih tinggi (INFO, WARN, ERROR) akan ditampilkan. Anda dapat menetapkan level log default yang berbeda untuk mendukung level logging SLF4J yang lebih rendah (TRACE atau DEBUG) atau menetapkan level log yang berbeda untuk berbagai paket class dalam kode Anda.

Opsi pipeline berikut disediakan untuk memungkinkan Anda menetapkan level log pekerja dari command line atau secara terprogram:

  • --defaultSdkHarnessLogLevel=<level>: menggunakan opsi ini untuk menetapkan semua logger pada level default yang ditentukan. Misalnya, opsi command line berikut akan mengganti level log INFO Dataflow default, dan menetapkannya ke DEBUG:
    --defaultSdkHarnessLogLevel=DEBUG
  • --sdkHarnessLogLevelOverrides={"<package or class>":"<level>"}: gunakan opsi ini untuk menetapkan level logging bagi paket atau class yang ditentukan. Misalnya, untuk mengganti level log pipeline default bagi paket org.apache.beam.runners.dataflow, dan menetapkannya ke TRACE:
    --sdkHarnessLogLevelOverrides='{"org.apache.beam.runners.dataflow":"TRACE"}'
    Untuk membuat beberapa penggantian, berikan peta JSON:
    (--sdkHarnessLogLevelOverrides={"<package/class>":"<level>","<package/class>":"<level>",...}).
  • Opsi pipeline defaultSdkHarnessLogLevel dan sdkHarnessLogLevelOverrides tidak didukung dengan pipeline yang menggunakan Apache Beam SDK versi 2.50.0 dan yang lebih lama tanpa Runner v2. Jika demikian, gunakan opsi pipeline --defaultWorkerLogLevel=<level> dan --workerLogLevelOverrides={"<package or class>":"<level>"}. Untuk melakukan beberapa penggantian, berikan peta JSON:
    (--workerLogLevelOverrides={"<package/class>":"<level>","<package/class>":"<level>",...})

Contoh berikut secara terprogram menetapkan opsi logging pipeline dengan nilai default yang dapat diganti dari command line:

 PipelineOptions options = ...
 SdkHarnessOptions loggingOptions = options.as(SdkHarnessOptions.class);
 // Overrides the default log level on the worker to emit logs at TRACE or higher.
 loggingOptions.setDefaultSdkHarnessLogLevel(LogLevel.TRACE);
 // Overrides the Foo class and "org.apache.beam.runners.dataflow" package to emit logs at WARN or higher.
 loggingOptions.getSdkHarnessLogLevelOverrides()
     .addOverrideForClass(Foo.class, LogLevel.WARN)
     .addOverrideForPackage(Package.getPackage("org.apache.beam.runners.dataflow"), LogLevel.WARN);

Python

Level logging default yang ditetapkan pada pekerja oleh Apache Beam SDK untuk Python adalah INFO. Semua pesan log INFO atau yang lebih tinggi (INFO, WARNING, ERROR, CRITICAL) akan dimunculkan. Anda dapat menetapkan level log default yang berbeda untuk mendukung level logging yang lebih rendah (DEBUG) atau menetapkan level log yang berbeda untuk modul yang berbeda dalam kode Anda.

Dua opsi pipeline disediakan untuk memungkinkan Anda menetapkan level log pekerja dari command line atau secara terprogram:

  • --default_sdk_harness_log_level=<level>: menggunakan opsi ini untuk menetapkan semua logger pada level default yang ditentukan. Misalnya, opsi command line berikut mengganti level log INFO Dataflow default, dan menetapkannya ke DEBUG:
    --default_sdk_harness_log_level=DEBUG
  • --sdk_harness_log_level_overrides={\"<module>\":\"<level>\"}: gunakan opsi ini untuk menetapkan level logging bagi modul yang ditentukan. Misalnya, untuk mengganti level log pipeline default bagi modul apache_beam.runners.dataflow, dan menetapkannya ke DEBUG:
    --sdk_harness_log_level_overrides={\"apache_beam.runners.dataflow\":\"DEBUG\"}
    Untuk membuat beberapa penggantian, berikan peta JSON:
    (--sdk_harness_log_level_overrides={\"<module>\":\"<level>\",\"<module>\":\"<level>\",...}).

Contoh berikut menggunakan class WorkerOptions untuk menetapkan opsi logging pipeline secara terprogram yang dapat diganti dari command line:

  from apache_beam.options.pipeline_options import PipelineOptions, WorkerOptions

  pipeline_args = [
    '--project=PROJECT_NAME',
    '--job_name=JOB_NAME',
    '--staging_location=gs://STORAGE_BUCKET/staging/',
    '--temp_location=gs://STORAGE_BUCKET/tmp/',
    '--region=DATAFLOW_REGION',
    '--runner=DataflowRunner'
  ]

  pipeline_options = PipelineOptions(pipeline_args)
  worker_options = pipeline_options.view_as(WorkerOptions)
  worker_options.default_sdk_harness_log_level = 'WARNING'

  # Note: In Apache Beam SDK 2.42.0 and earlier versions, use ['{"apache_beam.runners.dataflow":"WARNING"}']
  worker_options.sdk_harness_log_level_overrides = {"apache_beam.runners.dataflow":"WARNING"}

  # Pass in pipeline options during pipeline creation.
  with beam.Pipeline(options=pipeline_options) as pipeline:

Ganti kode berikut:

  • PROJECT_NAME: nama project
  • JOB_NAME: nama pekerjaan
  • STORAGE_BUCKET: nama Cloud Storage
  • DATAFLOW_REGION: region tempat Anda ingin men-deploy tugas Dataflow

    Flag --region menggantikan region default yang ditetapkan di server metadata, klien lokal, atau variabel lingkungan.

Go

Fitur ini tidak tersedia di Apache Beam SDK untuk Go.

Lihat log tugas BigQuery yang diluncurkan

Saat menggunakan BigQuery di pipeline Dataflow, tugas BigQuery diluncurkan untuk melakukan berbagai tindakan atas nama Anda. Tindakan ini dapat mencakup memuat data, mengekspor data, dan sebagainya. Untuk tujuan pemecahan masalah dan pemantauan, antarmuka pemantauan Dataflow memiliki informasi tambahan tentang tugas BigQuery yang tersedia di panel Logs.

Informasi tugas BigQuery yang ditampilkan di panel Logs disimpan dan dimuat dari tabel sistem BigQuery. Biaya penagihan timbul saat tabel BigQuery pokok dikueri.

Melihat detail tugas BigQuery

Untuk melihat informasi tugas BigQuery, pipeline Anda harus menggunakan Apache Beam 2.24.0 atau yang lebih baru.

Untuk menampilkan daftar tugas BigQuery, buka tab BigQuery Jobs, lalu pilih lokasi tugas BigQuery. Selanjutnya, klik Load BigQuery Jobs dan konfirmasi dialog yang muncul. Setelah kueri selesai, daftar tugas akan ditampilkan.

Tombol Muat BigQuery Tugas di tabel informasi tugas BigQuery

Informasi dasar tentang setiap pekerjaan disediakan, termasuk ID pekerjaan, jenis, durasi, dan sebagainya.

Tabel yang menampilkan tugas BigQuery yang dijalankan selama eksekusi tugas pipeline saat ini.

Untuk informasi yang lebih mendetail tentang pekerjaan tertentu, klik Command line di kolom More Info.

Di jendela modal untuk command line, salin perintah bq jobs explain dan jalankan secara lokal atau di Cloud Shell.

gcloud alpha bq jobs describe BIGQUERY_JOB_ID

Perintah bq jobs describe menghasilkan JobStatistics, yang memberikan detail lebih lanjut yang berguna saat mendiagnosis tugas BigQuery yang lambat atau macet.

Atau, saat Anda menggunakan BigQueryIO dengan kueri SQL, tugas kueri akan dikeluarkan. Untuk melihat kueri SQL yang digunakan oleh tugas, klik View query di kolom More Info.