# Download the rdkafka 4.1.2 extension package from PECL.
# TLS certificate verification is left enabled: this package is compiled and
# loaded into PHP, so it must not be fetched over an unverified connection.
wget https://pecl.php.net/get/rdkafka-4.1.2.tgz

# Build and install the extension from the downloaded package.
# If the installation is successful, the messages "install ok" and
# "You should add extension=rdkafka.so to php.ini" will be displayed.
# If the installation fails, follow the prompts to solve the issue.
pear install rdkafka-4.1.2.tgz

# After successful installation, add extension=rdkafka.so to php.ini.
# Run `php --ini`; the "Loaded Configuration File:" line shows the location of
# php.ini. Adjust the path below if it is not /etc/php.ini.
echo 'extension=rdkafka.so' >> /etc/php.ini
<?php

declare(strict_types=1);

// Connection settings for the demo scripts. The file returns a plain array so
// that a script can load it with `$setting = require __DIR__ . '/CKafkaSetting.php';`.
return [
    // Comma-separated list of broker addresses in host:port form.
    'bootstrap_servers' => 'bootstrap_servers1:port,bootstrap_servers2:port',
    // Name of the topic to produce to and consume from.
    'topic_name' => 'topic_name',
    // Consumer group ID used by the consumer.
    'group_id' => 'php-demo',
];
| Parameter | Description |
|---|---|
| bootstrap_servers | Access network address. On the instance's Basic Info page in the console, go to the Access Mode module and copy the network information from the Network column. |
| topic_name | Topic name. Copy the name from the Topic List page in the console. |
| group_id | Consumer group ID. You can set any ID; after the demo runs successfully, the consumer appears on the Consumer Group page. |
<?php

declare(strict_types=1);

// Demo producer: sends five messages to the configured topic, prints every
// delivery report, and exits with a non-zero status if any delivery failed.

$setting = require __DIR__ . '/CKafkaSetting.php';

$conf = new RdKafka\Conf();

// Set the entry service. Obtain the corresponding service address in the console.
$conf->set('bootstrap.servers', $setting['bootstrap_servers']);

// 3 acknowledgment mechanisms are available for the Kafka producer, as described below:
// -1 or all: The broker responds to the producer only after the leader has received the data
//     and synchronized it to the followers in all ISRs. This configuration ensures high data
//     reliability: as long as one synchronized replica is alive, no message will be lost.
//     Note: it does not ensure that all replicas are written before the response is returned.
//     Can be used in conjunction with the topic-level parameter min.insync.replicas.
// 0: The producer continues to send the next (batch of) message(s) without waiting for any
//     broker acknowledgment. This provides high production performance but low data reliability
//     (data may be lost if the broker hosting the leader replica fails, because the producer is
//     unaware of the failure).
// 1: The producer sends the next (batch of) message(s) after the leader has acknowledged the
//     data. This balances production throughput and data reliability (messages may be lost if
//     the broker hosting the leader replica fails before the followers have copied the data).
// The value is set explicitly here; choose it based on your business requirements.
// NOTE(review): the default used when this is not set depends on the client library version -
// confirm before relying on it.
$conf->set('acks', '1');

// The number of retries when a request error occurs. A value greater than 0 is recommended so
// that transient failures do not cause message loss.
$conf->set('retries', '3');

// The time between a failed request and the next retry.
$conf->set('retry.backoff.ms', '100');

// The timeout period for producer network requests.
$conf->set('socket.timeout.ms', '6000');

// Upper bound for the back-off between broker reconnection attempts.
$conf->set('reconnect.backoff.max.ms', '3000');

// Register a callback for delivery reports. Besides printing the report, count the failed
// deliveries so the final status message reflects what actually happened.
$failedDeliveries = 0;
$conf->setDrMsgCb(function ($kafka, $message) use (&$failedDeliveries) {
    if ($message->err !== RD_KAFKA_RESP_ERR_NO_ERROR) {
        $failedDeliveries++;
    }
    echo '**Producer**Send messages: message=' . var_export($message, true) . "\n";
});

// Register a callback for message sending errors.
$conf->setErrorCb(function ($kafka, $err, $reason) {
    echo "**Producer** Message sending errors: err=$err reason=$reason \n";
});

$producer = new RdKafka\Producer($conf);
// Set it to LOG_DEBUG if debugging is enabled.
// $producer->setLogLevel(LOG_DEBUG);

$topicConf = new RdKafka\TopicConf();
$topic = $producer->newTopic($setting['topic_name'], $topicConf);

// Produce and send messages.
for ($i = 0; $i < 5; $i++) {
    // RD_KAFKA_PARTITION_UA allows Kafka to choose the partition freely.
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");
    // Serve delivery-report and error callbacks without blocking.
    $producer->poll(0);
}

// Keep polling until the outbound queue is empty so every delivery report has been handled
// before the script ends.
while ($producer->getOutQLen() > 0) {
    $producer->poll(50);
}

if ($failedDeliveries > 0) {
    echo "**Producer** {$failedDeliveries} message(s) failed to send.\n";
    exit(1);
}

echo "**Producer** Message sent successfully.\n";
# Run the producer demo script with the PHP CLI.
php Producer.php
>**Producer**Send messages: message=RdKafka\Message::__set_state(array(
> 'err' => 0,
> 'topic_name' => 'topic_name',
> 'timestamp' => 1618800895159,
> 'partition' => 0,
> 'payload' => 'Message 0',
> 'len' => 9,
> 'key' => NULL,
> 'offset' => 0,
> 'headers' => NULL,
>))
>**Producer**Send messages: message=RdKafka\Message::__set_state(array(
> 'err' => 0,
> 'topic_name' => 'topic_name',
> 'timestamp' => 1618800895159,
> 'partition' => 0,
> 'payload' => 'Message 1',
> 'len' => 9,
> 'key' => NULL,
> 'offset' => 1,
> 'headers' => NULL,
>))
...
>**Producer** Message sent successfully.
<?php

declare(strict_types=1);

// Demo consumer: joins the configured consumer group, subscribes to the configured topic and
// prints every received message until a consume call times out.

$setting = require __DIR__ . '/CKafkaSetting.php';

$conf = new RdKafka\Conf();
$conf->set('group.id', $setting['group_id']);

// Set the entry service. Obtain the corresponding service address in the console.
$conf->set('bootstrap.servers', $setting['bootstrap_servers']);

// Consumer timeout interval when the Kafka consumer group mechanism is used. If the broker does
// not receive a heartbeat from the consumer within this interval, the consumer is considered
// failed and the broker initiates the rebalancing process again.
$conf->set('session.timeout.ms', '10000');

// Client request timeout period. If no response is received after this period, the request
// times out and fails.
// NOTE(review): librdkafka documents request.timeout.ms as a producer-side property - confirm
// that it has the intended effect on a consumer.
$conf->set('request.timeout.ms', '305000');

// Upper bound for the back-off between broker reconnection attempts.
$conf->set('reconnect.backoff.max.ms', '3000');

// Offset reset policy, which is set based on the actual business scenario. Improper settings
// may result in the loss of consumed data. It is set directly on the configuration object
// instead of through the deprecated Conf::setDefaultTopicConf().
$conf->set('auto.offset.reset', 'earliest');

// To enable debug logging, set it on the configuration before the consumer is created,
// e.g. $conf->set('log_level', (string) LOG_DEBUG);
$consumer = new RdKafka\KafkaConsumer($conf);

$consumer->subscribe([$setting['topic_name']]);

$isConsuming = true;
while ($isConsuming) {
    // Wait up to 10 seconds for the next message or event.
    $message = $consumer->consume(10 * 1000);

    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            echo "**Consumer** received the message: " . var_export($message, true) . "\n";
            break;

        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "**Consumer** waiting for messages\n";
            break;

        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            // Nothing arrived within the wait period: stop the demo loop.
            echo "**Consumer** waiting timeout\n";
            $isConsuming = false;
            break;

        default:
            throw new \Exception($message->errstr(), $message->err);
    }
}
# Run the consumer demo script with the PHP CLI.
php Consumer.php
>**Consumer** received the message: RdKafka\Message::__set_state(array(
> 'err' => 0,
> 'topic_name' => 'topic_name',
> 'timestamp' => 1618800895159,
> 'partition' => 0,
> 'payload' => 'Message 0',
> 'len' => 9,
> 'key' => NULL,
> 'offset' => 0,
> 'headers' => NULL,
>))
>**Consumer** received the message: RdKafka\Message::__set_state(array(
> 'err' => 0,
> 'topic_name' => 'topic_name',
> 'timestamp' => 1618800895159,
> 'partition' => 0,
> 'payload' => 'Message 1',
> 'len' => 9,
> 'key' => NULL,
> 'offset' => 1,
> 'headers' => NULL,
>))
...

Feedback