記得在之前寫了一篇是mysql基礎使用的,這次就用mysql來儲存direct方式的偏移量。
**如下:
package kafka1
import kafka.common.topicandpartition
import kafka.message.messageandmetadata
import kafka.serializer.stringdecoder
import org.apache.spark.sparkconf
import org.apache.spark.streaming.dstream.inputdstream
import org.apache.spark.streaming.kafka.kafkacluster.err
import org.apache.spark.streaming.kafka.
import org.apache.spark.streaming.
import scalikejdbc.
import scalikejdbc.config.dbs
/*將偏移量儲存到mysql中
*/class directmysql '")
.map(m=>(topicandpartition(
m.string("topic"),m.int("partitions")),m.long("untiloffsets")))
}.tomap //最後要tomap一下,因為前面的返回值已經給定
//建立乙個inputdstream,然後根據offset讀取資料
var kafkastream:inputdstream[(string,string)]=null
//從mysql中獲取資料,進行判斷
if(fromdboffset.size==0)elseelse})}
val messagehandler = (mmd:messageandmetadata[string,string])=>
kafkastream= kafkautils.createdirectstream[string,string,
stringdecoder,stringdecoder,
(string,string)](ssc,kafkas,checkoffsets,messagehandler)
}//開始處理資料流,和zk一樣
kafkastream.foreachrdd(kafkardd=>)})
ssc.start()
ssc.awaittermination()
}}
summed up by jiamingcan
Kafka直連儲存HBase
在之前介紹了kafka與sparkstreaming互動的兩種方式,我提到了公司採用的是direct方式,這次我向大家分享一下將偏移量儲存在hbase中。如下 package kafka1 import kafka.common.topicandpartition import kafka.mess...
kafka直連方式api與Redis
kafka作為生產者,把生產的資料儲存到redis中,讀取的是json檔案,需要匯入阿里的包 一 pom檔案進行設定 redis.clients jedis 2.9.0 com.typesafe config 1.3.1 org.scalikejdbc scalikejdbc 2.11 2.5.0 ...
kafka直連方式消費多個topic
乙個消費者組可以消費多個topic,以前寫過一篇乙個消費者消費乙個topic的,這次的是乙個消費者組通過直連方式消費多個topic,做了小測試,結果是正確的,通過檢視zookeeper的客戶端,zookeeper記錄了偏移量 package day04 消費多個topic import kafka....