Apache Flink

The Apache Flink integration collects client, JobManager, and TaskManager logs and parses them into a JSON payload. The result includes fields for logger, level, and message. The integration also collects metrics from the Flink instance.

For more information about Flink, see the Apache Flink documentation.

Prerequisites

To collect Flink telemetry, you must install the Ops Agent:

  • For logs, install version 2.17.0 or higher.
  • For metrics, install version 2.18.1 or higher.

This integration supports Flink versions 1.13.6 and 1.14.4.
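
As a rough way to check an installed agent against the minimum versions above, the following sketch compares version strings with GNU `sort -V`. The helper names `version_ge` and `check_flink_support` are hypothetical and are not part of the Ops Agent. The commented `dpkg-query` call assumes a Debian or Ubuntu VM where the package version carries a distribution suffix after `~`.

```shell
# Sketch: compare an Ops Agent version string against the Flink minimums.
# version_ge and check_flink_support are hypothetical helpers.

# Succeeds when version $1 is greater than or equal to version $2.
# Relies on GNU sort's version ordering (-V).
version_ge() {
  [ "$(printf '%s\n%s\n' "$1" "$2" | sort -V | head -n 1)" = "$2" ]
}

# Prints whether the given agent version can collect Flink logs and metrics.
check_flink_support() {
  if version_ge "$1" "2.17.0"; then
    echo "logs: supported"
  else
    echo "logs: upgrade required"
  fi
  if version_ge "$1" "2.18.1"; then
    echo "metrics: supported"
  else
    echo "metrics: upgrade required"
  fi
}

# Assumption: on Debian or Ubuntu the installed version can be read like this,
# and any suffix after "~" is stripped before comparing:
#   installed="$(dpkg-query --show --showformat '${Version}' google-cloud-ops-agent)"
#   check_flink_support "${installed%%~*}"
```

For example, `check_flink_support 2.17.5` reports that logs are supported but that metrics need an upgrade.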

Configure the Ops Agent for Flink

Following the guide for Configuring the Ops Agent, add the required elements to collect telemetry from Flink instances, and restart the agent.

Example configuration

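The following commands create the configuration to collect and ingest telemetry for Flink and restart the Ops Agent. Because the file is written with tee, any existing contents of /etc/google-cloud-ops-agent/config.yaml are overwritten.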

# Configures the Ops Agent to collect Flink telemetry, then restarts the agent.

set -e

sudo tee /etc/google-cloud-ops-agent/config.yaml > /dev/null << EOF
metrics:
  receivers:
    flink:
      type: flink
  service:
    pipelines:
      flink:
        receivers:
          - flink
logging:
  receivers:
    flink:
      type: flink
  service:
    pipelines:
      flink:
        receivers:
          - flink
EOF

sudo service google-cloud-ops-agent restart
sleep 30

Configure logs collection

To ingest logs from Flink, you must create receivers for the logs that Flink produces and then create a pipeline for the new receivers.

To configure a receiver for your Flink logs, specify the following fields:

Field | Default | Description
exclude_paths | | A list of filesystem path patterns to exclude from the set matched by include_paths.
include_paths | [/opt/flink/log/flink-*-standalonesession-*.log, /opt/flink/log/flink-*-taskexecutor-*.log, /opt/flink/log/flink-*-client-*.log] | A list of filesystem paths to read by tailing each file. A wildcard (*) can be used in the paths.
record_log_file_path | false | If set to true, then the path to the specific file from which the log record was obtained appears in the output log entry as the value of the agent.googleapis.com/log_file_path label. When using a wildcard, only the path of the file from which the record was obtained is recorded.
type | | The value must be flink.
wildcard_refresh_interval | 60s | The interval at which wildcard file paths in include_paths are refreshed. Given as a time duration, for example 30s or 2m. This property might be useful under high logging throughputs where log files are rotated faster than the default interval.
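
To illustrate how the fields above combine, here is a minimal sketch that writes a logging-only configuration with non-default values to a scratch file for review. The install directory /srv/flink and the output file name are hypothetical. The sketch deliberately does not touch /etc/google-cloud-ops-agent/config.yaml or restart the agent.

```shell
# Sketch: logging-only Ops Agent config with non-default receiver fields.
# /srv/flink and the scratch file name are hypothetical.
out="${TMPDIR:-/tmp}/flink-logging-example.yaml"

# The quoted 'EOF' keeps the shell from expanding anything inside the YAML.
cat > "$out" << 'EOF'
logging:
  receivers:
    flink:
      type: flink
      # Tail every Flink log under the custom install directory...
      include_paths:
        - /srv/flink/log/flink-*.log
      # ...except the client logs, which are removed from the matched set.
      exclude_paths:
        - /srv/flink/log/flink-*-client-*.log
      # Adds the agent.googleapis.com/log_file_path label to each entry.
      record_log_file_path: true
      # Re-scan the wildcard paths more often than the 60s default.
      wildcard_refresh_interval: 30s
  service:
    pipelines:
      flink:
        receivers:
          - flink
EOF

echo "Wrote $out"
```

After reviewing the file, its contents could be merged into the agent configuration by following the same tee-and-restart steps shown in the example configuration.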

What is logged

The logName is derived from the receiver IDs specified in the configuration. The Flink logs contain the following fields in the LogEntry:

Field | Type | Description
jsonPayload.message | string | Log message, including detailed stacktrace where provided.
jsonPayload.source | string | The source Java class of the log entry.
severity | string (LogSeverity) | Log entry level (translated).

Configure metrics collection

To ingest metrics from Flink, you must create receivers for the metrics that Flink produces and then create a pipeline for the new receivers.

To configure a receiver for your Flink metrics, specify the following fields:

Field | Default | Description
collection_interval | 60s | A time.Duration value, such as 30s or 5m.
endpoint | http://localhost:8081 | The URL exposed by Flink.
type | | The value must be flink.
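
As a sketch of how these fields might be set, the following writes a metrics-only configuration with a non-default endpoint and interval to a scratch file. The host name flink-jobmanager.internal and the file name are hypothetical. The optional `check_flink_endpoint` helper is also hypothetical; it assumes the endpoint is Flink's REST API, which serves a cluster summary at /overview, and it is defined here but not run.

```shell
# Sketch: metrics-only Ops Agent config with a non-default endpoint.
# The host name and the scratch file name are hypothetical.
endpoint="http://flink-jobmanager.internal:8081"
out="${TMPDIR:-/tmp}/flink-metrics-example.yaml"

# Unquoted EOF so that ${endpoint} is expanded into the YAML.
cat > "$out" << EOF
metrics:
  receivers:
    flink:
      type: flink
      endpoint: ${endpoint}
      collection_interval: 30s
  service:
    pipelines:
      flink:
        receivers:
          - flink
EOF

# Hypothetical helper: succeeds if the endpoint answers on /overview
# (assumes the endpoint is Flink's REST API). Not called in this sketch.
check_flink_endpoint() {
  curl --silent --fail --max-time 5 "$1/overview" > /dev/null
}

echo "Wrote $out"
```

Running `check_flink_endpoint "$endpoint"` from the VM that hosts the Ops Agent is one way to confirm the URL is reachable before restarting the agent.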

What is monitored

The following table provides the list of metrics that the Ops Agent collects from the Flink instance.

Metric type | Kind, Type | Monitored resources | Labels
workload.googleapis.com/flink.job.checkpoint.count | CUMULATIVE, INT64 | gce_instance | host_name, job_name, checkpoint
workload.googleapis.com/flink.job.checkpoint.in_progress | GAUGE, INT64 | gce_instance | host_name, job_name
workload.googleapis.com/flink.job.last_checkpoint.size | GAUGE, INT64 | gce_instance | host_name, job_name
workload.googleapis.com/flink.job.last_checkpoint.time | GAUGE, INT64 | gce_instance | host_name, job_name
workload.googleapis.com/flink.job.restart.count | CUMULATIVE, INT64 | gce_instance | host_name, job_name
workload.googleapis.com/flink.jvm.class_loader.classes_loaded | CUMULATIVE, INT64 | gce_instance | host_name, resource_type, taskmanager_id
workload.googleapis.com/flink.jvm.cpu.load | GAUGE, DOUBLE | gce_instance | host_name, resource_type, taskmanager_id
workload.googleapis.com/flink.jvm.cpu.time | CUMULATIVE, INT64 | gce_instance | host_name, resource_type, taskmanager_id
workload.googleapis.com/flink.jvm.gc.collections.count | CUMULATIVE, INT64 | gce_instance | host_name, resource_type, taskmanager_id, garbage_collector_name
workload.googleapis.com/flink.jvm.gc.collections.time | CUMULATIVE, INT64 | gce_instance | host_name, resource_type, taskmanager_id, garbage_collector_name
workload.googleapis.com/flink.jvm.memory.direct.total_capacity | GAUGE, INT64 | gce_instance | host_name, resource_type, taskmanager_id
workload.googleapis.com/flink.jvm.memory.direct.used | GAUGE, INT64 | gce_instance | host_name, resource_type, taskmanager_id
workload.googleapis.com/flink.jvm.memory.heap.committed | GAUGE, INT64 | gce_instance | host_name, resource_type, taskmanager_id
workload.googleapis.com/flink.jvm.memory.heap.max | GAUGE, INT64 | gce_instance | host_name, resource_type, taskmanager_id
workload.googleapis.com/flink.jvm.memory.heap.used | GAUGE, INT64 | gce_instance | host_name, resource_type, taskmanager_id
workload.googleapis.com/flink.jvm.memory.mapped.total_capacity | GAUGE, INT64 | gce_instance | host_name, resource_type, taskmanager_id
workload.googleapis.com/flink.jvm.memory.mapped.used | GAUGE, INT64 | gce_instance | host_name, resource_type, taskmanager_id
workload.googleapis.com/flink.jvm.memory.metaspace.committed | GAUGE, INT64 | gce_instance | host_name, resource_type, taskmanager_id
workload.googleapis.com/flink.jvm.memory.metaspace.max | GAUGE, INT64 | gce_instance | host_name, resource_type, taskmanager_id
workload.googleapis.com/flink.jvm.memory.metaspace.used | GAUGE, INT64 | gce_instance | host_name, resource_type, taskmanager_id
workload.googleapis.com/flink.jvm.memory.nonheap.committed | GAUGE, INT64 | gce_instance | host_name, resource_type, taskmanager_id
workload.googleapis.com/flink.jvm.memory.nonheap.max | GAUGE, INT64 | gce_instance | host_name, resource_type, taskmanager_id
workload.googleapis.com/flink.jvm.memory.nonheap.used | GAUGE, INT64 | gce_instance | host_name, resource_type, taskmanager_id
workload.googleapis.com/flink.jvm.threads.count | GAUGE, INT64 | gce_instance | host_name, resource_type, taskmanager_id
workload.googleapis.com/flink.memory.managed.total | GAUGE, INT64 | gce_instance | host_name, resource_type, taskmanager_id
workload.googleapis.com/flink.memory.managed.used | GAUGE, INT64 | gce_instance | host_name, resource_type, taskmanager_id
workload.googleapis.com/flink.operator.record.count | CUMULATIVE, INT64 | gce_instance | host_name, taskmanager_id, job_name, operator_name, task_name, subtask_index, record
workload.googleapis.com/flink.operator.watermark.output | GAUGE, INT64 | gce_instance | host_name, job_name, operator_name, subtask_index, task_name, taskmanager_id
workload.googleapis.com/flink.task.record.count | CUMULATIVE, INT64 | gce_instance | host_name, taskmanager_id, job_name, task_name, subtask_index, record

Sample dashboard

To view your Flink metrics, you must have a chart or dashboard configured. Cloud Monitoring provides a library of sample dashboards for integrations, which contain preconfigured charts. For information about installing these dashboards, see Installing sample dashboards.

Verify the configuration

This section describes how to verify that you correctly configured the Flink receiver. It might take one or two minutes for the Ops Agent to begin collecting telemetry.

To verify that the logs are ingested, go to the Logs Explorer and run the following query to view the Flink logs:

resource.type="gce_instance"
log_id("flink")
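
The same query can also be run from a terminal. The sketch below builds the filter as a single string and passes it to `gcloud logging read`. The helper names `flink_log_filter` and `read_flink_logs` are hypothetical, the optional severity argument is an addition for illustration, and the sketch assumes an authenticated gcloud CLI with a default project set. Nothing is sent until `read_flink_logs` is called.

```shell
# Sketch: run the Flink log query with the gcloud CLI.
# flink_log_filter and read_flink_logs are hypothetical helpers.

# Prints the Logs Explorer filter. $1 is an optional minimum severity,
# for example WARNING or ERROR.
flink_log_filter() {
  filter='resource.type="gce_instance" AND log_id("flink")'
  if [ -n "${1:-}" ]; then
    filter="$filter AND severity>=$1"
  fi
  printf '%s\n' "$filter"
}

# Fetches up to 10 matching entries as JSON.
# Assumes gcloud is installed, authenticated, and has a default project.
read_flink_logs() {
  gcloud logging read "$(flink_log_filter "${1:-}")" --limit=10 --format=json
}
```

Calling `read_flink_logs` with no argument mirrors the query above, while `read_flink_logs ERROR` narrows the results to entries at ERROR severity or higher.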

What's next

For a walkthrough on how to use Ansible to install the Ops Agent, configure a third-party application, and install a sample dashboard, see the Install the Ops Agent to troubleshoot third-party applications video.