php處理kafka訊息

2021-08-20 11:29:53 字數 1423 閱讀 3518

php如果要使用kafka的話,需要安裝一下kafka-php:

# composer require nmred/kafka-php
kafka-php的github位址

先寫乙個kafka_producer.php,用來做為生產者:

<?php

require

'/var/www/extend/vendor/autoload.php';

$config = \kafka\producerconfig::getinstance();

// 指定kafka broker列表,有多個的話用逗號分隔

$config->setmetadatabrokerlist('localhost:9092');

$producer = new \kafka\producer(

function

() );

$producer->success(function

($result) );

$producer->error(function

($errorcode) );

$producer->send(true);

然後,再寫乙個kafka_consumer.php,用來做為消費者:

<?php

require

'/var/www/extend/vendor/autoload.php';

$config = \kafka\consumerconfig::getinstance();

$config->setmetadatabrokerlist('localhost:9092');

// 消費模組的分組id

$config->setgroupid('haha');

// 將要消費的kafka topic名稱

$config->settopics(['test']);

// 如果消費offset失效的時候重置offset的策略

$config->setoffsetreset('latest');

$consumer = new \kafka\consumer();

$consumer->start(function

($topic, $part, $message) );

下面,先啟動kafka_consumer:

# php kafka_consumer.php
切換另乙個終端視窗,啟動kafka_producer:

# php kafka_producer.php
現在,返回執行kafka_consumer的終端視窗,會發現經收到了訊息:

array(3) 

}

PHP 訊息佇列 Kafka 使用

安裝 kafka 服務 wget 解壓,進入目錄 tar zxvf kafka 2.13 2.5.0.tgz cd kafka 2.13 2.5.0 啟動 kafka 服務 使用安裝包中的指令碼啟動單節點 zookeeper 例項 bin zookeeper server start.sh daem...

關於kafka處理大訊息的方法

最近發現kafka在傳送一些大訊息的時候會報錯,修改了配置max.request.size。問題依舊。後來查閱了一下,都說要調大限制message大小的引數,不過試過之後發現貌似沒什麼作用。查閱文件發現之前用的客戶端kafka已經三年沒更新了0.0,後改為目前官方推薦的客戶端confluent ka...

訊息佇列 訊息佇列 kafka

kafka是乙個分布式的基於發布 訂閱模式的訊息佇列,主要用於大資料實時處理領域。要理解kafka首先要有分布式的概念,要有訊息佇列的概念。分布式系統最大的優勢就是解耦和削峰,這種情況下,a系統生成了乙個訊息,b系統非同步獲取,那麼就需要乙個存放訊息的訊息佇列 mq 相比較傳統的訊息佇列,訊息被消費...