L'intégration Apache Kafka collecte des métriques d'agent, telles que les échecs et les requêtes de sujet. Il surveille également les partitions de l'agent. L'intégration collecte les journaux Kafka et les analyse dans une charge utile JSON. Le résultat inclut les champs de logger, de niveau et de message.
Pour en savoir plus sur Kafka, consultez la documentation Apache Kafka.
Prerequisites
Pour collecter les données de télémétrie Kafka, vous devez installer l'agent Ops :
- Pour les métriques, installez la version 2.10.0 ou ultérieure.
- Pour les journaux, installez la version 2.10.0 ou ultérieure.
Cette intégration est compatible avec les versions 0.8 à 3.0.0 de Kafka.
Configurer votre instance Kafka
Pour exposer un point de terminaison JMX, vous devez définir la propriété système com.sun.management.jmxremote.port
lors du démarrage de la JVM. Nous vous recommandons également de définir la propriété système com.sun.management.jmxremote.rmi.port
sur le même port. Pour exposer un point de terminaison JMX à distance, vous devez également définir la propriété système java.rmi.server.hostname
.
Par défaut, ces propriétés sont définies dans le fichier bin/kafka-run-class.sh
d'un déploiement Kafka.
Pour définir les propriétés système à l'aide d'arguments de ligne de commande, ajoutez le préfixe -D
au nom de la propriété lors du démarrage de la JVM. Par exemple, pour définir com.sun.management.jmxremote.port
sur le port 9999
, spécifiez les éléments suivants lors du démarrage de la JVM :
-Dcom.sun.management.jmxremote.port=9999
Configurer l'agent Ops pour Kafka
En suivant le guide de configuration de l'agent Ops, ajoutez les éléments requis pour collecter la télémétrie des instances Kafka, puis redémarrez l'agent.
Exemple de configuration
La commande suivante crée la configuration permettant de collecter et d'ingérer la télémétrie pour Kafka, et redémarre l'agent Ops.
Configurer la collecte de journaux
Pour ingérer des journaux à partir de Kafka, vous devez créer des récepteurs pour les journaux produits par Kafka, puis créer un pipeline pour les nouveaux récepteurs.
Pour configurer un récepteur pour vos journaux kafka
, spécifiez les champs suivants :
Champ | Par défaut | Description |
---|---|---|
exclude_paths |
Liste des formats de chemin d'accès au système de fichiers à exclure de l'ensemble correspondant à include_paths . |
|
include_paths |
[/var/log/kafka/*.log] |
Liste des chemins d'accès du système de fichiers à lire en affichant chaque fichier. Un caractère générique * peut être utilisé dans les chemins d'accès. Exemple : /var/log/kafka*/*.log . |
record_log_file_path |
false |
Si cette valeur est définie sur true , le chemin d'accès au fichier spécifique à partir duquel l'enregistrement de journal a été obtenu apparaît dans l'entrée de journal de sortie en tant que valeur du libellé agent.googleapis.com/log_file_path . Lorsque vous utilisez un caractère générique, seul le chemin du fichier à partir duquel l'enregistrement a été obtenu est enregistré. |
type |
Cette valeur doit être kafka . |
|
wildcard_refresh_interval |
60s |
Intervalle d'actualisation pour les chemins d'accès de fichiers utilisant des caractères génériques dans include_paths . Donné en fonction de la durée d'analyse par time.ParseDuration, par exemple 30s ou 2m Cette propriété peut s'avérer utile lorsque le débit de journalisation est élevé et que les fichiers journaux sont alternés plus rapidement que l'intervalle par défaut. |
Contenu consigné
Le champ logName
est dérivé des ID de récepteur spécifiés dans la configuration. Les champs détaillés dans l'entrée de journal (LogEntry
) sont les suivants.
Les journaux kafka
contiennent les champs suivants dans LogEntry
:
Champ | Type | Description |
---|---|---|
jsonPayload.level |
chaîne (LogSeverity ) |
Niveau de l'entrée de journal |
jsonPayload.logger |
chaîne (Timestamp ) |
Nom du logger d'où provient le journal. |
jsonPayload.message |
chaîne | Message de journal, y compris la trace détaillée de la pile, le cas échéant |
jsonPayload.source |
chaîne | Module et/ou thread d'où provient le journal. |
severity |
chaîne | Niveau d'entrée de journal (traduit). |
timestamp |
chaîne | Heure à laquelle la requête a été reçue. |
Configurer la collecte de métriques
Pour ingérer des métriques à partir de Kafka, vous devez créer des récepteurs pour les métriques produites par Kafka, puis créer un pipeline pour les nouveaux récepteurs.
Pour configurer un récepteur pour vos métriques kafka
, spécifiez les champs suivants :
Champ | Par défaut | Description |
---|---|---|
stub_status_url |
localhost:9999 |
L'URL du service JMX, ou l'hôte et le port utilisés pour créer l'URL du service. Doit être au format service:jmx:<protocol>:<sap> ou host:port . Les valeurs du formulaire host:port seront utilisées pour créer une URL de service de service:jmx:rmi:///jndi/rmi://<host>:<port>/jmxrmi . |
collect_jvm_metrics |
true |
Cela configure le récepteur de manière à ce qu'il collecte également les métriques JVM compatibles. |
collection_interval |
60s |
Une valeur time duration, telle que 30s ou 5m . |
password |
Le mot de passe configuré si JMX est configuré pour exiger une authentification. | |
stub_status_url |
localhost:9999 |
L'URL du service JMX, ou l'hôte et le port utilisés pour créer l'URL du service. Cette valeur doit être au format service:jmx: |
type |
Cette valeur doit être kafka . |
|
username |
Le nom d'utilisateur configuré si JMX est configuré pour exiger une authentification. |
Métriques surveillées
Le tableau suivant fournit la liste des métriques que l'agent Ops collecte à partir de l'instance Kafka.
Type de métrique | |
---|---|
Genre, type Ressources surveillées |
Libellés |
workload.googleapis.com/kafka.isr.operation.count
|
|
CUMULATIVE , INT64 gce_instance |
operation
|
workload.googleapis.com/kafka.message.count
|
|
CUMULATIVE , INT64 gce_instance |
|
workload.googleapis.com/kafka.network.io
|
|
CUMULATIVE , INT64 gce_instance |
state
|
workload.googleapis.com/kafka.partition.count
|
|
GAUGE , INT64 gce_instance |
|
workload.googleapis.com/kafka.partition.offline
|
|
GAUGE , INT64 gce_instance |
|
workload.googleapis.com/kafka.partition.under_replicated
|
|
GAUGE , INT64 gce_instance |
|
workload.googleapis.com/kafka.purgatory.size
|
|
GAUGE , INT64 gce_instance |
type
|
workload.googleapis.com/kafka.request.count
|
|
CUMULATIVE , INT64 gce_instance |
type
|
workload.googleapis.com/kafka.request.failed
|
|
CUMULATIVE , INT64 gce_instance |
type
|
workload.googleapis.com/kafka.request.time.total
|
|
CUMULATIVE , INT64 gce_instance |
type
|
Exemple de tableau de bord
Pour afficher vos métriques Kafka, vous devez configurer un graphique ou un tableau de bord. Cloud Monitoring fournit une bibliothèque d'exemples de tableaux de bord pour les intégrations, contenant des graphiques préconfigurés. Pour en savoir plus sur l'installation de ces tableaux de bord, consultez la page Installer des exemples de tableaux de bord.
Vérifier la configuration
Cette section explique comment vérifier que vous avez bien configuré le récepteur Kafka. La collecte de la télémétrie par l'agent Ops peut prendre une ou deux minutes.
Pour vérifier que les journaux sont ingérés, accédez à l'explorateur de journaux et exécutez la requête suivante pour afficher les journaux Kafka :
resource.type="gce_instance"
log_id("kafka")
Pour vérifier que les métriques sont ingérées, accédez à l'explorateur de métriques et exécutez la requête suivante dans l'onglet MQL :
fetch gce_instance
| metric 'workload.googleapis.com/kafka.message.count'
| every 1m
Étape suivante
Pour accéder à un tutoriel pas à pas expliquant comment utiliser Ansible pour installer l'agent Ops, configurer une application tierce et installer un exemple de tableau de bord, consultez la vidéo Install the Ops Agent to troubleshoot third-party applications (Installer l'agent Ops pour résoudre les problèmes liés à des applications tierces).