乙個消費者組可以消費多個topic,以前寫過一篇乙個消費者消費乙個topic的,這次的是乙個消費者組通過直連方式消費多個topic,做了小測試,結果是正確的,通過檢視zookeeper的客戶端,zookeeper記錄了偏移量
package day04
/*消費多個topic
*/import kafka.common.topicandpartition
import kafka.message.messageandmetadata
import kafka.serializer.stringdecoder
import kafka.utils.
import scala.collection.mutable.listbuffer
import org.i0itec.zkclient.zkclient
import org.apache.spark.sparkconf
import org.apache.spark.streaming.dstream.inputdstream
import org.apache.spark.streaming.kafka.
import org.apache.spark.streaming.
//new listbuffer用來存放zkgrouptopicdirs, 用來儲存偏移量的位址
//因為有多個topic,對應的也就有多個zkgrouptopicdirs
var zkgtlist:listbuffer[zkgrouptopicdirs] =new listbuffer[zkgrouptopicdirs]()
//根據topiclist 新建 zkgrouptopicdirs 新增到zkgtlist
for(tp <- topicslist)
//新建zkclient,用來獲取偏移量和更新偏移量
val zkclient = new zkclient(zkquorum)
//新建乙個inputdstream,要是var,因為有兩種情況,消費過? 沒有消費過? 根據情況賦值
var kafkadstream :inputdstream[(string,string)] = null
//建立乙個map,(key,value)-》( 對應的時topic和分割槽 ,偏移量)
var fromoffset = map[topicandpartition,long]()
//獲取每個topic是否被消費過
var childrens:listbuffer[int] =new listbuffer[int]()
var flag = false //有topic被消費過則為true
for (topicdir <- zkgtlist)
}if(flag)
}//返回的而結果是 kafka的key,預設是null, value是kafka中的值
//建立kafkadstream
kafkadstream = kafkautils.createdirectstream[string,string,stringdecoder,stringdecoder,(string,string)](
ssc,kafkaparams,fromoffset,messagehandler
)}else
/*val children1 = zkclient.countchildren(zkgrouptopicdirs1.consumeroffsetdir)
val children2 = zkclient.countchildren(zkgrouptopicdirs2.consumeroffsetdir)
if(children1>0 || children2>0)
}if(children2>0)
}val messagehandler =(mmd:messageandmetadata[string,string])=>
kafkadstream = kafkautils.createdirectstream[string,string,stringdecoder,stringdecoder,(string,string)](ssc,
kafkaparams,fromoffset,messagehandler)
}else*/
var offsetranges = array[offsetrange]www.hjpt521.com() //用來記錄更新的每個topic的分割槽偏移量
kafkadstream.foreachrdd(kafkardd=>else if(topicnn.equals(topic2))*/}})
ssc.start()
ssc.awaittermination(www.dfgjyl.cn)
可以通過zookeeper的客戶端,在/consumers中檢視偏移量,
我的3個topic中,其中wc和wc1只有1個分割槽,可以通過下圖可看出wc1的0分割槽偏移量13
Kafka直連方式儲存MySQL
記得在之前寫了一篇是mysql基礎使用的,這次就用mysql來儲存direct方式的偏移量。如下 package kafka1 import kafka.common.topicandpartition import kafka.message.messageandmetadata import k...
kafka直連方式api與Redis
kafka作為生產者,把生產的資料儲存到redis中,讀取的是json檔案,需要匯入阿里的包 一 pom檔案進行設定 redis.clients jedis 2.9.0 com.typesafe config 1.3.1 org.scalikejdbc scalikejdbc 2.11 2.5.0 ...
Kafka直連儲存HBase
在之前介紹了kafka與sparkstreaming互動的兩種方式,我提到了公司採用的是direct方式,這次我向大家分享一下將偏移量儲存在hbase中。如下 package kafka1 import kafka.common.topicandpartition import kafka.mess...