Skip to content

Commit 408aad8

Browse files
authored
Util: Replace BulkIndexerItem body field with a ReadSeeker to improve memory consumption (#315) (#383)
1 parent ff56749 commit 408aad8

File tree

1 file changed

+2
-20
lines changed

1 file changed

+2
-20
lines changed

esutil/bulk_indexer.go

Lines changed: 2 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import (
2323
"encoding/json"
2424
"fmt"
2525
"io"
26-
"io/ioutil"
2726
"net/http"
2827
"runtime"
2928
"strconv"
@@ -108,7 +107,7 @@ type BulkIndexerItem struct {
108107
Routing string
109108
Version *int64
110109
VersionType string
111-
Body io.Reader
110+
Body io.ReadSeeker
112111
RetryOnConflict *int
113112

114113
OnSuccess func(context.Context, BulkIndexerItem, BulkIndexerResponseItem) // Per item
@@ -453,30 +452,14 @@ func (w *worker) writeMeta(item BulkIndexerItem) error {
453452
//
454453
func (w *worker) writeBody(item *BulkIndexerItem) error {
455454
if item.Body != nil {
456-
457-
var getBody func() io.Reader
458-
459-
if item.OnSuccess != nil || item.OnFailure != nil {
460-
var buf bytes.Buffer
461-
buf.ReadFrom(item.Body)
462-
getBody = func() io.Reader {
463-
r := buf
464-
return ioutil.NopCloser(&r)
465-
}
466-
item.Body = getBody()
467-
}
468-
469455
if _, err := w.buf.ReadFrom(item.Body); err != nil {
470456
if w.bi.config.OnError != nil {
471457
w.bi.config.OnError(context.Background(), err)
472458
}
473459
return err
474460
}
461+
item.Body.Seek(0, io.SeekStart)
475462
w.buf.WriteRune('\n')
476-
477-
if getBody != nil && (item.OnSuccess != nil || item.OnFailure != nil) {
478-
item.Body = getBody()
479-
}
480463
}
481464
return nil
482465
}
@@ -540,7 +523,6 @@ func (w *worker) flush(ctx context.Context) error {
540523
}
541524
req.Header.Set(elasticsearch.HeaderClientMeta, "h=bp")
542525

543-
544526
res, err := req.Do(ctx, w.bi.config.Client)
545527
if err != nil {
546528
atomic.AddUint64(&w.bi.stats.numFailed, uint64(len(w.items)))

0 commit comments

Comments
 (0)