Skip to content

Commit 62b390f

Browse files
authored
support FinishEventProcessor and some Span Set Method (#17)
1. support FinishEventProcessor 2. support traceTagTruncateConf 3. support traceQueueConf 4. support modify apiBasePath
1 parent 2625475 commit 62b390f

File tree

15 files changed

+443
-86
lines changed

15 files changed

+443
-86
lines changed

cozeloop/_client.py

Lines changed: 57 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@
77
import os
88
import threading
99
from datetime import datetime
10-
from typing import Dict, Any, List, Optional
10+
from typing import Dict, Any, List, Optional, Callable
11+
12+
import httpx
1113

1214
from cozeloop.client import Client
1315
from cozeloop._noop import NOOP_SPAN, _NoopClient
@@ -17,6 +19,8 @@
1719
from cozeloop.internal.httpclient import Auth
1820
from cozeloop.internal.prompt import PromptProvider
1921
from cozeloop.internal.trace import TraceProvider
22+
from cozeloop.internal.trace.model.model import FinishEventInfo, TagTruncateConf, QueueConf
23+
from cozeloop.internal.trace.trace import default_finish_event_processor
2024
from cozeloop.span import SpanContext, Span
2125

2226
logger = logging.getLogger(__name__)
@@ -35,6 +39,15 @@
3539
_default_client = None
3640
_client_lock = threading.Lock()
3741

42+
class APIBasePath:
43+
def __init__(
44+
self,
45+
trace_span_upload_path: str = None,
46+
trace_file_upload_path: str = None,
47+
):
48+
self.trace_span_upload_path = trace_span_upload_path
49+
self.trace_file_upload_path = trace_file_upload_path
50+
3851

3952
def _generate_cache_key(*args) -> str:
4053
key_str = "\t".join(str(arg) for arg in args)
@@ -54,8 +67,13 @@ def new_client(
5467
prompt_cache_max_count: int = consts.DEFAULT_PROMPT_CACHE_MAX_COUNT,
5568
prompt_cache_refresh_interval: int = consts.DEFAULT_PROMPT_CACHE_REFRESH_INTERVAL,
5669
prompt_trace: bool = False,
70+
http_client: Optional[httpx.Client] = None,
71+
trace_finish_event_processor: Optional[Callable[[FinishEventInfo], None]] = None,
72+
tag_truncate_conf: Optional[TagTruncateConf] = None,
73+
api_base_path: Optional[APIBasePath] = None,
74+
trace_queue_conf: Optional[QueueConf] = None,
5775
) -> Client:
58-
cache_key = _generate_cache_key(
76+
cache_key = _generate_cache_key( # all args are used to generate cache key
5977
api_base_url,
6078
workspace_id,
6179
api_token,
@@ -67,7 +85,12 @@ def new_client(
6785
ultra_large_report,
6886
prompt_cache_max_count,
6987
prompt_cache_refresh_interval,
70-
prompt_trace
88+
prompt_trace,
89+
http_client,
90+
trace_finish_event_processor,
91+
tag_truncate_conf,
92+
api_base_path,
93+
trace_queue_conf,
7194
)
7295

7396
with _cache_lock:
@@ -88,6 +111,11 @@ def new_client(
88111
prompt_cache_max_count=prompt_cache_max_count,
89112
prompt_cache_refresh_interval=prompt_cache_refresh_interval,
90113
prompt_trace=prompt_trace,
114+
arg_http_client=http_client,
115+
trace_finish_event_processor=trace_finish_event_processor,
116+
tag_truncate_conf=tag_truncate_conf,
117+
api_base_path=api_base_path,
118+
trace_queue_conf=trace_queue_conf,
91119
)
92120
_client_cache[cache_key] = client
93121
return client
@@ -113,7 +141,12 @@ def __init__(
113141
ultra_large_report: bool = False,
114142
prompt_cache_max_count: int = consts.DEFAULT_PROMPT_CACHE_MAX_COUNT,
115143
prompt_cache_refresh_interval: int = consts.DEFAULT_PROMPT_CACHE_REFRESH_INTERVAL,
116-
prompt_trace: bool = False
144+
prompt_trace: bool = False,
145+
arg_http_client: Optional[httpx.Client] = None,
146+
trace_finish_event_processor: Optional[Callable[[FinishEventInfo], None]] = None,
147+
tag_truncate_conf: Optional[TagTruncateConf] = None,
148+
api_base_path: Optional[APIBasePath] = None,
149+
trace_queue_conf: Optional[QueueConf] = None,
117150
):
118151
workspace_id = self._get_from_env(workspace_id, ENV_WORKSPACE_ID)
119152
api_base_url = self._get_from_env(api_base_url, ENV_API_BASE_URL)
@@ -136,6 +169,8 @@ def __init__(
136169

137170
self._workspace_id = workspace_id
138171
inner_client = httpclient.HTTPClient()
172+
if arg_http_client:
173+
inner_client = arg_http_client
139174
auth = self._build_auth(
140175
api_base_url=api_base_url,
141176
http_client=inner_client,
@@ -151,10 +186,26 @@ def __init__(
151186
timeout=timeout,
152187
upload_timeout=upload_timeout
153188
)
189+
finish_pro = default_finish_event_processor
190+
if trace_finish_event_processor:
191+
def combined_processor(event_info: FinishEventInfo):
192+
default_finish_event_processor(event_info)
193+
trace_finish_event_processor(event_info)
194+
finish_pro = combined_processor
195+
span_upload_path = None
196+
file_upload_path = None
197+
if api_base_path:
198+
span_upload_path = api_base_path.trace_span_upload_path
199+
file_upload_path = api_base_path.trace_file_upload_path
154200
self._trace_provider = TraceProvider(
155201
http_client=http_client,
156202
workspace_id=workspace_id,
157-
ultra_large_report=ultra_large_report
203+
ultra_large_report=ultra_large_report,
204+
finish_event_processor=finish_pro,
205+
tag_truncate_conf=tag_truncate_conf,
206+
span_upload_path=span_upload_path,
207+
file_upload_path=file_upload_path,
208+
queue_conf=trace_queue_conf,
158209
)
159210
self._prompt_provider = PromptProvider(
160211
workspace_id=workspace_id,
@@ -234,7 +285,7 @@ def start_span(
234285
else:
235286
return self._trace_provider.start_span(name=name, span_type=span_type, start_time=start_time,
236287
parent_span_id=child_of.span_id, trace_id=child_of.trace_id,
237-
baggage=child_of.baggage, start_new_trace=start_new_trace)
288+
baggage=child_of.baggage(), start_new_trace=start_new_trace)
238289
except Exception as e:
239290
logger.warning(f"Start span failed, returning noop span. Error: {e}")
240291
return NOOP_SPAN

cozeloop/internal/consts/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
THREAD_ID = "thread_id"
4040
START_TIME_FIRST_RESP = "start_time_first_resp"
4141
LATENCY_FIRST_RESP = "latency_first_resp"
42+
DEPLOYMENT_ENV = "deployment_env"
4243
CUT_OFF = "cut_off"
4344

4445
# ReserveFieldTypes Define the allowed types for each reserved field.

cozeloop/internal/httpclient/client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,10 @@ def _set_headers(self, headers: Optional[Dict[str, str]] = None) -> Dict[str, st
4343
res.update(headers)
4444
res[consts.AUTHORIZE_HEADER] = f"Bearer {self.auth.token}"
4545

46-
tt_env = os.getenv("x-tt-env")
46+
tt_env = os.getenv("x_tt_env")
4747
if tt_env:
4848
res["x-tt-env"] = tt_env
49-
ppe_env = os.getenv("x-use-ppe")
49+
ppe_env = os.getenv("x_use_ppe")
5050
if ppe_env:
5151
res["x-use-ppe"] = "1"
5252

cozeloop/internal/trace/__init__.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,3 @@
66
from .trace import TraceProvider
77

88
from .trace import Span
9-

cozeloop/internal/trace/exporter.py

Lines changed: 31 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@
1818
logger = logging.getLogger(__name__)
1919

2020
class Exporter:
21-
def export_spans(self, ctx: dict, spans: List['UploadSpan']) -> bool:
21+
def export_spans(self, ctx: dict, spans: List['UploadSpan']):
2222
raise NotImplementedError
2323

24-
def export_files(self, ctx: dict, files: List['UploadFile']) -> bool:
24+
def export_files(self, ctx: dict, files: List['UploadFile']):
2525
raise NotImplementedError
2626

2727

@@ -35,55 +35,61 @@ def export_files(self, ctx: dict, files: List['UploadFile']) -> bool:
3535
PATH_INGEST_TRACE = "/v1/loop/traces/ingest"
3636
PATH_UPLOAD_FILE = "/v1/loop/files/upload"
3737

38+
class UploadPath:
39+
def __init__(
40+
self,
41+
span_upload_path: str,
42+
file_upload_path: str,
43+
):
44+
self.span_upload_path = span_upload_path
45+
self.file_upload_path = file_upload_path
46+
3847

3948
class SpanExporter(Exporter):
40-
def __init__(self, client: Client):
49+
def __init__(
50+
self,
51+
client: Client,
52+
upload_path: UploadPath,
53+
):
4154
self.client = client
55+
self.upload_path = upload_path
4256

43-
def export_files(self, ctx: dict, files: List['UploadFile']) -> bool:
57+
def export_files(self, ctx: dict, files: List['UploadFile']):
4458
for file in files:
4559
if not file:
4660
continue
4761

4862
logger.debug(f"uploadFile start, file name: {file.name}")
4963
try:
5064
resp = self.client.upload_file(
51-
PATH_UPLOAD_FILE,
65+
self.upload_path.file_upload_path,
5266
BaseResponse,
5367
file.data,
5468
file.tos_key,
5569
{"workspace_id": file.space_id},
5670
)
5771
if resp.code != 0: # todo: some err code do not need retry
58-
logger.error(f"export files[{file.tos_key}] fail, code:[{resp.code}], msg:[{resp.msg}]")
59-
return False
72+
raise Exception(f"code:[{resp.code}], msg:[{resp.msg}]")
6073
except Exception as e:
61-
logger.error(f"export files[{file.tos_key}] fail, err:[{e}], file.name:[{file.name}]")
62-
return False
74+
raise Exception(f"export files[{file.tos_key}] fail, err:[{e}], file.name:[{file.name}]")
6375

6476
logger.debug(f"uploadFile end, file name: {file.name}")
65-
return True
6677

67-
def export_spans(self, ctx: dict, spans: List['UploadSpan']) -> bool:
68-
logger.debug(f"export spans, spans count: {len(spans)}")
6978

70-
if not spans:
71-
return True
79+
def export_spans(self, ctx: dict, spans: List['UploadSpan']):
80+
if not spans or len(spans) == 0:
81+
return
7282

7383
try:
7484
resp = self.client.post(
75-
PATH_INGEST_TRACE,
85+
self.upload_path.span_upload_path,
7686
BaseResponse,
7787
UploadSpanData(spans=spans),
7888
)
7989
if resp.code != 0: # todo: some err code do not need retry
80-
logger.error(f"export spans fail, code:[{resp.code}], msg:[{resp.msg}]")
81-
return False
90+
raise Exception(f"code:[{resp.code}], msg:[{resp.msg}]")
8291
except Exception as e:
83-
logger.error(f"export spans fail, err:[{e}]")
84-
return False
85-
86-
return True
92+
raise Exception(f"export spans fail, err:[{e}]")
8793

8894

8995
class UploadSpanData(BaseModel):
@@ -92,10 +98,12 @@ class UploadSpanData(BaseModel):
9298

9399
class UploadSpan(BaseModel):
94100
started_at_micros: int
101+
log_id: str
95102
span_id: str
96103
parent_id: str
97104
trace_id: str
98105
duration_micros: int
106+
service_name: str
99107
workspace_id: str
100108
span_name: str
101109
span_type: str
@@ -137,10 +145,12 @@ def transfer_to_upload_span_and_file(spans: List['Span']) -> (List[UploadSpan],
137145

138146
res_span.append(UploadSpan(
139147
started_at_micros=int(span.start_time.timestamp() * 1_000_000),
148+
log_id=span.log_id,
140149
span_id=span.span_id,
141150
parent_id=span.parent_span_id,
142151
trace_id=span.trace_id,
143152
duration_micros=span.get_duration(),
153+
service_name=span.service_name,
144154
workspace_id=span.get_space_id(),
145155
span_name=span.get_span_name(),
146156
span_type=span.get_span_type(),

cozeloop/internal/trace/model/model.py

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44
from enum import Enum
55

66
from pydantic import BaseModel
7-
from typing import List, Optional
7+
from typing import List, Optional, Literal
8+
from pydantic.dataclasses import dataclass
89

910

1011
class ObjectStorage(BaseModel):
@@ -23,3 +24,46 @@ class Attachment(BaseModel):
2324
class UploadType(str, Enum):
2425
LONG = 1
2526
MULTI_MODALITY = 2
27+
28+
29+
SpanFinishEvent = Literal[
30+
"queue_manager.span_entry.rate",
31+
"queue_manager.file_entry.rate",
32+
"exporter.span_flush.rate",
33+
"exporter.file_flush.rate"
34+
]
35+
36+
37+
@dataclass
38+
class FinishEventInfoExtra:
39+
is_root_span: bool = False
40+
latency_ms: int = 0
41+
42+
43+
@dataclass
44+
class FinishEventInfo:
45+
event_type: SpanFinishEvent
46+
is_event_fail: bool
47+
item_num: int # maybe multiple span is processed in one event
48+
detail_msg: str
49+
extra_params: Optional[FinishEventInfoExtra] = None
50+
51+
52+
class TagTruncateConf:
53+
def __init__(
54+
self,
55+
normal_field_max_byte: int,
56+
input_output_field_max_byte: int,
57+
):
58+
self.normal_field_max_byte = normal_field_max_byte
59+
self.input_output_field_max_byte = input_output_field_max_byte
60+
61+
62+
class QueueConf:
63+
def __init__(
64+
self,
65+
span_queue_length: int,
66+
span_max_export_batch_length: int,
67+
):
68+
self.span_queue_length = span_queue_length
69+
self.span_max_export_batch_length = span_max_export_batch_length

cozeloop/internal/trace/noop_span.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,18 @@ def set_start_time_first_resp(self, start_time_first_resp: int) -> None:
110110
def set_runtime(self, runtime: Runtime) -> None:
111111
pass
112112

113+
def set_service_name(self, service_name: str) -> None:
114+
pass
115+
116+
def set_log_id(self, log_id: str) -> None:
117+
pass
118+
119+
def set_system_tags(self, system_tags: Dict[str, Any]) -> None:
120+
pass
121+
122+
def set_deployment_env(self, deployment_env: str) -> None:
123+
pass
124+
113125
def __enter__(self):
114126
return self
115127

0 commit comments

Comments
 (0)