Skip to content

Commit fa9f5d4

Browse files
Reduce CPU usage (#575) (#581)
* Compute length using Seek() * Reduce heap allocations in marshallMeta * Remove outdated comment Co-authored-by: Laurent Saint-Félix <laurent.saintfelix@elastic.co> Co-authored-by: Tim Rühsen <tim.ruehsen@gmx.de>
1 parent c768538 commit fa9f5d4

File tree

1 file changed

+21
-31
lines changed

1 file changed

+21
-31
lines changed

esutil/bulk_indexer.go

Lines changed: 21 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import (
2121
"bytes"
2222
"context"
2323
"encoding/json"
24-
"errors"
2524
"fmt"
2625
"io"
2726
"net/http"
@@ -115,18 +114,19 @@ type BulkIndexerItem struct {
115114

116115
// marshallMeta format as JSON the item metadata.
117116
func (item *BulkIndexerItem) marshallMeta() {
118-
var aux []byte
117+
// Pre-allocate a buffer large enough for most use cases.
118+
// 'aux = aux[:0]' resets the length without changing the capacity.
119+
aux := make([]byte, 0, 256)
120+
119121
item.meta.WriteRune('{')
120-
aux = strconv.AppendQuote(aux, item.Action)
121-
item.meta.Write(aux)
122-
aux = nil
122+
item.meta.Write(strconv.AppendQuote(aux, item.Action))
123+
aux = aux[:0]
123124
item.meta.WriteRune(':')
124125
item.meta.WriteRune('{')
125126
if item.DocumentID != "" {
126127
item.meta.WriteString(`"_id":`)
127-
aux = strconv.AppendQuote(aux, item.DocumentID)
128-
item.meta.Write(aux)
129-
aux = nil
128+
item.meta.Write(strconv.AppendQuote(aux, item.DocumentID))
129+
aux = aux[:0]
130130
}
131131

132132
if item.DocumentID != "" && item.Version != nil {
@@ -138,37 +138,33 @@ func (item *BulkIndexerItem) marshallMeta() {
138138
if item.DocumentID != "" && item.VersionType != "" {
139139
item.meta.WriteRune(',')
140140
item.meta.WriteString(`"version_type":`)
141-
aux = strconv.AppendQuote(aux, item.VersionType)
142-
item.meta.Write(aux)
143-
aux = nil
141+
item.meta.Write(strconv.AppendQuote(aux, item.VersionType))
142+
aux = aux[:0]
144143
}
145144

146145
if item.Routing != "" {
147146
if item.DocumentID != "" {
148147
item.meta.WriteRune(',')
149148
}
150149
item.meta.WriteString(`"routing":`)
151-
aux = strconv.AppendQuote(aux, item.Routing)
152-
item.meta.Write(aux)
153-
aux = nil
150+
item.meta.Write(strconv.AppendQuote(aux, item.Routing))
151+
aux = aux[:0]
154152
}
155153
if item.Index != "" {
156154
if item.DocumentID != "" || item.Routing != "" {
157155
item.meta.WriteRune(',')
158156
}
159157
item.meta.WriteString(`"_index":`)
160-
aux = strconv.AppendQuote(aux, item.Index)
161-
item.meta.Write(aux)
162-
aux = nil
158+
item.meta.Write(strconv.AppendQuote(aux, item.Index))
159+
aux = aux[:0]
163160
}
164161
if item.RetryOnConflict != nil && item.Action == "update" {
165162
if item.DocumentID != "" || item.Routing != "" || item.Index != "" {
166163
item.meta.WriteString(",")
167164
}
168165
item.meta.WriteString(`"retry_on_conflict":`)
169-
aux = strconv.AppendInt(aux, int64(*item.RetryOnConflict), 10)
170-
item.meta.Write(aux)
171-
aux = nil
166+
item.meta.Write(strconv.AppendInt(aux, int64(*item.RetryOnConflict), 10))
167+
aux = aux[:0]
172168
}
173169
item.meta.WriteRune('}')
174170
item.meta.WriteRune('}')
@@ -178,18 +174,12 @@ func (item *BulkIndexerItem) marshallMeta() {
178174
// computeLength calculate the size of the body and the metadata.
179175
func (item *BulkIndexerItem) computeLength() error {
180176
if item.Body != nil {
181-
// TODO propagate buf len to config to allow for performance gains.
182-
var buf = make([]byte, 1<<4)
183-
for {
184-
n, err := item.Body.Read(buf)
185-
if errors.Is(err, io.EOF) {
186-
break
187-
} else if err != nil {
188-
return err
189-
}
190-
item.payloadLength += n
177+
n, err := item.Body.Seek(0, io.SeekEnd)
178+
if err != nil {
179+
return err
191180
}
192-
_, err := item.Body.Seek(0, io.SeekStart)
181+
item.payloadLength += int(n)
182+
_, err = item.Body.Seek(0, io.SeekStart)
193183
if err != nil {
194184
return err
195185
}

0 commit comments

Comments
 (0)