From 5ca7e48536222846245107117fb44a4798ac195a Mon Sep 17 00:00:00 2001 From: TimeToBuildBob Date: Wed, 11 Mar 2026 14:08:59 +0000 Subject: [PATCH] feat(sync): add AWSync client for pushing events to aw-sync-server MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds `aw_client/sync.py` with `AWSync` — a lightweight client that incrementally pushes local ActivityWatch bucket events to a self-hosted aw-sync-server (https://github.com/TimeToBuildBob/aw-sync-server). Key design: - Uses `ActivityWatchClient` for the local AW instance - Talks to the sync server with raw `requests` + Bearer token auth - Persists a per-bucket high-water mark to `~/.config/activitywatch/aw-sync-state.json` so that re-runs only upload new events (incremental sync) - Handles bucket creation on the sync server automatically - Errors per bucket are caught and returned as -1 so one bad bucket doesn't abort the whole sync 9 unit tests cover: happy path, empty bucket skip, prefix filter, state persistence, incremental since= arg, error handling, missing state file, existing remote bucket not recreated, auth header present. --- aw_client/__init__.py | 3 +- aw_client/sync.py | 190 ++++++++++++++++++++++++++++++++++++++ tests/test_sync.py | 209 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 401 insertions(+), 1 deletion(-) create mode 100644 aw_client/sync.py create mode 100644 tests/test_sync.py diff --git a/aw_client/__init__.py b/aw_client/__init__.py index d43dcb9..7afffe8 100644 --- a/aw_client/__init__.py +++ b/aw_client/__init__.py @@ -1,3 +1,4 @@ from .client import ActivityWatchClient +from .sync import AWSync -__all__ = ["ActivityWatchClient"] +__all__ = ["ActivityWatchClient", "AWSync"] diff --git a/aw_client/sync.py b/aw_client/sync.py new file mode 100644 index 0000000..a4240ba --- /dev/null +++ b/aw_client/sync.py @@ -0,0 +1,190 @@ +""" +Sync client for ActivityWatch — push local events to a self-hosted aw-sync-server. + +Usage:: + + from aw_client.sync import AWSync + + sync = AWSync(sync_url="http://localhost:5667", api_key="mykey") + results = sync.sync() # {bucket_id: events_uploaded} + + # Sync only window-activity buckets + results = sync.sync(bucket_filter="aw-watcher-window") + +See https://github.com/TimeToBuildBob/aw-sync-server for the server implementation. +""" +import json +import logging +import socket +from datetime import datetime, timedelta +from pathlib import Path +from typing import Dict, List, Optional + +import requests +from aw_core.models import Event + +from .client import ActivityWatchClient + +logger = logging.getLogger(__name__) + +_DEFAULT_STATE_FILE = Path.home() / ".config" / "activitywatch" / "aw-sync-state.json" + + +class AWSync: + """Push ActivityWatch events to a self-hosted aw-sync-server. + + The sync server must implement the standard ActivityWatch bucket+events API + and accept an ``Authorization: Bearer `` header. + + State (last-synced timestamp per bucket) is persisted in a JSON file so that + incremental syncs only upload new events. + """ + + def __init__( + self, + sync_url: str, + api_key: str, + local_client: Optional[ActivityWatchClient] = None, + state_file: Optional[Path] = None, + ) -> None: + """ + Args: + sync_url: Base URL of the sync server, e.g. ``http://localhost:5667``. + api_key: Bearer token for authenticating to the sync server. + local_client: Optional pre-constructed local AW client; one is created + with default settings if not provided. + state_file: Path for persisting last-synced timestamps per bucket. + Defaults to ``~/.config/activitywatch/aw-sync-state.json``. + """ + self._base_url = sync_url.rstrip("/") + "/api/0" + self._auth_headers = { + "Authorization": f"Bearer {api_key}", + "Content-Type": "application/json", + } + self.local = local_client or ActivityWatchClient(client_name="aw-sync") + self._state_file = state_file or _DEFAULT_STATE_FILE + self._state: Dict[str, str] = self._load_state() + + # ------------------------------------------------------------------ state + + def _load_state(self) -> Dict[str, str]: + """Load persisted sync state (last-synced ISO timestamp per bucket).""" + if self._state_file.exists(): + try: + return json.loads(self._state_file.read_text()) + except (json.JSONDecodeError, OSError): + logger.warning("Could not read sync state file; starting fresh") + return {} + + def _save_state(self) -> None: + """Persist sync state to disk.""" + self._state_file.parent.mkdir(parents=True, exist_ok=True) + self._state_file.write_text(json.dumps(self._state, indent=2)) + + # ------------------------------------------------------- sync-server API + + def _url(self, path: str) -> str: + return f"{self._base_url}/{path.lstrip('/')}" + + def _get_remote_buckets(self) -> Dict[str, dict]: + r = requests.get(self._url("buckets/"), headers=self._auth_headers, timeout=30) + r.raise_for_status() + return r.json() + + def _ensure_remote_bucket( + self, bucket_id: str, event_type: str, hostname: str + ) -> None: + """Create bucket on sync server if it doesn't exist yet.""" + r = requests.post( + self._url(f"buckets/{bucket_id}"), + json={ + "client": "aw-sync", + "hostname": hostname, + "type": event_type, + }, + headers=self._auth_headers, + timeout=30, + ) + # 200 (already exists) and 201 (created) are both fine + if r.status_code not in (200, 201): + r.raise_for_status() + + def _upload_events(self, bucket_id: str, events: List[Event]) -> int: + """Upload a batch of events. Returns the number of events sent.""" + if not events: + return 0 + r = requests.post( + self._url(f"buckets/{bucket_id}/events"), + json=[e.to_json_dict() for e in events], + headers=self._auth_headers, + timeout=60, + ) + r.raise_for_status() + return len(events) + + # ----------------------------------------------------------- sync logic + + def sync_bucket(self, bucket_id: str, bucket_info: dict) -> int: + """Sync one local bucket to the sync server. + + Args: + bucket_id: ID of the bucket to sync. + bucket_info: Metadata dict as returned by ``get_buckets()``. + + Returns: + Number of events uploaded (0 if nothing new). + """ + since: Optional[datetime] = None + if bucket_id in self._state: + since = datetime.fromisoformat(self._state[bucket_id]) + + events = self.local.get_events(bucket_id, start=since) + if not events: + return 0 + + hostname: str = bucket_info.get("hostname") or socket.gethostname() + event_type: str = bucket_info.get("type", "unknown") + + remote_buckets = self._get_remote_buckets() + if bucket_id not in remote_buckets: + self._ensure_remote_bucket(bucket_id, event_type, hostname) + + count = self._upload_events(bucket_id, events) + + # Advance the high-water mark to the end of the latest event + latest: datetime = max( + e.timestamp + (e.duration or timedelta(0)) for e in events + ) + self._state[bucket_id] = latest.isoformat() + self._save_state() + + return count + + def sync(self, bucket_filter: Optional[str] = None) -> Dict[str, int]: + """Sync local ActivityWatch buckets to the sync server. + + Args: + bucket_filter: Optional prefix; only buckets whose id starts with + this string are synced. Pass e.g. ``"aw-watcher-window"`` + to sync only window-activity buckets. + + Returns: + Mapping of ``bucket_id`` → events uploaded. A value of ``-1`` + indicates that the sync for that bucket failed. + """ + buckets = self.local.get_buckets() + results: Dict[str, int] = {} + + for bucket_id, info in buckets.items(): + if bucket_filter and not bucket_id.startswith(bucket_filter): + continue + try: + count = self.sync_bucket(bucket_id, info) + if count > 0: + logger.info("Synced %d events from %s", count, bucket_id) + results[bucket_id] = count + except Exception as e: + logger.error("Failed to sync %s: %s", bucket_id, e) + results[bucket_id] = -1 + + return results diff --git a/tests/test_sync.py b/tests/test_sync.py new file mode 100644 index 0000000..3f1bcb7 --- /dev/null +++ b/tests/test_sync.py @@ -0,0 +1,209 @@ +"""Tests for aw_client.sync.""" +import json +from datetime import datetime, timedelta, timezone +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest +from aw_core.models import Event + +from aw_client.sync import AWSync + + +# --------------------------------------------------------------------------- +# helpers + + +def _make_event(ts_offset_secs: int = 0, duration_secs: int = 60) -> Event: + ts = datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc) + timedelta( + seconds=ts_offset_secs + ) + return Event( + timestamp=ts, + duration=timedelta(seconds=duration_secs), + data={"app": "test"}, + ) + + +def _mock_get_response(buckets: dict) -> MagicMock: + m = MagicMock() + m.status_code = 200 + m.raise_for_status = MagicMock() + m.json.return_value = buckets + return m + + +def _mock_post_response(status: int = 201) -> MagicMock: + m = MagicMock() + m.status_code = status + m.raise_for_status = MagicMock() + return m + + +# --------------------------------------------------------------------------- +# fixtures + + +@pytest.fixture +def mock_local_client() -> MagicMock: + client = MagicMock() + client.get_buckets.return_value = { + "aw-watcher-window_host": { + "type": "currentwindow", + "hostname": "host", + } + } + client.get_events.return_value = [_make_event(0), _make_event(60)] + return client + + +@pytest.fixture +def sync_obj(mock_local_client: MagicMock, tmp_path: Path) -> AWSync: + return AWSync( + sync_url="http://localhost:5667", + api_key="test-key", + local_client=mock_local_client, + state_file=tmp_path / "state.json", + ) + + +# --------------------------------------------------------------------------- +# tests + + +class TestAWSync: + def test_sync_uploads_events( + self, sync_obj: AWSync, mock_local_client: MagicMock + ) -> None: + """Happy path: two events are uploaded, bucket created on server.""" + with patch("requests.get") as mock_get, patch("requests.post") as mock_post: + mock_get.return_value = _mock_get_response({}) # no remote buckets yet + mock_post.return_value = _mock_post_response(201) + + results = sync_obj.sync() + + assert results["aw-watcher-window_host"] == 2 + # 1st POST → create bucket; 2nd POST → upload events + assert mock_post.call_count == 2 + + def test_sync_skips_empty_bucket( + self, sync_obj: AWSync, mock_local_client: MagicMock + ) -> None: + """Buckets with no events are skipped; no POST calls made.""" + mock_local_client.get_events.return_value = [] + + with patch("requests.get") as mock_get, patch("requests.post") as mock_post: + mock_get.return_value = _mock_get_response({}) + + results = sync_obj.sync() + + assert results["aw-watcher-window_host"] == 0 + mock_post.assert_not_called() + + def test_sync_filter_by_prefix( + self, sync_obj: AWSync, mock_local_client: MagicMock + ) -> None: + """bucket_filter excludes buckets whose id doesn't match the prefix.""" + with patch("requests.get"), patch("requests.post") as mock_post: + results = sync_obj.sync(bucket_filter="aw-watcher-afk") + + assert results == {} + mock_post.assert_not_called() + + def test_state_persisted_after_sync( + self, sync_obj: AWSync, tmp_path: Path + ) -> None: + """State file is written after a successful sync.""" + with patch("requests.get") as mock_get, patch("requests.post") as mock_post: + mock_get.return_value = _mock_get_response( + {"aw-watcher-window_host": {"type": "currentwindow"}} + ) + mock_post.return_value = _mock_post_response(200) + + sync_obj.sync() + + state_file = tmp_path / "state.json" + assert state_file.exists() + state = json.loads(state_file.read_text()) + assert "aw-watcher-window_host" in state + + def test_incremental_sync_passes_since( + self, sync_obj: AWSync, mock_local_client: MagicMock + ) -> None: + """On the second sync, get_events is called with a start= argument.""" + with patch("requests.get") as mock_get, patch("requests.post") as mock_post: + mock_get.return_value = _mock_get_response( + {"aw-watcher-window_host": {}} + ) + mock_post.return_value = _mock_post_response(200) + + sync_obj.sync() # first sync — state saved + + # Second sync — should pass start= based on state + mock_local_client.get_events.return_value = [_make_event(120)] + with patch("requests.get") as mock_get, patch("requests.post") as mock_post: + mock_get.return_value = _mock_get_response( + {"aw-watcher-window_host": {}} + ) + mock_post.return_value = _mock_post_response(200) + + sync_obj.sync() + + call_kwargs = mock_local_client.get_events.call_args + start_arg = call_kwargs.kwargs.get("start") + assert start_arg is not None, "start= should be passed on second sync" + assert isinstance(start_arg, datetime) + + def test_error_is_caught_returns_minus_one( + self, sync_obj: AWSync + ) -> None: + """Network errors are caught; the bucket entry is set to -1.""" + with patch("requests.get") as mock_get: + mock_get.side_effect = Exception("network error") + + results = sync_obj.sync() + + assert results["aw-watcher-window_host"] == -1 + + def test_missing_state_file_handled( + self, mock_local_client: MagicMock, tmp_path: Path + ) -> None: + """A missing state file path does not raise during construction.""" + nonexistent = tmp_path / "subdir" / "state.json" + s = AWSync( + "http://localhost:5667", + "key", + local_client=mock_local_client, + state_file=nonexistent, + ) + assert s._state == {} + + def test_existing_remote_bucket_not_recreated( + self, sync_obj: AWSync, mock_local_client: MagicMock + ) -> None: + """If the bucket already exists on the sync server, skip the create POST.""" + with patch("requests.get") as mock_get, patch("requests.post") as mock_post: + mock_get.return_value = _mock_get_response( + {"aw-watcher-window_host": {"type": "currentwindow"}} + ) + mock_post.return_value = _mock_post_response(200) + + sync_obj.sync() + + # Only 1 POST: the events upload (no bucket-creation POST) + assert mock_post.call_count == 1 + + def test_auth_header_sent( + self, sync_obj: AWSync, mock_local_client: MagicMock + ) -> None: + """The Bearer token is included in every request to the sync server.""" + with patch("requests.get") as mock_get, patch("requests.post") as mock_post: + mock_get.return_value = _mock_get_response({}) + mock_post.return_value = _mock_post_response(201) + + sync_obj.sync() + + for call in [*mock_get.call_args_list, *mock_post.call_args_list]: + headers = call.kwargs.get("headers", {}) + assert "Authorization" in headers + assert headers["Authorization"] == "Bearer test-key"