Skip to content

Commit 1755af1

Browse files
committed
handle buckets and nested buckets
1 parent b229ba1 commit 1755af1

File tree

10 files changed

+207
-54
lines changed

10 files changed

+207
-54
lines changed

es6/sql-bridge/src/main/scala/app/softnetwork/elastic/sql/bridge/ElasticAggregation.scala

Lines changed: 36 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,12 @@ import com.sksamuel.elastic4s.ElasticApi.{
2323
termsAgg,
2424
valueCountAgg
2525
}
26-
import com.sksamuel.elastic4s.searches.aggs.{Aggregation, FilterAggregation, NestedAggregation}
26+
import com.sksamuel.elastic4s.searches.aggs.{
27+
Aggregation,
28+
FilterAggregation,
29+
NestedAggregation,
30+
TermsAggregation
31+
}
2732

2833
import scala.language.implicitConversions
2934

@@ -134,17 +139,41 @@ object ElasticAggregation {
134139
)
135140
}
136141

137-
def apply(buckets: Seq[SQLBucket], current: Option[Aggregation]): Option[Aggregation] = {
142+
/*
143+
def apply(
144+
buckets: Seq[SQLBucket],
145+
aggregations: Seq[Aggregation],
146+
current: Option[TermsAggregation]
147+
): Option[TermsAggregation] = {
138148
buckets match {
139-
case Nil => current
149+
case Nil =>
150+
current.map(_.copy(subaggs = aggregations))
140151
case bucket +: tail =>
141-
val agg = termsAgg(bucket.name, bucket.sourceBucket)
152+
val agg = termsAgg(bucket.name, s"${bucket.identifier.name}.keyword")
142153
current match {
143154
case Some(a) =>
144-
a.addSubagg(agg)
145-
apply(tail, Some(agg))
146-
case _ => apply(tail, Some(agg))
155+
apply(tail, aggregations, Some(agg)) match {
156+
case Some(subAgg) =>
157+
Some(a.copy(subaggs = a.subaggs :+ subAgg))
158+
case _ => Some(a)
159+
}
160+
case None =>
161+
apply(tail, aggregations, Some(agg))
147162
}
148163
}
149164
}
165+
*/
166+
167+
def buildBuckets(
168+
buckets: Seq[SQLBucket],
169+
aggregations: Seq[Aggregation]
170+
): Option[TermsAggregation] = {
171+
buckets.reverse.foldLeft(Option.empty[TermsAggregation]) { (current, bucket) =>
172+
val agg = termsAgg(bucket.name, s"${bucket.identifier.name}.keyword")
173+
current match {
174+
case Some(subAgg) => Some(agg.copy(subaggs = Seq(subAgg)))
175+
case None => Some(agg.copy(subaggs = aggregations))
176+
}
177+
}
178+
}
150179
}

es6/sql-bridge/src/main/scala/app/softnetwork/elastic/sql/bridge/package.scala

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ package object bridge {
2626

2727
implicit def requestToSearchRequest(request: SQLSearchRequest): SearchRequest = {
2828
import request._
29+
val notNestedBuckets = buckets.filterNot(_.identifier.nested)
30+
val nestedBuckets = buckets.filter(_.identifier.nested).groupBy(_.nestedBucket.getOrElse(""))
2931
val aggregations = aggregates.map(ElasticAggregation(_, request.having.flatMap(_.criteria)))
3032
val notNestedAggregations = aggregations.filterNot(_.nested)
3133
val nestedAggregations =
@@ -41,11 +43,19 @@ package object bridge {
4143

4244
_search = if (nestedAggregations.nonEmpty) {
4345
_search aggregations {
44-
nestedAggregations.map { case (_, aggs) =>
46+
nestedAggregations.map { case (nested, aggs) =>
4547
val first = aggs.head
48+
val aggregations = aggs.map(_.agg)
49+
val buckets = ElasticAggregation.buildBuckets(
50+
nestedBuckets.getOrElse(nested, Seq.empty),
51+
aggregations
52+
) match {
53+
case Some(b) => Seq(b)
54+
case _ => aggregations
55+
}
4656
val filtered: Option[Aggregation] =
47-
first.filteredAgg.map(filtered => filtered.subAggregations(aggs.map(_.agg)))
48-
first.nestedAgg.get.subAggregations(filtered.map(Seq(_)).getOrElse(aggs.map(_.agg)))
57+
first.filteredAgg.map(filtered => filtered.subAggregations(buckets))
58+
first.nestedAgg.get.subAggregations(filtered.map(Seq(_)).getOrElse(buckets))
4959
}
5060
}
5161
} else {
@@ -54,7 +64,21 @@ package object bridge {
5464

5565
_search = notNestedAggregations match {
5666
case Nil => _search
57-
case _ => _search aggregations { notNestedAggregations.map(_.agg) }
67+
case _ =>
68+
_search aggregations {
69+
val first = notNestedAggregations.head
70+
val aggregations = notNestedAggregations.map(_.agg)
71+
val buckets = ElasticAggregation.buildBuckets(
72+
notNestedBuckets,
73+
aggregations
74+
) match {
75+
case Some(b) => Seq(b)
76+
case _ => aggregations
77+
}
78+
val filtered: Option[Aggregation] =
79+
first.filteredAgg.map(filtered => filtered.subAggregations(buckets))
80+
filtered.map(Seq(_)).getOrElse(buckets)
81+
}
5882
}
5983

6084
_search = orderBy match {
@@ -68,7 +92,7 @@ package object bridge {
6892
case _ => _search
6993
}
7094

71-
if (aggregations.nonEmpty && fields.isEmpty) {
95+
if (aggregations.nonEmpty || buckets.nonEmpty) {
7296
_search size 0
7397
} else {
7498
limit match {

es6/sql-bridge/src/test/scala/app/softnetwork/elastic/sql/SQLQuerySpec.scala

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -519,9 +519,8 @@ class SQLQuerySpec extends AnyFlatSpec with Matchers {
519519
val select: ElasticSearchRequest =
520520
SQLQuery(
521521
s"""SELECT
522-
| inner_products.name,
523-
| inner_products.category,
524-
| inner_products.price,
522+
| inner_products.category as category,
523+
| inner_products.name as productName,
525524
| min(inner_products.price) as min_price,
526525
| max(inner_products.price) as max_price
527526
|FROM
@@ -539,17 +538,16 @@ class SQLQuerySpec extends AnyFlatSpec with Matchers {
539538
| (
540539
| distance(pickup.location,(0.0,0.0)) <= "7000m" OR
541540
| distance(withdrawals.location,(0.0,0.0)) <= "7000m"
542-
| ) AND
543-
| (
544-
| inner_products.deleted=false AND
545-
| inner_products.upForSale=true AND
546-
| inner_products.stock > 0
547541
| )
548-
| ) AND
549-
| (
550-
| match (products.name) against ("lasagnes") AND
551-
| match (products.description, products.ingredients) against ("lasagnes")
552542
| )
543+
|GROUP BY
544+
| inner_products.category,
545+
| inner_products.name
546+
|HAVING inner_products.deleted=false AND
547+
| inner_products.upForSale=true AND
548+
| inner_products.stock > 0 AND
549+
| match (inner_products.name) against ("lasagnes") AND
550+
| match (inner_products.description, inner_products.ingredients) against ("lasagnes")
553551
|ORDER BY preparationTime ASC, nbOrders DESC
554552
|LIMIT 100""".stripMargin
555553
).minScore(1.0)

sql/bridge/src/main/scala/app/softnetwork/elastic/sql/bridge/ElasticAggregation.scala

Lines changed: 46 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -16,29 +16,33 @@ import com.sksamuel.elastic4s.ElasticApi.{
1616
avgAgg,
1717
cardinalityAgg,
1818
filterAgg,
19-
matchAllQuery,
2019
maxAgg,
2120
minAgg,
2221
nestedAggregation,
2322
sumAgg,
2423
termsAgg,
2524
valueCountAgg
2625
}
27-
import com.sksamuel.elastic4s.requests.searches.aggs.{Aggregation, NestedAggregation, FilterAggregation}
26+
import com.sksamuel.elastic4s.requests.searches.aggs.{
27+
Aggregation,
28+
FilterAggregation,
29+
NestedAggregation,
30+
TermsAggregation
31+
}
2832

2933
import scala.language.implicitConversions
3034

3135
case class ElasticAggregation(
32-
aggName: String,
33-
field: String,
34-
sourceField: String,
35-
sources: Seq[String] = Seq.empty,
36-
query: Option[String] = None,
37-
distinct: Boolean = false,
38-
nestedAgg: Option[NestedAggregation] = None,
39-
filteredAgg: Option[FilterAggregation] = None,
40-
aggType: AggregateFunction,
41-
agg: Aggregation) {
36+
aggName: String,
37+
field: String,
38+
sourceField: String,
39+
sources: Seq[String] = Seq.empty,
40+
query: Option[String] = None,
41+
distinct: Boolean = false,
42+
nestedAgg: Option[NestedAggregation] = None,
43+
filteredAgg: Option[FilterAggregation] = None,
44+
aggType: AggregateFunction,
45+
agg: Aggregation) {
4246
val nested: Boolean = nestedAgg.nonEmpty
4347
val filtered: Boolean = filteredAgg.nonEmpty
4448
}
@@ -134,17 +138,41 @@ object ElasticAggregation {
134138
)
135139
}
136140

137-
def apply(buckets: Seq[SQLBucket], current: Option[Aggregation]): Option[Aggregation] = {
141+
/*
142+
def apply(
143+
buckets: Seq[SQLBucket],
144+
aggregations: Seq[Aggregation],
145+
current: Option[TermsAggregation]
146+
): Option[TermsAggregation] = {
138147
buckets match {
139-
case Nil => current
148+
case Nil =>
149+
current.map(_.copy(subaggs = aggregations))
140150
case bucket +: tail =>
141-
val agg = termsAgg(bucket.name, bucket.sourceBucket)
151+
val agg = termsAgg(bucket.name, s"${bucket.identifier.name}.keyword")
142152
current match {
143153
case Some(a) =>
144-
a.addSubagg(agg)
145-
apply(tail, Some(agg))
146-
case _ => apply(tail, Some(agg))
154+
apply(tail, aggregations, Some(agg)) match {
155+
case Some(subAgg) =>
156+
Some(a.copy(subaggs = a.subaggs :+ subAgg))
157+
case _ => Some(a)
158+
}
159+
case None =>
160+
apply(tail, aggregations, Some(agg))
147161
}
148162
}
149163
}
164+
*/
165+
166+
def buildBuckets(
167+
buckets: Seq[SQLBucket],
168+
aggregations: Seq[Aggregation]
169+
): Option[TermsAggregation] = {
170+
buckets.reverse.foldLeft(Option.empty[TermsAggregation]) { (current, bucket) =>
171+
val agg = termsAgg(bucket.name, s"${bucket.identifier.name}.keyword")
172+
current match {
173+
case Some(subAgg) => Some(agg.copy(subaggs = Seq(subAgg)))
174+
case None => Some(agg.copy(subaggs = aggregations))
175+
}
176+
}
177+
}
150178
}

sql/bridge/src/main/scala/app/softnetwork/elastic/sql/bridge/package.scala

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ package object bridge {
2828

2929
implicit def requestToSearchRequest(request: SQLSearchRequest): SearchRequest = {
3030
import request._
31+
val notNestedBuckets = buckets.filterNot(_.identifier.nested)
32+
val nestedBuckets = buckets.filter(_.identifier.nested).groupBy(_.nestedBucket.getOrElse(""))
3133
val aggregations = aggregates.map(ElasticAggregation(_, request.having.flatMap(_.criteria)))
3234
val notNestedAggregations = aggregations.filterNot(_.nested)
3335
val nestedAggregations =
@@ -43,11 +45,19 @@ package object bridge {
4345

4446
_search = if (nestedAggregations.nonEmpty) {
4547
_search aggregations {
46-
nestedAggregations.map { case (_, aggs) =>
48+
nestedAggregations.map { case (nested, aggs) =>
4749
val first = aggs.head
50+
val aggregations = aggs.map(_.agg)
51+
val buckets = ElasticAggregation.buildBuckets(
52+
nestedBuckets.getOrElse(nested, Seq.empty),
53+
aggregations
54+
) match {
55+
case Some(b) => Seq(b)
56+
case _ => aggregations
57+
}
4858
val filtered: Option[Aggregation] =
49-
first.filteredAgg.map(filtered => filtered.subAggregations(aggs.map(_.agg)))
50-
first.nestedAgg.get.subAggregations(filtered.map(Seq(_)).getOrElse(aggs.map(_.agg)))
59+
first.filteredAgg.map(filtered => filtered.subAggregations(buckets))
60+
first.nestedAgg.get.subAggregations(filtered.map(Seq(_)).getOrElse(buckets))
5161
}
5262
}
5363
} else {
@@ -56,7 +66,20 @@ package object bridge {
5666

5767
_search = notNestedAggregations match {
5868
case Nil => _search
59-
case _ => _search aggregations { notNestedAggregations.map(_.agg) }
69+
case _ => _search aggregations {
70+
val first = notNestedAggregations.head
71+
val aggregations = notNestedAggregations.map(_.agg)
72+
val buckets = ElasticAggregation.buildBuckets(
73+
notNestedBuckets,
74+
aggregations
75+
) match {
76+
case Some(b) => Seq(b)
77+
case _ => aggregations
78+
}
79+
val filtered: Option[Aggregation] =
80+
first.filteredAgg.map(filtered => filtered.subAggregations(buckets))
81+
filtered.map(Seq(_)).getOrElse(buckets)
82+
}
6083
}
6184

6285
_search = orderBy match {

sql/src/main/scala/app/softnetwork/elastic/sql/SQLGroupBy.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,5 +22,8 @@ case class SQLBucket(
2222
} else {
2323
identifier.name
2424
}
25+
lazy val nestedBucket: Option[String] =
26+
identifier.nestedType.map(t => s"nested_$t")
27+
2528
lazy val name: String = identifier.fieldAlias.getOrElse(sourceBucket.replace(".", "_"))
2629
}

sql/src/main/scala/app/softnetwork/elastic/sql/SQLSearchRequest.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,12 @@ case class SQLSearchRequest(
2727
)
2828
}
2929

30-
lazy val fields: Seq[String] = select.fields.filterNot(_.aggregation).map(_.sourceField)
30+
lazy val fields: Seq[String] = {
31+
if (aggregates.isEmpty && buckets.isEmpty)
32+
select.fields.map(_.sourceField).filterNot(f => excludes.contains(f))
33+
else
34+
Seq.empty
35+
}
3136

3237
lazy val aggregates: Seq[SQLField] = select.fields.filter(_.aggregation)
3338

sql/src/main/scala/app/softnetwork/elastic/sql/SQLWhere.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ case class SQLExpression(
159159
) extends SQLCriteriaWithIdentifier
160160
with ElasticFilter {
161161
override def sql =
162-
s"$identifier ${maybeNot.map(_ => "not ").getOrElse("")}$operator $value"
162+
s"${maybeNot.map(_ => "not ").getOrElse("")}$identifier $operator $value"
163163
override def update(request: SQLSearchRequest): SQLCriteria = {
164164
val updated = this.copy(identifier = identifier.update(request))
165165
if (updated.nested) {
@@ -279,6 +279,8 @@ case class SQLMatch(
279279
override def update(request: SQLSearchRequest): SQLCriteria =
280280
this.copy(identifiers = identifiers.map(_.update(request)))
281281

282+
override lazy val nested: Boolean = identifiers.forall(_.nested)
283+
282284
lazy val criteria: SQLCriteria = {
283285
identifiers.map(id => ElasticMatch(id, value, None)) match {
284286
case Nil => throw new IllegalArgumentException("No identifiers for MATCH")

sql/src/main/scala/app/softnetwork/elastic/sql/package.scala

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ package object sql {
214214
) extends SQLExpr({
215215
var parts: Seq[String] = name.split("\\.").toSeq
216216
tableAlias match {
217-
case Some(a) => parts = a +: parts
217+
case Some(a) => parts = a +: (if (nested) parts.tail else parts)
218218
case _ =>
219219
}
220220
val sql = {
@@ -254,18 +254,20 @@ package object sql {
254254
tableAlias = Some(parts.head),
255255
name = s"${tuple._2}.${parts.tail.mkString(".")}",
256256
nested = true,
257-
limit = tuple._3
257+
limit = tuple._3,
258+
fieldAlias = request.fieldAliases.get(identifierName).orElse(fieldAlias)
258259
)
259260
case _ =>
260261
this.copy(
261262
tableAlias = Some(parts.head),
262-
name = parts.tail.mkString(".")
263+
name = parts.tail.mkString("."),
264+
fieldAlias = request.fieldAliases.get(identifierName).orElse(fieldAlias)
263265
)
264266
}
265-
} else if (request.fieldAliases.contains(identifierName)) {
266-
this.copy(fieldAlias = Some(request.fieldAliases(identifierName)))
267267
} else {
268-
this
268+
this.copy(
269+
fieldAlias = request.fieldAliases.get(identifierName).orElse(fieldAlias)
270+
)
269271
}
270272
}
271273
}

0 commit comments

Comments
 (0)