注意:我現在用的是 beta版 暫時不能用在專案上,我出現過cup與負載100%的情況,有時候會不知道為什麼停掉,追求穩定還是用 amqp 更好一些。
解壓
tar -xzf kafka_2.13-2.7.0.tgz
執行 zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
另開乙個終端視窗 開啟 kafka
bin/kafka-server-start.sh config/server.properties &
建立乙個topic(注意這個引數**裡要對應。用到的埠:9092)
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
可參考hyperf 官網注意有坑
composer require hyperf/kafka
建立配置
php bin/hyperf.php vendor:publish hyperf/kafka
<?php
declare(strict_types=1);
/** * this file is part of hyperf.
* * @link
* @document
* @contact [email protected]
* @license
*/use hyperf\kafka\constants\kafkastrategy;
return [
'default' => [
'connect_timeout' => -1,
'send_timeout' => -1,
'recv_timeout' => -1,
'client_id' => '',
'max_write_attempts' => 3,
'brokers' => [
'127.0.0.1:9092',
],'bootstrap_server' => [
'127.0.0.1:9092',
],'update_brokers' => true,
'acks' => 0,
'producer_id' => -1,
'producer_epoch' => -1,
'partition_leader_epoch' => -1,
'broker' => '127.0.0.1:9092',
'interval' => 0,
'session_timeout' => 60,
'rebalance_timeout' => 60,
'replica_id' => -1,
'rack_id' => '127.0.0.1:9092',
'group_retry' => 5,
'group_retry_sleep' => 1,
'group_heartbeat' => 3,
'offset_retry' => 5,
'auto_create_topic' => true,
'partition_assignment_strategy' => kafkastrategy::range_assignor,
'pool' => [
'min_connections' => 1,
'max_connections' => 10,
'connect_timeout' => 10.0,
'wait_timeout' => 3.0,
'heartbeat' => -1,
'max_idle_time' => 60.0,
],],
];
建立消費者(注意不能var_dump())
<?php
declare(strict_types=1);
use hyperf\dbconnection\db;
use hyperf\kafka\abstractconsumer;
use hyperf\kafka\annotation\consumer;
use longlang\phpkafka\consumer\consumemessage;
/** * @consumer(topic="test", nums=1, groupid="hyperf", autocommit=true)
*/class kafkaconsumer extends abstractconsumer
}
建立投遞者(可在controller裡建立方法)
}執行專案會看到kafka的消費端被監聽
訪問介面投遞資料,就會觸發消費者將資料存到資料庫
spark streaming讀取kafka示例
spark streaming讀取kafka示例,其中 spark streaming優雅的關閉策略優化部分參考 如何管理spark streaming消費kafka的偏移量部分參考 spark向kafka中寫入資料部分參考 object demo 建立streamingcontext return...
SparkStreaming 整合kafka例項
核心概念 下面介紹kafka相關概念,以便執行下面例項的同時,更好地理解kafka.接下來在ubuntu系統環境下測試簡單的例項。按順序執行如下命令 進入kafka所在的目錄 命令執行後不會返回shell命令輸入狀態,zookeeper就會按照預設的配置檔案啟動服務,請千萬不要關閉當前終端.啟動新的...
SparkStreaming讀取KafKa資料
import org.apache.kafka.common.serialization.stringdeserializer import org.apache.spark.sparkconf import org.apache.spark.streaming.kafka010.kafkautil...