Skip to content

Commit 26d8e15

Browse files
committed
implement wait for a refresh to happen after updating
1 parent ab576a2 commit 26d8e15

File tree

12 files changed

+175
-71
lines changed

12 files changed

+175
-71
lines changed

core/src/main/scala/app/softnetwork/elastic/client/ElasticClientDelegator.scala

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -600,16 +600,19 @@ trait ElasticClientDelegator extends ElasticClientApi with BulkTypes {
600600
* - the source of the entity to update in JSON format
601601
* @param upsert
602602
* - true to upsert the entity if it does not exist, false otherwise
603+
* @param wait
604+
* - whether to wait for a refresh to happen after updating (default is false)
603605
* @return
604606
* true if the entity was updated successfully, false otherwise
605607
*/
606608
override def update(
607609
index: String,
608610
id: String,
609611
source: String,
610-
upsert: Boolean
612+
upsert: Boolean,
613+
wait: Boolean
611614
): ElasticResult[Boolean] =
612-
delegate.update(index, id, source, upsert)
615+
delegate.update(index, id, source, upsert, wait)
613616

614617
/** Update an entity in the given index.
615618
*
@@ -623,6 +626,8 @@ trait ElasticClientDelegator extends ElasticClientApi with BulkTypes {
623626
* - the type of the entity (default is the entity class name in lowercase)
624627
* @param upsert
625628
* - true to upsert the entity if it does not exist, false otherwise
629+
* @param wait
630+
* - whether to wait for a refresh to happen after updating (default is false)
626631
* @return
627632
* true if the entity was updated successfully, false otherwise
628633
*/
@@ -631,9 +636,10 @@ trait ElasticClientDelegator extends ElasticClientApi with BulkTypes {
631636
id: String,
632637
index: Option[String],
633638
maybeType: Option[String],
634-
upsert: Boolean
639+
upsert: Boolean,
640+
wait: Boolean
635641
)(implicit u: ClassTag[U], formats: Formats): ElasticResult[Boolean] =
636-
delegate.updateAs(entity, id, index, maybeType, upsert)
642+
delegate.updateAs(entity, id, index, maybeType, upsert, wait)
637643

638644
/** Update an entity in the given index asynchronously.
639645
*
@@ -645,13 +651,21 @@ trait ElasticClientDelegator extends ElasticClientApi with BulkTypes {
645651
* - the source of the entity to update in JSON format
646652
* @param upsert
647653
* - true to upsert the entity if it does not exist, false otherwise
654+
* @param wait
655+
* - whether to wait for a refresh to happen after updating (default is false)
648656
* @return
649657
* a Future that completes with true if the entity was updated successfully, false otherwise
650658
*/
651-
override def updateAsync(index: String, id: String, source: String, upsert: Boolean)(implicit
659+
override def updateAsync(
660+
index: String,
661+
id: String,
662+
source: String,
663+
upsert: Boolean,
664+
wait: Boolean
665+
)(implicit
652666
ec: ExecutionContext
653667
): Future[ElasticResult[Boolean]] =
654-
delegate.updateAsync(index, id, source, upsert)
668+
delegate.updateAsync(index, id, source, upsert, wait)
655669

656670
/** Update an entity in the given index asynchronously.
657671
*
@@ -665,6 +679,8 @@ trait ElasticClientDelegator extends ElasticClientApi with BulkTypes {
665679
* - the type of the entity (default is the entity class name in lowercase)
666680
* @param upsert
667681
* - true to upsert the entity if it does not exist, false otherwise
682+
* @param wait
683+
* - whether to wait for a refresh to happen after updating (default is false)
668684
* @return
669685
* a Future that completes with true if the entity was updated successfully, false otherwise
670686
*/
@@ -673,29 +689,32 @@ trait ElasticClientDelegator extends ElasticClientApi with BulkTypes {
673689
id: String,
674690
index: Option[String],
675691
maybeType: Option[String],
676-
upsert: Boolean
692+
upsert: Boolean,
693+
wait: Boolean
677694
)(implicit
678695
u: ClassTag[U],
679696
ec: ExecutionContext,
680697
formats: Formats
681698
): Future[ElasticResult[Boolean]] =
682-
delegate.updateAsyncAs(entity, id, index, maybeType, upsert)
699+
delegate.updateAsyncAs(entity, id, index, maybeType, upsert, wait)
683700

684701
override private[client] def executeUpdate(
685702
index: String,
686703
id: String,
687704
source: String,
688-
upsert: Boolean
705+
upsert: Boolean,
706+
wait: Boolean
689707
): ElasticResult[Boolean] =
690-
delegate.executeUpdate(index, id, source, upsert)
708+
delegate.executeUpdate(index, id, source, upsert, wait)
691709

692710
override private[client] def executeUpdateAsync(
693711
index: String,
694712
id: String,
695713
source: String,
696-
upsert: Boolean
714+
upsert: Boolean,
715+
wait: Boolean
697716
)(implicit ec: ExecutionContext): Future[ElasticResult[Boolean]] =
698-
delegate.executeUpdateAsync(index, id, source, upsert)
717+
delegate.executeUpdateAsync(index, id, source, upsert, wait)
699718

700719
// ==================== DeleteApi ====================
701720

core/src/main/scala/app/softnetwork/elastic/client/IndexApi.scala

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ trait IndexApi extends ElasticClientHelpers { _: SettingsApi with SerializationA
4545
* @param maybeType
4646
* - the type of the entity (default is the entity class name in lowercase)
4747
* @param wait
48-
* - whether to wait for a refresh to happen after indexing
48+
* - whether to wait for a refresh to happen after indexing (default is false)
4949
* @return
5050
* true if the entity was indexed successfully, false otherwise
5151
*/
@@ -54,7 +54,7 @@ trait IndexApi extends ElasticClientHelpers { _: SettingsApi with SerializationA
5454
id: String,
5555
index: Option[String] = None,
5656
maybeType: Option[String] = None,
57-
wait: Boolean
57+
wait: Boolean = false
5858
)(implicit u: ClassTag[U], formats: Formats): ElasticResult[Boolean] = {
5959
val indexType = maybeType.getOrElse(u.runtimeClass.getSimpleName.toLowerCase)
6060
val indexName = index.getOrElse(indexType)
@@ -76,11 +76,16 @@ trait IndexApi extends ElasticClientHelpers { _: SettingsApi with SerializationA
7676
* @param source
7777
* - the source of the entity to index in JSON format
7878
* @param wait
79-
* - whether to wait for a refresh to happen after indexing
79+
* - whether to wait for a refresh to happen after indexing (default is false)
8080
* @return
8181
* true if the entity was indexed successfully, false otherwise
8282
*/
83-
def index(index: String, id: String, source: String, wait: Boolean): ElasticResult[Boolean] = {
83+
def index(
84+
index: String,
85+
id: String,
86+
source: String,
87+
wait: Boolean = false
88+
): ElasticResult[Boolean] = {
8489
validateIndexName(index) match {
8590
case Some(error) =>
8691
return ElasticResult.failure(
@@ -96,7 +101,7 @@ trait IndexApi extends ElasticClientHelpers { _: SettingsApi with SerializationA
96101

97102
logger.debug(s"Indexing document with id '$id' in index '$index'")
98103

99-
// if wait for next refresh is enabled, we should make sure that the refresh is enabled (different to -1) and its interval not too high
104+
// if wait for next refresh is enabled, we should make sure that the refresh is enabled (different to -1)
100105
val waitEnabled = wait && isRefreshEnabled(index).getOrElse(false)
101106
executeIndex(index, id, source, waitEnabled) match {
102107
case success @ ElasticSuccess(true) =>
@@ -123,7 +128,7 @@ trait IndexApi extends ElasticClientHelpers { _: SettingsApi with SerializationA
123128
* @param maybeType
124129
* - the type of the entity (default is the entity class name in lowercase)
125130
* @param wait
126-
* - whether to wait for a refresh to happen after indexing
131+
* - whether to wait for a refresh to happen after indexing (default is false)
127132
* @return
128133
* a Future that completes with true if the entity was indexed successfully, false otherwise
129134
*/
@@ -132,7 +137,7 @@ trait IndexApi extends ElasticClientHelpers { _: SettingsApi with SerializationA
132137
id: String,
133138
index: Option[String] = None,
134139
maybeType: Option[String] = None,
135-
wait: Boolean
140+
wait: Boolean = false
136141
)(implicit
137142
u: ClassTag[U],
138143
ec: ExecutionContext,
@@ -159,11 +164,11 @@ trait IndexApi extends ElasticClientHelpers { _: SettingsApi with SerializationA
159164
* @param source
160165
* - the source of the entity to index in JSON format
161166
* @param wait
162-
* - whether to wait for a refresh to happen after indexing
167+
* - whether to wait for a refresh to happen after indexing (default is false)
163168
* @return
164169
* a Future that completes with true if the entity was indexed successfully, false otherwise
165170
*/
166-
def indexAsync(index: String, id: String, source: String, wait: Boolean)(implicit
171+
def indexAsync(index: String, id: String, source: String, wait: Boolean = false)(implicit
167172
ec: ExecutionContext
168173
): Future[ElasticResult[Boolean]] = {
169174
validateIndexName(index) match {
@@ -185,7 +190,7 @@ trait IndexApi extends ElasticClientHelpers { _: SettingsApi with SerializationA
185190

186191
val promise: Promise[ElasticResult[Boolean]] = Promise()
187192

188-
// if wait for next refresh is enabled, we should make sure that the refresh is enabled (different to -1) and its interval not too high
193+
// if wait for next refresh is enabled, we should make sure that the refresh is enabled (different to -1)
189194
val waitEnabled = wait && isRefreshEnabled(index).getOrElse(false)
190195
executeIndexAsync(index, id, source, waitEnabled) onComplete {
191196
case scala.util.Success(result) =>

core/src/main/scala/app/softnetwork/elastic/client/UpdateApi.scala

Lines changed: 43 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import scala.util.{Failure, Success}
3030

3131
/** Update Management API
3232
*/
33-
trait UpdateApi extends ElasticClientHelpers { _: RefreshApi with SerializationApi =>
33+
trait UpdateApi extends ElasticClientHelpers { _: SettingsApi with SerializationApi =>
3434

3535
// ========================================================================
3636
// PUBLIC METHODS
@@ -45,10 +45,18 @@ trait UpdateApi extends ElasticClientHelpers { _: RefreshApi with SerializationA
4545
* - the source of the entity to update in JSON format
4646
* @param upsert
4747
* - true to upsert the entity if it does not exist, false otherwise
48+
* @param wait
49+
* - whether to wait for a refresh to happen after updating (default is false)
4850
* @return
4951
* true if the entity was updated successfully, false otherwise
5052
*/
51-
def update(index: String, id: String, source: String, upsert: Boolean): ElasticResult[Boolean] = {
53+
def update(
54+
index: String,
55+
id: String,
56+
source: String,
57+
upsert: Boolean,
58+
wait: Boolean = false
59+
): ElasticResult[Boolean] = {
5260
validateIndexName(index) match {
5361
case Some(error) =>
5462
return ElasticResult.failure(
@@ -77,10 +85,12 @@ trait UpdateApi extends ElasticClientHelpers { _: RefreshApi with SerializationA
7785

7886
logger.debug(s"Updating document with id '$id' in index '$index'")
7987

80-
executeUpdate(index, id, source, upsert) match {
81-
case ElasticSuccess(true) =>
88+
// if wait for next refresh is enabled, we should make sure that the refresh is enabled (different to -1)
89+
val waitEnabled = wait && isRefreshEnabled(index).getOrElse(false)
90+
executeUpdate(index, id, source, upsert, waitEnabled) match {
91+
case success @ ElasticSuccess(true) =>
8292
logger.info(s"✅ Successfully updated document with id '$id' in index '$index'")
83-
this.refresh(index)
93+
success
8494
case ElasticSuccess(false) =>
8595
val error = s"Document with id '$id' in index '$index' not updated"
8696
logger.warn(s"$error")
@@ -110,6 +120,8 @@ trait UpdateApi extends ElasticClientHelpers { _: RefreshApi with SerializationA
110120
* - the type of the entity (default is the entity class name in lowercase)
111121
* @param upsert
112122
* - true to upsert the entity if it does not exist, false otherwise
123+
* @param wait
124+
* - whether to wait for a refresh to happen after updating (default is false)
113125
* @return
114126
* true if the entity was updated successfully, false otherwise
115127
*/
@@ -118,7 +130,8 @@ trait UpdateApi extends ElasticClientHelpers { _: RefreshApi with SerializationA
118130
id: String,
119131
index: Option[String] = None,
120132
maybeType: Option[String] = None,
121-
upsert: Boolean = true
133+
upsert: Boolean = true,
134+
wait: Boolean = false
122135
)(implicit u: ClassTag[U], formats: Formats): ElasticResult[Boolean] = {
123136
val indexType = maybeType.getOrElse(u.runtimeClass.getSimpleName.toLowerCase)
124137
val indexName = index.getOrElse(indexType)
@@ -128,7 +141,7 @@ trait UpdateApi extends ElasticClientHelpers { _: RefreshApi with SerializationA
128141
serialization.write[U](entity)
129142
}
130143
.flatMap { source =>
131-
this.update(indexName, id, source, upsert)
144+
this.update(indexName, id, source, upsert, wait)
132145
}
133146
}
134147

@@ -141,10 +154,18 @@ trait UpdateApi extends ElasticClientHelpers { _: RefreshApi with SerializationA
141154
* - the source of the entity to update in JSON format
142155
* @param upsert
143156
* - true to upsert the entity if it does not exist, false otherwise
157+
* @param wait
158+
* - whether to wait for a refresh to happen after updating (default is false)
144159
* @return
145160
* a Future that completes with true if the entity was updated successfully, false otherwise
146161
*/
147-
def updateAsync(index: String, id: String, source: String, upsert: Boolean)(implicit
162+
def updateAsync(
163+
index: String,
164+
id: String,
165+
source: String,
166+
upsert: Boolean,
167+
wait: Boolean = false
168+
)(implicit
148169
ec: ExecutionContext
149170
): Future[ElasticResult[Boolean]] = {
150171
validateIndexName(index) match {
@@ -179,13 +200,15 @@ trait UpdateApi extends ElasticClientHelpers { _: RefreshApi with SerializationA
179200

180201
logger.debug(s"Updating document with id '$id' in index '$index' asynchronously")
181202

203+
// if wait for next refresh is enabled, we should make sure that the refresh is enabled (different to -1)
204+
val waitEnabled = wait && isRefreshEnabled(index).getOrElse(false)
182205
val promise: Promise[ElasticResult[Boolean]] = Promise()
183-
executeUpdateAsync(index, id, source, upsert) onComplete {
206+
executeUpdateAsync(index, id, source, upsert, waitEnabled) onComplete {
184207
case Success(s) =>
185208
s match {
186-
case _ @ElasticSuccess(true) =>
209+
case success @ ElasticSuccess(true) =>
187210
logger.info(s"✅ Successfully updated document with id '$id' in index '$index'")
188-
promise.success(this.refresh(index))
211+
promise.success(success)
189212
case success @ ElasticSuccess(_) =>
190213
logger.warn(s"❌ Document with id '$id' in index '$index' not updated")
191214
promise.success(success)
@@ -217,6 +240,8 @@ trait UpdateApi extends ElasticClientHelpers { _: RefreshApi with SerializationA
217240
* - the type of the entity (default is the entity class name in lowercase)
218241
* @param upsert
219242
* - true to upsert the entity if it does not exist, false otherwise
243+
* @param wait
244+
* - whether to wait for a refresh to happen after updating (default is false)
220245
* @return
221246
* a Future that completes with true if the entity was updated successfully, false otherwise
222247
*/
@@ -225,7 +250,8 @@ trait UpdateApi extends ElasticClientHelpers { _: RefreshApi with SerializationA
225250
id: String,
226251
index: Option[String] = None,
227252
maybeType: Option[String] = None,
228-
upsert: Boolean = true
253+
upsert: Boolean = true,
254+
wait: Boolean = false
229255
)(implicit
230256
u: ClassTag[U],
231257
ec: ExecutionContext,
@@ -240,7 +266,7 @@ trait UpdateApi extends ElasticClientHelpers { _: RefreshApi with SerializationA
240266
case failure @ ElasticFailure(_) =>
241267
logger.error(s"❌ Failed to serialize entity for update in index '$indexName'")
242268
Future.successful(failure)
243-
case ElasticSuccess(source) => this.updateAsync(indexName, id, source, upsert)
269+
case ElasticSuccess(source) => this.updateAsync(indexName, id, source, upsert, wait)
244270
}
245271
}
246272

@@ -252,13 +278,15 @@ trait UpdateApi extends ElasticClientHelpers { _: RefreshApi with SerializationA
252278
index: String,
253279
id: String,
254280
source: String,
255-
upsert: Boolean
281+
upsert: Boolean,
282+
wait: Boolean
256283
): ElasticResult[Boolean]
257284

258285
private[client] def executeUpdateAsync(
259286
index: String,
260287
id: String,
261288
source: String,
262-
upsert: Boolean
289+
upsert: Boolean,
290+
wait: Boolean
263291
)(implicit ec: ExecutionContext): Future[ElasticResult[Boolean]]
264292
}

0 commit comments

Comments
 (0)