Skip to content

Commit ee791bf

Browse files
[BulkIndexer] reduce memory consumption (#558) (#560)
* Improve BulkIndexer memory consumption * Flush instantly oversized payloads Co-authored-by: Laurent Saint-Félix <laurent.saintfelix@elastic.co>
1 parent a67e916 commit ee791bf

File tree

3 files changed

+168
-84
lines changed

3 files changed

+168
-84
lines changed

esutil/bulk_indexer.go

Lines changed: 121 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ import (
3535
)
3636

3737
// BulkIndexer represents a parallel, asynchronous, efficient indexer for Elasticsearch.
38-
//
3938
type BulkIndexer interface {
4039
// Add adds an item to the indexer. It returns an error when the item cannot be added.
4140
// Use the OnSuccess and OnFailure callbacks to get the operation result for the item.
@@ -54,7 +53,6 @@ type BulkIndexer interface {
5453
}
5554

5655
// BulkIndexerConfig represents configuration of the indexer.
57-
//
5856
type BulkIndexerConfig struct {
5957
NumWorkers int // The number of workers. Defaults to runtime.NumCPU().
6058
FlushBytes int // The flush threshold in bytes. Defaults to 5MB.
@@ -86,7 +84,6 @@ type BulkIndexerConfig struct {
8684
}
8785

8886
// BulkIndexerStats represents the indexer statistics.
89-
//
9087
type BulkIndexerStats struct {
9188
NumAdded uint64
9289
NumFlushed uint64
@@ -99,7 +96,6 @@ type BulkIndexerStats struct {
9996
}
10097

10198
// BulkIndexerItem represents an indexer item.
102-
//
10399
type BulkIndexerItem struct {
104100
Index string
105101
Action string
@@ -109,21 +105,106 @@ type BulkIndexerItem struct {
109105
VersionType string
110106
Body io.ReadSeeker
111107
RetryOnConflict *int
108+
meta bytes.Buffer // Item metadata header
109+
payloadLength int // Item payload total length metadata+newline+body length
112110

113111
OnSuccess func(context.Context, BulkIndexerItem, BulkIndexerResponseItem) // Per item
114112
OnFailure func(context.Context, BulkIndexerItem, BulkIndexerResponseItem, error) // Per item
115113
}
116114

115+
// marshallMeta format as JSON the item metadata.
116+
func (item *BulkIndexerItem) marshallMeta() {
117+
var aux []byte
118+
item.meta.WriteRune('{')
119+
aux = strconv.AppendQuote(aux, item.Action)
120+
item.meta.Write(aux)
121+
aux = nil
122+
item.meta.WriteRune(':')
123+
item.meta.WriteRune('{')
124+
if item.DocumentID != "" {
125+
item.meta.WriteString(`"_id":`)
126+
aux = strconv.AppendQuote(aux, item.DocumentID)
127+
item.meta.Write(aux)
128+
aux = nil
129+
}
130+
131+
if item.DocumentID != "" && item.Version != nil {
132+
item.meta.WriteRune(',')
133+
item.meta.WriteString(`"version":`)
134+
item.meta.WriteString(strconv.FormatInt(*item.Version, 10))
135+
}
136+
137+
if item.DocumentID != "" && item.VersionType != "" {
138+
item.meta.WriteRune(',')
139+
item.meta.WriteString(`"version_type":`)
140+
aux = strconv.AppendQuote(aux, item.VersionType)
141+
item.meta.Write(aux)
142+
aux = nil
143+
}
144+
145+
if item.Routing != "" {
146+
if item.DocumentID != "" {
147+
item.meta.WriteRune(',')
148+
}
149+
item.meta.WriteString(`"routing":`)
150+
aux = strconv.AppendQuote(aux, item.Routing)
151+
item.meta.Write(aux)
152+
aux = nil
153+
}
154+
if item.Index != "" {
155+
if item.DocumentID != "" || item.Routing != "" {
156+
item.meta.WriteRune(',')
157+
}
158+
item.meta.WriteString(`"_index":`)
159+
aux = strconv.AppendQuote(aux, item.Index)
160+
item.meta.Write(aux)
161+
aux = nil
162+
}
163+
if item.RetryOnConflict != nil && item.Action == "update" {
164+
if item.DocumentID != "" || item.Routing != "" || item.Index != "" {
165+
item.meta.WriteString(",")
166+
}
167+
item.meta.WriteString(`"retry_on_conflict":`)
168+
aux = strconv.AppendInt(aux, int64(*item.RetryOnConflict), 10)
169+
item.meta.Write(aux)
170+
aux = nil
171+
}
172+
item.meta.WriteRune('}')
173+
item.meta.WriteRune('}')
174+
item.meta.WriteRune('\n')
175+
}
176+
177+
// computeLength calculate the size of the body and the metadata.
178+
func (item *BulkIndexerItem) computeLength() error {
179+
if item.Body != nil {
180+
var buf bytes.Buffer
181+
_, err := io.Copy(&buf, item.Body)
182+
if err != nil {
183+
return err
184+
}
185+
186+
_, err = item.Body.Seek(0, io.SeekStart)
187+
if err != nil {
188+
return err
189+
}
190+
item.payloadLength = buf.Len()
191+
return nil
192+
}
193+
item.payloadLength += len(item.meta.Bytes())
194+
// Add two bytes to account for newlines.
195+
item.payloadLength += 2
196+
197+
return nil
198+
}
199+
117200
// BulkIndexerResponse represents the Elasticsearch response.
118-
//
119201
type BulkIndexerResponse struct {
120202
Took int `json:"took"`
121203
HasErrors bool `json:"errors"`
122204
Items []map[string]BulkIndexerResponseItem `json:"items,omitempty"`
123205
}
124206

125207
// BulkIndexerResponseItem represents the Elasticsearch response item.
126-
//
127208
type BulkIndexerResponseItem struct {
128209
Index string `json:"_index"`
129210
DocumentID string `json:"_id"`
@@ -150,13 +231,11 @@ type BulkIndexerResponseItem struct {
150231
}
151232

152233
// BulkResponseJSONDecoder defines the interface for custom JSON decoders.
153-
//
154234
type BulkResponseJSONDecoder interface {
155235
UnmarshalFromReader(io.Reader, *BulkIndexerResponse) error
156236
}
157237

158238
// BulkIndexerDebugLogger defines the interface for a debugging logger.
159-
//
160239
type BulkIndexerDebugLogger interface {
161240
Printf(string, ...interface{})
162241
}
@@ -184,7 +263,6 @@ type bulkIndexerStats struct {
184263
}
185264

186265
// NewBulkIndexer creates a new bulk indexer.
187-
//
188266
func NewBulkIndexer(cfg BulkIndexerConfig) (BulkIndexer, error) {
189267
if cfg.Client == nil {
190268
cfg.Client, _ = elasticsearch.NewDefaultClient()
@@ -220,10 +298,16 @@ func NewBulkIndexer(cfg BulkIndexerConfig) (BulkIndexer, error) {
220298
// Add adds an item to the indexer.
221299
//
222300
// Adding an item after a call to Close() will panic.
223-
//
224301
func (bi *bulkIndexer) Add(ctx context.Context, item BulkIndexerItem) error {
225302
atomic.AddUint64(&bi.stats.numAdded, 1)
226303

304+
// Serialize metadata to JSON
305+
item.marshallMeta()
306+
// Compute length for body & metadata
307+
if err := item.computeLength(); err != nil {
308+
return err
309+
}
310+
227311
select {
228312
case <-ctx.Done():
229313
if bi.config.OnError != nil {
@@ -238,7 +322,6 @@ func (bi *bulkIndexer) Add(ctx context.Context, item BulkIndexerItem) error {
238322

239323
// Close stops the periodic flush, closes the indexer queue channel,
240324
// notifies the done channel and calls flush on all writers.
241-
//
242325
func (bi *bulkIndexer) Close(ctx context.Context) error {
243326
bi.ticker.Stop()
244327
close(bi.queue)
@@ -271,7 +354,6 @@ func (bi *bulkIndexer) Close(ctx context.Context) error {
271354
}
272355

273356
// Stats returns indexer statistics.
274-
//
275357
func (bi *bulkIndexer) Stats() BulkIndexerStats {
276358
return BulkIndexerStats{
277359
NumAdded: atomic.LoadUint64(&bi.stats.numAdded),
@@ -286,7 +368,6 @@ func (bi *bulkIndexer) Stats() BulkIndexerStats {
286368
}
287369

288370
// init initializes the bulk indexer.
289-
//
290371
func (bi *bulkIndexer) init() {
291372
bi.queue = make(chan BulkIndexerItem, bi.config.NumWorkers)
292373

@@ -332,7 +413,6 @@ func (bi *bulkIndexer) init() {
332413
}
333414

334415
// worker represents an indexer worker.
335-
//
336416
type worker struct {
337417
id int
338418
ch <-chan BulkIndexerItem
@@ -344,7 +424,6 @@ type worker struct {
344424
}
345425

346426
// run launches the worker in a goroutine.
347-
//
348427
func (w *worker) run() {
349428
go func() {
350429
ctx := context.Background()
@@ -361,7 +440,18 @@ func (w *worker) run() {
361440
w.bi.config.DebugLogger.Printf("[worker-%03d] Received item [%s:%s]\n", w.id, item.Action, item.DocumentID)
362441
}
363442

364-
if err := w.writeMeta(item); err != nil {
443+
oversizePayload := w.bi.config.FlushBytes <= item.payloadLength
444+
if !oversizePayload && w.buf.Len() > 0 && w.buf.Len()+item.payloadLength >= w.bi.config.FlushBytes {
445+
if err := w.flush(ctx); err != nil {
446+
w.mu.Unlock()
447+
if w.bi.config.OnError != nil {
448+
w.bi.config.OnError(ctx, err)
449+
}
450+
continue
451+
}
452+
}
453+
454+
if err := w.writeMeta(&item); err != nil {
365455
if item.OnFailure != nil {
366456
item.OnFailure(ctx, item, BulkIndexerResponseItem{}, err)
367457
}
@@ -380,7 +470,11 @@ func (w *worker) run() {
380470
}
381471

382472
w.items = append(w.items, item)
383-
if w.buf.Len() >= w.bi.config.FlushBytes {
473+
// Should the item payload exceed the configured FlushBytes flush happens instantly.
474+
if oversizePayload {
475+
if w.bi.config.DebugLogger != nil {
476+
w.bi.config.DebugLogger.Printf("[worker-%03d] Oversize Payload in item [%s:%s]\n", w.id, item.Action, item.DocumentID)
477+
}
384478
if err := w.flush(ctx); err != nil {
385479
w.mu.Unlock()
386480
if w.bi.config.OnError != nil {
@@ -394,71 +488,15 @@ func (w *worker) run() {
394488
}()
395489
}
396490

397-
// writeMeta formats and writes the item metadata to the buffer; it must be called under a lock.
398-
//
399-
func (w *worker) writeMeta(item BulkIndexerItem) error {
400-
w.buf.WriteRune('{')
401-
w.aux = strconv.AppendQuote(w.aux, item.Action)
402-
w.buf.Write(w.aux)
403-
w.aux = w.aux[:0]
404-
w.buf.WriteRune(':')
405-
w.buf.WriteRune('{')
406-
if item.DocumentID != "" {
407-
w.buf.WriteString(`"_id":`)
408-
w.aux = strconv.AppendQuote(w.aux, item.DocumentID)
409-
w.buf.Write(w.aux)
410-
w.aux = w.aux[:0]
491+
// writeMeta writes the item metadata to the buffer; it must be called under a lock.
492+
func (w *worker) writeMeta(item *BulkIndexerItem) error {
493+
if _, err := w.buf.Write(item.meta.Bytes()); err != nil {
494+
return err
411495
}
412-
413-
if item.DocumentID != "" && item.Version != nil {
414-
w.buf.WriteRune(',')
415-
w.buf.WriteString(`"version":`)
416-
w.buf.WriteString(strconv.FormatInt(*item.Version, 10))
417-
}
418-
419-
if item.DocumentID != "" && item.VersionType != "" {
420-
w.buf.WriteRune(',')
421-
w.buf.WriteString(`"version_type":`)
422-
w.aux = strconv.AppendQuote(w.aux, item.VersionType)
423-
w.buf.Write(w.aux)
424-
w.aux = w.aux[:0]
425-
}
426-
427-
if item.Routing != "" {
428-
if item.DocumentID != "" {
429-
w.buf.WriteRune(',')
430-
}
431-
w.buf.WriteString(`"routing":`)
432-
w.aux = strconv.AppendQuote(w.aux, item.Routing)
433-
w.buf.Write(w.aux)
434-
w.aux = w.aux[:0]
435-
}
436-
if item.Index != "" {
437-
if item.DocumentID != "" || item.Routing != "" {
438-
w.buf.WriteRune(',')
439-
}
440-
w.buf.WriteString(`"_index":`)
441-
w.aux = strconv.AppendQuote(w.aux, item.Index)
442-
w.buf.Write(w.aux)
443-
w.aux = w.aux[:0]
444-
}
445-
if item.RetryOnConflict != nil && item.Action == "update" {
446-
if item.DocumentID != "" || item.Routing != "" || item.Index != "" {
447-
w.buf.WriteString(",")
448-
}
449-
w.buf.WriteString(`"retry_on_conflict":`)
450-
w.aux = strconv.AppendInt(w.aux, int64(*item.RetryOnConflict), 10)
451-
w.buf.Write(w.aux)
452-
w.aux = w.aux[:0]
453-
}
454-
w.buf.WriteRune('}')
455-
w.buf.WriteRune('}')
456-
w.buf.WriteRune('\n')
457496
return nil
458497
}
459498

460499
// writeBody writes the item body to the buffer; it must be called under a lock.
461-
//
462500
func (w *worker) writeBody(item *BulkIndexerItem) error {
463501
if item.Body != nil {
464502
if _, err := w.buf.ReadFrom(item.Body); err != nil {
@@ -474,7 +512,6 @@ func (w *worker) writeBody(item *BulkIndexerItem) error {
474512
}
475513

476514
// flush writes out the worker buffer; it must be called under a lock.
477-
//
478515
func (w *worker) flush(ctx context.Context) error {
479516
if w.bi.config.OnFlushStart != nil {
480517
ctx = w.bi.config.OnFlushStart(ctx)
@@ -497,8 +534,12 @@ func (w *worker) flush(ctx context.Context) error {
497534
)
498535

499536
defer func() {
500-
w.items = w.items[:0]
501-
w.buf.Reset()
537+
w.items = nil
538+
if w.buf.Cap() > w.bi.config.FlushBytes {
539+
w.buf = bytes.NewBuffer(make([]byte, 0, w.bi.config.FlushBytes))
540+
} else {
541+
w.buf.Reset()
542+
}
502543
}()
503544

504545
if w.bi.config.DebugLogger != nil {

esutil/bulk_indexer_integration_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,6 @@ func TestBulkIndexerIntegration(t *testing.T) {
7272
bi, _ := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
7373
Index: indexName,
7474
Client: es,
75-
// FlushBytes: 3e+6,
7675
})
7776

7877
numItems := 100000

0 commit comments

Comments
 (0)