Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions src/zenml/cli/stack.py
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,14 @@ def register_stack(
type=str,
required=False,
)
@click.option(
"-l",
"--log_store",
"log_store",
help="Name of the log store for this stack.",
type=str,
required=False,
)
@click.option(
"--secret",
"secrets",
Expand Down Expand Up @@ -757,6 +765,7 @@ def update_stack(
image_builder: Optional[str] = None,
model_registry: Optional[str] = None,
deployer: Optional[str] = None,
log_store: Optional[str] = None,
secrets: List[str] = [],
remove_secrets: List[str] = [],
environment_variables: List[str] = [],
Expand All @@ -779,6 +788,7 @@ def update_stack(
image_builder: Name of the new image builder for this stack.
model_registry: Name of the new model registry for this stack.
deployer: Name of the new deployer for this stack.
log_store: Name of the log store for this stack.
secrets: Secrets to attach to the stack.
remove_secrets: Secrets to remove from the stack.
environment_variables: Environment variables to set when running on this
Expand Down Expand Up @@ -825,6 +835,8 @@ def update_stack(
updates[StackComponentType.STEP_OPERATOR] = [step_operator]
if deployer:
updates[StackComponentType.DEPLOYER] = [deployer]
if log_store:
updates[StackComponentType.LOG_STORE] = [log_store]

try:
updated_stack = client.update_stack(
Expand Down Expand Up @@ -938,6 +950,14 @@ def update_stack(
is_flag=True,
required=False,
)
@click.option(
"-l",
"--log_store",
"log_store_flag",
help="Include this to remove the log store from this stack.",
is_flag=True,
required=False,
)
def remove_stack_component(
stack_name_or_id: Optional[str] = None,
container_registry_flag: Optional[bool] = False,
Expand All @@ -951,6 +971,7 @@ def remove_stack_component(
image_builder_flag: Optional[bool] = False,
model_registry_flag: Optional[str] = None,
deployer_flag: Optional[bool] = False,
log_store_flag: Optional[bool] = False,
) -> None:
"""Remove stack components from a stack.

Expand All @@ -969,6 +990,7 @@ def remove_stack_component(
image_builder_flag: To remove the image builder from this stack.
model_registry_flag: To remove the model registry from this stack.
deployer_flag: To remove the deployer from this stack.
log_store_flag: To remove the log store from this stack.
"""
client = Client()

Expand Down Expand Up @@ -1008,6 +1030,9 @@ def remove_stack_component(
if deployer_flag:
stack_component_update[StackComponentType.DEPLOYER] = []

if log_store_flag:
stack_component_update[StackComponentType.LOG_STORE] = []

try:
updated_stack = client.update_stack(
name_id_or_prefix=stack_name_or_id,
Expand Down
2 changes: 2 additions & 0 deletions src/zenml/log_stores/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
# OpenTelemetry log store
from zenml.log_stores.otel.otel_flavor import (
OtelLogStoreConfig,
OtelLogStoreFlavor,
)
from zenml.log_stores.otel.otel_log_store import OtelLogStore

Expand Down Expand Up @@ -50,4 +51,5 @@
"DatadogLogStoreFlavor",
"OtelLogStore",
"OtelLogStoreConfig",
"OtelLogStoreFlavor",
]
2 changes: 1 addition & 1 deletion src/zenml/log_stores/artifact/artifact_log_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def export(self, batch: Sequence["LogData"]) -> LogExportResult:
if not attrs:
continue

log_uri = attrs.get("zenml.log_model.uri")
log_uri = attrs.get("zenml.log.uri")
if not log_uri or not isinstance(log_uri, str):
continue

Expand Down
78 changes: 59 additions & 19 deletions src/zenml/log_stores/artifact/artifact_log_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
from datetime import datetime
from typing import (
Any,
Iterator,
Dict,
Generator,
List,
Optional,
Type,
cast,
)
from uuid import UUID
Expand All @@ -30,9 +32,16 @@
from zenml.artifact_stores import BaseArtifactStore
from zenml.enums import LoggingLevels, StackComponentType
from zenml.exceptions import DoesNotExistException
from zenml.log_stores.base_log_store import MAX_ENTRIES_PER_REQUEST
from zenml.log_stores import BaseLogStore
from zenml.log_stores.base_log_store import (
MAX_ENTRIES_PER_REQUEST,
BaseLogStoreEmitter,
)
from zenml.log_stores.otel.otel_flavor import OtelLogStoreConfig
from zenml.log_stores.otel.otel_log_store import OtelLogStore
from zenml.log_stores.otel.otel_log_store import (
OtelLogStore,
OtelLogStoreEmitter,
)
from zenml.logger import get_logger
from zenml.models import LogsResponse
from zenml.utils.io_utils import sanitize_remote_path
Expand Down Expand Up @@ -112,7 +121,7 @@ def fetch_log_records(
def _stream_logs_line_by_line(
artifact_store: "BaseArtifactStore",
logs_uri: str,
) -> Iterator[str]:
) -> Generator[str, None, None]:
"""Stream logs line by line without loading the entire file into memory.

This generator yields log lines one by one, handling both single files
Expand All @@ -128,6 +137,9 @@ def _stream_logs_line_by_line(
Raises:
DoesNotExistException: If the artifact does not exist in the artifact store.
"""
if not artifact_store.exists(logs_uri):
return

if not artifact_store.isdir(logs_uri):
# Single file case
with artifact_store.open(logs_uri, "r") as file:
Expand Down Expand Up @@ -199,6 +211,31 @@ class ArtifactLogStoreConfig(OtelLogStoreConfig):
"""Configuration for the artifact log store."""


class ArtifactLogStoreEmitter(OtelLogStoreEmitter):
"""Artifact log store emitter."""

def __init__(
self,
name: str,
log_store: "BaseLogStore",
log_model: LogsResponse,
metadata: Dict[str, Any],
) -> None:
"""Initialize a log store emitter.

Args:
name: The name of the emitter.
log_store: The log store to emit logs to.
log_model: The log model associated with the emitter.
metadata: Additional metadata to attach to all log entries that will
be emitted by this emitter.
"""
super().__init__(name, log_store, log_model, metadata)

if log_model.uri:
self._metadata["zenml.log.uri"] = log_model.uri


class ArtifactLogStore(OtelLogStore):
"""Log store that saves logs to the artifact store.

Expand All @@ -220,6 +257,15 @@ def __init__(
super().__init__(*args, **kwargs)
self._artifact_store = artifact_store

@property
def emitter_class(self) -> Type[ArtifactLogStoreEmitter]:
"""Class of the emitter.

Returns:
The class of the emitter.
"""
return ArtifactLogStoreEmitter

@classmethod
def from_artifact_store(
cls, artifact_store: "BaseArtifactStore"
Expand All @@ -236,7 +282,7 @@ def from_artifact_store(
artifact_store=artifact_store,
id=artifact_store.id,
name="default",
config=ArtifactLogStoreConfig(),
config=ArtifactLogStoreConfig(endpoint=artifact_store.path),
flavor="artifact",
type=StackComponentType.LOG_STORE,
user=artifact_store.user,
Expand Down Expand Up @@ -265,26 +311,20 @@ def get_exporter(self) -> "LogExporter":

return ArtifactLogExporter(artifact_store=self._artifact_store)

def finalize(
def _finalize(
self,
log_model: LogsResponse,
emitter: BaseLogStoreEmitter,
) -> None:
"""Finalize the stream of log records associated with a log model.
"""Finalize the stream of log records associated with an emitter.

Args:
log_model: The log model to finalize.
emitter: The emitter to finalize.
"""
assert isinstance(emitter, ArtifactLogStoreEmitter)
with self._lock:
if not self._provider or self._logger is None:
return

self._logger.emit(
body=END_OF_STREAM_MESSAGE,
attributes={
"zenml.log_model.id": str(log_model.id),
"zenml.log_model.uri": str(log_model.uri),
},
)
emitter.logger.emit(
body=END_OF_STREAM_MESSAGE,
)

def fetch(
self,
Expand Down
Loading
Loading