kafkaspot在ack機制下如何保證記憶體不溢

2021-09-20 08:19:58 字數 2049 閱讀 9588

storm框架中的kafkaspout類實現的是baserichspout,它裡面已經重寫了fail和ack方法,所以我們的bolt必須實現ack機制,就可以保證訊息的重新傳送;如果不實現ack機制,那麼kafkaspout就無法得到訊息的處理響應,就會在超時以後再次傳送訊息,導致訊息的重**送。

但是回想一下我們自己寫乙個spout類實現baserichspout並讓他具備訊息重發,那麼我們是會在我們的spout類裡面定義乙個map集合,並以msgid作為key。

public

class myspout extends

baserichspout

public

void

open(map conf, topologycontext context, spoutoutputcollector collector)

public

void

nexttuple()

@override

public

void

ack(object msgid)

@override

public

void

fail(object msgid)

}

那麼kafkaspout會不會也是這樣還儲存這已傳送未收到bolt響應的訊息呢?如果這樣,如果訊息處理不斷失敗,不斷重發,訊息不斷積累在kafkaspout節點上,kafkaspout端會不就會出現記憶體溢位?

其實並沒有,回想kafka的原理,kafka會為每乙個consumergroup保留一些metadata資訊–當前消費的訊息的position,也即offset。這個offset由consumer控制。正常情況下consumer會在消費完一條訊息後線性增加這個offset。當然,consumer也可將offset設成乙個較小的值,重新消費一些訊息。也就是說,kafkaspot在消費kafka的資料是,通過offset讀取到訊息並傳送給bolt後,kafkaspot只是儲存者當前的offset值。

當失敗或成功根據msgid查詢offset值,然後再去kafka消費該資料來確保訊息的重新傳送。

那麼雖然offset資料小,但是當offset的資料量上去了還是會記憶體溢位的?

其實並沒有,kafkaspout發現快取的資料超過限制了,會把某端的資料清理掉的。

kafkaspot中傳送資料的**

collector.emit(tup, new kafkamessageid(_partition, toemit.offset));
可以看到msgid裡面包裝了offset引數。

它不快取已經傳送出去的資料資訊。

當他接收到來至bolt的響應後,會從接收到的msgid中得到offset。以下是從原始碼中折取的關鍵**:

public

void

ack(object msgid)

}m.ack(id.offset);

public

void

ack(long offset)

public

void

fail(object msgid)

} m.fail(id.offset);

public

void

fail(long offset)

sortedset

_pending = new treeset();

sortedset

failed = new treeset();

原始碼解析中涉及了很多kafka的概念,所以僅僅理解kafka的概念想完全理解kafkaspot原始碼是很難的,如果不理解kafka概念,那麼就只需要在理解storm的ack機制上明白kafkaspot做了上面的兩件事就可以了。

intsmaze(劉洋)

出處:

由於博主能力有限,文中可能存在描述不正確,歡迎指正、補充!

kafkaspot在ack機制下如何保證記憶體不溢

storm框架中的kafkaspout類實現的是baserichspout,它裡面已經重寫了fail和ack方法,所以我們的bolt必須實現ack機制,就可以保證訊息的重新傳送 如果不實現ack機制,那麼kafkaspout就無法得到訊息的處理響應,就會在超時以後再次傳送訊息,導致訊息的重 送。但是...

kafkaspot在ack機制下如何保證記憶體不溢

storm框架中的kafkaspout類實現的是baserichspout,它裡面已經重寫了fail和ack方法,所以我們的bolt必須實現ack機制,就可以保證訊息的重新傳送 如果不實現ack機制,那麼kafkaspout就無法得到訊息的處理響應,就會在超時以後再次傳送訊息,導致訊息的重 送。但是...

kafkaspot在ack機制下如何保證記憶體不溢

storm框架中的kafkaspout類實現的是baserichspout,它裡面已經重寫了fail和ack方法,所以我們的bolt必須實現ack機制,就可以保證訊息的重新傳送 如果不實現ack機制,那麼kafkaspout就無法得到訊息的處理響應,就會在超時以後再次傳送訊息,導致訊息的重 送。但是...