kafka scheduler用於執行作業的排程程式。控制乙個在後台定期重複執行或者延遲排程的作業!主要有一下操作:
初始化任務以便可以接受任務排程 def startup()
當任務排程完成關閉。def shutdown() 延遲佇列實現
排程任務:def schedule(name: string, fun: ()=>unit, delay: long = 0, period: long = -1, unit: timeunit = timeunit.milliseconds)
詳細**如下:
class kafkascheduler(val threads: int,
val threadnameprefix: string = "kafka-scheduler-",
daemon: boolean = true) extends scheduler with logging )
}}
override def shutdown()
cachedexecutor.awaittermination(1, timeunit.days)
}} def scheduleonce(name: string, fun: () => unit): unit =
def schedule(name: string, fun: () => unit, delay: long, period: long, unit: timeunit) catch finally
}if(period >= 0)
executor.scheduleatfixedrate(runnable, delay, period, unit)
else
executor.schedule(runnable, delay, unit)
}}
kafka原始碼分析 kafkaconsumer
kafkaconsumer 對於多執行緒訪問是不安全的,通過使用acquire 跟release 方法來操作atomiclong currentthread字段 儲存當前訪問執行緒id 有多個執行緒同時訪問丟擲concurrentmodificationexception,來防止對個執行緒同時訪問。...
Kafka原始碼分析(一)
apache kafka 是 乙個分布式流處理平台.這到底意味著什麼呢?我們知道流處理平台有以下三種特性 它可以用於兩大類別的應用 為了理解kafka是如何做到以上所說的功能,從下面開始,我們將深入探索kafka的特性。首先是一些概念 kafka有四個核心的api 讓我們首先深入了解下kafka的核...
Kafka原始碼分析之KafkaProducer
kafkaproducer是乙個kafka客戶端實現,可以發布記錄records至kafka集群。kafkaproducer是執行緒安全的,多執行緒之間共享單獨乙個producer例項通常會比多個producer例項要快。kafkaproducer包含一組快取池空間,儲存尚未傳輸到集群的記錄reco...