sender.completebatch
if(error != errors.none &&
canretry
(batch, error)
) on topic-partition {}, retrying ({} attempts left). error: {}"
, correlationid,
batch.topicpartition,
this
.retries - batch.attempts -1,
error)
;//重新把傳送失敗等著批次 加入到佇列裡面。
this
.accumulator.
reenqueue
(batch, now)
;this
.sensors.
recordretries
(batch.topicpartition.
topic()
, batch.recordcount);}
else
else}}
}}if(
!expiredbatches.
isempty()
) log.
trace
("expired {} batches in accumulator"
, count)
;return expiredbatches;
}batch.maybeexpire
public
boolean
maybeexpire
(int requesttimeoutms,
long retrybackoffms,
long now,
long lingerms,
boolean isfull)
elseif(
!this
.inretry()
&& requesttimeoutms <
(now -
(this
.createdms + lingerms)))
elseif(
this
.inretry()
&& requesttimeoutms <
(now -
(this
.lastattemptms + retrybackoffms)))
if(expire)
return expire;
}
networkclient.poll.handletimedoutrequests
private
void
handletimedoutrequests
(list
responses,
long now)
due to request timeout."
, nodeid)
;//我們猜應該是會去修改 連線的狀態
processdisconnection
(responses, nodeid, now);}
// we disconnected, so we should probably refresh our metadata
if(nodeids.
size()
>0)
metadataupdater.
requestupdate()
;}processdisconnection
private
void
processdisconnection
(list
responses, string nodeid,
long now)
due to node {} being disconnected"
, request, nodeid);if
(!metadataupdater.
maybehandledisconnection
(request)
)//對這些請求進行處理
//大家會看到乙個比較有意思的事
//自己封裝了乙個響應。這個響應裡面沒有服務端響應訊息(服務端沒給響應)
//失去連線的狀態表標識為true
responses.
add(
newclientresponse
(request, now,
true
, null));
}}disconnected
public
void
disconnected
(string id,
long now)
kafka producer 分割槽器
策略一 如果傳送訊息的時候,沒有指定key,輪詢達到負載均衡 策略二 這個地方就是指定了key,hash取模,相同的key打到同乙個分割槽上 int partition partition record,serializedkey,serializedvalue,cluster return par...
kafka producer寫詳細過程
1 2 客戶端寫入引數。batch.size 通過這個引數來設定批量提交的資料大小,預設是16k,當積壓的訊息達到這個值的時候就會統一傳送 發往同一分割槽的訊息 linger.ms 這個設定是為傳送設定一定是延遲來收集更多的訊息,預設大小是0ms 就是有訊息就立即傳送 滿足上述任意一條件即傳送訊息。...
三 Kafka Producer傳送訊息及分割槽策略
1 producer 實現 ps 不建議使用自定義序列化和反序列化,他們會把生產者和消費者耦合在一起,且容易出錯 同步傳送訊息 非同步傳送訊息 public class kafkaproducerdemo public static void main string args asyncsendme...