kafka producer 異常處理

2021-10-22 17:35:17 字數 2381 閱讀 1287

sender.completebatch

if(error != errors.none &&

canretry

(batch, error)

) on topic-partition {}, retrying ({} attempts left). error: {}"

, correlationid,

batch.topicpartition,

this

.retries - batch.attempts -1,

error)

;//重新把傳送失敗等著批次 加入到佇列裡面。

this

.accumulator.

reenqueue

(batch, now)

;this

.sensors.

recordretries

(batch.topicpartition.

topic()

, batch.recordcount);}

else

else}}

}}if(

!expiredbatches.

isempty()

) log.

trace

("expired {} batches in accumulator"

, count)

;return expiredbatches;

}batch.maybeexpire

public

boolean

maybeexpire

(int requesttimeoutms,

long retrybackoffms,

long now,

long lingerms,

boolean isfull)

elseif(

!this

.inretry()

&& requesttimeoutms <

(now -

(this

.createdms + lingerms)))

elseif(

this

.inretry()

&& requesttimeoutms <

(now -

(this

.lastattemptms + retrybackoffms)))

if(expire)

return expire;

}

networkclient.poll.handletimedoutrequests

private

void

handletimedoutrequests

(list

responses,

long now)

due to request timeout."

, nodeid)

;//我們猜應該是會去修改 連線的狀態

processdisconnection

(responses, nodeid, now);}

// we disconnected, so we should probably refresh our metadata

if(nodeids.

size()

>0)

metadataupdater.

requestupdate()

;}processdisconnection

private

void

processdisconnection

(list

responses, string nodeid,

long now)

due to node {} being disconnected"

, request, nodeid);if

(!metadataupdater.

maybehandledisconnection

(request)

)//對這些請求進行處理

//大家會看到乙個比較有意思的事

//自己封裝了乙個響應。這個響應裡面沒有服務端響應訊息(服務端沒給響應)

//失去連線的狀態表標識為true

responses.

add(

newclientresponse

(request, now,

true

, null));

}}disconnected

public

void

disconnected

(string id,

long now)

kafka producer 分割槽器

策略一 如果傳送訊息的時候,沒有指定key,輪詢達到負載均衡 策略二 這個地方就是指定了key,hash取模,相同的key打到同乙個分割槽上 int partition partition record,serializedkey,serializedvalue,cluster return par...

kafka producer寫詳細過程

1 2 客戶端寫入引數。batch.size 通過這個引數來設定批量提交的資料大小,預設是16k,當積壓的訊息達到這個值的時候就會統一傳送 發往同一分割槽的訊息 linger.ms 這個設定是為傳送設定一定是延遲來收集更多的訊息,預設大小是0ms 就是有訊息就立即傳送 滿足上述任意一條件即傳送訊息。...

三 Kafka Producer傳送訊息及分割槽策略

1 producer 實現 ps 不建議使用自定義序列化和反序列化,他們會把生產者和消費者耦合在一起,且容易出錯 同步傳送訊息 非同步傳送訊息 public class kafkaproducerdemo public static void main string args asyncsendme...