kafka直連方式api與Redis

2021-09-01 02:20:46 字數 3492 閱讀 7430

kafka作為生產者,把生產的資料儲存到redis中,讀取的是json檔案,需要匯入阿里的包

一、pom檔案進行設定

redis.clients

jedis

2.9.0

com.typesafe

config

1.3.1

org.scalikejdbc

scalikejdbc_2.11

2.5.0

org.scalikejdbc

scalikejdbc-core_2.11

2.5.0

org.scalikejdbc

scalikejdbc-config_2.11

2.5.0

com.alibaba

fastjson

1.2.36

二、kafka直連api

import com.alibaba.fastjson.json

import kafka.common.topicandpartition

import kafka.message.messageandmetadata

import kafka.serializer.stringdecoder

import kafka.utils.

import org.i0itec.zkclient.zkclient

import org.apache.spark.sparkconf

import org.apache.spark.streaming.dstream.inputdstream

import org.apache.spark.streaming.kafka.

import org.apache.spark.streaming.

/*** direct直連方式

*/object kafkadirectlink "

//準備kafka引數

val kafkas = map(

"metadata.broker.list"->brokerlist,

"group.id"->groupid,

//從頭開始讀取資料

"auto.offset.reset"->kafka

.api.offsetrequest.smallesttimestring

)// zookeeper 的host和ip,建立乙個client,用於更新偏移量

// 是zookeeper客戶端,可以從zk中讀取偏移量資料,並更新偏移量

val zkclient = new zkclient(zkquorum)

//"/gp01/offset/tt/0/10001"

//"/gp01/offset/tt/1/20001"

//"/gp01/offset/tt/2/30001"

val clientoffset = zkclient.countchildren(zktopicpath)

// 建立kafkastream

var kafkastream :inputdstream[(string,string)]= null

//如果zookeeper中有儲存offset 我們會利用這個offset作為kafkastream的起始位置

//topicandpartition [/gp01/offset/tt/0/ , 8888]

var fromoffsets:map[topicandpartition,long] = map()

//如果儲存過offset

if(clientoffset > 0)")

// tt/0

val tp = topicandpartition(topic,i)

//將不同partition 對應得offset增加到fromoffset中

// tt/0 -> 10001

fromoffsets += (tp->partitionoffset.tolong)

}// key 是kafka的key value 就是kafka資料

// 這個會將kafka的訊息進行transform 最終kafka的資料都會變成(kafka的key,message)這樣的tuple

val messagehandler = (mmd:messageandmetadata[string,string])=>

(mmd.key(),mmd.message())

// 通過kafkautils建立直連的dstream

//[string,string,stringdecoder, stringdecoder,(string,string)]

// key value key解碼方式 value的解碼方式 接收資料的格式

kafkastream = kafkautils.createdirectstream

[string,string,stringdecoder,

stringdecoder,(string,string)](ssc,kafkas,fromoffsets,messagehandler)

}else

//偏移量範圍

var offsetranges = array[offsetrange]()

// 依次迭代dstream中的rdd

kafkastream.foreachrdd/$"

//將該partition的offset儲存到zookeeper中

// /gp01/offset/tt/ 0/88889

zkutils.updatepersistentpath(zkclient,zkpath,o.untiloffset.tostring)}}

// 啟動

ssc.start()

ssc.awaittermination()

}}

三、redis連線池

package day04.chinamobil

import redis.clients.jedis.

/** * redis連線池

*/object jedisconnectionpool

}

四、讀取json檔案,並儲存到redis中

import com.alibaba.fastjson.jsonobject

import day04.

import org.apache.spark.rdd.rdd

//統計全網的充值訂單量, 充值金額, 充值成功數及充值平均時長.

object indexstatistics )

val re: rdd[(string, list[double])] = database1.map(t => (t._1, t._4))

//reducebykey聚合的是value

val re1: rdd[(string, list[double])] = re.reducebykey((list1, list2) => )

re1.foreachpartition(t => )

jedis.close()

})}

Kafka直連方式儲存MySQL

記得在之前寫了一篇是mysql基礎使用的,這次就用mysql來儲存direct方式的偏移量。如下 package kafka1 import kafka.common.topicandpartition import kafka.message.messageandmetadata import k...

kafka直連方式消費多個topic

乙個消費者組可以消費多個topic,以前寫過一篇乙個消費者消費乙個topic的,這次的是乙個消費者組通過直連方式消費多個topic,做了小測試,結果是正確的,通過檢視zookeeper的客戶端,zookeeper記錄了偏移量 package day04 消費多個topic import kafka....

Kafka直連儲存HBase

在之前介紹了kafka與sparkstreaming互動的兩種方式,我提到了公司採用的是direct方式,這次我向大家分享一下將偏移量儲存在hbase中。如下 package kafka1 import kafka.common.topicandpartition import kafka.mess...