kafka作為生產者,把生產的資料儲存到redis中,讀取的是json檔案,需要匯入阿里的包
一、pom檔案進行設定
redis.clients二、kafka直連apijedis
2.9.0
com.typesafe
config
1.3.1
org.scalikejdbc
scalikejdbc_2.11
2.5.0
org.scalikejdbc
scalikejdbc-core_2.11
2.5.0
org.scalikejdbc
scalikejdbc-config_2.11
2.5.0
com.alibaba
fastjson
1.2.36
import com.alibaba.fastjson.json三、redis連線池import kafka.common.topicandpartition
import kafka.message.messageandmetadata
import kafka.serializer.stringdecoder
import kafka.utils.
import org.i0itec.zkclient.zkclient
import org.apache.spark.sparkconf
import org.apache.spark.streaming.dstream.inputdstream
import org.apache.spark.streaming.kafka.
import org.apache.spark.streaming.
/*** direct直連方式
*/object kafkadirectlink "
//準備kafka引數
val kafkas = map(
"metadata.broker.list"->brokerlist,
"group.id"->groupid,
//從頭開始讀取資料
"auto.offset.reset"->kafka
.api.offsetrequest.smallesttimestring
)// zookeeper 的host和ip,建立乙個client,用於更新偏移量
// 是zookeeper客戶端,可以從zk中讀取偏移量資料,並更新偏移量
val zkclient = new zkclient(zkquorum)
//"/gp01/offset/tt/0/10001"
//"/gp01/offset/tt/1/20001"
//"/gp01/offset/tt/2/30001"
val clientoffset = zkclient.countchildren(zktopicpath)
// 建立kafkastream
var kafkastream :inputdstream[(string,string)]= null
//如果zookeeper中有儲存offset 我們會利用這個offset作為kafkastream的起始位置
//topicandpartition [/gp01/offset/tt/0/ , 8888]
var fromoffsets:map[topicandpartition,long] = map()
//如果儲存過offset
if(clientoffset > 0)")
// tt/0
val tp = topicandpartition(topic,i)
//將不同partition 對應得offset增加到fromoffset中
// tt/0 -> 10001
fromoffsets += (tp->partitionoffset.tolong)
}// key 是kafka的key value 就是kafka資料
// 這個會將kafka的訊息進行transform 最終kafka的資料都會變成(kafka的key,message)這樣的tuple
val messagehandler = (mmd:messageandmetadata[string,string])=>
(mmd.key(),mmd.message())
// 通過kafkautils建立直連的dstream
//[string,string,stringdecoder, stringdecoder,(string,string)]
// key value key解碼方式 value的解碼方式 接收資料的格式
kafkastream = kafkautils.createdirectstream
[string,string,stringdecoder,
stringdecoder,(string,string)](ssc,kafkas,fromoffsets,messagehandler)
}else
//偏移量範圍
var offsetranges = array[offsetrange]()
// 依次迭代dstream中的rdd
kafkastream.foreachrdd/$"
//將該partition的offset儲存到zookeeper中
// /gp01/offset/tt/ 0/88889
zkutils.updatepersistentpath(zkclient,zkpath,o.untiloffset.tostring)}}
// 啟動
ssc.start()
ssc.awaittermination()
}}
package day04.chinamobil四、讀取json檔案,並儲存到redis中import redis.clients.jedis.
/** * redis連線池
*/object jedisconnectionpool
}
import com.alibaba.fastjson.jsonobjectimport day04.
import org.apache.spark.rdd.rdd
//統計全網的充值訂單量, 充值金額, 充值成功數及充值平均時長.
object indexstatistics )
val re: rdd[(string, list[double])] = database1.map(t => (t._1, t._4))
//reducebykey聚合的是value
val re1: rdd[(string, list[double])] = re.reducebykey((list1, list2) => )
re1.foreachpartition(t => )
jedis.close()
})}
Kafka直連方式儲存MySQL
記得在之前寫了一篇是mysql基礎使用的,這次就用mysql來儲存direct方式的偏移量。如下 package kafka1 import kafka.common.topicandpartition import kafka.message.messageandmetadata import k...
kafka直連方式消費多個topic
乙個消費者組可以消費多個topic,以前寫過一篇乙個消費者消費乙個topic的,這次的是乙個消費者組通過直連方式消費多個topic,做了小測試,結果是正確的,通過檢視zookeeper的客戶端,zookeeper記錄了偏移量 package day04 消費多個topic import kafka....
Kafka直連儲存HBase
在之前介紹了kafka與sparkstreaming互動的兩種方式,我提到了公司採用的是direct方式,這次我向大家分享一下將偏移量儲存在hbase中。如下 package kafka1 import kafka.common.topicandpartition import kafka.mess...