Skip to content

Bug: esutil.BulkIndexer silently drops item after a full buffer bulk flush fails #1041

@gean-dito

Description

@gean-dito

In `worker.run`, when adding a new item would overflow the buffer, the worker first forces a flush. If that flush fails (returns false), the item that was just popped from the item queue is lost: no callback is invoked for it, and it is neither requeued nor reprocessed.

The following test reproduces the issue. It passes if the mock response status code is changed to `http.StatusOK`.

package esutil

import (
	"bytes"
	"context"
	"io"
	"log"
	"net/http"
	"strconv"
	"strings"
	"testing"

	"github.com/elastic/go-elasticsearch/v9"
)

// TestItemLoss reproduces issue #1041. When adding an item would overflow a
// worker's buffer (FlushBytes), the worker forces a flush first. If that
// flush fails, the item that triggered it must still be flushed later or be
// reported through its OnFailure callback; it must not be silently dropped.
//
// The mock transport answers every request with HTTP 500, so every flush
// fails. The DebugLogger and the item callbacks write to the same buffer. The
// test then asserts that every document body shows up in that buffer.
// Switching the mock status code to http.StatusOK makes the test pass.
func TestItemLoss(t *testing.T) {
	esConfig := elasticsearch.Config{
		Transport: &mockTransport{
			RoundTripFunc: func(request *http.Request) (*http.Response, error) {
				return &http.Response{
					// StatusCode: http.StatusOK,
					StatusCode: http.StatusInternalServerError,
					Body:       io.NopCloser(strings.NewReader(`{}`)),
					Header: http.Header{
						"X-Elastic-Product": []string{"Elasticsearch"},
					},
				}, nil
			},
		},
	}

	client, err := elasticsearch.NewClient(esConfig)
	if err != nil {
		// t.Fatalf fails only this test. log.Fatal would exit the whole test
		// binary and skip cleanups and reporting.
		t.Fatalf("creating client: %v", err)
	}

	var logbuf bytes.Buffer
	logger := log.New(&logbuf, "", 0)
	bi, err := NewBulkIndexer(BulkIndexerConfig{
		NumWorkers:  1,   // one worker, so every item goes through the same buffer
		Client:      client,
		FlushBytes:  100, // small, so the queued docs overflow the buffer mid-stream
		DebugLogger: logger,
	})
	if err != nil {
		t.Fatalf("creating bulk indexer: %v", err)
	}

	// Log item.DocumentID rather than response.DocumentID. This test always
	// sets the former, while the response item may carry no ID when the whole
	// request fails. NOTE(review): confirm against the worker's flush path.
	itemSuccessCallback := func(ctx context.Context, item BulkIndexerItem, response BulkIndexerResponseItem) {
		logger.Printf("doc %s callback: success\n", item.DocumentID)
	}
	itemFailureCallback := func(ctx context.Context, item BulkIndexerItem, response BulkIndexerResponseItem, err error) {
		logger.Printf("doc %s callback: fail: %v\n", item.DocumentID, err)
	}

	const numDocs = 4
	ctx := context.Background()
	for i := 1; i <= numDocs; i++ {
		id := strconv.Itoa(i)
		err := bi.Add(ctx, BulkIndexerItem{
			Action:     "index",
			DocumentID: id,
			Body:       strings.NewReader(`{"title":"foo` + id + `"}`),
			OnSuccess:  itemSuccessCallback,
			OnFailure:  itemFailureCallback,
		})
		if err != nil {
			t.Fatalf("adding doc %s: %v", id, err)
		}
	}
	if err := bi.Close(ctx); err != nil {
		t.Fatalf("closing bulk indexer: %v", err)
	}

	// Read the buffer only after Close has returned, when the indexer has
	// finished with every item.
	logged := logbuf.String()
	for i := 1; i <= numDocs; i++ {
		want := `{"title":"foo` + strconv.Itoa(i) + `"}`
		if !strings.Contains(logged, want) {
			t.Fatalf("Expected doc %d to be flushed, got: \n%s", i, logged)
		}
	}
}

Metadata

Metadata

Assignees

No one assigned

    Labels

    Area: Helpers (Improvements or additions to the helpers), Category: Bug (Something isn't working)

    Type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions