Skip to content

Commit 8e72e36

Browse files
authored
[Bulk Indexer] Use one ticker per worker (#624)
* Use one ticker per worker
* Reduce code duplication
* Restart worker ticker after flush
* Remove superfluous worker.aux variable
1 parent 837dc92 commit 8e72e36

File tree

2 files changed

+76
-106
lines changed

2 files changed

+76
-106
lines changed

esutil/bulk_indexer.go

Lines changed: 74 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -238,8 +238,6 @@ type bulkIndexer struct {
238238
wg sync.WaitGroup
239239
queue chan BulkIndexerItem
240240
workers []*worker
241-
ticker *time.Ticker
242-
done chan bool
243241
stats *bulkIndexerStats
244242

245243
config BulkIndexerConfig
@@ -280,7 +278,6 @@ func NewBulkIndexer(cfg BulkIndexerConfig) (BulkIndexer, error) {
280278

281279
bi := bulkIndexer{
282280
config: cfg,
283-
done: make(chan bool),
284281
stats: &bulkIndexerStats{},
285282
}
286283

@@ -315,11 +312,9 @@ func (bi *bulkIndexer) Add(ctx context.Context, item BulkIndexerItem) error {
315312
}
316313

317314
// Close stops the periodic flush, closes the indexer queue channel,
318-
// notifies the done channel and calls flush on all writers.
315+
// which triggers the workers to flush and stop.
319316
func (bi *bulkIndexer) Close(ctx context.Context) error {
320-
bi.ticker.Stop()
321317
close(bi.queue)
322-
bi.done <- true
323318

324319
select {
325320
case <-ctx.Done():
@@ -331,19 +326,6 @@ func (bi *bulkIndexer) Close(ctx context.Context) error {
331326
bi.wg.Wait()
332327
}
333328

334-
for _, w := range bi.workers {
335-
w.mu.Lock()
336-
if w.buf.Len() > 0 {
337-
if err := w.flush(ctx); err != nil {
338-
w.mu.Unlock()
339-
if bi.config.OnError != nil {
340-
bi.config.OnError(ctx, err)
341-
}
342-
continue
343-
}
344-
}
345-
w.mu.Unlock()
346-
}
347329
return nil
348330
}
349331

@@ -367,54 +349,26 @@ func (bi *bulkIndexer) init() {
367349

368350
for i := 1; i <= bi.config.NumWorkers; i++ {
369351
w := worker{
370-
id: i,
371-
ch: bi.queue,
372-
bi: bi,
373-
buf: bytes.NewBuffer(make([]byte, 0, bi.config.FlushBytes)),
374-
aux: make([]byte, 0, 512)}
352+
id: i,
353+
ch: bi.queue,
354+
bi: bi,
355+
buf: bytes.NewBuffer(make([]byte, 0, bi.config.FlushBytes)),
356+
ticker: time.NewTicker(bi.config.FlushInterval),
357+
}
375358
w.run()
376359
bi.workers = append(bi.workers, &w)
377360
}
378361
bi.wg.Add(bi.config.NumWorkers)
379-
380-
bi.ticker = time.NewTicker(bi.config.FlushInterval)
381-
go func() {
382-
ctx := context.Background()
383-
for {
384-
select {
385-
case <-bi.done:
386-
return
387-
case <-bi.ticker.C:
388-
if bi.config.DebugLogger != nil {
389-
bi.config.DebugLogger.Printf("[indexer] Auto-flushing workers after %s\n", bi.config.FlushInterval)
390-
}
391-
for _, w := range bi.workers {
392-
w.mu.Lock()
393-
if w.buf.Len() > 0 {
394-
if err := w.flush(ctx); err != nil {
395-
w.mu.Unlock()
396-
if bi.config.OnError != nil {
397-
bi.config.OnError(ctx, err)
398-
}
399-
continue
400-
}
401-
}
402-
w.mu.Unlock()
403-
}
404-
}
405-
}
406-
}()
407362
}
408363

409364
// worker represents an indexer worker.
410365
type worker struct {
411-
id int
412-
ch <-chan BulkIndexerItem
413-
mu sync.Mutex
414-
bi *bulkIndexer
415-
buf *bytes.Buffer
416-
aux []byte
417-
items []BulkIndexerItem
366+
id int
367+
ch <-chan BulkIndexerItem
368+
bi *bulkIndexer
369+
buf *bytes.Buffer
370+
items []BulkIndexerItem
371+
ticker *time.Ticker
418372
}
419373

420374
// run launches the worker in a goroutine.
@@ -425,73 +379,74 @@ func (w *worker) run() {
425379
if w.bi.config.DebugLogger != nil {
426380
w.bi.config.DebugLogger.Printf("[worker-%03d] Started\n", w.id)
427381
}
428-
defer w.bi.wg.Done()
382+
defer func() {
383+
w.ticker.Stop()
384+
w.flush(ctx)
385+
w.bi.wg.Done()
386+
}()
429387

430-
for item := range w.ch {
431-
w.mu.Lock()
388+
for {
389+
select {
390+
case <-w.ticker.C:
391+
if w.bi.config.DebugLogger != nil {
392+
w.bi.config.DebugLogger.Printf("[worker-%03d] Auto-flushing after %s\n",
393+
w.id, w.bi.config.FlushInterval)
394+
}
395+
w.flush(ctx)
396+
case item, ok := <-w.ch:
397+
if !ok {
398+
return
399+
}
432400

433-
if w.bi.config.DebugLogger != nil {
434-
w.bi.config.DebugLogger.Printf("[worker-%03d] Received item [%s:%s]\n", w.id, item.Action, item.DocumentID)
435-
}
401+
if w.bi.config.DebugLogger != nil {
402+
w.bi.config.DebugLogger.Printf("[worker-%03d] Received item [%s:%s]\n", w.id, item.Action, item.DocumentID)
403+
}
436404

437-
oversizePayload := w.bi.config.FlushBytes <= item.payloadLength
438-
if !oversizePayload && w.buf.Len() > 0 && w.buf.Len()+item.payloadLength >= w.bi.config.FlushBytes {
439-
if err := w.flush(ctx); err != nil {
440-
w.mu.Unlock()
441-
if w.bi.config.OnError != nil {
442-
w.bi.config.OnError(ctx, err)
405+
oversizePayload := w.bi.config.FlushBytes <= item.payloadLength
406+
if !oversizePayload && w.buf.Len() > 0 && w.buf.Len()+item.payloadLength >= w.bi.config.FlushBytes {
407+
if !w.flush(ctx) {
408+
continue
443409
}
444-
w.mu.Lock()
445-
// continue with 'item' even when flush failed
446410
}
447-
}
448411

449-
if err := w.writeMeta(&item); err != nil {
450-
if item.OnFailure != nil {
451-
item.OnFailure(ctx, item, BulkIndexerResponseItem{}, err)
412+
if err := w.writeMeta(&item); err != nil {
413+
if item.OnFailure != nil {
414+
item.OnFailure(ctx, item, BulkIndexerResponseItem{}, err)
415+
}
416+
atomic.AddUint64(&w.bi.stats.numFailed, 1)
417+
continue
452418
}
453-
atomic.AddUint64(&w.bi.stats.numFailed, 1)
454-
w.mu.Unlock()
455-
continue
456-
}
457419

458-
if err := w.writeBody(&item); err != nil {
459-
if item.OnFailure != nil {
460-
item.OnFailure(ctx, item, BulkIndexerResponseItem{}, err)
420+
if err := w.writeBody(&item); err != nil {
421+
if item.OnFailure != nil {
422+
item.OnFailure(ctx, item, BulkIndexerResponseItem{}, err)
423+
}
424+
atomic.AddUint64(&w.bi.stats.numFailed, 1)
425+
continue
461426
}
462-
atomic.AddUint64(&w.bi.stats.numFailed, 1)
463-
w.mu.Unlock()
464-
continue
465-
}
466427

467-
w.items = append(w.items, item)
468-
// Should the item payload exceed the configured FlushBytes flush happens instantly.
469-
if oversizePayload {
470-
if w.bi.config.DebugLogger != nil {
471-
w.bi.config.DebugLogger.Printf("[worker-%03d] Oversize Payload in item [%s:%s]\n", w.id, item.Action, item.DocumentID)
472-
}
473-
if err := w.flush(ctx); err != nil {
474-
w.mu.Unlock()
475-
if w.bi.config.OnError != nil {
476-
w.bi.config.OnError(ctx, err)
428+
w.items = append(w.items, item)
429+
// Should the item payload exceed the configured FlushBytes flush happens instantly.
430+
if oversizePayload {
431+
if w.bi.config.DebugLogger != nil {
432+
w.bi.config.DebugLogger.Printf("[worker-%03d] Oversize Payload in item [%s:%s]\n", w.id, item.Action, item.DocumentID)
477433
}
478-
continue
434+
w.flush(ctx)
479435
}
480436
}
481-
w.mu.Unlock()
482437
}
483438
}()
484439
}
485440

486-
// writeMeta writes the item metadata to the buffer; it must be called under a lock.
441+
// writeMeta writes the item metadata to the buffer.
487442
func (w *worker) writeMeta(item *BulkIndexerItem) error {
488443
if _, err := w.buf.Write(item.meta.Bytes()); err != nil {
489444
return err
490445
}
491446
return nil
492447
}
493448

494-
// writeBody writes the item body to the buffer; it must be called under a lock.
449+
// writeBody writes the item body to the buffer.
495450
func (w *worker) writeBody(item *BulkIndexerItem) error {
496451
if item.Body != nil {
497452
if _, err := w.buf.ReadFrom(item.Body); err != nil {
@@ -506,8 +461,23 @@ func (w *worker) writeBody(item *BulkIndexerItem) error {
506461
return nil
507462
}
508463

509-
// flush writes out the worker buffer; it must be called under a lock.
510-
func (w *worker) flush(ctx context.Context) error {
464+
// flush writes out the worker buffer and handles errors.
465+
// It also restarts the ticker.
466+
// Returns true to indicate success.
467+
func (w *worker) flush(ctx context.Context) bool {
468+
ok := true
469+
if err := w.flushBuffer(ctx); err != nil {
470+
if w.bi.config.OnError != nil {
471+
w.bi.config.OnError(ctx, err)
472+
}
473+
ok = false
474+
}
475+
w.ticker.Reset(w.bi.config.FlushInterval)
476+
return ok
477+
}
478+
479+
// flushBuffer writes out the worker buffer.
480+
func (w *worker) flushBuffer(ctx context.Context) error {
511481
if w.bi.config.OnFlushStart != nil {
512482
ctx = w.bi.config.OnFlushStart(ctx)
513483
}

esutil/bulk_indexer_internal_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ import (
4040
"time"
4141

4242
"github.com/elastic/elastic-transport-go/v8/elastictransport"
43+
4344
"github.com/elastic/go-elasticsearch/v8"
4445
"github.com/elastic/go-elasticsearch/v8/esapi"
4546
)
@@ -525,7 +526,7 @@ func TestBulkIndexer(t *testing.T) {
525526
}
526527
es, _ := elasticsearch.NewClient(esCfg)
527528

528-
biCfg := BulkIndexerConfig{NumWorkers: 1, FlushBytes: 28*2, Client: es}
529+
biCfg := BulkIndexerConfig{NumWorkers: 1, FlushBytes: 28 * 2, Client: es}
529530
if os.Getenv("DEBUG") != "" {
530531
biCfg.DebugLogger = log.New(os.Stdout, "", 0)
531532
}
@@ -690,7 +691,6 @@ func TestBulkIndexer(t *testing.T) {
690691
t.Run(tt.name, func(t *testing.T) {
691692
w := &worker{
692693
buf: bytes.NewBuffer(make([]byte, 0, 5e+6)),
693-
aux: make([]byte, 0, 512),
694694
}
695695
tt.args.item.marshallMeta()
696696
if err := w.writeMeta(&tt.args.item); err != nil {

0 commit comments

Comments
 (0)