Skip to content

Commit 512f823

Browse files
committed
extract all apis to use ElasticResult + update jest implementation accordingly
1 parent 5204a3e commit 512f823

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

51 files changed

+9759
-3214
lines changed
Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
/*
2+
* Copyright 2025 SOFTNETWORK
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package app.softnetwork.elastic.client
18+
19+
import app.softnetwork.elastic.client.result.{ElasticFailure, ElasticResult, ElasticSuccess}
20+
import app.softnetwork.elastic.sql.query.{SQLAggregation, SQLQuery}
21+
22+
import java.time.temporal.Temporal
23+
import scala.annotation.tailrec
24+
import scala.concurrent.{ExecutionContext, Future}
25+
26+
/** Aggregate API for Elasticsearch clients.
27+
*
28+
* @tparam T
29+
* - the type of aggregate result
30+
*/
31+
trait AggregateApi[T <: AggregateResult] {
32+
33+
/** Aggregate the results of the given SQL query.
34+
*
35+
* @param sqlQuery
36+
* - the query to aggregate the results for
37+
* @return
38+
* a sequence of aggregated results
39+
*/
40+
def aggregate(sqlQuery: SQLQuery)(implicit
41+
ec: ExecutionContext
42+
): Future[ElasticResult[_root_.scala.collection.Seq[T]]]
43+
}
44+
45+
/** Aggregate API for single value aggregate results.
46+
*/
47+
trait SingleValueAggregateApi
48+
extends AggregateApi[SingleValueAggregateResult]
49+
with ElasticConversion {
50+
_: SearchApi =>
51+
52+
/** Aggregate the results of the given SQL query.
53+
*
54+
* @param sqlQuery
55+
* - the query to aggregate the results for
56+
* @return
57+
* a sequence of aggregated results
58+
*/
59+
override def aggregate(
60+
sqlQuery: SQLQuery
61+
)(implicit
62+
ec: ExecutionContext
63+
): Future[ElasticResult[_root_.scala.collection.Seq[SingleValueAggregateResult]]] = {
64+
Future {
65+
@tailrec
66+
def findAggregation(
67+
name: String,
68+
aggregation: SQLAggregation,
69+
results: Map[String, Any]
70+
): Option[Any] = {
71+
name.split("\\.") match {
72+
case Array(_, tail @ _*) if tail.nonEmpty =>
73+
findAggregation(
74+
tail.mkString("."),
75+
aggregation,
76+
results
77+
)
78+
case _ => results.get(name)
79+
}
80+
}
81+
82+
@tailrec
83+
def getAggregateValue(s: Seq[_], distinct: Boolean): AggregateValue = {
84+
if (s.isEmpty) return EmptyValue
85+
86+
s.headOption match {
87+
case Some(_: Boolean) =>
88+
val values = s.asInstanceOf[Seq[Boolean]]
89+
ArrayOfBooleanValue(if (distinct) values.distinct else values)
90+
91+
case Some(_: Number) =>
92+
val values = s.asInstanceOf[Seq[Number]]
93+
ArrayOfNumericValue(if (distinct) values.distinct else values)
94+
95+
case Some(_: Temporal) =>
96+
val values = s.asInstanceOf[Seq[Temporal]]
97+
ArrayOfTemporalValue(if (distinct) values.distinct else values)
98+
99+
case Some(_: String) =>
100+
val values = s.map(_.toString)
101+
ArrayOfStringValue(if (distinct) values.distinct else values)
102+
103+
case Some(_: Map[_, _]) =>
104+
val typedMaps = s.asInstanceOf[Seq[Map[String, Any]]]
105+
val metadataKeys = Set("_id", "_index", "_score", "_sort")
106+
107+
// Check if all maps have the same single non-metadata key
108+
val nonMetadataKeys = typedMaps.flatMap(_.keys.filterNot(metadataKeys.contains))
109+
val uniqueKeys = nonMetadataKeys.distinct
110+
111+
if (uniqueKeys.size == 1) {
112+
// Extract values from the single key
113+
val key = uniqueKeys.head
114+
val extractedValues = typedMaps.flatMap(_.get(key))
115+
getAggregateValue(extractedValues, distinct)
116+
} else {
117+
// Multiple keys: return as object array
118+
val cleanMaps = typedMaps.map(m =>
119+
m.filterNot(k => metadataKeys.contains(k.toString))
120+
.map(kv => kv._1 -> kv._2)
121+
)
122+
ArrayOfObjectValue(if (distinct) cleanMaps.distinct else cleanMaps)
123+
}
124+
125+
case Some(_: Seq[_]) =>
126+
// Handle nested sequences (flatten them)
127+
getAggregateValue(s.asInstanceOf[Seq[Seq[_]]].flatten, distinct)
128+
129+
case _ => EmptyValue
130+
}
131+
}
132+
// Execute the search
133+
search(sqlQuery)
134+
.flatMap { response =>
135+
// Parse the response
136+
val parseResult = ElasticResult.fromTry(parseResponse(response))
137+
138+
parseResult match {
139+
// Case 1: Parse successful - process the results
140+
case ElasticSuccess(results) =>
141+
val aggregationResults = results.flatMap { result =>
142+
response.aggregations.map { case (name, aggregation) =>
143+
// Attempt to process each aggregation
144+
val aggregationResult = ElasticResult.attempt {
145+
val value = findAggregation(name, aggregation, result).orNull match {
146+
case b: Boolean => BooleanValue(b)
147+
case n: Number => NumericValue(n)
148+
case s: String => StringValue(s)
149+
case t: Temporal => TemporalValue(t)
150+
case m: Map[_, Any] => ObjectValue(m.map(kv => kv._1.toString -> kv._2))
151+
case s: Seq[_] if aggregation.multivalued =>
152+
getAggregateValue(s, aggregation.distinct)
153+
case _ => EmptyValue
154+
}
155+
156+
SingleValueAggregateResult(name, aggregation.aggType, value)
157+
}
158+
159+
// Convert failures to results with errors
160+
aggregationResult match {
161+
case ElasticSuccess(result) => result
162+
case ElasticFailure(error) =>
163+
SingleValueAggregateResult(
164+
name,
165+
aggregation.aggType,
166+
EmptyValue,
167+
error = Some(s"Failed to process aggregation: ${error.message}")
168+
)
169+
}
170+
}.toSeq
171+
}
172+
173+
ElasticResult.success(aggregationResults)
174+
175+
// Case 2: Parse failed - returning empty results with errors
176+
case ElasticFailure(error) =>
177+
val errorResults = response.aggregations.map { case (name, aggregation) =>
178+
SingleValueAggregateResult(
179+
name,
180+
aggregation.aggType,
181+
EmptyValue,
182+
error = Some(s"Parse error: ${error.message}")
183+
)
184+
}.toSeq
185+
186+
ElasticResult.success(errorResults)
187+
}
188+
}
189+
.fold(
190+
// If search() fails, throw an exception
191+
onFailure = error => {
192+
throw new IllegalArgumentException(
193+
s"Failed to execute search for SQL query: ${sqlQuery.query}",
194+
error.cause.orNull
195+
)
196+
},
197+
onSuccess = results => ElasticResult.success(results)
198+
)
199+
}
200+
}
201+
}

0 commit comments

Comments
 (0)