新功能发布记录
Broker 版本升级记录
公告
pip install kafka-python
yum install -y python3-pippip3 install kafka-python
from kafka import KafkaProducer# 创建生产者producer = KafkaProducer(bootstrap_servers='$domainName:$port', # 替换为你的 Kafka 地址api_version=(2, 8, 0), # 显式指定协议版本(根据集群实例调整)retries=2147483647, # 重试次数设为 int 最大值(无限重试)retry_backoff_ms=1000, # 重试间隔 1 秒acks=1 # 只等待 leader 确认,不等待所有副本)# 发送消息for i in range(5):msg = f"Message {i}"future = producer.send('$topic_name', value=msg.encode('utf-8')) # $topic_name需要提前在控制台创建,并注意替换result = future.get(timeout=10)print(f"发送成功: '{msg}' -> 分区={result.partition}, 偏移量={result.offset}")producer.close()
参数 | 描述 |
bootstrap_servers | 接入网络,在控制台的实例基本信息页面的接入方式模块的网络列复制。 |
topic_name | Topic 名称,您可以在控制台上 Topic 列表页面复制。 |


from kafka import KafkaConsumer# 创建消费者consumer = KafkaConsumer('$topic_name', # topic名字bootstrap_servers='$domainName:$port', # 替换为你的 Kafka 地址group_id='$group_id', # 指定消费组名字auto_offset_reset='earliest', # 从最早消息开始读api_version=(2, 8, 0), # 显式指定协议版本enable_auto_commit=True)print("开始消费消息...")try:for msg in consumer:value = msg.value.decode('utf-8')print(f"收到消息: {value}")except KeyboardInterrupt:print("消费者已停止。")finally:consumer.close()
参数 | 描述 |
bootstrap_servers | 接入网络,在控制台的实例基本信息页面的接入方式模块的网络列复制。 |
group_id | 消费者的组 ID,根据业务需求自定义 |
topic_name | Topic 名称,您可以在控制台上 Topic 列表页面复制。 |


文档反馈