Skip to content

Commit e3e8ed1

Browse files
committed
fix inner hits serach, update specifications
1 parent 32f2709 commit e3e8ed1

File tree

9 files changed

+280
-55
lines changed

9 files changed

+280
-55
lines changed

.github/workflows/release.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ jobs:
4848
- name: Upload coverage to Codecov
4949
uses: codecov/codecov-action@v3
5050
with:
51-
files: sql/target/scala-2.13/coverage-report/cobertura.xml,client/testkit/target/scala-2.13/coverage-report/cobertura.xml,java/testkit/target/scala-2.13/coverage-report/cobertura.xml,persistence/target/scala-2.13/coverage-report/cobertura.xml,teskit/target/scala-2.13/coverage-report/cobertura.xml
51+
files: sql/target/scala-2.13/coverage-report/cobertura.xml,client/testkit/target/scala-2.13/coverage-report/cobertura.xml,persistence/target/scala-2.13/coverage-report/cobertura.xml,java/testkit/target/scala-2.13/coverage-report/cobertura.xml,teskit/target/scala-2.13/coverage-report/cobertura.xml
5252
flags: unittests
5353
fail_ci_if_error: false
5454
verbose: true

client/src/main/scala/app/softnetwork/elastic/client/ElasticClientApi.scala

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -552,7 +552,17 @@ trait SearchApi {
552552

553553
def search[U](jsonQuery: JSONQuery)(implicit m: Manifest[U], formats: Formats): List[U]
554554

555-
def search[U](sqlQuery: SQLQuery)(implicit m: Manifest[U], formats: Formats): List[U]
555+
def search[U](sqlQuery: SQLQuery)(implicit m: Manifest[U], formats: Formats): List[U] = {
556+
sqlQuery.search match {
557+
case Some(searchRequest) =>
558+
val indices = collection.immutable.Seq(searchRequest.sources: _*)
559+
search[U](JSONQuery(searchRequest.query, indices))(m, formats)
560+
case None =>
561+
throw new IllegalArgumentException(
562+
s"SQL query ${sqlQuery.query} does not contain a valid search request"
563+
)
564+
}
565+
}
556566

557567
def searchAsync[U](
558568
sqlQuery: SQLQuery
@@ -569,7 +579,7 @@ trait SearchApi {
569579
case Some(searchRequest) =>
570580
val indices = collection.immutable.Seq(searchRequest.sources: _*)
571581
val jsonQuery = JSONQuery(searchRequest.query, indices)
572-
searchWithInnerHits(jsonQuery, innerField)
582+
searchWithInnerHits(jsonQuery, innerField)(m1, m2, formats)
573583
case None =>
574584
throw new IllegalArgumentException(
575585
s"SQL query ${sqlQuery.query} does not contain a valid search request"

java/src/main/scala/app/softnetwork/elastic/client/java/ElasticsearchClientApi.scala

Lines changed: 78 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import co.elastic.clients.elasticsearch.core._
2121
import co.elastic.clients.elasticsearch.core.search.SearchRequestBody
2222
import co.elastic.clients.elasticsearch.indices.update_aliases.{Action, AddAction, RemoveAction}
2323
import co.elastic.clients.elasticsearch.indices._
24-
import com.google.gson.JsonParser
24+
import com.google.gson.{Gson, JsonParser}
2525

2626
import _root_.java.io.StringReader
2727
import _root_.java.util.{Map => JMap}
@@ -129,8 +129,7 @@ trait ElasticsearchClientSettingsApi extends SettingsApi with ElasticsearchClien
129129
.getSettings(
130130
new GetIndicesSettingsRequest.Builder().index("*").build()
131131
)
132-
.toString
133-
settings.substring(settings.indexOf(':') + 1).trim
132+
extractSource(settings).getOrElse("")
134133
}
135134
}
136135

@@ -150,8 +149,7 @@ trait ElasticsearchClientMappingApi extends MappingApi with ElasticsearchClientC
150149
.getMapping(
151150
new GetMappingRequest.Builder().index(index).build()
152151
)
153-
.toString
154-
mapping.substring(mapping.indexOf(':') + 1).trim
152+
extractSource(mapping).getOrElse("")
155153
}
156154
}
157155

@@ -477,7 +475,7 @@ trait ElasticsearchClientGetApi extends GetApi with ElasticsearchClientCompanion
477475
case Success(response) =>
478476
if (response.found()) {
479477
val source = mapper.writeValueAsString(response.source())
480-
logger.info(s"Deserializing response $source for id: $id, index: ${index
478+
logger.whenDebugEnabled(s"Deserializing response $source for id: $id, index: ${index
481479
.getOrElse("default")}, type: ${maybeType.getOrElse("_all")}")
482480
// Deserialize the source string to the expected type
483481
// Note: This assumes that the source is a valid JSON representation of U
@@ -528,7 +526,7 @@ trait ElasticsearchClientGetApi extends GetApi with ElasticsearchClientCompanion
528526
).flatMap {
529527
case response if response.found() =>
530528
val source = mapper.writeValueAsString(response.source())
531-
logger.info(s"Deserializing response $source for id: $id, index: ${index
529+
logger.whenDebugEnabled(s"Deserializing response $source for id: $id, index: ${index
532530
.getOrElse("default")}, type: ${maybeType.getOrElse("_all")}")
533531
// Deserialize the source string to the expected type
534532
// Note: This assumes that the source is a valid JSON representation of U
@@ -571,29 +569,22 @@ trait ElasticsearchClientSearchApi extends SearchApi with ElasticsearchClientCom
571569
.hits()
572570
.hits()
573571
.asScala
574-
.map { hit =>
572+
.flatMap { hit =>
575573
val source = mapper.writeValueAsString(hit.source())
576-
logger.info(s"Deserializing hit: $source")
577-
serialization.read[U](source)
574+
logger.whenDebugEnabled(s"Deserializing hit: $source")
575+
Try(serialization.read[U](source)).toOption.orElse {
576+
logger.error(
577+
s"Failed to deserialize hit: $source"
578+
)
579+
None
580+
}
578581
}
579582
.toList
580583
} else {
581584
List.empty[U]
582585
}
583586
}
584587

585-
override def search[U](sqlQuery: SQLQuery)(implicit m: Manifest[U], formats: Formats): List[U] = {
586-
sqlQuery.search match {
587-
case Some(searchRequest) =>
588-
val indices = collection.immutable.Seq(searchRequest.sources: _*)
589-
search[U](JSONQuery(searchRequest.query, indices))
590-
case None =>
591-
throw new IllegalArgumentException(
592-
s"SQL query ${sqlQuery.query} does not contain a valid search request"
593-
)
594-
}
595-
}
596-
597588
override def searchAsync[U](
598589
sqlQuery: SQLQuery
599590
)(implicit m: Manifest[U], ec: ExecutionContext, formats: Formats): Future[List[U]] = {
@@ -618,7 +609,7 @@ trait ElasticsearchClientSearchApi extends SearchApi with ElasticsearchClientCom
618609
.asScala
619610
.map { hit =>
620611
val source = mapper.writeValueAsString(hit.source())
621-
logger.info(s"Deserializing hit: $source")
612+
logger.whenDebugEnabled(s"Deserializing hit: $source")
622613
serialization.read[U](source)
623614
}
624615
.toList
@@ -644,20 +635,70 @@ trait ElasticsearchClientSearchApi extends SearchApi with ElasticsearchClientCom
644635
formats: Formats
645636
): List[(U, List[I])] = {
646637
import jsonQuery._
647-
val response = apply().search(
648-
new SearchRequest.Builder()
649-
.index(indices.asJava)
650-
.withJson(
651-
new StringReader(query)
652-
)
653-
.build(),
654-
classOf[JMap[String, Object]]
655-
)
656-
Try(new JsonParser().parse(response.toString).getAsJsonObject ~> [U, I] innerField) match {
657-
case Success(s) => s
658-
case Failure(f) =>
659-
logger.error(f.getMessage, f)
660-
List.empty
638+
logger.info(s"Searching with query: $query on indices: ${indices.mkString(", ")}")
639+
val response = apply()
640+
.search(
641+
new SearchRequest.Builder()
642+
.index(indices.asJava)
643+
.withJson(
644+
new StringReader(query)
645+
)
646+
.build(),
647+
classOf[JMap[String, Object]]
648+
)
649+
val results = response
650+
.hits()
651+
.hits()
652+
.asScala
653+
.toList
654+
if (results.nonEmpty) {
655+
results.flatMap { hit =>
656+
val hitSource = hit.source()
657+
Option(hitSource)
658+
.map(mapper.writeValueAsString)
659+
.flatMap { source =>
660+
logger.whenDebugEnabled(s"Deserializing hit: $source")
661+
Try(serialization.read[U](source)) match {
662+
case Success(mainObject) =>
663+
Some(mainObject)
664+
case Failure(f) =>
665+
logger.error(
666+
s"Failed to deserialize hit: $source for query: $query on indices: ${indices.mkString(", ")}",
667+
f
668+
)
669+
None
670+
}
671+
}
672+
.map { mainObject =>
673+
val innerHits = hit
674+
.innerHits()
675+
.asScala
676+
.get(innerField)
677+
.map(_.hits().hits().asScala.toList)
678+
.getOrElse(Nil)
679+
val innerObjects = innerHits.flatMap { innerHit =>
680+
extractSource(innerHit) match {
681+
case Some(innerSource) =>
682+
logger.whenDebugEnabled(s"Processing inner hit: $innerSource")
683+
val json = new JsonParser().parse(innerSource).getAsJsonObject
684+
val gson = new Gson()
685+
Try(serialization.read[I](gson.toJson(json.get("_source")))) match {
686+
case Success(innerObject) => Some(innerObject)
687+
case Failure(f) =>
688+
logger.error(s"Failed to deserialize inner hit: $innerSource", f)
689+
None
690+
}
691+
case None =>
692+
logger.warn("Could not extract inner hit source from string representation")
693+
None
694+
}
695+
}
696+
(mainObject, innerObjects)
697+
}
698+
}
699+
} else {
700+
logger.warn(s"No hits found for query: $query on indices: ${indices.mkString(", ")}")
701+
List.empty[(U, List[I])]
661702
}
662703
}
663704

java/src/main/scala/app/softnetwork/elastic/client/java/ElasticsearchClientCompanion.scala

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import co.elastic.clients.elasticsearch.{ElasticsearchAsyncClient, Elasticsearch
55
import co.elastic.clients.json.jackson.JacksonJsonpMapper
66
import co.elastic.clients.transport.rest_client.RestClientTransport
77
import com.fasterxml.jackson.databind.ObjectMapper
8+
import com.fasterxml.jackson.module.scala.ClassTagExtensions
89
import com.typesafe.scalalogging.StrictLogging
910
import org.apache.http.HttpHost
1011
import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials}
@@ -23,7 +24,7 @@ trait ElasticsearchClientCompanion extends StrictLogging {
2324

2425
private var asyncClient: Option[ElasticsearchAsyncClient] = None
2526

26-
lazy val mapper = new ObjectMapper()
27+
lazy val mapper: ObjectMapper with ClassTagExtensions = new ObjectMapper() with ClassTagExtensions
2728

2829
def transport: RestClientTransport = {
2930
val credentialsProvider = new BasicCredentialsProvider()
@@ -75,4 +76,11 @@ trait ElasticsearchClientCompanion extends StrictLogging {
7576
promise.future
7677
}
7778

79+
protected def extractSource(value: AnyRef): Option[String] = {
80+
val s = value.toString
81+
val idx = s.indexOf(':')
82+
if (idx >= 0 && idx + 1 < s.length) Some(s.substring(idx + 1).trim)
83+
else None
84+
}
85+
7886
}

testkit/src/main/scala/app/softnetwork/elastic/scalatest/ElasticDockerTestKit.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package app.softnetwork.elastic.scalatest
22

33
import org.scalatest.Suite
44
import org.testcontainers.containers.BindMode
5-
import org.testcontainers.containers.wait.strategy.Wait
5+
//import org.testcontainers.containers.wait.strategy.Wait
66
import org.testcontainers.elasticsearch.ElasticsearchContainer
77
import org.testcontainers.utility.DockerImageName
88

0 commit comments

Comments
 (0)