Apache Flink

Integrasi Apache Flink mengumpulkan log klien, pengelola tugas, dan pengelola tugas, lalu mengurainya menjadi payload JSON. Hasilnya mencakup kolom untuk sumber, level, dan pesan.

Untuk informasi selengkapnya tentang Flink, lihat dokumentasi Apache Flink.

Prasyarat

Untuk mengumpulkan telemetri Flink, Anda harus menginstal Agen Operasional:

  • Untuk metrik, instal versi 2.18.1 atau yang lebih tinggi.
  • Untuk log, instal versi 2.17.0 atau yang lebih tinggi.

Integrasi ini mendukung Flink versi 1.12.5, 1.13.6, dan 1.14.4.

Mengonfigurasi Agen Operasional untuk Flink

Dengan mengikuti panduan untuk Mengonfigurasi Agen Ops, tambahkan elemen yang diperlukan untuk mengumpulkan telemetri dari instance Flink, dan mulai ulang agen.

Contoh konfigurasi

Perintah berikut membuat konfigurasi untuk mengumpulkan dan menyerap telemetri untuk Flink dan memulai ulang Ops Agent.

# Configures Ops Agent to collect telemetry from the app and restart Ops Agent.

set -e

# Create a back up of the existing file so existing configurations are not lost.
sudo cp /etc/google-cloud-ops-agent/config.yaml /etc/google-cloud-ops-agent/config.yaml.bak

# Configure the Ops Agent.
sudo tee /etc/google-cloud-ops-agent/config.yaml > /dev/null << EOF
metrics:
  receivers:
    flink:
      type: flink
  service:
    pipelines:
      flink:
        receivers:
          - flink
logging:
  receivers:
    flink:
      type: flink
  service:
    pipelines:
      flink:
        receivers:
          - flink
EOF

sudo service google-cloud-ops-agent restart
sleep 30

Untuk menyerap log dari Flink, Anda harus membuat penerima untuk log yang dihasilkan Flink, lalu membuat pipeline untuk penerima baru.

Untuk mengonfigurasi penerima log flink, tentukan kolom berikut:

Kolom Default Deskripsi
exclude_paths Daftar pola jalur sistem file yang akan dikecualikan dari kumpulan yang dicocokkan oleh include_paths.
include_paths [/opt/flink/log/flink-*-standalonesession-*.log, /opt/flink/log/flink-*-taskexecutor-*.log, /opt/flink/log/flink-*-client-*.log] Daftar jalur sistem file yang akan dibaca dengan mengikuti setiap file. Karakter pengganti (*) dapat digunakan dalam jalur.
record_log_file_path false Jika ditetapkan ke true, jalur ke file tertentu tempat data log diperoleh akan muncul di entri log output sebagai nilai label agent.googleapis.com/log_file_path. Saat menggunakan karakter pengganti, hanya jalur file tempat data diperoleh yang dicatat.
type Nilai ini harus flink.
wildcard_refresh_interval 60s Interval saat jalur file karakter pengganti di include_paths diperbarui. Diberikan sebagai durasi waktu, misalnya 30s atau 2m. Properti ini mungkin berguna dalam throughput logging yang tinggi, dengan file log dirotasi lebih cepat daripada interval default.

Apa itu log?

logName berasal dari ID penerima yang ditentukan dalam konfigurasi. Kolom mendetail di dalam LogEntry adalah sebagai berikut.

Log flink berisi kolom berikut di LogEntry:

Kolom Jenis Deskripsi
jsonPayload.level string Tingkat entri log
jsonPayload.message string Pesan log, termasuk stacktrace mendetail jika tersedia
jsonPayload.source string Class Java sumber entri log
severity string (LogSeverity) Level entri log (diterjemahkan).

Untuk menyerap metrik dari Flink, Anda harus membuat penerima untuk metrik yang dihasilkan Flink, lalu membuat pipeline untuk penerima baru.

Penerima ini tidak mendukung penggunaan beberapa instance dalam konfigurasi, misalnya, untuk memantau beberapa endpoint. Semua instance tersebut menulis ke deret waktu yang sama, dan Cloud Monitoring tidak dapat membedakannya.

Untuk mengonfigurasi penerima metrik flink, tentukan kolom berikut:

Kolom Default Deskripsi
collection_interval 60s Nilai durasi waktu, seperti 30s atau 5m.
endpoint http://localhost:8081 URL yang ditampilkan oleh Flink.
type Nilai ini harus flink.

Yang dipantau

Tabel berikut memberikan daftar metrik yang dikumpulkan Ops Agent dari instance Flink.

Jenis metrik 
Jenis, Tipe
Resource yang dimonitor
Label
workload.googleapis.com/flink.job.checkpoint.count
CUMULATIVEINT64
gce_instance
checkpoint
host_name
job_name
workload.googleapis.com/flink.job.checkpoint.in_progress
GAUGEINT64
gce_instance
host_name
job_name
workload.googleapis.com/flink.job.last_checkpoint.size
GAUGEINT64
gce_instance
host_name
job_name
workload.googleapis.com/flink.job.last_checkpoint.time
GAUGEINT64
gce_instance
host_name
job_name
workload.googleapis.com/flink.job.restart.count
CUMULATIVEINT64
gce_instance
host_name
job_name
workload.googleapis.com/flink.jvm.class_loader.classes_loaded
CUMULATIVEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.cpu.load
GAUGEDOUBLE
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.cpu.time
CUMULATIVEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.gc.collections.count
CUMULATIVEINT64
gce_instance
garbage_collector_name
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.gc.collections.time
CUMULATIVEINT64
gce_instance
garbage_collector_name
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.direct.total_capacity
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.direct.used
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.heap.committed
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.heap.max
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.heap.used
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.mapped.total_capacity
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.mapped.used
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.metaspace.committed
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.metaspace.max
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.metaspace.used
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.nonheap.committed
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.nonheap.max
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.memory.nonheap.used
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.jvm.threads.count
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.memory.managed.total
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.memory.managed.used
GAUGEINT64
gce_instance
host_name
resource_type
taskmanager_id
workload.googleapis.com/flink.operator.record.count
CUMULATIVEINT64
gce_instance
host_name
job_name
operator_name
record
subtask_index
task_name
taskmanager_id
workload.googleapis.com/flink.operator.watermark.output
GAUGEINT64
gce_instance
host_name
job_name
operator_name
subtask_index
task_name
taskmanager_id
workload.googleapis.com/flink.task.record.count
CUMULATIVEINT64
gce_instance
host_name
job_name
record
subtask_index
task_name
taskmanager_id

Memverifikasi konfigurasi

Bagian ini menjelaskan cara memverifikasi bahwa Anda telah mengonfigurasi penerima Flink dengan benar. Mungkin perlu waktu satu atau dua menit agar Ops Agent mulai mengumpulkan telemetri.

Untuk memverifikasi bahwa log Flink dikirim ke Cloud Logging, lakukan tindakan berikut:

  1. Di konsol Google Cloud, buka halaman Logs Explorer:

    Buka Logs Explorer

    Jika Anda menggunakan kotak penelusuran untuk menemukan halaman ini, pilih hasil yang subjudulnya adalah Logging.

  2. Masukkan kueri berikut di editor, lalu klik Run query:
    resource.type="gce_instance"
    log_id("flink")
    

Untuk memverifikasi bahwa metrik Flink dikirim ke Cloud Monitoring, lakukan tindakan berikut:

  1. Di konsol Google Cloud, buka halaman  Metrics explorer:

    Buka Metrics explorer

    Jika Anda menggunakan kotak penelusuran untuk menemukan halaman ini, pilih hasil yang subjudulnya adalah Monitoring.

  2. Di toolbar panel pembuat kueri, pilih tombol yang namanya adalah  MQL atau  PromQL.
  3. Pastikan MQL dipilih di tombol Language. Tombol bahasa berada di toolbar yang sama yang memungkinkan Anda memformat kueri.
  4. Masukkan kueri berikut di editor, lalu klik Run query:
    fetch gce_instance
    | metric 'workload.googleapis.com/flink.jvm.memory.heap.used'
    | every 1m
    

Lihat dasbor

Untuk melihat metrik Flink, Anda harus mengonfigurasi diagram atau dasbor. Integrasi Flink menyertakan satu atau beberapa dasbor untuk Anda. Dasbor apa pun akan otomatis diinstal setelah Anda mengonfigurasi integrasi dan Agen Operasional telah mulai mengumpulkan data metrik.

Anda juga dapat melihat pratinjau statis dasbor tanpa menginstal integrasi.

Untuk melihat dasbor yang terinstal, lakukan tindakan berikut:

  1. Di konsol Google Cloud, buka halaman  Dasbor:

    Buka Dasbor

    Jika Anda menggunakan kotak penelusuran untuk menemukan halaman ini, pilih hasil yang subjudulnya adalah Monitoring.

  2. Pilih tab Daftar Dasbor, lalu pilih kategori Integrasi.
  3. Klik nama dasbor yang ingin Anda lihat.

Jika Anda telah mengonfigurasi integrasi, tetapi dasbor belum diinstal, pastikan Agen Operasional berjalan. Jika tidak ada data metrik untuk diagram di dasbor, penginstalan dasbor akan gagal. Setelah Ops Agent mulai mengumpulkan metrik, dasbor akan diinstal untuk Anda.

Untuk melihat pratinjau statis dasbor, lakukan tindakan berikut:

  1. Di konsol Google Cloud, buka halaman  Integrations:

    Buka Integrations

    Jika Anda menggunakan kotak penelusuran untuk menemukan halaman ini, pilih hasil yang subjudulnya adalah Monitoring.

  2. Klik filter platform deployment Compute Engine.
  3. Temukan entri untuk Flink dan klik Lihat Detail.
  4. Pilih tab Dasbor untuk melihat pratinjau statis. Jika dasbor telah diinstal, Anda dapat membukanya dengan mengklik Lihat dasbor.

Untuk mengetahui informasi selengkapnya tentang dasbor di Cloud Monitoring, lihat Dasbor dan diagram.

Untuk informasi selengkapnya tentang cara menggunakan halaman Integrasi, lihat Mengelola integrasi.

Menginstal kebijakan pemberitahuan

Kebijakan pemberitahuan menginstruksikan Cloud Monitoring untuk memberi tahu Anda saat kondisi yang ditentukan terjadi. Integrasi Flink menyertakan satu atau beberapa kebijakan pemberitahuan untuk Anda gunakan. Anda dapat melihat dan menginstal kebijakan pemberitahuan ini dari halaman Integrasi di Monitoring.

Untuk melihat deskripsi kebijakan pemberitahuan yang tersedia dan menginstalnya, lakukan hal berikut:

  1. Di konsol Google Cloud, buka halaman  Integrations:

    Buka Integrations

    Jika Anda menggunakan kotak penelusuran untuk menemukan halaman ini, pilih hasil yang subjudulnya adalah Monitoring.

  2. Temukan entri untuk Flink dan klik Lihat Detail.
  3. Pilih tab Notifikasi. Tab ini memberikan deskripsi tentang kebijakan pemberitahuan yang tersedia dan menyediakan antarmuka untuk menginstalnya.
  4. Instal kebijakan pemberitahuan. Kebijakan pemberitahuan perlu mengetahui tempat untuk mengirim notifikasi bahwa pemberitahuan telah dipicu, sehingga memerlukan informasi dari Anda untuk penginstalan. Untuk menginstal kebijakan pemberitahuan, lakukan hal berikut:
    1. Dari daftar kebijakan pemberitahuan yang tersedia, pilih kebijakan yang ingin Anda instal.
    2. Di bagian Konfigurasi notifikasi, pilih satu atau beberapa saluran notifikasi. Anda memiliki opsi untuk menonaktifkan penggunaan saluran notifikasi, tetapi jika Anda melakukannya, kebijakan pemberitahuan akan diaktifkan secara otomatis. Anda dapat memeriksa statusnya di Pemantauan, tetapi Anda tidak menerima notifikasi.

      Untuk informasi selengkapnya tentang saluran notifikasi, lihat Mengelola saluran notifikasi.

    3. Klik Create Policies.

Untuk informasi selengkapnya tentang kebijakan pemberitahuan di Cloud Monitoring, lihat Pengantar pemberitahuan.

Untuk informasi selengkapnya tentang cara menggunakan halaman Integrasi, lihat Mengelola integrasi.

Langkah selanjutnya

Untuk panduan tentang cara menggunakan Ansible untuk menginstal Agen Operasional, mengonfigurasi aplikasi pihak ketiga, dan menginstal contoh dasbor, lihat video Menginstal Agen Operasional untuk memecahkan masalah aplikasi pihak ketiga.