Skip to content

Commit 12dbfb2

Browse files
committed
Rework emmitter registration and other improvements
Implemented datadog specific log exporter Implement a non-blocking log store flushing system Add OTEL log store flavor
1 parent d086dbe commit 12dbfb2

File tree

13 files changed

+670
-177
lines changed

13 files changed

+670
-177
lines changed

src/zenml/log_stores/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
# OpenTelemetry log store
2424
from zenml.log_stores.otel.otel_flavor import (
2525
OtelLogStoreConfig,
26+
OtelLogStoreFlavor,
2627
)
2728
from zenml.log_stores.otel.otel_log_store import OtelLogStore
2829

@@ -50,4 +51,5 @@
5051
"DatadogLogStoreFlavor",
5152
"OtelLogStore",
5253
"OtelLogStoreConfig",
54+
"OtelLogStoreFlavor",
5355
]

src/zenml/log_stores/artifact/artifact_log_exporter.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ def export(self, batch: Sequence["LogData"]) -> LogExportResult:
7373
if not attrs:
7474
continue
7575

76-
log_uri = attrs.get("zenml.log_model.uri")
76+
log_uri = attrs.get("zenml.log.uri")
7777
if not log_uri or not isinstance(log_uri, str):
7878
continue
7979

src/zenml/log_stores/artifact/artifact_log_store.py

Lines changed: 57 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@
1818
from datetime import datetime
1919
from typing import (
2020
Any,
21+
Dict,
2122
Iterator,
2223
List,
2324
Optional,
25+
Type,
2426
cast,
2527
)
2628
from uuid import UUID
@@ -30,9 +32,16 @@
3032
from zenml.artifact_stores import BaseArtifactStore
3133
from zenml.enums import LoggingLevels, StackComponentType
3234
from zenml.exceptions import DoesNotExistException
33-
from zenml.log_stores.base_log_store import MAX_ENTRIES_PER_REQUEST
35+
from zenml.log_stores import BaseLogStore
36+
from zenml.log_stores.base_log_store import (
37+
MAX_ENTRIES_PER_REQUEST,
38+
BaseLogStoreEmitter,
39+
)
3440
from zenml.log_stores.otel.otel_flavor import OtelLogStoreConfig
35-
from zenml.log_stores.otel.otel_log_store import OtelLogStore
41+
from zenml.log_stores.otel.otel_log_store import (
42+
OtelLogStore,
43+
OtelLogStoreEmitter,
44+
)
3645
from zenml.logger import get_logger
3746
from zenml.models import LogsResponse
3847
from zenml.utils.io_utils import sanitize_remote_path
@@ -128,6 +137,9 @@ def _stream_logs_line_by_line(
128137
Raises:
129138
DoesNotExistException: If the artifact does not exist in the artifact store.
130139
"""
140+
if not artifact_store.exists(logs_uri):
141+
return []
142+
131143
if not artifact_store.isdir(logs_uri):
132144
# Single file case
133145
with artifact_store.open(logs_uri, "r") as file:
@@ -199,6 +211,31 @@ class ArtifactLogStoreConfig(OtelLogStoreConfig):
199211
"""Configuration for the artifact log store."""
200212

201213

214+
class ArtifactLogStoreEmitter(OtelLogStoreEmitter):
215+
"""Artifact log store emitter."""
216+
217+
def __init__(
218+
self,
219+
name: str,
220+
log_store: "BaseLogStore",
221+
log_model: LogsResponse,
222+
metadata: Dict[str, Any],
223+
) -> None:
224+
"""Initialize a log store emitter.
225+
226+
Args:
227+
name: The name of the emitter.
228+
log_store: The log store to emit logs to.
229+
log_model: The log model associated with the emitter.
230+
metadata: Additional metadata to attach to all log entries that will
231+
be emitted by this emitter.
232+
"""
233+
super().__init__(name, log_store, log_model, metadata)
234+
235+
if log_model.uri:
236+
self._metadata["zenml.log.uri"] = log_model.uri
237+
238+
202239
class ArtifactLogStore(OtelLogStore):
203240
"""Log store that saves logs to the artifact store.
204241
@@ -220,6 +257,15 @@ def __init__(
220257
super().__init__(*args, **kwargs)
221258
self._artifact_store = artifact_store
222259

260+
@property
261+
def emitter_class(self) -> Type[ArtifactLogStoreEmitter]:
262+
"""Class of the emitter.
263+
264+
Returns:
265+
The class of the emitter.
266+
"""
267+
return ArtifactLogStoreEmitter
268+
223269
@classmethod
224270
def from_artifact_store(
225271
cls, artifact_store: "BaseArtifactStore"
@@ -236,7 +282,7 @@ def from_artifact_store(
236282
artifact_store=artifact_store,
237283
id=artifact_store.id,
238284
name="default",
239-
config=ArtifactLogStoreConfig(),
285+
config=ArtifactLogStoreConfig(endpoint=artifact_store.path),
240286
flavor="artifact",
241287
type=StackComponentType.LOG_STORE,
242288
user=artifact_store.user,
@@ -265,26 +311,20 @@ def get_exporter(self) -> "LogExporter":
265311

266312
return ArtifactLogExporter(artifact_store=self._artifact_store)
267313

268-
def finalize(
314+
def _finalize(
269315
self,
270-
log_model: LogsResponse,
316+
emitter: BaseLogStoreEmitter,
271317
) -> None:
272-
"""Finalize the stream of log records associated with a log model.
318+
"""Finalize the stream of log records associated with an emitter.
273319
274320
Args:
275-
log_model: The log model to finalize.
321+
emitter: The emitter to finalize.
276322
"""
323+
assert isinstance(emitter, ArtifactLogStoreEmitter)
277324
with self._lock:
278-
if not self._provider or self._logger is None:
279-
return
280-
281-
self._logger.emit(
282-
body=END_OF_STREAM_MESSAGE,
283-
attributes={
284-
"zenml.log_model.id": str(log_model.id),
285-
"zenml.log_model.uri": str(log_model.uri),
286-
},
287-
)
325+
emitter.logger.emit(
326+
body=END_OF_STREAM_MESSAGE,
327+
)
288328

289329
def fetch(
290330
self,

src/zenml/log_stores/base_log_store.py

Lines changed: 123 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
import logging
1717
import threading
18-
from abc import abstractmethod
18+
from abc import ABC, abstractmethod
1919
from datetime import datetime
2020
from typing import Any, Dict, List, Optional, Type, cast
2121

@@ -31,7 +31,73 @@ class BaseLogStoreConfig(StackComponentConfig):
3131
"""Base configuration for all log stores."""
3232

3333

34-
class BaseLogStore(StackComponent):
34+
class BaseLogStoreEmitter:
35+
"""Base class for all ZenML log store emitters.
36+
37+
The emitter is the entry point for all log records to be emitted to the log
38+
store. The process of emitting a log record is as follows:
39+
40+
1. instantiate a BaseLogStoreEmitter
41+
2. create an emitter by calling log_store.register_emitter() and passing the
42+
log model and optional metadata to be attached to each log record
43+
3. emit the log record by calling emitter.emit() and passing the log record
44+
4. deregister the emitter when all logs have been emitted by calling
45+
emitter.deregister()
46+
"""
47+
48+
def __init__(
49+
self,
50+
name: str,
51+
log_store: "BaseLogStore",
52+
log_model: LogsResponse,
53+
metadata: Dict[str, Any],
54+
) -> None:
55+
"""Initialize a log store emitter.
56+
57+
Args:
58+
name: The name of the emitter.
59+
log_store: The log store to emit logs to.
60+
log_model: The log model associated with the emitter.
61+
metadata: Additional metadata to attach to all log entries that will
62+
be emitted by this emitter.
63+
"""
64+
self._name = name
65+
self._log_store = log_store
66+
self._log_model = log_model
67+
self._metadata = metadata
68+
69+
@property
70+
def name(self) -> str:
71+
"""The name of the emitter.
72+
73+
Returns:
74+
The name of the emitter.
75+
"""
76+
return self._name
77+
78+
@property
79+
def log_model(self) -> LogsResponse:
80+
"""The log model associated with the emitter.
81+
82+
Returns:
83+
The log model.
84+
"""
85+
return self._log_model
86+
87+
def emit(self, record: logging.LogRecord) -> None:
88+
"""Emit a log record to the log store.
89+
90+
Args:
91+
record: The log record to emit.
92+
"""
93+
self._log_store._emit(self, record, metadata=self._metadata)
94+
95+
def deregister(self) -> None:
96+
"""Deregister the emitter from the log store."""
97+
self._log_store.deregister_emitter(self)
98+
99+
100+
class BaseLogStore(StackComponent, ABC):
35101
"""Base class for all ZenML log stores.
36102
37103
A log store is responsible for collecting, storing, and retrieving logs
@@ -47,7 +113,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
47113
**kwargs: Keyword arguments for the base class.
48114
"""
49115
super().__init__(*args, **kwargs)
50-
self._emitter_counter = 0
116+
self._emitters: Dict[str, BaseLogStoreEmitter] = {}
51117
self._lock = threading.RLock()
52118

53119
@property
@@ -59,56 +125,88 @@ def config(self) -> BaseLogStoreConfig:
59125
"""
60126
return cast(BaseLogStoreConfig, self._config)
61127

128+
@property
129+
def emitter_class(self) -> Type[BaseLogStoreEmitter]:
130+
"""Class of the emitter.
131+
132+
Returns:
133+
The class of the emitter.
134+
"""
135+
return BaseLogStoreEmitter
136+
137+
def register_emitter(
138+
self, name: str, log_model: LogsResponse, metadata: Dict[str, Any]
139+
) -> BaseLogStoreEmitter:
140+
"""Register an emitter for the log store.
141+
142+
Args:
143+
name: The name of the emitter.
144+
log_model: The log model associated with the emitter.
145+
metadata: Additional metadata to attach to the log entry.
146+
147+
Returns:
148+
The emitter.
149+
"""
150+
with self._lock:
151+
emitter = self.emitter_class(name, self, log_model, metadata)
152+
self._emitters[name] = emitter
153+
return emitter
154+
155+
def deregister_emitter(self, emitter: BaseLogStoreEmitter) -> None:
156+
"""Deregister an emitter registered with the log store.
157+
158+
Args:
159+
emitter: The emitter to deregister.
160+
"""
161+
with self._lock:
162+
if emitter.name not in self._emitters:
163+
return
164+
self._finalize(emitter)
165+
del self._emitters[emitter.name]
166+
if len(self._emitters) == 0:
167+
self.flush(blocking=False)
168+
62169
@abstractmethod
63-
def emit(
170+
def _emit(
64171
self,
172+
emitter: BaseLogStoreEmitter,
65173
record: logging.LogRecord,
66-
log_model: LogsResponse,
67174
metadata: Dict[str, Any],
68175
) -> None:
69176
"""Process a log record from the logging system.
70177
71178
Args:
179+
emitter: The emitter to emit the log record to.
72180
record: The Python logging.LogRecord to process.
73-
log_model: The log model to emit the log record to.
74181
metadata: Additional metadata to attach to the log entry.
75182
"""
76183

77184
@abstractmethod
78-
def finalize(
185+
def _finalize(
79186
self,
80-
log_model: LogsResponse,
187+
emitter: BaseLogStoreEmitter,
81188
) -> None:
82-
"""Finalize the stream of log records associated with a log model.
189+
"""Finalize the stream of log records associated with an emitter.
83190
84191
This is used to announce the end of the stream of log records associated
85-
with a log model and that no more log records will be emitted.
192+
with an emitter and that no more log records will be emitted.
86193
87194
The implementation should ensure that all log records associated with
88-
the log model are flushed to the backend and any resources (clients,
195+
the emitter are flushed to the backend and any resources (clients,
89196
connections, file descriptors, etc.) are released.
90197
91198
Args:
92-
log_model: The log model to finalize.
199+
emitter: The emitter to finalize.
93200
"""
94201

95-
def register_emitter(self) -> None:
96-
"""Register an emitter for the log store."""
97-
with self._lock:
98-
self._emitter_counter += 1
99-
100-
def deregister_emitter(self) -> None:
101-
"""Deregister an emitter for the log store."""
102-
with self._lock:
103-
self._emitter_counter -= 1
104-
if self._emitter_counter == 0:
105-
self.flush()
106-
107202
@abstractmethod
108-
def flush(self) -> None:
203+
def flush(self, blocking: bool = True) -> None:
109204
"""Flush the log store.
110205
111206
This method is called to ensure that all logs are flushed to the backend.
207+
208+
Args:
209+
blocking: Whether to block until the flush is complete.
112210
"""
113211

114212
@abstractmethod

src/zenml/log_stores/datadog/datadog_flavor.py

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@
1313
# permissions and limitations under the License.
1414
"""Datadog log store flavor."""
1515

16-
from typing import Type
16+
from typing import Any, Dict, Type
1717

18-
from pydantic import Field, field_validator
18+
from pydantic import Field, field_validator, model_validator
1919

2020
from zenml.enums import StackComponentType
2121
from zenml.log_stores import BaseLogStore, BaseLogStoreConfig
@@ -73,6 +73,22 @@ def validate_max_export_batch_size(cls, v: int) -> int:
7373
)
7474
return v
7575

76+
@model_validator(mode="before")
77+
@classmethod
78+
def set_default_endpoint(cls, data: Dict[str, Any]) -> Dict[str, Any]:
79+
"""Set the endpoint based on site if not provided.
80+
81+
Args:
82+
data: The input data dictionary.
83+
84+
Returns:
85+
The data dictionary with the endpoint set if not provided.
86+
"""
87+
if isinstance(data, dict) and not data.get("endpoint"):
88+
site = data.get("site", "datadoghq.com")
89+
data["endpoint"] = f"https://http-intake.logs.{site}/api/v2/logs"
90+
return data
91+
7692

7793
class DatadogLogStoreFlavor(Flavor):
7894
"""Datadog log store flavor."""

0 commit comments

Comments
 (0)