Skip to content

Commit 52325e9

Browse files
committed
fix(transaction): prevent concurrent map access in monitor
1 parent a54d62b commit 52325e9

File tree

1 file changed

+60
-22
lines changed

1 file changed

+60
-22
lines changed

pkg/transaction/monitor.go

Lines changed: 60 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@ import (
1818
"github.com/ethersphere/bee/v2/pkg/log"
1919
)
2020

21-
var ErrTransactionCancelled = errors.New("transaction cancelled")
22-
var ErrMonitorClosed = errors.New("monitor closed")
21+
var (
22+
ErrTransactionCancelled = errors.New("transaction cancelled")
23+
ErrMonitorClosed = errors.New("monitor closed")
24+
)
2325

2426
// Monitor is a nonce-based watcher for transaction confirmations.
2527
// Instead of watching transactions individually, the senders nonce is monitored and transactions are checked based on this.
@@ -190,31 +192,56 @@ func watchStart(watches []transactionWatch) time.Time {
190192
return start
191193
}
192194

195+
// txToCheck holds snapshot data for a transaction that needs to be checked.
196+
// This allows releasing the lock during slow RPC calls.
197+
type txToCheck struct {
198+
nonce uint64
199+
txHash common.Hash
200+
watchStart time.Time
201+
}
202+
193203
// check pending checks the given block (number) for confirmed or cancelled transactions
194204
func (tm *transactionMonitor) checkPending(block uint64) error {
195-
confirmedNonces := make(map[uint64]*types.Receipt)
196-
var cancelledNonces []uint64
205+
// Phase 1: Snapshot the transactions we need to check under the lock.
206+
// This allows us to release the lock during slow RPC calls.
207+
tm.lock.Lock()
208+
var txsToCheck []txToCheck
209+
noncesToCheck := make(map[uint64]struct{})
197210
for nonceGroup, watchMap := range tm.watchesByNonce {
211+
noncesToCheck[nonceGroup] = struct{}{}
198212
for txHash, watches := range watchMap {
199-
receipt, err := tm.backend.TransactionReceipt(tm.ctx, txHash)
200-
if err != nil {
201-
// wait for a few blocks to be mined before considering a transaction not existing
202-
transactionWatchNotFoundTimeout := 5 * tm.pollingInterval
203-
if errors.Is(err, ethereum.NotFound) && watchStart(watches).Before(time.Now().Add(transactionWatchNotFoundTimeout)) {
204-
// if both err and receipt are nil, there is no receipt
205-
// the reason why we consider this only potentially cancelled is to catch cases where after a reorg the original transaction wins
206-
continue
207-
}
208-
return err
209-
}
210-
if receipt != nil {
211-
// if we have a receipt we have a confirmation
212-
confirmedNonces[nonceGroup] = receipt
213+
txsToCheck = append(txsToCheck, txToCheck{
214+
nonce: nonceGroup,
215+
txHash: txHash,
216+
watchStart: watchStart(watches),
217+
})
218+
}
219+
}
220+
tm.lock.Unlock()
221+
222+
// Phase 2: Make RPC calls without holding the lock.
223+
// TransactionReceipt and NonceAt can be slow (100-500ms each).
224+
confirmedNonces := make(map[uint64]*types.Receipt)
225+
for _, tx := range txsToCheck {
226+
receipt, err := tm.backend.TransactionReceipt(tm.ctx, tx.txHash)
227+
if err != nil {
228+
// wait for a few blocks to be mined before considering a transaction not existing
229+
transactionWatchNotFoundTimeout := 5 * tm.pollingInterval
230+
if errors.Is(err, ethereum.NotFound) && tx.watchStart.Before(time.Now().Add(transactionWatchNotFoundTimeout)) {
231+
// if both err and receipt are nil, there is no receipt
232+
// the reason why we consider this only potentially cancelled is to catch cases where after a reorg the original transaction wins
233+
continue
213234
}
235+
return err
236+
}
237+
if receipt != nil {
238+
// if we have a receipt we have a confirmation
239+
confirmedNonces[tx.nonce] = receipt
214240
}
215241
}
216242

217-
for nonceGroup := range tm.watchesByNonce {
243+
var cancelledNonces []uint64
244+
for nonceGroup := range noncesToCheck {
218245
if _, ok := confirmedNonces[nonceGroup]; ok {
219246
continue
220247
}
@@ -229,12 +256,18 @@ func (tm *transactionMonitor) checkPending(block uint64) error {
229256
}
230257
}
231258

232-
// notify the subscribers and remove watches for confirmed or cancelled transactions
259+
// Phase 3: Notify subscribers and cleanup under the lock.
233260
tm.lock.Lock()
234261
defer tm.lock.Unlock()
235262

263+
// notify the subscribers and remove watches for confirmed or cancelled transactions
236264
for nonce, receipt := range confirmedNonces {
237-
for txHash, watches := range tm.watchesByNonce[nonce] {
265+
watchMap, ok := tm.watchesByNonce[nonce]
266+
if !ok {
267+
// nonce was already processed (shouldn't happen but be defensive)
268+
continue
269+
}
270+
for txHash, watches := range watchMap {
238271
if receipt.TxHash == txHash {
239272
for _, watch := range watches {
240273
select {
@@ -255,7 +288,12 @@ func (tm *transactionMonitor) checkPending(block uint64) error {
255288
}
256289

257290
for _, nonce := range cancelledNonces {
258-
for _, watches := range tm.watchesByNonce[nonce] {
291+
watchMap, ok := tm.watchesByNonce[nonce]
292+
if !ok {
293+
// nonce was already processed
294+
continue
295+
}
296+
for _, watches := range watchMap {
259297
for _, watch := range watches {
260298
select {
261299
case watch.errC <- ErrTransactionCancelled:

0 commit comments

Comments
 (0)