Apache Flink

Integrasi Apache Flink mengumpulkan log klien, jobmanager, dan taskmanager, serta menguraikannya menjadi payload JSON. Hasilnya mencakup kolom untuk logger, level, dan pesan.

Untuk informasi selengkapnya tentang Flink, lihat dokumentasi Apache Flink.

Prasyarat

Untuk mengumpulkan telemetri Flink, Anda harus menginstal Agen Operasional:

  • Untuk log, instal versi 2.17.0 atau yang lebih baru.
  • Untuk metrik, instal versi 2.18.1 atau yang lebih baru.

Integrasi ini mendukung Flink versi 1.12.5, 1.13.6, dan 1.14.4.

Mengonfigurasi Agen Operasional untuk Flink

Dengan mengikuti panduan untuk Mengonfigurasi Agen Operasional, tambahkan elemen yang diperlukan untuk mengumpulkan telemetri dari instance Flink, dan mulai ulang agen.

Contoh konfigurasi

Perintah berikut membuat konfigurasi untuk mengumpulkan dan menyerap telemetri untuk Flink dan memulai ulang Agen Operasional.

# Configures Ops Agent to collect telemetry from the app and restart Ops Agent.

set -e

# Create a back up of the existing file so existing configurations are not lost.
sudo cp /etc/google-cloud-ops-agent/config.yaml /etc/google-cloud-ops-agent/config.yaml.bak

# Configure the Ops Agent.
sudo tee /etc/google-cloud-ops-agent/config.yaml > /dev/null << EOF
metrics:
  receivers:
    flink:
      type: flink
  service:
    pipelines:
      flink:
        receivers:
          - flink
logging:
  receivers:
    flink:
      type: flink
  service:
    pipelines:
      flink:
        receivers:
          - flink
EOF

sudo service google-cloud-ops-agent restart
sleep 30

Untuk menyerap log dari Flink, Anda harus membuat penerima untuk log yang dihasilkan Flink, lalu membuat pipeline untuk penerima baru tersebut.

Untuk mengonfigurasi penerima log flink, tentukan kolom berikut:

Kolom Default Deskripsi
exclude_paths Daftar pola jalur sistem file yang akan dikecualikan dari kumpulan yang cocok dengan include_paths.
include_paths [/opt/flink/log/flink-*-standalonesession-*.log, /opt/flink/log/flink-*-taskexecutor-*.log, /opt/flink/log/flink-*-client-*.log] Daftar jalur sistem file yang akan dibaca dengan tailing setiap file. Karakter pengganti (*) dapat digunakan di jalur.
record_log_file_path false Jika ditetapkan ke true, jalur ke file tertentu tempat data log diperoleh akan muncul di entri log output sebagai nilai label agent.googleapis.com/log_file_path. Saat menggunakan karakter pengganti, hanya jalur file tempat data diperoleh yang akan dicatat.
type Nilai harus berupa flink.
wildcard_refresh_interval 60s Interval tempat jalur file karakter pengganti di include_paths dimuat ulang. Diberikan sebagai durasi waktu, misalnya 30s atau 2m. Properti ini mungkin berguna pada throughput logging yang tinggi dengan file log dirotasi lebih cepat daripada interval default.

Apa itu log?

logName berasal dari ID penerima yang ditentukan dalam konfigurasi. Kolom mendetail di dalam LogEntry adalah sebagai berikut.

Log flink berisi kolom berikut di LogEntry:

Kolom Jenis Deskripsi
jsonPayload.level string Log tingkat entri
jsonPayload.message string Pesan log, termasuk stacktrace terperinci jika diberikan.
jsonPayload.source string Class Java sumber entri log.
severity string (LogSeverity) Log tingkat entri (diterjemahkan).

Untuk menyerap metrik dari Flink, Anda harus membuat penerima untuk metrik yang dihasilkan Flink, lalu membuat pipeline untuk penerima baru tersebut.

Penerima ini tidak mendukung penggunaan beberapa instance dalam konfigurasi, misalnya, untuk memantau beberapa endpoint. Semua instance tersebut menulis ke deret waktu yang sama, dan Cloud Monitoring tidak memiliki cara untuk membedakannya.

Untuk mengonfigurasi penerima metrik flink, tentukan kolom berikut:

Kolom Default Deskripsi
collection_interval 60s Nilai time.Duration, seperti 30s atau 5m.
endpoint http://localhost:8081 URL yang diekspos oleh Flink.
type Nilai harus berupa flink.

Hal yang dipantau

Tabel berikut menyediakan daftar metrik yang dikumpulkan Agen Operasional dari instance Flink.

Jenis metrik
Jenis, Jenis
Resource yang dipantau
Label
workload.googleapis.com/flink.job.checkpoint.count
CUMULATIVEINT64
gce_instance
host_name
job_name
checkpoint
workload.googleapis.com/flink.job.checkpoint.in_progress
GAUGEINT64
gce_instance
host_name
job_name
workload.googleapis.com/flink.job.last_checkpoint.size
GAUGEINT64
gce_instance
host_name
job_name
workload.googleapis.com/flink.job.last_checkpoint.time
GAUGEINT64
gce_instance
host_name
job_name
workload.googleapis.com/flink.job.restart.count
CUMULATIVEINT64
gce_instance
host_name
job_name
workload.googleapis.com/flink.jvm.class_loader.classes_loaded
CUMULATIVEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.cpu.load
GAUGEDOUBLE
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.cpu.time
CUMULATIVEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.gc.collections.count
CUMULATIVEINT64
gce_instance
host_name
resource_type
taskmanager_id
garbage_collector_name
workload.googleapis.com/flink.jvm.gc.collections.time
CUMULATIVEINT64
gce_instance
host_name
resource_type
taskmanager_id
garbage_collector_name
workload.googleapis.com/flink.jvm.memory.direct.total_capacity
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.direct.used
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.heap.committed
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.heap.max
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.heap.used
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.mapped.total_capacity
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.mapped.used
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.metaspace.committed
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.metaspace.max
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.metaspace.used
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.nonheap.committed
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.nonheap.max
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.nonheap.used
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.threads.count
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.memory.managed.total
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.memory.managed.used
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.operator.record.count
CUMULATIVEINT64
gce_instance
host_name
taskmanager_id
job_name
operator_name
task_name
subtask_index
record
workload.googleapis.com/flink.operator.watermark.output
GAUGEINT64
gce_instance
host_name
job_name
operator_name
subtask_index
task_name
taskmanager_id
workload.googleapis.com/flink.task.record.count
CUMULATIVEINT64
gce_instance
host_name
taskmanager_id
job_name
task_name
subtask_index
record

Memverifikasi konfigurasi

Bagian ini menjelaskan cara memverifikasi bahwa Anda mengonfigurasi penerima Flink dengan benar. Mungkin perlu waktu satu atau dua menit bagi Agen Operasional untuk mulai mengumpulkan telemetri.

Untuk memastikan bahwa log Flink dikirim ke Cloud Logging, lakukan hal berikut:

  1. Pada panel navigasi Google Cloud Console, pilih Logging, lalu pilih Logs Explorer:

    Buka Logs Explorer

  2. Masukkan kueri berikut di editor, lalu klik Run query:
    resource.type="gce_instance"
    log_id("flink")
    

Untuk memverifikasi bahwa metrik Flink dikirim ke Cloud Monitoring, lakukan hal berikut:

  1. Pada panel navigasi Konsol Google Cloud, pilih Monitoring, lalu pilih  Metrics Explorer:

    Buka Metrics Explorer

  2. Di toolbar panel pembuat kueri, pilih tombol yang namanya adalah  MQL atau  PromQL.
  3. Pastikan MQL dipilih pada tombol Language. Tombol bahasa berada di toolbar yang sama dengan yang memungkinkan Anda memformat kueri.
  4. Masukkan kueri berikut di editor, lalu klik Run query:
    fetch gce_instance
    | metric 'workload.googleapis.com/flink.jvm.memory.heap.used'
    | every 1m
    

Lihat dasbor

Untuk melihat metrik Flink, Anda harus mengonfigurasi diagram atau dasbor. Integrasi Flink menyertakan satu atau beberapa dasbor untuk Anda. Setiap dasbor akan otomatis diinstal setelah Anda mengonfigurasi integrasi dan Agen Operasional mulai mengumpulkan data metrik.

Anda juga dapat melihat pratinjau statis dasbor tanpa menginstal integrasi.

Untuk melihat dasbor yang terinstal, lakukan hal berikut:

  1. Pada panel navigasi Konsol Google Cloud, pilih Monitoring, lalu pilih  Dashboards:

    Buka Dasbor

  2. Pilih tab Dashboard List, lalu pilih kategori Integrations.
  3. Klik nama dasbor yang ingin Anda lihat.

Jika Anda telah mengonfigurasi integrasi, tetapi dasbor belum diinstal, periksa apakah Agen Operasional sedang berjalan. Jika tidak ada data metrik untuk diagram di dasbor, penginstalan dasbor akan gagal. Setelah Agen Operasional mulai mengumpulkan metrik, dasbor akan diinstal untuk Anda.

Untuk melihat pratinjau statis dasbor, lakukan langkah berikut:

  1. Pada panel navigasi Konsol Google Cloud, pilih Monitoring, lalu pilih  Integrations:

    Buka Integrations

  2. Klik filter platform deployment Compute Engine.
  3. Temukan entri untuk Flink, lalu klik View Details.
  4. Pilih tab Dashboards untuk melihat pratinjau statis. Jika dasbor sudah terinstal, Anda dapat membukanya dengan mengklik View dashboard.

Untuk informasi selengkapnya tentang dasbor di Cloud Monitoring, lihat Dasbor dan diagram.

Untuk mengetahui informasi selengkapnya tentang penggunaan halaman Integrasi, lihat Mengelola integrasi.

Menginstal kebijakan pemberitahuan

Kebijakan pemberitahuan menginstruksikan Cloud Monitoring untuk memberi tahu Anda saat kondisi tertentu terjadi. Integrasi Flink mencakup satu atau beberapa kebijakan pemberitahuan yang dapat Anda gunakan. Anda dapat melihat dan menginstal kebijakan pemberitahuan ini dari halaman Integrations di Monitoring.

Untuk melihat deskripsi kebijakan pemberitahuan yang tersedia dan menginstalnya, lakukan langkah berikut:

  1. Pada panel navigasi Konsol Google Cloud, pilih Monitoring, lalu pilih  Integrations:

    Buka Integrations

  2. Temukan entri untuk Flink, lalu klik View Details.
  3. Pilih tab Alerts. Tab ini memberikan deskripsi tentang kebijakan pemberitahuan yang tersedia dan menyediakan antarmuka untuk menginstalnya.
  4. Instal kebijakan pemberitahuan. Kebijakan pemberitahuan perlu mengetahui ke mana pemberitahuan harus dikirimkan jika pemberitahuan telah dipicu, sehingga memerlukan informasi dari Anda untuk penginstalan. Untuk menginstal kebijakan pemberitahuan, lakukan tindakan berikut:
    1. Dari daftar kebijakan pemberitahuan yang tersedia, pilih yang ingin Anda instal.
    2. Di bagian Configure notifications, pilih satu atau beberapa saluran notifikasi. Anda memiliki opsi untuk menonaktifkan penggunaan saluran notifikasi. Namun, jika Anda melakukannya, kebijakan pemberitahuan akan diaktifkan secara diam-diam. Anda dapat memeriksa statusnya di Monitoring, tetapi Anda tidak akan menerima notifikasi.

      Untuk informasi lebih lanjut tentang saluran notifikasi, lihat Mengelola saluran notifikasi.

    3. Klik Create Policy.

Untuk mengetahui informasi selengkapnya tentang kebijakan pemberitahuan di Cloud Monitoring, lihat Pengantar pemberitahuan.

Untuk mengetahui informasi selengkapnya tentang penggunaan halaman Integrasi, lihat Mengelola integrasi.

Langkah selanjutnya

Untuk panduan cara menggunakan Ansible untuk menginstal Agen Operasional, mengonfigurasi aplikasi pihak ketiga, dan menginstal contoh dasbor, lihat video Menginstal Agen Operasional untuk memecahkan masalah aplikasi pihak ketiga.