老雷编程技术分享之老雷编程技术分享之PHPer的kafka快速入门
老雷编程技术分享之PHPer的kafka快速入门
kafka简介
Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。
下载kafka
https://mirror-hk.koddos.net/apache/kafka/2.8.0/kafka_2.12-2.8.0.tgz
启动kafak
Linux
./bin/zookeeper-server-start.sh config/zookeeper.properties ./bin/kafka-server-start.sh config/server.properties
Windows
bin\windows\zookeeper-server-start config\zookeeper.properties bin\windows\kafka-server-start config\server.properties
kafka基础知识
Topic
每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。类似于缓存Key
生产者
生产者即数据的发布者,该角色将消息发布到Kafka的topic中
消费者
消费者从kafka中读取数据
kafka的PhP SDK
Rdkafka C扩展
http://pecl.php.net/package/rdkafka
phpkafka PHP库
https://github.com/swoole/phpkafka
我们使用phpkafka,composer安装
composer require longlang/phpkafka
简单Demo
生产者
<?php use longlang\phpkafka\Producer\Producer; use longlang\phpkafka\Producer\ProducerConfig; require dirname(__DIR__) . '/vendor/autoload.php'; $config = new ProducerConfig(); $config->setBootstrapServer('127.0.0.1:9092'); $config->setUpdateBrokers(true); $config->setAcks(-1); $producer = new Producer($config); $time=date("Y-m-d H:i:s"); $producer->send('test-http', $time); echo "生产内容".$time;
消费者
<?php use longlang\phpkafka\Consumer\ConsumeMessage; use longlang\phpkafka\Consumer\Consumer; use longlang\phpkafka\Consumer\ConsumerConfig; require dirname(__DIR__) . '/vendor/autoload.php'; function consume(ConsumeMessage $message): void { var_dump($message->getKey() . ':' . $message->getValue()); } $config = new ConsumerConfig(); $config->setBroker('127.0.0.1:9092'); $config->setTopic('test-http'); // 主题名称 $config->setGroupId('testGroup'); // 分组ID $config->setClientId('test'); // 客户端ID $config->setGroupInstanceId('test'); // 分组实例ID $consumer = new Consumer($config); $message = $consumer->consume(); if($message){ var_dump($message->getKey() . ':' . $message->getValue()); $consumer->ack($message); // 消费成功反馈 }else{ echo "无消费"; }