[apache-kafka] 실행하기 전에 토픽에서 모든 데이터를 삭제하거나 토픽을 삭제하는 방법이 있습니까?

실행하기 전에 토픽에서 모든 데이터를 삭제하거나 토픽을 삭제하는 방법이 있습니까?

logRetentionHours속성 을 변경하기 위해 KafkaConfig.scala 파일을 수정할 수 있습니까 ? 소비자가 메시지를 읽는 즉시 메시지가 삭제되는 방법이 있습니까?

생산자를 사용하여 어딘가에서 데이터를 가져오고 소비자가 소비하는 특정 주제로 데이터를 보내고 있는데, 실행할 때마다 해당 주제의 모든 데이터를 삭제할 수 있습니까? 나는 주제에 매번 새로운 데이터만을 원합니다. 어떻게 든 주제를 다시 초기화하는 방법이 있습니까?



답변

아직 지원되지 않는다고 생각하지 마십시오. JIRA 문제 “Add delete topic support”를 살펴보십시오 .

수동으로 삭제하려면 :

  1. 클러스터 종료
  2. kafka 로그 디렉토리 ( log.dirkafka 구성 파일 의 속성에 의해 지정됨 )와 사육사 데이터를 정리합니다.
  3. 클러스터 다시 시작

주어진 주제에 대해 할 수있는 것은

  1. 카프카 중지
  2. 파티션에 특정한 kafka 로그를 정리합니다. kafka는 로그 파일을 “logDir / topic-partition”형식으로 저장하므로 “MyTopic”이라는 주제의 경우 파티션 ID 0에 대한 로그 가 속성으로 지정된 /tmp/kafka-logs/MyTopic-0위치에 저장됩니다 ./tmp/kafka-logslog.dir
  3. kafka 다시 시작

이것은 NOT좋고 권장되는 접근 방식이지만 작동합니다. Kafka 브로커 구성 파일에서 log.retention.hours.per.topic속성은 다음을 정의하는 데 사용됩니다.The number of hours to keep a log file before deleting it for some specific topic

또한 소비자가 읽는 즉시 메시지가 삭제되는 방법이 있습니까?

로부터 카프카 문서 :

Kafka 클러스터는 사용 여부에 관계없이 게시 된 모든 메시지를 구성 가능한 기간 동안 유지합니다. 예를 들어 로그 보존이 2 일로 설정된 경우 메시지가 게시 된 후 2 일 동안 사용할 수 있으며 그 이후에는 공간을 확보하기 위해 폐기됩니다. Kafka의 성능은 데이터 크기와 관련하여 효과적으로 일정하므로 많은 데이터를 유지하는 것은 문제가되지 않습니다.

실제로 소비자별로 유지되는 유일한 메타 데이터는 “오프셋”이라고하는 로그에서 소비자의 위치입니다. 이 오프셋은 소비자에 의해 제어됩니다. 일반적으로 소비자는 메시지를 읽을 때 오프셋을 선형으로 진행하지만 실제로 위치는 소비자가 제어하며 원하는 순서대로 메시지를 소비 할 수 있습니다. 예를 들어 소비자는 이전 오프셋으로 재설정하여 재 처리 할 수 ​​있습니다.

시작을 찾기위한 카프카 0.8 읽어 오프셋 단순 소비자 예를 그들이 말하는을

Kafka는 도움이되는 두 개의 상수를 포함 kafka.api.OffsetRequest.EarliestTime()하고 있으며, 로그에서 데이터의 시작 부분을 찾고 거기에서 스트리밍을 시작하고 kafka.api.OffsetRequest.LatestTime()새 메시지 만 스트리밍합니다.

소비자 측에서 오프셋을 관리하기위한 예제 코드도 찾을 수 있습니다.

    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                 long whichTime, String clientName) {
    TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
    Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
    requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
    kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(),clientName);
    OffsetResponse response = consumer.getOffsetsBefore(request);

    if (response.hasError()) {
        System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition) );
        return 0;
    }
    long[] offsets = response.offsets(topic, partition);
    return offsets[0];
}


답변

내가 여기서 언급했듯이 Purge Kafka Queue :

빠른 시작 예제를 위해 Kafka 0.8.2에서 테스트되었습니다. 먼저 config 폴더 아래의 server.properties 파일에 한 줄을 추가합니다.

delete.topic.enable=true

그런 다음 다음 명령을 실행할 수 있습니다.

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test


답변

kafka 0.10으로 테스트

1. stop zookeeper & Kafka server,
2. then go to 'kafka-logs' folder , there you will see list of kafka topic folders, delete folder with topic name
3. go to 'zookeeper-data' folder , delete data inside that.
4. start zookeeper & kafka server again.

참고 : zookeeper-data 폴더가 아닌 kafka-logs 내부의 주제 폴더를 삭제하는 경우 주제가 여전히 존재하는 것을 볼 수 있습니다.


답변

더러운 해결 방법으로 주제별 런타임 보존 설정을 조정할 수 있습니다 bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my_topic --config retention.bytes=1( 예 : retention.bytes = 0 도 작동 할 수 있음).

잠시 후 kafka가 공간을 확보해야합니다. 이것이 주제를 다시 만드는 것과 비교하여 어떤 의미가 있는지 확실하지 않습니다.

추신. 카프카 청소가 끝나면 보존 설정을 다시 가져 오는 것이 좋습니다.

retention.ms기록 데이터를 유지 하는 데 사용할 수도 있습니다.


답변

다음은 localhost를 zookeeper 서버로 가정하고 Kafka_Home이 설치 디렉토리로 설정되어 있다고 가정하고 Kafka 토픽을 비우고 삭제하는 스크립트입니다.

이 스크립트는 아래의 것 비우 1 초 자사의 유지 시간을 설정 한 후 설정을 제거하여 주제를 :

#!/bin/bash
echo "Enter name of topic to empty:"
read topicName
/$Kafka_Home/bin/kafka-configs --zookeeper localhost:2181 --alter --entity-type topics --entity-name $topicName --add-config retention.ms=1000
sleep 5
/$Kafka_Home/bin/kafka-configs --zookeeper localhost:2181 --alter --entity-type topics --entity-name $topicName --delete-config retention.ms

토픽 을 완전히 삭제 하려면 적용 가능한 모든 kafka 브로커를 중지하고 kafka 로그 디렉토리 (기본값 : / tmp / kafka-logs)에서 해당 디렉토리를 제거한 다음이 스크립트를 실행하여 zookeeper에서 토픽을 제거해야합니다. zookeeper에서 삭제되었는지 확인하려면 ls / brokers / topics의 출력에 더 이상 주제가 포함되지 않아야합니다.

#!/bin/bash
echo "Enter name of topic to delete from zookeeper:"
read topicName
/$Kafka_Home/bin/zookeeper-shell localhost:2181 <<EOF
rmr /brokers/topics/$topicName
ls /brokers/topics
quit
EOF


답변

우리는 중간 수준의 성공으로 다른 답변이 설명하는 것을 거의 시도했습니다. 우리에게 실제로 효과가 있었던 것은 (Apache Kafka 0.8.1) class 명령입니다.

sh kafka-run-class.sh kafka.admin.DeleteTopicCommand –topic yourtopic –zookeeper localhost : 2181


답변

Brew 사용자 용

brew나처럼 사용 하고 악명 높은 kafka-logs폴더를 검색하는 데 많은 시간을 낭비 했다면 더 이상 두려워하지 마십시오 . (그리고 그것이 당신과 Homebrew, Kafka 등의 여러 버전에서 작동하는지 알려주십시오 :))

아마도 다음에서 찾을 수 있습니다.

위치:

/usr/local/var/lib/kafka-logs


실제로 그 길을 찾는 방법

(이것은 기본적으로 brew를 통해 설치하는 모든 앱에도 유용합니다)

1) brew services list

kafka가 matbhz /Users/matbhz/Library/LaunchAgents/homebrew.mxcl.kafka.plist를 시작했습니다.

2) plist위에서 찾은 내용을 열고 읽습니다.

3) server.properties내 경우에는 위치를 정의하는 라인을 찾으십시오 .

  • /usr/local/etc/kafka/server.properties

4) log.dirs라인을 찾으십시오 .

log.dirs = / usr / local / var / lib / kafka-logs

5) 해당 위치로 이동하여 원하는 주제에 대한 로그를 삭제하십시오.

6) Kafka를 다시 시작하십시오. brew services restart kafka