tencent cloud

文档反馈

消费组消费

最后更新时间:2024-04-23 11:35:32

    前提条件

    1. 开通日志服务,创建 日志集日志主题,并成功采集到日志数据。
    2. 子账号/协作者需要主账号授权,授权步骤参见 基于 CAM 管理权限,复制授权策略参见 CLS 访问策略模板

    消费组消费流程

    使用消费组消费数据时,服务端会管理消费组里的所有消费者的消费任务,根据主题分区和消费者的数量关系自动调整消费任务的均衡性,同时会记录每个主题分区的消费进度,保证不同消费者可以无重复消费数据。消费组消费的具体流程如下:
    1. 创建消费组。
    2. 每个消费者定期向服务端发送心跳。
    3. 消费组根据主题分区负载情况自动分配主题分区给消费者。
    4. 消费者根据所分配的分区列表,获取分区 offset 并消费数据。
    5. 消费者周期性地更新分区的消费进度到消费组,便于下次消费组分配任务。
    6. 重复步骤2 - 步骤6,直至消费结束。

    消费参数说明

    配置参数
    说明
    默认值
    取值范围
    endpoint
    请求域名API 上传日志标签页面的域名
    -
    支持地域:北京,上海,广州,南京,中国香港,东京,美东,新加坡,法兰克福
    access_key_id
    用户的 Secret_id,请前往 CAM 查看
    -
    -
    access_key
    用户的 Secret_key,请前往 CAM 查看
    -
    -
    region
    主题所在地域,例如 ap-beijing、ap_guangzhou、ap-shanghai,详情请参见 地域和访问域名
    -
    支持地域:北京,上海,广州,南京,中国香港,东京,美东,新加坡,法兰克福
    logset_id
    日志集 ID,仅支持一个日志集
    -
    -
    topic_ids
    日志主题 ID,多个主题请使用','隔开
    -
    -
    consumer_group_name
    消费者组名称
    -
    -
    internal
    内网:TRUE
    公网:FALSE
    说明:
    内网/外网读流量费用请参见 产品定价
    FALSE
    TRUE/FALSE
    consumer_name
    消费者名称。同一个消费者组内,消费者名称不可重复
    -
    0-9、aA-zZ、'-'、'_'、'.'组成的字符串
    heartbeat_interval
    消费者心跳上报间隔,2个间隔没有上报心跳,会被认为是消费者下线
    20
    0-30分钟
    data_fetch_interval
    消费者拉取数据间隔,不小于1秒
    2
    -
    offset_start_time
    拉取数据的开始时间,字符串类型的 unix 时间戳,精度为秒,例如"1711607794",也可以直接可配置为"begin"和"end"。
    begin:日志主题生命周期内的最早数据
    end:日志主题生命周期内的最新数据
    "end"
    "begin"/"end"/unix时间戳
    max_fetch_log_group_size
    消费者单次拉取数据大小,默认2M,最大10M
    2097152
    2M - 10M
    offset_end_time
    拉取数据的结束时间,支持字符串类型的 unix 时间戳,精度为秒,例如"1711607794"。不填写代表持续拉取
    -
    -

    消费 Demo (Python)

    # -*- coding: utf-8 -*-
    
    import os
    from threading import RLock
    from tencentcloud.log.consumer import *
    from tencentcloud.log.logclient import YunApiLogClient
    
    root = logging.getLogger()
    
    handler = logging.StreamHandler()
    
    handler.setFormatter(logging.Formatter(
    
    fmt='[%(asctime)s] - [%(threadName)s] - {%(module)s:%(funcName)s:%(lineno)d} %(levelname)s - %(message)s',
    
    datefmt='%Y-%m-%d %H:%M:%S'))
    
    root.setLevel(logging.INFO)
    
    root.addHandler(handler)
    
    logger = logging.getLogger(__name__)
    
    class SampleConsumer(ConsumerProcessorBase):
    #记录上次消费offset的时间。
    last_check_time = 0
    #记录消费的日志数据
    log_results = []
    
    lock = RLock()
    
    def initialize(self, topic_id):
    
    self.topic_id = topic_id
    
    def process(self, log_groups, offset_tracker):
    #获取消费的日志数据
    for log_group in log_groups:
    
    for log in log_group.logs:
    
    item = dict()
    
    item['time'] = log.time
    
    item['filename'] = log_group.filename
    
    item['source'] = log_group.source
    
    for content in log.contents:
    
    item[content.key] = content.value
    
    with SampleConsumer.lock:
    
    SampleConsumer.log_results.append(item)
    #提交offset
    current_time = time.time()
    
    if current_time - self.last_check_time > 3:
    
    try:
    
    self.last_check_time = current_time
    
    offset_tracker.save_offset(True)
    
    except Exception:
    
    import traceback
    
    traceback.print_exc()
    
    else:
    
    try:
    
    offset_tracker.save_offset(False)
    
    except Exception:
    
    import traceback
    
    traceback.print_exc()
    
    return None
    
    def shutdown(self, offset_tracker):
    
    try:
    
    offset_tracker.save_offset(True)
    
    except Exception:
    
    import traceback
    
    traceback.print_exc()
    
    
    def sleep_until(seconds, exit_condition=None, expect_error=False):
    
    if not exit_condition:
    
    time.sleep(seconds)
    
    return
    
    s = time.time()
    
    while time.time() - s < seconds:
    
    try:
    
    if exit_condition():
    
    break
    
    except Exception:
    
    if expect_error:
    
    continue
    
    time.sleep(1)
    # 创建消费组
    def sample_consumer_group():
    
    # load options from envs
    #请求域名
    endpoint = os.environ.get('TENCENTCLOUD_LOG_SAMPLE_ENDPOINT', '')
    #用户的Secret_id
    access_key_id = os.environ.get('TENCENTCLOUD_LOG_SAMPLE_ACCESSID', '')
    #用户的Secret_key
    access_key = os.environ.get('TENCENTCLOUD_LOG_SAMPLE_ACCESSKEY', '')
    #日志集id
    logset_id = os.environ.get('TENCENTCLOUD_LOG_SAMPLE_LOGSET_ID', '')
    #日志主题id列表,支持多个
    topic_ids = ['topic_id_1','topic_id_2']
    #消费组,同一个日志集下的消费组名称唯一
    consumer_group = 'consumer-group-1'
    #消费者1
    consumer_name1 = "consumer-group-1-A"
    #消费者2
    consumer_name2 = "consumer-group-1-B"
    #地域
    region = "ap-guangzhou"
    
    assert endpoint and access_key_id and access_key and logset_id, ValueError("endpoint/access_id/access_key and logset_id cannot be empty")
    #创建访问云API接口的Client
    client = YunApiLogClient(access_key_id, access_key, region=region)
    #初始化消费结果列表
    SampleConsumer.log_results = []
    
    try:
    
    # 创建两个消费者配置
    option1 = LogHubConfig(endpoint, access_key_id, access_key, region, logset_id, topic_ids, consumer_group,
    
    consumer_name1, heartbeat_interval=3, data_fetch_interval=1,
    
    offset_start_time="end", max_fetch_log_group_size=10485760)
    
    option2 = LogHubConfig(endpoint, access_key_id, access_key, region, logset_id, topic_ids, consumer_group,
    
    consumer_name2, heartbeat_interval=3, data_fetch_interval=1,
    
    offset_start_time="end", max_fetch_log_group_size=10485760)
    
    print("*** start to consume data...")
    #消费者1
    client_worker1 = ConsumerWorker(SampleConsumer, consumer_option=option1)
    #启动消费者1
    client_worker1.start()
    
    client_worker2 = ConsumerWorker(SampleConsumer, consumer_option=option2)
    
    client_worker2.start()
    sleep_until(120, lambda: len(SampleConsumer.log_results) > 0)
    
    print("*** consumer group status ***")
    #打印消费组信息:消费组的名称、消费的日志主题、消费者心跳超时时间
    ret = client.list_consumer_group(logset_id, topic_ids)
    ret.log_print()
    
    print("*** stopping workers")
    #关闭消费者
    client_worker1.shutdown()
    
    client_worker2.shutdown()
    
    print("*** delete consumer group")
    #删除消费者组
    client.delete_consumer_group(logset_id, consumer_group)
    
    except Exception as e:
    
    raise e
    #打印消费的日志数据
    ret = str(SampleConsumer.log_results)
    
    print("*** get content:")
    
    print(ret)
    
    
    if __name__ == '__main__':
    
    sample_consumer_group()
    
    联系我们

    联系我们,为您的业务提供专属服务。

    技术支持

    如果你想寻求进一步的帮助,通过工单与我们进行联络。我们提供7x24的工单服务。

    7x24 电话支持