看了一下kafka,然後寫了消費kafka資料的**。感覺自己功力還是不夠。
不能隨心所欲地運算元據,資料結構沒學好,spark的rdd操作沒學好。
不能很好地組織**結構,設計模式沒學好,物件導向思想理解不夠成熟。
用佇列來儲存要消費的資料。
用佇列來儲存要提交的offest,然後處理執行緒將其給回消費者提交。
每個分割槽開乙個處理執行緒來處理資料,分割槽與處理器的對映放在map中。
當處理到一定的數量或者距離上一次處理一定的時間間隔後, 由poll執行緒進行提交offset。
不好的地方:
每次處理的資料太少,而且每個資料都進行判斷其分割槽是否已經有處理執行緒在處理了。
獲取topic不太優雅。
流程圖
下面是多執行緒消費者實現:
/**
* 負責啟動消費者執行緒msgreceiver, 儲存消費者執行緒msgreceiver, 儲存處理任務和執行緒recordprocessor, 以及銷毀這些執行緒
* created by stillcoolme on 2018/10/12.
*/public class kafkamultiprocessormain
public static void main(string args)
private void init(string consumerproppath)
for (int i = 0; i < threadsnum; i++)
logger.info("finish creating" + threadsnum + " threads to consume kafka warn msg");
}//銷毀啟動的執行緒
public void destroy()
private void closerecordprocessthreads()
logger.debug("finish interrupting record process threads");
}private void closekafkaconsumer()
logger.debug("finish interrupting consumer threads");
}private mapgetconsumerconfig()
/*** 獲取消費者引數
** @param propath
*/private void getconsumerprops(string propath) else
consumerprops.load(instream);
} catch (ioexception e) finally catch (ioexception e) }}
}}
/**
* 負責呼叫 recordprocessor進行資料處理
* created by zhangjianhua on 2018/10/12.
*/public class msgreceiver implements runnable
@override
public void run()
//最多輪詢1000ms
consumerrecordsrecords = kafkaconsumer.poll(1000);
if (records.count() > 0)
for (consumerrecord record : records)
//有 processor 可以處理該分割槽的 record了
processtask.addrecordtoqueue(record);
}} catch (exception e)
}} finally }}
public class recordprocessor implements runnable
@override
public void run()
//提交偏移給queue中
committoqueue();
} catch (interruptedexception e) }}
//將當前的消費偏移量放到queue中, 由msgreceiver進行提交
private void committoqueue()
//如果消費了設定的條數, 比如又消費了commitlength訊息
boolean arrivedcommitlength = this.completetask % commitlength == 0;
//獲取當前時間, 看是否已經到了需要提交的時間
localdatetime currenttime = localdatetime.now();
boolean arrivedtime = currenttime.isafter(lasttime.plus(committime));
if(arrivedcommitlength || arrivedtime)
}//consumer執行緒向處理執行緒的佇列中新增record
public void addrecordtoqueue(consumerrecordrecord) catch (interruptedexception e)
}private void process(consumerrecordrecord)
}
對處理程式recordprocessor進行抽象,抽象出basepropessor父類。以後業務需求需要不同的處理程式recordprocessor就可以靈活改變了。
反射來構建recordprocessor??在配置檔案配置具體要new的recordprocessor類路徑,然後在建立msgreceiver的時候傳遞進去。
參考kafka consumer多執行緒例項 : 如這篇文章所說的維護了多個worker來做具體業務處理,這篇文章用的是threadpoolexecutor執行緒池。
Kafka學習筆記 多執行緒開發消費者
從 kafka 0.10.1.0 版本開始,kafkaconsumer 就變為了雙線程的設計,即使用者主線程和心跳執行緒。所謂使用者主線程,就是你啟動 consumer 應用程式 main 方法的那個執行緒,而新引入的心跳執行緒 heartbeat thread 只負責定期給對應的 broker 機...
多執行緒系列之生產者和消費者
在之前接觸過pv操作的,應該對於生產者和消費者的情況有乙個了解,這裡學到多執行緒同步的時候,最恰當的乙個例子。pv操作就不多做解釋。生產者和消費者 author bobo public class producerconsumer 產品 窩頭 class wotou public string to...
Java多執行緒實現,生產者消費者
根據自己的理解簡單的實現了乙個,生產者,消費者模式的多執行緒,請大家多提寶貴意見 sleep wait 比較 sleep 是thread的靜態方法,是用來修改執行緒自身的執行方式。執行緒睡眠時間不會釋放鎖,睡眠完成自動開始執行。wait 是object類中的方法,用作執行緒之間的通訊,被其他執行緒呼...