Skip to content

Commit 9bef031

Browse files
committed
add rest high level client helpers, add BulkErrorAnalyzer, update rest high level client implementations to use ElasticResult
1 parent 512f823 commit 9bef031

File tree

9 files changed

+2691
-1424
lines changed

9 files changed

+2691
-1424
lines changed

core/src/main/scala/app/softnetwork/elastic/client/MappingApi.scala

Lines changed: 2 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -106,34 +106,7 @@ trait MappingApi extends ElasticClientHelpers { _: SettingsApi with IndicesApi w
106106

107107
logger.debug(s"Getting mapping for index '$index'")
108108

109-
executeGetMapping(index).flatMap { jsonString =>
110-
// ✅ Extracting mapping from JSON
111-
ElasticResult.attempt(
112-
new JsonParser().parse(jsonString).getAsJsonObject
113-
) match {
114-
case ElasticFailure(error) =>
115-
logger.error(s"❌ Failed to parse JSON mapping for index '$index': ${error.message}")
116-
return ElasticFailure(error.copy(operation = Some("getMapping"), index = Some(index)))
117-
case ElasticSuccess(indexObj) =>
118-
if (Option(indexObj).isDefined && indexObj.has(index)) {
119-
val settingsObj = indexObj
120-
.getAsJsonObject(index)
121-
.getAsJsonObject("mappings")
122-
.getAsJsonObject("_doc")
123-
ElasticSuccess(settingsObj.toString)
124-
} else {
125-
val message = s"Index '$index' not found in the loaded mapping."
126-
logger.error(s"$message")
127-
ElasticFailure(
128-
ElasticError(
129-
message = message,
130-
operation = Some("getMapping"),
131-
index = Some(index)
132-
)
133-
)
134-
}
135-
}
136-
}
109+
executeGetMapping(index)
137110
}
138111

139112
/** Get the mapping properties of an index.
@@ -143,15 +116,7 @@ trait MappingApi extends ElasticClientHelpers { _: SettingsApi with IndicesApi w
143116
* the mapping properties of the index as a JSON string
144117
*/
145118
def getMappingProperties(index: String): ElasticResult[String] =
146-
getMapping(index).flatMap { mappingJson =>
147-
ElasticResult.attempt {
148-
new JsonParser()
149-
.parse(mappingJson)
150-
.getAsJsonObject
151-
.get("mappings")
152-
.toString
153-
}
154-
}
119+
getMapping(index)
155120

156121
/** Check if the mapping of an index is different from the provided mapping.
157122
* @param index

core/src/main/scala/app/softnetwork/elastic/client/bulk/package.scala

Lines changed: 64 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -162,11 +162,11 @@ package object bulk {
162162
}
163163
}
164164

165-
trait BulkElasticAction { def index: String }
165+
trait BulkElasticAction { def index: String } // TODO rename to BulkItemIndex
166166

167-
trait BulkElasticResult { def items: List[BulkElasticResultItem] }
167+
trait BulkElasticResult { def items: List[BulkElasticResultItem] } // TODO remove
168168

169-
trait BulkElasticResultItem { def index: String }
169+
trait BulkElasticResultItem { def index: String } // TODO remove
170170

171171
case class BulkSettings[A](disableRefresh: Boolean = false)(implicit
172172
settingsApi: SettingsApi,
@@ -211,4 +211,65 @@ package object bulk {
211211

212212
def docAsUpsert(doc: String): String = s"""{"doc":$doc,"doc_as_upsert":true}"""
213213

214+
object BulkErrorAnalyzer {
215+
216+
/** Determines whether a bulk error is retryable based on the status.
217+
*
218+
* @param statusCode
219+
* HTTP error status code
220+
* @return
221+
* true if the error is retryable, false otherwise
222+
*/
223+
def isRetryable(statusCode: Int): Boolean = statusCode match {
224+
// Temporary errors - retryable
225+
case 429 => true // Too Many Requests
226+
case 503 => true // Service Unavailable
227+
case 504 => true // Gateway Timeout
228+
case 408 => true // Request Timeout
229+
case 502 => true // Bad Gateway
230+
231+
// Permanent errors - not retryable
232+
case 400 => false // Bad Request
233+
case 401 => false // Unauthorized
234+
case 403 => false // Forbidden
235+
case 404 => false // Not Found
236+
case 409 => false // Conflict (version)
237+
case 413 => false // Payload Too Large
238+
239+
// By default, 5xx errors (except those listed) are retryable.
240+
case code if code >= 500 && code < 600 => true
241+
242+
// Other 4xx errors are non-retryable
243+
case code if code >= 400 && code < 500 => false
244+
245+
// Status 2xx and 3xx should not be errors
246+
case _ => false
247+
}
248+
249+
/** Determines whether a bulk error is retryable based on the ES error type.
250+
*
251+
* @param errorType
252+
* Elasticsearch error type
253+
* @return
254+
* true if the error is retryable, false otherwise
255+
*/
256+
def isRetryableByType(errorType: String): Boolean = errorType match {
257+
// Retryable errors
258+
case "es_rejected_execution_exception" => true
259+
case "circuit_breaking_exception" => true
260+
case "timeout_exception" => true
261+
case "unavailable_shards_exception" => true
262+
263+
// non-retryable errors
264+
case "mapper_parsing_exception" => false
265+
case "illegal_argument_exception" => false
266+
case "version_conflict_engine_exception" => false
267+
case "document_missing_exception" => false
268+
case "index_not_found_exception" => false
269+
case "strict_dynamic_mapping_exception" => false
270+
271+
// By default, it is considered non-retryable
272+
case _ => false
273+
}
274+
}
214275
}

es6/jest/src/main/scala/app/softnetwork/elastic/client/jest/JestMappingApi.scala

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,13 @@
1717
package app.softnetwork.elastic.client.jest
1818

1919
import app.softnetwork.elastic.client.{IndicesApi, MappingApi, RefreshApi, SettingsApi}
20-
import app.softnetwork.elastic.client.result.ElasticResult
20+
import app.softnetwork.elastic.client.result.{
21+
ElasticError,
22+
ElasticFailure,
23+
ElasticResult,
24+
ElasticSuccess
25+
}
26+
import com.google.gson.JsonParser
2127
import io.searchbox.indices.mapping.{GetMapping, PutMapping}
2228

2329
import scala.util.Try
@@ -49,7 +55,7 @@ trait JestMappingApi extends MappingApi with JestClientHelpers {
4955
index = Some(index),
5056
retryable = true
5157
)(
52-
new GetMapping.Builder().addIndex(index).addType("_doc").build()
58+
new GetMapping.Builder().addIndex(index).build()
5359
) { result =>
5460
result.getJsonString
5561
}
@@ -62,6 +68,33 @@ trait JestMappingApi extends MappingApi with JestClientHelpers {
6268
* the mapping properties of the index as a JSON string
6369
*/
6470
override def getMappingProperties(index: String): ElasticResult[String] = {
65-
getMapping(index)
71+
getMapping(index).flatMap { jsonString =>
72+
// ✅ Extracting mapping from JSON
73+
ElasticResult.attempt(
74+
new JsonParser().parse(jsonString).getAsJsonObject
75+
) match {
76+
case ElasticFailure(error) =>
77+
logger.error(s"❌ Failed to parse JSON mapping for index '$index': ${error.message}")
78+
return ElasticFailure(error.copy(operation = Some("getMapping"), index = Some(index)))
79+
case ElasticSuccess(indexObj) =>
80+
if (Option(indexObj).isDefined && indexObj.has(index)) {
81+
val settingsObj = indexObj
82+
.getAsJsonObject(index)
83+
.getAsJsonObject("mappings")
84+
.getAsJsonObject("_doc")
85+
ElasticSuccess(settingsObj.toString)
86+
} else {
87+
val message = s"Index '$index' not found in the loaded mapping."
88+
logger.error(s"$message")
89+
ElasticFailure(
90+
ElasticError(
91+
message = message,
92+
operation = Some("getMapping"),
93+
index = Some(index)
94+
)
95+
)
96+
}
97+
}
98+
}
6699
}
67100
}

0 commit comments

Comments
 (0)