Skip to content

Commit 9683f1d

Browse files
committed
finalize wait for refresh within index api
1 parent d55d0b5 commit 9683f1d

File tree

13 files changed

+206
-79
lines changed

13 files changed

+206
-79
lines changed

core/src/main/scala/app/softnetwork/elastic/client/BulkApi.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -452,7 +452,7 @@ trait BulkApi extends BulkTypes with ElasticClientHelpers {
452452
failed: FailedDocument
453453
)(implicit system: ActorSystem): Future[Boolean] = {
454454
implicit val ec: ExecutionContext = system.dispatcher
455-
indexAsync(failed.index, failed.document, failed.id).flatMap {
455+
indexAsync(failed.index, failed.document, failed.id, wait = false).flatMap {
456456
case ElasticSuccess(true) =>
457457
Future.successful(true)
458458

core/src/main/scala/app/softnetwork/elastic/client/ElasticClientDelegator.scala

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -495,16 +495,19 @@ trait ElasticClientDelegator extends ElasticClientApi with BulkTypes {
495495
* - the name of the index to index the entity in (default is the entity type name)
496496
* @param maybeType
497497
* - the type of the entity (default is the entity class name in lowercase)
498+
* @param wait
499+
* - whether to wait for a refresh to happen after indexing
498500
* @return
499501
* true if the entity was indexed successfully, false otherwise
500502
*/
501503
override def indexAs[U <: AnyRef](
502504
entity: U,
503505
id: String,
504506
index: Option[String],
505-
maybeType: Option[String]
507+
maybeType: Option[String],
508+
wait: Boolean
506509
)(implicit u: ClassTag[U], formats: Formats): ElasticResult[Boolean] =
507-
delegate.indexAs(entity, id, index, maybeType)
510+
delegate.indexAs(entity, id, index, maybeType, wait)
508511

509512
/** Index an entity in the given index.
510513
*
@@ -517,8 +520,13 @@ trait ElasticClientDelegator extends ElasticClientApi with BulkTypes {
517520
* @return
518521
* true if the entity was indexed successfully, false otherwise
519522
*/
520-
override def index(index: String, id: String, source: String): ElasticResult[Boolean] =
521-
delegate.index(index, id, source)
523+
override def index(
524+
index: JSONResults,
525+
id: JSONResults,
526+
source: JSONResults,
527+
wait: Boolean
528+
): ElasticResult[Boolean] =
529+
delegate.index(index, id, source, false)
522530

523531
/** Index an entity in the given index asynchronously.
524532
*
@@ -530,20 +538,23 @@ trait ElasticClientDelegator extends ElasticClientApi with BulkTypes {
530538
* - the name of the index to index the entity in (default is the entity type name)
531539
* @param maybeType
532540
* - the type of the entity (default is the entity class name in lowercase)
541+
* @param wait
542+
* - whether to wait for a refresh to happen after indexing
533543
* @return
534544
* a Future that completes with true if the entity was indexed successfully, false otherwise
535545
*/
536546
override def indexAsyncAs[U <: AnyRef](
537547
entity: U,
538548
id: String,
539549
index: Option[String],
540-
maybeType: Option[String]
550+
maybeType: Option[String],
551+
wait: Boolean
541552
)(implicit
542553
u: ClassTag[U],
543554
ec: ExecutionContext,
544555
formats: Formats
545556
): Future[ElasticResult[Boolean]] =
546-
delegate.indexAsyncAs(entity, id, index, maybeType)
557+
delegate.indexAsyncAs(entity, id, index, maybeType, wait)
547558

548559
/** Index an entity in the given index asynchronously.
549560
*
@@ -553,12 +564,14 @@ trait ElasticClientDelegator extends ElasticClientApi with BulkTypes {
553564
* - the id of the entity to index
554565
* @param source
555566
* - the source of the entity to index in JSON format
567+
* @param wait
568+
* - whether to wait for a refresh to happen after indexing
556569
* @return
557570
* a Future that completes with true if the entity was indexed successfully, false otherwise
558571
*/
559-
override def indexAsync(index: String, id: String, source: String)(implicit
572+
override def indexAsync(index: String, id: String, source: String, wait: Boolean)(implicit
560573
ec: ExecutionContext
561-
): Future[ElasticResult[Boolean]] = delegate.indexAsync(index, id, source)
574+
): Future[ElasticResult[Boolean]] = delegate.indexAsync(index, id, source, wait)
562575
override private[client] def executeIndex(
563576
index: String,
564577
id: String,
@@ -570,9 +583,10 @@ trait ElasticClientDelegator extends ElasticClientApi with BulkTypes {
570583
override private[client] def executeIndexAsync(
571584
index: String,
572585
id: String,
573-
source: String
586+
source: String,
587+
wait: Boolean
574588
)(implicit ec: ExecutionContext): Future[ElasticResult[Boolean]] =
575-
delegate.executeIndexAsync(index, id, source)
589+
delegate.executeIndexAsync(index, id, source, wait)
576590

577591
// ==================== UpdateApi ====================
578592

core/src/main/scala/app/softnetwork/elastic/client/IndexApi.scala

Lines changed: 34 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import scala.reflect.ClassTag
2929

3030
/** Index Management API
3131
*/
32-
trait IndexApi extends ElasticClientHelpers { _: RefreshApi with SerializationApi =>
32+
trait IndexApi extends ElasticClientHelpers { _: SettingsApi with SerializationApi =>
3333

3434
// ========================================================================
3535
// PUBLIC METHODS
@@ -44,14 +44,17 @@ trait IndexApi extends ElasticClientHelpers { _: RefreshApi with SerializationAp
4444
* - the name of the index to index the entity in (default is the entity type name)
4545
* @param maybeType
4646
* - the type of the entity (default is the entity class name in lowercase)
47+
* @param wait
48+
* - whether to wait for a refresh to happen after indexing
4749
* @return
4850
* true if the entity was indexed successfully, false otherwise
4951
*/
5052
def indexAs[U <: AnyRef](
5153
entity: U,
5254
id: String,
5355
index: Option[String] = None,
54-
maybeType: Option[String] = None
56+
maybeType: Option[String] = None,
57+
wait: Boolean
5558
)(implicit u: ClassTag[U], formats: Formats): ElasticResult[Boolean] = {
5659
val indexType = maybeType.getOrElse(u.runtimeClass.getSimpleName.toLowerCase)
5760
val indexName = index.getOrElse(indexType)
@@ -61,7 +64,7 @@ trait IndexApi extends ElasticClientHelpers { _: RefreshApi with SerializationAp
6164
serialization.write[U](entity)
6265
}
6366
.flatMap { source =>
64-
this.index(indexName, id, source)
67+
this.index(indexName, id, source, wait)
6568
}
6669
}
6770

@@ -72,10 +75,12 @@ trait IndexApi extends ElasticClientHelpers { _: RefreshApi with SerializationAp
7275
* - the id of the entity to index
7376
* @param source
7477
* - the source of the entity to index in JSON format
78+
* @param wait
79+
* - whether to wait for a refresh to happen after indexing
7580
* @return
7681
* true if the entity was indexed successfully, false otherwise
7782
*/
78-
def index(index: String, id: String, source: String): ElasticResult[Boolean] = {
83+
def index(index: String, id: String, source: String, wait: Boolean): ElasticResult[Boolean] = {
7984
validateIndexName(index) match {
8085
case Some(error) =>
8186
return ElasticResult.failure(
@@ -91,7 +96,9 @@ trait IndexApi extends ElasticClientHelpers { _: RefreshApi with SerializationAp
9196

9297
logger.debug(s"Indexing document with id '$id' in index '$index'")
9398

94-
executeIndex(index, id, source, wait=false) match {
99+
// if wait for next refresh is enabled, we should make sure that the refresh is enabled (different to -1) and its interval not too high
100+
val waitEnabled = wait && isRefreshEnabled(index).getOrElse(false)
101+
executeIndex(index, id, source, waitEnabled) match {
95102
case success @ ElasticSuccess(true) =>
96103
logger.info(s"✅ Document with id '$id' indexed successfully in index '$index'")
97104
success
@@ -115,21 +122,32 @@ trait IndexApi extends ElasticClientHelpers { _: RefreshApi with SerializationAp
115122
* - the name of the index to index the entity in (default is the entity type name)
116123
* @param maybeType
117124
* - the type of the entity (default is the entity class name in lowercase)
125+
* @param wait
126+
* - whether to wait for a refresh to happen after indexing
118127
* @return
119128
* a Future that completes with true if the entity was indexed successfully, false otherwise
120129
*/
121130
def indexAsyncAs[U <: AnyRef](
122131
entity: U,
123132
id: String,
124133
index: Option[String] = None,
125-
maybeType: Option[String] = None
134+
maybeType: Option[String] = None,
135+
wait: Boolean
126136
)(implicit
127137
u: ClassTag[U],
128138
ec: ExecutionContext,
129139
formats: Formats
130140
): Future[ElasticResult[Boolean]] = {
131-
Future {
132-
this.indexAs(entity, id, index, maybeType)
141+
val indexType = maybeType.getOrElse(u.runtimeClass.getSimpleName.toLowerCase)
142+
val indexName = index.getOrElse(indexType)
143+
144+
ElasticResult.attempt {
145+
serialization.write[U](entity)
146+
} match {
147+
case failure @ ElasticFailure(_) =>
148+
logger.error(s"❌ Failed to serialize entity for update in index '$indexName'")
149+
Future.successful(failure)
150+
case ElasticSuccess(source) => this.indexAsync(indexName, id, source, wait)
133151
}
134152
}
135153

@@ -140,10 +158,12 @@ trait IndexApi extends ElasticClientHelpers { _: RefreshApi with SerializationAp
140158
* - the id of the entity to index
141159
* @param source
142160
* - the source of the entity to index in JSON format
161+
* @param wait
162+
* - whether to wait for a refresh to happen after indexing
143163
* @return
144164
* a Future that completes with true if the entity was indexed successfully, false otherwise
145165
*/
146-
def indexAsync(index: String, id: String, source: String)(implicit
166+
def indexAsync(index: String, id: String, source: String, wait: Boolean)(implicit
147167
ec: ExecutionContext
148168
): Future[ElasticResult[Boolean]] = {
149169
validateIndexName(index) match {
@@ -165,13 +185,13 @@ trait IndexApi extends ElasticClientHelpers { _: RefreshApi with SerializationAp
165185

166186
val promise: Promise[ElasticResult[Boolean]] = Promise()
167187

168-
executeIndexAsync(index, id, source) onComplete {
188+
// if wait for next refresh is enabled, we should make sure that the refresh is enabled (different to -1) and its interval not too high
189+
val waitEnabled = wait && isRefreshEnabled(index).getOrElse(false)
190+
executeIndexAsync(index, id, source, waitEnabled) onComplete {
169191
case scala.util.Success(result) =>
170192
result match {
171193
case success @ ElasticSuccess(true) =>
172194
logger.info(s"✅ Successfully indexed document with id '$id' in index '$index'")
173-
// Refresh the index to make sure the document is available for search
174-
this.refresh(index)
175195
promise.success(success)
176196
case success @ ElasticSuccess(_) =>
177197
logger.info(s"✅ Document with id '$id' not indexed in index '$index'")
@@ -211,6 +231,7 @@ trait IndexApi extends ElasticClientHelpers { _: RefreshApi with SerializationAp
211231
private[client] def executeIndexAsync(
212232
index: String,
213233
id: String,
214-
source: String
234+
source: String,
235+
wait: Boolean
215236
)(implicit ec: ExecutionContext): Future[ElasticResult[Boolean]]
216237
}

core/src/main/scala/app/softnetwork/elastic/client/SettingsApi.scala

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,53 @@ trait SettingsApi { _: IndicesApi =>
120120
}
121121
}
122122

123+
/** Get the refresh interval of an index.
124+
* @param index
125+
* - the name of the index to get the refresh interval for
126+
* @return
127+
* the refresh interval of the index
128+
*/
129+
def getRefreshInterval(index: String): ElasticResult[String] = {
130+
loadSettings(index).flatMap { settingsJson =>
131+
ElasticResult.attempt(
132+
new JsonParser().parse(settingsJson).getAsJsonObject
133+
) match {
134+
case ElasticFailure(error) =>
135+
logger.error(s"❌ Failed to parse JSON settings for index '$index': ${error.message}")
136+
ElasticFailure(error.copy(operation = Some("getRefreshInterval")))
137+
case ElasticSuccess(settingsObj) =>
138+
if (Option(settingsObj).isDefined && settingsObj.has("refresh_interval")) {
139+
val refreshInterval = settingsObj
140+
.getAsJsonPrimitive("refresh_interval")
141+
.getAsString
142+
ElasticSuccess(refreshInterval)
143+
} else {
144+
val message = s"refresh_interval not found in the settings for index '$index'."
145+
logger.error(s"$message")
146+
ElasticFailure(
147+
ElasticError(
148+
message = message,
149+
operation = Some("getRefreshInterval"),
150+
index = Some(index)
151+
)
152+
)
153+
}
154+
}
155+
}
156+
}
157+
158+
/** Check if the refresh interval is enabled for an index.
159+
* @param index
160+
* - the name of the index to check
161+
* @return
162+
* true if the refresh interval is enabled, false otherwise
163+
*/
164+
def isRefreshEnabled(index: String): ElasticResult[Boolean] = {
165+
getRefreshInterval(index).flatMap { refreshInterval =>
166+
ElasticSuccess(refreshInterval != "-1")
167+
}
168+
}
169+
123170
/** Load the settings of an index.
124171
* @param index
125172
* - the name of the index to load the settings for

core/src/main/scala/app/softnetwork/elastic/client/metrics/MetricsElasticClient.scala

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -340,17 +340,22 @@ class MetricsElasticClient(
340340

341341
// ==================== IndexApi ====================
342342

343-
override def index(index: String, id: String, source: String): ElasticResult[Boolean] = {
343+
override def index(
344+
index: String,
345+
id: String,
346+
source: String,
347+
wait: Boolean
348+
): ElasticResult[Boolean] = {
344349
measureResult("index", Some(index)) {
345-
delegate.index(index, id, source)
350+
delegate.index(index, id, source, wait)
346351
}
347352
}
348353

349-
override def indexAsync(index: String, id: String, source: String)(implicit
354+
override def indexAsync(index: String, id: String, source: String, wait: Boolean)(implicit
350355
ec: ExecutionContext
351356
): Future[ElasticResult[Boolean]] = {
352357
measureAsync("indexAsync", Some(index)) {
353-
delegate.indexAsync(index, id, source)
358+
delegate.indexAsync(index, id, source, wait)
354359
}
355360
}
356361

@@ -364,17 +369,20 @@ class MetricsElasticClient(
364369
* - the name of the index to index the entity in (default is the entity type name)
365370
* @param maybeType
366371
* - the type of the entity (default is the entity class name in lowercase)
372+
* @param wait
373+
* - whether to wait for a refresh to happen after indexing
367374
* @return
368375
* true if the entity was indexed successfully, false otherwise
369376
*/
370377
override def indexAs[U <: AnyRef](
371378
entity: U,
372379
id: String,
373380
index: Option[String],
374-
maybeType: Option[String]
381+
maybeType: Option[String],
382+
wait: Boolean
375383
)(implicit u: ClassTag[U], formats: Formats): ElasticResult[Boolean] =
376384
measureResult("indexAs", index) {
377-
delegate.indexAs(entity, id, index, maybeType)
385+
delegate.indexAs(entity, id, index, maybeType, wait)
378386
}
379387

380388
/** Index an entity in the given index asynchronously.
@@ -387,21 +395,24 @@ class MetricsElasticClient(
387395
* - the name of the index to index the entity in (default is the entity type name)
388396
* @param maybeType
389397
* - the type of the entity (default is the entity class name in lowercase)
398+
* @param wait
399+
* - whether to wait for a refresh to happen after indexing
390400
* @return
391401
* a Future that completes with true if the entity was indexed successfully, false otherwise
392402
*/
393403
override def indexAsyncAs[U <: AnyRef](
394404
entity: U,
395405
id: String,
396406
index: Option[String],
397-
maybeType: Option[String]
407+
maybeType: Option[String],
408+
wait: Boolean
398409
)(implicit
399410
u: ClassTag[U],
400411
ec: ExecutionContext,
401412
formats: Formats
402413
): Future[ElasticResult[Boolean]] =
403414
measureAsync("indexAsyncAs", index) {
404-
delegate.indexAsyncAs(entity, id, index, maybeType)
415+
delegate.indexAsyncAs(entity, id, index, maybeType, wait)
405416
}
406417

407418
// ==================== UpdateApi ====================

0 commit comments

Comments
 (0)