L'integrazione di Apache Kafka raccoglie le metriche dell'intermediario, come le richieste di argomenti e gli errori. Monitora anche le partizioni del broker. L'integrazione raccoglie i log di Kafka e li analizza in un payload JSON. Il risultato include campi per logger, livello e messaggio.
Per ulteriori informazioni su Kafka, consulta kafka.apache.org/.
Prerequisiti
Per raccogliere e importare log e metriche di Kafka, devi installare l'agente operativo versione 2.10.0 o successiva.
Questo ricevitore supporta le versioni di Apache Kafka da 0.8 a 3.0.0.
Configura la tua istanza di Kafka
Per esporre un endpoint JMX, devi impostare la proprietà di sistema com.sun.management.jmxremote.port
all'avvio della JVM. Ti consigliamo inoltre di impostare la
proprietà del sistema com.sun.management.jmxremote.rmi.port
sulla stessa porta. Per
esporre un endpoint JMX da remoto, devi anche impostare la proprietà
di sistema java.rmi.server.hostname
.
Per impostazione predefinita, queste proprietà sono impostate in un file bin/kafka-run-class.sh
di deployment di Kafka.
Per impostare le proprietà di sistema utilizzando argomenti della riga di comando, anteponi il nome della proprietà a -D
all'avvio di JVM. Ad esempio, per impostare
com.sun.management.jmxremote.port
sulla porta 9999
, specifica quanto segue all'avvio di
JVM:
-Dcom.sun.management.jmxremote.port=9999
Configura l'agente operativo per Kafka
Segui la guida per configurare l'agente operativo, aggiungi gli elementi necessari per raccogliere log e metriche dalle istanze di Kafka e riavvia l'agente.
Configurazione di esempio
Il comando seguente crea il file di configurazione per raccogliere e importare log e metriche per Kafka e riavvia l'agente operativo su Linux.
sudo tee /etc/google-cloud-ops-agent/config.yaml > /dev/null << EOF
logging:
receivers:
kafka:
type: kafka
service:
pipelines:
kafka:
receivers:
- kafka
metrics:
receivers:
kafka:
type: kafka
service:
pipelines:
kafka:
receivers:
- kafka
EOF
sudo service google-cloud-ops-agent restart
Configura la raccolta dei log
Per importare i log da Kafka, devi creare dei destinatari per i log prodotti da Kafka, quindi creare una pipeline per i nuovi destinatari. Per configurare un ricevitore per i log kafka
, specifica i seguenti campi:
Campo | predefinita | Descrizione |
---|---|---|
type |
Il valore deve essere kafka . |
|
include_paths |
[/var/log/kafka/*.log] |
Un elenco di percorsi di filesystem da leggere mettendo in coda ogni file. Nei percorsi è possibile utilizzare un carattere jolly (* ), ad esempio /var/log/kafka*/*.log . |
exclude_paths |
Un elenco di pattern di file system da escludere dall'insieme corrispondente da include_paths . |
|
wildcard_refresh_interval |
60s |
L'intervallo a cui vengono aggiornati i percorsi dei file con caratteri jolly in include_paths . È indicato come durata, ad esempio 30s o 2m . Questa proprietà potrebbe essere utile in presenza di velocità effettiva di logging elevate, in cui i file di log vengono ruotati più rapidamente rispetto all'intervallo predefinito. Deve essere un multiplo di 1 s. |
Che cosa viene registrato
I logName
dei log kafka
sono ricavati dagli ID destinatario specificati nella configurazione. Di seguito sono riportati i campi dettagliati all'interno di LogEntry
.
Campo | Tipo | Descrizione |
---|---|---|
jsonPayload.source |
string | Modulo e/o thread in cui ha avuto origine il log. |
jsonPayload.logger |
string | Nome del logger da cui ha avuto origine il log. |
jsonPayload.message |
string | Messaggio di log, incluso stacktrace dettagliato dove fornito. |
severity |
stringa (LogSeverity ) |
Livello di voce di log (tradotto). |
timestamp |
stringa (Timestamp ) |
Ora in cui è stata ricevuta la richiesta. |
I campi vuoti o mancanti non verranno inclusi nella voce di log.
Configurazione della raccolta di metriche
Per raccogliere metriche da Kafka, devi creare un ricevitore per le metriche Kafka, quindi creare una pipeline per il nuovo destinatario. Per configurare un ricevitore per le metriche Kafka, specifica i campi seguenti:
Campo | predefinita | Descrizione |
---|---|---|
type |
Il valore deve essere kafka . |
|
stub_status_url |
localhost:9999 |
URL del servizio JMX oppure host e porta utilizzati per creare l'URL del servizio. Deve essere service:jmx:<protocol>:<sap> o host:port . I valori nel modulo host:port verranno utilizzati per creare un URL di servizio per service:jmx:rmi:///jndi/rmi://<host>:<port>/jmxrmi . |
collect_jvm_metrics |
true |
Consente di configurare il destinatario in modo che raccolga anche le metriche JVM supportate. |
username |
Il nome utente configurato se JMX è configurato in modo da richiedere l'autenticazione. | |
password |
La password configurata se JMX è configurato per richiedere l'autenticazione. | |
collection_interval |
60s |
Un valore time.Duration, ad esempio 30s o 5m . |
Che cosa viene monitorato
La seguente tabella fornisce l'elenco delle metriche che l'agente operativo raccoglie dall'istanza Kafka.
Tipo di metrica | |
---|---|
Tipo, tipo Risorse monitorate |
Etichette |
workload.googleapis.com/kafka.isr.operation.count
|
|
CUMULATIVE , INT64 gce_instance |
operation
|
workload.googleapis.com/kafka.message.count
|
|
CUMULATIVE , INT64 gce_instance |
|
workload.googleapis.com/kafka.network.io
|
|
CUMULATIVE , INT64 gce_instance |
state
|
workload.googleapis.com/kafka.partition.count
|
|
GAUGE , INT64 gce_instance |
|
workload.googleapis.com/kafka.partition.offline
|
|
GAUGE , INT64 gce_instance |
|
workload.googleapis.com/kafka.partition.under_replicated
|
|
GAUGE , INT64 gce_instance |
|
workload.googleapis.com/kafka.purgatory.size
|
|
GAUGE , INT64 gce_instance |
type
|
workload.googleapis.com/kafka.request.count
|
|
CUMULATIVE , INT64 gce_instance |
type
|
workload.googleapis.com/kafka.request.failed
|
|
CUMULATIVE , INT64 gce_instance |
type
|
workload.googleapis.com/kafka.request.time.total
|
|
CUMULATIVE , INT64 gce_instance |
type
|
Verificare la configurazione
Puoi utilizzare Esplora log e Esplora metriche per verificare di aver configurato correttamente il ricevitore Kafka. L'avvio della raccolta di log e metriche da parte dell'agente operativo potrebbe richiedere uno o due minuti.
Per verificare l'importazione dei log, vai a Esplora log ed esegui la query seguente per visualizzare i log di Kafka:
resource.type="gce_instance"
logName=("projects/PROJECT_ID/logs/kafka")
Per verificare l'importazione delle metriche, accedi a
Metrics Explorer
ed esegui la query seguente nella scheda MQL.
fetch gce_instance
| metric 'workload.googleapis.com/kafka.request.count'
| align rate(1m)
| every 1m