php Hyperf 實現 kafka 訊息列隊

2021-10-20 04:43:53 字數 2790 閱讀 4031

注意:我現在用的是 beta版 暫時不能用在專案上,我出現過cup與負載100%的情況,有時候會不知道為什麼停掉,追求穩定還是用 amqp 更好一些。

解壓 

tar -xzf kafka_2.13-2.7.0.tgz
執行 zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties
另開乙個終端視窗 開啟 kafka

bin/kafka-server-start.sh config/server.properties &
建立乙個topic(注意這個引數**裡要對應。用到的埠:9092)

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
可參考hyperf 官網注意有坑

composer require hyperf/kafka
建立配置

php bin/hyperf.php vendor:publish hyperf/kafka
<?php 

declare(strict_types=1);

/** * this file is part of hyperf.

* * @link

* @document

* @contact [email protected]

* @license

*/use hyperf\kafka\constants\kafkastrategy;

return [

'default' => [

'connect_timeout' => -1,

'send_timeout' => -1,

'recv_timeout' => -1,

'client_id' => '',

'max_write_attempts' => 3,

'brokers' => [

'127.0.0.1:9092',

],'bootstrap_server' => [

'127.0.0.1:9092',

],'update_brokers' => true,

'acks' => 0,

'producer_id' => -1,

'producer_epoch' => -1,

'partition_leader_epoch' => -1,

'broker' => '127.0.0.1:9092',

'interval' => 0,

'session_timeout' => 60,

'rebalance_timeout' => 60,

'replica_id' => -1,

'rack_id' => '127.0.0.1:9092',

'group_retry' => 5,

'group_retry_sleep' => 1,

'group_heartbeat' => 3,

'offset_retry' => 5,

'auto_create_topic' => true,

'partition_assignment_strategy' => kafkastrategy::range_assignor,

'pool' => [

'min_connections' => 1,

'max_connections' => 10,

'connect_timeout' => 10.0,

'wait_timeout' => 3.0,

'heartbeat' => -1,

'max_idle_time' => 60.0,

],],

];

建立消費者(注意不能var_dump())

<?php 

declare(strict_types=1);

use hyperf\dbconnection\db;

use hyperf\kafka\abstractconsumer;

use hyperf\kafka\annotation\consumer;

use longlang\phpkafka\consumer\consumemessage;

/** * @consumer(topic="test", nums=1, groupid="hyperf", autocommit=true)

*/class kafkaconsumer extends abstractconsumer

}

建立投遞者(可在controller裡建立方法)

}執行專案會看到kafka的消費端被監聽

訪問介面投遞資料,就會觸發消費者將資料存到資料庫

spark streaming讀取kafka示例

spark streaming讀取kafka示例,其中 spark streaming優雅的關閉策略優化部分參考 如何管理spark streaming消費kafka的偏移量部分參考 spark向kafka中寫入資料部分參考 object demo 建立streamingcontext return...

SparkStreaming 整合kafka例項

核心概念 下面介紹kafka相關概念,以便執行下面例項的同時,更好地理解kafka.接下來在ubuntu系統環境下測試簡單的例項。按順序執行如下命令 進入kafka所在的目錄 命令執行後不會返回shell命令輸入狀態,zookeeper就會按照預設的配置檔案啟動服務,請千萬不要關閉當前終端.啟動新的...

SparkStreaming讀取KafKa資料

import org.apache.kafka.common.serialization.stringdeserializer import org.apache.spark.sparkconf import org.apache.spark.streaming.kafka010.kafkautil...