最近想統計一些訊息資料,原計畫接收kakfa訊息後自行統計然後存入資料庫(統計相對比較簡單,所以沒有考慮使用apache storm), 突然想起來kafka已經提供kakfa stream功能,於是開始看kafka stream。 下面的例子非常簡單,只是在kafka提供的例子上做了一點修改。
因為我們使用的kafka stream所以新增的依賴是kafka-streams, 不是以前經常使用的kafka-clients.
我的kafka安裝在windows 10上面(為了方便測試,平時在公司時可以直接連線到kafka集群,開發時先在本地執行,於是在windows10上安裝了kafka)。 版本kafka_2.12-1.0.0
>
>
org.apache.kafkagroupid
>
>
kafka-streamsartifactid
>
>
1.0.2version
>
dependency
>
官方示例的**在`這裡
官方示例中向topic直接傳送了溫度資料。 我修改一下。 向topic傳送json格式的資料,裡面包含了溫度和濕度。例如
注意:該**只在官方示例上修該資料格式,其他部分和官方示例一樣。啟動程式後直接向topic iot-temperature傳送格式為的訊息即可看到執行效果。
public
class
temperaturedemo})
.groupbykey()
.windowedby
(timewindows.
of(timeunit.seconds.
tomillis
(temperature_window_size)))
.reduce
(new
reducer
()else}}
).tostream()
//過濾條件就是溫度大於20
stream執行結果存放在topic iot-temperature-max中, 我們檢視該topic的資料。 只有大於temperature_threshold (20)被存入該topic
oracle job簡單示例
廢話不說,本篇記錄乙個簡單job示例,採用oracle 10i與pl sql developer工具。完成乙個job必須具備三元素 1 table 使用者關心的資料表,用於job更新等 2 procedure 封裝使用者對table的操作 3 job 描述什麼時間 執行頻率使用procedure來操...
jsoncpp簡單示例
scons platform linux gcc 編譯出來的庫檔案在其libs linux gcc 4.4.2目錄下,有libjson linux gcc 4.4.2 libmt.so和libjson linux gcc 4.4.2 libmt.a。標頭檔案在解壓目錄下的include中。我的jso...
jsoncpp簡單示例
1 編譯jsoncpp mkdir usr jsoncpp cp r include usr jsoncpp cp r libs usr jsoncpp 2 jsoncpp簡單例項 1 反序列化json物件 比如乙個json物件的字串序列如下,其中 array 表示json物件中的陣列 那怎麼分別取...