Skip to content

Commit b229ba1

Browse files
committed
add SQL Match ... Against, update SQL having and group by, update aggregations
1 parent efbb316 commit b229ba1

File tree

17 files changed

+495
-346
lines changed

17 files changed

+495
-346
lines changed

es6/sql-bridge/src/main/scala/app/softnetwork/elastic/sql/bridge/ElasticAggregation.scala

Lines changed: 59 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import com.sksamuel.elastic4s.ElasticApi.{
2323
termsAgg,
2424
valueCountAgg
2525
}
26-
import com.sksamuel.elastic4s.searches.aggs.Aggregation
26+
import com.sksamuel.elastic4s.searches.aggs.{Aggregation, FilterAggregation, NestedAggregation}
2727

2828
import scala.language.implicitConversions
2929

@@ -34,11 +34,14 @@ case class ElasticAggregation(
3434
sources: Seq[String] = Seq.empty,
3535
query: Option[String] = None,
3636
distinct: Boolean = false,
37-
nested: Boolean = false,
38-
filtered: Boolean = false,
37+
nestedAgg: Option[NestedAggregation] = None,
38+
filteredAgg: Option[FilterAggregation] = None,
3939
aggType: AggregateFunction,
4040
agg: Aggregation
41-
)
41+
) {
42+
val nested: Boolean = nestedAgg.nonEmpty
43+
val filtered: Boolean = filteredAgg.nonEmpty
44+
}
4245

4346
object ElasticAggregation {
4447
def apply(sqlAgg: SQLField, filter: Option[SQLCriteria]): ElasticAggregation = {
@@ -52,71 +55,82 @@ object ElasticAggregation {
5255

5356
val distinct = identifier.distinct
5457

55-
val agg =
56-
if (distinct)
57-
s"${function}_distinct_${sourceField.replace(".", "_")}"
58-
else
59-
s"${function}_${sourceField.replace(".", "_")}"
60-
61-
var aggPath = Seq[String]()
62-
6358
val aggType = aggregateFunction.getOrElse(
6459
throw new IllegalArgumentException("Aggregation function is required")
6560
)
6661

62+
val aggName = {
63+
if (fieldAlias.isDefined)
64+
field
65+
else if (distinct)
66+
s"${aggType}_distinct_${sourceField.replace(".", "_")}"
67+
else
68+
s"${aggType}_${sourceField.replace(".", "_")}"
69+
}
70+
71+
var aggPath = Seq[String]()
72+
6773
val _agg =
6874
aggType match {
6975
case Count =>
7076
if (distinct)
71-
cardinalityAgg(agg, sourceField)
77+
cardinalityAgg(aggName, sourceField)
7278
else {
73-
valueCountAgg(agg, sourceField)
79+
valueCountAgg(aggName, sourceField)
7480
}
75-
case Min => minAgg(agg, sourceField)
76-
case Max => maxAgg(agg, sourceField)
77-
case Avg => avgAgg(agg, sourceField)
78-
case Sum => sumAgg(agg, sourceField)
81+
case Min => minAgg(aggName, sourceField)
82+
case Max => maxAgg(aggName, sourceField)
83+
case Avg => avgAgg(aggName, sourceField)
84+
case Sum => sumAgg(aggName, sourceField)
7985
}
8086

81-
def _filtered: Aggregation = filter match {
82-
case Some(f) =>
83-
val boolQuery = Option(ElasticBoolQuery(group = true))
84-
val filteredAgg = s"filtered_agg"
85-
aggPath ++= Seq(filteredAgg)
86-
filterAgg(
87-
filteredAgg,
88-
f.asFilter(boolQuery)
89-
.query(Set(identifier.innerHitsName).flatten, boolQuery)
90-
) subaggs {
91-
aggPath ++= Seq(agg)
92-
_agg
93-
}
94-
case _ =>
95-
aggPath ++= Seq(agg)
96-
_agg
97-
}
87+
val filteredAggName = "filtered_agg"
88+
89+
val filteredAgg: Option[FilterAggregation] =
90+
filter match {
91+
case Some(f) =>
92+
val boolQuery = Option(ElasticBoolQuery(group = true))
93+
Some(
94+
filterAgg(
95+
filteredAggName,
96+
f.asFilter(boolQuery)
97+
.query(Set(identifier.innerHitsName).flatten, boolQuery)
98+
)
99+
)
100+
case _ =>
101+
None
102+
}
103+
104+
def filtered(): Unit =
105+
filteredAgg match {
106+
case Some(_) =>
107+
aggPath ++= Seq(filteredAggName)
108+
aggPath ++= Seq(aggName)
109+
case _ =>
110+
aggPath ++= Seq(aggName)
111+
}
98112

99-
val aggregation =
113+
val nestedAgg =
100114
if (identifier.nested) {
101115
val path = sourceField.split("\\.").head
102-
val nestedAgg = s"nested_$agg"
116+
val nestedAgg = s"nested_${identifier.nestedType.getOrElse(aggName)}"
103117
aggPath ++= Seq(nestedAgg)
104-
nestedAggregation(nestedAgg, path) subaggs {
105-
_filtered
106-
}
118+
filtered()
119+
Some(nestedAggregation(nestedAgg, path))
107120
} else {
108-
_filtered
121+
filtered()
122+
None
109123
}
110124

111125
ElasticAggregation(
112126
aggPath.mkString("."),
113127
field,
114128
sourceField,
115129
distinct = distinct,
116-
nested = identifier.nested,
117-
filtered = filter.nonEmpty,
118-
aggType = aggType, // TODO remove aggType by parsing it from agg
119-
agg = aggregation
130+
nestedAgg = nestedAgg,
131+
filteredAgg = filteredAgg,
132+
aggType = aggType,
133+
agg = _agg
120134
)
121135
}
122136

es6/sql-bridge/src/main/scala/app/softnetwork/elastic/sql/bridge/ElasticSearchRequest.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package app.softnetwork.elastic.sql.bridge
22

3-
import app.softnetwork.elastic.sql.{SQLCriteria, SQLExcept, SQLField}
3+
import app.softnetwork.elastic.sql.{SQLBucket, SQLCriteria, SQLExcept, SQLField}
44
import com.sksamuel.elastic4s.searches.SearchRequest
55
import com.sksamuel.elastic4s.http.search.SearchBodyBuilderFn
66

@@ -11,6 +11,7 @@ case class ElasticSearchRequest(
1111
criteria: Option[SQLCriteria],
1212
limit: Option[Int],
1313
search: SearchRequest,
14+
buckets: Seq[SQLBucket] = Seq.empty,
1415
aggregations: Seq[ElasticAggregation] = Seq.empty
1516
) {
1617
def minScore(score: Option[Double]): ElasticSearchRequest = {

es6/sql-bridge/src/main/scala/app/softnetwork/elastic/sql/bridge/package.scala

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import com.sksamuel.elastic4s.ElasticApi
44
import com.sksamuel.elastic4s.ElasticApi._
55
import com.sksamuel.elastic4s.http.ElasticDsl.BuildableTermsNoOp
66
import com.sksamuel.elastic4s.http.search.SearchBodyBuilderFn
7+
import com.sksamuel.elastic4s.searches.aggs.Aggregation
78
import com.sksamuel.elastic4s.searches.queries.Query
89
import com.sksamuel.elastic4s.searches.{MultiSearchRequest, SearchRequest}
910
import com.sksamuel.elastic4s.searches.sort.FieldSort
@@ -19,12 +20,16 @@ package object bridge {
1920
request.where.flatMap(_.criteria),
2021
request.limit.map(_.limit),
2122
request,
22-
request.aggregates.map(ElasticAggregation(_, None))
23+
request.buckets,
24+
request.aggregates.map(ElasticAggregation(_, request.having.flatMap(_.criteria)))
2325
).minScore(request.score)
2426

2527
implicit def requestToSearchRequest(request: SQLSearchRequest): SearchRequest = {
2628
import request._
27-
val aggregations = aggregates.map(ElasticAggregation(_, None))
29+
val aggregations = aggregates.map(ElasticAggregation(_, request.having.flatMap(_.criteria)))
30+
val notNestedAggregations = aggregations.filterNot(_.nested)
31+
val nestedAggregations =
32+
aggregations.filter(_.nested).groupBy(_.nestedAgg.map(_.name).getOrElse(""))
2833
var _search: SearchRequest = search("") query {
2934
where.flatMap(_.criteria.map(_.asQuery())).getOrElse(matchAllQuery())
3035
} sourceInclude fields
@@ -34,9 +39,22 @@ package object bridge {
3439
case excludes => _search sourceExclude excludes
3540
}
3641

37-
_search = aggregations match {
42+
_search = if (nestedAggregations.nonEmpty) {
43+
_search aggregations {
44+
nestedAggregations.map { case (_, aggs) =>
45+
val first = aggs.head
46+
val filtered: Option[Aggregation] =
47+
first.filteredAgg.map(filtered => filtered.subAggregations(aggs.map(_.agg)))
48+
first.nestedAgg.get.subAggregations(filtered.map(Seq(_)).getOrElse(aggs.map(_.agg)))
49+
}
50+
}
51+
} else {
52+
_search
53+
}
54+
55+
_search = notNestedAggregations match {
3856
case Nil => _search
39-
case _ => _search aggregations { aggregations.map(_.agg) }
57+
case _ => _search aggregations { notNestedAggregations.map(_.agg) }
4058
}
4159

4260
_search = orderBy match {
@@ -272,7 +290,7 @@ package object bridge {
272290
.map {
273291
case Left(l) =>
274292
l.aggregates
275-
.map(ElasticAggregation(_, None))
293+
.map(ElasticAggregation(_, l.having.flatMap(_.criteria)))
276294
.map(aggregation => {
277295
val queryFiltered =
278296
l.where
@@ -295,7 +313,15 @@ package object bridge {
295313
queryFiltered
296314
}
297315
aggregations {
298-
aggregation.agg
316+
val filtered =
317+
aggregation.filteredAgg match {
318+
case Some(filtered) => filtered.subAggregations(aggregation.agg)
319+
case _ => aggregation.agg
320+
}
321+
aggregation.nestedAgg match {
322+
case Some(nested) => nested.subAggregations(filtered)
323+
case _ => filtered
324+
}
299325
}
300326
size 0
301327
)

es6/sql-bridge/src/test/scala/app/softnetwork/elastic/sql/SQLCriteriaSpec.scala

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -782,10 +782,24 @@ class SQLCriteriaSpec extends AnyFlatSpec with Matchers {
782782
"""{
783783
| "query":{
784784
| "bool":{
785-
| "filter":[
785+
| "should":[
786786
| {
787787
| "match":{
788-
| "identifier":{
788+
| "identifier1":{
789+
| "query":"value"
790+
| }
791+
| }
792+
| },
793+
| {
794+
| "match":{
795+
| "identifier2":{
796+
| "query":"value"
797+
| }
798+
| }
799+
| },
800+
| {
801+
| "match":{
802+
| "identifier3":{
789803
| "query":"value"
790804
| }
791805
| }

0 commit comments

Comments
 (0)