在之前介紹了kafka與sparkstreaming互動的兩種方式,我提到了公司採用的是direct方式,這次我向大家分享一下將偏移量儲存在hbase中。
**如下:
package kafka1
import kafka.common.topicandpartition
import kafka.message.messageandmetadata
import kafka.serializer.stringdecoder
import kafka.utils.zkutils
import org.apache.hadoop.hbase.client.
import org.apache.hadoop.hbase.util.bytes
import org.apache.hadoop.hbase.
import org.apache.spark.sparkconf
import org.apache.spark.streaming.dstream.inputdstream
import org.apache.spark.streaming.kafka.
import org.apache.spark.streaming.
object kafkahbasemanager
table.put(put)
conn.close()
} // 從zookeeper中獲取topic的分割槽數
def getnumberofpartitionsfortopicfromzk(topic_name: string, group_id: string,
zkquorum: string, zkrootdir: string, sesstimeout: int, conntimeout: int): int =
// 獲取hbase的offset
def getlastestoffsets(topic_name: string, group_id: string, htablename: string,
zkquorum: string, zkrootdir: string, sesstimeout: int, conntimeout: int): map[topicandpartition, long] =
val fromoffsets = collection.mutable.map[topicandpartition, long]()
if (hbasenumberofpartitions == 0)
} else if (zknumberofpartitions > hbasenumberofpartitions)
// 對新增加的分割槽將它的offset值設為0
for (partition <- hbasenumberofpartitions until zknumberofpartitions)
} else
}scanner.close()
conn.close()
fromoffsets.tomap
} def main(args: array[string]): unit =
})ssc.start()
ssc.awaittermination()
}}
存放在hbase中**有點麻煩,接下來我的部落格中會像大家介紹兩種比較簡單的。
summed up by jiamingcan
Kafka直連方式儲存MySQL
記得在之前寫了一篇是mysql基礎使用的,這次就用mysql來儲存direct方式的偏移量。如下 package kafka1 import kafka.common.topicandpartition import kafka.message.messageandmetadata import k...
kafka直連方式api與Redis
kafka作為生產者,把生產的資料儲存到redis中,讀取的是json檔案,需要匯入阿里的包 一 pom檔案進行設定 redis.clients jedis 2.9.0 com.typesafe config 1.3.1 org.scalikejdbc scalikejdbc 2.11 2.5.0 ...
kafka直連方式消費多個topic
乙個消費者組可以消費多個topic,以前寫過一篇乙個消費者消費乙個topic的,這次的是乙個消費者組通過直連方式消費多個topic,做了小測試,結果是正確的,通過檢視zookeeper的客戶端,zookeeper記錄了偏移量 package day04 消費多個topic import kafka....