当前位置 博文首页 > 文章内容

    详解PHP实现生产者与消费者(Kafka应用)

    作者:shunshunshun18 栏目:未分类 时间:2021-03-25 10:43:42

    本站于2023年9月4日。收到“大连君*****咨询有限公司”通知
    说我们IIS7站长博客,有一篇博文用了他们的图片。
    要求我们给他们一张图片6000元。要不然法院告我们

    为避免不必要的麻烦,IIS7站长博客,全站内容图片下架、并积极应诉
    博文内容全部不再显示,请需要相关资讯的站长朋友到必应搜索。谢谢!

    另祝:版权碰瓷诈骗团伙,早日弃暗投明。

    相关新闻:借版权之名、行诈骗之实,周某因犯诈骗罪被判处有期徒刑十一年六个月

    叹!百花齐放的时代,渐行渐远!



    本篇文章给大家介绍PHP实现生产者与消费者,希望对需要的朋友有所帮助!

    前言

    PHP中使用Kafka需要RdKafka扩展,而RdKafka依赖于librdkafka,所以这两个我们都需要安装,具体安装方法自行百度,本篇不做说明了。

    生产者(测试)

    创建消费者需要步骤:

    • 生产者配置参数
    • 创建生产者实例
    • 创建主题实例(依赖生产者)
    • 生产主题消息
    • 推送消息

    具体代码如下:

            $conf = new \RdKafka\Conf();
            // 绑定服务节点
            $conf->set('metadata.broker.list', '127.0.0.1:32772');
    
            // 创建生产者
            $kafka = new \RdKafka\Producer($conf);
    
            // 创建主题实例
            $topic = $kafka->newTopic('p1r1');
            // 生产主题数据,此时消息在缓冲区中,并没有真正被推送
            $topic->produce(RD_KAFKA_PARTITION_UA, 0, 'Message');
            // 阻塞时间(毫秒), 0为非阻塞
            $kafka->poll(0); 
    
            // 推送消息,如果不调用此函数,消息不会被发送且会丢失
            $result = $kafka->flush(5000);
    
            if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
                throw new \RuntimeException('Was unable to flush, messages might be lost!');
            }

    消费者

    创建一个消费者需要几个步骤:

    • 消费者配置参数
    • 应用配置参数创建消费者实例
    • 订阅对应主题
    • 拉取数据
    • 提交位移

    具体代码如下:

            $conf = new \RdKafka\Conf();
            // 绑定消费者组
            $conf->set('group.id', 'ceshi');
            // 绑定服务节点,多个用,分隔
            $conf->set('metadata.broker.list', '127.0.0.1:32787');
            // 设置自动提交为false
            $conf->set('enable.auto.commit', 'false');
            // 设置当前消费者拉取数据时的偏移量, 可选参数:
            // earliest: 如果消费者组是新创建的,从头开始消费,否则从消费者组当前消费位移开始。
            // latest:如果消费者组是新创建的,从最新偏移量开始,否则从消费者组当前消费位移开始。
            $conf->set('auto.offset.reset', 'earliest');
    
            // 创建消费者实例
            $consumer = new \RdKafka\KafkaConsumer($conf);
            // 消费者订阅主题,数组形式
            $consumer->subscribe(['topic1','topic2']);
            while (true) {
                // 消费数据,阻塞5秒(5秒内有数据就消费,没有数据等待5秒进入下一轮循环)
                $message = $consumer->consume(5000);
                switch ($message->err) {
                    case RD_KAFKA_RESP_ERR_NO_ERROR:
                        // 业务逻辑
                        var_dump($message);
    
                        // 提交位移
                        $consumer->commit($message);
                        break;
                    case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                        echo "No more messages; will wait for more\n";
                        break;
                    case RD_KAFKA_RESP_ERR__TIMED_OUT:
                        echo "Timed out\n";
                        break;
                    default:
                        throw new \Exception($message->errstr(), $message->err);
                        break;
                }
            }
            // 关闭消费者(一般用在脚本中,不需要关闭)
            $conumser->close();

    只消费指定分区中的数据:

        // 对消费者指定分区,注意此方式不能与subscribe一同使用
        $consumer->assign([
            new RdKafka\TopicPartition("topic", 0),
            new RdKafka\TopicPartition("topic", 1),
        ]);