Skip to content

Commit 29ede6b

Browse files
Anaethelionmehran-prsmunkyboy
authored
[Backport] bulkdindexer version and routing (#373)
* Add per item routing for bulk requests (#265) * Remove docuemnt type from bulk indexer * Util: Allow setting version metadata in Bulk Indexer * Fix version type * Update tests Co-authored-by: Mike Luu <github@munkyboy.com> * CI: Util: Add integration tests to bulkindexer for external versioning (#360) Co-authored-by: Mehran Poursadeghi <mehr.prs@gmail.com> Co-authored-by: Mike Luu <github@munkyboy.com>
1 parent d1bc209 commit 29ede6b

File tree

3 files changed

+120
-3
lines changed

3 files changed

+120
-3
lines changed

esutil/bulk_indexer.go

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,9 @@ type BulkIndexerItem struct {
106106
Index string
107107
Action string
108108
DocumentID string
109+
Routing string
110+
Version *int64
111+
VersionType string
109112
Body io.Reader
110113
RetryOnConflict *int
111114

@@ -408,10 +411,34 @@ func (w *worker) writeMeta(item BulkIndexerItem) error {
408411
w.buf.Write(w.aux)
409412
w.aux = w.aux[:0]
410413
}
411-
if item.Index != "" {
414+
415+
if item.DocumentID != "" && item.Version != nil {
416+
w.buf.WriteRune(',')
417+
w.buf.WriteString(`"version":`)
418+
w.buf.WriteString(strconv.FormatInt(*item.Version, 10))
419+
}
420+
421+
if item.DocumentID != "" && item.VersionType != "" {
422+
w.buf.WriteRune(',')
423+
w.buf.WriteString(`"version_type":`)
424+
w.aux = strconv.AppendQuote(w.aux, item.VersionType)
425+
w.buf.Write(w.aux)
426+
w.aux = w.aux[:0]
427+
}
428+
429+
if item.Routing != "" {
412430
if item.DocumentID != "" {
413431
w.buf.WriteRune(',')
414432
}
433+
w.buf.WriteString(`"_routing":`)
434+
w.aux = strconv.AppendQuote(w.aux, item.Routing)
435+
w.buf.Write(w.aux)
436+
w.aux = w.aux[:0]
437+
}
438+
if item.Index != "" {
439+
if item.DocumentID != "" || item.Routing != "" {
440+
w.buf.WriteRune(',')
441+
}
415442
w.buf.WriteString(`"_index":`)
416443
w.aux = strconv.AppendQuote(w.aux, item.Index)
417444
w.buf.Write(w.aux)

esutil/bulk_indexer_integration_test.go

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
//go:build integration
1819
// +build integration
1920

2021
package esutil_test
@@ -187,6 +188,65 @@ func TestBulkIndexerIntegration(t *testing.T) {
187188
t.Errorf("Expected indices to exist, but got a [%s] response", res.Status())
188189
}
189190
})
191+
192+
t.Run("External version", func(t *testing.T) {
193+
var index string = "test-index-a"
194+
195+
es, _ := elasticsearch.NewClient(elasticsearch.Config{
196+
CompressRequestBody: tt.CompressRequestBodyEnabled,
197+
Logger: &estransport.ColorLogger{Output: os.Stdout},
198+
})
199+
200+
es.Indices.Delete([]string{index}, es.Indices.Delete.WithIgnoreUnavailable(true))
201+
es.Indices.Create(index, es.Indices.Create.WithWaitForActiveShards("1"))
202+
203+
bulkIndex := func(bulkIndexer esutil.BulkIndexer, baseVersion int) {
204+
var countTotal int = 500
205+
var countSuccessful uint64
206+
for i := 0; i < countTotal; i++ {
207+
version := int64(baseVersion + i)
208+
item := esutil.BulkIndexerItem{
209+
Action: "index",
210+
Index: index,
211+
DocumentID: strconv.Itoa(i),
212+
Body: strings.NewReader(body),
213+
Version: &version,
214+
VersionType: "external",
215+
OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, item2 esutil.BulkIndexerResponseItem) {
216+
if version != item2.Version &&
217+
version != *item.Version &&
218+
item2.Result != "created" {
219+
t.Fatalf("version mismatch, expected: %d, got: %d && %d", version, item.Version, item2.Version)
220+
}
221+
atomic.AddUint64(&countSuccessful, 1)
222+
},
223+
}
224+
err := bulkIndexer.Add(context.Background(), item)
225+
if err != nil {
226+
t.Fatal(err)
227+
}
228+
}
229+
if err := bulkIndexer.Close(context.Background()); err != nil {
230+
t.Fatal(err)
231+
}
232+
233+
if int(countSuccessful) != countTotal {
234+
t.Fatalf("Unexpected countSuccessful, expected %d, got: %d", countTotal, countSuccessful)
235+
}
236+
}
237+
238+
bi, _ := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
239+
Index: index,
240+
Client: es,
241+
})
242+
bulkIndex(bi, 500)
243+
244+
bi, _ = esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
245+
Index: index,
246+
Client: es,
247+
})
248+
bulkIndex(bi, 900)
249+
})
190250
})
191251
}
192-
}
252+
}

esutil/bulk_indexer_internal_test.go

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -644,8 +644,8 @@ func TestBulkIndexer(t *testing.T) {
644644
t.Errorf("Unexpected NumAdded: %d", stats.NumAdded)
645645
}
646646
})
647-
648647
t.Run("Worker.writeMeta()", func(t *testing.T) {
648+
v:=int64(23)
649649
type args struct {
650650
item BulkIndexerItem
651651
}
@@ -684,6 +684,36 @@ func TestBulkIndexer(t *testing.T) {
684684
}},
685685
`{"index":{"_id":"42","_index":"test"}}` + "\n",
686686
},
687+
{
688+
"with version and no document",
689+
args{BulkIndexerItem{
690+
Action: "index",
691+
Index: "test",
692+
Version: &v,
693+
}},
694+
`{"index":{"_index":"test"}}` + "\n",
695+
},
696+
{
697+
"with version",
698+
args{BulkIndexerItem{
699+
Action: "index",
700+
DocumentID: "42",
701+
Index: "test",
702+
Version: &v,
703+
}},
704+
`{"index":{"_id":"42","version":23,"_index":"test"}}` + "\n",
705+
},
706+
{
707+
"with version and version_type",
708+
args{BulkIndexerItem{
709+
Action: "index",
710+
DocumentID: "42",
711+
Index: "test",
712+
Version: &v,
713+
VersionType: "external",
714+
}},
715+
`{"index":{"_id":"42","version":23,"version_type":"external","_index":"test"}}` + "\n",
716+
},
687717
}
688718
for _, tt := range tests {
689719
tt := tt

0 commit comments

Comments
 (0)