在ThinkPHP框架下使用Apache Kafka进行分布式消息传递和日志收集,需要以下几个步骤:
使用Composer安装Kafka PHP扩展:
composer require edenhill/kafka-php
在config
目录下创建kafka.php
文件,添加以下内容:
return [
'brokers' => 'kafka:9092',
'topic' => 'test',
'groupId' => 'thinkphp',
'securityProtocol' => 'plaintext',
];
其中,brokers
是Kafka的服务器地址和端口号,topic
是消息主题,groupId
是消费者组ID,securityProtocol
是Kafka的安全协议,这里使用的是明文协议。
在需要发送消息的地方,使用Kafka PHP扩展发送消息:
use RdKafka\Producer;
use RdKafka\ProducerTopic;
$config = config('kafka');
$producer = new Producer();
$producer->setLogLevel(LOG_DEBUG);
$producer->addBrokers($config['brokers']);
$topic = $producer->newTopic($config['topic']);
$topic->produce(RD_KAFKA_PARTITION_UA, 0, 'Hello, Kafka!');
其中,RD_KAFKA_PARTITION_UA
表示使用未分配的分区,0
表示消息键,Hello, Kafka!
是消息内容。
在需要消费消息的地方,使用Kafka PHP扩展消费消息:
use RdKafka\Consumer;
use RdKafka\ConsumerTopic;
use RdKafka\Message;
$config = config('kafka');
$consumer = new Consumer();
$consumer->setLogLevel(LOG_DEBUG);
$consumer->addBrokers($config['brokers']);
$topic = $consumer->newTopic($config['topic']);
$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);
while (true) {
$message = $topic->consume(0, 1000);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
echo $message->payload . PHP_EOL;
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "No more messages\n";
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "Timed out\n";
break;
default:
echo $message->errstr() . PHP_EOL;
break;
}
}
其中,RD_KAFKA_OFFSET_BEGINNING
表示从最早的消息开始消费,1000
表示消费超时时间,单位为毫秒。根据消息的错误码进行相应的处理。
可以通过创建一个Kafka
类来集成Kafka到ThinkPHP框架:
namespace app\common;
use RdKafka\Consumer;
use RdKafka\ConsumerTopic;
use RdKafka\Message;
use RdKafka\Producer;
use RdKafka\ProducerTopic;
class Kafka
{
private static $producer;
private static $consumer;
public static function getProducer()
{
if (!self::$producer) {
$config = config('kafka');
self::$producer = new Producer();
self::$producer->setLogLevel(LOG_DEBUG);
self::$producer->addBrokers($config['brokers']);
}
return self::$producer;
}
public static function getConsumer()
{
if (!self::$consumer) {
$config = config('kafka');
self::$consumer = new Consumer();
self::$consumer->setLogLevel(LOG_DEBUG);
self::$consumer->addBrokers($config['brokers']);
}
return self::$consumer;
}
public static function produce($message)
{
$config = config('kafka');
$topic = self::getProducer()->newTopic($config['topic']);
$topic->produce(RD_KAFKA_PARTITION_UA, 0, $message);
}
public static function consume($callback)
{
$config = config('kafka');
$topic = self::getConsumer()->newTopic($config['topic']);
$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);
while (true) {
$message = $topic->consume(0, 1000);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
$callback($message->payload);
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "No more messages\n";
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "Timed out\n";
break;
default:
echo $message->errstr() . PHP_EOL;
break;
}
}
}
}
通过调用Kafka::produce()
方法发送消息,调用Kafka::consume()
方法消费消息。