22import time
33from typing import Optional
44
5- from volcengine .tls .log_pb2 import LogGroup
5+ from volcengine .tls .log_pb2 import LogGroup , LogGroupList
66from volcengine .tls .producer .batch_semaphore import BatchSemaphore
77from volcengine .tls .producer .log_dispatcher import LogDispatcher
88from volcengine .tls .producer .mover import Mover
@@ -57,49 +57,83 @@ def send_logs_v2(self, hash_key: Optional[str], topic_id: str, source: Optional[
5757 max_log_group_count = ProducerConfig .MAX_LOG_GROUP_COUNT
5858 max_log_group_size = ProducerConfig .MAX_BATCH_SIZE
5959
60+ def _varint_len (x : int ) -> int :
61+ if x < 0 :
62+ return 10
63+ n = 1
64+ while x >= (1 << 7 ):
65+ n += 1
66+ x >>= 7
67+ return n
68+
69+ def _tag_len (field_number : int ) -> int :
70+ return _varint_len ((field_number << 3 ) | 2 )
71+
72+ log_groups_field_number = LogGroup .DESCRIPTOR .fields_by_name ["logs" ].number
73+ log_group_list_groups_field_number = LogGroupList .DESCRIPTOR .fields_by_name ["log_groups" ].number
74+
6075 log_group = LogGroup ()
6176 if source is not None :
6277 log_group .source = source
6378 if filename is not None :
6479 log_group .filename = filename
6580
81+ base_group_size = log_group .ByteSize ()
82+ current_estimated_group_size = base_group_size
6683 current_count = 0
6784 for v in logs :
68- new_log = log_group . logs .add () # pylint: disable=no-member
85+ new_log = getattr ( log_group , " logs" ) .add ()
6986 new_log .time = v .time
87+ if v .time_ns is not None and hasattr (new_log , "TimeNs" ):
88+ new_log .TimeNs = v .time_ns
7089 for key in v .log_dict .keys ():
7190 log_content = new_log .contents .add ()
7291 log_content .key = str (key )
7392 log_content .value = str (v .log_dict [key ])
7493
7594 current_count += 1
76- log_group_size = len (log_group .SerializeToString ())
77- if log_group_size > max_log_group_size :
95+ log_size = new_log .ByteSize ()
96+ current_estimated_group_size += _tag_len (log_groups_field_number ) + _varint_len (log_size ) + log_size
97+ group_size = current_estimated_group_size
98+ entry_size = _tag_len (log_group_list_groups_field_number ) + _varint_len (group_size ) + group_size
99+ if entry_size > max_log_group_size :
78100 if current_count == 1 :
101+ actual_group_size = log_group .ByteSize ()
102+ actual_entry_size = _tag_len (log_group_list_groups_field_number ) + _varint_len (actual_group_size ) + actual_group_size
79103 raise TLSException (
80104 error_code = "InvalidArgument" ,
81- error_message = f"log size { log_group_size } is larger than MAX_LOG_SIZE { max_log_group_size } "
105+ error_message = f"log size { actual_entry_size } is larger than MAX_LOG_SIZE { max_log_group_size } "
82106 )
83- log_group . logs .pop ()
107+ getattr ( log_group , " logs" ) .pop ()
84108 current_count -= 1
109+ current_estimated_group_size -= _tag_len (log_groups_field_number ) + _varint_len (log_size ) + log_size
85110 self .dispatcher .add_batch (hash_key , topic_id , source , filename , log_group , callback )
86111 log_group = LogGroup ()
87112 if source is not None :
88113 log_group .source = source
89114 if filename is not None :
90115 log_group .filename = filename
91- new_log = log_group .logs .add () # pylint: disable=no-member
116+ base_group_size = log_group .ByteSize ()
117+ new_log = getattr (log_group , "logs" ).add ()
92118 new_log .time = v .time
119+ if v .time_ns is not None and hasattr (new_log , "TimeNs" ):
120+ new_log .TimeNs = v .time_ns
93121 for key in v .log_dict .keys ():
94122 log_content = new_log .contents .add ()
95123 log_content .key = str (key )
96124 log_content .value = str (v .log_dict [key ])
97125 current_count = 1
98- log_group_size = len (log_group .SerializeToString ())
99- if log_group_size > max_log_group_size :
126+ current_estimated_group_size = base_group_size
127+ log_size = new_log .ByteSize ()
128+ current_estimated_group_size += _tag_len (log_groups_field_number ) + _varint_len (log_size ) + log_size
129+ group_size = current_estimated_group_size
130+ entry_size = _tag_len (log_group_list_groups_field_number ) + _varint_len (group_size ) + group_size
131+ if entry_size > max_log_group_size :
132+ actual_group_size = log_group .ByteSize ()
133+ actual_entry_size = _tag_len (log_group_list_groups_field_number ) + _varint_len (actual_group_size ) + actual_group_size
100134 raise TLSException (
101135 error_code = "InvalidArgument" ,
102- error_message = f"log size { log_group_size } is larger than MAX_LOG_SIZE { max_log_group_size } "
136+ error_message = f"log size { actual_entry_size } is larger than MAX_LOG_SIZE { max_log_group_size } "
103137 )
104138
105139 if current_count >= max_log_group_count :
@@ -110,6 +144,8 @@ def send_logs_v2(self, hash_key: Optional[str], topic_id: str, source: Optional[
110144 if filename is not None :
111145 log_group .filename = filename
112146 current_count = 0
147+ base_group_size = log_group .ByteSize ()
148+ current_estimated_group_size = base_group_size
113149
114150 if current_count > 0 :
115151 self .dispatcher .add_batch (hash_key , topic_id , source , filename , log_group , callback )
0 commit comments