Skip to content

Commit d55d0b5

Browse files
committed
Merge branch 'fix/refresh'
2 parents 27168c3 + 3fa7ef7 commit d55d0b5

File tree

7 files changed

+67
-29
lines changed

7 files changed

+67
-29
lines changed

core/src/main/scala/app/softnetwork/elastic/client/ElasticClientDelegator.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -562,9 +562,10 @@ trait ElasticClientDelegator extends ElasticClientApi with BulkTypes {
562562
override private[client] def executeIndex(
563563
index: String,
564564
id: String,
565-
source: String
565+
source: String,
566+
wait: Boolean
566567
): ElasticResult[Boolean] =
567-
delegate.executeIndex(index, id, source)
568+
delegate.executeIndex(index, id, source, wait)
568569

569570
override private[client] def executeIndexAsync(
570571
index: String,
@@ -1138,7 +1139,7 @@ trait ElasticClientDelegator extends ElasticClientApi with BulkTypes {
11381139

11391140
override private[client] def pitSearchAfter(
11401141
elasticQuery: ElasticQuery,
1141-
fieldAliases: Map[JSONResults, JSONResults],
1142+
fieldAliases: Map[String, String],
11421143
config: ScrollConfig,
11431144
hasSorts: Boolean
11441145
)(implicit system: ActorSystem) = {

core/src/main/scala/app/softnetwork/elastic/client/IndexApi.scala

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -91,11 +91,9 @@ trait IndexApi extends ElasticClientHelpers { _: RefreshApi with SerializationAp
9191

9292
logger.debug(s"Indexing document with id '$id' in index '$index'")
9393

94-
executeIndex(index, id, source) match {
94+
executeIndex(index, id, source, wait=false) match {
9595
case success @ ElasticSuccess(true) =>
9696
logger.info(s"✅ Document with id '$id' indexed successfully in index '$index'")
97-
// Refresh the index to make sure the document is available for search
98-
this.refresh(index)
9997
success
10098
case success @ ElasticSuccess(_) =>
10199
logger.info(s"✅ Document with id '$id' not indexed in index '$index'")
@@ -206,7 +204,8 @@ trait IndexApi extends ElasticClientHelpers { _: RefreshApi with SerializationAp
206204
private[client] def executeIndex(
207205
index: String,
208206
id: String,
209-
source: String
207+
source: String,
208+
wait: Boolean
210209
): ElasticResult[Boolean]
211210

212211
private[client] def executeIndexAsync(

es6/jest/src/main/scala/app/softnetwork/elastic/client/jest/JestIndexApi.scala

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,15 +36,22 @@ trait JestIndexApi extends IndexApi with JestClientHelpers {
3636
override private[client] def executeIndex(
3737
index: String,
3838
id: String,
39-
source: String
39+
source: String,
40+
wait: Boolean
4041
): ElasticResult[Boolean] =
4142
executeJestBooleanAction(
4243
operation = "index",
4344
index = Some(index),
4445
retryable = true
45-
)(
46-
new Index.Builder(source).index(index).`type`("_doc").id(id).build()
47-
)
46+
) {
47+
val refresh = if (wait) "wait_for" else "false"
48+
new Index.Builder(source)
49+
.index(index)
50+
.`type`("_doc")
51+
.id(id)
52+
.setParameter("refresh", refresh)
53+
.build()
54+
}
4855

4956
/** Index a document in the given index asynchronously.
5057
* @see

es6/rest/src/main/scala/app/softnetwork/elastic/client/rest/RestHighLevelClientApi.scala

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ import org.elasticsearch.action.search.{
4848
SearchResponse,
4949
SearchScrollRequest
5050
}
51+
import org.elasticsearch.action.support.WriteRequest
5152
import org.elasticsearch.action.support.master.AcknowledgedResponse
5253
import org.elasticsearch.action.update.{UpdateRequest, UpdateResponse}
5354
import org.elasticsearch.action.{ActionListener, DocWriteRequest}
@@ -490,18 +491,22 @@ trait RestHighLevelClientIndexApi extends IndexApi with RestHighLevelClientHelpe
490491
override private[client] def executeIndex(
491492
index: String,
492493
id: String,
493-
source: String
494+
source: String,
495+
wait: Boolean
494496
): result.ElasticResult[Boolean] =
495497
executeRestAction[IndexRequest, IndexResponse, Boolean](
496498
operation = "index",
497499
index = Some(index),
498500
retryable = false
499-
)(
500-
request = new IndexRequest(index)
501+
)(request = {
502+
val refresh =
503+
if (wait) WriteRequest.RefreshPolicy.WAIT_UNTIL else WriteRequest.RefreshPolicy.NONE
504+
new IndexRequest(index)
501505
.`type`("_doc")
502506
.id(id)
503507
.source(source, XContentType.JSON)
504-
)(
508+
.setRefreshPolicy(refresh)
509+
})(
505510
executor = req => apply().index(req, RequestOptions.DEFAULT)
506511
)(response => response.status().getStatus < 400)
507512

es7/rest/src/main/scala/app/softnetwork/elastic/client/rest/RestHighLevelClientApi.scala

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ import org.elasticsearch.action.search.{
4949
SearchResponse,
5050
SearchScrollRequest
5151
}
52+
import org.elasticsearch.action.support.WriteRequest
5253
import org.elasticsearch.action.support.master.AcknowledgedResponse
5354
import org.elasticsearch.action.update.{UpdateRequest, UpdateResponse}
5455
import org.elasticsearch.action.{ActionListener, DocWriteRequest}
@@ -496,17 +497,23 @@ trait RestHighLevelClientIndexApi extends IndexApi with RestHighLevelClientHelpe
496497
override private[client] def executeIndex(
497498
index: String,
498499
id: String,
499-
source: String
500+
source: String,
501+
wait: Boolean
500502
): result.ElasticResult[Boolean] =
501503
executeRestAction[IndexRequest, IndexResponse, Boolean](
502504
operation = "index",
503505
index = Some(index),
504506
retryable = false
505507
)(
506-
request = new IndexRequest(index)
507-
.`type`("_doc")
508-
.id(id)
509-
.source(source, XContentType.JSON)
508+
request = {
509+
val refresh =
510+
if (wait) WriteRequest.RefreshPolicy.WAIT_UNTIL else WriteRequest.RefreshPolicy.NONE
511+
new IndexRequest(index)
512+
.`type`("_doc")
513+
.id(id)
514+
.source(source, XContentType.JSON)
515+
.setRefreshPolicy(refresh)
516+
}
510517
)(
511518
executor = req => apply().index(req, RequestOptions.DEFAULT)
512519
)(response => response.status().getStatus < 400)

es8/java/src/main/scala/app/softnetwork/elastic/client/java/JavaClientApi.scala

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,14 @@ import app.softnetwork.elastic.sql.bridge._
2727
import app.softnetwork.elastic.sql.query.{SQLAggregation, SQLSearchRequest}
2828
import app.softnetwork.elastic.client
2929
import app.softnetwork.elastic.client.result.{ElasticFailure, ElasticResult, ElasticSuccess}
30-
import co.elastic.clients.elasticsearch._types.{FieldSort, FieldValue, SortOptions, SortOrder, Time}
30+
import co.elastic.clients.elasticsearch._types.{
31+
FieldSort,
32+
FieldValue,
33+
Refresh,
34+
SortOptions,
35+
SortOrder,
36+
Time
37+
}
3138
import co.elastic.clients.elasticsearch.core.bulk.{
3239
BulkOperation,
3340
DeleteOperation,
@@ -510,22 +517,25 @@ trait JavaClientIndexApi extends IndexApi with JavaClientHelpers {
510517
override private[client] def executeIndex(
511518
index: String,
512519
id: String,
513-
source: String
520+
source: String,
521+
wait: Boolean
514522
): result.ElasticResult[Boolean] =
515523
executeJavaBooleanAction(
516524
operation = "index",
517525
index = Some(index),
518526
retryable = false
519-
)(
527+
) {
528+
val refresh = if (wait) Refresh.WaitFor else Refresh.False
520529
apply()
521530
.index(
522531
new IndexRequest.Builder()
523532
.index(index)
524533
.id(id)
525534
.withJson(new StringReader(source))
535+
.refresh(refresh)
526536
.build()
527537
)
528-
)(
538+
}(
529539
_.shards()
530540
.failed()
531541
.intValue() == 0

es9/java/src/main/scala/app/softnetwork/elastic/client/java/JavaClientApi.scala

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,14 @@ import app.softnetwork.elastic.sql.bridge._
2727
import app.softnetwork.elastic.sql.query.{SQLAggregation, SQLSearchRequest}
2828
import app.softnetwork.elastic.client
2929
import app.softnetwork.elastic.client.result.{ElasticFailure, ElasticResult, ElasticSuccess}
30-
import co.elastic.clients.elasticsearch._types.{FieldSort, FieldValue, SortOptions, SortOrder, Time}
30+
import co.elastic.clients.elasticsearch._types.{
31+
FieldSort,
32+
FieldValue,
33+
Refresh,
34+
SortOptions,
35+
SortOrder,
36+
Time
37+
}
3138
import co.elastic.clients.elasticsearch.core.bulk.{
3239
BulkOperation,
3340
DeleteOperation,
@@ -46,7 +53,6 @@ import com.google.gson.JsonParser
4653
import _root_.java.io.{IOException, StringReader}
4754
import _root_.java.util.{Map => JMap}
4855
import scala.jdk.CollectionConverters._
49-
5056
import scala.concurrent.{ExecutionContext, Future}
5157
import scala.language.implicitConversions
5258
import scala.util.{Failure, Success, Try}
@@ -508,22 +514,25 @@ trait JavaClientIndexApi extends IndexApi with JavaClientHelpers {
508514
override private[client] def executeIndex(
509515
index: String,
510516
id: String,
511-
source: String
517+
source: String,
518+
wait: Boolean
512519
): result.ElasticResult[Boolean] =
513520
executeJavaBooleanAction(
514521
operation = "index",
515522
index = Some(index),
516523
retryable = false
517-
)(
524+
) {
525+
val refresh = if (wait) Refresh.WaitFor else Refresh.False
518526
apply()
519527
.index(
520528
new IndexRequest.Builder()
521529
.index(index)
522530
.id(id)
523531
.withJson(new StringReader(source))
532+
.refresh(refresh)
524533
.build()
525534
)
526-
)(
535+
}(
527536
_.shards()
528537
.failed()
529538
.intValue() == 0

0 commit comments

Comments
 (0)