Skip to content

Commit a353f99

Browse files
Fix telemetry connection issue when disabling telemetry (#1290)
* Fix telemetry connection issue when disabling telemetry
  - Use ENABLE_OPEA_TELEMETRY to control whether OpenTelemetry is enabled (default: false).
  - Fix the issue where the logs show a telemetry connection error on every request when telemetry is disabled.
  - Prevent the above error from propagating to microservices when telemetry is disabled.
  Signed-off-by: Spycsh <sihan.chen@intel.com>
* [pre-commit.ci] auto fixes from pre-commit.com hooks; for more information, see https://pre-commit.ci
* Fix unit-test failure in a test that requires the flag to be on
* [pre-commit.ci] auto fixes from pre-commit.com hooks; for more information, see https://pre-commit.ci
---------
Signed-off-by: Spycsh <sihan.chen@intel.com>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent 7c2e7f6 commit a353f99

File tree

4 files changed

+26
-8
lines changed

4 files changed

+26
-8
lines changed

comps/__init__.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,7 @@
5151
from comps.cores.mega.micro_service import MicroService, register_microservice, opea_microservices
5252

5353
# Telemetry
54-
if os.getenv("ENABLE_OPEA_TELEMETRY", "false").lower() == "true":
55-
from comps.cores.telemetry.opea_telemetry import opea_telemetry
54+
from comps.cores.telemetry.opea_telemetry import opea_telemetry
5655

5756
# Common
5857
from comps.cores.common.component import OpeaComponent, OpeaComponentRegistry, OpeaComponentLoader

comps/cores/mega/orchestrator.py

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
logger = CustomLogger("comps-core-orchestrator")
2727
LOGFLAG = os.getenv("LOGFLAG", False)
28+
ENABLE_OPEA_TELEMETRY = os.getenv("ENABLE_OPEA_TELEMETRY", "false").lower() == "true"
2829

2930

3031
class OrchestratorMetrics:
@@ -208,11 +209,11 @@ def process_outputs(self, prev_nodes: List, result_dict: Dict) -> Dict:
208209

209210
def wrap_iterable(self, iterable, is_first=True):
210211

211-
with tracer.start_as_current_span("llm_generate_stream"):
212+
with tracer.start_as_current_span("llm_generate_stream") if ENABLE_OPEA_TELEMETRY else contextlib.nullcontext():
212213
while True:
213214
with (
214215
tracer.start_as_current_span("llm_generate_stream_first_token")
215-
if is_first
216+
if is_first and ENABLE_OPEA_TELEMETRY
216217
else contextlib.nullcontext()
217218
): # else tracer.start_as_current_span(f"llm_generate_stream_next_token")
218219
try:
@@ -253,7 +254,11 @@ async def execute(
253254
# Still leave to sync requests.post for StreamingResponse
254255
if LOGFLAG:
255256
logger.info(inputs)
256-
with tracer.start_as_current_span(f"{cur_node}_asyn_generate"):
257+
with (
258+
tracer.start_as_current_span(f"{cur_node}_asyn_generate")
259+
if ENABLE_OPEA_TELEMETRY
260+
else contextlib.nullcontext()
261+
):
257262
response = requests.post(
258263
url=endpoint,
259264
data=json.dumps(inputs),
@@ -320,8 +325,14 @@ def generate():
320325
input_data = {k: v for k, v in input_data.items() if v is not None}
321326
else:
322327
input_data = inputs
323-
with tracer.start_as_current_span(f"{cur_node}_generate"):
328+
329+
with (
330+
tracer.start_as_current_span(f"{cur_node}_generate")
331+
if ENABLE_OPEA_TELEMETRY
332+
else contextlib.nullcontext()
333+
):
324334
response = await session.post(endpoint, json=input_data)
335+
325336
if response.content_type == "audio/wav":
326337
audio_data = await response.read()
327338
data = self.align_outputs(audio_data, cur_node, inputs, runtime_graph, llm_parameters_dict, **kwargs)

comps/cores/telemetry/opea_telemetry.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# Copyright (C) 2024 Intel Corporation
22
# SPDX-License-Identifier: Apache-2.0
33

4+
import contextlib
45
import inspect
56
import os
67
from functools import wraps
@@ -13,6 +14,8 @@
1314
from opentelemetry.sdk.trace.export import BatchSpanProcessor
1415
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
1516

17+
ENABLE_OPEA_TELEMETRY = os.getenv("ENABLE_OPEA_TELEMETRY", "false").lower() == "true"
18+
1619

1720
def detach_ignore_err(self, token: object) -> None:
1821
"""Resets Context to a previous value.
@@ -47,15 +50,15 @@ def opea_telemetry(func):
4750

4851
@wraps(func)
4952
async def wrapper(*args, **kwargs):
50-
with tracer.start_as_current_span(func.__name__):
53+
with tracer.start_as_current_span(func.__name__) if ENABLE_OPEA_TELEMETRY else contextlib.nullcontext():
5154
res = await func(*args, **kwargs)
5255
return res
5356

5457
else:
5558

5659
@wraps(func)
5760
def wrapper(*args, **kwargs):
58-
with tracer.start_as_current_span(func.__name__):
61+
with tracer.start_as_current_span(func.__name__) if ENABLE_OPEA_TELEMETRY else contextlib.nullcontext():
5962
res = func(*args, **kwargs)
6063
return res
6164

tests/cores/telemetry/test_telemetry.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@
22
# SPDX-License-Identifier: Apache-2.0
33

44
import asyncio
5+
import os
56
import time
67
import unittest
78

9+
os.environ["ENABLE_OPEA_TELEMETRY"] = "True"
810
from comps.cores.telemetry.opea_telemetry import in_memory_exporter, opea_telemetry
911

1012

@@ -32,6 +34,9 @@ async def dummy_async_func():
3234

3335
class TestTelemetry(unittest.TestCase):
3436

37+
def tearDown(self):
38+
os.environ["ENABLE_OPEA_TELEMETRY"] = "False"
39+
3540
def test_time_tracing(self):
3641
dummy_func()
3742
asyncio.run(dummy_async_func())

0 commit comments

Comments
 (0)