本頁目錄
kafka源表的實現**於自社群的kafka版本實現。
注意:本文件只適合獨享模式下使用。kafka需要定義的ddl如下。
create
table kafka_stream
(
messagekey varbinary
,
`message`
varbinary
,
topic varchar
,
`partition`
int,
`offset`
bigint
)
with
(
type
='kafka010'
,
topic
='***'
,
`group.id`
='***'
,
bootstrap
.servers
='ip:埠,ip:埠,ip:埠'
);
注意:以上表中的五個字段順序務必保持一致。引數
注釋說明
備註type
kafka對應版本
推薦使用kafka010
topic
讀取的單個topic
topic名稱
(1)kafka08必選配置:
引數注釋說明
備註group.id
無消費組id
zookeeper.connect
zk鏈結位址
zk連線id
(2)kafka09/kafka010/kafka011必選配置:
引數注釋說明
備註group.id
無消費組id
bootstrap.servers
kafka集群位址
kafka集群位址
如果您的kafka是阿里雲商業版,請參考kafka商業版準備配置文件。
如果您的kafka是阿里雲公測版,請參考kafka公測版準備配置文件。
"consumer.id"
,"socket.timeout.ms"
,"fetch.message.max.bytes"
,"num.consumer.fetchers"
,"auto.commit.enable"
,"auto.commit.interval.ms"
,"queued.max.message.chunks"
,"rebalance.max.retries"
,"fetch.min.bytes"
,"fetch.wait.max.ms"
,"rebalance.backoff.ms"
,"refresh.leader.backoff.ms"
,"auto.offset.reset"
,"consumer.timeout.ms"
,"exclude.internal.topics"
,"partition.assignment.strategy"
,"client.id"
,"zookeeper.session.timeout.ms"
,"zookeeper.connection.timeout.ms"
,"zookeeper.sync.time.ms"
,"offsets.storage"
,"offsets.channel.backoff.ms"
,"offsets.channel.socket.timeout.ms"
,"offsets.commit.max.retries"
,"dual.commit.enabled"
,"partition.assignment.strategy"
,"socket.receive.buffer.bytes"
,"fetch.min.bytes"
注意:其它可選配置項參考kafka官方文件:typekafka09
kafka010
kafka011
kafka 版本
kafka08
0.8.22
kafka09
0.9.0.1
kafka010
0.10.2.1
kafka011
0.11.0.2
預設kafka讀到的訊息:
messagekey varbianry
,
message varbianry
,
topic varchar
,
partition
int,
offset bigint
這樣乙個五元組,如果您希望在source階段把資料parser成特定的其它格式,可以按照下面實踐進行。
引數注釋說明
備註parserudtf
自定**析函式
用於解析從kafka讀到的訊息對映到ddl具體對應的型別
如何寫乙個parserudtf參見自定義錶值函式(udtf)。
與阿里雲kafka訊息佇列一樣,ddl定義相同。
create
table kafka_stream
(
messagekey varbinary
,
`message`
varbinary
,
topic varchar
,
`partition`
int,
`offset`
bigint
)
with
(
type
='kafka011'
,
topic
='kafka_01'
,
`group.id`
='cid_blink'
,
bootstrap
.servers
='192.168.0.251:9092'
);
關於自建kafka的with引數,請參考本文件kafka建立時ddl的with引數說明。需要注意的是bootstrap.servers
引數需要填寫自建的位址和埠號。
注意:無論是阿里雲kafka還是自建kafka,目前實時計算均無tps、rps等指標資訊。在作業上線之後,運維介面暫時不支援顯示指標資訊。
相關文件
相關產品
本文**實時計算——
建立訊息佇列(kafka)源表
訊息佇列 訊息佇列 kafka
kafka是乙個分布式的基於發布 訂閱模式的訊息佇列,主要用於大資料實時處理領域。要理解kafka首先要有分布式的概念,要有訊息佇列的概念。分布式系統最大的優勢就是解耦和削峰,這種情況下,a系統生成了乙個訊息,b系統非同步獲取,那麼就需要乙個存放訊息的訊息佇列 mq 相比較傳統的訊息佇列,訊息被消費...
訊息佇列 Kafka學習
kafka是乙個分布式的訊息佇列,學習見apache kafka文件,中文翻譯見kafka分享,乙個簡單的入門例子見kafka 入門例項。本文只針對自己感興趣的點記錄下。producer consumer 訊息的生成者和使用者。broker kafka server充當broker角色,起到訊息佇列...
訊息佇列 Kafka學習
kafka是乙個分布式的訊息佇列,學習見apache kafka文件,中文翻譯見kafka分享,乙個簡單的入門例子見kafka 入門例項。本文只針對自己感興趣的點記錄下。producer consumer 訊息的生成者和使用者。broker kafka server充當broker角色,起到訊息佇列...