Skip to content

Commit b8eb3d5

Browse files
authored
Put back into the pool (#827)
1 parent 7a618b8 commit b8eb3d5

File tree

5 files changed

+23
-0
lines changed

5 files changed

+23
-0
lines changed

lib/asyncwrite/asyncwrite.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@ func (bfw *BackgroundWriter) writeWorker() {
3232
pool.Put(data)
3333
if writeErr != nil {
3434
err = writeErr
35+
// Drain remaining buffers from channel to avoid memory leak
36+
for remaining := range bfw.ch {
37+
pool.Put(remaining)
38+
}
3539
break
3640
}
3741
}
@@ -53,6 +57,7 @@ func (bfw *BackgroundWriter) Write(p []byte) (n int, err error) {
5357
case bfw.ch <- b:
5458
return len(b), nil
5559
case err := <-bfw.done:
60+
pool.Put(b)
5661
return 0, err
5762
}
5863
}

lib/ffi/cunative/decode_sdr.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,8 @@ func Decode(replica, key io.Reader, out io.Writer) error {
121121
// Read replica
122122
rn, err := io.ReadFull(replica, rbuf)
123123
if err != nil && err != io.ErrUnexpectedEOF {
124+
pool.Put(rbuf)
125+
pool.Put(kbuf)
124126
if err == io.EOF {
125127
return
126128
}
@@ -131,11 +133,15 @@ func Decode(replica, key io.Reader, out io.Writer) error {
131133
// Read key
132134
kn, err := io.ReadFull(key, kbuf[:rn])
133135
if err != nil && err != io.ErrUnexpectedEOF {
136+
pool.Put(rbuf)
137+
pool.Put(kbuf)
134138
errChan <- err
135139
return
136140
}
137141

138142
if kn != rn {
143+
pool.Put(rbuf)
144+
pool.Put(kbuf)
139145
errChan <- io.ErrUnexpectedEOF
140146
return
141147
}

lib/ffi/cunative/decode_snap.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,8 @@ func DecodeSnap(spt abi.RegisteredSealProof, commD, commK cid.Cid, key, replica
122122
// Read replica
123123
rn, err := io.ReadFull(replica, rbuf)
124124
if err != nil && err != io.ErrUnexpectedEOF {
125+
pool.Put(rbuf)
126+
pool.Put(kbuf)
125127
if err == io.EOF {
126128
return
127129
}
@@ -132,11 +134,15 @@ func DecodeSnap(spt abi.RegisteredSealProof, commD, commK cid.Cid, key, replica
132134
// Read key
133135
kn, err := io.ReadFull(key, kbuf[:rn])
134136
if err != nil && err != io.ErrUnexpectedEOF {
137+
pool.Put(rbuf)
138+
pool.Put(kbuf)
135139
errChan <- err
136140
return
137141
}
138142

139143
if kn != rn {
144+
pool.Put(rbuf)
145+
pool.Put(kbuf)
140146
errChan <- io.ErrUnexpectedEOF
141147
return
142148
}

lib/pieceprovider/sector_reader.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,13 +85,15 @@ func (p *SectorReader) ReadPiece(ctx context.Context, sector storiface.SectorRef
8585

8686
upr, err := fr32.NewUnpadReaderBuf(r, readPaddedSize, buf)
8787
if err != nil {
88+
pool.Put(buf)
8889
r.Close() // nolint
8990
return nil, xerrors.Errorf("creating unpadded reader: %w", err)
9091
}
9192

9293
bir := bufio.NewReaderSize(upr, 127)
9394
if startOffset > uint64(startOffsetAligned) {
9495
if _, err := bir.Discard(startOffsetDiff); err != nil {
96+
pool.Put(buf)
9597
r.Close() // nolint
9698
return nil, xerrors.Errorf("discarding bytes for startOffset: %w", err)
9799
}

tasks/pdp/task_prove.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/ethereum/go-ethereum/core/types"
1616
"github.com/ethereum/go-ethereum/ethclient"
1717
"github.com/ipfs/go-cid"
18+
pool "github.com/libp2p/go-buffer-pool"
1819
"github.com/minio/sha256-simd"
1920
"github.com/oklog/ulid"
2021
"github.com/samber/lo"
@@ -435,6 +436,7 @@ func (p *ProveTask) proveRoot(ctx context.Context, dataSetID int64, pieceID int6
435436
if err != nil {
436437
return contract.IPDPTypesProof{}, xerrors.Errorf("failed to build memtree: %w", err)
437438
}
439+
defer pool.Put(memTree)
438440
log.Debugw("provePiece", "rootChallengeOffset", rootChallengeOffset, "challengedLeaf", challengedLeaf)
439441

440442
mProof, err := proof.MemtreeProof(memTree, challengedLeaf)
@@ -519,6 +521,7 @@ func (p *ProveTask) proveRoot(ctx context.Context, dataSetID int64, pieceID int6
519521
if err != nil {
520522
return contract.IPDPTypesProof{}, xerrors.Errorf("failed to build memtree: %w", err)
521523
}
524+
defer pool.Put(memtree)
522525

523526
// Get challenge leaf in subTree
524527
subTreeChallenge := challengedLeaf - startLeaf
@@ -552,6 +555,7 @@ func (p *ProveTask) proveRoot(ctx context.Context, dataSetID int64, pieceID int6
552555
if err != nil {
553556
return contract.IPDPTypesProof{}, xerrors.Errorf("failed to build memtree from snapshot: %w", err)
554557
}
558+
defer pool.Put(mtree)
555559

556560
// Generate merkle proof from snapShot node to commP
557561
proofs, err := proof.MemtreeProof(mtree, snapshotNodeIndex)

0 commit comments

Comments
 (0)