Skip to content

Commit daf055c

Browse files
committed
remove SQLAggregate
1 parent 82fe1c0 commit daf055c

File tree

6 files changed

+57
-59
lines changed

6 files changed

+57
-59
lines changed

es6/sql-bridge/src/main/scala/app/softnetwork/elastic/sql/bridge/ElasticAggregation.scala

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,18 +7,20 @@ import app.softnetwork.elastic.sql.{
77
ElasticBoolQuery,
88
Max,
99
Min,
10-
SQLAggregate,
10+
SQLBucket,
11+
SQLCriteria,
12+
SQLField,
1113
Sum
1214
}
1315
import com.sksamuel.elastic4s.ElasticApi.{
1416
avgAgg,
1517
cardinalityAgg,
1618
filterAgg,
17-
matchAllQuery,
1819
maxAgg,
1920
minAgg,
2021
nestedAggregation,
2122
sumAgg,
23+
termsAgg,
2224
valueCountAgg
2325
}
2426
import com.sksamuel.elastic4s.searches.aggs.Aggregation
@@ -39,7 +41,7 @@ case class ElasticAggregation(
3941
)
4042

4143
object ElasticAggregation {
42-
def apply(sqlAgg: SQLAggregate): ElasticAggregation = {
44+
def apply(sqlAgg: SQLField, filter: Option[SQLCriteria]): ElasticAggregation = {
4345
import sqlAgg._
4446
val sourceField = identifier.columnName
4547

@@ -48,7 +50,7 @@ object ElasticAggregation {
4850
case _ => sourceField
4951
}
5052

51-
val distinct = identifier.distinct.isDefined
53+
val distinct = identifier.distinct
5254

5355
val agg =
5456
if (distinct)
@@ -58,8 +60,12 @@ object ElasticAggregation {
5860

5961
var aggPath = Seq[String]()
6062

63+
val aggType = aggregateFunction.getOrElse(
64+
throw new IllegalArgumentException("Aggregation function is required")
65+
)
66+
6167
val _agg =
62-
function match {
68+
aggType match {
6369
case Count =>
6470
if (distinct)
6571
cardinalityAgg(agg, sourceField)
@@ -79,12 +85,8 @@ object ElasticAggregation {
7985
aggPath ++= Seq(filteredAgg)
8086
filterAgg(
8187
filteredAgg,
82-
f.criteria
83-
.map(
84-
_.asFilter(boolQuery)
85-
.query(Set(identifier.innerHitsName).flatten, boolQuery)
86-
)
87-
.getOrElse(matchAllQuery())
88+
f.asFilter(boolQuery)
89+
.query(Set(identifier.innerHitsName).flatten, boolQuery)
8890
) subaggs {
8991
aggPath ++= Seq(agg)
9092
_agg
@@ -113,8 +115,22 @@ object ElasticAggregation {
113115
distinct = distinct,
114116
nested = identifier.nested,
115117
filtered = filter.nonEmpty,
116-
aggType = function,
118+
aggType = aggType, // TODO remove aggType by parsing it from agg
117119
agg = aggregation
118120
)
119121
}
122+
123+
def apply(buckets: Seq[SQLBucket], current: Option[Aggregation]): Option[Aggregation] = {
124+
buckets match {
125+
case Nil => current
126+
case bucket +: tail =>
127+
val agg = termsAgg(bucket.name, bucket.sourceBucket)
128+
current match {
129+
case Some(a) =>
130+
a.addSubagg(agg)
131+
apply(tail, Some(agg))
132+
case _ => apply(tail, Some(agg))
133+
}
134+
}
135+
}
120136
}

sql/bridge/src/main/scala/app/softnetwork/elastic/sql/bridge/ElasticAggregation.scala

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ import app.softnetwork.elastic.sql.{
77
ElasticBoolQuery,
88
Max,
99
Min,
10-
SQLAggregate,
10+
SQLBucket,
11+
SQLCriteria,
12+
SQLField,
1113
Sum
1214
}
1315
import com.sksamuel.elastic4s.ElasticApi.{
@@ -19,6 +21,7 @@ import com.sksamuel.elastic4s.ElasticApi.{
1921
minAgg,
2022
nestedAggregation,
2123
sumAgg,
24+
termsAgg,
2225
valueCountAgg
2326
}
2427
import com.sksamuel.elastic4s.requests.searches.aggs.Aggregation
@@ -39,7 +42,7 @@ case class ElasticAggregation(
3942
)
4043

4144
object ElasticAggregation {
42-
def apply(sqlAgg: SQLAggregate): ElasticAggregation = {
45+
def apply(sqlAgg: SQLField, filter: Option[SQLCriteria]): ElasticAggregation = {
4346
import sqlAgg._
4447
val sourceField = identifier.columnName
4548

@@ -48,7 +51,7 @@ object ElasticAggregation {
4851
case _ => sourceField
4952
}
5053

51-
val distinct = identifier.distinct.isDefined
54+
val distinct = identifier.distinct
5255

5356
val agg =
5457
if (distinct)
@@ -117,4 +120,18 @@ object ElasticAggregation {
117120
agg = aggregation
118121
)
119122
}
123+
124+
def apply(buckets: Seq[SQLBucket], current: Option[Aggregation]): Option[Aggregation] = {
125+
buckets match {
126+
case Nil => current
127+
case bucket +: tail =>
128+
val agg = termsAgg(bucket.name, bucket.sourceBucket)
129+
current match {
130+
case Some(a) =>
131+
a.addSubagg(agg)
132+
apply(tail, Some(agg))
133+
case _ => apply(tail, Some(agg))
134+
}
135+
}
136+
}
120137
}

sql/src/main/scala/app/softnetwork/elastic/sql/SQLGroupBy.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,4 +34,5 @@ case class SQLBucket(
3434
} else {
3535
identifier.columnName
3636
}
37+
lazy val name: String = identifier.alias.getOrElse(sourceBucket.replace(".", "_"))
3738
}

sql/src/main/scala/app/softnetwork/elastic/sql/SQLParser.scala

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ trait SQLSelectParser {
122122
}
123123

124124
def select: Parser[SQLSelect] =
125-
Select.regex ~ rep1sep(aggregate | field, separator) ~ except.? ^^ { case _ ~ fields ~ e =>
125+
Select.regex ~ rep1sep(field, separator) ~ except.? ^^ { case _ ~ fields ~ e =>
126126
SQLSelect(fields, e)
127127
}
128128

@@ -158,17 +158,6 @@ trait SQLWhereParser {
158158

159159
private def ne: Parser[SQLExpressionOperator] = Ne.sql ^^ (_ => Ne)
160160

161-
def filter: Parser[SQLFilter] = Filter.regex ~> "[" ~> whereCriteria <~ "]" ^^ { rawTokens =>
162-
SQLFilter(
163-
processTokens(rawTokens)
164-
)
165-
}
166-
167-
def aggregate: Parser[SQLAggregate] =
168-
aggregateFunction ~ start ~ identifier ~ end ~ alias.? ~ filter.? ^^ {
169-
case agg ~ _ ~ i ~ _ ~ a ~ f => new SQLAggregate(agg, i, a, f)
170-
}
171-
172161
private def equality: Parser[SQLExpression] =
173162
not.? ~ (identifierWithFunction | identifier) ~ (eq | ne) ~ (boolean | literal | double | int) ^^ {
174163
case n ~ i ~ o ~ v => SQLExpression(i, o, v, n)

sql/src/main/scala/app/softnetwork/elastic/sql/SQLSearchRequest.scala

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,9 @@ case class SQLSearchRequest(
2020
updated.copy(select = select.update(updated), where = where.map(_.update(updated)))
2121
}
2222

23-
lazy val fields: Seq[String] =
24-
select.fields
25-
.filterNot {
26-
case _: SQLAggregate => true
27-
case _ => false
28-
}
29-
.map(_.sourceField)
30-
31-
lazy val aggregates: Seq[SQLAggregate] = select.fields.collect { case a: SQLAggregate => a }
23+
lazy val fields: Seq[String] = select.fields.filterNot(_.aggregation).map(_.sourceField)
24+
25+
lazy val aggregates: Seq[SQLField] = select.fields.filter(_.aggregation)
3226

3327
lazy val excludes: Seq[String] = select.except.map(_.fields.map(_.sourceField)).getOrElse(Nil)
3428

sql/src/main/scala/app/softnetwork/elastic/sql/SQLSelect.scala

Lines changed: 4 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@ case object Select extends SQLExpr("select") with SQLRegex
55
case class SQLField(
66
identifier: SQLIdentifier,
77
alias: Option[SQLAlias] = None
8-
) extends Updateable {
8+
) extends Updateable
9+
with SQLTokenWithFunction {
910
override def sql: String = s"$identifier${asString(alias)}"
1011
def update(request: SQLSearchRequest): SQLField =
1112
this.copy(identifier = identifier.update(request))
@@ -17,6 +18,8 @@ case class SQLField(
1718
} else {
1819
identifier.columnName
1920
}
21+
22+
override def function: Option[SQLFunction] = identifier.function
2023
}
2124

2225
case object Except extends SQLExpr("except") with SQLRegex
@@ -27,28 +30,6 @@ case class SQLExcept(fields: Seq[SQLField]) extends Updateable {
2730
this.copy(fields = fields.map(_.update(request)))
2831
}
2932

30-
case object Filter extends SQLExpr("filter") with SQLRegex
31-
32-
case class SQLFilter(criteria: Option[SQLCriteria]) extends Updateable {
33-
override def sql: String = criteria match {
34-
case Some(c) => s" $Filter($c)"
35-
case _ => ""
36-
}
37-
def update(request: SQLSearchRequest): SQLFilter =
38-
this.copy(criteria = criteria.map(_.update(request)))
39-
}
40-
41-
class SQLAggregate(
42-
val function: AggregateFunction,
43-
override val identifier: SQLIdentifier,
44-
override val alias: Option[SQLAlias] = None,
45-
val filter: Option[SQLFilter] = None
46-
) extends SQLField(identifier, alias) {
47-
override def sql: String = s"$function($identifier)${asString(alias)}"
48-
override def update(request: SQLSearchRequest): SQLAggregate =
49-
new SQLAggregate(function, identifier.update(request), alias, filter.map(_.update(request)))
50-
}
51-
5233
case class SQLSelect(
5334
fields: Seq[SQLField] = Seq(SQLField(identifier = SQLIdentifier("*"))),
5435
except: Option[SQLExcept] = None

0 commit comments

Comments
 (0)