Skip to content

Commit 1ef0c23

Browse files
committed
finalizing implementation of having and order by with aggregates and buckets
1 parent 0ffdd7b commit 1ef0c23

File tree

10 files changed

+501
-200
lines changed

10 files changed

+501
-200
lines changed

es6/sql-bridge/src/main/scala/app/softnetwork/elastic/sql/bridge/ElasticAggregation.scala

Lines changed: 61 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,22 @@ package app.softnetwork.elastic.sql.bridge
22

33
import app.softnetwork.elastic.sql.{
44
AggregateFunction,
5+
Asc,
56
Avg,
7+
BucketSelectorScript,
68
Count,
79
ElasticBoolQuery,
810
Max,
911
Min,
1012
SQLBucket,
1113
SQLCriteria,
1214
SQLField,
15+
SortOrder,
1316
Sum
1417
}
1518
import com.sksamuel.elastic4s.ElasticApi.{
1619
avgAgg,
20+
bucketSelectorAggregation,
1721
cardinalityAgg,
1822
filterAgg,
1923
maxAgg,
@@ -23,11 +27,13 @@ import com.sksamuel.elastic4s.ElasticApi.{
2327
termsAgg,
2428
valueCountAgg
2529
}
30+
import com.sksamuel.elastic4s.script.Script
2631
import com.sksamuel.elastic4s.searches.aggs.{
2732
Aggregation,
2833
FilterAggregation,
2934
NestedAggregation,
30-
TermsAggregation
35+
TermsAggregation,
36+
TermsOrder
3137
}
3238

3339
import scala.language.implicitConversions
@@ -42,17 +48,24 @@ case class ElasticAggregation(
4248
nestedAgg: Option[NestedAggregation] = None,
4349
filteredAgg: Option[FilterAggregation] = None,
4450
aggType: AggregateFunction,
45-
agg: Aggregation
51+
agg: Aggregation,
52+
direction: Option[SortOrder] = None
4653
) {
4754
val nested: Boolean = nestedAgg.nonEmpty
4855
val filtered: Boolean = filteredAgg.nonEmpty
4956
}
5057

5158
object ElasticAggregation {
52-
def apply(sqlAgg: SQLField, filter: Option[SQLCriteria]): ElasticAggregation = {
59+
def apply(
60+
sqlAgg: SQLField,
61+
having: Option[SQLCriteria],
62+
bucketsDirection: Map[String, SortOrder]
63+
): ElasticAggregation = {
5364
import sqlAgg._
5465
val sourceField = identifier.name
5566

67+
val direction = bucketsDirection.get(identifier.identifierName)
68+
5669
val field = fieldAlias match {
5770
case Some(alias) => alias.alias
5871
case _ => sourceField
@@ -92,7 +105,7 @@ object ElasticAggregation {
92105
val filteredAggName = "filtered_agg"
93106

94107
val filteredAgg: Option[FilterAggregation] =
95-
filter match {
108+
having match {
96109
case Some(f) =>
97110
val boolQuery = Option(ElasticBoolQuery(group = true))
98111
Some(
@@ -135,44 +148,60 @@ object ElasticAggregation {
135148
nestedAgg = nestedAgg,
136149
filteredAgg = filteredAgg,
137150
aggType = aggType,
138-
agg = _agg
151+
agg = _agg,
152+
direction = direction
139153
)
140154
}
141155

142-
/*
143-
def apply(
156+
def buildBuckets(
144157
buckets: Seq[SQLBucket],
158+
bucketsDirection: Map[String, SortOrder],
145159
aggregations: Seq[Aggregation],
146-
current: Option[TermsAggregation]
160+
aggregationsDirection: Map[String, SortOrder],
161+
having: Option[SQLCriteria]
147162
): Option[TermsAggregation] = {
148-
buckets match {
149-
case Nil =>
150-
current.map(_.copy(subaggs = aggregations))
151-
case bucket +: tail =>
152-
val agg = termsAgg(bucket.name, s"${bucket.identifier.name}.keyword")
153-
current match {
154-
case Some(a) =>
155-
apply(tail, aggregations, Some(agg)) match {
156-
case Some(subAgg) =>
157-
Some(a.copy(subaggs = a.subaggs :+ subAgg))
158-
case _ => Some(a)
159-
}
163+
Console.println(bucketsDirection)
164+
buckets.reverse.foldLeft(Option.empty[TermsAggregation]) { (current, bucket) =>
165+
val agg = {
166+
bucketsDirection.get(bucket.identifier.identifierName) match {
167+
case Some(direction) =>
168+
termsAgg(bucket.name, s"${bucket.identifier.name}.keyword")
169+
.order(Seq(direction match {
170+
case Asc => TermsOrder(bucket.name, asc = true)
171+
case _ => TermsOrder(bucket.name, asc = false)
172+
}))
160173
case None =>
161-
apply(tail, aggregations, Some(agg))
174+
termsAgg(bucket.name, s"${bucket.identifier.name}.keyword")
162175
}
163-
}
164-
}
165-
*/
166-
167-
def buildBuckets(
168-
buckets: Seq[SQLBucket],
169-
aggregations: Seq[Aggregation]
170-
): Option[TermsAggregation] = {
171-
buckets.reverse.foldLeft(Option.empty[TermsAggregation]) { (current, bucket) =>
172-
val agg = termsAgg(bucket.name, s"${bucket.identifier.name}.keyword")
176+
}
173177
current match {
174178
case Some(subAgg) => Some(agg.copy(subaggs = Seq(subAgg)))
175-
case None => Some(agg.copy(subaggs = aggregations))
179+
case None =>
180+
val aggregationsWithOrder: Seq[TermsOrder] = aggregationsDirection.toSeq.map { kv =>
181+
kv._2 match {
182+
case Asc => TermsOrder(kv._1, asc = true)
183+
case _ => TermsOrder(kv._1, asc = false)
184+
}
185+
}
186+
val withAggregationOrders =
187+
if (aggregationsWithOrder.nonEmpty)
188+
agg.order(aggregationsWithOrder)
189+
else
190+
agg
191+
val withHaving = having match {
192+
case Some(criteria) =>
193+
import BucketSelectorScript._
194+
val script = toPainless(criteria)
195+
val bucketsPath = extractBucketsPath(criteria)
196+
197+
val bucketSelector =
198+
bucketSelectorAggregation("having_filter", Script(script), bucketsPath)
199+
200+
withAggregationOrders.copy(subaggs = aggregations :+ bucketSelector)
201+
202+
case None => withAggregationOrders.copy(subaggs = aggregations)
203+
}
204+
Some(withHaving)
176205
}
177206
}
178207
}

es6/sql-bridge/src/main/scala/app/softnetwork/elastic/sql/bridge/package.scala

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,17 @@ package object bridge {
2121
request.limit.map(_.limit),
2222
request,
2323
request.buckets,
24-
request.aggregates.map(ElasticAggregation(_, request.having.flatMap(_.criteria)))
24+
request.aggregates.map(
25+
ElasticAggregation(_, request.having.flatMap(_.criteria), request.sorts)
26+
)
2527
).minScore(request.score)
2628

2729
implicit def requestToSearchRequest(request: SQLSearchRequest): SearchRequest = {
2830
import request._
2931
val notNestedBuckets = buckets.filterNot(_.identifier.nested)
3032
val nestedBuckets = buckets.filter(_.identifier.nested).groupBy(_.nestedBucket.getOrElse(""))
31-
val aggregations = aggregates.map(ElasticAggregation(_, request.having.flatMap(_.criteria)))
33+
val aggregations =
34+
aggregates.map(ElasticAggregation(_, request.having.flatMap(_.criteria), request.sorts))
3235
val notNestedAggregations = aggregations.filterNot(_.nested)
3336
val nestedAggregations =
3437
aggregations.filter(_.nested).groupBy(_.nestedAgg.map(_.name).getOrElse(""))
@@ -41,13 +44,22 @@ package object bridge {
4144
nestedAggregations.map { case (nested, aggs) =>
4245
val first = aggs.head
4346
val aggregations = aggs.map(_.agg)
44-
val buckets = ElasticAggregation.buildBuckets(
45-
nestedBuckets.getOrElse(nested, Seq.empty),
46-
aggregations
47-
) match {
48-
case Some(b) => Seq(b)
49-
case _ => aggregations
50-
}
47+
val aggregationDirections: Map[String, SortOrder] =
48+
aggs
49+
.filter(_.direction.isDefined)
50+
.map(agg => agg.agg.name -> agg.direction.getOrElse(Asc))
51+
.toMap
52+
val buckets =
53+
ElasticAggregation.buildBuckets(
54+
nestedBuckets.getOrElse(nested, Seq.empty),
55+
request.sorts -- aggregationDirections.keys,
56+
aggregations,
57+
aggregationDirections,
58+
request.having.flatMap(_.criteria)
59+
) match {
60+
case Some(b) => Seq(b)
61+
case _ => aggregations
62+
}
5163
val filtered: Option[Aggregation] =
5264
first.filteredAgg.map(filtered => filtered.subAggregations(buckets))
5365
first.nestedAgg.get.subAggregations(filtered.map(Seq(_)).getOrElse(buckets))
@@ -62,10 +74,17 @@ package object bridge {
6274
case _ =>
6375
_search aggregations {
6476
val first = notNestedAggregations.head
77+
val aggregationDirections: Map[String, SortOrder] = notNestedAggregations
78+
.filter(_.direction.isDefined)
79+
.map(agg => agg.agg.name -> agg.direction.get)
80+
.toMap
6581
val aggregations = notNestedAggregations.map(_.agg)
6682
val buckets = ElasticAggregation.buildBuckets(
6783
notNestedBuckets,
68-
aggregations
84+
request.sorts -- aggregationDirections.keys,
85+
aggregations,
86+
aggregationDirections,
87+
request.having.flatMap(_.criteria)
6988
) match {
7089
case Some(b) => Seq(b)
7190
case _ => aggregations
@@ -77,7 +96,7 @@ package object bridge {
7796
}
7897

7998
_search = orderBy match {
80-
case Some(o) =>
99+
case Some(o) if aggregates.isEmpty && buckets.isEmpty =>
81100
_search sortBy o.sorts.map(sort =>
82101
sort.order match {
83102
case Some(Desc) => FieldSort(sort.field).desc()
@@ -372,7 +391,7 @@ package object bridge {
372391
.map {
373392
case Left(l) =>
374393
l.aggregates
375-
.map(ElasticAggregation(_, l.having.flatMap(_.criteria)))
394+
.map(ElasticAggregation(_, l.having.flatMap(_.criteria), l.sorts))
376395
.map(aggregation => {
377396
val queryFiltered =
378397
l.where

0 commit comments

Comments
 (0)