Apache Kafka 통합은 주제 요청 및 실패와 같은 브로커 측정항목을 수집합니다. 또한 브로커에서 파티션을 모니터링합니다. 통합은 Kafka 로그를 수집하고 이를 JSON 페이로드로 파싱합니다. 결과에는 로거, 수준, 메시지 필드가 포함됩니다.
Kafka에 대한 자세한 내용은 kafka.apache.org/를 참조하세요.
기본 요건
Kafka 로그와 측정항목을 수집하려면 작업 에이전트 버전 2.10.0 이상을 설치해야 합니다.
이 수신자는 Apache Kafka 버전 0.8~3.0.0을 지원합니다.
Kafka 인스턴스 구성
JMX 엔드포인트를 노출하려면 JVM을 시작할 때 com.sun.management.jmxremote.port
시스템 속성을 설정해야 합니다. 또한 com.sun.management.jmxremote.rmi.port
시스템 속성을 동일한 포트로 설정하는 것이 좋습니다. JMX 엔드포인트를 원격으로 노출하려면 java.rmi.server.hostname
시스템 속성도 설정해야 합니다.
기본적으로 이러한 속성은 Kafka 배포의 bin/kafka-run-class.sh
파일에 설정합니다.
명령줄 인수를 사용하여 시스템 속성을 설정하려면 JVM을 시작할 때 속성 이름 앞에 -D
를 붙입니다. 예를 들어 com.sun.management.jmxremote.port
를 포트 9999
로 설정하려면 JVM을 시작할 때 다음을 지정합니다.
-Dcom.sun.management.jmxremote.port=9999
Kafka용 작업 에이전트 구성
작업 에이전트 구성 가이드에 따라 Kafka 인스턴스에서 로그와 측정항목을 수집하는 데 필요한 요소를 추가하고 에이전트를 다시 시작합니다.
구성 예시
다음 명령어는 구성 파일을 만들어 Kafka의 로그와 측정항목을 수집하고 Linux에서 작업 에이전트를 다시 시작합니다.
sudo tee /etc/google-cloud-ops-agent/config.yaml > /dev/null << EOF
logging:
receivers:
kafka:
type: kafka
service:
pipelines:
kafka:
receivers:
- kafka
metrics:
receivers:
kafka:
type: kafka
service:
pipelines:
kafka:
receivers:
- kafka
EOF
sudo service google-cloud-ops-agent restart
로그 수집 구성
Kafka에서 로그를 수집하려면 Kafka로 생성되는 로그의 수신자를 만든 후 새 수신자의 파이프라인을 만들어야 합니다. kafka
로그의 수신자를 구성하려면 다음 필드를 지정합니다.
필드 | 기본값 | 설명 |
---|---|---|
type |
값은 kafka 여야 합니다. |
|
include_paths |
[/var/log/kafka/*.log] |
각 파일을 테일링하여 읽을 파일 시스템 경로의 목록입니다. 와일드 카드(* )를 경로에 사용할 수 있습니다(예: /var/log/kafka*/*.log ). |
exclude_paths |
include_paths 중에서 일치하는 집합에서 제외할 파일 시스템 경로 패턴의 목록입니다. |
|
wildcard_refresh_interval |
60s |
include_paths 의 와일드 카드 파일 경로가 새로 고쳐지는 간격입니다. 기간(예: 30s 또는 2m )으로 지정됩니다. 이 속성은 로그 파일이 기본 간격보다 빠르게 순환되는 높은 로깅 처리량에서 유용할 수 있습니다. 1의 배수여야 합니다. |
로깅되는 내용
kafka
로그의 logName
은 구성에 지정된 수신자 ID에서 파생됩니다. LogEntry
내의 자세한 필드는 다음과 같습니다.
필드 | 유형 | 설명 |
---|---|---|
jsonPayload.source |
문자열 | 로그가 시작된 모듈 또는 스레드입니다. |
jsonPayload.logger |
문자열 | 로그가 시작된 로거의 이름입니다. |
jsonPayload.message |
문자열 | 로그 메시지이며 제공되는 경우 상세 스택 추적을 포함합니다. |
severity |
문자열(LogSeverity ) |
로그 항목 수준입니다(번역됨). |
timestamp |
문자열(Timestamp ) |
요청이 수신된 시간입니다. |
비어 있거나 누락된 필드는 로그 항목에 표시되지 않습니다.
측정항목 수집 구성
Kafka에서 측정항목을 수집하려면 Kafka 측정항목의 수신자를 만든 후 새 수신자의 파이프라인을 만들어야 합니다. Kafka 측정항목의 수신자를 구성하려면 다음 필드를 지정합니다.
필드 | 기본값 | 설명 |
---|---|---|
type |
값은 kafka 여야 합니다. |
|
stub_status_url |
localhost:9999 |
서비스 URL을 구성하는 데 사용되는 JMX 서비스 URL 또는 호스트 및 포트입니다. service:jmx:<protocol>:<sap> 또는 host:port 형식이어야 합니다. host:port 형식의 값이 service:jmx:rmi:///jndi/rmi://<host>:<port>/jmxrmi 의 서비스 URL을 만드는 데 사용됩니다. |
collect_jvm_metrics |
true |
지원되는 JVM 측정항목도 수집하도록 수신자를 구성합니다. |
username |
JMX가 인증을 요구하도록 구성된 경우 구성된 사용자 이름입니다. | |
password |
JMX가 인증을 요구하도록 구성된 경우 구성된 비밀번호입니다. | |
collection_interval |
60s |
time.Duration 값(예: 30s 또는 5m )입니다. |
모니터링 대상
다음 표에서는 작업 에이전트가 Kafka 인스턴스에서 수집하는 측정항목의 목록을 보여줍니다.
측정항목 유형 | |
---|---|
종류, 유형 모니터링 리소스 |
라벨 |
workload.googleapis.com/kafka.isr.operation.count
|
|
CUMULATIVE , INT64 gce_instance |
operation
|
workload.googleapis.com/kafka.message.count
|
|
CUMULATIVE , INT64 gce_instance |
|
workload.googleapis.com/kafka.network.io
|
|
CUMULATIVE , INT64 gce_instance |
state
|
workload.googleapis.com/kafka.partition.count
|
|
GAUGE , INT64 gce_instance |
|
workload.googleapis.com/kafka.partition.offline
|
|
GAUGE , INT64 gce_instance |
|
workload.googleapis.com/kafka.partition.under_replicated
|
|
GAUGE , INT64 gce_instance |
|
workload.googleapis.com/kafka.purgatory.size
|
|
GAUGE , INT64 gce_instance |
type
|
workload.googleapis.com/kafka.request.count
|
|
CUMULATIVE , INT64 gce_instance |
type
|
workload.googleapis.com/kafka.request.failed
|
|
CUMULATIVE , INT64 gce_instance |
type
|
workload.googleapis.com/kafka.request.time.total
|
|
CUMULATIVE , INT64 gce_instance |
type
|
구성 확인
로그 탐색기와 측정항목 탐색기를 사용하여 Kafka 수신자를 올바르게 구성했는지 확인할 수 있습니다. 작업 에이전트가 로그 및 측정항목 수집을 시작하려면 1~2분 정도 걸릴 수 있습니다.
로그가 수집되었는지 확인하려면 로그 탐색기로 이동하고 다음 쿼리를 실행하여 Kafka 로그를 확인합니다.
resource.type="gce_instance"
logName=("projects/PROJECT_ID/logs/kafka")
측정항목이 수집되었는지 확인하려면 측정항목 탐색기로 이동하고 MQL 탭에서 다음 쿼리를 실행합니다.
fetch gce_instance
| metric 'workload.googleapis.com/kafka.request.count'
| align rate(1m)
| every 1m