From 8c048b72391e3c268b7d3adae5261c6c778491f4 Mon Sep 17 00:00:00 2001 From: owen-eth Date: Mon, 11 May 2026 12:50:56 -0400 Subject: [PATCH 1/2] feat: fastswap miles sweep redesign foundation --- tools/fastswap-miles/chainlink_oracle.go | 316 +++++++++++++ .../chainlink_oracle_live_test.go | 284 +++++++++++ tools/fastswap-miles/chainlink_oracle_test.go | 131 +++++ tools/fastswap-miles/cost_estimator.go | 180 +++++++ tools/fastswap-miles/cost_estimator_test.go | 93 ++++ tools/fastswap-miles/gas_buffer.go | 200 ++++++++ tools/fastswap-miles/gas_buffer_test.go | 127 +++++ tools/fastswap-miles/miles.go | 9 + tools/fastswap-miles/reconciliation.go | 210 +++++++++ tools/fastswap-miles/reconciliation_test.go | 48 ++ .../sweep_clock_persist_test.go | 168 +++++++ tools/fastswap-miles/sweep_loop.go | 446 ++++++++++++++++++ tools/fastswap-miles/sweep_scheduler.go | 251 ++++++++++ tools/fastswap-miles/sweep_scheduler_test.go | 205 ++++++++ tools/fastswap-miles/tiers.go | 141 ++++-- tools/fastswap-miles/tiers_test.go | 108 ++++- 16 files changed, 2860 insertions(+), 57 deletions(-) create mode 100644 tools/fastswap-miles/chainlink_oracle.go create mode 100644 tools/fastswap-miles/chainlink_oracle_live_test.go create mode 100644 tools/fastswap-miles/chainlink_oracle_test.go create mode 100644 tools/fastswap-miles/cost_estimator.go create mode 100644 tools/fastswap-miles/cost_estimator_test.go create mode 100644 tools/fastswap-miles/gas_buffer.go create mode 100644 tools/fastswap-miles/gas_buffer_test.go create mode 100644 tools/fastswap-miles/reconciliation.go create mode 100644 tools/fastswap-miles/reconciliation_test.go create mode 100644 tools/fastswap-miles/sweep_clock_persist_test.go create mode 100644 tools/fastswap-miles/sweep_loop.go create mode 100644 tools/fastswap-miles/sweep_scheduler.go create mode 100644 tools/fastswap-miles/sweep_scheduler_test.go diff --git a/tools/fastswap-miles/chainlink_oracle.go b/tools/fastswap-miles/chainlink_oracle.go new file mode 100644 index 000000000..fbfb2036e --- /dev/null +++ b/tools/fastswap-miles/chainlink_oracle.go @@ -0,0 +1,316 @@ +package main + +import ( + "context" + "fmt" + "log/slog" + "math/big" + "strings" + "sync" + "time" + + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/accounts/abi" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethclient" +) + +// chainlinkFeedRegistryAddr is the Chainlink Feed Registry on Ethereum +// mainnet. A single contract that maps (base, quote) → underlying feed and +// exposes latestRoundData / decimals directly. New pairs were frozen years +// ago, so some legitimate tokens (PEPE, ARB, possibly others) revert; the +// caller routes those rows to sweep-time pricing instead. +const chainlinkFeedRegistryAddr = "0x47Fb2585D2C56Fe188D0E6ec628a38b74fCeeeDf" + +// chainlinkEthDenomination is the sentinel address Chainlink uses to +// represent ETH on the quote side of a (base, quote) pair. +const chainlinkEthDenomination = "0xEeeeeEeeeEeEeeEeEeEeeEEEeeeeEeeeeeeeEEeE" + +// priceOracleCacheTTL bounds how long a fetched Chainlink rate is reused. +// Major Chainlink feeds heartbeat hourly and update on price-deviation +// thresholds; 5 min is comfortably tighter than the heartbeat and well +// within miles-grade precision. +const priceOracleCacheTTL = 5 * time.Minute + +const chainlinkFeedRegistryABI = `[ + {"inputs":[{"name":"base","type":"address"},{"name":"quote","type":"address"}], + "name":"latestRoundData", + "outputs":[{"name":"roundId","type":"uint80"},{"name":"answer","type":"int256"},{"name":"startedAt","type":"uint256"},{"name":"updatedAt","type":"uint256"},{"name":"answeredInRound","type":"uint80"}], + "stateMutability":"view","type":"function"}, + {"inputs":[{"name":"base","type":"address"},{"name":"quote","type":"address"}], + "name":"decimals", + "outputs":[{"name":"","type":"uint8"}], + "stateMutability":"view","type":"function"} +]` + +const erc20DecimalsABI = `[ + {"inputs":[],"name":"decimals","outputs":[{"name":"","type":"uint8"}],"stateMutability":"view","type":"function"} +]` + +// priceOracle resolves the ETH-wei value of a fastswap surplus at miles-award +// time. Three sources, picked per swap shape: +// +// - Output is ETH/WETH: handled upstream (surplus is already ETH wei). +// - ETH/WETH input + whitelisted ERC20 output: event-derived from the +// trade's executed rate (most accurate possible — it IS the realized +// rate of this exact swap). +// - ERC20 input + whitelisted ERC20 output: Chainlink Feed Registry. Reads +// are out-of-band so flash-loan manipulation doesn't apply; the oracle +// network itself is not pool-manipulable. +// +// Non-whitelisted output tokens are always deferred to sweep time. This is +// the structural defense against the attacker-token attack: a malicious +// actor who mints their own token and controls its on-chain liquidity +// cannot extract upfront miles, because their token isn't in tokenConfigs. +type priceOracle struct { + client *ethclient.Client + logger *slog.Logger + weth common.Address + + registryAddr common.Address + registryABI abi.ABI + erc20ABI abi.ABI + + mu sync.RWMutex + rateCache map[common.Address]rateEntry + decimalsCache map[common.Address]uint8 +} + +type rateEntry struct { + answer *big.Int + feedDecimals uint8 + fetchedAt time.Time +} + +func newPriceOracle(client *ethclient.Client, weth common.Address, logger *slog.Logger) (*priceOracle, error) { + regABI, err := abi.JSON(strings.NewReader(chainlinkFeedRegistryABI)) + if err != nil { + return nil, fmt.Errorf("parse chainlink registry ABI: %w", err) + } + ercABI, err := abi.JSON(strings.NewReader(erc20DecimalsABI)) + if err != nil { + return nil, fmt.Errorf("parse erc20 decimals ABI: %w", err) + } + return &priceOracle{ + client: client, + logger: logger, + weth: weth, + registryAddr: common.HexToAddress(chainlinkFeedRegistryAddr), + registryABI: regABI, + erc20ABI: ercABI, + rateCache: map[common.Address]rateEntry{}, + decimalsCache: map[common.Address]uint8{}, + }, nil +} + +// PriceSurplusEth returns the ETH-wei value of an ERC20-output surplus. +// +// Returns (eth wei, eligible, source). When eligible is false the caller MUST +// route the row to sweep-time pro-rata pricing instead — this is the +// attacker-token defense and the no-Chainlink-feed fallback rolled into one. +// +// Source values for logging: +// +// "event_derived" ETH/WETH input + whitelisted output +// "chainlink" ERC20 input + whitelisted output, Registry hit +// "deferred:not_whitelisted" output token not in tokenConfigs +// "deferred:invalid_event" event surplus + userAmtOut sum to zero +// "deferred:no_chainlink" Registry call reverted or returned bad data +// "deferred:no_token_decim" ERC20 decimals call failed +func (o *priceOracle) PriceSurplusEth( + ctx context.Context, + inputToken, outputToken common.Address, + inputAmt, userAmtOut, surplus *big.Int, +) (*big.Int, bool, string) { + if !isWhitelisted(outputToken) { + return nil, false, "deferred:not_whitelisted" + } + + if inputToken == zeroAddr || inputToken == o.weth { + v := deriveEthInputSurplusEth(inputAmt, userAmtOut, surplus) + if v == nil { + return nil, false, "deferred:invalid_event" + } + return v, true, "event_derived" + } + + rate, feedDecimals, ok := o.getChainlinkRate(ctx, outputToken) + if !ok { + return nil, false, "deferred:no_chainlink" + } + tokenDecimals, ok := o.getTokenDecimals(ctx, outputToken) + if !ok { + return nil, false, "deferred:no_token_decim" + } + return scaleChainlinkAnswer(surplus, rate, tokenDecimals, feedDecimals), true, "chainlink" +} + +// deriveEthInputSurplusEth computes surplus_eth from event data alone when +// the input was ETH or WETH (so inputAmt is denominated in ETH wei). +// +// surplus_eth = surplus × inputAmt / (userAmtOut + surplus) +// +// This IS the trade's executed exchange rate — the contract took inputAmt +// wei from the user, Barter returned (userAmtOut + surplus) tokens. No +// external oracle can be more accurate than the trade's own realized price. +// Whitelist-gated to prevent attacker-controlled-pool manipulation. +// +// Returns nil for degenerate events where the divisor is non-positive. +func deriveEthInputSurplusEth(inputAmt, userAmtOut, surplus *big.Int) *big.Int { + if inputAmt == nil || userAmtOut == nil || surplus == nil { + return nil + } + denom := new(big.Int).Add(userAmtOut, surplus) + if denom.Sign() <= 0 { + return nil + } + result := new(big.Int).Mul(surplus, inputAmt) + result.Div(result, denom) + return result +} + +// scaleChainlinkAnswer converts a Chainlink rate into surplus_eth. +// +// surplus_eth_wei = surplus_raw × answer × 10^(18 - feed_decimals) / 10^token_decimals +// +// Combined into one divisor exponent (token_decimals + feed_decimals - 18) +// which can be negative — handled both directions. +func scaleChainlinkAnswer(surplus, answer *big.Int, tokenDecimals, feedDecimals uint8) *big.Int { + result := new(big.Int).Mul(surplus, answer) + expDiv := int(tokenDecimals) + int(feedDecimals) - 18 + switch { + case expDiv > 0: + divisor := new(big.Int).Exp(big.NewInt(10), big.NewInt(int64(expDiv)), nil) + result.Div(result, divisor) + case expDiv < 0: + multiplier := new(big.Int).Exp(big.NewInt(10), big.NewInt(int64(-expDiv)), nil) + result.Mul(result, multiplier) + } + return result +} + +func (o *priceOracle) getChainlinkRate(ctx context.Context, token common.Address) (*big.Int, uint8, bool) { + o.mu.RLock() + if entry, ok := o.rateCache[token]; ok && time.Since(entry.fetchedAt) < priceOracleCacheTTL { + o.mu.RUnlock() + return entry.answer, entry.feedDecimals, true + } + o.mu.RUnlock() + + answer, decimals, err := o.fetchChainlinkRate(ctx, token) + if err != nil { + o.logger.Warn("chainlink registry lookup failed; deferring to sweep for this token", + slog.String("token", token.Hex()), slog.Any("error", err)) + return nil, 0, false + } + + o.mu.Lock() + o.rateCache[token] = rateEntry{answer: answer, feedDecimals: decimals, fetchedAt: time.Now()} + o.mu.Unlock() + return answer, decimals, true +} + +func (o *priceOracle) fetchChainlinkRate(ctx context.Context, token common.Address) (*big.Int, uint8, error) { + ethSentinel := common.HexToAddress(chainlinkEthDenomination) + + answer, err := o.callRegistryReturningBigInt(ctx, "latestRoundData", token, ethSentinel, 1) + if err != nil { + return nil, 0, err + } + if answer.Sign() <= 0 { + return nil, 0, fmt.Errorf("non-positive chainlink answer: %s", answer.String()) + } + + decimals, err := o.callRegistryReturningUint8(ctx, "decimals", token, ethSentinel) + if err != nil { + return nil, 0, err + } + return answer, decimals, nil +} + +// callRegistryReturningBigInt invokes a registry method that returns one or +// more values, extracting the *big.Int at outputIndex. +func (o *priceOracle) callRegistryReturningBigInt( + ctx context.Context, method string, base, quote common.Address, outputIndex int, +) (*big.Int, error) { + data, err := o.registryABI.Pack(method, base, quote) + if err != nil { + return nil, fmt.Errorf("pack %s: %w", method, err) + } + raw, err := o.client.CallContract(ctx, ethereum.CallMsg{To: &o.registryAddr, Data: data}, nil) + if err != nil { + return nil, fmt.Errorf("call %s: %w", method, err) + } + out, err := o.registryABI.Unpack(method, raw) + if err != nil { + return nil, fmt.Errorf("unpack %s: %w", method, err) + } + if len(out) <= outputIndex { + return nil, fmt.Errorf("unexpected %s output length %d", method, len(out)) + } + v, ok := out[outputIndex].(*big.Int) + if !ok { + return nil, fmt.Errorf("%s output[%d] is %T, want *big.Int", method, outputIndex, out[outputIndex]) + } + return v, nil +} + +func (o *priceOracle) callRegistryReturningUint8( + ctx context.Context, method string, base, quote common.Address, +) (uint8, error) { + data, err := o.registryABI.Pack(method, base, quote) + if err != nil { + return 0, fmt.Errorf("pack %s: %w", method, err) + } + raw, err := o.client.CallContract(ctx, ethereum.CallMsg{To: &o.registryAddr, Data: data}, nil) + if err != nil { + return 0, fmt.Errorf("call %s: %w", method, err) + } + out, err := o.registryABI.Unpack(method, raw) + if err != nil { + return 0, fmt.Errorf("unpack %s: %w", method, err) + } + if len(out) < 1 { + return 0, fmt.Errorf("empty %s output", method) + } + v, ok := out[0].(uint8) + if !ok { + return 0, fmt.Errorf("%s output is %T, want uint8", method, out[0]) + } + return v, nil +} + +func (o *priceOracle) getTokenDecimals(ctx context.Context, token common.Address) (uint8, bool) { + o.mu.RLock() + if dec, ok := o.decimalsCache[token]; ok { + o.mu.RUnlock() + return dec, true + } + o.mu.RUnlock() + + data, err := o.erc20ABI.Pack("decimals") + if err != nil { + o.logger.Warn("erc20 decimals pack failed", slog.String("token", token.Hex()), slog.Any("error", err)) + return 0, false + } + raw, err := o.client.CallContract(ctx, ethereum.CallMsg{To: &token, Data: data}, nil) + if err != nil { + o.logger.Warn("erc20 decimals call failed", slog.String("token", token.Hex()), slog.Any("error", err)) + return 0, false + } + out, err := o.erc20ABI.Unpack("decimals", raw) + if err != nil || len(out) < 1 { + o.logger.Warn("erc20 decimals unpack failed", slog.String("token", token.Hex()), slog.Any("error", err)) + return 0, false + } + dec, ok := out[0].(uint8) + if !ok { + o.logger.Warn("erc20 decimals not uint8", slog.String("token", token.Hex())) + return 0, false + } + + o.mu.Lock() + o.decimalsCache[token] = dec + o.mu.Unlock() + return dec, true +} diff --git a/tools/fastswap-miles/chainlink_oracle_live_test.go b/tools/fastswap-miles/chainlink_oracle_live_test.go new file mode 100644 index 000000000..18d8accc8 --- /dev/null +++ b/tools/fastswap-miles/chainlink_oracle_live_test.go @@ -0,0 +1,284 @@ +//go:build integration + +package main + +import ( + "context" + "log/slog" + "math/big" + "os" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethclient" +) + +// Live integration test against Ethereum mainnet. Probes the Chainlink Feed +// Registry contract for every whitelisted token, reports coverage + the +// actual surplus_eth value computed for a synthetic 1-unit surplus. +// +// Run with: +// +// MAINNET_RPC_URL=https://eth.llamarpc.com go test -tags=integration -v -run TestPriceOracle_Live ./tools/fastswap-miles/... +// +// Skipped in normal test runs because it makes real RPC calls. Build tag +// `integration` keeps it out of `go test ./...` by default. + +func TestPriceOracle_Live_ChainlinkCoverage(t *testing.T) { + rpcURL := os.Getenv("MAINNET_RPC_URL") + if rpcURL == "" { + t.Skip("MAINNET_RPC_URL not set; skipping live test") + } + + client, err := ethclient.Dial(rpcURL) + if err != nil { + t.Fatalf("dial RPC: %v", err) + } + defer client.Close() + + weth := common.HexToAddress(defaultWETH) + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug})) + o, err := newPriceOracle(client, weth, logger) + if err != nil { + t.Fatalf("newPriceOracle: %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + cases := []struct { + name string + token common.Address + decimals int + }{ + {"USDC", usdcAddr, 6}, + {"USDT", usdtAddr, 6}, + {"DAI", daiAddr, 18}, + {"WBTC", wbtcAddr, 8}, + {"ARB", arbAddr, 18}, + {"LINK", linkAddr, 18}, + {"COMP", compAddr, 18}, + {"UNI", uniAddr, 18}, + {"SUSHI", sushiAddr, 18}, + {"1INCH", inchAddr, 18}, + {"YFI", yfiAddr, 18}, + {"PEPE", pepeAddr, 18}, + } + + // We synthesize an ERC20-input swap to force the Chainlink path + // (event-derivation only kicks in for ETH/WETH input). The actual + // inputAmt and userAmtOut don't matter because the Chainlink branch + // only uses surplus + the token rate. + inputToken := usdtAddr + inputAmt := big.NewInt(0) + userAmtOut := big.NewInt(0) + + t.Log("=== Per-token Chainlink Feed Registry coverage ===") + covered := 0 + deferred := 0 + rateLimitedRetries := []int{} + for i, c := range cases { + // 1 token's worth of surplus in raw units: 10^decimals. + surplus := new(big.Int).Exp(big.NewInt(10), big.NewInt(int64(c.decimals)), nil) + + // Throttle to stay under Infura free-tier limits (~10 rps). + time.Sleep(300 * time.Millisecond) + + ethValue, eligible, source := o.PriceSurplusEth(ctx, inputToken, c.token, inputAmt, userAmtOut, surplus) + if !eligible { + deferred++ + t.Logf(" %-6s %s DEFERRED source=%s", + c.name, c.token.Hex(), source) + // Schedule retry — we want to differentiate "feed truly missing" + // (revert) from "rate limited" so the user sees real coverage. + rateLimitedRetries = append(rateLimitedRetries, i) + continue + } + covered++ + ethFloat := weiToEth(ethValue) + t.Logf(" %-6s %s COVERED source=%s 1 token = %.10f ETH (%s wei)", + c.name, c.token.Hex(), source, ethFloat, ethValue.String()) + } + t.Logf("=== First-pass coverage: %d covered, %d deferred ===", covered, deferred) + + if len(rateLimitedRetries) == 0 { + return + } + + t.Log("") + t.Log("=== Retry pass for deferred tokens (slower, fresh oracle to clear cached failures) ===") + // Build a fresh oracle so cached-failure state (if any) is cleared. + o2, err := newPriceOracle(client, weth, logger) + if err != nil { + t.Fatalf("newPriceOracle (retry): %v", err) + } + finalCovered := covered + finalDeferred := 0 + for _, i := range rateLimitedRetries { + c := cases[i] + surplus := new(big.Int).Exp(big.NewInt(10), big.NewInt(int64(c.decimals)), nil) + + // Bigger delay on retry to avoid rate-limit noise. + time.Sleep(2 * time.Second) + + ethValue, eligible, source := o2.PriceSurplusEth(ctx, inputToken, c.token, inputAmt, userAmtOut, surplus) + if !eligible { + finalDeferred++ + t.Logf(" %-6s %s STILL DEFERRED source=%s (likely real coverage gap)", + c.name, c.token.Hex(), source) + continue + } + finalCovered++ + ethFloat := weiToEth(ethValue) + t.Logf(" %-6s %s RECOVERED source=%s 1 token = %.10f ETH", + c.name, c.token.Hex(), source, ethFloat) + } + t.Logf("=== Final coverage: %d covered, %d deferred (out of %d) ===", + finalCovered, finalDeferred, len(cases)) +} + +// Exercises every realistic swap shape end-to-end through PriceSurplusEth +// against mainnet. Logs the surplus_eth that would be used for miles +// awarding for each case so it can be eyeballed before deploy. +func TestPriceOracle_Live_AllSwapShapes(t *testing.T) { + rpcURL := os.Getenv("MAINNET_RPC_URL") + if rpcURL == "" { + t.Skip("MAINNET_RPC_URL not set; skipping live test") + } + + client, err := ethclient.Dial(rpcURL) + if err != nil { + t.Fatalf("dial RPC: %v", err) + } + defer client.Close() + + weth := common.HexToAddress(defaultWETH) + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelWarn})) + o, _ := newPriceOracle(client, weth, logger) + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + cases := []struct { + name string + inputTok common.Address + outputTok common.Address + inputAmt *big.Int + userAmtOut *big.Int + surplus *big.Int + expect string // expected source + }{ + { + name: "ETH→USDC (typical, 1% surplus)", + inputTok: zeroAddr, + outputTok: usdcAddr, + inputAmt: big.NewInt(1_000_000_000_000_000_000), // 1 ETH + userAmtOut: big.NewInt(3_000_000_000), // 3000 USDC + surplus: big.NewInt(30_000_000), // 30 USDC + expect: "event_derived", + }, + { + name: "WETH→WBTC (event-derived since WBTC has no Chainlink)", + inputTok: weth, + outputTok: wbtcAddr, + inputAmt: new(big.Int).Mul(big.NewInt(10), big.NewInt(1_000_000_000_000_000_000)), + userAmtOut: big.NewInt(40_000_000), // ~0.4 BTC at 8 decimals + surplus: big.NewInt(800_000), // ~0.008 BTC + expect: "event_derived", + }, + { + name: "USDC→USDT (Chainlink path)", + inputTok: usdcAddr, + outputTok: usdtAddr, + inputAmt: big.NewInt(3_000_000_000), // 3000 USDC + userAmtOut: big.NewInt(2_990_000_000), // 2990 USDT (typical) + surplus: big.NewInt(10_000_000), // 10 USDT + expect: "chainlink", + }, + { + name: "USDC→WBTC (defers: WBTC not in Registry)", + inputTok: usdcAddr, + outputTok: wbtcAddr, + inputAmt: big.NewInt(50_000_000_000), // 50K USDC + userAmtOut: big.NewInt(80_000_000), // 0.8 WBTC + surplus: big.NewInt(1_600_000), // 0.016 WBTC + expect: "deferred:no_chainlink", + }, + { + name: "USDC→AttackerToken (defers: not whitelisted)", + inputTok: usdcAddr, + outputTok: common.HexToAddress("0xdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef"), + inputAmt: big.NewInt(1_000_000_000), + userAmtOut: big.NewInt(100), + surplus: big.NewInt(5), + expect: "deferred:not_whitelisted", + }, + { + name: "ETH→AttackerToken (defers: not whitelisted, defends attack vector)", + inputTok: zeroAddr, + outputTok: common.HexToAddress("0xdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef"), + inputAmt: big.NewInt(1_000_000_000_000_000_000), + userAmtOut: big.NewInt(98), + surplus: big.NewInt(2), + expect: "deferred:not_whitelisted", + }, + } + + for _, c := range cases { + // Pacing for shared RPC. + time.Sleep(400 * time.Millisecond) + + ethValue, eligible, source := o.PriceSurplusEth( + ctx, c.inputTok, c.outputTok, c.inputAmt, c.userAmtOut, c.surplus, + ) + if source != c.expect { + t.Errorf("%s: source = %s, want %s", c.name, source, c.expect) + } + if eligible { + t.Logf(" %s -> %s, surplus_eth = %.10f ETH", + c.name, source, weiToEth(ethValue)) + } else { + t.Logf(" %s -> %s (deferred, correct)", c.name, source) + } + } +} + +// Sanity-check that ETH-leg event derivation needs no RPC at all (it's +// pure event-data math). This complements the unit test by running it +// through PriceSurplusEth with a real client wired up. +func TestPriceOracle_Live_EthLegNoRPCNeeded(t *testing.T) { + rpcURL := os.Getenv("MAINNET_RPC_URL") + if rpcURL == "" { + t.Skip("MAINNET_RPC_URL not set; skipping live test") + } + + client, err := ethclient.Dial(rpcURL) + if err != nil { + t.Fatalf("dial RPC: %v", err) + } + defer client.Close() + + weth := common.HexToAddress(defaultWETH) + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo})) + o, _ := newPriceOracle(client, weth, logger) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // 1 ETH → 3000 USDC user out + 30 USDC surplus. + // Expected surplus_eth ≈ 30 / 3030 ≈ 0.0099 ETH + inputAmt := big.NewInt(1_000_000_000_000_000_000) + userAmtOut := big.NewInt(3_000_000_000) + surplus := big.NewInt(30_000_000) + + ethValue, eligible, source := o.PriceSurplusEth(ctx, zeroAddr, usdcAddr, inputAmt, userAmtOut, surplus) + if !eligible { + t.Fatalf("ETH→USDC must be eligible; got source=%s", source) + } + if source != "event_derived" { + t.Errorf("source = %s, want event_derived", source) + } + t.Logf("ETH→USDC 1 ETH / 3000+30 USDC: surplus_eth = %.6f ETH (%s wei)", + weiToEth(ethValue), ethValue.String()) +} diff --git a/tools/fastswap-miles/chainlink_oracle_test.go b/tools/fastswap-miles/chainlink_oracle_test.go new file mode 100644 index 000000000..d5d069a0b --- /dev/null +++ b/tools/fastswap-miles/chainlink_oracle_test.go @@ -0,0 +1,131 @@ +package main + +import ( + "math/big" + "testing" +) + +func TestDeriveEthInputSurplusEth_TypicalETHtoUSDC(t *testing.T) { + // 1 ETH input → 3000 USDC out (userAmtOut) + 30 USDC surplus (1% of total). + // USDC has 6 decimals, so: + // inputAmt = 1e18 wei + // userAmtOut = 3000 × 1e6 = 3e9 + // surplus = 30 × 1e6 = 3e7 + // Expected surplus_eth = surplus × inputAmt / (userAmtOut + surplus) + // = 3e7 × 1e18 / 3.03e9 + // ≈ 9.901e15 wei (~0.0099 ETH) + inputAmt := new(big.Int).Mul(big.NewInt(1), big.NewInt(1_000_000_000_000_000_000)) + userAmtOut := big.NewInt(3_000_000_000) // 3000 USDC + surplus := big.NewInt(30_000_000) // 30 USDC + + got := deriveEthInputSurplusEth(inputAmt, userAmtOut, surplus) + + wantApprox := new(big.Int).Mul(big.NewInt(9_900_000_000), big.NewInt(1_000_000)) // 9.9e15 + tolerance := new(big.Int).Mul(big.NewInt(10_000_000), big.NewInt(1_000_000)) // 1e13 (0.001 ETH) + + diff := new(big.Int).Sub(got, wantApprox) + diff.Abs(diff) + if diff.Cmp(tolerance) > 0 { + t.Errorf("surplus_eth = %s, want ~%s ± %s", got, wantApprox, tolerance) + } +} + +func TestDeriveEthInputSurplusEth_AtTheTwoPercentSlippageCap(t *testing.T) { + // userAmtOut = 98 (uniswap × 0.98), surplus = 2 → exactly the cap shape. + // 1 ETH bought 100 token total. surplus_eth = 2/100 of inputAmt = 0.02 ETH. + inputAmt := big.NewInt(1_000_000_000_000_000_000) // 1 ETH + userAmtOut := big.NewInt(98) + surplus := big.NewInt(2) + + got := deriveEthInputSurplusEth(inputAmt, userAmtOut, surplus) + + want := big.NewInt(20_000_000_000_000_000) // 0.02 ETH + if got.Cmp(want) != 0 { + t.Errorf("surplus_eth = %s, want %s", got, want) + } +} + +func TestDeriveEthInputSurplusEth_ZeroDenominatorReturnsNil(t *testing.T) { + got := deriveEthInputSurplusEth(big.NewInt(1e18), big.NewInt(0), big.NewInt(0)) + if got != nil { + t.Errorf("expected nil for zero denominator, got %s", got) + } +} + +func TestDeriveEthInputSurplusEth_NilInputsReturnNil(t *testing.T) { + if got := deriveEthInputSurplusEth(nil, big.NewInt(1), big.NewInt(1)); got != nil { + t.Errorf("nil inputAmt: expected nil, got %s", got) + } + if got := deriveEthInputSurplusEth(big.NewInt(1), nil, big.NewInt(1)); got != nil { + t.Errorf("nil userAmtOut: expected nil, got %s", got) + } + if got := deriveEthInputSurplusEth(big.NewInt(1), big.NewInt(1), nil); got != nil { + t.Errorf("nil surplus: expected nil, got %s", got) + } +} + +func TestScaleChainlinkAnswer_USDCFeed18Decimals(t *testing.T) { + // Realistic USDC/ETH at $3000 ETH: + // 1 USDC ≈ 0.000333 ETH → answer = 333_333_333_333_333 (3.33e14, 18 decimals) + // 1 USDC raw = 1e6 (USDC has 6 decimals) + // Expected: 1 USDC → 0.000333 ETH = 3.33e14 wei + answer := big.NewInt(333_333_333_333_333) // ~1/3000 ETH per USDC, 1e18 scale + surplus := big.NewInt(1_000_000) // 1 USDC raw + + got := scaleChainlinkAnswer(surplus, answer, 6, 18) + + want := big.NewInt(333_333_333_333_333) // back to the same magnitude — 0.000333 ETH wei + if got.Cmp(want) != 0 { + t.Errorf("scaleChainlinkAnswer = %s, want %s", got, want) + } +} + +func TestScaleChainlinkAnswer_WBTCFeed18Decimals(t *testing.T) { + // WBTC has 8 decimals. Suppose WBTC/ETH = 20 (1 BTC = 20 ETH). + // answer = 20 × 1e18 + // 1 WBTC raw = 1e8 + // Expected: 1 WBTC → 20 ETH = 20e18 wei + answer := new(big.Int).Mul(big.NewInt(20), big.NewInt(1_000_000_000_000_000_000)) + surplus := big.NewInt(100_000_000) // 1 WBTC raw + + got := scaleChainlinkAnswer(surplus, answer, 8, 18) + + want := new(big.Int).Mul(big.NewInt(20), big.NewInt(1_000_000_000_000_000_000)) + if got.Cmp(want) != 0 { + t.Errorf("scaleChainlinkAnswer WBTC = %s, want %s", got, want) + } +} + +func TestScaleChainlinkAnswer_NonStandardFeedDecimals(t *testing.T) { + // Exercises the negative-exponent branch of the scaler. + // surplus_raw = 1e6 + // answer = 333 + // token_decimals = 6 + // feed_decimals = 8 + // exp = token_decimals + feed_decimals - 18 = -4 → multiply by 10^4 + // result = 1e6 × 333 × 10^4 / 10^0 = 333 × 10^10 = 3.33e12 + answer := big.NewInt(333) + surplus := big.NewInt(1_000_000) + + got := scaleChainlinkAnswer(surplus, answer, 6, 8) + + want := big.NewInt(3_330_000_000_000) + if got.Cmp(want) != 0 { + t.Errorf("scaleChainlinkAnswer non-std feed = %s, want %s", got, want) + } +} + +func TestScaleChainlinkAnswer_HighDecimalsTokenZeroSurplus(t *testing.T) { + // Zero surplus → zero output regardless of rate. + got := scaleChainlinkAnswer(big.NewInt(0), big.NewInt(1e18), 18, 18) + if got.Sign() != 0 { + t.Errorf("zero surplus should give zero, got %s", got) + } +} + +// Note: the live Chainlink Registry / ERC20 decimals calls require an L1 RPC +// client and are not unit-tested here. The math above plus whitelist gating +// (covered in tiers_test.go) covers everything that doesn't depend on the +// network layer. The Registry round-trip is exercised in deployment by the +// reconciliation monitor (which catches drift between estimates and reality) +// and by the warning-log path inside getChainlinkRate. diff --git a/tools/fastswap-miles/cost_estimator.go b/tools/fastswap-miles/cost_estimator.go new file mode 100644 index 000000000..afcab8864 --- /dev/null +++ b/tools/fastswap-miles/cost_estimator.go @@ -0,0 +1,180 @@ +package main + +import ( + "context" + "database/sql" + "fmt" + "log/slog" + "strings" + "sync" + "time" +) + +// costEstimateLookbackDays is the rolling window over which per-token sweep +// overhead percentiles are computed. +const costEstimateLookbackDays = 14 + +// costEstimateRefreshInterval is the period between background refreshes of +// the per-token estimate cache. +const costEstimateRefreshInterval = 30 * time.Minute + +// costEstimateMinSweeps is the minimum number of recent sweeps required for +// the "primary" percentile (p25 by default). Below this threshold the +// estimator falls back to a more conservative p75 to compensate for noisy +// data on low-volume tokens. +const costEstimateMinSweeps = 10 + +// costEstimateLastResort is the hardcoded fallback overhead used when a token +// has no historical sweep data at all (brand-new token). Conservative — +// chosen so users still get *some* miles on the first swap of a novel token, +// while leaving room for protocol margin once real data arrives. +const costEstimateLastResort = 0.001 // ETH + +// costEstimate holds the per-token sweep overhead estimate (in ETH) used for +// upfront miles awarding. Estimates are refreshed periodically from +// fastswap_miles realized sweep data. +type costEstimate struct { + // PerRowOverhead is the estimated sweep overhead per user row, in ETH. + // This is the value subtracted in the miles formula in lieu of realized + // pro-rata sweep gas. + PerRowOverhead float64 + + // Source describes how this estimate was computed (for observability). + // One of: "p25", "p75_low_data", "default_no_data". + Source string + + // SweepCount is the number of sweeps the estimate is based on. + // Zero indicates the default-no-data fallback. + SweepCount int + + // ComputedAt is when this estimate was last computed. + ComputedAt time.Time +} + +// costEstimator maintains an in-memory, periodically refreshed cache of +// per-token cost estimates. It is safe for concurrent use. +type costEstimator struct { + db *sql.DB + logger *slog.Logger + + mu sync.RWMutex + estimates map[string]costEstimate // key: lowercased token hex + lastFresh time.Time +} + +func newCostEstimator(db *sql.DB, logger *slog.Logger) *costEstimator { + return &costEstimator{ + db: db, + logger: logger, + estimates: make(map[string]costEstimate), + } +} + +// Get returns the current estimate for a token, computing it from cache. +// If no estimate exists for this token, returns the default-no-data fallback. +func (c *costEstimator) Get(token string) costEstimate { + c.mu.RLock() + defer c.mu.RUnlock() + if est, ok := c.estimates[strings.ToLower(token)]; ok { + return est + } + return costEstimate{ + PerRowOverhead: costEstimateLastResort, + Source: "default_no_data", + SweepCount: 0, + ComputedAt: time.Now(), + } +} + +// Refresh recomputes per-token estimates from realized fastswap_miles data +// over the configured lookback window. This is the only method that touches +// the database; intended to be called periodically by a background goroutine. +func (c *costEstimator) Refresh(ctx context.Context) error { + rows, err := c.db.QueryContext(ctx, fmt.Sprintf(` +SELECT output_token, + COUNT(*) as n, + percentile_approx(per_row_oh, 0.25) as p25, + percentile_approx(per_row_oh, 0.75) as p75 +FROM ( + SELECT output_token, + surplus_eth - net_profit_eth - CAST(bid_cost AS DOUBLE)/1e18 as per_row_oh + FROM mevcommit_57173.fastswap_miles + WHERE processed = 1 + AND swap_type = 'erc20' + AND miles > 0 + AND surplus_eth > 0 + AND surplus_eth IS NOT NULL + AND surplus_eth < 1.0 + AND block_timestamp >= NOW() - INTERVAL %d DAY +) t +GROUP BY output_token`, costEstimateLookbackDays)) + if err != nil { + return fmt.Errorf("query cost estimates: %w", err) + } + defer func() { _ = rows.Close() }() + + fresh := make(map[string]costEstimate) + for rows.Next() { + var token string + var n int + var p25, p75 float64 + if err := rows.Scan(&token, &n, &p25, &p75); err != nil { + c.logger.Warn("cost estimate scan failed", slog.Any("error", err)) + continue + } + + var overhead float64 + var source string + if n >= costEstimateMinSweeps { + overhead = p25 + source = "p25" + } else { + overhead = p75 + source = "p75_low_data" + } + + fresh[strings.ToLower(token)] = costEstimate{ + PerRowOverhead: overhead, + Source: source, + SweepCount: n, + ComputedAt: time.Now(), + } + } + if err := rows.Err(); err != nil { + return fmt.Errorf("iterate cost estimates: %w", err) + } + + c.mu.Lock() + c.estimates = fresh + c.lastFresh = time.Now() + c.mu.Unlock() + + c.logger.Info("cost estimates refreshed", + slog.Int("tokens", len(fresh)), + slog.Duration("window", costEstimateLookbackDays*24*time.Hour)) + return nil +} + +// Run starts a background loop that refreshes estimates on the configured +// interval. Returns when the context is cancelled. Performs an immediate +// initial refresh on startup so estimates are warm before the first miles +// computation. +func (c *costEstimator) Run(ctx context.Context) { + if err := c.Refresh(ctx); err != nil { + c.logger.Warn("initial cost estimate refresh failed; using defaults", slog.Any("error", err)) + } + + ticker := time.NewTicker(costEstimateRefreshInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if err := c.Refresh(ctx); err != nil { + c.logger.Warn("cost estimate refresh failed", slog.Any("error", err)) + } + } + } +} diff --git a/tools/fastswap-miles/cost_estimator_test.go b/tools/fastswap-miles/cost_estimator_test.go new file mode 100644 index 000000000..10048650f --- /dev/null +++ b/tools/fastswap-miles/cost_estimator_test.go @@ -0,0 +1,93 @@ +package main + +import ( + "log/slog" + "strings" + "testing" + "time" +) + +func newTestEstimator() *costEstimator { + return &costEstimator{ + logger: slog.Default(), + estimates: make(map[string]costEstimate), + } +} + +func TestCostEstimator_Get_NoData_FallsBackToLastResort(t *testing.T) { + // Exercise the real constructor so it's not unused. + c := newCostEstimator(nil, slog.Default()) + got := c.Get("0xdeadbeef") + if got.Source != "default_no_data" { + t.Errorf("source = %q, want default_no_data", got.Source) + } + if got.PerRowOverhead != costEstimateLastResort { + t.Errorf("overhead = %v, want %v (last resort)", got.PerRowOverhead, costEstimateLastResort) + } + if got.SweepCount != 0 { + t.Errorf("sweep count = %d, want 0", got.SweepCount) + } +} + +func TestCostEstimator_Get_HighData_UsesP25(t *testing.T) { + c := newTestEstimator() + c.estimates["0xtoken"] = costEstimate{ + PerRowOverhead: 5e-5, + Source: "p25", + SweepCount: 50, + ComputedAt: time.Now(), + } + got := c.Get("0xToken") // case-insensitive lookup + if got.Source != "p25" { + t.Errorf("source = %q, want p25", got.Source) + } + if got.PerRowOverhead != 5e-5 { + t.Errorf("overhead = %v, want 5e-5", got.PerRowOverhead) + } + if got.SweepCount != 50 { + t.Errorf("sweep count = %d, want 50", got.SweepCount) + } +} + +func TestCostEstimator_Get_LowData_UsesP75(t *testing.T) { + c := newTestEstimator() + c.estimates["0xtoken"] = costEstimate{ + PerRowOverhead: 1.5e-4, + Source: "p75_low_data", + SweepCount: 5, + ComputedAt: time.Now(), + } + got := c.Get("0xtoken") + if got.Source != "p75_low_data" { + t.Errorf("source = %q, want p75_low_data", got.Source) + } +} + +func TestCostEstimator_Get_CaseInsensitive(t *testing.T) { + c := newTestEstimator() + c.estimates["0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48"] = costEstimate{ + PerRowOverhead: 4e-5, + Source: "p25", + SweepCount: 100, + } + upper := c.Get(strings.ToUpper("0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48")) + if upper.Source != "p25" { + t.Errorf("upper-case lookup failed: source = %q", upper.Source) + } +} + +func TestCostEstimateMinSweeps_BoundaryBehavior(t *testing.T) { + // Sanity-check the constant matches what the doc says (n >= 10 uses p25). + if costEstimateMinSweeps != 10 { + t.Errorf("costEstimateMinSweeps = %d, doc says 10", costEstimateMinSweeps) + } +} + +func TestCostEstimateLastResort_Reasonable(t *testing.T) { + // Sanity: the last-resort value should be conservative but not absurd. + // Currently 0.001 ETH (~$3 at $3000/ETH) — high enough that miles for a + // novel-token swap will be modest, low enough that it's not impossible. + if costEstimateLastResort < 1e-4 || costEstimateLastResort > 1e-2 { + t.Errorf("costEstimateLastResort = %v, expected within [1e-4, 1e-2] sanity range", costEstimateLastResort) + } +} diff --git a/tools/fastswap-miles/gas_buffer.go b/tools/fastswap-miles/gas_buffer.go new file mode 100644 index 000000000..6c1739330 --- /dev/null +++ b/tools/fastswap-miles/gas_buffer.go @@ -0,0 +1,200 @@ +package main + +import ( + "context" + "database/sql" + "encoding/json" + "fmt" + "log/slog" + "math/big" + "slices" + "sync" + "time" +) + +// gasBufferMaxAge bounds how old a sample stays in the buffer. Sized for the +// longest cadence period we use (DAI/USDT at 48h) plus a small margin. +const gasBufferMaxAge = 50 * time.Hour + +// gasBufferPersistInterval controls how often the in-memory buffer is +// snapshotted to the meta table for restart resilience. +const gasBufferPersistInterval = 5 * time.Minute + +// gasBufferMetaKey is the key under which the serialized buffer is stored in +// fastswap_miles_meta. +const gasBufferMetaKey = "gas_observations_v1" + +// gasObservation is one L1 gas-price reading at a point in time. +type gasObservation struct { + At time.Time `json:"at"` + WeiPerGas uint64 `json:"wpg"` // wei per gas; nominally fits comfortably in 64 bits at typical L1 gwei +} + +// gasBuffer stores recent L1 gas observations in memory with periodic +// persistence. It is safe for concurrent use. +type gasBuffer struct { + db *sql.DB + logger *slog.Logger + + mu sync.RWMutex + data []gasObservation +} + +func newGasBuffer(db *sql.DB, logger *slog.Logger) *gasBuffer { + return &gasBuffer{ + db: db, + logger: logger, + data: make([]gasObservation, 0, 1024), + } +} + +// Observe records a gas-price reading. Pruning of old samples happens +// opportunistically here. +func (g *gasBuffer) Observe(weiPerGas *big.Int) { + if weiPerGas == nil || weiPerGas.Sign() < 0 { + return + } + wpg := weiPerGas.Uint64() + now := time.Now() + + g.mu.Lock() + defer g.mu.Unlock() + g.data = append(g.data, gasObservation{At: now, WeiPerGas: wpg}) + g.pruneLocked(now) +} + +// Percentile returns the p-th percentile (0-100) of observations within the +// given lookback window, in wei per gas. Returns (0, false) if no data falls +// in the window. +func (g *gasBuffer) Percentile(p int, lookback time.Duration) (uint64, bool) { + if p < 0 || p > 100 { + return 0, false + } + cutoff := time.Now().Add(-lookback) + + g.mu.RLock() + values := make([]uint64, 0, len(g.data)) + for _, obs := range g.data { + if obs.At.After(cutoff) { + values = append(values, obs.WeiPerGas) + } + } + g.mu.RUnlock() + + if len(values) == 0 { + return 0, false + } + slices.Sort(values) + + // Nearest-rank percentile, 1-indexed, ceiling convention: + // rank = ceil(p/100 × N) implemented via integer math. + rank := min(max((p*len(values)+99)/100, 1), len(values)) + return values[rank-1], true +} + +// Size returns the number of samples currently in the buffer. +func (g *gasBuffer) Size() int { + g.mu.RLock() + defer g.mu.RUnlock() + return len(g.data) +} + +func (g *gasBuffer) pruneLocked(now time.Time) { + cutoff := now.Add(-gasBufferMaxAge) + keep := 0 + for _, obs := range g.data { + if obs.At.After(cutoff) { + break + } + keep++ + } + if keep > 0 { + g.data = append(g.data[:0], g.data[keep:]...) + } +} + +// persist writes the current buffer to the meta table. Best-effort; logs +// failures and returns nil unless the DB call itself errors. +func (g *gasBuffer) persist(ctx context.Context) error { + g.mu.RLock() + snapshot := make([]gasObservation, len(g.data)) + copy(snapshot, g.data) + g.mu.RUnlock() + + blob, err := json.Marshal(snapshot) + if err != nil { + return fmt.Errorf("marshal gas buffer: %w", err) + } + + _, err = g.db.ExecContext(ctx, ` +INSERT INTO mevcommit_57173.fastswap_miles_meta (k, v) VALUES (?, ?) +`, gasBufferMetaKey, string(blob)) + if err != nil { + return fmt.Errorf("persist gas buffer: %w", err) + } + g.logger.Debug("gas buffer persisted", slog.Int("samples", len(snapshot))) + return nil +} + +// load reads the persisted buffer on startup. Missing or invalid data leaves +// the buffer empty (will refill from live observations). +func (g *gasBuffer) load(ctx context.Context) error { + var raw string + err := g.db.QueryRowContext(ctx, + "SELECT v FROM mevcommit_57173.fastswap_miles_meta WHERE k = ?", + gasBufferMetaKey).Scan(&raw) + if err == sql.ErrNoRows { + g.logger.Info("no persisted gas buffer; starting empty") + return nil + } + if err != nil { + return fmt.Errorf("load gas buffer: %w", err) + } + + var snapshot []gasObservation + if err := json.Unmarshal([]byte(raw), &snapshot); err != nil { + g.logger.Warn("persisted gas buffer is corrupt; starting empty", + slog.Any("error", err)) + return nil + } + + now := time.Now() + cutoff := now.Add(-gasBufferMaxAge) + g.mu.Lock() + defer g.mu.Unlock() + g.data = g.data[:0] + for _, obs := range snapshot { + if obs.At.After(cutoff) { + g.data = append(g.data, obs) + } + } + g.logger.Info("gas buffer loaded", slog.Int("samples", len(g.data))) + return nil +} + +// Run starts the periodic persistence loop. Returns when ctx is cancelled. +// Loads the persisted snapshot before entering the loop. +func (g *gasBuffer) Run(ctx context.Context) { + if err := g.load(ctx); err != nil { + g.logger.Warn("gas buffer load failed; continuing with empty buffer", + slog.Any("error", err)) + } + + ticker := time.NewTicker(gasBufferPersistInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + // Best-effort final flush. + flushCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + _ = g.persist(flushCtx) + cancel() + return + case <-ticker.C: + if err := g.persist(ctx); err != nil { + g.logger.Warn("gas buffer persist failed", slog.Any("error", err)) + } + } + } +} diff --git a/tools/fastswap-miles/gas_buffer_test.go b/tools/fastswap-miles/gas_buffer_test.go new file mode 100644 index 000000000..0ec38dc0e --- /dev/null +++ b/tools/fastswap-miles/gas_buffer_test.go @@ -0,0 +1,127 @@ +package main + +import ( + "log/slog" + "math/big" + "testing" + "time" +) + +func newTestGasBuffer() *gasBuffer { + return &gasBuffer{ + logger: slog.Default(), + data: make([]gasObservation, 0, 32), + } +} + +func TestGasBuffer_Observe_AppendsSample(t *testing.T) { + // Exercise the real constructor so it isn't reported as unused. + g := newGasBuffer(nil, slog.Default()) + g.Observe(big.NewInt(1_000_000_000)) // 1 gwei + if g.Size() != 1 { + t.Errorf("Size = %d, want 1", g.Size()) + } +} + +func TestGasBuffer_Observe_RejectsNegativeAndNil(t *testing.T) { + g := newTestGasBuffer() + g.Observe(nil) + g.Observe(big.NewInt(-1)) + if g.Size() != 0 { + t.Errorf("Size = %d, want 0 (negative/nil rejected)", g.Size()) + } +} + +func TestGasBuffer_Percentile_EmptyReturnsFalse(t *testing.T) { + g := newTestGasBuffer() + _, ok := g.Percentile(50, 1*time.Hour) + if ok { + t.Errorf("expected ok=false for empty buffer") + } +} + +func TestGasBuffer_Percentile_SingleSample(t *testing.T) { + g := newTestGasBuffer() + g.Observe(big.NewInt(5_000_000_000)) + val, ok := g.Percentile(50, 1*time.Hour) + if !ok { + t.Fatalf("expected ok=true with single sample") + } + if val != 5_000_000_000 { + t.Errorf("p50 of single sample = %d, want 5_000_000_000", val) + } +} + +func TestGasBuffer_Percentile_NearestRank(t *testing.T) { + g := newTestGasBuffer() + for _, v := range []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} { + g.Observe(big.NewInt(v)) + } + cases := []struct { + p int + want uint64 + }{ + {25, 3}, // rank = (25*10)/100 = 2 → values[2-1+1]? nearest-rank: rank=ceil(0.25*10)=3 → values[2]=3 + {50, 5}, // rank = 5 → values[4] = 5 + {75, 8}, // rank = 7.5 → 8 → values[7] = 8 + {90, 9}, // rank = 9 → values[8] = 9 + {100, 10}, + } + for _, c := range cases { + got, ok := g.Percentile(c.p, 1*time.Hour) + if !ok { + t.Errorf("p%d: ok=false unexpectedly", c.p) + continue + } + if got != c.want { + t.Errorf("p%d = %d, want %d", c.p, got, c.want) + } + } +} + +func TestGasBuffer_Percentile_LookbackWindow(t *testing.T) { + g := newTestGasBuffer() + + // Inject "old" samples directly (before lookback) and "recent" samples. + now := time.Now() + g.data = append(g.data, + gasObservation{At: now.Add(-2 * time.Hour), WeiPerGas: 100}, + gasObservation{At: now.Add(-2 * time.Hour), WeiPerGas: 200}, + ) + g.Observe(big.NewInt(50)) // recent + + val, ok := g.Percentile(50, 30*time.Minute) + if !ok { + t.Fatalf("expected ok=true within lookback window") + } + if val != 50 { + t.Errorf("p50 within 30m lookback = %d, want 50 (older samples should be excluded)", val) + } +} + +func TestGasBuffer_Percentile_OutOfRangeReturnsFalse(t *testing.T) { + g := newTestGasBuffer() + g.Observe(big.NewInt(1)) + + if _, ok := g.Percentile(-1, time.Hour); ok { + t.Errorf("expected ok=false for p=-1") + } + if _, ok := g.Percentile(101, time.Hour); ok { + t.Errorf("expected ok=false for p=101") + } +} + +func TestGasBuffer_Prune_OldSamplesDropped(t *testing.T) { + g := newTestGasBuffer() + + // Inject samples older than gasBufferMaxAge directly. + old := time.Now().Add(-gasBufferMaxAge - time.Hour) + g.data = append(g.data, gasObservation{At: old, WeiPerGas: 999}) + + // Trigger prune via a fresh Observe (which calls pruneLocked). + g.Observe(big.NewInt(1)) + + if g.Size() != 1 { + t.Errorf("Size = %d, want 1 (old sample should be pruned)", g.Size()) + } +} diff --git a/tools/fastswap-miles/miles.go b/tools/fastswap-miles/miles.go index b2a872190..3cd565ad1 100644 --- a/tools/fastswap-miles/miles.go +++ b/tools/fastswap-miles/miles.go @@ -88,6 +88,15 @@ type serviceConfig struct { FundsRecipient common.Address SettlementAddr common.Address MaxGasGwei uint64 + + // Sweep redesign components. Optional in the sense that nil values are + // tolerated by every consumer (sweep loop, miles processor) so this + // foundation commit can land without any of them being wired up yet — + // production wiring lives in the integration commit's main.go. + PriceOracle *priceOracle + CostEstimator *costEstimator + GasBuffer *gasBuffer + SweepClock *sweepClock } type ethRow struct { diff --git a/tools/fastswap-miles/reconciliation.go b/tools/fastswap-miles/reconciliation.go new file mode 100644 index 000000000..2faec8cd4 --- /dev/null +++ b/tools/fastswap-miles/reconciliation.go @@ -0,0 +1,210 @@ +package main + +import ( + "context" + "database/sql" + "fmt" + "log/slog" + "time" +) + +// reconciliationLookbackDays is the rolling window used by the reconciliation +// metric. Sized to be much larger than the longest sweep cadence so the +// timing mismatch between "miles awarded for a swap" and "sweep settling" +// averages out within the window. +const reconciliationLookbackDays = 7 + +// reconciliationInterval is how often the metric is recomputed and logged. +const reconciliationInterval = 1 * time.Hour + +// reconciliationAlertHigh / Low are the ratio thresholds at which we surface +// log-level alerts. Above 1.0 means we're over-paying users (estimates are +// too aggressive); well below 1.0 means we're under-paying (estimates are +// too conservative). +const ( + reconciliationAlertHigh = 1.05 + reconciliationAlertLow = 0.75 +) + +// reconciliationStats is the snapshot of one metric run. +type reconciliationStats struct { + MilesPaidETH float64 + RealizedProfitETH float64 + Ratio float64 // miles_paid / realized_profit + NRowsAwarded int + NSweepsRealized int + WindowDays int + ComputedAt time.Time +} + +// reconciliationMonitor periodically computes the +// (miles_paid_eth / realized_sweep_profit_eth) ratio over the configured +// lookback window. Used as a tuning signal: if the ratio drifts above 1.0 +// for sustained periods, the per-token estimate percentile should be raised. +type reconciliationMonitor struct { + db *sql.DB + logger *slog.Logger + executorAddr string // lowercased hex +} + +func newReconciliationMonitor(db *sql.DB, logger *slog.Logger, executorAddr string) *reconciliationMonitor { + return &reconciliationMonitor{ + db: db, + logger: logger, + executorAddr: executorAddr, + } +} + +// Compute runs one reconciliation pass and returns the stats. +func (r *reconciliationMonitor) Compute(ctx context.Context) (reconciliationStats, error) { + stats := reconciliationStats{ + WindowDays: reconciliationLookbackDays, + ComputedAt: time.Now(), + } + + // Miles awarded to real users (excludes executor) over the lookback. + // miles_paid_eth = sum(miles) * weiPerPoint / 1e18. This is the 90% + // user-share basis. Total economic-value basis is miles_paid_eth / 0.9. + var totalMiles int64 + err := r.db.QueryRowContext(ctx, fmt.Sprintf(` +SELECT COALESCE(SUM(miles), 0) AS total_miles, COUNT(*) AS n +FROM mevcommit_57173.fastswap_miles +WHERE processed = 1 + AND miles > 0 + AND LOWER(user_address) != ? + AND block_timestamp >= NOW() - INTERVAL %d DAY +`, reconciliationLookbackDays), r.executorAddr).Scan(&totalMiles, &stats.NRowsAwarded) + if err != nil { + return stats, fmt.Errorf("query miles paid: %w", err) + } + stats.MilesPaidETH = float64(totalMiles) * float64(weiPerPoint) / 1e18 + + // Realized sweep profit: executor's sweep rows. ETH received from sweep + // minus L1 gas paid. user_amt_out + surplus is the gross ETH return + // (since output token is ETH for these rows); gas_cost is the L1 gas cost. + err = r.db.QueryRowContext(ctx, fmt.Sprintf(` +SELECT + COALESCE(SUM( + (CAST(user_amt_out AS DOUBLE) + CAST(surplus AS DOUBLE) - CAST(gas_cost AS DOUBLE)) / 1e18 + ), 0) AS realized_eth, + COUNT(*) AS n_sweeps +FROM mevcommit_57173.fastswap_miles +WHERE LOWER(user_address) = ? + AND swap_type = 'eth_weth' + AND block_timestamp >= NOW() - INTERVAL %d DAY +`, reconciliationLookbackDays), r.executorAddr).Scan(&stats.RealizedProfitETH, &stats.NSweepsRealized) + if err != nil { + return stats, fmt.Errorf("query realized profit: %w", err) + } + + if stats.RealizedProfitETH > 0 { + stats.Ratio = stats.MilesPaidETH / stats.RealizedProfitETH + } + return stats, nil +} + +// perTokenBreakdown holds a per-output-token slice of the reconciliation +// metric. Useful for spotting tokens that drift individually while the +// aggregate stays balanced. +type perTokenBreakdown struct { + Token string + MilesPaidETH float64 + NRows int +} + +// ComputePerToken returns the per-output-token miles-paid breakdown over +// the same lookback window. Sweep-side realized profit isn't attributed to a +// single output token in fastswap_miles (sweep tx rows have swap_type=eth_weth +// and don't carry the swept token), so this method only reports the +// obligation side. Pair with sweep_executed logs to compare against realized. +func (r *reconciliationMonitor) ComputePerToken(ctx context.Context) ([]perTokenBreakdown, error) { + rows, err := r.db.QueryContext(ctx, fmt.Sprintf(` +SELECT LOWER(output_token) AS token, + SUM(miles) AS miles_sum, + COUNT(*) AS n +FROM mevcommit_57173.fastswap_miles +WHERE processed = 1 + AND miles > 0 + AND LOWER(user_address) != ? + AND swap_type = 'erc20' + AND block_timestamp >= NOW() - INTERVAL %d DAY +GROUP BY LOWER(output_token) +ORDER BY miles_sum DESC +`, reconciliationLookbackDays), r.executorAddr) + if err != nil { + return nil, fmt.Errorf("per-token reconciliation query: %w", err) + } + defer func() { _ = rows.Close() }() + + var out []perTokenBreakdown + for rows.Next() { + var token string + var milesSum int64 + var n int + if err := rows.Scan(&token, &milesSum, &n); err != nil { + return out, err + } + out = append(out, perTokenBreakdown{ + Token: token, + MilesPaidETH: float64(milesSum) * float64(weiPerPoint) / 1e18, + NRows: n, + }) + } + return out, rows.Err() +} + +// Run starts the periodic reconciliation loop. Returns when ctx is +// cancelled. Each tick computes and logs the metric; threshold breaches log +// at warning level. +func (r *reconciliationMonitor) Run(ctx context.Context) { + ticker := time.NewTicker(reconciliationInterval) + defer ticker.Stop() + + tick := func() { + stats, err := r.Compute(ctx) + if err != nil { + r.logger.Warn("reconciliation compute failed", slog.Any("error", err)) + return + } + level := slog.LevelInfo + switch { + case stats.Ratio > reconciliationAlertHigh: + level = slog.LevelWarn + case stats.Ratio > 0 && stats.Ratio < reconciliationAlertLow: + level = slog.LevelWarn + } + r.logger.Log(ctx, level, "reconciliation_metric", + slog.Float64("ratio", stats.Ratio), + slog.Float64("miles_paid_eth", stats.MilesPaidETH), + slog.Float64("realized_profit_eth", stats.RealizedProfitETH), + slog.Int("rows_awarded", stats.NRowsAwarded), + slog.Int("sweeps_realized", stats.NSweepsRealized), + slog.Int("window_days", stats.WindowDays)) + + // Per-token breakdown: helps spot a single token drifting while + // the aggregate looks healthy. Best-effort — a query failure here + // shouldn't suppress the aggregate metric. + breakdown, err := r.ComputePerToken(ctx) + if err != nil { + r.logger.Warn("per-token reconciliation failed", slog.Any("error", err)) + return + } + for _, b := range breakdown { + r.logger.Info("reconciliation_per_token", + slog.String("token", b.Token), + slog.Float64("miles_paid_eth", b.MilesPaidETH), + slog.Int("rows", b.NRows), + slog.Int("window_days", stats.WindowDays)) + } + } + + tick() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + tick() + } + } +} diff --git a/tools/fastswap-miles/reconciliation_test.go b/tools/fastswap-miles/reconciliation_test.go new file mode 100644 index 000000000..c69110461 --- /dev/null +++ b/tools/fastswap-miles/reconciliation_test.go @@ -0,0 +1,48 @@ +package main + +import ( + "log/slog" + "testing" + "time" +) + +func TestReconciliation_Constructor(t *testing.T) { + r := newReconciliationMonitor(nil, slog.Default(), "0xabc") + if r.executorAddr != "0xabc" { + t.Errorf("executorAddr = %q, want 0xabc", r.executorAddr) + } +} + +func TestReconciliation_ConstantsSane(t *testing.T) { + if reconciliationAlertHigh <= 1.0 { + t.Errorf("alert high = %v, should be above 1.0 (over-paying threshold)", reconciliationAlertHigh) + } + if reconciliationAlertLow >= 1.0 { + t.Errorf("alert low = %v, should be below 1.0 (under-paying threshold)", reconciliationAlertLow) + } + if reconciliationLookbackDays < 1 { + t.Errorf("lookback = %d, expected at least 1 day", reconciliationLookbackDays) + } + if reconciliationInterval < time.Minute { + t.Errorf("interval = %v, expected at least 1 minute (avoid hammering DB)", reconciliationInterval) + } +} + +func TestReconciliationStats_RatioComputedCorrectly(t *testing.T) { + // Sanity check the math relationship without hitting a DB. + // 100 miles awarded × 1e-5 ETH/mile = 1e-3 ETH user share. + // Realized profit 2e-3 ETH → ratio = 0.5 (under-paying). + totalMiles := int64(100) + milesEth := float64(totalMiles) * float64(weiPerPoint) / 1e18 + if milesEth != 1e-3 { + t.Errorf("miles ETH conversion = %v, want 0.001", milesEth) + } + realized := 2e-3 + ratio := milesEth / realized + if ratio != 0.5 { + t.Errorf("ratio = %v, want 0.5", ratio) + } + if ratio >= reconciliationAlertLow { + t.Errorf("ratio 0.5 should be below alert low %v", reconciliationAlertLow) + } +} diff --git a/tools/fastswap-miles/sweep_clock_persist_test.go b/tools/fastswap-miles/sweep_clock_persist_test.go new file mode 100644 index 000000000..b7d1cb92b --- /dev/null +++ b/tools/fastswap-miles/sweep_clock_persist_test.go @@ -0,0 +1,168 @@ +package main + +import ( + "encoding/json" + "strings" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" +) + +// Persistence touches the DB, so a full round-trip test requires an in-memory +// SQL stub which the rest of this package doesn't yet have. The +// JSON-encoded representation IS the persisted blob, though, so testing the +// encoding/decoding step covers the failure mode that actually matters in +// practice (data shape changes silently breaking restore). + +func TestSweepClock_PersistEncodingRoundtrip(t *testing.T) { + original := newSweepClock() + t1 := time.Date(2026, 5, 1, 10, 0, 0, 0, time.UTC) + t2 := time.Date(2026, 5, 9, 14, 30, 0, 0, time.UTC) + original.MarkSwept(usdcAddr, t1) + original.MarkSwept(wbtcAddr, t2) + + // Encode the same way persist() does: snapshot → checksum-hex keys → + // JSON blob. + snapshot := original.Snapshot() + encoded := make(map[string]time.Time, len(snapshot)) + for k, v := range snapshot { + encoded[k.Hex()] = v + } + blob, err := json.Marshal(encoded) + if err != nil { + t.Fatalf("marshal: %v", err) + } + + // Decode the same way load() does: JSON → checksum-hex keys → addresses. + var decoded map[string]time.Time + if err := json.Unmarshal(blob, &decoded); err != nil { + t.Fatalf("unmarshal: %v", err) + } + rehydrated := make(map[common.Address]time.Time, len(decoded)) + for k, v := range decoded { + rehydrated[common.HexToAddress(k)] = v + } + + restored := newSweepClock() + restored.Restore(rehydrated) + + if !restored.LastSwept(usdcAddr).Equal(t1) { + t.Errorf("USDC restored = %v, want %v", restored.LastSwept(usdcAddr), t1) + } + if !restored.LastSwept(wbtcAddr).Equal(t2) { + t.Errorf("WBTC restored = %v, want %v", restored.LastSwept(wbtcAddr), t2) + } + // A token never marked must come back as zero. + if !restored.LastSwept(daiAddr).IsZero() { + t.Errorf("DAI should be zero, got %v", restored.LastSwept(daiAddr)) + } +} + +func TestSweepClock_PersistEncodingHandlesEmptyClock(t *testing.T) { + c := newSweepClock() + encoded := make(map[string]time.Time) + for k, v := range c.Snapshot() { + encoded[k.Hex()] = v + } + blob, err := json.Marshal(encoded) + if err != nil { + t.Fatalf("marshal empty: %v", err) + } + // Empty map serializes as `{}`; ensure we don't accidentally produce + // `null` (which would unmarshal back as nil and look like missing). + if !strings.HasPrefix(string(blob), "{") { + t.Errorf("empty clock JSON = %q, want object literal", string(blob)) + } +} + +func TestSweepLoopConstantsSane(t *testing.T) { + // 1.05× sweep gas — leaves margin for between-quote gas drift. + if sweepProfitabilityNumerator != 105 { + t.Errorf("sweepProfitabilityNumerator = %d, want 105 (5%% margin)", sweepProfitabilityNumerator) + } + // Dust threshold should be tiny — most tokens have 6+ decimals so 1000 + // raw units is essentially nothing. If raised significantly, real + // sweeps could be erroneously skipped. + if sweepDustRawUnits != 1000 { + t.Errorf("sweepDustRawUnits = %d, want 1000", sweepDustRawUnits) + } + // Barter min-return fraction matches the deferred path's value (0.98) + // for consistency. + if sweepBarterMinReturnFraction != 0.98 { + t.Errorf("sweepBarterMinReturnFraction = %f, want 0.98", sweepBarterMinReturnFraction) + } + // Override margin: 20% buffer above raw breakeven before bypassing + // cadence. Below 1.0 would allow overriding into losing trades. + if cadenceOverrideMargin < 1.0 { + t.Errorf("cadenceOverrideMargin = %v; must be >= 1.0 to avoid bypassing cadence into losses", cadenceOverrideMargin) + } +} + +func TestCadenceOverrideMet_AboveThreshold(t *testing.T) { + // 100 rows × 0.00005 ETH = 0.005 ETH budget. + // Sweep gas = 0.003 ETH × 1.2 = 0.0036 threshold. + // 0.005 >= 0.0036 → override fires. + if !cadenceOverrideMet(100, 0.00005, 0.003) { + t.Errorf("100 rows at $0.00005 per-row vs $0.003 sweep gas should override") + } +} + +func TestCadenceOverrideMet_BelowThreshold(t *testing.T) { + // 10 rows × 0.00005 = 0.0005 budget. + // 0.003 × 1.2 = 0.0036 threshold. Far below. + if cadenceOverrideMet(10, 0.00005, 0.003) { + t.Errorf("10 rows should not be enough to override at $0.003 sweep gas") + } +} + +func TestCadenceOverrideMet_ExactlyAtThreshold(t *testing.T) { + // At exactly the threshold (budget == 1.2 × sweep_gas), allow override. + // 1000 rows × 0.0001 = 0.1 budget. Sweep gas = 0.1 / 1.2 ≈ 0.0833. + if !cadenceOverrideMet(1000, 0.0001, 0.1/1.2) { + t.Errorf("budget exactly at threshold should still trigger override") + } +} + +func TestCadenceOverrideMet_NoVolumeNoOverride(t *testing.T) { + if cadenceOverrideMet(0, 0.0001, 0.001) { + t.Errorf("0 rows must never override") + } +} + +func TestCadenceOverrideMet_NoCostEstimateNoOverride(t *testing.T) { + // Zero or negative per-row overhead means we don't have an estimate; + // override should never fire because we have no budget to compare to. + if cadenceOverrideMet(1000, 0, 0.001) { + t.Errorf("zero per-row estimate must never override") + } + if cadenceOverrideMet(1000, -1, 0.001) { + t.Errorf("negative per-row estimate must never override") + } +} + +func TestIsBarterAmountTooLow(t *testing.T) { + cases := []struct { + err error + want bool + }{ + {nil, false}, + {errStrFunc("barter API error 400: Amount too low for token"), true}, + {errStrFunc("amount too low"), true}, + {errStrFunc("connection refused"), false}, + {errStrFunc("barter API error 500: internal server error"), false}, + } + for _, c := range cases { + if got := isBarterAmountTooLow(c.err); got != c.want { + t.Errorf("isBarterAmountTooLow(%v) = %v, want %v", c.err, got, c.want) + } + } +} + +// errStrFunc returns a one-shot error with the given message. Local helper to +// avoid importing the errors package just for this test file. +type strErr string + +func (e strErr) Error() string { return string(e) } + +func errStrFunc(s string) error { return strErr(s) } diff --git a/tools/fastswap-miles/sweep_loop.go b/tools/fastswap-miles/sweep_loop.go new file mode 100644 index 000000000..7354df443 --- /dev/null +++ b/tools/fastswap-miles/sweep_loop.go @@ -0,0 +1,446 @@ +package main + +import ( + "context" + "fmt" + "log/slog" + "math/big" + "strconv" + "strings" + "time" + + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/accounts/abi" + "github.com/ethereum/go-ethereum/common" +) + +// sweepProfitabilityNumerator is the multiplier in the profitability guard +// applied to the sweep gas cost. expected_return > sweep_gas × 1.05 is the +// minimum margin to fire a sweep — 5% buffer for gas-price slippage between +// quote and execution. +const sweepProfitabilityNumerator = 105 + +// sweepDustRawUnits is a per-token raw-units floor below which we don't bother +// fetching a Barter quote. Most tokens have 6+ decimals so 1000 raw units is +// effectively zero (1e-3 units of a 6-decimal token, etc). +const sweepDustRawUnits = 1000 + +// sweepBarterMinReturnFraction is the lower bound on Barter's MinReturn +// relative to its quoted output, used in the cadence-sweep quote request. +// Mirrors the deferred path's value (0.98). +const sweepBarterMinReturnFraction = 0.98 + +const erc20BalanceOfABI = `[ + {"constant":true,"inputs":[{"name":"_owner","type":"address"}], + "name":"balanceOf", + "outputs":[{"name":"balance","type":"uint256"}], + "type":"function"} +]` + +// sweepLoop runs cadence-based per-token sweeps for tokens whose surplus is +// already credited to users via upfront miles awarding. The miles pipeline +// has marked those rows processed, so this loop's sole job is to convert +// accumulated surplus tokens to ETH on the configured cadence — fully +// decoupled from any miles processing for them. +// +// Tokens whose rows are deferred (non-whitelisted, no Chainlink feed) are +// swept by processDeferredERC20Batch instead; that path triggers its own +// sweep when its batch becomes profitable. Both paths share a profitability +// guard and the same submitFastSwapSweep underneath, so they don't conflict +// — the wallet balance is the source of truth for what's available. +type sweepLoop struct { + cfg *serviceConfig + balance abi.ABI +} + +func newSweepLoop(cfg *serviceConfig) (*sweepLoop, error) { + a, err := abi.JSON(strings.NewReader(erc20BalanceOfABI)) + if err != nil { + return nil, fmt.Errorf("parse balanceOf ABI: %w", err) + } + return &sweepLoop{cfg: cfg, balance: a}, nil +} + +// RunOnce iterates every whitelisted token in tokenConfigs once and processes +// any that are due for a sweep. Cheap to call repeatedly; the cadence check +// is in-memory and short-circuits before any RPC traffic for tokens that +// haven't reached their cadence floor. +func (s *sweepLoop) RunOnce(ctx context.Context) { + if s.cfg.SweepClock == nil || s.cfg.GasBuffer == nil { + return + } + for token, tokCfg := range tokenConfigs { + if ctx.Err() != nil { + return + } + s.processToken(ctx, token, tokCfg) + } +} + +func (s *sweepLoop) processToken(ctx context.Context, token common.Address, tokCfg tokenConfig) { + cfg := s.cfg + now := time.Now() + lastSwept := cfg.SweepClock.LastSwept(token) + elapsed := now.Sub(lastSwept) + + // Cadence floor with high-volume override. If cadence hasn't elapsed, + // check whether enough rows have accumulated that we can sweep at a + // per-row cost within our estimated overhead even at current gas. If + // so, bypass cadence (and below, the gas cap too). This is the "we + // don't need to wait when volume already justifies sweeping" path. + earlyOverride := false + if tokCfg.SweepCadence > 0 && elapsed < tokCfg.SweepCadence { + ok, nRows, estGasWei := s.shouldOverrideCadenceEarly(ctx, token, lastSwept) + if !ok { + return + } + earlyOverride = true + cfg.Logger.Info("cadence_override_triggered", + slog.String("token", token.Hex()), + slog.String("tier", tokCfg.Tier.String()), + slog.Duration("elapsed", elapsed), + slog.Duration("cadence", tokCfg.SweepCadence), + slog.Int("rows_since_last_sweep", nRows), + slog.Float64("est_sweep_gas_eth", weiToEth(estGasWei)), + slog.String("reason", "high_volume_amortizes_below_per_row_estimate")) + } + + balance, err := s.fetchBalance(ctx, token) + if err != nil { + cfg.Logger.Warn("balanceOf failed", slog.String("token", token.Hex()), slog.Any("error", err)) + return + } + if balance.Cmp(big.NewInt(sweepDustRawUnits)) <= 0 { + // Dust — log only at debug, this is fine. + cfg.Logger.Debug("sweep_evaluation_skipped_dust", + slog.String("token", token.Hex()), + slog.String("balance", balance.String())) + return + } + + gasPrice, err := cfg.Client.SuggestGasPrice(ctx) + if err != nil { + cfg.Logger.Warn("suggest gas price failed", slog.Any("error", err)) + return + } + + var decision sweepDecisionOutput + if earlyOverride { + // Volume-based override: skip the gas-cap percentile gate entirely. + // Profitability gate still applies below — we never sweep at a loss. + decision = sweepDecisionOutput{ + Decision: SweepForce, + Reason: "early_volume_override", + } + } else { + decision = decideSweep(sweepDecisionInput{ + Now: now, + Cfg: tokCfg, + LastSweepAt: lastSwept, + CurrentGasWei: gasPrice.Uint64(), + Buf: cfg.GasBuffer, + }) + } + + cfg.Logger.Debug("sweep_evaluation", + slog.String("token", token.Hex()), + slog.String("tier", tokCfg.Tier.String()), + slog.Duration("since_last_sweep", elapsed), + slog.String("balance", balance.String()), + slog.Uint64("gas_wei", gasPrice.Uint64()), + slog.Uint64("gas_cap_wei", decision.GasCapWei), + slog.Bool("has_gas_data", decision.HasGasData), + slog.String("decision", decision.Decision.String()), + slog.String("reason", decision.Reason), + slog.Bool("early_override", earlyOverride)) + + if decision.Decision == SweepSkip { + cfg.Logger.Info("sweep_skipped", + slog.String("token", token.Hex()), + slog.String("tier", tokCfg.Tier.String()), + slog.String("reason", decision.Reason), + slog.Uint64("gas_wei", gasPrice.Uint64()), + slog.Uint64("gas_cap_wei", decision.GasCapWei), + slog.Duration("since_last_sweep", elapsed)) + return + } + + // Attempt or Force: get a fresh Barter quote for the entire balance. + barterResp, err := callBarter(ctx, cfg.HTTPClient, cfg.BarterURL, cfg.BarterKey, barterRequest{ + Source: token.Hex(), + Target: cfg.WETH.Hex(), + SellAmount: balance.String(), + Recipient: cfg.ExecutorAddr.Hex(), + Origin: cfg.ExecutorAddr.Hex(), + MinReturnFraction: sweepBarterMinReturnFraction, + Deadline: fmt.Sprintf("%d", now.Add(10*time.Minute).Unix()), + }) + if err != nil { + // "Amount too low" is steady-state expected for tokens with small + // accumulated balances; demote to debug to keep the warn stream + // clean. Other errors (timeouts, malformed responses) still warn. + if isBarterAmountTooLow(err) { + cfg.Logger.Debug("cadence sweep skipped: balance under barter minimum", + slog.String("token", token.Hex()), slog.String("balance", balance.String())) + } else { + cfg.Logger.Warn("cadence sweep barter quote failed", + slog.String("token", token.Hex()), slog.Any("error", err)) + } + return + } + + gasLimit, err := strconv.ParseUint(barterResp.GasLimit, 10, 64) + if err != nil { + cfg.Logger.Warn("invalid gasLimit from barter", + slog.String("token", token.Hex()), slog.String("gasLimit", barterResp.GasLimit)) + return + } + gasLimit += 50000 + + expectedGas := new(big.Int).Mul(big.NewInt(int64(gasLimit)), gasPrice) + expectedReturn, ok := new(big.Int).SetString(barterResp.MinReturn, 10) + if !ok { + cfg.Logger.Warn("invalid MinReturn from barter", + slog.String("token", token.Hex()), slog.String("minReturn", barterResp.MinReturn)) + return + } + + // Profitability guard: expected_return > sweep_gas × 1.05. + // This is the absolute floor — even force-sweep applies it. We never + // sweep at a loss. + threshold := new(big.Int).Mul(expectedGas, big.NewInt(sweepProfitabilityNumerator)) + threshold.Div(threshold, big.NewInt(100)) + if expectedReturn.Cmp(threshold) <= 0 { + cfg.Logger.Info("sweep_skipped", + slog.String("token", token.Hex()), + slog.String("tier", tokCfg.Tier.String()), + slog.String("reason", "unprofitable"), + slog.String("decision", decision.Decision.String()), + slog.Float64("expected_return_eth", weiToEth(expectedReturn)), + slog.Float64("sweep_gas_eth", weiToEth(expectedGas)), + slog.Float64("threshold_eth", weiToEth(threshold))) + return + } + + if cfg.DryRun { + cfg.Logger.Info("sweep_executed_dryrun", + slog.String("token", token.Hex()), + slog.String("tier", tokCfg.Tier.String()), + slog.String("decision", decision.Decision.String()), + slog.Float64("expected_return_eth", weiToEth(expectedReturn)), + slog.Float64("expected_gas_eth", weiToEth(expectedGas))) + cfg.SweepClock.MarkSwept(token, now) + return + } + + actualReturn, actualGas, err := submitFastSwapSweep( + ctx, cfg.Logger, cfg.Client, cfg.L1Client, cfg.HTTPClient, cfg.Signer, + cfg.ExecutorAddr, token, balance, cfg.FastSwapURL, cfg.FundsRecipient, + cfg.SettlementAddr, barterResp, cfg.MaxGasGwei, + ) + if err != nil { + cfg.Logger.Error("cadence sweep failed", + slog.String("token", token.Hex()), slog.Any("error", err)) + return + } + + cfg.SweepClock.MarkSwept(token, now) + actualNetEth := new(big.Int).Sub(actualReturn, actualGas) + cfg.Logger.Info("sweep_executed", + slog.String("token", token.Hex()), + slog.String("tier", tokCfg.Tier.String()), + slog.String("decision", decision.Decision.String()), + slog.String("balance_swept", balance.String()), + slog.Float64("actual_return_eth", weiToEth(actualReturn)), + slog.Float64("actual_gas_eth", weiToEth(actualGas)), + slog.Float64("actual_net_eth", weiToEth(actualNetEth))) + + // Compare the just-realized sweep profit against the miles obligation + // already accrued for this token's recent rows. Surfaces the question + // "are we awarding more miles than we're earning?" per-sweep, not just + // in the hourly aggregate. Best-effort — a query failure shouldn't + // hold up the sweep marking. + if obligation, err := s.queryRecentMilesObligation(ctx, token); err == nil { + cfg.Logger.Info("sweep_vs_obligation", + slog.String("token", token.Hex()), + slog.Float64("realized_net_eth", weiToEth(actualNetEth)), + slog.Float64("recent_miles_obligation_eth", obligation.milesEth), + slog.Int("recent_miles_rows", obligation.nRows), + slog.Int("lookback_days", sweepObligationLookbackDays), + slog.Float64("ratio", safeRatio(obligation.milesEth, weiToEth(actualNetEth)))) + } else { + cfg.Logger.Debug("sweep vs obligation query failed", + slog.String("token", token.Hex()), slog.Any("error", err)) + } +} + +// sweepObligationLookbackDays bounds how far back we sum miles for the +// post-sweep comparison. Matched to the cadence-period scale; long enough +// that a stable token's last few sweeps fall in the window, short enough +// that it reflects recent estimate accuracy rather than ancient history. +const sweepObligationLookbackDays = 7 + +type recentObligation struct { + milesEth float64 + nRows int +} + +func (s *sweepLoop) queryRecentMilesObligation(ctx context.Context, token common.Address) (recentObligation, error) { + var milesSum int64 + var n int + err := s.cfg.DB.QueryRowContext(ctx, fmt.Sprintf(` +SELECT COALESCE(SUM(miles), 0), COUNT(*) +FROM mevcommit_57173.fastswap_miles +WHERE processed = 1 + AND miles > 0 + AND swap_type = 'erc20' + AND LOWER(output_token) = ? + AND LOWER(user_address) != ? + AND block_timestamp >= NOW() - INTERVAL %d DAY +`, sweepObligationLookbackDays), + strings.ToLower(token.Hex()), + strings.ToLower(s.cfg.ExecutorAddr.Hex()), + ).Scan(&milesSum, &n) + if err != nil { + return recentObligation{}, err + } + return recentObligation{ + milesEth: float64(milesSum) * float64(weiPerPoint) / 1e18, + nRows: n, + }, nil +} + +func safeRatio(num, den float64) float64 { + if den == 0 { + return 0 + } + return num / den +} + +// isBarterAmountTooLow reports whether a Barter error is the "Amount too low" +// response — Barter rejects inputs below its per-token minimum, and this is +// the dominant steady-state error for early/small batches. Callers use it to +// demote noise from warn to debug while keeping real errors loud. +func isBarterAmountTooLow(err error) bool { + if err == nil { + return false + } + msg := err.Error() + return strings.Contains(msg, "Amount too low") || strings.Contains(msg, "amount too low") +} + +// cadenceOverrideMargin is the safety multiplier on estimated sweep gas +// when checking whether volume justifies an early sweep. A value of 1.2 +// means we require the row-count budget to exceed sweep gas by 20% before +// bypassing cadence — buffer for gas drift between estimate and execution. +const cadenceOverrideMargin = 1.2 + +// typicalSweepGasLimit is a conservative gas-limit estimate used purely to +// size the "is current gas affordable for sweeping right now" check +// without making an extra Barter call. Real sweeps will discover their +// actual gasLimit from the Barter response. Slightly overestimating here +// just makes the early-override harder to trigger — preferable to under- +// estimating and overriding when sweep would actually be expensive. +const typicalSweepGasLimit = 500_000 + +// cadenceOverrideLookbackCap bounds the COUNT(*) query when a token has +// never been swept (lastSwept is zero). 14 days mirrors the cost +// estimator's lookback so the threshold uses comparable row volumes. +const cadenceOverrideLookbackCap = 14 * 24 * time.Hour + +// shouldOverrideCadenceEarly reports whether accumulated row volume since +// the last sweep is high enough that the realized sweep gas can be +// amortized below our per-row overhead estimate at current gas levels. +// +// threshold: N × per_row_overhead_estimate >= cadenceOverrideMargin × est_sweep_gas +// +// This says: we have enough rows that even at the current gas, sweeping +// would land us at or under our estimated per-row cost. No reason to wait +// for cadence — bigger batches won't materially improve the protocol P&L +// from here, and waiting just adds inventory risk on stables. +// +// Returns (override, nRows, est_sweep_gas_wei) for caller logging. +func (s *sweepLoop) shouldOverrideCadenceEarly( + ctx context.Context, token common.Address, lastSwept time.Time, +) (bool, int, *big.Int) { + cfg := s.cfg + if cfg.CostEstimator == nil { + return false, 0, nil + } + est := cfg.CostEstimator.Get(strings.ToLower(token.Hex())) + if est.PerRowOverhead <= 0 { + return false, 0, nil + } + + // Recent gas median — cheaper than another SuggestGasPrice and reflects + // typical gas in the window we'd actually sweep at. Cold start (no + // samples yet): cannot evaluate; never override. + medianGas, ok := cfg.GasBuffer.Percentile(50, 30*time.Minute) + if !ok { + return false, 0, nil + } + estSweepGasWei := new(big.Int).Mul( + new(big.Int).SetUint64(medianGas), big.NewInt(typicalSweepGasLimit)) + estSweepGasEth := weiToEth(estSweepGasWei) + + sinceTS := lastSwept + if sinceTS.IsZero() { + sinceTS = time.Now().Add(-cadenceOverrideLookbackCap) + } + var nRows int + err := cfg.DB.QueryRowContext(ctx, ` +SELECT COUNT(*) +FROM mevcommit_57173.fastswap_miles +WHERE swap_type = 'erc20' + AND LOWER(output_token) = ? + AND block_timestamp >= ? +`, strings.ToLower(token.Hex()), sinceTS).Scan(&nRows) + if err != nil { + cfg.Logger.Debug("cadence override row-count query failed", + slog.String("token", token.Hex()), slog.Any("error", err)) + return false, 0, estSweepGasWei + } + if nRows == 0 { + return false, 0, estSweepGasWei + } + + return cadenceOverrideMet(nRows, est.PerRowOverhead, estSweepGasEth), nRows, estSweepGasWei +} + +// cadenceOverrideMet applies the override threshold: +// +// N × per_row_overhead >= cadenceOverrideMargin × estimated_sweep_gas_eth +// +// Separated out so the threshold math is unit-testable without needing a DB +// or gas-buffer fixture. +func cadenceOverrideMet(nRows int, perRowOverheadEth, estSweepGasEth float64) bool { + if nRows <= 0 || perRowOverheadEth <= 0 { + return false + } + budget := float64(nRows) * perRowOverheadEth + threshold := estSweepGasEth * cadenceOverrideMargin + return budget >= threshold +} + +func (s *sweepLoop) fetchBalance(ctx context.Context, token common.Address) (*big.Int, error) { + data, err := s.balance.Pack("balanceOf", s.cfg.ExecutorAddr) + if err != nil { + return nil, fmt.Errorf("pack balanceOf: %w", err) + } + raw, err := s.cfg.Client.CallContract(ctx, ethereum.CallMsg{To: &token, Data: data}, nil) + if err != nil { + return nil, fmt.Errorf("call balanceOf: %w", err) + } + out, err := s.balance.Unpack("balanceOf", raw) + if err != nil { + return nil, fmt.Errorf("unpack balanceOf: %w", err) + } + if len(out) < 1 { + return nil, fmt.Errorf("empty balanceOf output") + } + bal, ok := out[0].(*big.Int) + if !ok { + return nil, fmt.Errorf("balanceOf returned %T not *big.Int", out[0]) + } + return bal, nil +} diff --git a/tools/fastswap-miles/sweep_scheduler.go b/tools/fastswap-miles/sweep_scheduler.go new file mode 100644 index 000000000..674b8b952 --- /dev/null +++ b/tools/fastswap-miles/sweep_scheduler.go @@ -0,0 +1,251 @@ +package main + +import ( + "context" + "database/sql" + "encoding/json" + "fmt" + "log/slog" + "maps" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common" +) + +// sweepClockPersistInterval bounds how often the in-memory clock is +// snapshotted to the meta table for restart resilience. +const sweepClockPersistInterval = 5 * time.Minute + +// sweepClockMetaKey is the row in fastswap_miles_meta that stores the +// JSON-serialized per-token last-swept timestamps. +const sweepClockMetaKey = "sweep_clock_v1" + +// sweepDecision is the scheduler's verdict for a single token at a single +// evaluation tick. Profitability is intentionally NOT a scheduler concern — +// the scheduler decides "are we even allowed to attempt"; the caller then +// runs the (expensive) Barter quote and applies the profitability guard. +type sweepDecision int + +const ( + // SweepSkip — do not attempt this cycle. Either cadence hasn't elapsed, + // or gas is above the tier cap. + SweepSkip sweepDecision = iota + // SweepAttempt — eligible to sweep if profitability passes. Gas cap + // passed (or there's no gas data yet). + SweepAttempt + // SweepForce — force-sweep window reached. Gas cap is waived; the caller + // should still apply the profitability guard (we never sweep at a loss). + SweepForce +) + +func (d sweepDecision) String() string { + switch d { + case SweepSkip: + return "skip" + case SweepAttempt: + return "attempt" + case SweepForce: + return "force" + default: + return "unknown" + } +} + +// sweepDecisionInput packages the runtime state needed to make a decision. +type sweepDecisionInput struct { + Now time.Time + Cfg tokenConfig + LastSweepAt time.Time // zero value means "never swept" + CurrentGasWei uint64 // wei per gas + Buf *gasBuffer +} + +// sweepDecisionOutput is the scheduler's verdict plus enough context for +// downstream logging and reconciliation. +type sweepDecisionOutput struct { + Decision sweepDecision + Reason string // short tag for logs + GasCapWei uint64 // wei per gas; the cap that was applied (0 if no cap) + HasGasData bool // false when the gas buffer had no samples in the window +} + +// decideSweep applies cadence, gas-cap, and force-sweep logic. Profitability +// is checked separately by the caller after a successful Barter quote. +func decideSweep(in sweepDecisionInput) sweepDecisionOutput { + elapsed := in.Now.Sub(in.LastSweepAt) + + // Cadence floor (only applies when SweepCadence > 0; volatile has zero). + if in.Cfg.SweepCadence > 0 && elapsed < in.Cfg.SweepCadence { + return sweepDecisionOutput{ + Decision: SweepSkip, + Reason: "waiting_for_cadence", + } + } + + // Force-sweep window: bypass the gas cap, but profitability still applies + // downstream so we never sweep at a loss. + if elapsed >= in.Cfg.forceSweepInterval() { + return sweepDecisionOutput{ + Decision: SweepForce, + Reason: "force_sweep_window", + } + } + + // Gas cap check using percentile of recent observations. + gasCap, ok := in.Buf.Percentile(in.Cfg.Tier.GasCapPercentile(), in.Cfg.gasCapLookback()) + if !ok { + // No gas data yet (cold start). Allow the attempt; profitability + // guard is still the absolute floor. + return sweepDecisionOutput{ + Decision: SweepAttempt, + Reason: "no_gas_data", + HasGasData: false, + } + } + + if in.CurrentGasWei > gasCap { + return sweepDecisionOutput{ + Decision: SweepSkip, + Reason: "gas_above_cap", + GasCapWei: gasCap, + HasGasData: true, + } + } + + return sweepDecisionOutput{ + Decision: SweepAttempt, + Reason: "gas_within_cap", + GasCapWei: gasCap, + HasGasData: true, + } +} + +// sweepClock tracks per-token last-successful-sweep timestamps. Used by the +// scheduler to compute the elapsed-since-last-sweep input. +// +// Persisted across restarts so cadence enforcement isn't reset by pod +// recycles. (Persistence wired through main.go alongside the gas buffer.) +type sweepClock struct { + mu sync.RWMutex + lastSweep map[common.Address]time.Time // key: token address +} + +func newSweepClock() *sweepClock { + return &sweepClock{lastSweep: make(map[common.Address]time.Time)} +} + +// MarkSwept records a successful sweep for the token. Call only after the +// sweep transaction has been submitted (or simulated in dry-run). +func (c *sweepClock) MarkSwept(token common.Address, at time.Time) { + c.mu.Lock() + defer c.mu.Unlock() + c.lastSweep[token] = at +} + +// LastSwept returns the time of the most recent successful sweep for the +// token. Returns the zero time if the token has never been swept (which the +// scheduler treats as "elapsed = a very long time" — i.e., it'll force-sweep +// on the first opportunity, subject to profitability). +func (c *sweepClock) LastSwept(token common.Address) time.Time { + c.mu.RLock() + defer c.mu.RUnlock() + return c.lastSweep[token] +} + +// Snapshot returns a copy of the per-token timestamps. Used for persistence. +func (c *sweepClock) Snapshot() map[common.Address]time.Time { + c.mu.RLock() + defer c.mu.RUnlock() + out := make(map[common.Address]time.Time, len(c.lastSweep)) + maps.Copy(out, c.lastSweep) + return out +} + +// Restore replaces the in-memory state with the provided snapshot. Used +// during startup-from-persistence. +func (c *sweepClock) Restore(snapshot map[common.Address]time.Time) { + c.mu.Lock() + defer c.mu.Unlock() + c.lastSweep = make(map[common.Address]time.Time, len(snapshot)) + maps.Copy(c.lastSweep, snapshot) +} + +// persist writes the current snapshot to the meta table. Best-effort. +func (c *sweepClock) persist(ctx context.Context, db *sql.DB) error { + // Map keys serialize to checksum-cased hex addresses; reload normalizes + // back via common.HexToAddress. + snapshot := c.Snapshot() + encoded := make(map[string]time.Time, len(snapshot)) + for k, v := range snapshot { + encoded[k.Hex()] = v + } + blob, err := json.Marshal(encoded) + if err != nil { + return fmt.Errorf("marshal sweep clock: %w", err) + } + _, err = db.ExecContext(ctx, ` +INSERT INTO mevcommit_57173.fastswap_miles_meta (k, v) VALUES (?, ?) +`, sweepClockMetaKey, string(blob)) + if err != nil { + return fmt.Errorf("persist sweep clock: %w", err) + } + return nil +} + +// load reads the persisted snapshot on startup. Missing or invalid data +// leaves the clock empty (every token's lastSweep is zero, scheduler +// treats that as "elapsed = a very long time" and is willing to sweep on +// the first opportunity subject to profitability). +func (c *sweepClock) load(ctx context.Context, db *sql.DB, logger *slog.Logger) error { + var raw string + err := db.QueryRowContext(ctx, + "SELECT v FROM mevcommit_57173.fastswap_miles_meta WHERE k = ?", + sweepClockMetaKey).Scan(&raw) + if err == sql.ErrNoRows { + logger.Info("no persisted sweep clock; starting empty") + return nil + } + if err != nil { + return fmt.Errorf("load sweep clock: %w", err) + } + var encoded map[string]time.Time + if err := json.Unmarshal([]byte(raw), &encoded); err != nil { + logger.Warn("persisted sweep clock is corrupt; starting empty", + slog.Any("error", err)) + return nil + } + snapshot := make(map[common.Address]time.Time, len(encoded)) + for k, v := range encoded { + snapshot[common.HexToAddress(k)] = v + } + c.Restore(snapshot) + logger.Info("sweep clock loaded", slog.Int("tokens", len(snapshot))) + return nil +} + +// Run starts the periodic persistence loop. Returns when ctx is cancelled. +// Loads the persisted snapshot before entering the loop. +func (c *sweepClock) Run(ctx context.Context, db *sql.DB, logger *slog.Logger) { + if err := c.load(ctx, db, logger); err != nil { + logger.Warn("sweep clock load failed; continuing with empty clock", + slog.Any("error", err)) + } + + ticker := time.NewTicker(sweepClockPersistInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + flushCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + _ = c.persist(flushCtx, db) + cancel() + return + case <-ticker.C: + if err := c.persist(ctx, db); err != nil { + logger.Warn("sweep clock persist failed", slog.Any("error", err)) + } + } + } +} diff --git a/tools/fastswap-miles/sweep_scheduler_test.go b/tools/fastswap-miles/sweep_scheduler_test.go new file mode 100644 index 000000000..f67ee14cd --- /dev/null +++ b/tools/fastswap-miles/sweep_scheduler_test.go @@ -0,0 +1,205 @@ +package main + +import ( + "log/slog" + "math/big" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" +) + +// helper: a buffer pre-loaded with N samples at uniform values so percentile +// math is predictable. +func bufWithUniform(n int, weiPerGas uint64) *gasBuffer { + g := &gasBuffer{ + logger: slog.Default(), + data: make([]gasObservation, 0, n), + } + now := time.Now() + for i := range n { + g.data = append(g.data, gasObservation{At: now.Add(-time.Duration(i) * time.Second), WeiPerGas: weiPerGas}) + } + return g +} + +func TestDecideSweep_SkipBeforeCadence(t *testing.T) { + cfg := lookupTokenConfig(usdcAddr) // 24h cadence + out := decideSweep(sweepDecisionInput{ + Now: time.Now(), + Cfg: cfg, + LastSweepAt: time.Now().Add(-1 * time.Hour), // recent sweep + CurrentGasWei: 1_000_000, + Buf: bufWithUniform(100, 1_000_000_000), + }) + if out.Decision != SweepSkip { + t.Errorf("Decision = %v, want SweepSkip", out.Decision) + } + if out.Reason != "waiting_for_cadence" { + t.Errorf("Reason = %q, want waiting_for_cadence", out.Reason) + } +} + +func TestDecideSweep_AttemptAfterCadence_GasOk(t *testing.T) { + cfg := lookupTokenConfig(usdcAddr) // 24h cadence + // Buffer sample = 5_000_000_000 (5 gwei), current = 1_000_000_000 (1 gwei) → below p25. + out := decideSweep(sweepDecisionInput{ + Now: time.Now(), + Cfg: cfg, + LastSweepAt: time.Now().Add(-25 * time.Hour), + CurrentGasWei: 1_000_000_000, + Buf: bufWithUniform(100, 5_000_000_000), + }) + if out.Decision != SweepAttempt { + t.Errorf("Decision = %v, want SweepAttempt", out.Decision) + } + if !out.HasGasData { + t.Errorf("HasGasData = false, want true") + } +} + +func TestDecideSweep_SkipAboveGasCap(t *testing.T) { + cfg := lookupTokenConfig(usdcAddr) // 24h cadence, p25 cap + // All samples at 5 gwei → p25 = 5 gwei. Current at 10 gwei should skip. + out := decideSweep(sweepDecisionInput{ + Now: time.Now(), + Cfg: cfg, + LastSweepAt: time.Now().Add(-25 * time.Hour), + CurrentGasWei: 10_000_000_000, + Buf: bufWithUniform(100, 5_000_000_000), + }) + if out.Decision != SweepSkip { + t.Errorf("Decision = %v, want SweepSkip", out.Decision) + } + if out.Reason != "gas_above_cap" { + t.Errorf("Reason = %q, want gas_above_cap", out.Reason) + } +} + +func TestDecideSweep_ForceAfterCadenceX1_5(t *testing.T) { + cfg := lookupTokenConfig(usdcAddr) // 24h cadence → 36h force + out := decideSweep(sweepDecisionInput{ + Now: time.Now(), + Cfg: cfg, + LastSweepAt: time.Now().Add(-37 * time.Hour), + CurrentGasWei: 999_999_999_999, // very high gas — would normally skip + Buf: bufWithUniform(100, 1_000_000_000), + }) + if out.Decision != SweepForce { + t.Errorf("Decision = %v, want SweepForce", out.Decision) + } +} + +func TestDecideSweep_VolatileNoCadenceFloor(t *testing.T) { + cfg := lookupTokenConfig(pepeAddr) // 0 cadence + out := decideSweep(sweepDecisionInput{ + Now: time.Now(), + Cfg: cfg, + LastSweepAt: time.Now().Add(-1 * time.Minute), // very recent + CurrentGasWei: 1, + Buf: bufWithUniform(100, 1_000_000_000), + }) + if out.Decision != SweepAttempt { + t.Errorf("Decision = %v, want SweepAttempt (volatile has no cadence floor)", out.Decision) + } +} + +func TestDecideSweep_VolatileForceAt6h(t *testing.T) { + cfg := lookupTokenConfig(pepeAddr) // 6h force + out := decideSweep(sweepDecisionInput{ + Now: time.Now(), + Cfg: cfg, + LastSweepAt: time.Now().Add(-7 * time.Hour), + CurrentGasWei: 999_999_999_999, + Buf: bufWithUniform(100, 1_000_000_000), + }) + if out.Decision != SweepForce { + t.Errorf("Decision = %v, want SweepForce", out.Decision) + } +} + +func TestDecideSweep_NoGasData_AllowsAttempt(t *testing.T) { + cfg := lookupTokenConfig(usdcAddr) + emptyBuf := &gasBuffer{logger: slog.Default(), data: nil} + out := decideSweep(sweepDecisionInput{ + Now: time.Now(), + Cfg: cfg, + LastSweepAt: time.Now().Add(-25 * time.Hour), + CurrentGasWei: 100, + Buf: emptyBuf, + }) + if out.Decision != SweepAttempt { + t.Errorf("Decision = %v, want SweepAttempt", out.Decision) + } + if out.Reason != "no_gas_data" { + t.Errorf("Reason = %q, want no_gas_data", out.Reason) + } + if out.HasGasData { + t.Errorf("HasGasData = true, want false") + } +} + +func TestSweepClock_NeverSweptIsZero(t *testing.T) { + c := newSweepClock() + if got := c.LastSwept(usdcAddr); !got.IsZero() { + t.Errorf("LastSwept on never-swept = %v, want zero time", got) + } +} + +func TestSweepClock_MarkAndGet(t *testing.T) { + c := newSweepClock() + now := time.Now() + c.MarkSwept(usdcAddr, now) + if got := c.LastSwept(usdcAddr); !got.Equal(now) { + t.Errorf("LastSwept = %v, want %v", got, now) + } +} + +func TestSweepClock_SnapshotAndRestore(t *testing.T) { + c := newSweepClock() + now := time.Now() + c.MarkSwept(usdcAddr, now) + c.MarkSwept(usdtAddr, now.Add(-1*time.Hour)) + + snap := c.Snapshot() + if len(snap) != 2 { + t.Fatalf("snapshot size = %d, want 2", len(snap)) + } + + c2 := newSweepClock() + c2.Restore(snap) + if got := c2.LastSwept(usdcAddr); !got.Equal(now) { + t.Errorf("restored USDC = %v, want %v", got, now) + } +} + +func TestSweepClock_RestoreReplacesState(t *testing.T) { + c := newSweepClock() + c.MarkSwept(usdcAddr, time.Now()) + // Restore an unrelated snapshot — should clear USDC. + c.Restore(map[common.Address]time.Time{ + usdtAddr: time.Now().Add(-1 * time.Hour), + }) + if got := c.LastSwept(usdcAddr); !got.IsZero() { + t.Errorf("USDC after restore = %v, want zero (not in snapshot)", got) + } + // Quiet the unused-import warning if big.Int isn't used elsewhere. + _ = big.NewInt(0) +} + +func TestSweepDecisionString(t *testing.T) { + cases := []struct { + d sweepDecision + want string + }{ + {SweepSkip, "skip"}, + {SweepAttempt, "attempt"}, + {SweepForce, "force"}, + {sweepDecision(99), "unknown"}, + } + for _, c := range cases { + if got := c.d.String(); got != c.want { + t.Errorf("decision %d = %q, want %q", c.d, got, c.want) + } + } +} diff --git a/tools/fastswap-miles/tiers.go b/tools/fastswap-miles/tiers.go index e893d0da2..75e9fb052 100644 --- a/tools/fastswap-miles/tiers.go +++ b/tools/fastswap-miles/tiers.go @@ -8,8 +8,8 @@ import ( ) // Tier classifies a token by price-stability profile. The tier drives sweep -// cadence, the percentile of recent sweep gas used to estimate per-user cost -// at miles-awarding time, and the gas cap above which sweeps are deferred. +// cadence, the gas-cap percentile applied during sweep timing, and the +// fallback behavior for the upfront cost estimate. type Tier int const ( @@ -31,13 +31,37 @@ func (t Tier) String() string { } } -// tokenConfig holds the per-token sweep parameters. Values are tuned by tier -// and refined per token where the realized data justifies a different default. +// GasCapPercentile returns the percentile of recent L1 gas observations above +// which sweep attempts are deferred. Stable tokens are most selective (only +// sweep at the cheapest 25% of recent gas); volatile tokens sweep at almost +// any gas (only blocked at the top 25%). The lookback window for the +// percentile computation is the cadence period (or 6h for volatile). +func (t Tier) GasCapPercentile() int { + switch t { + case TierStable: + return 25 + case TierBlueChip: + return 50 + case TierVolatile: + return 75 + default: + return 75 + } +} + +// tokenConfig holds the per-token sweep parameters. type tokenConfig struct { - Tier Tier - SweepCadence time.Duration // target maximum interval between sweeps - CostEstimatePctile int // percentile of recent sweep gas to use as upfront cost estimate - ExpectedBatchSize int // assumed batch size for per-user cost dilution + Tier Tier + + // SweepCadence is the target maximum interval between sweep attempts. + // During this window, the sweep loop deliberately waits for the batch to + // grow. After cadence elapses, the loop attempts to sweep subject to + // profitability and gas-cap checks. Force-sweep happens at 1.5×cadence. + // + // A zero value means "no cadence floor" — try every cycle. Used for + // volatile tokens where price risk dominates batching benefit; force-sweep + // for those is a fixed 6h since last attempt (handled in the scheduler). + SweepCadence time.Duration } // L1 mainnet token addresses for the configured set. @@ -56,45 +80,62 @@ var ( pepeAddr = common.HexToAddress("0x6982508145454ce325ddbe47a25d4ec3d2311933") ) -// tokenConfigs maps known L1 token addresses (lowercased hex) to their sweep -// configuration. Lookup is case-insensitive via lookupTokenConfig. Unknown -// tokens fall through to defaultTokenConfig. +// tokenConfigs maps known L1 token addresses to their sweep configuration. +// Unknown addresses fall through to defaultTokenConfig. // -// Cadence values are derived from the Apr 13-27 stable-volume window: -// - USDC (~31 swaps/day): daily yields ~30-row batches. -// - USDT (~17 swaps/day): every 2d yields ~34-row batches. -// - DAI (~8 swaps/day, mostly 1-3/day): every 5d yields ~38-row batches. -// - Blue chips (3-15 swaps/day): daily is reasonable; batch sizes are smaller. -// - Volatile (low/sporadic volume): 6h to limit price-risk exposure. -// -// CostEstimatePctile and ExpectedBatchSize together set how much "buffer" the -// protocol keeps when paying out miles upfront. Stables use a low percentile -// (p40) and a generous batch-size assumption (30) because their realized cost -// is consistent and batches reliably exceed 30; the difference is protocol -// upside. Volatiles use p75 and batch-size 1 — the worst-case assumption. +// Cadence values are calibrated against the Apr 13-27 stable-volume window: +// - USDC (~31 swaps/day): daily cadence yields ~30-row batches. +// - USDT (~17 swaps/day): 48h yields ~34-row batches. +// - DAI (~8 swaps/day): 48h yields ~15-row batches (compromise between +// the daily-default preference and the value of larger batches). +// - Blue chips: 24h with smaller batches. +// - Volatile / unknowns: zero cadence (every cycle eligible) with 6h +// force-sweep, because price risk dominates batching benefit. var tokenConfigs = map[common.Address]tokenConfig{ - usdcAddr: {TierStable, 24 * time.Hour, 40, 30}, - usdtAddr: {TierStable, 48 * time.Hour, 40, 30}, - daiAddr: {TierStable, 120 * time.Hour, 40, 30}, - wbtcAddr: {TierBlueChip, 24 * time.Hour, 50, 3}, - arbAddr: {TierBlueChip, 24 * time.Hour, 50, 3}, - linkAddr: {TierBlueChip, 24 * time.Hour, 50, 3}, - compAddr: {TierBlueChip, 24 * time.Hour, 50, 3}, - uniAddr: {TierBlueChip, 24 * time.Hour, 50, 3}, - sushiAddr: {TierBlueChip, 24 * time.Hour, 50, 3}, - inchAddr: {TierBlueChip, 24 * time.Hour, 50, 3}, - yfiAddr: {TierBlueChip, 24 * time.Hour, 50, 3}, - pepeAddr: {TierVolatile, 6 * time.Hour, 75, 1}, + usdcAddr: {TierStable, 24 * time.Hour}, + usdtAddr: {TierStable, 48 * time.Hour}, + daiAddr: {TierStable, 48 * time.Hour}, + wbtcAddr: {TierBlueChip, 24 * time.Hour}, + arbAddr: {TierBlueChip, 24 * time.Hour}, + linkAddr: {TierBlueChip, 24 * time.Hour}, + compAddr: {TierBlueChip, 24 * time.Hour}, + uniAddr: {TierBlueChip, 24 * time.Hour}, + sushiAddr: {TierBlueChip, 24 * time.Hour}, + inchAddr: {TierBlueChip, 24 * time.Hour}, + yfiAddr: {TierBlueChip, 24 * time.Hour}, + pepeAddr: {TierVolatile, 0}, } // defaultTokenConfig is used when an output token is not in tokenConfigs. -// Conservative defaults: treat as volatile, sweep every 6h, assume size-1 -// batches at p75 of recent costs. +// Conservative defaults: treat as volatile, no cadence floor. var defaultTokenConfig = tokenConfig{ - Tier: TierVolatile, - SweepCadence: 6 * time.Hour, - CostEstimatePctile: 75, - ExpectedBatchSize: 1, + Tier: TierVolatile, + SweepCadence: 0, +} + +// volatileForceSweepInterval is the time since last sweep attempt at which a +// volatile token's gas cap is dropped (profitability still required). +const volatileForceSweepInterval = 6 * time.Hour + +// forceSweepInterval returns the time-since-last-sweep at which the gas cap +// is dropped (profitability remains the only check). For stable/bluechip +// tokens this is 1.5× cadence; for volatile tokens it is a fixed 6h. +func (c tokenConfig) forceSweepInterval() time.Duration { + if c.SweepCadence == 0 { + return volatileForceSweepInterval + } + return c.SweepCadence + c.SweepCadence/2 +} + +// gasCapLookback returns the lookback window for computing the gas cap +// percentile. For stable/bluechip tokens this matches the cadence period +// (so "cheap relative to gas during the period we'd actually consider +// sweeping"). For volatile tokens it is 6h. +func (c tokenConfig) gasCapLookback() time.Duration { + if c.SweepCadence == 0 { + return volatileForceSweepInterval + } + return c.SweepCadence } // lookupTokenConfig returns the configuration for a token address, case @@ -110,3 +151,21 @@ func lookupTokenConfig(addr common.Address) tokenConfig { } return defaultTokenConfig } + +// isWhitelisted reports whether the token is explicitly listed in +// tokenConfigs (rather than falling through to defaultTokenConfig). Used to +// gate upfront miles awarding: only whitelisted output tokens are eligible, +// because an attacker who mints their own token and controls its on-chain +// liquidity could otherwise extract miles for surplus that has no realizable +// ETH value. Unknown tokens defer to sweep time, where the realized swap +// result is the source of truth (and where attacker tokens never sweep +// because Barter can't quote them and the profitability gate blocks). +func isWhitelisted(addr common.Address) bool { + if _, ok := tokenConfigs[addr]; ok { + return true + } + if _, ok := tokenConfigs[common.HexToAddress(strings.ToLower(addr.Hex()))]; ok { + return true + } + return false +} diff --git a/tools/fastswap-miles/tiers_test.go b/tools/fastswap-miles/tiers_test.go index e1fb8213a..f7daaeb44 100644 --- a/tools/fastswap-miles/tiers_test.go +++ b/tools/fastswap-miles/tiers_test.go @@ -25,6 +25,22 @@ func TestTierString(t *testing.T) { } } +func TestTierGasCapPercentile(t *testing.T) { + cases := []struct { + tier Tier + want int + }{ + {TierStable, 25}, + {TierBlueChip, 50}, + {TierVolatile, 75}, + } + for _, c := range cases { + if got := c.tier.GasCapPercentile(); got != c.want { + t.Errorf("Tier(%v).GasCapPercentile() = %d, want %d", c.tier, got, c.want) + } + } +} + func TestLookupTokenConfig_KnownStable(t *testing.T) { cfg := lookupTokenConfig(usdcAddr) if cfg.Tier != TierStable { @@ -33,11 +49,13 @@ func TestLookupTokenConfig_KnownStable(t *testing.T) { if cfg.SweepCadence != 24*time.Hour { t.Errorf("USDC cadence = %v, want 24h", cfg.SweepCadence) } - if cfg.CostEstimatePctile != 40 { - t.Errorf("USDC pctile = %d, want 40", cfg.CostEstimatePctile) - } - if cfg.ExpectedBatchSize != 30 { - t.Errorf("USDC batch = %d, want 30", cfg.ExpectedBatchSize) +} + +func TestLookupTokenConfig_DAICadence(t *testing.T) { + // DAI cadence was tightened from 5d to 48h after backtesting. + cfg := lookupTokenConfig(daiAddr) + if cfg.SweepCadence != 48*time.Hour { + t.Errorf("DAI cadence = %v, want 48h", cfg.SweepCadence) } } @@ -46,22 +64,22 @@ func TestLookupTokenConfig_KnownBlueChip(t *testing.T) { if cfg.Tier != TierBlueChip { t.Errorf("WBTC tier = %v, want bluechip", cfg.Tier) } - if cfg.ExpectedBatchSize != 3 { - t.Errorf("WBTC batch = %d, want 3", cfg.ExpectedBatchSize) + if cfg.SweepCadence != 24*time.Hour { + t.Errorf("WBTC cadence = %v, want 24h", cfg.SweepCadence) } } -func TestLookupTokenConfig_KnownVolatile(t *testing.T) { +func TestLookupTokenConfig_KnownVolatileNoCadence(t *testing.T) { cfg := lookupTokenConfig(pepeAddr) if cfg.Tier != TierVolatile { t.Errorf("PEPE tier = %v, want volatile", cfg.Tier) } - if cfg.SweepCadence != 6*time.Hour { - t.Errorf("PEPE cadence = %v, want 6h", cfg.SweepCadence) + if cfg.SweepCadence != 0 { + t.Errorf("PEPE cadence = %v, want zero (no cadence floor)", cfg.SweepCadence) } } -func TestLookupTokenConfig_UnknownDefaultsVolatile(t *testing.T) { +func TestLookupTokenConfig_UnknownDefaultsVolatileNoCadence(t *testing.T) { addr := common.HexToAddress("0xdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef") cfg := lookupTokenConfig(addr) if cfg != defaultTokenConfig { @@ -70,10 +88,12 @@ func TestLookupTokenConfig_UnknownDefaultsVolatile(t *testing.T) { if cfg.Tier != TierVolatile { t.Errorf("default tier = %v, want volatile", cfg.Tier) } + if cfg.SweepCadence != 0 { + t.Errorf("default cadence = %v, want zero (no cadence floor)", cfg.SweepCadence) + } } func TestLookupTokenConfig_CaseInsensitive(t *testing.T) { - // Same address, different cases. upper := common.HexToAddress(strings.ToUpper(usdcAddr.Hex())) lower := common.HexToAddress(strings.ToLower(usdcAddr.Hex())) if lookupTokenConfig(upper).Tier != TierStable { @@ -84,6 +104,39 @@ func TestLookupTokenConfig_CaseInsensitive(t *testing.T) { } } +func TestForceSweepInterval(t *testing.T) { + // Stable / bluechip force-sweep at 1.5×cadence. + usdc := lookupTokenConfig(usdcAddr) + if got := usdc.forceSweepInterval(); got != 36*time.Hour { + t.Errorf("USDC force-sweep interval = %v, want 36h", got) + } + usdt := lookupTokenConfig(usdtAddr) + if got := usdt.forceSweepInterval(); got != 72*time.Hour { + t.Errorf("USDT force-sweep interval = %v, want 72h", got) + } + + // Volatile force-sweep at fixed 6h regardless of cadence. + pepe := lookupTokenConfig(pepeAddr) + if got := pepe.forceSweepInterval(); got != 6*time.Hour { + t.Errorf("PEPE force-sweep interval = %v, want 6h", got) + } +} + +func TestGasCapLookback(t *testing.T) { + usdc := lookupTokenConfig(usdcAddr) + if got := usdc.gasCapLookback(); got != 24*time.Hour { + t.Errorf("USDC gas-cap lookback = %v, want 24h", got) + } + dai := lookupTokenConfig(daiAddr) + if got := dai.gasCapLookback(); got != 48*time.Hour { + t.Errorf("DAI gas-cap lookback = %v, want 48h", got) + } + pepe := lookupTokenConfig(pepeAddr) + if got := pepe.gasCapLookback(); got != 6*time.Hour { + t.Errorf("PEPE gas-cap lookback = %v, want 6h (volatile fallback)", got) + } +} + func TestAllStableConfigsConsistent(t *testing.T) { stables := []common.Address{usdcAddr, usdtAddr, daiAddr} for _, a := range stables { @@ -91,13 +144,36 @@ func TestAllStableConfigsConsistent(t *testing.T) { if cfg.Tier != TierStable { t.Errorf("%s tier = %v, want stable", a.Hex(), cfg.Tier) } - if cfg.CostEstimatePctile != 40 { - t.Errorf("%s pctile = %d, want 40", a.Hex(), cfg.CostEstimatePctile) + if cfg.SweepCadence == 0 { + t.Errorf("%s cadence is zero; stable tokens require a cadence floor", a.Hex()) } - if cfg.ExpectedBatchSize != 30 { - t.Errorf("%s batch = %d, want 30", a.Hex(), cfg.ExpectedBatchSize) + } +} + +func TestIsWhitelisted(t *testing.T) { + whitelistedTokens := []common.Address{ + usdcAddr, usdtAddr, daiAddr, + wbtcAddr, arbAddr, linkAddr, compAddr, uniAddr, sushiAddr, inchAddr, yfiAddr, + pepeAddr, + } + for _, a := range whitelistedTokens { + if !isWhitelisted(a) { + t.Errorf("%s should be whitelisted", a.Hex()) } } + + // Random unknown token must not be whitelisted (otherwise the + // attacker-token defense is broken). + random := common.HexToAddress("0xdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef") + if isWhitelisted(random) { + t.Errorf("unknown token %s incorrectly whitelisted", random.Hex()) + } + + // Case-insensitive lookup. + upper := common.HexToAddress(strings.ToUpper(usdcAddr.Hex())) + if !isWhitelisted(upper) { + t.Errorf("upper-case USDC not recognized as whitelisted") + } } func TestAllBlueChipConfigsConsistent(t *testing.T) { From f6b9dc3cf7bdf778d1ee2225e9f7e43fda30ddbc Mon Sep 17 00:00:00 2001 From: owen-eth Date: Mon, 11 May 2026 13:13:55 -0400 Subject: [PATCH 2/2] feat: wire upfront fastswap miles awarding + cadence-based sweep loop --- tools/fastswap-miles/main.go | 42 +++ tools/fastswap-miles/miles.go | 691 +++++++++++++++++++++++----------- 2 files changed, 514 insertions(+), 219 deletions(-) diff --git a/tools/fastswap-miles/main.go b/tools/fastswap-miles/main.go index 0a9d15e57..af2f5323f 100644 --- a/tools/fastswap-miles/main.go +++ b/tools/fastswap-miles/main.go @@ -238,6 +238,32 @@ func main() { defer func() { _ = db.Close() }() cfg.DB = db + // Sweep-redesign components. All four can operate in dry-run + // mode (their actions are guarded behind cfg.DryRun checks at + // the call sites that mutate state). + priceOracle, err := newPriceOracle(client, weth, logger) + if err != nil { + return fmt.Errorf("newPriceOracle: %w", err) + } + cfg.PriceOracle = priceOracle + cfg.CostEstimator = newCostEstimator(db, logger) + cfg.GasBuffer = newGasBuffer(db, logger) + cfg.SweepClock = newSweepClock() + reconciliation := newReconciliationMonitor(db, logger, + strings.ToLower(executorAddr.Hex())) + + sweepLoopRunner, err := newSweepLoop(cfg) + if err != nil { + return fmt.Errorf("newSweepLoop: %w", err) + } + + // Background goroutines. Each respects ctx cancellation; the + // outer cancel() in the SIGTERM handler cascades shutdown. + go cfg.CostEstimator.Run(ctx) + go cfg.GasBuffer.Run(ctx) + go cfg.SweepClock.Run(ctx, db, logger) + go reconciliation.Run(ctx) + filterer, err := fastsettlement.NewFastsettlementv3Filterer(settlementAddr, client) if err != nil { return fmt.Errorf("NewFastsettlementv3Filterer: %w", err) @@ -307,6 +333,16 @@ func main() { // Only process miles when we've caught up to chain tip to avoid // hammering the Barter API while still indexing old blocks. if startBlock > head { + // Per-cycle gas observation feeds the buffer that drives + // the cadence-sweep gas-cap percentile. Best-effort — + // occasional miss is fine since the buffer holds 50h of + // samples at 12s cadence (~15K samples). + if gasPrice, err := client.SuggestGasPrice(ctx); err == nil { + cfg.GasBuffer.Observe(gasPrice) + } else { + logger.Debug("gas observation skipped", slog.Any("error", err)) + } + ethProcessed, err := processMiles(ctx, cfg) if err != nil { logger.Error("processMiles error", slog.Any("error", err)) @@ -322,6 +358,12 @@ func main() { if erc20Processed > 0 { logger.Info("processed ERC20 token sweeps", slog.Int("count", erc20Processed)) } + + // Cadence-based sweep of accumulated surplus from + // upfront-awarded rows. Cheap when cadence isn't met + // (in-memory check, no RPC); fires only at most once + // per cadence period per token. + sweepLoopRunner.RunOnce(ctx) } } }, diff --git a/tools/fastswap-miles/miles.go b/tools/fastswap-miles/miles.go index 3cd565ad1..86940c139 100644 --- a/tools/fastswap-miles/miles.go +++ b/tools/fastswap-miles/miles.go @@ -89,10 +89,10 @@ type serviceConfig struct { SettlementAddr common.Address MaxGasGwei uint64 - // Sweep redesign components. Optional in the sense that nil values are - // tolerated by every consumer (sweep loop, miles processor) so this - // foundation commit can land without any of them being wired up yet — - // production wiring lives in the integration commit's main.go. + // New (sweep redesign): upfront pricing + cost estimation + sweep + // scheduling components. All optional in the sense that nil values + // degrade gracefully — the miles loop simply defers every row to the + // legacy sweep-time path. Production wiring lives in main.go. PriceOracle *priceOracle CostEstimator *costEstimator GasBuffer *gasBuffer @@ -112,20 +112,16 @@ type ethRow struct { type erc20Row struct { txHash string user string - token string + token string // output_token surplus string gasCost sql.NullString inputToken string + inputAmt string // raw input token units (or ETH wei when input is ETH) + userAmtOut string // raw output token units delivered to user blockTS sql.NullTime miles sql.NullInt64 } -type tokenBatch struct { - Token string - TotalSum *big.Int - Txs []erc20Row -} - // -------------------- ETH/WETH Miles -------------------- func processMiles(ctx context.Context, cfg *serviceConfig) (int, error) { @@ -291,11 +287,66 @@ WHERE processed = false // -------------------- ERC20 Miles -------------------- +// erc20CycleStats counts the outcomes of one processERC20Miles invocation. +// Logged at the end of the cycle as `erc20_cycle_summary` so deployment can +// be monitored at a glance: a steady stream of upfront_awarded with low +// deferred_no_chainlink confirms the upfront path is doing its job; spikes +// in deferred_not_whitelisted indicate user activity on novel tokens worth +// investigating. +type erc20CycleStats struct { + idempotentSkipped int + bidRetried int + orphaned int + upfrontAwarded int + upfrontNoProfit int + upfrontSubThreshold int + upfrontFuelFailed int + deferredNotWhitelisted int + deferredNoChainlink int + deferredInvalidEvent int + deferredNoTokenDecimals int + deferredOtherReason int + deferredAwarded int + deferredBadSurplus int +} + +func (s *erc20CycleStats) noteDeferralSource(source string) { + switch source { + case "deferred:not_whitelisted": + s.deferredNotWhitelisted++ + case "deferred:no_chainlink": + s.deferredNoChainlink++ + case "deferred:invalid_event": + s.deferredInvalidEvent++ + case "deferred:no_token_decim": + s.deferredNoTokenDecimals++ + default: + s.deferredOtherReason++ + } +} + +func (s *erc20CycleStats) total() int { + return s.idempotentSkipped + s.bidRetried + s.orphaned + + s.upfrontAwarded + s.upfrontNoProfit + s.upfrontSubThreshold + s.upfrontFuelFailed + + s.deferredNotWhitelisted + s.deferredNoChainlink + s.deferredInvalidEvent + + s.deferredNoTokenDecimals + s.deferredOtherReason + s.deferredAwarded + + s.deferredBadSurplus +} + +// processERC20Miles handles all pending erc20-output rows. Each row is routed +// either to the upfront-awarding path (when the priceOracle returns a value) +// or to the deferred batch path (legacy sweep-then-pro-rata logic, used for +// non-whitelisted output tokens and whitelisted tokens that lack a Chainlink +// feed). The deferred path is unchanged behavior — it still triggers its own +// sweep when the batch becomes profitable. The cadence-based sweep loop +// (sweep_loop.go) operates independently, sweeping accumulated balance from +// upfront-awarded rows once per cadence interval. func processERC20Miles(ctx context.Context, cfg *serviceConfig) (int, error) { processed := 0 + stats := erc20CycleStats{} rows, err := cfg.DB.QueryContext(ctx, ` -SELECT tx_hash, user_address, output_token, surplus, gas_cost, input_token, block_timestamp, miles +SELECT tx_hash, user_address, output_token, surplus, gas_cost, input_token, input_amount, user_amt_out, block_timestamp, miles FROM mevcommit_57173.fastswap_miles WHERE processed = false AND swap_type = 'erc20' @@ -309,7 +360,7 @@ WHERE processed = false var pending []erc20Row for rows.Next() { var r erc20Row - if err := rows.Scan(&r.txHash, &r.user, &r.token, &r.surplus, &r.gasCost, &r.inputToken, &r.blockTS, &r.miles); err != nil { + if err := rows.Scan(&r.txHash, &r.user, &r.token, &r.surplus, &r.gasCost, &r.inputToken, &r.inputAmt, &r.userAmtOut, &r.blockTS, &r.miles); err != nil { return processed, err } pending = append(pending, r) @@ -322,24 +373,6 @@ WHERE processed = false return processed, nil } - batches := make(map[string]*tokenBatch) - for _, r := range pending { - surplusWei, ok := new(big.Int).SetString(r.surplus, 10) - if !ok || surplusWei.Sign() <= 0 { - cfg.Logger.Warn("bad surplus", slog.String("surplus", r.surplus), slog.String("tx", r.txHash)) - continue - } - if _, exists := batches[r.token]; !exists { - batches[r.token] = &tokenBatch{ - Token: r.token, - TotalSum: big.NewInt(0), - Txs: make([]erc20Row, 0), - } - } - batches[r.token].TotalSum.Add(batches[r.token].TotalSum, surplusWei) - batches[r.token].Txs = append(batches[r.token].Txs, r) - } - allErc20Hashes := make([]string, len(pending)) for i, r := range pending { allErc20Hashes[i] = r.txHash @@ -347,243 +380,463 @@ WHERE processed = false erc20BidMap := batchLookupBidCosts(cfg.Logger, cfg.DB, allErc20Hashes) erc20FastRPCSet := batchCheckFastRPC(cfg.Logger, cfg.DB, allErc20Hashes) - for token, batch := range batches { - totalOriginalGasCost := big.NewInt(0) - totalOriginalBidCost := big.NewInt(0) - - // First pass: separate rows into ready, pending-bid, and not-in-fastrpc. - // Pending-bid rows are excluded from the batch so they retry next cycle. - var readyTxs []erc20Row - var readyGasCosts []*big.Int - var readyBidCosts []*big.Int - - for _, r := range batch.Txs { - // Already-settled guard runs BEFORE batch aggregation. If a row's - // surplus tokens were already swept on a prior run (miles is - // non-null), those tokens are no longer in the executor wallet. - // Including the row in readyTxs / readyTotalSum would make the new - // sweep quote for an amount the wallet can't supply, failing the - // batch or skewing pro-rata allocation for every other row. Skip - // entirely: just re-flip processed=true and preserve every other - // column (surplus_eth/net_profit_eth/bid_cost depend on the actual - // sweep result, which we can't recompute without re-sweeping). - if r.miles.Valid { - cfg.Logger.Info("erc20 tx already has miles recorded, excluding from sweep batch", + // First pass: route every row through bid/orphan checks; rows that survive + // either get awarded upfront (priceOracle eligible) or land in a per-token + // deferred bucket for the legacy sweep-time pro-rata path. + deferredByToken := make(map[string][]deferredErc20Row) + for _, r := range pending { + // Idempotency: a row with miles already recorded was settled on a + // prior run. Just re-flip processed and preserve all derived columns; + // we can't recompute surplus_eth / net_profit_eth without re-sweeping. + if r.miles.Valid { + cfg.Logger.Info("erc20 tx already has miles recorded, skipping", + slog.String("tx", r.txHash), slog.String("user", r.user), + slog.Int64("recorded_miles", r.miles.Int64)) + if !cfg.DryRun { + markProcessedFlagOnly(cfg.DB, r.txHash) + } + stats.idempotentSkipped++ + processed++ + continue + } + + userPaysGas := strings.EqualFold(r.inputToken, zeroAddr.Hex()) + bidCostWei := getBidCost(erc20BidMap, r.txHash) + if bidCostWei.Sign() == 0 { + txAge := time.Duration(0) + if r.blockTS.Valid { + txAge = time.Since(r.blockTS.Time) + } + inFastRPC := erc20FastRPCSet[strings.ToLower(r.txHash)] + + switch decideBidCheckOutcome(userPaysGas, inFastRPC, r.blockTS.Valid, txAge) { + case bidCheckProceed: + cfg.Logger.Info("erc20 tx in FastRPC but bid never indexed, processing with 0 bid cost", + slog.String("tx", r.txHash), slog.String("user", r.user)) + // fall through with bidCostWei = 0 + case bidCheckRetry: + cfg.Logger.Info("erc20 tx bid lookup pending, will retry next cycle", slog.String("tx", r.txHash), slog.String("user", r.user), - slog.Int64("recorded_miles", r.miles.Int64)) + slog.Bool("in_fastrpc", inFastRPC), + slog.Bool("user_pays_gas", userPaysGas), + slog.Duration("age", txAge)) + stats.bidRetried++ + continue + case bidCheckOrphan: + cfg.Logger.Info("erc20 tx not in FastRPC after retry window, skipping with 0 miles", + slog.String("tx", r.txHash), slog.String("user", r.user), + slog.Duration("age", txAge)) if !cfg.DryRun { - markProcessedFlagOnly(cfg.DB, r.txHash) + // surplus_eth=0 because raw surplus is in output-token + // units and weiToEth would yield nonsense for non-ETH + // outputs (e.g. PEPE 14,413 ETH). The orphan's surplus + // tokens stay in the executor wallet and become + // protocol revenue when the cadence sweep eventually + // fires for this token. + markProcessed(cfg.DB, r.txHash, 0, 0, 0, "0") } + stats.orphaned++ processed++ continue } + } - userPaysGas := strings.EqualFold(r.inputToken, zeroAddr.Hex()) - bidCostWei := getBidCost(erc20BidMap, r.txHash) - if bidCostWei.Sign() == 0 { - txAge := time.Duration(0) - if r.blockTS.Valid { - txAge = time.Since(r.blockTS.Time) - } - inFastRPC := erc20FastRPCSet[strings.ToLower(r.txHash)] - - switch decideBidCheckOutcome(userPaysGas, inFastRPC, r.blockTS.Valid, txAge) { - case bidCheckProceed: - cfg.Logger.Info("erc20 tx in FastRPC but bid never indexed, processing with 0 bid cost", - slog.String("tx", r.txHash), slog.String("user", r.user)) - // fall through with bidCostWei = 0 - case bidCheckRetry: - cfg.Logger.Info("erc20 tx bid lookup pending, will retry next cycle", - slog.String("tx", r.txHash), slog.String("user", r.user), - slog.Bool("in_fastrpc", inFastRPC), - slog.Bool("user_pays_gas", userPaysGas), - slog.Duration("age", txAge)) - continue - case bidCheckOrphan: - cfg.Logger.Info("erc20 tx not in FastRPC after retry window, skipping with 0 miles", - slog.String("tx", r.txHash), slog.String("user", r.user), - slog.Duration("age", txAge)) - if !cfg.DryRun { - // surplus_eth=0 because surplus is in raw output-token units; - // converting it via weiToEth (as the prior code did) yields a - // nonsense value for non-ETH outputs (e.g. PEPE 14,413 ETH). - markProcessed(cfg.DB, r.txHash, 0, 0, 0, "0") - } - processed++ - continue - } - } - - gasCostWei := big.NewInt(0) - if !userPaysGas && r.gasCost.Valid && r.gasCost.String != "" { - if gc, ok := new(big.Int).SetString(r.gasCost.String, 10); ok { - gasCostWei = gc - } + gasCostWei := big.NewInt(0) + // CRITICAL: ETH-input swaps (userPaysGas) MUST NOT subtract gas_cost. + // The user paid that L1 gas from their own wallet, not from protocol + // revenue. Mirroring this exactly is what the backtest validated as + // +18% miles to users; getting it wrong showed -12%. + if !userPaysGas && r.gasCost.Valid && r.gasCost.String != "" { + if gc, ok := new(big.Int).SetString(r.gasCost.String, 10); ok { + gasCostWei = gc } - - readyTxs = append(readyTxs, r) - readyGasCosts = append(readyGasCosts, gasCostWei) - readyBidCosts = append(readyBidCosts, bidCostWei) - - totalOriginalGasCost.Add(totalOriginalGasCost, gasCostWei) - totalOriginalBidCost.Add(totalOriginalBidCost, bidCostWei) } - if len(readyTxs) == 0 { + surplusWei, ok := new(big.Int).SetString(r.surplus, 10) + if !ok || surplusWei.Sign() <= 0 { + cfg.Logger.Warn("bad surplus", slog.String("surplus", r.surplus), slog.String("tx", r.txHash)) + stats.deferredBadSurplus++ continue } - // Recalculate TotalSum for only the ready rows - readyTotalSum := big.NewInt(0) - for _, r := range readyTxs { - surplusWei, _ := new(big.Int).SetString(r.surplus, 10) - readyTotalSum.Add(readyTotalSum, surplusWei) + // Try upfront pricing. nil priceOracle (e.g. dry-run misconfiguration) + // causes everything to defer — same as a non-whitelisted token. + if cfg.PriceOracle != nil { + outputAddr := common.HexToAddress(r.token) + inputAddr := common.HexToAddress(r.inputToken) + inputAmtBig, _ := new(big.Int).SetString(r.inputAmt, 10) + userAmtOutBig, _ := new(big.Int).SetString(r.userAmtOut, 10) + surplusEthWei, eligible, source := cfg.PriceOracle.PriceSurplusEth( + ctx, inputAddr, outputAddr, inputAmtBig, userAmtOutBig, surplusWei, + ) + if eligible { + outcome := awardUpfrontERC20Miles(ctx, cfg, r, surplusEthWei, gasCostWei, bidCostWei, source) + switch outcome { + case upfrontAwarded: + stats.upfrontAwarded++ + processed++ + case upfrontNoProfit: + stats.upfrontNoProfit++ + processed++ + case upfrontSubThreshold: + stats.upfrontSubThreshold++ + processed++ + case upfrontFuelFailed: + stats.upfrontFuelFailed++ + } + continue + } + cfg.Logger.Debug("erc20 row deferred to sweep-time pricing", + slog.String("tx", r.txHash), slog.String("token", r.token), + slog.String("reason", source)) + stats.noteDeferralSource(source) + } else { + stats.deferredOtherReason++ } - reqBody := barterRequest{ - Source: token, - Target: cfg.WETH.Hex(), - SellAmount: readyTotalSum.String(), - Recipient: cfg.ExecutorAddr.Hex(), - Origin: cfg.ExecutorAddr.Hex(), - MinReturnFraction: 0.98, - Deadline: fmt.Sprintf("%d", time.Now().Add(10*time.Minute).Unix()), - } + deferredByToken[r.token] = append(deferredByToken[r.token], deferredErc20Row{ + row: r, + gas: gasCostWei, + bid: bidCostWei, + surplus: surplusWei, + }) + } - barterResp, err := callBarter(ctx, cfg.HTTPClient, cfg.BarterURL, cfg.BarterKey, reqBody) + // Second pass: legacy sweep-then-pro-rata path for rows the priceOracle + // could not value upfront. Per-token batch, exactly the prior behavior. + for token, rows := range deferredByToken { + n, err := processDeferredERC20Batch(ctx, cfg, token, rows) if err != nil { - cfg.Logger.Warn("callBarter failed", slog.String("token", token), slog.Any("error", err)) - continue + cfg.Logger.Warn("deferred erc20 batch failed", + slog.String("token", token), slog.Any("error", err)) } + stats.deferredAwarded += n + processed += n + } - gasLimit, err := strconv.ParseUint(barterResp.GasLimit, 10, 64) - if err != nil { - cfg.Logger.Warn("invalid gasLimit from barter", slog.String("gasLimit", barterResp.GasLimit)) - continue - } - gasLimit += 50000 + // One-line cycle summary. Grep this in Groundcover to see at a glance + // whether the upfront path is working and what's flowing through each + // branch. Quiet cycles (nothing pending) emit nothing. + if stats.total() > 0 { + cfg.Logger.Info("erc20_cycle_summary", + slog.Int("total", stats.total()), + slog.Int("upfront_awarded", stats.upfrontAwarded), + slog.Int("upfront_no_profit", stats.upfrontNoProfit), + slog.Int("upfront_sub_threshold", stats.upfrontSubThreshold), + slog.Int("upfront_fuel_failed", stats.upfrontFuelFailed), + slog.Int("deferred_awarded", stats.deferredAwarded), + slog.Int("deferred_not_whitelisted", stats.deferredNotWhitelisted), + slog.Int("deferred_no_chainlink", stats.deferredNoChainlink), + slog.Int("deferred_no_token_decim", stats.deferredNoTokenDecimals), + slog.Int("deferred_invalid_event", stats.deferredInvalidEvent), + slog.Int("deferred_other", stats.deferredOtherReason), + slog.Int("deferred_bad_surplus", stats.deferredBadSurplus), + slog.Int("idempotent_skipped", stats.idempotentSkipped), + slog.Int("bid_retried", stats.bidRetried), + slog.Int("orphaned", stats.orphaned)) + } - gasPrice, err := cfg.Client.SuggestGasPrice(ctx) - if err != nil { - cfg.Logger.Warn("suggest gas price failed", slog.Any("error", err)) - continue + return processed, nil +} + +// deferredErc20Row carries the parsed wei values alongside the raw row, so +// the deferred batch processor doesn't have to re-parse strings. +type deferredErc20Row struct { + row erc20Row + gas *big.Int // wei; zero for ETH-input (userPaysGas) + bid *big.Int // wei + surplus *big.Int // raw output-token units +} + +// upfrontOutcome describes how awardUpfrontERC20Miles handled a row, used +// for cycle-summary stats. +type upfrontOutcome int + +const ( + // upfrontAwarded — positive miles posted to Fuel and row marked processed. + upfrontAwarded upfrontOutcome = iota + // upfrontNoProfit — net was non-positive; row settled with miles=0. + upfrontNoProfit + // upfrontSubThreshold — net positive but below the per-mile floor; + // row settled with miles=0. + upfrontSubThreshold + // upfrontFuelFailed — Fuel submission errored; row stays pending for + // retry next cycle. + upfrontFuelFailed +) + +// awardUpfrontERC20Miles applies the new design's per-row formula and posts +// miles immediately. +// +// net = surplus_eth - deductible_user_gas - user_bid - estimated_overhead +// miles = max(0, net × 0.9 / weiPerPoint) +// +// estimated_overhead comes from the per-token p25 of realized sweep overhead +// (costEstimator). The protocol absorbs variance between estimate and +// realized; reconciliationMonitor catches drift over weeks. +func awardUpfrontERC20Miles( + ctx context.Context, + cfg *serviceConfig, + r erc20Row, + surplusEthWei, gasCostWei, bidCostWei *big.Int, + pricingSource string, +) upfrontOutcome { + overheadFloat := costEstimateLastResort + overheadSource := "default_no_data" + if cfg.CostEstimator != nil { + est := cfg.CostEstimator.Get(r.token) + overheadFloat = est.PerRowOverhead + overheadSource = est.Source + } + overheadWei := ethFloatToWei(overheadFloat) + + netProfit := new(big.Int).Sub(surplusEthWei, gasCostWei) + netProfit.Sub(netProfit, bidCostWei) + netProfit.Sub(netProfit, overheadWei) + + surplusEth := weiToEth(surplusEthWei) + netProfitEth := weiToEth(netProfit) + + if netProfit.Sign() <= 0 { + cfg.Logger.Info("no upfront profit", + slog.String("tx", r.txHash), slog.String("user", r.user), + slog.String("token", r.token), + slog.String("pricing", pricingSource), + slog.String("overhead_src", overheadSource), + slog.Float64("surplus_eth", surplusEth), + slog.Float64("overhead_eth", overheadFloat), + slog.Float64("net_profit_eth", netProfitEth)) + if !cfg.DryRun { + markProcessed(cfg.DB, r.txHash, surplusEth, netProfitEth, 0, bidCostWei.String()) } + return upfrontNoProfit + } - expectedGasCost := new(big.Int).Mul(big.NewInt(int64(gasLimit)), gasPrice) - expectedEthReturn, ok := new(big.Int).SetString(barterResp.MinReturn, 10) - if !ok { - cfg.Logger.Warn("invalid MinReturn from barter", slog.String("minReturn", barterResp.MinReturn)) - continue + userShare := new(big.Int).Mul(netProfit, big.NewInt(90)) + userShare.Div(userShare, big.NewInt(100)) + + miles := new(big.Int).Div(userShare, big.NewInt(weiPerPoint)) + if miles.Sign() <= 0 { + cfg.Logger.Info("sub-threshold upfront", + slog.String("tx", r.txHash), slog.String("user", r.user), + slog.String("token", r.token), + slog.String("pricing", pricingSource), + slog.Float64("surplus_eth", surplusEth), + slog.Float64("net_profit_eth", netProfitEth)) + if !cfg.DryRun { + markProcessed(cfg.DB, r.txHash, surplusEth, netProfitEth, 0, bidCostWei.String()) } + return upfrontSubThreshold + } - totalSweepCosts := new(big.Int).Add(expectedGasCost, totalOriginalBidCost) - totalSweepCosts.Add(totalSweepCosts, totalOriginalGasCost) + cfg.Logger.Info("awarding upfront erc20 miles", + slog.Int64("miles", miles.Int64()), + slog.String("user", r.user), slog.String("tx", r.txHash), + slog.String("token", r.token), + slog.String("pricing", pricingSource), + slog.String("overhead_src", overheadSource), + slog.Float64("surplus_eth", surplusEth), + slog.Float64("overhead_eth", overheadFloat), + slog.Float64("net_profit_eth", netProfitEth)) + + if cfg.DryRun { + return upfrontAwarded + } - if expectedEthReturn.Cmp(totalSweepCosts) <= 0 { - cfg.Logger.Info("token sweep not yet profitable", - slog.String("token", token), - slog.Float64("return_eth", weiToEth(expectedEthReturn)), - slog.Float64("total_cost_eth", weiToEth(totalSweepCosts))) - continue - } + if err := submitToFuel(ctx, cfg.HTTPClient, cfg.FuelURL, cfg.FuelKey, + common.HexToAddress(r.user), + common.HexToHash(r.txHash), + miles, + ); err != nil { + cfg.Logger.Error("fuel submit failed, will retry next cycle", + slog.String("tx", r.txHash), slog.Any("error", err)) + return upfrontFuelFailed + } + markProcessed(cfg.DB, r.txHash, surplusEth, netProfitEth, miles.Int64(), bidCostWei.String()) + return upfrontAwarded +} + +// processDeferredERC20Batch is the legacy sweep-then-pro-rata path for rows +// the priceOracle couldn't value upfront. Behavior matches the prior +// processERC20Miles batch logic exactly — per-token Barter quote, +// profitability gate that includes user gas and bid cost, sweep submission, +// pro-rata miles per row. +func processDeferredERC20Batch(ctx context.Context, cfg *serviceConfig, token string, rows []deferredErc20Row) (int, error) { + if len(rows) == 0 { + return 0, nil + } - var actualEthReturn *big.Int - var actualSwapGasCost *big.Int + processed := 0 + totalOriginalGasCost := big.NewInt(0) + totalOriginalBidCost := big.NewInt(0) + readyTotalSum := big.NewInt(0) + for _, d := range rows { + totalOriginalGasCost.Add(totalOriginalGasCost, d.gas) + totalOriginalBidCost.Add(totalOriginalBidCost, d.bid) + readyTotalSum.Add(readyTotalSum, d.surplus) + } - if cfg.DryRun { - cfg.Logger.Info("simulated sweep", - slog.String("amount", readyTotalSum.String()), - slog.String("token", token), - slog.Float64("return_eth", weiToEth(expectedEthReturn)), - slog.Float64("gas_eth", weiToEth(expectedGasCost))) - actualEthReturn = expectedEthReturn - actualSwapGasCost = expectedGasCost - } else { - actualEthReturn, actualSwapGasCost, err = submitFastSwapSweep(ctx, cfg.Logger, cfg.Client, cfg.L1Client, cfg.HTTPClient, cfg.Signer, cfg.ExecutorAddr, common.HexToAddress(token), readyTotalSum, cfg.FastSwapURL, cfg.FundsRecipient, cfg.SettlementAddr, barterResp, cfg.MaxGasGwei) - if err != nil { - cfg.Logger.Error("failed to sweep token", slog.String("token", token), slog.Any("error", err)) - continue - } - cfg.Logger.Info("FastSwap sweep success", + reqBody := barterRequest{ + Source: token, + Target: cfg.WETH.Hex(), + SellAmount: readyTotalSum.String(), + Recipient: cfg.ExecutorAddr.Hex(), + Origin: cfg.ExecutorAddr.Hex(), + MinReturnFraction: 0.98, + Deadline: fmt.Sprintf("%d", time.Now().Add(10*time.Minute).Unix()), + } + + barterResp, err := callBarter(ctx, cfg.HTTPClient, cfg.BarterURL, cfg.BarterKey, reqBody) + if err != nil { + // "Amount too low" is Barter's response when the input is below its + // per-token minimum and is the normal steady state for early/small + // batches — log at debug so it doesn't flood the warn stream. Other + // errors (timeouts, 5xx, malformed responses) still bubble up to the + // caller's warn log. + if isBarterAmountTooLow(err) { + cfg.Logger.Debug("deferred batch under barter minimum", slog.String("token", token), - slog.Float64("return_eth", weiToEth(actualEthReturn)), - slog.Float64("gas_eth", weiToEth(actualSwapGasCost))) + slog.Int("batch_size", len(rows)), + slog.String("amount", readyTotalSum.String())) + return processed, nil } + return processed, fmt.Errorf("callBarter: %w", err) + } - for i, r := range readyTxs { - surplusWei, _ := new(big.Int).SetString(r.surplus, 10) + gasLimit, err := strconv.ParseUint(barterResp.GasLimit, 10, 64) + if err != nil { + return processed, fmt.Errorf("invalid gasLimit %q: %w", barterResp.GasLimit, err) + } + gasLimit += 50000 - txGrossEth := new(big.Int).Mul(actualEthReturn, surplusWei) - txGrossEth.Div(txGrossEth, readyTotalSum) + gasPrice, err := cfg.Client.SuggestGasPrice(ctx) + if err != nil { + return processed, fmt.Errorf("suggest gas price: %w", err) + } - txOverheadGas := new(big.Int).Mul(actualSwapGasCost, surplusWei) - txOverheadGas.Div(txOverheadGas, readyTotalSum) + expectedGasCost := new(big.Int).Mul(big.NewInt(int64(gasLimit)), gasPrice) + expectedEthReturn, ok := new(big.Int).SetString(barterResp.MinReturn, 10) + if !ok { + return processed, fmt.Errorf("invalid MinReturn from barter: %s", barterResp.MinReturn) + } - txNetProfit := new(big.Int).Sub(txGrossEth, readyGasCosts[i]) - txNetProfit.Sub(txNetProfit, readyBidCosts[i]) - txNetProfit.Sub(txNetProfit, txOverheadGas) + totalSweepCosts := new(big.Int).Add(expectedGasCost, totalOriginalBidCost) + totalSweepCosts.Add(totalSweepCosts, totalOriginalGasCost) + + if expectedEthReturn.Cmp(totalSweepCosts) <= 0 { + // Debug-only: counts in erc20_cycle_summary's deferred_other_reason + // are the info-level signal that deferred rows are accumulating + // without being profitable. + cfg.Logger.Debug("deferred token sweep not yet profitable", + slog.String("token", token), + slog.Int("batch_size", len(rows)), + slog.Float64("return_eth", weiToEth(expectedEthReturn)), + slog.Float64("total_cost_eth", weiToEth(totalSweepCosts))) + return processed, nil + } - surplusEth := weiToEth(txGrossEth) - netProfitEth := weiToEth(txNetProfit) + var actualEthReturn, actualSwapGasCost *big.Int + if cfg.DryRun { + cfg.Logger.Info("simulated deferred sweep", + slog.String("amount", readyTotalSum.String()), + slog.String("token", token), + slog.Float64("return_eth", weiToEth(expectedEthReturn)), + slog.Float64("gas_eth", weiToEth(expectedGasCost))) + actualEthReturn = expectedEthReturn + actualSwapGasCost = expectedGasCost + } else { + actualEthReturn, actualSwapGasCost, err = submitFastSwapSweep(ctx, cfg.Logger, cfg.Client, cfg.L1Client, cfg.HTTPClient, cfg.Signer, cfg.ExecutorAddr, common.HexToAddress(token), readyTotalSum, cfg.FastSwapURL, cfg.FundsRecipient, cfg.SettlementAddr, barterResp, cfg.MaxGasGwei) + if err != nil { + return processed, fmt.Errorf("submit sweep: %w", err) + } + cfg.Logger.Info("deferred sweep success", + slog.String("token", token), + slog.Int("batch_size", len(rows)), + slog.Float64("return_eth", weiToEth(actualEthReturn)), + slog.Float64("gas_eth", weiToEth(actualSwapGasCost))) + // Cadence sweep loop should not double-sweep this token immediately; + // mark its clock now so it respects the cadence floor. + if cfg.SweepClock != nil { + cfg.SweepClock.MarkSwept(common.HexToAddress(token), time.Now()) + } + } - if txNetProfit.Sign() <= 0 { - cfg.Logger.Info("no profit for subset tx", - slog.String("tx", r.txHash), slog.String("user", r.user), - slog.Float64("gross_eth", surplusEth), slog.Float64("net_profit_eth", netProfitEth)) - if !cfg.DryRun { - markProcessed(cfg.DB, r.txHash, surplusEth, netProfitEth, 0, readyBidCosts[i].String()) - } - processed++ - continue - } + for _, d := range rows { + txGrossEth := new(big.Int).Mul(actualEthReturn, d.surplus) + txGrossEth.Div(txGrossEth, readyTotalSum) - userShare := new(big.Int).Mul(txNetProfit, big.NewInt(90)) - userShare.Div(userShare, big.NewInt(100)) + txOverheadGas := new(big.Int).Mul(actualSwapGasCost, d.surplus) + txOverheadGas.Div(txOverheadGas, readyTotalSum) - miles := new(big.Int).Div(userShare, big.NewInt(weiPerPoint)) - if miles.Sign() <= 0 { - cfg.Logger.Info("sub-threshold subset tx", - slog.String("tx", r.txHash), slog.String("user", r.user), - slog.Float64("gross_eth", surplusEth), slog.Float64("net_profit_eth", netProfitEth)) - if !cfg.DryRun { - markProcessed(cfg.DB, r.txHash, surplusEth, netProfitEth, 0, readyBidCosts[i].String()) - } - processed++ - continue - } + txNetProfit := new(big.Int).Sub(txGrossEth, d.gas) + txNetProfit.Sub(txNetProfit, d.bid) + txNetProfit.Sub(txNetProfit, txOverheadGas) - cfg.Logger.Info("awarding miles for subset tx", - slog.Int64("miles", miles.Int64()), slog.String("user", r.user), - slog.String("tx", r.txHash), slog.Float64("gross_eth", surplusEth), - slog.Float64("net_profit_eth", netProfitEth)) + surplusEth := weiToEth(txGrossEth) + netProfitEth := weiToEth(txNetProfit) - if cfg.DryRun { - processed++ - continue + if txNetProfit.Sign() <= 0 { + cfg.Logger.Info("no profit for deferred subset tx", + slog.String("tx", d.row.txHash), slog.String("user", d.row.user), + slog.Float64("gross_eth", surplusEth), slog.Float64("net_profit_eth", netProfitEth)) + if !cfg.DryRun { + markProcessed(cfg.DB, d.row.txHash, surplusEth, netProfitEth, 0, d.bid.String()) } + processed++ + continue + } - // Note: the miles-non-null idempotency check runs upstream, before - // batch aggregation — rows with miles already recorded are never - // in readyTxs here. - - err := submitToFuel(ctx, cfg.HTTPClient, cfg.FuelURL, cfg.FuelKey, - common.HexToAddress(r.user), - common.HexToHash(r.txHash), - miles, - ) - if err != nil { - cfg.Logger.Error("fuel submit failed, will retry next cycle", - slog.String("tx", r.txHash), slog.Any("error", err)) - continue // don't mark processed — retry next cycle + userShare := new(big.Int).Mul(txNetProfit, big.NewInt(90)) + userShare.Div(userShare, big.NewInt(100)) + miles := new(big.Int).Div(userShare, big.NewInt(weiPerPoint)) + if miles.Sign() <= 0 { + cfg.Logger.Info("sub-threshold deferred subset tx", + slog.String("tx", d.row.txHash), slog.String("user", d.row.user), + slog.Float64("gross_eth", surplusEth), slog.Float64("net_profit_eth", netProfitEth)) + if !cfg.DryRun { + markProcessed(cfg.DB, d.row.txHash, surplusEth, netProfitEth, 0, d.bid.String()) } - markProcessed(cfg.DB, r.txHash, surplusEth, netProfitEth, miles.Int64(), readyBidCosts[i].String()) processed++ + continue + } + + cfg.Logger.Info("awarding deferred miles for subset tx", + slog.Int64("miles", miles.Int64()), slog.String("user", d.row.user), + slog.String("tx", d.row.txHash), slog.Float64("gross_eth", surplusEth), + slog.Float64("net_profit_eth", netProfitEth)) + + if cfg.DryRun { + processed++ + continue + } + + if err := submitToFuel(ctx, cfg.HTTPClient, cfg.FuelURL, cfg.FuelKey, + common.HexToAddress(d.row.user), + common.HexToHash(d.row.txHash), + miles, + ); err != nil { + cfg.Logger.Error("fuel submit failed, will retry next cycle", + slog.String("tx", d.row.txHash), slog.Any("error", err)) + continue } + markProcessed(cfg.DB, d.row.txHash, surplusEth, netProfitEth, miles.Int64(), d.bid.String()) + processed++ } return processed, nil } +// ethFloatToWei converts a float ETH amount to wei (big.Int). Used to bring +// the cost-estimator's float overhead into the wei domain where the rest of +// the miles arithmetic lives. +func ethFloatToWei(eth float64) *big.Int { + wei, _ := new(big.Float).Mul(big.NewFloat(eth), big.NewFloat(1e18)).Int(nil) + if wei == nil { + return big.NewInt(0) + } + return wei +} + // -------------------- Bid Cost / FastRPC Lookups -------------------- func batchLookupBidCosts(logger *slog.Logger, db *sql.DB, txHashes []string) map[string]*big.Int {