Skip to content

Commit ef641c1

Browse files
[BulkIndexer] reduce memory consumption (#558) (#559)
* Improve BulkIndexer memory consumption * Flush instantly oversized payloads Co-authored-by: Laurent Saint-Félix <laurent.saintfelix@elastic.co>
1 parent dd16589 commit ef641c1

File tree

3 files changed

+168
-84
lines changed

3 files changed

+168
-84
lines changed

esutil/bulk_indexer.go

Lines changed: 121 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ import (
3737
)
3838

3939
// BulkIndexer represents a parallel, asynchronous, efficient indexer for Elasticsearch.
40-
//
4140
type BulkIndexer interface {
4241
// Add adds an item to the indexer. It returns an error when the item cannot be added.
4342
// Use the OnSuccess and OnFailure callbacks to get the operation result for the item.
@@ -56,7 +55,6 @@ type BulkIndexer interface {
5655
}
5756

5857
// BulkIndexerConfig represents configuration of the indexer.
59-
//
6058
type BulkIndexerConfig struct {
6159
NumWorkers int // The number of workers. Defaults to runtime.NumCPU().
6260
FlushBytes int // The flush threshold in bytes. Defaults to 5MB.
@@ -88,7 +86,6 @@ type BulkIndexerConfig struct {
8886
}
8987

9088
// BulkIndexerStats represents the indexer statistics.
91-
//
9289
type BulkIndexerStats struct {
9390
NumAdded uint64
9491
NumFlushed uint64
@@ -101,7 +98,6 @@ type BulkIndexerStats struct {
10198
}
10299

103100
// BulkIndexerItem represents an indexer item.
104-
//
105101
type BulkIndexerItem struct {
106102
Index string
107103
Action string
@@ -111,21 +107,106 @@ type BulkIndexerItem struct {
111107
VersionType string
112108
Body io.Reader
113109
RetryOnConflict *int
110+
meta bytes.Buffer // Item metadata header
111+
payloadLength int // Item payload total length metadata+newline+body length
114112

115113
OnSuccess func(context.Context, BulkIndexerItem, BulkIndexerResponseItem) // Per item
116114
OnFailure func(context.Context, BulkIndexerItem, BulkIndexerResponseItem, error) // Per item
117115
}
118116

117+
// marshallMeta format as JSON the item metadata.
118+
func (item *BulkIndexerItem) marshallMeta() {
119+
var aux []byte
120+
item.meta.WriteRune('{')
121+
aux = strconv.AppendQuote(aux, item.Action)
122+
item.meta.Write(aux)
123+
aux = nil
124+
item.meta.WriteRune(':')
125+
item.meta.WriteRune('{')
126+
if item.DocumentID != "" {
127+
item.meta.WriteString(`"_id":`)
128+
aux = strconv.AppendQuote(aux, item.DocumentID)
129+
item.meta.Write(aux)
130+
aux = nil
131+
}
132+
133+
if item.DocumentID != "" && item.Version != nil {
134+
item.meta.WriteRune(',')
135+
item.meta.WriteString(`"version":`)
136+
item.meta.WriteString(strconv.FormatInt(*item.Version, 10))
137+
}
138+
139+
if item.DocumentID != "" && item.VersionType != "" {
140+
item.meta.WriteRune(',')
141+
item.meta.WriteString(`"version_type":`)
142+
aux = strconv.AppendQuote(aux, item.VersionType)
143+
item.meta.Write(aux)
144+
aux = nil
145+
}
146+
147+
if item.Routing != "" {
148+
if item.DocumentID != "" {
149+
item.meta.WriteRune(',')
150+
}
151+
item.meta.WriteString(`"routing":`)
152+
aux = strconv.AppendQuote(aux, item.Routing)
153+
item.meta.Write(aux)
154+
aux = nil
155+
}
156+
if item.Index != "" {
157+
if item.DocumentID != "" || item.Routing != "" {
158+
item.meta.WriteRune(',')
159+
}
160+
item.meta.WriteString(`"_index":`)
161+
aux = strconv.AppendQuote(aux, item.Index)
162+
item.meta.Write(aux)
163+
aux = nil
164+
}
165+
if item.RetryOnConflict != nil && item.Action == "update" {
166+
if item.DocumentID != "" || item.Routing != "" || item.Index != "" {
167+
item.meta.WriteString(",")
168+
}
169+
item.meta.WriteString(`"retry_on_conflict":`)
170+
aux = strconv.AppendInt(aux, int64(*item.RetryOnConflict), 10)
171+
item.meta.Write(aux)
172+
aux = nil
173+
}
174+
item.meta.WriteRune('}')
175+
item.meta.WriteRune('}')
176+
item.meta.WriteRune('\n')
177+
}
178+
179+
// computeLength calculate the size of the body and the metadata.
180+
func (item *BulkIndexerItem) computeLength() error {
181+
if item.Body != nil {
182+
var buf bytes.Buffer
183+
_, err := io.Copy(&buf, item.Body)
184+
if err != nil {
185+
return err
186+
}
187+
188+
_, err = item.Body.Seek(0, io.SeekStart)
189+
if err != nil {
190+
return err
191+
}
192+
item.payloadLength = buf.Len()
193+
return nil
194+
}
195+
item.payloadLength += len(item.meta.Bytes())
196+
// Add two bytes to account for newlines.
197+
item.payloadLength += 2
198+
199+
return nil
200+
}
201+
119202
// BulkIndexerResponse represents the Elasticsearch response.
120-
//
121203
type BulkIndexerResponse struct {
122204
Took int `json:"took"`
123205
HasErrors bool `json:"errors"`
124206
Items []map[string]BulkIndexerResponseItem `json:"items,omitempty"`
125207
}
126208

127209
// BulkIndexerResponseItem represents the Elasticsearch response item.
128-
//
129210
type BulkIndexerResponseItem struct {
130211
Index string `json:"_index"`
131212
DocumentID string `json:"_id"`
@@ -152,13 +233,11 @@ type BulkIndexerResponseItem struct {
152233
}
153234

154235
// BulkResponseJSONDecoder defines the interface for custom JSON decoders.
155-
//
156236
type BulkResponseJSONDecoder interface {
157237
UnmarshalFromReader(io.Reader, *BulkIndexerResponse) error
158238
}
159239

160240
// BulkIndexerDebugLogger defines the interface for a debugging logger.
161-
//
162241
type BulkIndexerDebugLogger interface {
163242
Printf(string, ...interface{})
164243
}
@@ -186,7 +265,6 @@ type bulkIndexerStats struct {
186265
}
187266

188267
// NewBulkIndexer creates a new bulk indexer.
189-
//
190268
func NewBulkIndexer(cfg BulkIndexerConfig) (BulkIndexer, error) {
191269
if cfg.Client == nil {
192270
cfg.Client, _ = elasticsearch.NewDefaultClient()
@@ -222,10 +300,16 @@ func NewBulkIndexer(cfg BulkIndexerConfig) (BulkIndexer, error) {
222300
// Add adds an item to the indexer.
223301
//
224302
// Adding an item after a call to Close() will panic.
225-
//
226303
func (bi *bulkIndexer) Add(ctx context.Context, item BulkIndexerItem) error {
227304
atomic.AddUint64(&bi.stats.numAdded, 1)
228305

306+
// Serialize metadata to JSON
307+
item.marshallMeta()
308+
// Compute length for body & metadata
309+
if err := item.computeLength(); err != nil {
310+
return err
311+
}
312+
229313
select {
230314
case <-ctx.Done():
231315
if bi.config.OnError != nil {
@@ -240,7 +324,6 @@ func (bi *bulkIndexer) Add(ctx context.Context, item BulkIndexerItem) error {
240324

241325
// Close stops the periodic flush, closes the indexer queue channel,
242326
// notifies the done channel and calls flush on all writers.
243-
//
244327
func (bi *bulkIndexer) Close(ctx context.Context) error {
245328
bi.ticker.Stop()
246329
close(bi.queue)
@@ -273,7 +356,6 @@ func (bi *bulkIndexer) Close(ctx context.Context) error {
273356
}
274357

275358
// Stats returns indexer statistics.
276-
//
277359
func (bi *bulkIndexer) Stats() BulkIndexerStats {
278360
return BulkIndexerStats{
279361
NumAdded: atomic.LoadUint64(&bi.stats.numAdded),
@@ -288,7 +370,6 @@ func (bi *bulkIndexer) Stats() BulkIndexerStats {
288370
}
289371

290372
// init initializes the bulk indexer.
291-
//
292373
func (bi *bulkIndexer) init() {
293374
bi.queue = make(chan BulkIndexerItem, bi.config.NumWorkers)
294375

@@ -334,7 +415,6 @@ func (bi *bulkIndexer) init() {
334415
}
335416

336417
// worker represents an indexer worker.
337-
//
338418
type worker struct {
339419
id int
340420
ch <-chan BulkIndexerItem
@@ -346,7 +426,6 @@ type worker struct {
346426
}
347427

348428
// run launches the worker in a goroutine.
349-
//
350429
func (w *worker) run() {
351430
go func() {
352431
ctx := context.Background()
@@ -363,7 +442,18 @@ func (w *worker) run() {
363442
w.bi.config.DebugLogger.Printf("[worker-%03d] Received item [%s:%s]\n", w.id, item.Action, item.DocumentID)
364443
}
365444

366-
if err := w.writeMeta(item); err != nil {
445+
oversizePayload := w.bi.config.FlushBytes <= item.payloadLength
446+
if !oversizePayload && w.buf.Len() > 0 && w.buf.Len()+item.payloadLength >= w.bi.config.FlushBytes {
447+
if err := w.flush(ctx); err != nil {
448+
w.mu.Unlock()
449+
if w.bi.config.OnError != nil {
450+
w.bi.config.OnError(ctx, err)
451+
}
452+
continue
453+
}
454+
}
455+
456+
if err := w.writeMeta(&item); err != nil {
367457
if item.OnFailure != nil {
368458
item.OnFailure(ctx, item, BulkIndexerResponseItem{}, err)
369459
}
@@ -382,7 +472,11 @@ func (w *worker) run() {
382472
}
383473

384474
w.items = append(w.items, item)
385-
if w.buf.Len() >= w.bi.config.FlushBytes {
475+
// Should the item payload exceed the configured FlushBytes flush happens instantly.
476+
if oversizePayload {
477+
if w.bi.config.DebugLogger != nil {
478+
w.bi.config.DebugLogger.Printf("[worker-%03d] Oversize Payload in item [%s:%s]\n", w.id, item.Action, item.DocumentID)
479+
}
386480
if err := w.flush(ctx); err != nil {
387481
w.mu.Unlock()
388482
if w.bi.config.OnError != nil {
@@ -396,71 +490,15 @@ func (w *worker) run() {
396490
}()
397491
}
398492

399-
// writeMeta formats and writes the item metadata to the buffer; it must be called under a lock.
400-
//
401-
func (w *worker) writeMeta(item BulkIndexerItem) error {
402-
w.buf.WriteRune('{')
403-
w.aux = strconv.AppendQuote(w.aux, item.Action)
404-
w.buf.Write(w.aux)
405-
w.aux = w.aux[:0]
406-
w.buf.WriteRune(':')
407-
w.buf.WriteRune('{')
408-
if item.DocumentID != "" {
409-
w.buf.WriteString(`"_id":`)
410-
w.aux = strconv.AppendQuote(w.aux, item.DocumentID)
411-
w.buf.Write(w.aux)
412-
w.aux = w.aux[:0]
493+
// writeMeta writes the item metadata to the buffer; it must be called under a lock.
494+
func (w *worker) writeMeta(item *BulkIndexerItem) error {
495+
if _, err := w.buf.Write(item.meta.Bytes()); err != nil {
496+
return err
413497
}
414-
415-
if item.DocumentID != "" && item.Version != nil {
416-
w.buf.WriteRune(',')
417-
w.buf.WriteString(`"version":`)
418-
w.buf.WriteString(strconv.FormatInt(*item.Version, 10))
419-
}
420-
421-
if item.DocumentID != "" && item.VersionType != "" {
422-
w.buf.WriteRune(',')
423-
w.buf.WriteString(`"version_type":`)
424-
w.aux = strconv.AppendQuote(w.aux, item.VersionType)
425-
w.buf.Write(w.aux)
426-
w.aux = w.aux[:0]
427-
}
428-
429-
if item.Routing != "" {
430-
if item.DocumentID != "" {
431-
w.buf.WriteRune(',')
432-
}
433-
w.buf.WriteString(`"routing":`)
434-
w.aux = strconv.AppendQuote(w.aux, item.Routing)
435-
w.buf.Write(w.aux)
436-
w.aux = w.aux[:0]
437-
}
438-
if item.Index != "" {
439-
if item.DocumentID != "" || item.Routing != "" {
440-
w.buf.WriteRune(',')
441-
}
442-
w.buf.WriteString(`"_index":`)
443-
w.aux = strconv.AppendQuote(w.aux, item.Index)
444-
w.buf.Write(w.aux)
445-
w.aux = w.aux[:0]
446-
}
447-
if item.RetryOnConflict != nil && item.Action == "update" {
448-
if item.DocumentID != "" || item.Routing != "" || item.Index != "" {
449-
w.buf.WriteString(",")
450-
}
451-
w.buf.WriteString(`"retry_on_conflict":`)
452-
w.aux = strconv.AppendInt(w.aux, int64(*item.RetryOnConflict), 10)
453-
w.buf.Write(w.aux)
454-
w.aux = w.aux[:0]
455-
}
456-
w.buf.WriteRune('}')
457-
w.buf.WriteRune('}')
458-
w.buf.WriteRune('\n')
459498
return nil
460499
}
461500

462501
// writeBody writes the item body to the buffer; it must be called under a lock.
463-
//
464502
func (w *worker) writeBody(item *BulkIndexerItem) error {
465503
if item.Body != nil {
466504

@@ -492,7 +530,6 @@ func (w *worker) writeBody(item *BulkIndexerItem) error {
492530
}
493531

494532
// flush writes out the worker buffer; it must be called under a lock.
495-
//
496533
func (w *worker) flush(ctx context.Context) error {
497534
if w.bi.config.OnFlushStart != nil {
498535
ctx = w.bi.config.OnFlushStart(ctx)
@@ -515,8 +552,12 @@ func (w *worker) flush(ctx context.Context) error {
515552
)
516553

517554
defer func() {
518-
w.items = w.items[:0]
519-
w.buf.Reset()
555+
w.items = nil
556+
if w.buf.Cap() > w.bi.config.FlushBytes {
557+
w.buf = bytes.NewBuffer(make([]byte, 0, w.bi.config.FlushBytes))
558+
} else {
559+
w.buf.Reset()
560+
}
520561
}()
521562

522563
if w.bi.config.DebugLogger != nil {

esutil/bulk_indexer_integration_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,6 @@ func TestBulkIndexerIntegration(t *testing.T) {
7272
bi, _ := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
7373
Index: indexName,
7474
Client: es,
75-
// FlushBytes: 3e+6,
7675
})
7776

7877
numItems := 100000

0 commit comments

Comments
 (0)