kafka 0.8版本公升級為0.10版本時,消費**需要做出一些修改,如下:
kafka 0.8版本:
val kafkaparams = map[string, string](
"metadata.broker.list" -> kafka_ip, //此處為kafka對應的ip
"refresh.leader.backoff.ms" -> "30000")
val lines = kafkautils
.createdirectstream[string, string, stringdecoder, stringdecoder]( //根據kafka資料中key value的型別進行選擇,如string
ssc,
kafkaparams,
topics)
val infos = lines.reducebykey((a: string, b: string) => yourfunc(a, b), 100) //此處可使用自己的方法對同一key下的多個value進行相關操作
kafka 0.10版本:
val kafkaparams = map[string, object](
"bootstrap.servers" -> kafka_ip, //此處需將0.8中的「metadata.broker.list」改為「bootstrap.servers」
"key.deserializer" -> classof[stringdeserializer], //需要在此處對kafka資料進行序列化
"value.deserializer" -> classof[stringdeserializer],
"group.id" -> "my_test", 版本的快取需要將topic的分割槽和groupid作為key,此處group.id可自己定義名字
"auto.offset.reset" -> "latest",
"refresh.leader.backoff.ms" -> "30000")
var lines = kafkautils.createdirectstream[string, string](ssc, preferconsistent, subscribe[string, string](topicset, kafkaparams));//createdirectstream的方式與0.8版不同,自行比較
val infos = lines.map(record => (record.key(),record.value()))// 注意:資料流中的每一項都是乙個consumerrecord類,本步操作後可執行reducebykey
val infos1 = infos.reducebykey((a: string, b: string) => yourfunc(a, b), 100)
zabbix自帶php5 4公升級至7 2公升級步驟
公升級前備份 etc php.ini 1.解除安裝原來低版本的php rpm qa grep php xargs i rpm e nodeps2.更新yum源 rpm uvh rpm uvh 生成一些repo檔案在 etc yum.repos.d 目錄下 ls etc yum.repos.d epe...
Fedora 15公升級核心至3 0 4
安裝的fedoar 15,核心版本2.9.40,不知什麼原因,無線網絡卡的驅動可以工作,但是無線上網效率不超過50k s,自己試著更換驅動,效果沒有改善。雖然有人說2.9.40核心和3.0核心沒有什麼實質性的差別,但是自己還是試著手動編譯了linux 3.0.4核心,並且安裝成功。安裝完成後,無線上...
Fedora 15公升級核心至3 0 4
安裝的fedoar 15,核心版本2.9.40,不知什麼原因,無線網絡卡的驅動可以工作,但是無線上網效率不超過50k s,自己試著更換驅動,效果沒有改善。雖然有人說2.9.40核心和3.0核心沒有什麼實質性的差別,但是自己還是試著手動編譯了linux 3.0.4核心,並且安裝成功。安裝完成後,無線上...