kafka刪除topic資料

2021-09-22 07:33:47 字數 3556 閱讀 7902

生產環境中,有乙個topic的資料量非常大。這些資料不是非常重要,需要定期清理。

要求:預設保持24小時,某些topic 需要保留2小時或者6小時

主要有3個:

1. 基於時間

2. 基於日誌大小

3. 基於日誌起始偏移量

接下來,主要介紹基於時間的清除!

kafka版本為:  2.11-1.1.0

zk版本為:  3.4.13

# 啟用刪除主題

delete.topic.enable=true

# 檢查日誌段檔案的間隔時間,以確定是否檔案屬性是否到達刪除要求。

log.retention.check.interval.ms=1000

注意:這2行配置必須存在,否則清除策略失效!log.retention.check.interval.ms 引數的單位是微秒,這裡表示間隔1秒鐘

在 server.properties 檔案中配置的是全域性策略,針對每乙個topic

針對單個topic策略,需要使用指令碼kafka-configs.sh

此指令碼不需要重啟kafka就會生效!

首先來檢視一下,當前的topic策略,比如test

bin/kafka-configs.sh --zookeeper zookeeper-1.default.svc.cluster.local:2181 --describe --entity-type topics --entity-name test
引數解釋:

--describe  詳細資訊

--entity-type 實體型別

--entity-name 指定topic名

輸出:

configs for topic '

test

' are

這個表示為策略為空

如果需要刪除topic所有資料,使用命令

bin/kafka-topics.sh --delete --topic test --zookeeper zookeeper-1.default.svc.cluster.local:2181
這個命令,請謹慎執行!!!如果想保留主題,只刪除主題現有資料(log)。可以通過修改資料保留時間實現

bin/kafka-configs.sh --zookeeper zookeeper-1.default.svc.cluster.local:2181 --entity-type topics --entity-name test --alter --add-config retention.ms=10000
執行輸出:

completed updating config for entity: topic '

test

'.

注意:修改保留時間為10秒,但不是修改後10秒就馬上刪掉,kafka是採用輪訓的方式,輪訓到這個topic發現10秒前的資料都是刪掉。時間由server.properties裡面的log.retention.check.interval.ms選項為主

再次檢視topic策略

bin/kafka-configs.sh --zookeeper zookeeper-1.default.svc.cluster.local:2181 --describe --entity-type topics --entity-name test
輸出:

configs for topic '

test

' are retention.ms=10000

發現目前的刪除策略為 retention.ms=10000

如果需要刪除上面的10秒策略,使用以下命令:

bin/kafka-configs.sh --zookeeper zookeeper-1.default.svc.cluster.local:2181 --entity-type topics --entity-name test --alter --delete-config retention.ms
輸出:

completed updating config for entity: topic '

test

'.

再次檢視topic策略

bin/kafka-configs.sh --zookeeper zookeeper-1.default.svc.cluster.local:2181 --describe --entity-type topics --entity-name test
輸出:

configs for topic '

test

' are

發現策略為空,說明刪除成功了!

第一步,設定清除策略為保留10秒

第二步,進入生產者模式,輸入訊息 a

第三步,等待5秒,再次進入生產者模式,輸入訊息 b

第四部,進入消費者模式,看輸出的訊息是a還是b

在進行第三步時,a這條訊息,應該已經被刪除了。所以在第15秒進入消費者模式時,應該輸出 b,這樣的話,策略才是成功的!

topic 為test的資料保留10秒

bin/kafka-configs.sh --zookeeper zookeeper-1.default.svc.cluster.local:2181 --entity-type topics --entity-name test --alter --add-config retention.ms=10000
進入生產模式,輸入a

bin/kafka-console-producer.sh --broker-list kafka-1.default.svc.cluster.local:9092 --topic test

> a

等待5秒後,再次進入生產模式,輸入b

bin/kafka-console-producer.sh --broker-list kafka-1.default.svc.cluster.local:9092 --topic test

> b

等待5秒後,進入 消費者模式

bin/kafka-console-consumer.sh --bootstrap-server kafka-1.default.svc.cluster.local:9092 --topic test --from-

beginning

b

如果消費者輸出為b,表示策略成功!

備註:如果生產環境中,正在不斷的進行生產和消費,執行kafka-configs.sh 指令碼,是否會有影響呢?

答案是不會的,它是動態策略!

posted @

2019-01-29 17:34

肖祥 閱讀(

...)

編輯收藏

kafka刪除新建topic

原理翻譯可參考 刪除kafka的topic,參照 1 刪除kafka儲存目錄 server.properties檔案log.dirs配置,預設為 tmp kafka logs 相關topic目錄 2 kafka 刪除topic的命令是 bin kafka topics delete zookeepe...

Kafka徹底刪除topic

kafka預設僅做刪除標記,並沒有真實刪除,若要真實刪除topic server.properties新增 delete.topic.enable true,然後重啟 刪除測試 建立topic bin kafka topics.sh create zookeeper 127.0.0.1 2180 r...

kafka徹底刪除topic

刪除topic一般是下面這樣的,但是這樣是刪不掉的,只是標記為刪除,再次建立同名topic還是會報錯,topic已經存在.kafka topics.sh zookeeper mypc01 2181 mypc02 2181 mypc03 2181 kafka delete topic pet首先 se...