Skip to content

Commit ae9cd05

Browse files
committed
Add require_alias on query string and body with serialization
1 parent 317dd3a commit ae9cd05

File tree

3 files changed

+82
-0
lines changed

3 files changed

+82
-0
lines changed

esutil/bulk_indexer.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ type BulkIndexerConfig struct {
7676
Pretty bool
7777
Refresh string
7878
Routing string
79+
RequireAlias *bool
7980
Source []string
8081
SourceExcludes []string
8182
SourceIncludes []string
@@ -101,6 +102,7 @@ type BulkIndexerItem struct {
101102
Action string
102103
DocumentID string
103104
Routing string
105+
RequireAlias *bool
104106
Version *int64
105107
VersionType string
106108
Body io.ReadSeeker
@@ -166,6 +168,15 @@ func (item *BulkIndexerItem) marshallMeta() {
166168
item.meta.Write(strconv.AppendInt(aux, int64(*item.RetryOnConflict), 10))
167169
aux = aux[:0]
168170
}
171+
if item.RequireAlias != nil {
172+
if item.DocumentID != "" || item.Routing != "" || item.Index != "" || item.RetryOnConflict != nil {
173+
item.meta.WriteString(",")
174+
}
175+
item.meta.WriteString(`"require_alias":`)
176+
item.meta.Write(strconv.AppendBool(aux, *item.RequireAlias))
177+
aux = aux[:0]
178+
}
179+
169180
item.meta.WriteRune('}')
170181
item.meta.WriteRune('}')
171182
item.meta.WriteRune('\n')
@@ -519,6 +530,7 @@ func (w *worker) flushBuffer(ctx context.Context) error {
519530
Pipeline: w.bi.config.Pipeline,
520531
Refresh: w.bi.config.Refresh,
521532
Routing: w.bi.config.Routing,
533+
RequireAlias: w.bi.config.RequireAlias,
522534
Source: w.bi.config.Source,
523535
SourceExcludes: w.bi.config.SourceExcludes,
524536
SourceIncludes: w.bi.config.SourceIncludes,

esutil/bulk_indexer_integration_test.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"compress/gzip"
2525
"context"
2626
"fmt"
27+
"github.com/elastic/go-elasticsearch/v8/esapi"
2728
"os"
2829
"strconv"
2930
"strings"
@@ -257,6 +258,53 @@ func TestBulkIndexerIntegration(t *testing.T) {
257258
})
258259
bulkIndex(bi, 900)
259260
})
261+
262+
t.Run("Index alias", func(t *testing.T) {
263+
var index string = "test-index-a"
264+
var alias string = "test-alias-a"
265+
266+
es, _ := elasticsearch.NewClient(elasticsearch.Config{
267+
CompressRequestBody: tt.CompressRequestBodyEnabled,
268+
CompressRequestBodyLevel: tt.CompressRequestBodyLevel,
269+
Logger: &elastictransport.ColorLogger{Output: os.Stdout, EnableRequestBody: true, EnableResponseBody: true},
270+
})
271+
272+
es.Indices.Delete([]string{index}, es.Indices.Delete.WithIgnoreUnavailable(true))
273+
es.Indices.Create(index, es.Indices.Create.WithWaitForActiveShards("1"))
274+
es.Indices.PutAlias([]string{index}, alias)
275+
276+
bi, _ := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
277+
Client: es,
278+
})
279+
func(bulkIndexer esutil.BulkIndexer) {
280+
var countTotal int = 10
281+
var countSuccessful uint64
282+
for i := 0; i < countTotal; i++ {
283+
item := esutil.BulkIndexerItem{
284+
Action: "index",
285+
Index: alias,
286+
DocumentID: strconv.Itoa(i),
287+
Body: strings.NewReader(body),
288+
RequireAlias: esapi.BoolPtr(true),
289+
OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, item2 esutil.BulkIndexerResponseItem) {
290+
atomic.AddUint64(&countSuccessful, 1)
291+
},
292+
}
293+
err := bulkIndexer.Add(context.Background(), item)
294+
if err != nil {
295+
t.Fatal(err)
296+
}
297+
}
298+
if err := bulkIndexer.Close(context.Background()); err != nil {
299+
t.Fatal(err)
300+
}
301+
302+
if int(countSuccessful) != countTotal {
303+
t.Fatalf("Unexpected countSuccessful, expected %d, got: %d", countTotal, countSuccessful)
304+
}
305+
}(bi)
306+
307+
})
260308
})
261309
}
262310
}

esutil/bulk_indexer_internal_test.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -666,6 +666,28 @@ func TestBulkIndexer(t *testing.T) {
666666
}},
667667
`{"index":{"_id":"42","version":23,"version_type":"external","_index":"test"}}` + "\n",
668668
},
669+
{
670+
"with require_alias",
671+
args{BulkIndexerItem{
672+
Action: "index",
673+
DocumentID: "42",
674+
Index: "test",
675+
RequireAlias: esapi.BoolPtr(true),
676+
}},
677+
`{"index":{"_id":"42","_index":"test","require_alias":true}}` + "\n",
678+
},
679+
{
680+
"with version, version_type and require_alias",
681+
args{BulkIndexerItem{
682+
Action: "index",
683+
DocumentID: "42",
684+
Index: "test",
685+
Version: &v,
686+
VersionType: "external",
687+
RequireAlias: esapi.BoolPtr(true),
688+
}},
689+
`{"index":{"_id":"42","version":23,"version_type":"external","_index":"test","require_alias":true}}` + "\n",
690+
},
669691
{
670692
"with retry_on_conflict and bad action",
671693
args{BulkIndexerItem{

0 commit comments

Comments
 (0)