[java] Java, Apache Kafka에서 주제의 메시지 수를 얻는 방법

메시징에 Apache kafka를 사용하고 있습니다. 저는 생산자와 소비자를 Java로 구현했습니다. 주제의 메시지 수를 어떻게 얻을 수 있습니까?



답변

소비자의 관점에서 이것을 염두에 두는 유일한 방법은 실제로 메시지를 소비하고 그 다음 카운트하는 것입니다.

Kafka 브로커는 시작 이후 수신 된 메시지 수에 대한 JMX 카운터를 노출하지만 이미 얼마나 많은 메시지가 제거되었는지 알 수 없습니다.

대부분의 일반적인 시나리오에서 Kafka의 메시지는 무한 스트림으로 가장 잘 보이며 현재 디스크에 보관되는 개별 값을 얻는 것은 관련이 없습니다. 더욱이 토픽에서 메시지의 하위 집합을 모두 가지고있는 브로커 클러스터를 다룰 때 상황이 더 복잡해집니다.


답변

Java는 아니지만 유용 할 수 있습니다.

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell
  --broker-list <broker>:  <port>
  --topic <topic-name> --time -1 --offsets 1
  | awk -F  ":" '{sum += $3} END {print sum}'


답변

나는 실제로 이것을 내 POC 벤치마킹에 사용합니다. ConsumerOffsetChecker를 사용하려는 항목입니다. 아래와 같이 bash 스크립트를 사용하여 실행할 수 있습니다.

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker  --topic test --zookeeper localhost:2181 --group testgroup

결과는 다음과 같습니다
여기에 이미지 설명 입력
. 빨간색 상자에서 볼 수 있듯이 999는 현재 주제에있는 메시지 수입니다.

업데이트 : ConsumerOffsetChecker는 0.10.0부터 더 이상 사용되지 않으므로 ConsumerGroupCommand 사용을 시작하는 것이 좋습니다.


답변

예를 들어 사용자 지정 파티 셔 너를 테스트 할 때 각 파티션의 메시지 수를 아는 데 관심이 있습니다. 다음 단계는 Confluent 3.2의 Kafka 0.10.2.1-2에서 작동하도록 테스트되었습니다. Kafka 주제 kt와 다음 명령 줄이 제공됩니다.

$ kafka-run-class kafka.tools.GetOffsetShell \
  --broker-list host01:9092,host02:9092,host02:9092 --topic kt

그러면 세 파티션의 메시지 수를 보여주는 샘플 출력이 인쇄됩니다.

kt:2:6138
kt:1:6123
kt:0:6137

행 수는 토픽의 파티션 수에 따라 더 많거나 적을 수 있습니다.


답변

이후 ConsumerOffsetChecker더 이상 지원되지 않습니다, 당신은 항목의 모든 메시지를 확인하려면이 명령을 사용할 수 없습니다 :

bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand \
    --group my-group \
    --bootstrap-server localhost:9092 \
    --describe

LAG토픽 파티션의 메시지 수는 어디에 있습니까?

여기에 이미지 설명 입력

또한 kafkacat을 사용해 볼 수 있습니다 . 이것은 토픽과 파티션에서 메시지를 읽고 stdout으로 출력하는 데 도움이 될 수있는 오픈 소스 프로젝트입니다. 다음은 sample-kafka-topic주제 에서 마지막 10 개의 메시지를 읽은 다음 종료 하는 샘플입니다 .

kafkacat -b localhost:9092 -t sample-kafka-topic -p 0 -o -10 -e


답변

https://prestodb.io/docs/current/connector/kafka-tutorial.html 사용

Facebook에서 제공하는 슈퍼 SQL 엔진으로 여러 데이터 소스 (Cassandra, Kafka, JMX, Redis …)에서 연결됩니다.

PrestoDB는 선택적 작업자가있는 서버로 실행되고 (추가 작업자가없는 독립형 모드가 있음), 작은 실행 가능한 JAR (presto CLI라고 함)을 사용하여 쿼리를 수행합니다.

Presto 서버를 잘 구성했으면 기존 SQL을 사용할 수 있습니다.

SELECT count(*) FROM TOPIC_NAME;


답변

주제의 모든 파티션에서 처리되지 않은 메시지를 가져 오는 Apache Kafka 명령 :

kafka-run-class kafka.tools.ConsumerOffsetChecker
    --topic test --zookeeper localhost:2181
    --group test_group

인쇄물:

Group      Topic        Pid Offset          logSize         Lag             Owner
test_group test         0   11051           11053           2               none
test_group test         1   10810           10812           2               none
test_group test         2   11027           11028           1               none

6 열은 처리되지 않은 메시지입니다. 다음과 같이 추가하십시오.

kafka-run-class kafka.tools.ConsumerOffsetChecker
    --topic test --zookeeper localhost:2181
    --group test_group 2>/dev/null | awk 'NR>1 {sum += $6}
    END {print sum}'

awk는 행을 읽고 헤더 행을 건너 뛰고 6 번째 열을 더하고 끝에 합계를 인쇄합니다.

인쇄물

5