Apache Kafka 시리즈
- Apache Kafka 기본 개념 - 메시징 시스템의 이해
- Apache Kafka 파티션과 컨슈머 그룹
- Apache Kafka 복제와 고가용성 - 데이터 안정성 보장
- Apache Kafka 성능 튜닝과 운영 - 최적화
- Spring Boot와 KRaft를 활용한 Apache Kafka 개발
- Apache Kafka 모니터링과 운영 - Prometheus & Grafana ← 현재 글
개요
- Prometheus와 Grafana를 활용한 Kafka 클러스터 모니터링 구축과 Alert Manager를 통한 실시간 알림 설정 방법을 설명함
- 브로커, 프로듀서, 컨슈머의 핵심 메트릭 지표 분석과 운영 중 발생할 수 있는 주요 이슈 대응 방안을 다룸
모니터링 환경 구성
- Kafka JMX 메트릭은 Prometheus가 직접 수집 불가
- Kafka Exporter를 사이드카 패턴으로 배치하여 Prometheus 호환 포맷으로 변환 필요
Docker Compose 설정
docker-compose.yml에 kafka-exporter 서비스 추가
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# docker-compose.yml
# ... 기존 kafka 서비스들 ...
# Kafka Exporter (추가)
# Kafka 클러스터 상태를 Prometheus가 이해할 수 있는 포맷으로 변환
kafka-exporter:
image: danielqsj/kafka-exporter
container_name: kafka-exporter
command:
- --kafka.server=kafka1:29092
- --kafka.server=kafka2:29093
- --kafka.server=kafka3:29094
ports:
- "9308:9308"
depends_on:
- kafka1
- kafka2
- kafka3
Prometheus 설정
prometheus/prometheus.yml에 수집 대상(Exporter, Actuator) 등록
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# prometheus/prometheus.yml
global:
scrape_interval: 15s
scrape_configs:
# 1. Kafka 클러스터 메트릭 (Kafka Exporter)
- job_name: "kafka-cluster"
static_configs:
- targets: ["kafka-exporter:9308"]
# 2. Spring Boot 애플리케이션 메트릭 (Actuator)
- job_name: "spring-boot-app"
metrics_path: "/actuator/prometheus"
scrape_interval: 5s
static_configs:
# Docker 내부 통신용 주소 (Mac/Windows Docker Desktop)
- targets: ["host.docker.internal:8080"]
# Linux 환경인 경우 호스트 IP 사용 필요 (예: 172.17.0.1:8080)
Grafana 시각화 구축
데이터 소스 연결
- 브라우저 접속
http://localhost:3000(admin/admin)
- Connections -> Data Sources -> Add data source
- Prometheus 선택
- Connection URL
http://prometheus:9090(Docker 컨테이너명)
- Save & test 클릭 (성공 메시지 확인)
대시보드 가져오기 (Import)
- 커뮤니티 검증 대시보드 활용 권장
- Dashboards -> New -> Import
- 추천 대시보드 ID 입력
- ID: 7589 (Kafka Exporter Overview)
- ID: 11378 (JVM Micrometer)
- Load 클릭 후 Data Source로 Prometheus 선택
- Import 완료
모니터링 대시보드
Grafana 대시보드
- 핵심 메트릭
- 브로커 상태
- 활성 파티션 수
- 리더 수
- 메시지 처리량
- 초당 메시지 수
- 바이트 처리량
- 성능 지표
- 요청 처리 시간
- 실패율
- 컨슈머 상태
- Lag
- 오프셋 커밋 성공률
- 리소스 사용
- CPU/메모리 사용률
- 디스크 I/O
알림 규칙
rules/kafka_alerts.yml설정
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
groups:
- name: kafka_system_alerts # 브로커 및 시스템 상태 모니터링
rules:
- alert: KafkaBrokerDown
expr: up{job="kafka"} == 0
for: 1m
labels:
severity: critical
annotations:
summary: "Kafka 브로커 다운"
description: "브로커 {{ $labels.instance }}가 응답하지 않습니다"
- alert: KafkaUnderReplicatedPartitions
expr: kafka_server_replicamanager_underreplicated_partitions > 0
for: 5m
labels:
severity: warning
annotations:
summary: "복제 파티션 부족"
description: "브로커 {{ $labels.instance }}에 복제가 부족한 파티션이 있습니다"
- alert: KafkaHighCPU
expr: rate(process_cpu_seconds_total[5m]) > 0.8
for: 5m
labels:
severity: warning
annotations:
summary: "높은 CPU 사용률"
description: "인스턴스 {{ $labels.instance }}의 CPU 사용률이 80%를 초과했습니다"
- name: kafka_consumer_alerts # 컨슈머 그룹별 처리 지연 모니터링
rules:
# 실시간 처리 그룹
- alert: KafkaRealtimeProcessorLag
expr: kafka_consumergroup_lag{group="realtime-processor"} > 200
for: 1m
labels:
severity: critical
consumer_group: realtime-processor
annotations:
summary: "실시간 처리 지연 발생"
description: |
실시간 처리 그룹에서 처리 지연이 발생했습니다.
- 컨슈머 그룹: {{ $labels.group }}
- 토픽: {{ $labels.topic }}
- 현재 Lag: {{ $value | printf "%.0f" }}건
# DB 저장 그룹 (일반 처리)
- alert: KafkaDatabaseProcessorLag
expr: kafka_consumergroup_lag{group="database-processor"} > 1000
for: 5m
labels:
severity: warning
consumer_group: database-processor
annotations:
summary: "DB 저장 처리 지연 발생"
description: |
DB 저장 그룹에서 처리 지연이 발생했습니다.
- 컨슈머 그룹: {{ $labels.group }}
- 토픽: {{ $labels.topic }}
- 현재 Lag: {{ $value | printf "%.0f" }}건
# 분석 처리 그룹 (배치 처리)
- alert: KafkaAnalyticsProcessorLag
expr: kafka_consumergroup_lag{group="analytics-processor"} > 5000
for: 10m
labels:
severity: info
consumer_group: analytics-processor
annotations:
summary: "분석 처리 지연 발생"
description: |
분석 처리 그룹에서 배치 처리 지연이 발생했습니다.
- 컨슈머 그룹: {{ $labels.group }}
- 토픽: {{ $labels.topic }}
- 현재 Lag: {{ $value | printf "%.0f" }}건
테스트 및 운영
성능 테스트
-
KafkaLoadTest.java- 부하 테스트1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
@SpringBootTest public class KafkaLoadTest { @Autowired private KafkaTemplate<String, SensorData> kafkaTemplate; @Test public void loadTest() throws InterruptedException { int messageCount = 100000; // 병렬 메시지 전송 int threadCount = 32; // 32개 스레드로 부하 생성 (가상 사용자 32명) ExecutorService executor = Executors.newFixedThreadPool(threadCount); CountDownLatch latch = new CountDownLatch(messageCount); // 동시성 처리를 위해 Thread-Safe 리스트 사용 List<Long> latencies = Collections.synchronizedList(new ArrayList<>()); long start = System.currentTimeMillis(); // 병렬 메시지 전송 for (int i = 0; i < messageCount; i++) { final int index = i; executor.submit(() -> { SensorData data = new SensorData(); data.setSensorId("TEST-" + index); data.setTemperature(20.0 + Math.random() * 10); data.setHumidity(40.0 + Math.random() * 20); data.setTimestamp(LocalDateTime.now()); long sendTime = System.nanoTime(); kafkaTemplate.send("sensor-data", data.getSensorId(), data) .whenComplete((result, ex) -> { if (ex == null) { // 지연 시간 측정 (마이크로초 단위) long latency = (System.nanoTime() - sendTime) / 1000; latencies.add(latency); latch.countDown(); } else { log.error("Failed to send message", ex); latch.countDown(); // 에러 발생 시에도 카운트 감소 } }); }); } // 최대 1분간 대기 boolean completed = latch.await(1, TimeUnit.MINUTES); long end = System.currentTimeMillis(); // 성능 지표 계산 double duration = (end - start) / 1000.0; double throughput = messageCount / duration; // 지연 시간 통계 DoubleSummaryStatistics stats = latencies.stream() .mapToDouble(Long::doubleValue) .summaryStatistics(); // 결과 출력 log.info("성능 테스트 결과:"); log.info("- 총 메시지: {} (성공: {})", messageCount, latencies.size()); log.info("- 처리 시간: {:.2f}초", duration); log.info("- 처리율: {:.0f} msg/sec", throughput); log.info("- 평균 지연: {:.2f}µs", stats.getAverage()); log.info("- 최대 지연: {:.2f}µs", stats.getMax()); log.info("- 최소 지연: {:.2f}µs", stats.getMin()); } }