Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion sauron/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@ description = "A Bitcoin backend plugin relying on Esplora"
readme = "README.md"
requires-python = ">=3.9.2"

dependencies = ["pyln-client>=24.11", "requests[socks]>=2.23.0"]
dependencies = [
"pyln-client>=24.11",
"requests[socks]>=2.23.0",
"portalocker>=3.2,<4",
]

[dependency-groups]
dev = [
Expand Down
43 changes: 43 additions & 0 deletions sauron/ratelimit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import time
import json
import os
import portalocker


class GlobalRateLimiter:
    """Cross-process rate limiter backed by a lock-protected state file.

    Cooperating processes share a single "next allowed request" timestamp,
    stored as JSON in ``state_file``. ``acquire()`` blocks until that
    timestamp has passed, then reserves the next slot for itself.
    """

    def __init__(
        self,
        rate_per_second,
        state_file="/tmp/sauron_api_rate.state",
        max_wait_seconds=10,
    ):
        if rate_per_second <= 0:
            raise ValueError("rate_per_second must be > 0")
        # Minimum spacing between two consecutive requests, in seconds.
        self.interval = 1.0 / rate_per_second
        self.state_file = state_file
        self.max_wait = max_wait_seconds

        # Create the state file atomically ("x" mode). The previous
        # exists-check-then-open("w") was racy: two processes starting at
        # once could both see "missing" and truncate each other's state.
        try:
            with open(self.state_file, "x") as f:
                json.dump({"next_ts": 0.0}, f)
        except FileExistsError:
            pass

    def acquire(self):
        """Block until a request slot is available, then claim it.

        Raises:
            TimeoutError: if no slot frees up within ``max_wait_seconds``.
        """
        start = time.time()

        while True:
            if time.time() - start > self.max_wait:
                raise TimeoutError("Rate limiter wait exceeded")

            with portalocker.Lock(self.state_file, timeout=10):
                with open(self.state_file, "r+") as f:
                    try:
                        state = json.load(f)
                    except json.JSONDecodeError:
                        # Corrupt or empty state (e.g. a writer was killed
                        # mid-write): reset rather than crash forever.
                        state = {"next_ts": 0.0}
                    now = time.time()

                    if state.get("next_ts", 0.0) <= now:
                        # Slot is free: reserve the next one and proceed.
                        state["next_ts"] = now + self.interval
                        f.seek(0)
                        json.dump(state, f)
                        f.truncate()
                        return

                    wait = state["next_ts"] - now

            # Sleep outside the lock so other processes can make progress;
            # cap the nap so we re-check our own deadline in time.
            time.sleep(min(wait, self.max_wait))
130 changes: 104 additions & 26 deletions sauron/sauron.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,25 @@
# dependencies = [
# "pyln-client>=24.11",
# "requests[socks]>=2.23.0",
# "portalocker>=3.2,<4",
# ]
# ///

import requests
import sys
import time

from requests.packages.urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter
import os
import base64

from art import sauron_eye
from requests.packages.urllib3.util.retry import Retry
from pyln.client import Plugin
import portalocker

from ratelimit import GlobalRateLimiter
from shared_cache import SharedRequestCache


plugin = Plugin(dynamic=False)
Expand All @@ -27,25 +35,95 @@ class SauronError(Exception):
pass


rate_limiter = GlobalRateLimiter(rate_per_second=1, max_wait_seconds=30)
cache = SharedRequestCache(ttl_seconds=60)


def _response_from_cache(cached):
    """Rebuild a requests.Response from a cached entry dict."""
    resp = requests.Response()
    resp.status_code = cached["status"]
    resp._content = base64.b64decode(cached["content_b64"])
    # Restore headers as a CaseInsensitiveDict: assigning a plain dict
    # would silently break case-insensitive header lookups on cache hits.
    resp.headers = requests.structures.CaseInsensitiveDict(cached["headers"])
    return resp


def fetch(plugin, url):
    """Fetch this {url}, maybe through a pre-defined proxy.

    Responses are cached in a cross-process shared cache, and actual
    network requests are throttled by a cross-process rate limiter. A
    per-URL file lock ensures only one process fetches a given URL while
    the others wait and then reuse the cached result.

    Raises:
        SauronError: if the per-URL lock cannot be acquired after all retries.
    """
    # FIXME: Maybe try to be smart and renew circuit to broadcast different
    # transactions ? Hint: lightningd will aggressively send us the same
    # transaction a certain amount of times.

    plugin.log(f"Making cache key for {url}", level="debug")
    key = cache.make_key(url, body="fetch")
    lock_file = f"/tmp/fetch_lock_{key}.lock"

    # Fast path: serve from the shared cache without taking any lock.
    plugin.log(f"Checking cache for {url}", level="debug")
    cached = cache.get(key)
    if cached:
        plugin.log(f"Cache hit for {url}", level="debug")
        return _response_from_cache(cached)

    # Lock per URL.
    os.makedirs("/tmp", exist_ok=True)

    max_retries = 10

    for attempt in range(max_retries + 1):
        try:
            plugin.log(f"Getting fetch lock for {url}", level="debug")
            with portalocker.Lock(lock_file, timeout=20):
                # Another process may have fetched while we waited on the
                # lock: re-check the cache before hitting the network.
                plugin.log(f"Re-checking cache for {url}", level="debug")
                cached = cache.get(key)
                if cached:
                    plugin.log(f"Cache hit for {url}", level="debug")
                    return _response_from_cache(cached)

                plugin.log("Waiting for rate limit", level="debug")
                rate_limiter.acquire()
                plugin.log("Rate limit acquired", level="debug")

                start = time.time()
                plugin.log(f"Opening URL: {url}", level="debug")

                session = requests.session()
                session.proxies = plugin.sauron_socks_proxies
                retry_strategy = Retry(
                    backoff_factor=1,
                    total=10,
                    status_forcelist=[429, 500, 502, 503, 504],
                    allowed_methods=["HEAD", "GET", "OPTIONS"],
                )
                adapter = HTTPAdapter(max_retries=retry_strategy)

                session.mount("https://", adapter)
                session.mount("http://", adapter)

                # (connect, read) timeouts so a dead endpoint cannot hang
                # the plugin indefinitely.
                resp = session.get(url, timeout=(5, 10))

                elapsed = time.time() - start
                plugin.log(f"Request took {elapsed:.3f}s", level="debug")

                cache.set(
                    key,
                    {
                        "status": resp.status_code,
                        "headers": dict(resp.headers),
                        "content_b64": base64.b64encode(resp.content).decode(
                            "ascii"
                        ),
                    },
                )

                return resp

        except portalocker.exceptions.LockException:
            plugin.log(f"Timeout waiting for request lock for {url}")
            time.sleep(0.5)
            continue

        except Exception as e:
            plugin.log(f"Failed: {e}", level="error")
            raise

    # Previously this fell through and returned None, making callers crash
    # on `.status_code`. Fail loudly with a descriptive error instead.
    raise SauronError(
        "Could not acquire fetch lock for {} after {} attempts".format(
            url, max_retries + 1
        )
    )


@plugin.init()
Expand Down Expand Up @@ -80,7 +158,7 @@ def getchaininfo(plugin, **kwargs):
"00000008819873e925422c1ff0f99f7cc9bbb232af63a077a480a3633bee1ef6": "signet",
}

genesis_req = fetch(blockhash_url)
genesis_req = fetch(plugin, blockhash_url)
if not genesis_req.status_code == 200:
raise SauronError(
"Endpoint at {} returned {} ({}) when trying to "
Expand All @@ -89,10 +167,10 @@ def getchaininfo(plugin, **kwargs):
)
)

blockcount_req = fetch(blockcount_url)
blockcount_req = fetch(plugin, blockcount_url)
if not blockcount_req.status_code == 200:
raise SauronError(
"Endpoint at {} returned {} ({}) when trying to " "get blockcount.".format(
"Endpoint at {} returned {} ({}) when trying to get blockcount.".format(
blockcount_url, blockcount_req.status_code, blockcount_req.text
)
)
Expand All @@ -113,7 +191,7 @@ def getchaininfo(plugin, **kwargs):
@plugin.method("getrawblockbyheight")
def getrawblock(plugin, height, **kwargs):
blockhash_url = "{}/block-height/{}".format(plugin.api_endpoint, height)
blockhash_req = fetch(blockhash_url)
blockhash_req = fetch(plugin, blockhash_url)
if blockhash_req.status_code != 200:
return {
"blockhash": None,
Expand All @@ -122,7 +200,7 @@ def getrawblock(plugin, height, **kwargs):

block_url = "{}/block/{}/raw".format(plugin.api_endpoint, blockhash_req.text)
while True:
block_req = fetch(block_url)
block_req = fetch(plugin, block_url)
if block_req.status_code != 200:
return {
"blockhash": None,
Expand Down Expand Up @@ -168,17 +246,17 @@ def getutxout(plugin, txid, vout, **kwargs):
gettx_url = "{}/tx/{}".format(plugin.api_endpoint, txid)
status_url = "{}/tx/{}/outspend/{}".format(plugin.api_endpoint, txid, vout)

gettx_req = fetch(gettx_url)
gettx_req = fetch(plugin, gettx_url)
if not gettx_req.status_code == 200:
raise SauronError(
"Endpoint at {} returned {} ({}) when trying to " "get transaction.".format(
"Endpoint at {} returned {} ({}) when trying to get transaction.".format(
gettx_url, gettx_req.status_code, gettx_req.text
)
)
status_req = fetch(status_url)
status_req = fetch(plugin, status_url)
if not status_req.status_code == 200:
raise SauronError(
"Endpoint at {} returned {} ({}) when trying to " "get utxo status.".format(
"Endpoint at {} returned {} ({}) when trying to get utxo status.".format(
status_url, status_req.status_code, status_req.text
)
)
Expand All @@ -200,7 +278,7 @@ def getutxout(plugin, txid, vout, **kwargs):
def estimatefees(plugin, **kwargs):
feerate_url = "{}/fee-estimates".format(plugin.api_endpoint)

feerate_req = fetch(feerate_url)
feerate_req = fetch(plugin, feerate_url)
assert feerate_req.status_code == 200
feerates = feerate_req.json()
if plugin.sauron_network in ["test", "signet"]:
Expand Down
85 changes: 85 additions & 0 deletions sauron/shared_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import time
import os
import json
import hashlib
import portalocker
import threading


class SharedRequestCache:
    """File-backed response cache shared between processes.

    Each entry is a JSON file named after a SHA-256 key, holding a
    ``{"ts": write_time, "value": payload}`` wrapper. Entries older than
    ``ttl_seconds`` are treated as misses and garbage-collected by a
    daemon thread.
    """

    def __init__(self, cache_dir="/tmp/sauron_api_cache", ttl_seconds=10):
        self.cache_dir = cache_dir
        self.ttl = ttl_seconds
        os.makedirs(cache_dir, exist_ok=True)

        # Drop anything stale left over from a previous run, then keep
        # collecting in the background for the lifetime of the process.
        self.cleanup_expired_cache()

        t = threading.Thread(
            target=self._periodic_cleanup, args=(ttl_seconds * 2,), daemon=True
        )
        t.start()

    def _path(self, key):
        """Absolute path of the cache file for *key*."""
        return os.path.join(self.cache_dir, f"{key}.json")

    def make_key(self, url, body=None):
        """Return a stable SHA-256 hex key derived from *url* and *body*."""
        h = hashlib.sha256()
        h.update(url.encode())
        if body:
            h.update(repr(body).encode())
        return h.hexdigest()

    def get(self, key):
        """Return the cached value for *key*, or None on miss/expiry/error."""
        path = self._path(key)
        if not os.path.exists(path):
            return None

        try:
            with portalocker.Lock(path, timeout=1):
                with open(path) as f:
                    entry = json.load(f)

                if time.time() - entry["ts"] > self.ttl:
                    os.remove(path)
                    return None

                return entry["value"]

        except Exception:
            # Best effort: lock contention, races with cleanup, or corrupt
            # files all degrade to a cache miss rather than an error.
            return None

    def set(self, key, value):
        """Store *value* under *key*, atomically replacing any old entry."""
        path = self._path(key)
        tmp = path + ".tmp"

        with portalocker.Lock(tmp, timeout=5):
            with open(tmp, "w") as f:
                json.dump({"ts": time.time(), "value": value}, f)

        # os.replace is atomic, so readers never observe a partial file.
        os.replace(tmp, path)

    def cleanup_expired_cache(self):
        """Remove expired entries (and orphaned temp files) safely across processes."""
        now = time.time()
        for filename in os.listdir(self.cache_dir):
            path = os.path.join(self.cache_dir, filename)

            if filename.endswith(".tmp"):
                # Orphaned temp file from a writer that died mid-set().
                # These previously matched neither branch and piled up
                # forever; reap them once they are clearly abandoned.
                try:
                    if now - os.path.getmtime(path) > max(self.ttl, 60):
                        os.remove(path)
                except OSError:
                    pass
                continue

            if not filename.endswith(".json"):
                continue
            try:
                with portalocker.Lock(path, timeout=0.1):
                    with open(path) as f:
                        entry = json.load(f)
                    if now - entry.get("ts", 0) > self.ttl:
                        os.remove(path)
            except Exception:
                # Ignore locked files, missing files, or malformed files
                continue

    def _periodic_cleanup(self, interval):
        """Run cleanup in the background at regular *interval* seconds."""
        while True:
            try:
                self.cleanup_expired_cache()
            except Exception:
                pass  # never let the background thread die
            time.sleep(interval)
4 changes: 2 additions & 2 deletions sauron/tests/test_sauron_esplora_bitcoin.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

pyln.testing.fixtures.network_daemons["bitcoin"] = utils.BitcoinD


class LightningNode(utils.LightningNode):
def __init__(self, *args, **kwargs):
pyln.testing.utils.TEST_NETWORK = "bitcoin"
Expand Down Expand Up @@ -74,6 +75,7 @@ def test_esplora_bitcoin_getrawblockbyheight(node_factory):
}
assert response == expected_response


@pytest.mark.skip(reason="testing_theory")
def test_esplora_bitcoin_sendrawtransaction_invalid(node_factory):
"""
Expand Down Expand Up @@ -141,8 +143,6 @@ def test_esplora_bitcoin_estimatefees(node_factory):
# }
response = ln_node.rpc.call("estimatefees")



expected_response_keys = [
"opening",
"mutual_close",
Expand Down
Loading
Loading