From 46956da5f944099d078a981a3ed382d3ec1fb73f Mon Sep 17 00:00:00 2001 From: "Vincent D. Warmerdam" Date: Sat, 12 Apr 2025 22:41:46 +0200 Subject: [PATCH 01/26] idea --- demo.py | 55 ++++++++++++++++++++++------- flowshow/__init__.py | 83 +++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 120 insertions(+), 18 deletions(-) diff --git a/demo.py b/demo.py index 4043975..a3494bd 100644 --- a/demo.py +++ b/demo.py @@ -1,12 +1,12 @@ import marimo -__generated_with = "0.10.19" +__generated_with = "0.12.8" app = marimo.App(width="medium") @app.cell def _(mo): - mo.md("Flowshow provides a `@task` decorator that helps you track and visualize the execution of your Python functions. Here's how to use it:") + mo.md("""Flowshow provides a `@task` decorator that helps you track and visualize the execution of your Python functions. Here's how to use it:""") return @@ -15,28 +15,32 @@ def _(): import time import random - from flowshow import task + from flowshow import task, add_artifacts, info, debug + # Turns a function into a Task, which tracks a bunch of stuff @task def my_function(x): - print("This function should always run") + info("This function should always run") time.sleep(0.5) + add_artifacts({ + "foo": "Bar" + }) return x * 2 # Tasks can also be configured to handle retries @task(retry_on=ValueError, retry_attempts=10) def might_fail(): - print("This function call might fail") + info("This function call might fail") time.sleep(1.0) - if random.random() < 0.75: + if random.random() < 0.2: raise ValueError("oh no, error!") - print("The function has passed! Yay!") + debug("The function has passed! Yay!") return "done" @task def main_job(): - print("This output will be captured by the task") + info("This output will be captured by the task") for i in range(3): my_function(10) might_fail() @@ -44,7 +48,23 @@ def main_job(): # Run like you might run a normal function _ = main_job() - return main_job, might_fail, my_function, random, task, time + return ( + add_artifacts, + debug, + info, + main_job, + might_fail, + my_function, + random, + task, + time, + ) + + +@app.cell +def _(main_job): + main_job.last_run.to_dict() + return @app.cell @@ -60,12 +80,10 @@ def _(out): @app.cell(hide_code=True) -def _(main_job): - import marimo as mo - +def _(main_job, mo): chart = mo.ui.altair_chart(main_job.plot()) chart - return chart, mo + return (chart,) @app.cell(hide_code=True) @@ -75,5 +93,16 @@ def _(chart): return +@app.cell +def _(): + import marimo as mo + return (mo,) + + +@app.cell +def _(): + return + + if __name__ == "__main__": app.run() diff --git a/flowshow/__init__.py b/flowshow/__init__.py index 2a4ad0a..e3a74f3 100644 --- a/flowshow/__init__.py +++ b/flowshow/__init__.py @@ -1,3 +1,4 @@ +import uuid import inspect import io import sys @@ -7,7 +8,7 @@ from dataclasses import asdict, dataclass, field from datetime import datetime, timezone from functools import wraps -from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union +from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union, Literal import altair as alt import pandas as pd @@ -18,6 +19,7 @@ # Thread-local storage for tracking the current task _task_context = threading.local() +LogLevel = Literal["INFO", "WARNING", "ERROR", "DEBUG"] @dataclass class TaskRun: @@ -29,29 +31,40 @@ class TaskRun: output: Any = None error: Optional[Exception] = None subtasks: List["TaskRun"] = field(default_factory=list) - logs: Optional[str] = None + logs: List[List[str]] = field(default_factory=list) retry_count: int = 0 + id: str = field(default_factory=lambda: str(uuid.uuid4())) + artifacts: Dict[str, Any] = field(default_factory=dict) def add_subtask(self, subtask: "TaskRun"): self.subtasks.append(subtask) + def _log(self, level: LogLevel, message: str) -> None: + """Add a log entry with the specified level.""" + self.logs.append([level, message, datetime.now(timezone.utc).isoformat()]) + def to_dict(self) -> Dict[str, Any]: """Convert the task run and all its subtasks to a nested dictionary.""" result = { + "id": self.id, "task_name": self.task_name, "start_time": self.start_time.isoformat(), "duration": self.duration, "inputs": self.inputs, "error": str(self.error) if self.error else None, "retry_count": self.retry_count, + "artifacts": self.artifacts, } if self.end_time: result["end_time"] = self.end_time.isoformat() - if self.logs is not None: + if self.logs: result["logs"] = self.logs + if self.artifacts: + result["artifacts"] = self.artifacts + # Only include output if it's a simple type that can be serialized if isinstance(self.output, (str, int, float, bool, type(None))): result["output"] = self.output @@ -101,7 +114,7 @@ class TaskDefinition: def __call__(self, *args, **kwargs): # Create a new run run = TaskRun( - task_name=self.name, + task_name="CALLING: " + self.name, start_time=datetime.now(timezone.utc), inputs={**{f"arg{i}": arg for i, arg in enumerate(args)}, **kwargs}, ) @@ -112,10 +125,17 @@ def __call__(self, *args, **kwargs): start = time.perf_counter() if self.capture_logs: + # We still capture stdout but now we'll add it as an INFO log stdout_capture = io.StringIO() with redirect_stdout(stdout_capture): result = self.func(*args, **kwargs) - run.logs = stdout_capture.getvalue() + + # Add captured stdout as INFO logs, one per line + captured_output = stdout_capture.getvalue() + if captured_output: + for line in captured_output.splitlines(): + if line.strip(): # Skip empty lines + run.info(line) else: result = self.func(*args, **kwargs) @@ -130,6 +150,8 @@ def __call__(self, *args, **kwargs): # Record error if task fails run.end_time = datetime.now(timezone.utc) run.error = e + # Add the error to logs as well + run.error(str(e)) raise finally: @@ -180,3 +202,54 @@ def decorator(f: Callable) -> TaskDefinition: if func is None: return decorator return decorator(func) + +def add_artifacts(artifacts: Dict[str, Any]) -> bool: + """Add artifacts to the currently running task. + + Args: + artifacts: Dictionary of artifact name to artifact value + + Returns: + True if artifacts were added successfully, False if no task is running + """ + current_run = getattr(_task_context, "current_run", None) + if current_run is None: + return False + + # Update the artifacts dictionary with the new artifacts + current_run.artifacts.update(artifacts) + return True + +def log(level: LogLevel, message: str) -> bool: + """Add a log message to the currently running task. + + Args: + level: Log level ("INFO", "WARNING", "ERROR", "DEBUG") + message: The log message + + Returns: + True if log was added successfully, False if no task is running + """ + current_run = getattr(_task_context, "current_run", None) + if current_run is None: + return False + + current_run._log(level, message) + return True + +# Convenience methods for different log levels +def info(message: str) -> bool: + """Add an INFO level log message to the currently running task.""" + return log("INFO", message) + +def warning(message: str) -> bool: + """Add a WARNING level log message to the currently running task.""" + return log("WARNING", message) + +def error(message: str) -> bool: + """Add an ERROR level log message to the currently running task.""" + return log("ERROR", message) + +def debug(message: str) -> bool: + """Add a DEBUG level log message to the currently running task.""" + return log("DEBUG", message) From 4de02166c8caabd8ca1b53ce8a1f6dca960aa446 Mon Sep 17 00:00:00 2001 From: "Vincent D. Warmerdam" Date: Sat, 12 Apr 2025 22:53:22 +0200 Subject: [PATCH 02/26] better yet --- demo.py | 4 +--- flowshow/__init__.py | 24 ++++++++++++++---------- 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/demo.py b/demo.py index a3494bd..9930e13 100644 --- a/demo.py +++ b/demo.py @@ -23,9 +23,7 @@ def _(): def my_function(x): info("This function should always run") time.sleep(0.5) - add_artifacts({ - "foo": "Bar" - }) + add_artifacts(foo=1, bar=2) return x * 2 # Tasks can also be configured to handle retries diff --git a/flowshow/__init__.py b/flowshow/__init__.py index e3a74f3..b94b43a 100644 --- a/flowshow/__init__.py +++ b/flowshow/__init__.py @@ -4,6 +4,7 @@ import sys import threading import time +import contextvars from contextlib import contextmanager, redirect_stdout from dataclasses import asdict, dataclass, field from datetime import datetime, timezone @@ -16,8 +17,8 @@ from .visualize import flatten_tasks -# Thread-local storage for tracking the current task -_task_context = threading.local() +# Replace threading.local() with contextvars.ContextVar +_task_context = contextvars.ContextVar('task_context', default=None) LogLevel = Literal["INFO", "WARNING", "ERROR", "DEBUG"] @@ -94,14 +95,17 @@ def plot(self): @contextmanager def _task_run_context(run: TaskRun): - parent = getattr(_task_context, "current_run", None) - _task_context.current_run = run + # Get the parent task run (if any) + parent = _task_context.get() + # Save the previous context and set the new one + token = _task_context.set(run) try: yield finally: if parent is not None: parent.add_subtask(run) - _task_context.current_run = parent + # Restore the previous context + _task_context.reset(token) @dataclass @@ -156,7 +160,7 @@ def __call__(self, *args, **kwargs): finally: # Always add the run to history if this is a top-level task - if getattr(_task_context, "current_run", None) is run: + if _task_context.get() is run: self.runs.append(run) return result @@ -203,7 +207,7 @@ def decorator(f: Callable) -> TaskDefinition: return decorator return decorator(func) -def add_artifacts(artifacts: Dict[str, Any]) -> bool: +def add_artifacts(**artifacts: Dict[str, Any]) -> bool: """Add artifacts to the currently running task. Args: @@ -212,12 +216,12 @@ def add_artifacts(artifacts: Dict[str, Any]) -> bool: Returns: True if artifacts were added successfully, False if no task is running """ - current_run = getattr(_task_context, "current_run", None) + current_run = _task_context.get() if current_run is None: return False # Update the artifacts dictionary with the new artifacts - current_run.artifacts.update(artifacts) + current_run.artifacts.update(**artifacts) return True def log(level: LogLevel, message: str) -> bool: @@ -230,7 +234,7 @@ def log(level: LogLevel, message: str) -> bool: Returns: True if log was added successfully, False if no task is running """ - current_run = getattr(_task_context, "current_run", None) + current_run = _task_context.get() if current_run is None: return False From d878bf7f191461cb9020951dc8c7c4bfe68833b1 Mon Sep 17 00:00:00 2001 From: "Vincent D. Warmerdam" Date: Sat, 12 Apr 2025 23:05:38 +0200 Subject: [PATCH 03/26] flowshows --- flowshow/__init__.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/flowshow/__init__.py b/flowshow/__init__.py index b94b43a..7f1bf92 100644 --- a/flowshow/__init__.py +++ b/flowshow/__init__.py @@ -36,6 +36,7 @@ class TaskRun: retry_count: int = 0 id: str = field(default_factory=lambda: str(uuid.uuid4())) artifacts: Dict[str, Any] = field(default_factory=dict) + table: Dict[str, Any] = field(default_factory=dict) def add_subtask(self, subtask: "TaskRun"): self.subtasks.append(subtask) @@ -55,6 +56,7 @@ def to_dict(self) -> Dict[str, Any]: "error": str(self.error) if self.error else None, "retry_count": self.retry_count, "artifacts": self.artifacts, + "table": self.table, } if self.end_time: @@ -224,6 +226,23 @@ def add_artifacts(**artifacts: Dict[str, Any]) -> bool: current_run.artifacts.update(**artifacts) return True +def add_table(**table_items: Dict[str, Any]) -> bool: + """Add artifacts to the currently running task. + + Args: + artifacts: Dictionary of artifact name to artifact value + + Returns: + True if artifacts were added successfully, False if no task is running + """ + current_run = _task_context.get() + if current_run is None: + return False + + # Update the artifacts dictionary with the new artifacts + current_run.table.update(**table_items) + return True + def log(level: LogLevel, message: str) -> bool: """Add a log message to the currently running task. From 3c060ef258b6afac8368a137ff2e9f013ff4986a Mon Sep 17 00:00:00 2001 From: "Vincent D. Warmerdam" Date: Sat, 12 Apr 2025 23:22:57 +0200 Subject: [PATCH 04/26] progress --- flowshow/__init__.py | 25 +++++++++++++++++++++--- tests/test_basics.py | 45 ++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 65 insertions(+), 5 deletions(-) diff --git a/flowshow/__init__.py b/flowshow/__init__.py index 7f1bf92..161608d 100644 --- a/flowshow/__init__.py +++ b/flowshow/__init__.py @@ -10,6 +10,7 @@ from datetime import datetime, timezone from functools import wraps from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union, Literal +import traceback import altair as alt import pandas as pd @@ -31,6 +32,7 @@ class TaskRun: inputs: Dict[str, Any] = field(default_factory=dict) output: Any = None error: Optional[Exception] = None + error_traceback: Optional[str] = None subtasks: List["TaskRun"] = field(default_factory=list) logs: List[List[str]] = field(default_factory=list) retry_count: int = 0 @@ -54,6 +56,7 @@ def to_dict(self) -> Dict[str, Any]: "duration": self.duration, "inputs": self.inputs, "error": str(self.error) if self.error else None, + "error_traceback": self.error_traceback, "retry_count": self.retry_count, "artifacts": self.artifacts, "table": self.table, @@ -115,6 +118,7 @@ class TaskDefinition: func: Callable name: str capture_logs: bool = False + callback: Optional[Callable[[Dict[str, Any]], None]] = None runs: List[TaskRun] = field(default_factory=list) def __call__(self, *args, **kwargs): @@ -156,15 +160,27 @@ def __call__(self, *args, **kwargs): # Record error if task fails run.end_time = datetime.now(timezone.utc) run.error = e + # Capture the full traceback as a formatted string with linebreaks + run.error_traceback = traceback.format_exc() # Add the error to logs as well - run.error(str(e)) + error(str(e)) raise finally: # Always add the run to history if this is a top-level task if _task_context.get() is run: self.runs.append(run) - + + # Execute the callback if provided + if self.callback is not None: + try: + # Convert TaskRun to dictionary before passing to callback + self.callback(run.to_dict()) + except Exception as callback_error: + # Log but don't propagate callback errors + error_msg = f"Task callback error: {str(callback_error)}" + run._log("ERROR", error_msg) + return result @property @@ -189,6 +205,7 @@ def task( log: bool = True, retry_on: Optional[Union[Type[Exception], Tuple[Type[Exception], ...]]] = None, retry_attempts: Optional[int] = None, + callback: Optional[Callable[[Dict[str, Any]], None]] = None, ) -> Callable: """Decorator to mark a function as a trackable task. @@ -197,13 +214,15 @@ def task( log: If True, capture stdout during task execution retry_on: Exception or tuple of exceptions to retry on retry_attempts: Number of retry attempts + callback: Function to call after task completion (success or failure) + The callback receives the task run data as a dictionary """ def decorator(f: Callable) -> TaskDefinition: # Apply stamina retry if retry parameters are provided if retry_on is not None and retry_attempts is not None: f = stamina.retry(on=retry_on, attempts=retry_attempts)(f) - return TaskDefinition(func=f, name=f.__name__, capture_logs=log) + return TaskDefinition(func=f, name=f.__name__, capture_logs=log, callback=callback) if func is None: return decorator diff --git a/tests/test_basics.py b/tests/test_basics.py index 30315f4..5fe86d0 100644 --- a/tests/test_basics.py +++ b/tests/test_basics.py @@ -16,7 +16,7 @@ def simple_task(x: int) -> int: # Check task run information last_run = simple_task.last_run - assert last_run.task_name == "simple_task" + assert last_run.task_name == "CALLING: simple_task" assert isinstance(last_run.start_time, datetime) assert isinstance(last_run.end_time, datetime) assert last_run.duration > 0 @@ -41,7 +41,7 @@ def outer_task(x: int) -> int: outer_run = outer_task.last_run assert len(outer_run.subtasks) == 1 inner_run = outer_run.subtasks[0] - assert inner_run.task_name == "inner_task" + assert inner_run.task_name == "CALLING: inner_task" assert inner_run.inputs == {"arg0": 5} assert inner_run.output == 6 @@ -116,3 +116,44 @@ def visualized_task(): assert "start_time" in str(chart.encoding.x) assert "end_time" in str(chart.encoding.x2) assert "task_name" in str(chart.encoding.y) + + +def test_task_callbacks(): + # Track callback executions + callback_executions = [] + + def record_callback(task_data): + callback_executions.append(task_data) + + # Test successful task with callback + @task(callback=record_callback) + def success_task(x): + return x * 2 + + # Test task with error and callback + @task(callback=record_callback) + def error_task(): + raise ValueError("Expected error") + + # Run the successful task + result = success_task(5) + assert result == 10 + + # Run the failing task (should raise but callback should still execute) + with pytest.raises(ValueError, match="Expected error"): + error_task() + + # Verify both callbacks executed + assert len(callback_executions) == 2 + + # Check successful task callback data + success_data = callback_executions[0] + assert success_data["task_name"] == "CALLING: success_task" + assert success_data["error"] is None + assert success_data["inputs"]["arg0"] == 5 + + # Check error task callback data + error_data = callback_executions[1] + assert error_data["task_name"] == "CALLING: error_task" + assert "Expected error" in error_data["error"] + assert error_data["error_traceback"] is not None # Traceback should be present From 210db6dad0c00f591f8f0fc92a91ed36fa40d2ff Mon Sep 17 00:00:00 2001 From: "Vincent D. Warmerdam" Date: Sat, 12 Apr 2025 23:27:58 +0200 Subject: [PATCH 05/26] floshow --- flowshow/__init__.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/flowshow/__init__.py b/flowshow/__init__.py index 161608d..f43a09c 100644 --- a/flowshow/__init__.py +++ b/flowshow/__init__.py @@ -221,7 +221,21 @@ def task( def decorator(f: Callable) -> TaskDefinition: # Apply stamina retry if retry parameters are provided if retry_on is not None and retry_attempts is not None: - f = stamina.retry(on=retry_on, attempts=retry_attempts)(f) + # Create a wrapper that logs retries + original_func = f + + # This will be called by stamina on each retry + @wraps(original_func) + def retry_wrapper(*args, **kwargs): + current_run = _task_context.get() + if current_run is not None: + current_run.retry_count += 1 + warning(f"Retrying task (attempt {current_run.retry_count}/{retry_attempts}) after error") + return original_func(*args, **kwargs) + + # Apply stamina retry to our wrapper + f = stamina.retry(on=retry_on, attempts=retry_attempts)(retry_wrapper) + return TaskDefinition(func=f, name=f.__name__, capture_logs=log, callback=callback) if func is None: From 8cfd7518c29cd4b5b61ea223bfe8c1522a63a64d Mon Sep 17 00:00:00 2001 From: "Vincent D. Warmerdam" Date: Sat, 12 Apr 2025 23:38:23 +0200 Subject: [PATCH 06/26] vibe coding async support --- demo.py | 66 +++++++++++++++++++++++++++++++++--- flowshow/__init__.py | 79 ++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 139 insertions(+), 6 deletions(-) diff --git a/demo.py b/demo.py index 9930e13..fe3ac4f 100644 --- a/demo.py +++ b/demo.py @@ -17,6 +17,11 @@ def _(): from flowshow import task, add_artifacts, info, debug + d = {} + + def store(b): + global d + d = b # Turns a function into a Task, which tracks a bunch of stuff @task @@ -27,16 +32,16 @@ def my_function(x): return x * 2 # Tasks can also be configured to handle retries - @task(retry_on=ValueError, retry_attempts=10) + @task(retry_on=ValueError, retry_attempts=3) def might_fail(): info("This function call might fail") time.sleep(1.0) - if random.random() < 0.2: + if random.random() < 0.9: raise ValueError("oh no, error!") debug("The function has passed! Yay!") return "done" - @task + @task(callback=store) def main_job(): info("This output will be captured by the task") for i in range(3): @@ -48,20 +53,73 @@ def main_job(): _ = main_job() return ( add_artifacts, + d, debug, info, main_job, might_fail, my_function, random, + store, task, time, ) +@app.cell +def _(d): + d + return + + @app.cell def _(main_job): - main_job.last_run.to_dict() + main_job.last_run.to_dict()['error_traceback'] + return + + +@app.cell +async def _(info, task, time): + import asyncio + + @task + async def async_sleep(seconds: float, name: str) -> str: + """Asynchronous sleep function that returns a message after completion""" + info("it works, right?") + await asyncio.sleep(seconds) + return f"{name} finished sleeping for {seconds} seconds" + + @task + async def run_concurrent_tasks(): + """Run multiple sleep tasks concurrently""" + start_time = time.time() + + # Create multiple sleep tasks + tasks = [ + async_sleep(2, "Task 1"), + async_sleep(1, "Task 2"), + async_sleep(3, "Task 3") + ] + + # Run tasks concurrently and gather results + results = await asyncio.gather(*tasks) + + end_time = time.time() + total_time = end_time - start_time + + # Return results and timing information + return { + "results": results, + "total_time": f"Total execution time: {total_time:.2f} seconds" + } + + await run_concurrent_tasks() + return async_sleep, asyncio, run_concurrent_tasks + + +@app.cell +def _(run_concurrent_tasks): + run_concurrent_tasks.last_run.to_dict() return diff --git a/flowshow/__init__.py b/flowshow/__init__.py index f43a09c..442c6df 100644 --- a/flowshow/__init__.py +++ b/flowshow/__init__.py @@ -5,7 +5,7 @@ import threading import time import contextvars -from contextlib import contextmanager, redirect_stdout +from contextlib import contextmanager, redirect_stdout, asynccontextmanager from dataclasses import asdict, dataclass, field from datetime import datetime, timezone from functools import wraps @@ -113,6 +113,18 @@ def _task_run_context(run: TaskRun): _task_context.reset(token) +@asynccontextmanager +async def _async_task_run_context(run: TaskRun): + parent = _task_context.get() + token = _task_context.set(run) + try: + yield + finally: + if parent is not None: + parent.add_subtask(run) + _task_context.reset(token) + + @dataclass class TaskDefinition: func: Callable @@ -120,15 +132,28 @@ class TaskDefinition: capture_logs: bool = False callback: Optional[Callable[[Dict[str, Any]], None]] = None runs: List[TaskRun] = field(default_factory=list) + is_async: bool = field(default=False) + + def __post_init__(self): + # Detect if the wrapped function is async + self.is_async = inspect.iscoroutinefunction(self.func) def __call__(self, *args, **kwargs): + if self.is_async: + # Return awaitable for async functions + return self._async_call(*args, **kwargs) + else: + # Execute synchronously for regular functions + return self._sync_call(*args, **kwargs) + + def _sync_call(self, *args, **kwargs): # Create a new run run = TaskRun( task_name="CALLING: " + self.name, start_time=datetime.now(timezone.utc), inputs={**{f"arg{i}": arg for i, arg in enumerate(args)}, **kwargs}, ) - + with _task_run_context(run): try: # Execute the task @@ -198,6 +223,56 @@ def get_all_runs_history(self) -> List[Dict[str, Any]]: """Returns the complete history of all runs with their nested subtasks.""" return [run.to_dict() for run in self.runs] + async def _async_call(self, *args, **kwargs): + # Create a new run + run = TaskRun( + task_name="CALLING: " + self.name, + start_time=datetime.now(timezone.utc), + inputs={**{f"arg{i}": arg for i, arg in enumerate(args)}, **kwargs}, + ) + + # Need async context manager + async with _async_task_run_context(run): + try: + # Execute the task + start = time.perf_counter() + + if self.capture_logs: + # Handling logs in async is more complex + # Simplified version for now + result = await self.func(*args, **kwargs) + else: + result = await self.func(*args, **kwargs) + + end = time.perf_counter() + + # Record successful completion + run.end_time = datetime.now(timezone.utc) + run.duration = end - start + run.output = result + + except Exception as e: + # Similar error handling as in sync version + run.end_time = datetime.now(timezone.utc) + run.error = e + run.error_traceback = traceback.format_exc() + error(str(e)) + raise + + finally: + # Same callback and bookkeeping logic + if _task_context.get() is run: + self.runs.append(run) + + if self.callback is not None: + try: + self.callback(run.to_dict()) + except Exception as callback_error: + error_msg = f"Task callback error: {str(callback_error)}" + run._log("ERROR", error_msg) + + return result + def task( func: Optional[Callable] = None, From b04030583289e62e7f67e80e6ecebf88317158cc Mon Sep 17 00:00:00 2001 From: "Vincent D. Warmerdam" Date: Sun, 13 Apr 2025 08:16:31 +0200 Subject: [PATCH 07/26] go --- index.jinja2 | 269 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 269 insertions(+) create mode 100644 index.jinja2 diff --git a/index.jinja2 b/index.jinja2 new file mode 100644 index 0000000..66765a7 --- /dev/null +++ b/index.jinja2 @@ -0,0 +1,269 @@ + + + + + + Task Span Visualization + + + + + + +
+
+ +
+ +
+ +
+ +
+ + +
+ +
+
+
+
+
+ + + +
+ + +
+
+ +
+
+
+
+ + + + \ No newline at end of file From 0ba96d6aec1ed259dcea87443cb546013a7020c7 Mon Sep 17 00:00:00 2001 From: "Vincent D. Warmerdam" Date: Sun, 13 Apr 2025 08:38:26 +0200 Subject: [PATCH 08/26] go --- demo.py | 46 +++- flowshow/__init__.py | 4 +- index.html | 551 +++++++++++++++++++++---------------------- index.jinja2 | 160 +++++++------ 4 files changed, 403 insertions(+), 358 deletions(-) diff --git a/demo.py b/demo.py index fe3ac4f..e11bbc8 100644 --- a/demo.py +++ b/demo.py @@ -1,7 +1,7 @@ import marimo __generated_with = "0.12.8" -app = marimo.App(width="medium") +app = marimo.App(width="full") @app.cell @@ -15,7 +15,7 @@ def _(): import time import random - from flowshow import task, add_artifacts, info, debug + from flowshow import task, add_artifacts, info, debug, warning, error d = {} @@ -27,7 +27,7 @@ def store(b): @task def my_function(x): info("This function should always run") - time.sleep(0.5) + time.sleep(0.2) add_artifacts(foo=1, bar=2) return x * 2 @@ -35,9 +35,8 @@ def my_function(x): @task(retry_on=ValueError, retry_attempts=3) def might_fail(): info("This function call might fail") - time.sleep(1.0) - if random.random() < 0.9: - raise ValueError("oh no, error!") + time.sleep(0.2) + my_function(2) debug("The function has passed! Yay!") return "done" @@ -55,6 +54,7 @@ def main_job(): add_artifacts, d, debug, + error, info, main_job, might_fail, @@ -63,9 +63,16 @@ def main_job(): store, task, time, + warning, ) +@app.cell +def _(d, mo, template): + mo.iframe(template.render(data=d)) + return + + @app.cell def _(d): d @@ -79,7 +86,7 @@ def _(main_job): @app.cell -async def _(info, task, time): +async def _(error, info, task, time, warning): import asyncio @task @@ -87,6 +94,7 @@ async def async_sleep(seconds: float, name: str) -> str: """Asynchronous sleep function that returns a message after completion""" info("it works, right?") await asyncio.sleep(seconds) + info("it did!") return f"{name} finished sleeping for {seconds} seconds" @task @@ -113,14 +121,28 @@ async def run_concurrent_tasks(): "total_time": f"Total execution time: {total_time:.2f} seconds" } - await run_concurrent_tasks() - return async_sleep, asyncio, run_concurrent_tasks + @task + async def run_many_nested(): + info("About to start task 1") + await run_concurrent_tasks() + info("About to start task 2") + await run_concurrent_tasks() + warning("They both ran!") + error("They both ran!") + + await run_many_nested() + return async_sleep, asyncio, run_concurrent_tasks, run_many_nested @app.cell -def _(run_concurrent_tasks): - run_concurrent_tasks.last_run.to_dict() - return +def _(mo, run_many_nested): + from pathlib import Path + from jinja2 import Template + + template = Template(Path("index.jinja2").read_text()) + + mo.iframe(template.render(data=run_many_nested.last_run.to_dict())) + return Path, Template, template @app.cell diff --git a/flowshow/__init__.py b/flowshow/__init__.py index 442c6df..e5736e4 100644 --- a/flowshow/__init__.py +++ b/flowshow/__init__.py @@ -1,8 +1,6 @@ import uuid import inspect import io -import sys -import threading import time import contextvars from contextlib import contextmanager, redirect_stdout, asynccontextmanager @@ -149,7 +147,7 @@ def __call__(self, *args, **kwargs): def _sync_call(self, *args, **kwargs): # Create a new run run = TaskRun( - task_name="CALLING: " + self.name, + task_name=self.name, start_time=datetime.now(timezone.utc), inputs={**{f"arg{i}": arg for i, arg in enumerate(args)}, **kwargs}, ) diff --git a/index.html b/index.html index 9578920..f07cd84 100644 --- a/index.html +++ b/index.html @@ -1,317 +1,316 @@ - + + + + Task Span Visualization + + + - -
-
-
-
+ +
+
+ +
+ +
+ +
+ +
+ + +
+ +
+
+
+
+
+ + + +
+ + +
+
+ +
+
-
\ No newline at end of file diff --git a/index.jinja2 b/index.jinja2 index 66765a7..8fdc651 100644 --- a/index.jinja2 +++ b/index.jinja2 @@ -56,46 +56,29 @@
- -
- -
- -
- - -
- -
-
-
-
-
- - -