Skip to content

Commit 39ed145

Browse files
committed
fix aggregations with having, nested and group by, add support for bucket includes and excludes
1 parent cda49ac commit 39ed145

File tree

12 files changed

+565
-695
lines changed

12 files changed

+565
-695
lines changed

es6/sql-bridge/src/main/scala/app/softnetwork/elastic/sql/bridge/ElasticAggregation.scala

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,11 @@ package app.softnetwork.elastic.sql.bridge
33
import app.softnetwork.elastic.sql.query.{
44
Asc,
55
Bucket,
6-
BucketSelectorScript,
6+
BucketIncludesExcludes,
77
Criteria,
88
Desc,
99
Field,
10+
MetricSelectorScript,
1011
NestedElement,
1112
NestedElements,
1213
SortOrder
@@ -104,7 +105,7 @@ object ElasticAggregation {
104105
buildScript: (String, Script) => Aggregation
105106
): Aggregation = {
106107
if (transformFuncs.nonEmpty) {
107-
val scriptSrc = identifier.painless
108+
val scriptSrc = identifier.painless()
108109
val script = Script(scriptSrc).lang("painless")
109110
buildScript(aggName, script)
110111
} else {
@@ -144,7 +145,7 @@ object ElasticAggregation {
144145
.copy(
145146
scripts = th.fields
146147
.filter(_.isScriptField)
147-
.map(f => f.sourceField -> Script(f.painless).lang("painless"))
148+
.map(f => f.sourceField -> Script(f.painless()).lang("painless"))
148149
.toMap
149150
)
150151
.size(limit) sortBy th.orderBy.sorts.map(sort =>
@@ -238,18 +239,37 @@ object ElasticAggregation {
238239
having: Option[Criteria]
239240
): Option[TermsAggregation] = {
240241
buckets.reverse.foldLeft(Option.empty[TermsAggregation]) { (current, bucket) =>
241-
val agg = {
242+
var agg = {
242243
bucketsDirection.get(bucket.identifier.identifierName) match {
243244
case Some(direction) =>
244245
termsAgg(bucket.name, s"${bucket.identifier.path}.keyword")
245246
.order(Seq(direction match {
246-
case Asc => TermsOrder(bucket.name, asc = true)
247-
case _ => TermsOrder(bucket.name, asc = false)
247+
case Asc => TermsOrder("_key", asc = true)
248+
case _ => TermsOrder("_key", asc = false)
248249
}))
249250
case None =>
250251
termsAgg(bucket.name, s"${bucket.identifier.path}.keyword")
251252
}
252253
}
254+
bucket.size.foreach(s => agg = agg.size(s))
255+
having match {
256+
case Some(criteria) =>
257+
criteria.includes(bucket, not = false, BucketIncludesExcludes()) match {
258+
case BucketIncludesExcludes(_, Some(regex)) if regex.nonEmpty =>
259+
agg = agg.include(regex)
260+
case BucketIncludesExcludes(values, _) if values.nonEmpty =>
261+
agg = agg.include(values.toArray)
262+
case _ =>
263+
}
264+
criteria.excludes(bucket, not = false, BucketIncludesExcludes()) match {
265+
case BucketIncludesExcludes(_, Some(regex)) if regex.nonEmpty =>
266+
agg = agg.exclude(regex)
267+
case BucketIncludesExcludes(values, _) if values.nonEmpty =>
268+
agg = agg.exclude(values.toArray)
269+
case _ =>
270+
}
271+
case _ =>
272+
}
253273
current match {
254274
case Some(subAgg) => Some(agg.copy(subaggs = Seq(subAgg)))
255275
case None =>
@@ -266,9 +286,8 @@ object ElasticAggregation {
266286
agg
267287
val withHaving = having match {
268288
case Some(criteria) =>
269-
import BucketSelectorScript._
270-
val script = toPainless(criteria)
271-
val bucketsPath = extractBucketsPath(criteria)
289+
val script = MetricSelectorScript.metricSelector(criteria)
290+
val bucketsPath = criteria.extractMetricsPath
272291

273292
val bucketSelector =
274293
bucketSelectorAggregation(

es6/sql-bridge/src/main/scala/app/softnetwork/elastic/sql/bridge/package.scala

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,6 @@ package object bridge {
7171

7272
val notNestedAggregations = aggregations.filterNot(_.nested)
7373

74-
val filteredAgg: Option[FilterAggregation] =
75-
requestToFilterAggregation(request)
76-
7774
val rootAggregations = notNestedAggregations match {
7875
case Nil => Nil
7976
case aggs =>
@@ -92,9 +89,7 @@ package object bridge {
9289
case Some(b) => Seq(b)
9390
case _ => aggregations
9491
}
95-
val filtered: Option[Aggregation] =
96-
filteredAgg.map(filtered => filtered.subAggregations(buckets))
97-
filtered.map(Seq(_)).getOrElse(buckets)
92+
buckets
9893
}
9994
rootAggregations
10095
}
@@ -116,6 +111,19 @@ package object bridge {
116111
)
117112
)
118113

114+
val nestedGroupedBuckets =
115+
request.buckets
116+
.filter(_.nested)
117+
.groupBy(
118+
_.nestedBucket.getOrElse(
119+
throw new IllegalArgumentException(
120+
"Nested bucket must have a nested element"
121+
)
122+
)
123+
)
124+
125+
val havingCriteria = request.having.flatMap(_.criteria)
126+
119127
val scopedAggregations = NestedElements
120128
.buildNestedTrees(
121129
nestedAggregations.values.flatMap(_.flatMap(_.nestedElement)).toSeq.distinct
@@ -133,22 +141,18 @@ package object bridge {
133141
.toMap
134142
val buckets: Seq[Aggregation] =
135143
ElasticAggregation.buildBuckets(
136-
request.buckets
137-
.filter(_.nested)
138-
.groupBy(_.nestedBucket.getOrElse(""))
144+
nestedGroupedBuckets
139145
.getOrElse(n.innerHitsName, Seq.empty),
140146
request.sorts -- directions.keys,
141147
aggregations,
142148
directions,
143-
request.having.flatMap(_.criteria)
149+
havingCriteria
144150
) match {
145151
case Some(b) => Seq(b)
146152
case _ => aggregations
147153
}
148154
val nestedFilteredAgg: Option[FilterAggregation] =
149155
requestToNestedFilterAggregation(request, n.innerHitsName)
150-
val filtered: Option[Aggregation] =
151-
requestToFilterAggregation(request).map(filtered => filtered.subAggregations(buckets))
152156
val children = n.children
153157
if (children.nonEmpty) {
154158
val innerAggs = children.map(buildNestedAgg)
@@ -173,8 +177,8 @@ package object bridge {
173177
n.path
174178
) subaggs (nestedFilteredAgg match {
175179
case Some(filteredAgg) =>
176-
Seq(filteredAgg subaggs filtered.map(Seq(_)).getOrElse(buckets))
177-
case _ => filtered.map(Seq(_)).getOrElse(buckets)
180+
Seq(filteredAgg subaggs buckets)
181+
case _ => buckets
178182
})
179183
}
180184
}
@@ -293,7 +297,7 @@ package object bridge {
293297
_search scriptfields scriptFields.map { field =>
294298
scriptField(
295299
field.scriptName,
296-
Script(script = field.painless(None))
300+
Script(script = field.painless())
297301
.lang("painless")
298302
.scriptType("source")
299303
.params(field.identifier.functions.headOption match {
@@ -348,7 +352,7 @@ package object bridge {
348352
case _ => true
349353
}))
350354
) {
351-
return scriptQuery(Script(script = painless(None)).lang("painless").scriptType("source"))
355+
return scriptQuery(Script(script = painless()).lang("painless").scriptType("source"))
352356
}
353357
// Geo distance special case
354358
identifier.functions.headOption match {
@@ -562,10 +566,10 @@ package object bridge {
562566
case NE | DIFF => not(rangeQuery(identifier.name) gte script lte script)
563567
}
564568
case _ =>
565-
scriptQuery(Script(script = painless(None)).lang("painless").scriptType("source"))
569+
scriptQuery(Script(script = painless()).lang("painless").scriptType("source"))
566570
}
567571
case _ =>
568-
scriptQuery(Script(script = painless(None)).lang("painless").scriptType("source"))
572+
scriptQuery(Script(script = painless()).lang("painless").scriptType("source"))
569573
}
570574
case _ => matchAllQuery()
571575
}
@@ -719,7 +723,7 @@ package object bridge {
719723
case _ =>
720724
scriptQuery(
721725
Script(
722-
script = distanceCriteria.painless(None),
726+
script = distanceCriteria.painless(),
723727
lang = Some("painless"),
724728
scriptType = Source,
725729
params = distance.params

0 commit comments

Comments
 (0)