生產環境中,有乙個topic的資料量非常大。這些資料不是非常重要,需要定期清理。
要求:預設保持24小時,某些topic 需要保留2小時或者6小時
主要有3個:
1. 基於時間
2. 基於日誌大小
3. 基於日誌起始偏移量
接下來,主要介紹基於時間的清除!
kafka版本為: 2.11-1.1.0
zk版本為: 3.4.13
# 啟用刪除主題注意:這2行配置必須存在,否則清除策略失效!log.retention.check.interval.ms 引數的單位是微秒,這裡表示間隔1秒鐘delete.topic.enable=true
# 檢查日誌段檔案的間隔時間,以確定是否檔案屬性是否到達刪除要求。
log.retention.check.interval.ms=1000
在 server.properties 檔案中配置的是全域性策略,針對每乙個topic
針對單個topic策略,需要使用指令碼kafka-configs.sh
此指令碼不需要重啟kafka就會生效!
首先來檢視一下,當前的topic策略,比如test
bin/kafka-configs.sh --zookeeper zookeeper-1.default.svc.cluster.local:2181 --describe --entity-type topics --entity-name test引數解釋:
--describe 詳細資訊
--entity-type 實體型別
--entity-name 指定topic名
輸出:
configs for topic '這個表示為策略為空test
' are
如果需要刪除topic所有資料,使用命令
bin/kafka-topics.sh --delete --topic test --zookeeper zookeeper-1.default.svc.cluster.local:2181這個命令,請謹慎執行!!!如果想保留主題,只刪除主題現有資料(log)。可以通過修改資料保留時間實現
bin/kafka-configs.sh --zookeeper zookeeper-1.default.svc.cluster.local:2181 --entity-type topics --entity-name test --alter --add-config retention.ms=10000執行輸出:
completed updating config for entity: topic '注意:修改保留時間為10秒,但不是修改後10秒就馬上刪掉,kafka是採用輪訓的方式,輪訓到這個topic發現10秒前的資料都是刪掉。時間由server.properties裡面的log.retention.check.interval.ms選項為主test
'.
再次檢視topic策略
bin/kafka-configs.sh --zookeeper zookeeper-1.default.svc.cluster.local:2181 --describe --entity-type topics --entity-name test輸出:
configs for topic '發現目前的刪除策略為 retention.ms=10000test
' are retention.ms=10000
如果需要刪除上面的10秒策略,使用以下命令:
bin/kafka-configs.sh --zookeeper zookeeper-1.default.svc.cluster.local:2181 --entity-type topics --entity-name test --alter --delete-config retention.ms輸出:
completed updating config for entity: topic '再次檢視topic策略test
'.
bin/kafka-configs.sh --zookeeper zookeeper-1.default.svc.cluster.local:2181 --describe --entity-type topics --entity-name test輸出:
configs for topic '發現策略為空,說明刪除成功了!test
' are
第一步,設定清除策略為保留10秒
第二步,進入生產者模式,輸入訊息 a
第三步,等待5秒,再次進入生產者模式,輸入訊息 b
第四部,進入消費者模式,看輸出的訊息是a還是b
在進行第三步時,a這條訊息,應該已經被刪除了。所以在第15秒進入消費者模式時,應該輸出 b,這樣的話,策略才是成功的!
topic 為test的資料保留10秒
bin/kafka-configs.sh --zookeeper zookeeper-1.default.svc.cluster.local:2181 --entity-type topics --entity-name test --alter --add-config retention.ms=10000進入生產模式,輸入a
bin/kafka-console-producer.sh --broker-list kafka-1.default.svc.cluster.local:9092 --topic test等待5秒後,再次進入生產模式,輸入b> a
bin/kafka-console-producer.sh --broker-list kafka-1.default.svc.cluster.local:9092 --topic test等待5秒後,進入 消費者模式> b
bin/kafka-console-consumer.sh --bootstrap-server kafka-1.default.svc.cluster.local:9092 --topic test --from-如果消費者輸出為b,表示策略成功!beginning
b
備註:如果生產環境中,正在不斷的進行生產和消費,執行kafka-configs.sh 指令碼,是否會有影響呢?
答案是不會的,它是動態策略!
posted @
2019-01-29 17:34
肖祥 閱讀(
...)
編輯收藏
kafka刪除新建topic
原理翻譯可參考 刪除kafka的topic,參照 1 刪除kafka儲存目錄 server.properties檔案log.dirs配置,預設為 tmp kafka logs 相關topic目錄 2 kafka 刪除topic的命令是 bin kafka topics delete zookeepe...
Kafka徹底刪除topic
kafka預設僅做刪除標記,並沒有真實刪除,若要真實刪除topic server.properties新增 delete.topic.enable true,然後重啟 刪除測試 建立topic bin kafka topics.sh create zookeeper 127.0.0.1 2180 r...
kafka徹底刪除topic
刪除topic一般是下面這樣的,但是這樣是刪不掉的,只是標記為刪除,再次建立同名topic還是會報錯,topic已經存在.kafka topics.sh zookeeper mypc01 2181 mypc02 2181 mypc03 2181 kafka delete topic pet首先 se...