The Apache Flink integration collects client, jobmanager, and taskmanager logs and parses them into a JSON payload. The result includes fields for logger, level, and message.
For more information about Flink, see the Apache Flink documentation.
Prerequisites
To collect Flink telemetry, you must install the Ops Agent:
- For logs, install version 2.17.0 or higher.
- For metrics, install version 2.18.1 or higher.
This integration supports Flink versions 1.13.6 and 1.14.4.
Configure the Ops Agent for Flink
Following the guide for Configuring the Ops Agent, add the required elements to collect telemetry from Flink instances, and restart the agent.
Example configuration
The following command creates the configuration to collect and ingest telemetry for Flink and restarts the Ops Agent.
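The configuration that such a command writes can be sketched in Python. This minimal sketch only renders the YAML text. It assumes that the receivers and pipelines for both logs and metrics use the ID `flink`, and that every optional receiver field keeps its default. Writing the output to the Ops Agent configuration file and restarting the agent follow the Configuring the Ops Agent guide and are not shown here.

```python
"""Render a minimal Ops Agent configuration that enables the Flink integration.

Assumption: the receiver and pipeline IDs are the same value ("flink" by
default) and all optional receiver fields keep their defaults.
"""


def render_flink_config(receiver_id: str = "flink") -> str:
    """Return YAML with a Flink receiver and pipeline for logs and for metrics."""
    lines = []
    for section in ("logging", "metrics"):
        lines += [
            f"{section}:",
            "  receivers:",
            f"    {receiver_id}:",
            "      type: flink",  # the receiver type must be flink
            "  service:",
            "    pipelines:",
            f"      {receiver_id}:",
            "        receivers:",
            f"          - {receiver_id}",  # the pipeline lists the receiver by ID
        ]
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    print(render_flink_config(), end="")
```

The same ID is used for the receiver and the pipeline only for readability. For logs, the receiver ID also determines the log name that you query later.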
Configure logs collection
To ingest logs from Flink, you must create receivers for the logs that Flink produces and then create a pipeline for the new receivers.
To configure a receiver for your flink logs, specify the following fields:
Field | Default | Description |
---|---|---|
exclude_paths | | A list of filesystem path patterns to exclude from the set matched by include_paths. |
include_paths | [/opt/flink/log/flink-*-standalonesession-*.log, /opt/flink/log/flink-*-taskexecutor-*.log, /opt/flink/log/flink-*-client-*.log] | A list of filesystem paths to read by tailing each file. A wildcard (*) can be used in the paths. |
record_log_file_path | false | If set to true, then the path to the specific file from which the log record was obtained appears in the output log entry as the value of the agent.googleapis.com/log_file_path label. When using a wildcard, only the path of the file from which the record was obtained is recorded. |
type | | The value must be flink. |
wildcard_refresh_interval | 60s | The interval at which wildcard file paths in include_paths are refreshed. Given as a time duration, for example 30s or 2m. This property might be useful under high logging throughputs where log files are rotated faster than the default interval. |
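To reason about which files a receiver tails, you can approximate the include_paths and exclude_paths semantics with shell-style glob matching. This minimal sketch uses Python's `fnmatch.fnmatchcase`. It assumes the agent's wildcard matching behaves like fnmatch for these patterns, which is only an approximation and not the agent's implementation. In fnmatch, `*` also matches `/`. The candidate file names in the usage example are hypothetical.

```python
from fnmatch import fnmatchcase

# Default include_paths of the flink logging receiver, as listed in the table.
DEFAULT_INCLUDE = [
    "/opt/flink/log/flink-*-standalonesession-*.log",
    "/opt/flink/log/flink-*-taskexecutor-*.log",
    "/opt/flink/log/flink-*-client-*.log",
]


def files_to_tail(candidates, include_paths=DEFAULT_INCLUDE, exclude_paths=()):
    """Return candidates matched by any include pattern and by no exclude pattern."""
    selected = []
    for path in candidates:
        included = any(fnmatchcase(path, pattern) for pattern in include_paths)
        excluded = any(fnmatchcase(path, pattern) for pattern in exclude_paths)
        if included and not excluded:
            selected.append(path)
    return selected


if __name__ == "__main__":
    # Hypothetical file names, for illustration only.
    paths = [
        "/opt/flink/log/flink-root-standalonesession-0-host.log",
        "/opt/flink/log/flink-root-taskexecutor-0-host.log",
        "/opt/flink/log/flink-root-client-host.log",
        "/opt/flink/log/flink-root-taskexecutor-0-host.out",  # not a .log file
    ]
    print(files_to_tail(paths, exclude_paths=["*client*"]))
```

Exclusion is applied after inclusion, so an exclude pattern can only remove files that an include pattern has already selected.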
What is logged
The logName is derived from the receiver IDs specified in the configuration. The flink logs contain the following fields in the LogEntry:
Field | Type | Description |
---|---|---|
jsonPayload.message | string | Log message, including detailed stacktrace where provided. |
jsonPayload.source | string | The source Java class of the log entry. |
severity | string (LogSeverity) | Log entry level (translated). |
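The following small Python sketch shows how a raw Flink log line maps onto these fields. It assumes the default layout pattern from Flink's conf/log4j.properties, `%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n`. The regular expression and the level-to-severity mapping are illustrative assumptions, not the Ops Agent's parser. Multi-line stack traces are not joined here.

```python
import re

# Matches a line such as:
# 2022-04-20 10:15:03,123 INFO  org.apache.flink.runtime.jobmaster.JobMaster [] - Job started.
LINE_RE = re.compile(
    r"^(?P<time>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3})\s+"
    r"(?P<level>[A-Z]+)\s+"
    r"(?P<source>\S+)\s+"
    r"(?:\[[^\]]*\]\s+)?"  # optional %x context, rendered as [] when empty
    r"-\s(?P<message>.*)$"
)

# Assumed mapping from Log4j levels to Cloud Logging LogSeverity names.
SEVERITY = {
    "TRACE": "DEBUG",
    "DEBUG": "DEBUG",
    "INFO": "INFO",
    "WARN": "WARNING",
    "ERROR": "ERROR",
    "FATAL": "CRITICAL",
}


def parse_flink_log_line(line):
    """Return a dict shaped like the LogEntry fields, or None if the line does not match."""
    match = LINE_RE.match(line)
    if match is None:
        return None
    return {
        "jsonPayload": {"message": match["message"], "source": match["source"]},
        "severity": SEVERITY.get(match["level"], "DEFAULT"),
    }
```

In this sketch, the logger name becomes jsonPayload.source and the Log4j level becomes severity. A line that does not match the assumed pattern yields None instead of a partial entry.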
Configure metrics collection
To ingest metrics from Flink, you must create receivers for the metrics that Flink produces and then create a pipeline for the new receivers.
To configure a receiver for your flink metrics, specify the following fields:
Field | Default | Description |
---|---|---|
collection_interval | 60s | The interval at which metrics are collected, given as a time.Duration value such as 30s or 5m. |
endpoint | http://localhost:8081 | The URL exposed by Flink. |
type | | The value must be flink. |
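Both collection_interval and the logging receiver's wildcard_refresh_interval take Go-style duration strings. The following Python sketch converts a subset of that syntax into seconds and renders a metrics receiver with non-default values. The subset is integer or decimal numbers followed by ms, s, m, or h, optionally combined as in 1h30m. The parser is a simplified stand-in for Go's time.ParseDuration, not the agent's own validation. The receiver ID flink_custom and the host flink-jm are hypothetical.

```python
import re

# Subset of Go duration units, expressed in seconds.
_UNITS = {"ms": 0.001, "s": 1, "m": 60, "h": 3600}
# "ms" is listed first so that "5ms" is not read as minutes.
_PART = re.compile(r"(\d+(?:\.\d+)?)(ms|s|m|h)")


def parse_duration(text):
    """Convert a duration such as '30s', '5m', or '1h30m' to seconds."""
    pos, total = 0, 0.0
    for match in _PART.finditer(text):
        if match.start() != pos:  # unparsed characters before this part
            raise ValueError(f"invalid duration: {text!r}")
        total += float(match.group(1)) * _UNITS[match.group(2)]
        pos = match.end()
    if pos == 0 or pos != len(text):  # nothing parsed, or trailing junk
        raise ValueError(f"invalid duration: {text!r}")
    return total


def render_metrics_receiver(receiver_id, endpoint="http://localhost:8081",
                            collection_interval="60s"):
    """Return YAML for one flink metrics receiver and its pipeline."""
    parse_duration(collection_interval)  # fail fast on a malformed interval
    return "\n".join([
        "metrics:",
        "  receivers:",
        f"    {receiver_id}:",
        "      type: flink",
        f"      endpoint: {endpoint}",
        f"      collection_interval: {collection_interval}",
        "  service:",
        "    pipelines:",
        f"      {receiver_id}:",
        "        receivers:",
        f"          - {receiver_id}",
    ]) + "\n"


if __name__ == "__main__":
    print(render_metrics_receiver("flink_custom", "http://flink-jm:8081", "30s"), end="")
```

Validating the interval before rendering catches a typo such as 30sec locally instead of at agent startup.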
What is monitored
The following table provides the list of metrics that the Ops Agent collects from the Flink instance.
Metric type | Kind, Type | Monitored resources | Labels |
---|---|---|---|
workload.googleapis.com/flink.job.checkpoint.count | CUMULATIVE, INT64 | gce_instance | host_name, job_name, checkpoint |
workload.googleapis.com/flink.job.checkpoint.in_progress | GAUGE, INT64 | gce_instance | host_name, job_name |
workload.googleapis.com/flink.job.last_checkpoint.size | GAUGE, INT64 | gce_instance | host_name, job_name |
workload.googleapis.com/flink.job.last_checkpoint.time | GAUGE, INT64 | gce_instance | host_name, job_name |
workload.googleapis.com/flink.job.restart.count | CUMULATIVE, INT64 | gce_instance | host_name, job_name |
workload.googleapis.com/flink.jvm.class_loader.classes_loaded | CUMULATIVE, INT64 | gce_instance | host_name, resource_type, taskmanager_id |
workload.googleapis.com/flink.jvm.cpu.load | GAUGE, DOUBLE | gce_instance | host_name, resource_type, taskmanager_id |
workload.googleapis.com/flink.jvm.cpu.time | CUMULATIVE, INT64 | gce_instance | host_name, resource_type, taskmanager_id |
workload.googleapis.com/flink.jvm.gc.collections.count | CUMULATIVE, INT64 | gce_instance | host_name, resource_type, taskmanager_id, garbage_collector_name |
workload.googleapis.com/flink.jvm.gc.collections.time | CUMULATIVE, INT64 | gce_instance | host_name, resource_type, taskmanager_id, garbage_collector_name |
workload.googleapis.com/flink.jvm.memory.direct.total_capacity | GAUGE, INT64 | gce_instance | host_name, resource_type, taskmanager_id |
workload.googleapis.com/flink.jvm.memory.direct.used | GAUGE, INT64 | gce_instance | host_name, resource_type, taskmanager_id |
workload.googleapis.com/flink.jvm.memory.heap.committed | GAUGE, INT64 | gce_instance | host_name, resource_type, taskmanager_id |
workload.googleapis.com/flink.jvm.memory.heap.max | GAUGE, INT64 | gce_instance | host_name, resource_type, taskmanager_id |
workload.googleapis.com/flink.jvm.memory.heap.used | GAUGE, INT64 | gce_instance | host_name, resource_type, taskmanager_id |
workload.googleapis.com/flink.jvm.memory.mapped.total_capacity | GAUGE, INT64 | gce_instance | host_name, resource_type, taskmanager_id |
workload.googleapis.com/flink.jvm.memory.mapped.used | GAUGE, INT64 | gce_instance | host_name, resource_type, taskmanager_id |
workload.googleapis.com/flink.jvm.memory.metaspace.committed | GAUGE, INT64 | gce_instance | host_name, resource_type, taskmanager_id |
workload.googleapis.com/flink.jvm.memory.metaspace.max | GAUGE, INT64 | gce_instance | host_name, resource_type, taskmanager_id |
workload.googleapis.com/flink.jvm.memory.metaspace.used | GAUGE, INT64 | gce_instance | host_name, resource_type, taskmanager_id |
workload.googleapis.com/flink.jvm.memory.nonheap.committed | GAUGE, INT64 | gce_instance | host_name, resource_type, taskmanager_id |
workload.googleapis.com/flink.jvm.memory.nonheap.max | GAUGE, INT64 | gce_instance | host_name, resource_type, taskmanager_id |
workload.googleapis.com/flink.jvm.memory.nonheap.used | GAUGE, INT64 | gce_instance | host_name, resource_type, taskmanager_id |
workload.googleapis.com/flink.jvm.threads.count | GAUGE, INT64 | gce_instance | host_name, resource_type, taskmanager_id |
workload.googleapis.com/flink.memory.managed.total | GAUGE, INT64 | gce_instance | host_name, resource_type, taskmanager_id |
workload.googleapis.com/flink.memory.managed.used | GAUGE, INT64 | gce_instance | host_name, resource_type, taskmanager_id |
workload.googleapis.com/flink.operator.record.count | CUMULATIVE, INT64 | gce_instance | host_name, taskmanager_id, job_name, operator_name, task_name, subtask_index, record |
workload.googleapis.com/flink.operator.watermark.output | GAUGE, INT64 | gce_instance | host_name, job_name, operator_name, subtask_index, task_name, taskmanager_id |
workload.googleapis.com/flink.task.record.count | CUMULATIVE, INT64 | gce_instance | host_name, taskmanager_id, job_name, task_name, subtask_index, record |
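CUMULATIVE metrics such as flink.job.restart.count or flink.operator.record.count report a running total. GAUGE metrics report a point-in-time value. To turn a cumulative series into a per-second rate, take the difference between two samples and divide by the elapsed time. The following sketch shows that arithmetic with hypothetical sample values. It assumes that a drop in value means the counter restarted from zero, for example after a process restart. It does not reproduce how Cloud Monitoring itself aligns cumulative series.

```python
from typing import NamedTuple


class Sample(NamedTuple):
    """One point of a CUMULATIVE series: a timestamp in seconds and a running total."""
    time_s: float
    value: int


def rate_per_second(prev: Sample, curr: Sample) -> float:
    """Average per-second rate between two samples of a cumulative counter."""
    elapsed = curr.time_s - prev.time_s
    if elapsed <= 0:
        raise ValueError("samples must be in increasing time order")
    # A decrease is treated as a counter reset: count from zero up to curr.value.
    delta = curr.value - prev.value if curr.value >= prev.value else curr.value
    return delta / elapsed


if __name__ == "__main__":
    # Hypothetical record counts sampled 60 seconds apart.
    print(rate_per_second(Sample(0, 1000), Sample(60, 4000)))
```

GAUGE metrics such as flink.jvm.memory.heap.used need no such differencing. Each sample is already meaningful on its own.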
Sample dashboard
To view your Flink metrics, you must have a chart or dashboard configured. Cloud Monitoring provides a library of sample dashboards for integrations, which contain preconfigured charts. For information about installing these dashboards, see Installing sample dashboards.
Verify the configuration
This section describes how to verify that you correctly configured the Flink receiver. It might take one or two minutes for the Ops Agent to begin collecting telemetry.
To verify that the logs are ingested, go to the Logs Explorer and run the following query to view the Flink logs:
resource.type="gce_instance"
log_id("flink")
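The query filters on the log ID, which comes from the receiver ID. If your configuration uses a different receiver ID, or you want to narrow the results, you can build the query programmatically. This minimal sketch assumes you need only the resource type, a log_id() clause, and an optional minimum severity. Clauses on separate lines of a Logging query are combined with AND. The receiver ID flink_custom in the test is hypothetical.

```python
def build_flink_log_query(receiver_id="flink", min_severity=None):
    """Return a Logging query for logs written by the given Flink receiver."""
    clauses = ['resource.type="gce_instance"', f'log_id("{receiver_id}")']
    if min_severity is not None:
        clauses.append(f"severity>={min_severity}")
    # Each clause goes on its own line; the clauses are joined with AND.
    return "\n".join(clauses)


if __name__ == "__main__":
    print(build_flink_log_query(min_severity="WARNING"))
```

Passing a LogSeverity name such as WARNING keeps only entries at that level or higher. This works because the integration translates Flink's log levels into the severity field.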
What's next
For a walkthrough on how to use Ansible to install the Ops Agent, configure a third-party application, and install a sample dashboard, see the Install the Ops Agent to troubleshoot third-party applications video.