Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 156 additions & 0 deletions allways/validator/scoring_store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
"""Persist the scoring window and voted set across validator restarts.

Without persistence, SwapTracker.window starts empty after a restart.
With SCORING_EMA_ALPHA=1.0 (instantaneous scoring), the first scoring
cycle zeros all miner weights because the window contains no completed
swaps. This module writes the window to a JSON file after every update
and restores it on cold start so scoring is continuous.
"""

import json
import os
from pathlib import Path
from typing import Dict, List, Optional, Set, Tuple

import bittensor as bt

from allways.classes import Swap, SwapStatus


def _swap_to_dict(swap: Swap) -> dict:
return {
'id': swap.id,
'user_hotkey': swap.user_hotkey,
'miner_hotkey': swap.miner_hotkey,
'source_chain': swap.source_chain,
'dest_chain': swap.dest_chain,
'source_amount': swap.source_amount,
'dest_amount': swap.dest_amount,
'tao_amount': swap.tao_amount,
'user_source_address': swap.user_source_address,
'user_dest_address': swap.user_dest_address,
'miner_source_address': swap.miner_source_address,
'miner_dest_address': swap.miner_dest_address,
'rate': swap.rate,
'source_tx_hash': swap.source_tx_hash,
'source_tx_block': swap.source_tx_block,
'dest_tx_hash': swap.dest_tx_hash,
'dest_tx_block': swap.dest_tx_block,
'status': swap.status.value,
'initiated_block': swap.initiated_block,
'timeout_block': swap.timeout_block,
'fulfilled_block': swap.fulfilled_block,
'completed_block': swap.completed_block,
}


def _dict_to_swap(d: dict) -> Optional[Swap]:
try:
return Swap(
id=d['id'],
user_hotkey=d['user_hotkey'],
miner_hotkey=d['miner_hotkey'],
source_chain=d['source_chain'],
dest_chain=d['dest_chain'],
source_amount=d['source_amount'],
dest_amount=d['dest_amount'],
tao_amount=d['tao_amount'],
user_source_address=d['user_source_address'],
user_dest_address=d['user_dest_address'],
miner_source_address=d.get('miner_source_address', ''),
miner_dest_address=d.get('miner_dest_address', ''),
rate=d.get('rate', ''),
source_tx_hash=d.get('source_tx_hash', ''),
source_tx_block=d.get('source_tx_block', 0),
dest_tx_hash=d.get('dest_tx_hash', ''),
dest_tx_block=d.get('dest_tx_block', 0),
status=SwapStatus(d['status']),
initiated_block=d.get('initiated_block', 0),
timeout_block=d.get('timeout_block', 0),
fulfilled_block=d.get('fulfilled_block', 0),
completed_block=d.get('completed_block', 0),
)
except (KeyError, ValueError, TypeError) as e:
bt.logging.debug(f'Failed to restore swap from cache: {e}')
return None


class ScoringWindowStore:
"""Atomic JSON persistence for the scoring window and voted-id set.

Uses write-to-tmp-then-rename for crash safety (same pattern as
SwapFulfiller._save_sent_cache).
"""

def __init__(self, path: Path):
self._path = path

def save(self, window: List[Swap], voted_ids: Set[int]) -> None:
"""Persist current window and voted set to disk."""
data: Dict = {
'window': [_swap_to_dict(s) for s in window],
'voted_ids': sorted(voted_ids),
}
try:
self._path.parent.mkdir(parents=True, exist_ok=True)
tmp = self._path.with_suffix('.tmp')
tmp.write_text(json.dumps(data))
tmp.rename(self._path)
except Exception as e:
bt.logging.warning(f'Failed to persist scoring window: {e}')

def load(self, window_blocks: int, current_block: int) -> Tuple[List[Swap], Set[int]]:
"""Restore window and voted set, pruning entries older than window_blocks.

Returns (window: List[Swap], voted_ids: Set[int]).
"""
if not self._path.exists():
return [], set()

try:
raw = json.loads(self._path.read_text())
except Exception as e:
bt.logging.warning(f'Failed to read scoring window cache: {e}')
return [], set()

window_start = current_block - window_blocks
raw_window = raw.get('window', [])
raw_voted = raw.get('voted_ids', [])

window: List[Swap] = []
for entry in raw_window:
swap = _dict_to_swap(entry)
if swap is None:
continue
if resolved_block(swap) < window_start:
continue
window.append(swap)

voted_ids: Set[int] = set()
for v in raw_voted:
if isinstance(v, int):
voted_ids.add(v)

if len(window) != len(raw_window) or len(voted_ids) != len(raw_voted):
self.save(window, voted_ids)

if window:
bt.logging.info(f'Restored {len(window)} swap(s) and {len(voted_ids)} voted ID(s) from scoring cache')

return window, voted_ids

def remove(self) -> None:
"""Delete the cache file (for tests or manual reset)."""
try:
os.remove(self._path)
except FileNotFoundError:
pass


def resolved_block(swap: Swap) -> int:
"""Block when a terminal swap was resolved."""
if swap.completed_block > 0:
return swap.completed_block
if swap.timeout_block > 0:
return swap.timeout_block
return swap.initiated_block
41 changes: 38 additions & 3 deletions allways/validator/swap_tracker.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
"""Incremental swap lifecycle tracker. Eliminates O(N) full scans."""

import asyncio
from typing import Dict, List, Set
from typing import Dict, List, Optional, Set

import bittensor as bt

from allways.classes import Swap, SwapStatus
from allways.contract_client import AllwaysContractClient
from allways.validator.scoring_store import ScoringWindowStore

ACTIVE_STATUSES = (SwapStatus.ACTIVE, SwapStatus.FULFILLED)

Expand All @@ -19,14 +20,16 @@ class SwapTracker:
- Monitoring: re-fetch all tracked ACTIVE/FULFILLED swaps each poll

Resolved swaps are no longer stored on-chain, so cold start only recovers
active swaps. The scoring window populates naturally as swaps complete.
active swaps from chain. When a store is configured, the scoring window and
voted IDs are restored from disk before active-swap reconciliation.
"""

def __init__(
self,
client: AllwaysContractClient,
fulfillment_timeout_blocks: int,
window_blocks: int,
store: Optional[ScoringWindowStore] = None,
):
self.client = client
self.last_scanned_id = 0
Expand All @@ -36,11 +39,26 @@ def __init__(

self.fulfillment_timeout_blocks = fulfillment_timeout_blocks
self.window_blocks = window_blocks
self._store = store

def initialize(self, current_block: int):
"""Cold start — scan backward from latest swap to populate active set."""
"""Cold start — scan backward from latest swap to populate active set.

Also restores the scoring window and voted set from disk so that
scoring is continuous across restarts.
"""
if self._store:
restored_window, restored_voted = self._store.load(self.window_blocks, current_block)
self.window = restored_window
self.voted_ids = restored_voted

next_id = self.client.get_next_swap_id()
if next_id <= 1:
stale_voted = len(self.voted_ids)
if stale_voted > 0:
self.voted_ids.clear()
bt.logging.debug(f'SwapTracker init: pruned {stale_voted} stale voted IDs (no active swaps)')
self._persist()
self.last_scanned_id = 0
bt.logging.info('SwapTracker initialized: no swaps exist')
return
Expand All @@ -63,13 +81,22 @@ def initialize(self, current_block: int):
if swap.status in ACTIVE_STATUSES:
self.active[swap.id] = swap

if self.voted_ids:
before = len(self.voted_ids)
self.voted_ids.intersection_update(self.active.keys())
pruned = before - len(self.voted_ids)
if pruned > 0:
bt.logging.debug(f'SwapTracker init: pruned {pruned} stale voted IDs')

self.last_scanned_id = latest_id
self._persist()

bt.logging.info(f'SwapTracker initialized: active={len(self.active)}, last_scanned_id={self.last_scanned_id}')

def mark_voted(self, swap_id: int):
"""Mark a swap as voted on to prevent redundant vote extrinsics."""
self.voted_ids.add(swap_id)
self._persist()

def is_voted(self, swap_id: int) -> bool:
"""Check if we've already voted on this swap."""
Expand Down Expand Up @@ -130,6 +157,7 @@ async def _poll_inner(self):

if resolved_ids:
bt.logging.debug(f'SwapTracker: resolved {len(resolved_ids)}, {len(self.active)} still active')
self._persist()

def prune_window(self, current_block: int):
"""Remove resolved swaps older than the scoring window."""
Expand All @@ -139,6 +167,7 @@ def prune_window(self, current_block: int):
pruned = before - len(self.window)
if pruned > 0:
bt.logging.debug(f'SwapTracker: pruned {pruned} expired swaps from window')
self._persist()

def get_fulfilled(self, current_block: int) -> List[Swap]:
"""Active FULFILLED swaps not yet past timeout (ready for verification)."""
Expand Down Expand Up @@ -166,6 +195,11 @@ def get_timed_out(self, current_block: int) -> List[Swap]:
and current_block > s.timeout_block
]

def _persist(self) -> None:
"""Save window and voted set to disk if a store is configured."""
if self._store:
self._store.save(self.window, self.voted_ids)


def _resolved_block(swap: Swap) -> int:
"""Block when a terminal swap was resolved."""
Expand All @@ -174,3 +208,4 @@ def _resolved_block(swap: Swap) -> int:
if swap.timeout_block > 0:
return swap.timeout_block
return swap.initiated_block

5 changes: 5 additions & 0 deletions neurons/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import threading
import time
from functools import partial
from pathlib import Path

import bittensor as bt
from dotenv import load_dotenv
Expand All @@ -32,6 +33,7 @@
from allways.validator.chain_verification import SwapVerifier
from allways.validator.forward import forward
from allways.validator.pending_confirms import PendingConfirmQueue
from allways.validator.scoring_store import ScoringWindowStore
from allways.validator.swap_tracker import SwapTracker
from allways.validator.voting import SwapVoter
from neurons.base.validator import BaseValidatorNeuron
Expand Down Expand Up @@ -60,10 +62,13 @@ def __init__(self, config=None):
except Exception as e:
bt.logging.warning(f'Failed to read fee_divisor, using default {DEFAULT_FEE_DIVISOR}: {e}')
self.fee_divisor = DEFAULT_FEE_DIVISOR
scoring_cache_path = Path(self.config.neuron.full_path) / 'scoring_window.json'
self.scoring_store = ScoringWindowStore(scoring_cache_path)
self.swap_tracker = SwapTracker(
client=self.contract_client,
fulfillment_timeout_blocks=timeout_blocks,
window_blocks=SCORING_WINDOW_BLOCKS,
store=self.scoring_store,
)
self.swap_tracker.initialize(self.block)
bt.logging.debug(f'Validator components: fee_divisor={self.fee_divisor}, timeout={timeout_blocks}')
Expand Down
Loading