Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion aw_client/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from .client import ActivityWatchClient
from .sync import AWSync

__all__ = ["ActivityWatchClient"]
__all__ = ["ActivityWatchClient", "AWSync"]
190 changes: 190 additions & 0 deletions aw_client/sync.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
"""
Sync client for ActivityWatch — push local events to a self-hosted aw-sync-server.

Usage::

from aw_client.sync import AWSync

sync = AWSync(sync_url="http://localhost:5667", api_key="mykey")
results = sync.sync() # {bucket_id: events_uploaded}

# Sync only window-activity buckets
results = sync.sync(bucket_filter="aw-watcher-window")

See https://github.com/TimeToBuildBob/aw-sync-server for the server implementation.
"""
import json
import logging
import os
import socket
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional

import requests
from aw_core.models import Event

from .client import ActivityWatchClient

logger = logging.getLogger(__name__)

_DEFAULT_STATE_FILE = Path.home() / ".config" / "activitywatch" / "aw-sync-state.json"
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hardcoded ~/.config path is not cross-platform

Path.home() / ".config" / "activitywatch" is XDG-style and works on Linux/macOS, but on Windows it resolves to C:\Users\<user>\.config\activitywatch\, which is not the conventional location for config files there (normally %APPDATA%\activitywatch\).

The existing client.py already uses from aw_core.dirs import get_data_dir for a platform-aware data path. If aw_core.dirs exposes a get_config_dir() (or a state-directory equivalent), it should be used here to stay consistent with how ActivityWatch manages paths across platforms.

Suggested change
_DEFAULT_STATE_FILE = Path.home() / ".config" / "activitywatch" / "aw-sync-state.json"
_DEFAULT_STATE_FILE = Path(get_data_dir("aw-client")) / "aw-sync-state.json"

(Adjust to the correct aw_core.dirs helper once confirmed; the important point is to avoid hardcoding ~/.config.)



class AWSync:
    """Push ActivityWatch events to a self-hosted aw-sync-server.

    The sync server must implement the standard ActivityWatch bucket+events API
    and accept an ``Authorization: Bearer <api_key>`` header.

    State (last-synced timestamp per bucket) is persisted in a JSON file so that
    incremental syncs only upload new events.
    """

    def __init__(
        self,
        sync_url: str,
        api_key: str,
        local_client: Optional[ActivityWatchClient] = None,
        state_file: Optional[Path] = None,
    ) -> None:
        """
        Args:
            sync_url: Base URL of the sync server, e.g. ``http://localhost:5667``.
            api_key: Bearer token for authenticating to the sync server.
            local_client: Optional pre-constructed local AW client; one is created
                with default settings if not provided.
            state_file: Path for persisting last-synced timestamps per bucket.
                Defaults to ``_DEFAULT_STATE_FILE``.
        """
        trimmed = sync_url.rstrip("/")
        self._base_url = f"{trimmed}/api/0"
        self._auth_headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        }
        if local_client is None:
            local_client = ActivityWatchClient(client_name="aw-sync")
        self.local = local_client
        if state_file is None:
            state_file = _DEFAULT_STATE_FILE
        self._state_file = state_file
        # Loaded eagerly so the first sync() starts from persisted high-water marks.
        self._state: Dict[str, str] = self._load_state()

# ------------------------------------------------------------------ state

def _load_state(self) -> Dict[str, str]:
"""Load persisted sync state (last-synced ISO timestamp per bucket)."""
if self._state_file.exists():
try:
return json.loads(self._state_file.read_text())
except (json.JSONDecodeError, OSError):
logger.warning("Could not read sync state file; starting fresh")
return {}

def _save_state(self) -> None:
"""Persist sync state to disk."""
self._state_file.parent.mkdir(parents=True, exist_ok=True)
self._state_file.write_text(json.dumps(self._state, indent=2))

# ------------------------------------------------------- sync-server API

def _url(self, path: str) -> str:
return f"{self._base_url}/{path.lstrip('/')}"

def _get_remote_buckets(self) -> Dict[str, dict]:
    """Fetch the bucket listing from the sync server (raises on HTTP error)."""
    resp = requests.get(
        self._url("buckets/"), headers=self._auth_headers, timeout=30
    )
    resp.raise_for_status()
    return resp.json()

def _ensure_remote_bucket(
    self, bucket_id: str, event_type: str, hostname: str
) -> None:
    """Create bucket on sync server if it doesn't exist yet."""
    payload = {
        "client": "aw-sync",
        "hostname": hostname,
        "type": event_type,
    }
    resp = requests.post(
        self._url(f"buckets/{bucket_id}"),
        json=payload,
        headers=self._auth_headers,
        timeout=30,
    )
    # 200 (already exists) and 201 (created) are both fine
    if resp.status_code not in (200, 201):
        resp.raise_for_status()

def _upload_events(self, bucket_id: str, events: List[Event]) -> int:
    """Upload a batch of events. Returns the number of events sent (0 for empty)."""
    if not events:
        return 0
    payload = [ev.to_json_dict() for ev in events]
    resp = requests.post(
        self._url(f"buckets/{bucket_id}/events"),
        json=payload,
        headers=self._auth_headers,
        timeout=60,
    )
    resp.raise_for_status()
    return len(events)

# ----------------------------------------------------------- sync logic

def sync_bucket(
    self,
    bucket_id: str,
    bucket_info: dict,
    remote_buckets: Optional[Dict[str, dict]] = None,
) -> int:
    """Sync one local bucket to the sync server.

    Args:
        bucket_id: ID of the bucket to sync.
        bucket_info: Metadata dict as returned by ``get_buckets()``.
        remote_buckets: Pre-fetched mapping of remote bucket ids to their
            metadata. Pass this when syncing many buckets in one run to
            avoid re-fetching the remote bucket list once per bucket;
            when ``None``, the list is fetched from the server.

    Returns:
        Number of events uploaded (0 if nothing new).
    """
    since: Optional[datetime] = None
    if bucket_id in self._state:
        since = datetime.fromisoformat(self._state[bucket_id])

    events = self.local.get_events(bucket_id, start=since)
    if not events:
        return 0

    hostname: str = bucket_info.get("hostname") or socket.gethostname()
    event_type: str = bucket_info.get("type", "unknown")

    # Only hit the server for the bucket list when the caller didn't supply it,
    # so batch callers can fetch once per run instead of once per bucket.
    if remote_buckets is None:
        remote_buckets = self._get_remote_buckets()
    if bucket_id not in remote_buckets:
        self._ensure_remote_bucket(bucket_id, event_type, hostname)

    count = self._upload_events(bucket_id, events)

    # Advance the high-water mark just past the end of the latest event.
    # The extra microsecond matters for zero-duration (or duration-less)
    # events: get_events(start=since) is inclusive of `since`, so a marker
    # equal to such an event's timestamp would re-fetch and re-upload it
    # on every subsequent sync.
    latest: datetime = max(
        e.timestamp + (e.duration or timedelta(0)) for e in events
    ) + timedelta(microseconds=1)
    self._state[bucket_id] = latest.isoformat()
    self._save_state()

    return count

def sync(self, bucket_filter: Optional[str] = None) -> Dict[str, int]:
    """Sync local ActivityWatch buckets to the sync server.

    Args:
        bucket_filter: Optional prefix; only buckets whose id starts with
            this string are synced. Pass e.g. ``"aw-watcher-window"``
            to sync only window-activity buckets.

    Returns:
        Mapping of ``bucket_id`` to the number of events uploaded. A value
        of ``-1`` indicates that the sync for that bucket failed.
    """
    results: Dict[str, int] = {}
    for bucket_id, info in self.local.get_buckets().items():
        if bucket_filter and not bucket_id.startswith(bucket_filter):
            continue
        try:
            uploaded = self.sync_bucket(bucket_id, info)
        except Exception as exc:
            # One failing bucket must not abort the rest of the run.
            logger.error("Failed to sync %s: %s", bucket_id, exc)
            results[bucket_id] = -1
        else:
            if uploaded > 0:
                logger.info("Synced %d events from %s", uploaded, bucket_id)
            results[bucket_id] = uploaded
    return results
Loading
Loading