storm框架中的kafkaspout類實現的是baserichspout,它裡面已經重寫了fail和ack方法,所以我們的bolt必須實現ack機制,就可以保證訊息的重新傳送;如果不實現ack機制,那麼kafkaspout就無法得到訊息的處理響應,就會在超時以後再次傳送訊息,導致訊息的重**送。
但是回想一下我們自己寫乙個spout類實現baserichspout並讓他具備訊息重發,那麼我們是會在我們的spout類裡面定義乙個map集合,並以msgid作為key。
public那麼kafkaspout會不會也是這樣還儲存這已傳送未收到bolt響應的訊息呢?如果這樣,如果訊息處理不斷失敗,不斷重發,訊息不斷積累在kafkaspout節點上,kafkaspout端會不就會出現記憶體溢位?class myspout extends
baserichspout
public
void
open(map conf, topologycontext context, spoutoutputcollector collector)
public
void
nexttuple()
@override
public
void
ack(object msgid)
@override
public
void
fail(object msgid)
}
其實並沒有,回想kafka的原理,kafka會為每乙個consumergroup保留一些metadata資訊–當前消費的訊息的position,也即offset。這個offset由consumer控制。正常情況下consumer會在消費完一條訊息後線性增加這個offset。當然,consumer也可將offset設成乙個較小的值,重新消費一些訊息。也就是說,kafkaspot在消費kafka的資料是,通過offset讀取到訊息並傳送給bolt後,kafkaspot只是儲存者當前的offset值。
當失敗或成功根據msgid查詢offset值,然後再去kafka消費該資料來確保訊息的重新傳送。
那麼雖然offset資料小,但是當offset的資料量上去了還是會記憶體溢位的?
其實並沒有,kafkaspout發現快取的資料超過限制了,會把某端的資料清理掉的。
kafkaspot中傳送資料的**
collector.emit(tup, new kafkamessageid(_partition, toemit.offset));可以看到msgid裡面包裝了offset引數。
它不快取已經傳送出去的資料資訊。
當他接收到來至bolt的響應後,會從接收到的msgid中得到offset。以下是從原始碼中折取的關鍵**:
public原始碼解析中涉及了很多kafka的概念,所以僅僅理解kafka的概念想完全理解kafkaspot原始碼是很難的,如果不理解kafka概念,那麼就只需要在理解storm的ack機制上明白kafkaspot做了上面的兩件事就可以了。void
ack(object msgid)
}m.ack(id.offset);
public
void
ack(long offset)
public
void
fail(object msgid)
} m.fail(id.offset);
public
void
fail(long offset)
sortedset
_pending = new treeset();
sortedset
failed = new treeset();
kafkaspot在ack機制下如何保證記憶體不溢
storm框架中的kafkaspout類實現的是baserichspout,它裡面已經重寫了fail和ack方法,所以我們的bolt必須實現ack機制,就可以保證訊息的重新傳送 如果不實現ack機制,那麼kafkaspout就無法得到訊息的處理響應,就會在超時以後再次傳送訊息,導致訊息的重 送。但是...
kafkaspot在ack機制下如何保證記憶體不溢
storm框架中的kafkaspout類實現的是baserichspout,它裡面已經重寫了fail和ack方法,所以我們的bolt必須實現ack機制,就可以保證訊息的重新傳送 如果不實現ack機制,那麼kafkaspout就無法得到訊息的處理響應,就會在超時以後再次傳送訊息,導致訊息的重 送。但是...
kafkaspot在ack機制下如何保證記憶體不溢
storm框架中的kafkaspout類實現的是baserichspout,它裡面已經重寫了fail和ack方法,所以我們的bolt必須實現ack機制,就可以保證訊息的重新傳送 如果不實現ack機制,那麼kafkaspout就無法得到訊息的處理響應,就會在超時以後再次傳送訊息,導致訊息的重 送。但是...