老雷编程技术分享之老雷编程技术分享之PHPer的kafka快速入门
查看视频教程或者获取有关《老雷编程技术分享》更多信息

老雷编程技术分享之PHPer的kafka快速入门

kafka简介

    Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。

    7.png

下载kafka

https://mirror-hk.koddos.net/apache/kafka/2.8.0/kafka_2.12-2.8.0.tgz

启动kafak

Linux

./bin/zookeeper-server-start.sh config/zookeeper.properties
./bin/kafka-server-start.sh config/server.properties


Windows

bin\windows\zookeeper-server-start config\zookeeper.properties
bin\windows\kafka-server-start  config\server.properties

kafka基础知识

Topic

                每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。类似于缓存Key

生产者

                生产者即数据的发布者,该角色将消息发布到Kafka的topic中

消费者

        消费者从kafka中读取数据

kafka的PhP SDK

Rdkafka C扩展

http://pecl.php.net/package/rdkafka

phpkafka PHP库

https://github.com/swoole/phpkafka

我们使用phpkafka,composer安装 

    composer require longlang/phpkafka


简单Demo

生产者

<?php
use longlang\phpkafka\Producer\Producer;
use longlang\phpkafka\Producer\ProducerConfig;
require dirname(__DIR__) . '/vendor/autoload.php';
$config = new ProducerConfig();
$config->setBootstrapServer('127.0.0.1:9092');
$config->setUpdateBrokers(true);
$config->setAcks(-1);
$producer = new Producer($config);
$time=date("Y-m-d H:i:s");
$producer->send('test-http', $time);
echo "生产内容".$time;


消费者

<?php
use longlang\phpkafka\Consumer\ConsumeMessage;
use longlang\phpkafka\Consumer\Consumer;
use longlang\phpkafka\Consumer\ConsumerConfig;
require dirname(__DIR__) . '/vendor/autoload.php';
function consume(ConsumeMessage $message): void
{
var_dump($message->getKey() . ':' . $message->getValue());
}
$config = new ConsumerConfig();
$config->setBroker('127.0.0.1:9092');
$config->setTopic('test-http'); // 主题名称
$config->setGroupId('testGroup'); // 分组ID
$config->setClientId('test'); // 客户端ID
$config->setGroupInstanceId('test'); // 分组实例ID
$consumer = new Consumer($config);
$message = $consumer->consume();
if($message){
var_dump($message->getKey() . ':' . $message->getValue());
$consumer->ack($message); // 消费成功反馈
}else{
echo "无消费";
}