配置参数 | 说明 | 默认值 | 取值范围 |
--- | --- | --- | --- |
endpoint | 请求域名 | - | 支持地域:北京,上海,广州,南京,中国香港,东京,美东,新加坡,法兰克福 |
access_key_id | 用户的 SecretId | - | - |
access_key | 用户的 SecretKey | - | - |
region | 地域 | - | 支持地域:北京,上海,广州,南京,中国香港,东京,美东,新加坡,法兰克福 |
logset_id | 日志集 ID,仅支持一个日志集 | - | - |
topic_ids | 日志主题 ID,多个主题请使用','隔开 | - | - |
consumer_group_name | 消费者组名称 | - | - |
internal | 是否通过内网访问。TRUE:内网;FALSE:公网 | FALSE | TRUE/FALSE |
consumer_name | 消费者名称。同一个消费者组内,消费者名称不可重复 | - | 由 0-9、a-z、A-Z、'-'、'_'、'.' 组成的字符串 |
heartbeat_interval | 消费者心跳上报间隔。若连续 2 个间隔未上报心跳,该消费者会被视为下线 | 20 | 0-30分钟 |
data_fetch_interval | 消费者拉取数据间隔,不小于1秒 | 2 | - |
offset_start_time | 拉取数据的开始时间,字符串类型的 unix 时间戳,精度为秒,例如"1711607794";也可以直接配置为"begin"或"end"。begin:日志主题生命周期内的最早数据;end:日志主题生命周期内的最新数据 | "end" | "begin"/"end"/unix时间戳 |
max_fetch_log_group_size | 消费者单次拉取数据的大小(单位:字节),默认2M,最大10M | 2097152 | 2M - 10M |
offset_end_time | 拉取数据的结束时间,支持字符串类型的 unix 时间戳,精度为秒,例如"1711607794"。不填写代表持续拉取 | - | - |
# -*- coding: utf-8 -*-
"""Tencent Cloud CLS consumer-group sample.

Starts two consumers in one consumer group, waits (up to 120 s) until at least
one log record has been consumed, prints the consumer-group status, stops the
consumers, deletes the consumer group and finally prints every consumed record.

Configuration is read from environment variables:
TENCENTCLOUD_LOG_SAMPLE_ENDPOINT, TENCENTCLOUD_LOG_SAMPLE_ACCESSID,
TENCENTCLOUD_LOG_SAMPLE_ACCESSKEY, TENCENTCLOUD_LOG_SAMPLE_LOGSET_ID and
(optionally) TENCENTCLOUD_LOG_SAMPLE_REGION (default "ap-guangzhou").
"""
import logging
import os
import time
from threading import RLock

from tencentcloud.log.consumer import *
from tencentcloud.log.logclient import YunApiLogClient

# Route INFO-and-above records from every logger to a stream handler whose
# format includes the thread name, so the output of the consumer worker
# threads can be told apart.
root = logging.getLogger()
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter(
    fmt='[%(asctime)s] - [%(threadName)s] - {%(module)s:%(funcName)s:%(lineno)d} %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'))
root.setLevel(logging.INFO)
root.addHandler(handler)

logger = logging.getLogger(__name__)

# Minimum number of seconds between save_offset(True) calls in process().
_OFFSET_COMMIT_INTERVAL = 3


class SampleConsumer(ConsumerProcessorBase):
    """Consumer processor that collects every consumed log into a shared list.

    ``log_results`` and ``lock`` are class attributes, so every instance
    (one per consumer worker) appends to the same list.
    """

    # Epoch seconds of the last save_offset(True) call. process() assigns it
    # on the instance, so each instance tracks its own commit time.
    last_check_time = 0
    # Consumed log records (dicts) shared by all instances; guarded by `lock`.
    log_results = []
    lock = RLock()

    def initialize(self, topic_id):
        """Remember the topic this processor consumes from."""
        self.topic_id = topic_id

    def process(self, log_groups, offset_tracker):
        """Collect the logs in ``log_groups`` and save the consume offset.

        Each log becomes a dict with ``time``, ``filename`` and ``source``
        keys plus one key per log content field (a content field with one of
        those three names overrides it).

        ``save_offset(True)`` is called when more than 3 seconds have passed
        since the last such call, ``save_offset(False)`` otherwise.
        NOTE(review): presumably True forces an immediate commit — confirm
        against the SDK. Offset errors are logged and do not stop consumption.

        Returns:
            None.
        """
        batch = []
        for log_group in log_groups:
            for log in log_group.logs:
                item = {
                    'time': log.time,
                    'filename': log_group.filename,
                    'source': log_group.source,
                }
                for content in log.contents:
                    item[content.key] = content.value
                batch.append(item)
        # Take the shared lock once per call rather than once per record.
        with SampleConsumer.lock:
            SampleConsumer.log_results.extend(batch)

        current_time = time.time()
        commit_now = current_time - self.last_check_time > _OFFSET_COMMIT_INTERVAL
        if commit_now:
            self.last_check_time = current_time
        try:
            offset_tracker.save_offset(commit_now)
        except Exception:
            # Best effort: a failed offset save must not stop consumption.
            logger.exception("failed to save offset")
        return None

    def shutdown(self, offset_tracker):
        """Save the offset one last time when the worker shuts down."""
        try:
            offset_tracker.save_offset(True)
        except Exception:
            logger.exception("failed to save offset on shutdown")


def sleep_until(seconds, exit_condition=None, expect_error=False):
    """Wait up to ``seconds`` seconds, returning early once a condition holds.

    Args:
        seconds: maximum time to wait.
        exit_condition: zero-argument callable polled about once per second;
            the wait ends as soon as it returns a truthy value. When omitted,
            simply sleeps for ``seconds``.
        expect_error: when True, exceptions raised by ``exit_condition`` are
            treated as "not ready yet" and retried silently; when False they
            are logged and then retried.
    """
    if not exit_condition:
        time.sleep(seconds)
        return

    start = time.time()
    while time.time() - start < seconds:
        try:
            if exit_condition():
                break
        except Exception:
            if not expect_error:
                logger.exception("exit_condition raised unexpectedly")
        # Always back off, even after an error, so a condition that keeps
        # raising does not busy-spin the CPU.
        time.sleep(1)


def sample_consumer_group():
    """Run the two-consumer demo described in the module docstring.

    Raises:
        ValueError: if endpoint, SecretId, SecretKey or logset ID is missing.
    """
    # Request endpoint.
    endpoint = os.environ.get('TENCENTCLOUD_LOG_SAMPLE_ENDPOINT', '')
    # The user's SecretId.
    access_key_id = os.environ.get('TENCENTCLOUD_LOG_SAMPLE_ACCESSID', '')
    # The user's SecretKey.
    access_key = os.environ.get('TENCENTCLOUD_LOG_SAMPLE_ACCESSKEY', '')
    # Logset ID.
    logset_id = os.environ.get('TENCENTCLOUD_LOG_SAMPLE_LOGSET_ID', '')
    # Log topic IDs to consume; several are supported. Replace the placeholders.
    topic_ids = ['topic_id_1', 'topic_id_2']
    # Consumer group name; must be unique within the logset.
    consumer_group = 'consumer-group-1'
    # Consumer names; must be unique within the consumer group.
    consumer_name1 = "consumer-group-1-A"
    consumer_name2 = "consumer-group-1-B"
    # Region; should match the region of the endpoint.
    region = os.environ.get('TENCENTCLOUD_LOG_SAMPLE_REGION', 'ap-guangzhou')

    # A real check instead of `assert`, which is stripped under `python -O`.
    if not (endpoint and access_key_id and access_key and logset_id):
        raise ValueError("endpoint/access_id/access_key and logset_id cannot be empty")

    # Client for the cloud API (used to list / delete the consumer group).
    client = YunApiLogClient(access_key_id, access_key, region=region)
    # Reset the shared result list.
    SampleConsumer.log_results = []

    workers = []
    try:
        # Both consumers share every setting except their name.
        options = [
            LogHubConfig(endpoint, access_key_id, access_key, region, logset_id, topic_ids, consumer_group,
                         consumer_name, heartbeat_interval=3, data_fetch_interval=1,
                         offset_start_time="end", max_fetch_log_group_size=10485760)
            for consumer_name in (consumer_name1, consumer_name2)
        ]
        print("*** start to consume data...")
        for option in options:
            worker = ConsumerWorker(SampleConsumer, consumer_option=option)
            worker.start()
            workers.append(worker)

        sleep_until(120, lambda: len(SampleConsumer.log_results) > 0)

        print("*** consumer group status ***")
        # Print consumer-group info: group name, consumed topics, heartbeat timeout.
        ret = client.list_consumer_group(logset_id, topic_ids)
        ret.log_print()
    finally:
        # Stop every started worker even if something above failed, so no
        # consumer threads are left running.
        print("*** stopping workers")
        for worker in workers:
            worker.shutdown()

    print("*** delete consumer group")
    client.delete_consumer_group(logset_id, consumer_group)

    # Print the consumed log data.
    ret = str(SampleConsumer.log_results)
    print("*** get content:")
    print(ret)


if __name__ == '__main__':
    sample_consumer_group()
本页内容是否解决了您的问题?