From ef525e10148d0bf12de92e9c0d6e2da34e17404c Mon Sep 17 00:00:00 2001 From: Alex Chen Date: Fri, 3 Apr 2026 08:14:15 +0000 Subject: [PATCH] feat: device trust management & recognition (#125) - TrustedDevice model: token, fingerprint, user_agent, IP, expiry tracking - register_device: secure random token, optional fingerprint, configurable expiry - verify_device_token: validates token, checks expiry, updates last_seen - recognize_device: auto-recognize by user_agent+IP fingerprint - revoke_device / revoke_all_devices with except_token support - get_user_devices with include_revoked option - REST API: GET/POST /devices, DELETE /devices/{id}, POST /devices/verify, POST /devices/recognize, POST /devices/revoke-all - 27 tests (21 pass, 6 skip Redis) Closes #125 --- packages/backend/app/routes/__init__.py | 2 + packages/backend/app/routes/device_trust.py | 104 ++++++ packages/backend/app/services/device_trust.py | 187 +++++++++++ packages/backend/tests/test_device_trust.py | 296 ++++++++++++++++++ 4 files changed, 589 insertions(+) create mode 100644 packages/backend/app/routes/device_trust.py create mode 100644 packages/backend/app/services/device_trust.py create mode 100644 packages/backend/tests/test_device_trust.py diff --git a/packages/backend/app/routes/__init__.py b/packages/backend/app/routes/__init__.py index f13b0f89..52b65492 100644 --- a/packages/backend/app/routes/__init__.py +++ b/packages/backend/app/routes/__init__.py @@ -7,6 +7,7 @@ from .categories import bp as categories_bp from .docs import bp as docs_bp from .dashboard import bp as dashboard_bp +from .device_trust import bp as device_trust_bp def register_routes(app: Flask): @@ -18,3 +19,4 @@ def register_routes(app: Flask): app.register_blueprint(categories_bp, url_prefix="/categories") app.register_blueprint(docs_bp, url_prefix="/docs") app.register_blueprint(dashboard_bp, url_prefix="/dashboard") + app.register_blueprint(device_trust_bp, url_prefix="") diff --git a/packages/backend/app/routes/device_trust.py b/packages/backend/app/routes/device_trust.py new file mode 100644 index 00000000..318878de --- /dev/null +++ b/packages/backend/app/routes/device_trust.py @@ -0,0 +1,104 @@ +"""Routes for Device Trust Management (issue #125).""" +from flask import Blueprint, request, jsonify +from flask_jwt_extended import jwt_required, get_jwt_identity +from app.services.device_trust import ( + register_device, + verify_device_token, + revoke_device, + revoke_all_devices, + get_user_devices, + recognize_device, +) + +bp = Blueprint("device_trust", __name__) + + +@bp.route("/devices", methods=["GET"]) +@jwt_required() +def list_devices(): + """List trusted devices for the current user.""" + user_id = int(get_jwt_identity()) + include_revoked = request.args.get("include_revoked", "false").lower() == "true" + devices = get_user_devices(user_id, include_revoked=include_revoked) + return jsonify({"devices": devices, "count": len(devices)}), 200 + + +@bp.route("/devices", methods=["POST"]) +@jwt_required() +def add_device(): + """Register a new trusted device.""" + user_id = int(get_jwt_identity()) + data = request.get_json() or {} + + device_name = (data.get("device_name") or "").strip() + if not device_name: + return jsonify({"error": "device_name is required"}), 400 + + trust_days = data.get("trust_days", 30) + try: + trust_days = int(trust_days) if trust_days is not None else None + except (ValueError, TypeError): + return jsonify({"error": "trust_days must be an integer or null"}), 400 + + user_agent = request.headers.get("User-Agent") or data.get("user_agent") + ip_address = request.remote_addr or data.get("ip_address") + + device = register_device( + user_id=user_id, + device_name=device_name, + user_agent=user_agent, + ip_address=ip_address, + trust_days=trust_days, + ) + return jsonify(device.to_dict()), 201 + + +@bp.route("/devices/", methods=["DELETE"]) +@jwt_required() +def revoke_device_route(device_id: int): + """Revoke trust for a specific device.""" + user_id = int(get_jwt_identity()) + device = revoke_device(device_id=device_id, user_id=user_id) + if not device: + return jsonify({"error": "Device not found"}), 404 + return jsonify({"message": "Device trust revoked", "device": device.to_dict()}), 200 + + +@bp.route("/devices/revoke-all", methods=["POST"]) +@jwt_required() +def revoke_all_route(): + """Revoke all trusted devices (except current if token provided).""" + user_id = int(get_jwt_identity()) + data = request.get_json() or {} + except_token = data.get("except_token") + count = revoke_all_devices(user_id=user_id, except_token=except_token) + return jsonify({"revoked_count": count, "message": f"Revoked {count} device(s)"}), 200 + + +@bp.route("/devices/verify", methods=["POST"]) +@jwt_required() +def verify_device(): + """Verify a device token and mark device as seen.""" + user_id = int(get_jwt_identity()) + data = request.get_json() or {} + token = data.get("device_token", "") + if not token: + return jsonify({"error": "device_token is required"}), 400 + device = verify_device_token(token=token, user_id=user_id) + if not device: + return jsonify({"trusted": False, "message": "Device not recognized or expired"}), 200 + return jsonify({"trusted": True, "device": device.to_dict()}), 200 + + +@bp.route("/devices/recognize", methods=["POST"]) +@jwt_required() +def recognize_device_route(): + """Auto-recognize a device by fingerprint (user-agent + IP).""" + user_id = int(get_jwt_identity()) + data = request.get_json() or {} + user_agent = request.headers.get("User-Agent") or data.get("user_agent") + ip_address = request.remote_addr or data.get("ip_address") + device = recognize_device(user_id=user_id, user_agent=user_agent, ip_address=ip_address) + if not device: + return jsonify({"recognized": False}), 200 + return jsonify({"recognized": True, "device": device.to_dict()}), 200 \ No newline at end of file diff --git a/packages/backend/app/services/device_trust.py b/packages/backend/app/services/device_trust.py new file mode 100644 index 00000000..6da86a4f --- /dev/null +++ b/packages/backend/app/services/device_trust.py @@ -0,0 +1,187 @@ +""" +Device Trust Management & Recognition (issue #125) + +Allow users to view and manage trusted devices. +Trusted devices skip additional auth challenges. +""" +from __future__ import annotations + +import hashlib +import secrets +import uuid +from datetime import datetime, timedelta +from typing import Optional +from app.extensions import db + + +class TrustedDevice(db.Model): + """A device trusted by a user for authentication.""" + + __tablename__ = "trusted_devices" + + id = db.Column(db.Integer, primary_key=True) + user_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=False) + device_token = db.Column(db.String(64), unique=True, nullable=False) + device_name = db.Column(db.String(200), nullable=False) + device_fingerprint = db.Column(db.String(64), nullable=True) + user_agent = db.Column(db.String(500), nullable=True) + ip_address = db.Column(db.String(45), nullable=True) + is_trusted = db.Column(db.Boolean, default=True, nullable=False) + last_seen_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False) + trusted_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False) + expires_at = db.Column(db.DateTime, nullable=True) + created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False) + + def to_dict(self) -> dict: + return { + "id": self.id, + "user_id": self.user_id, + "device_name": self.device_name, + "device_token": self.device_token, + "device_fingerprint": self.device_fingerprint, + "user_agent": self.user_agent, + "ip_address": self.ip_address, + "is_trusted": self.is_trusted, + "last_seen_at": self.last_seen_at.isoformat() if self.last_seen_at else None, + "trusted_at": self.trusted_at.isoformat() if self.trusted_at else None, + "expires_at": self.expires_at.isoformat() if self.expires_at else None, + "created_at": self.created_at.isoformat() if self.created_at else None, + "is_expired": self._is_expired(), + } + + def _is_expired(self) -> bool: + if self.expires_at is None: + return False + return datetime.utcnow() > self.expires_at + + +def _generate_device_token() -> str: + """Generate a cryptographically secure device token.""" + return secrets.token_hex(32) # 64-char hex string + + +def _fingerprint_device(user_agent: str, ip_address: str) -> str: + """Create a deterministic fingerprint from device characteristics.""" + raw = f"{user_agent}|{ip_address}" + return hashlib.sha256(raw.encode()).hexdigest()[:32] + + +def register_device( + user_id: int, + device_name: str, + user_agent: Optional[str] = None, + ip_address: Optional[str] = None, + trust_days: Optional[int] = 30, +) -> TrustedDevice: + """Register a new trusted device for a user.""" + device_token = _generate_device_token() + fingerprint = None + if user_agent and ip_address: + fingerprint = _fingerprint_device(user_agent, ip_address) + + expires_at = None + if trust_days is not None: + expires_at = datetime.utcnow() + timedelta(days=trust_days) + + device = TrustedDevice( + user_id=user_id, + device_token=device_token, + device_name=device_name, + device_fingerprint=fingerprint, + user_agent=user_agent, + ip_address=ip_address, + is_trusted=True, + expires_at=expires_at, + ) + db.session.add(device) + db.session.commit() + return device + + +def verify_device_token(token: str, user_id: Optional[int] = None) -> Optional[TrustedDevice]: + """ + Verify a device token. + Returns the TrustedDevice if valid, trusted, and not expired. None otherwise. + """ + query = TrustedDevice.query.filter_by(device_token=token, is_trusted=True) + if user_id is not None: + query = query.filter_by(user_id=user_id) + device = query.first() + + if not device: + return None + + if device._is_expired(): + return None + + # Update last seen + device.last_seen_at = datetime.utcnow() + db.session.commit() + return device + + +def revoke_device(device_id: int, user_id: int) -> Optional[TrustedDevice]: + """ + Revoke trust for a specific device. + Returns the device if revoked, None if not found or not owned by user. + """ + device = TrustedDevice.query.filter_by(id=device_id, user_id=user_id).first() + if not device: + return None + device.is_trusted = False + db.session.commit() + return device + + +def revoke_all_devices(user_id: int, except_token: Optional[str] = None) -> int: + """ + Revoke all trusted devices for a user. + If except_token is given, that device is kept. + Returns count of revoked devices. + """ + query = TrustedDevice.query.filter_by(user_id=user_id, is_trusted=True) + if except_token: + query = query.filter(TrustedDevice.device_token != except_token) + devices = query.all() + for d in devices: + d.is_trusted = False + db.session.commit() + return len(devices) + + +def get_user_devices(user_id: int, include_revoked: bool = False) -> list[dict]: + """Get all devices for a user.""" + query = TrustedDevice.query.filter_by(user_id=user_id) + if not include_revoked: + query = query.filter_by(is_trusted=True) + devices = query.order_by(TrustedDevice.last_seen_at.desc()).all() + return [d.to_dict() for d in devices] + + +def recognize_device( + user_id: int, + user_agent: Optional[str] = None, + ip_address: Optional[str] = None, +) -> Optional[TrustedDevice]: + """ + Auto-recognize a device by fingerprint without requiring a token. + Returns a trusted, non-expired device if fingerprint matches. + """ + if not user_agent or not ip_address: + return None + + fingerprint = _fingerprint_device(user_agent, ip_address) + device = TrustedDevice.query.filter_by( + user_id=user_id, + device_fingerprint=fingerprint, + is_trusted=True, + ).first() + + if device and device._is_expired(): + return None + + if device: + device.last_seen_at = datetime.utcnow() + db.session.commit() + + return device \ No newline at end of file diff --git a/packages/backend/tests/test_device_trust.py b/packages/backend/tests/test_device_trust.py new file mode 100644 index 00000000..adf1211b --- /dev/null +++ b/packages/backend/tests/test_device_trust.py @@ -0,0 +1,296 @@ +"""Tests for Device Trust Management & Recognition (issue #125).""" +import pytest +from datetime import datetime, timedelta, date +from werkzeug.security import generate_password_hash +from app.services.device_trust import ( + register_device, + verify_device_token, + revoke_device, + revoke_all_devices, + get_user_devices, + recognize_device, + TrustedDevice, + _generate_device_token, + _fingerprint_device, +) +from app.models import User +from app.extensions import db + +try: + import redis as _redis_lib + _r = _redis_lib.Redis.from_url("redis://localhost:6379/15") + _r.ping() + _redis_available = True +except Exception: + _redis_available = False + +requires_redis = pytest.mark.skipif( + not _redis_available, reason="Redis not available" +) + + +def _make_user(email="device@test.com"): + user = User( + email=email, + password_hash=generate_password_hash("pass"), + preferred_currency="USD", + ) + db.session.add(user) + db.session.flush() + return user + + +# ----------------------------------------------------------------------- +# Unit tests +# ----------------------------------------------------------------------- + +class TestHelpers: + def test_generate_device_token_unique(self): + t1 = _generate_device_token() + t2 = _generate_device_token() + assert t1 != t2 + assert len(t1) == 64 + + def test_fingerprint_deterministic(self): + f1 = _fingerprint_device("Mozilla/5.0", "192.168.1.1") + f2 = _fingerprint_device("Mozilla/5.0", "192.168.1.1") + assert f1 == f2 + assert len(f1) == 32 + + def test_fingerprint_different_ua(self): + f1 = _fingerprint_device("Chrome", "192.168.1.1") + f2 = _fingerprint_device("Firefox", "192.168.1.1") + assert f1 != f2 + + def test_fingerprint_different_ip(self): + f1 = _fingerprint_device("Chrome", "1.1.1.1") + f2 = _fingerprint_device("Chrome", "2.2.2.2") + assert f1 != f2 + + +class TestTrustedDeviceModel: + def test_not_expired_when_no_expiry(self): + device = TrustedDevice( + user_id=1, + device_token="abc", + device_name="Test", + is_trusted=True, + ) + assert device._is_expired() is False + + def test_expired_with_past_expiry(self): + device = TrustedDevice( + user_id=1, + device_token="abc", + device_name="Test", + is_trusted=True, + expires_at=datetime.utcnow() - timedelta(days=1), + ) + assert device._is_expired() is True + + def test_not_expired_with_future_expiry(self): + device = TrustedDevice( + user_id=1, + device_token="abc", + device_name="Test", + is_trusted=True, + expires_at=datetime.utcnow() + timedelta(days=30), + ) + assert device._is_expired() is False + + +# ----------------------------------------------------------------------- +# Integration tests +# ----------------------------------------------------------------------- + +class TestRegisterDevice: + def test_register_creates_device(self, app_fixture): + with app_fixture.app_context(): + user = _make_user("reg@test.com") + db.session.commit() + device = register_device(user.id, "My Laptop") + assert device.id is not None + assert device.user_id == user.id + assert device.is_trusted is True + assert len(device.device_token) == 64 + + def test_register_with_expiry(self, app_fixture): + with app_fixture.app_context(): + user = _make_user("expiry@test.com") + db.session.commit() + device = register_device(user.id, "Temp Device", trust_days=7) + assert device.expires_at is not None + delta = device.expires_at - datetime.utcnow() + assert 6 <= delta.days <= 7 + + def test_register_no_expiry(self, app_fixture): + with app_fixture.app_context(): + user = _make_user("noexp@test.com") + db.session.commit() + device = register_device(user.id, "Permanent", trust_days=None) + assert device.expires_at is None + + def test_register_with_fingerprint(self, app_fixture): + with app_fixture.app_context(): + user = _make_user("fp@test.com") + db.session.commit() + device = register_device( + user.id, "Browser", + user_agent="Mozilla/5.0", + ip_address="10.0.0.1", + ) + assert device.device_fingerprint is not None + + +class TestVerifyDevice: + def test_verify_valid_token(self, app_fixture): + with app_fixture.app_context(): + user = _make_user("verify@test.com") + db.session.commit() + device = register_device(user.id, "Phone") + result = verify_device_token(device.device_token, user_id=user.id) + assert result is not None + assert result.id == device.id + + def test_verify_wrong_token(self, app_fixture): + with app_fixture.app_context(): + result = verify_device_token("invalidtoken") + assert result is None + + def test_verify_expired_device(self, app_fixture): + with app_fixture.app_context(): + user = _make_user("expdev@test.com") + db.session.commit() + device = register_device(user.id, "Old Device", trust_days=0) + # Force expiry + device.expires_at = datetime.utcnow() - timedelta(seconds=1) + db.session.commit() + result = verify_device_token(device.device_token, user_id=user.id) + assert result is None + + def test_verify_wrong_user(self, app_fixture): + with app_fixture.app_context(): + user = _make_user("owner@test.com") + db.session.commit() + device = register_device(user.id, "My Device") + result = verify_device_token(device.device_token, user_id=999) + assert result is None + + +class TestRevokeDevice: + def test_revoke_device(self, app_fixture): + with app_fixture.app_context(): + user = _make_user("revoke@test.com") + db.session.commit() + device = register_device(user.id, "Old Phone") + result = revoke_device(device.id, user.id) + assert result is not None + assert result.is_trusted is False + + def test_revoke_wrong_user(self, app_fixture): + with app_fixture.app_context(): + user = _make_user("rev2@test.com") + db.session.commit() + device = register_device(user.id, "Laptop") + result = revoke_device(device.id, 999) + assert result is None + + def test_revoke_all(self, app_fixture): + with app_fixture.app_context(): + user = _make_user("revokeall@test.com") + db.session.commit() + d1 = register_device(user.id, "Device A") + d2 = register_device(user.id, "Device B") + d3 = register_device(user.id, "Device C") + count = revoke_all_devices(user.id) + assert count == 3 + devices = get_user_devices(user.id) + assert len(devices) == 0 + + def test_revoke_all_except_current(self, app_fixture): + with app_fixture.app_context(): + user = _make_user("keepone@test.com") + db.session.commit() + d1 = register_device(user.id, "Current") + d2 = register_device(user.id, "Other") + count = revoke_all_devices(user.id, except_token=d1.device_token) + assert count == 1 + devices = get_user_devices(user.id) + assert len(devices) == 1 + assert devices[0]["device_name"] == "Current" + + +class TestRecognizeDevice: + def test_recognize_by_fingerprint(self, app_fixture): + with app_fixture.app_context(): + user = _make_user("recog@test.com") + db.session.commit() + device = register_device( + user.id, "Home PC", + user_agent="TestAgent/1.0", + ip_address="192.168.0.1", + ) + found = recognize_device( + user.id, + user_agent="TestAgent/1.0", + ip_address="192.168.0.1", + ) + assert found is not None + assert found.id == device.id + + def test_recognize_wrong_fingerprint(self, app_fixture): + with app_fixture.app_context(): + user = _make_user("recog2@test.com") + db.session.commit() + register_device(user.id, "Home", user_agent="Chrome", ip_address="1.1.1.1") + found = recognize_device(user.id, user_agent="Firefox", ip_address="2.2.2.2") + assert found is None + + +# ----------------------------------------------------------------------- +# API tests (require Redis) +# ----------------------------------------------------------------------- + +@requires_redis +class TestDeviceTrustAPI: + def test_list_devices_empty(self, client, auth_header): + resp = client.get("/devices", headers=auth_header) + assert resp.status_code == 200 + data = resp.get_json() + assert "devices" in data + + def test_add_device(self, client, auth_header): + resp = client.post("/devices", json={"device_name": "My Laptop"}, headers=auth_header) + assert resp.status_code == 201 + data = resp.get_json() + assert data["device_name"] == "My Laptop" + assert data["is_trusted"] is True + assert "device_token" in data + + def test_add_device_no_name(self, client, auth_header): + resp = client.post("/devices", json={}, headers=auth_header) + assert resp.status_code == 400 + + def test_verify_device_token(self, client, auth_header): + create_resp = client.post("/devices", json={"device_name": "Phone"}, headers=auth_header) + token = create_resp.get_json()["device_token"] + resp = client.post("/devices/verify", json={"device_token": token}, headers=auth_header) + assert resp.status_code == 200 + data = resp.get_json() + assert data["trusted"] is True + + def test_revoke_device(self, client, auth_header): + create_resp = client.post("/devices", json={"device_name": "Tablet"}, headers=auth_header) + device_id = create_resp.get_json()["id"] + resp = client.delete(f"/devices/{device_id}", headers=auth_header) + assert resp.status_code == 200 + data = resp.get_json() + assert data["device"]["is_trusted"] is False + + def test_revoke_all(self, client, auth_header): + client.post("/devices", json={"device_name": "D1"}, headers=auth_header) + client.post("/devices", json={"device_name": "D2"}, headers=auth_header) + resp = client.post("/devices/revoke-all", json={}, headers=auth_header) + assert resp.status_code == 200 + data = resp.get_json() + assert data["revoked_count"] >= 2 \ No newline at end of file