diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java index f63b4b1e2e0..19dce3e3609 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java @@ -151,6 +151,7 @@ import org.opensearch.sql.calcite.plan.OpenSearchConstants; import org.opensearch.sql.calcite.utils.BinUtils; import org.opensearch.sql.calcite.utils.JoinAndLookupUtils; +import org.opensearch.sql.calcite.utils.PPLHintUtils; import org.opensearch.sql.calcite.utils.PlanUtils; import org.opensearch.sql.calcite.utils.UserDefinedFunctionUtils; import org.opensearch.sql.calcite.utils.WildcardUtils; @@ -949,13 +950,14 @@ private boolean isCountField(RexCall call) { * @param groupExprList group by expression list * @param aggExprList aggregate expression list * @param context CalcitePlanContext + * @param hintIgnoreNullBucket true if bucket_nullable=false * @return Pair of (group-by list, field list, aggregate list) */ private Pair, List> aggregateWithTrimming( List groupExprList, List aggExprList, CalcitePlanContext context, - boolean hintBucketNonNull) { + boolean hintIgnoreNullBucket) { Pair, List> resolved = resolveAttributesForAggregation(groupExprList, aggExprList, context); List resolvedGroupByList = resolved.getLeft(); @@ -1047,7 +1049,9 @@ private Pair, List> aggregateWithTrimming( // \- Scan t List trimmedRefs = new ArrayList<>(); trimmedRefs.addAll(PlanUtils.getInputRefs(resolvedGroupByList)); // group-by keys first - trimmedRefs.addAll(PlanUtils.getInputRefsFromAggCall(resolvedAggCallList)); + List aggCallRefs = PlanUtils.getInputRefsFromAggCall(resolvedAggCallList); + boolean hintNestedAgg = containsNestedAggregator(context.relBuilder, aggCallRefs); + trimmedRefs.addAll(aggCallRefs); context.relBuilder.project(trimmedRefs); // Re-resolve all attributes based on adding trimmed Project. @@ -1059,7 +1063,8 @@ private Pair, List> aggregateWithTrimming( List intendedGroupKeyAliases = getGroupKeyNamesAfterAggregation(reResolved.getLeft()); context.relBuilder.aggregate( context.relBuilder.groupKey(reResolved.getLeft()), reResolved.getRight()); - if (hintBucketNonNull) PlanUtils.addIgnoreNullBucketHintToAggregate(context.relBuilder); + if (hintIgnoreNullBucket) PPLHintUtils.addIgnoreNullBucketHintToAggregate(context.relBuilder); + if (hintNestedAgg) PPLHintUtils.addNestedAggCallHintToAggregate(context.relBuilder); // During aggregation, Calcite projects both input dependencies and output group-by fields. // When names conflict, Calcite adds numeric suffixes (e.g., "value0"). // Apply explicit renaming to restore the intended aliases. @@ -1068,6 +1073,17 @@ private Pair, List> aggregateWithTrimming( return Pair.of(reResolved.getLeft(), reResolved.getRight()); } + /** + * Return true if the aggCalls contains a nested field. For example: aggCalls: [count(), + * count(a.b)] returns true. + */ + private boolean containsNestedAggregator(RelBuilder relBuilder, List aggCallRefs) { + return aggCallRefs.stream() + .map(r -> relBuilder.peek().getRowType().getFieldNames().get(r.getIndex())) + .map(name -> org.apache.commons.lang3.StringUtils.substringBefore(name, ".")) + .anyMatch(root -> relBuilder.field(root).getType().getSqlTypeName() == SqlTypeName.ARRAY); + } + /** * Imitates {@code Registrar.registerExpression} of {@link RelBuilder} to derive the output order * of group-by keys after aggregation. @@ -1173,8 +1189,8 @@ private void visitAggregation( } groupExprList.addAll(node.getGroupExprList()); - // Add stats hint to LogicalAggregation. - boolean toAddHintsOnAggregate = + // Add a hint to LogicalAggregation when bucket_nullable=false. + boolean hintIgnoreNullBucket = !groupExprList.isEmpty() // This checks if all group-bys should be nonnull && nonNullGroupMask.nextClearBit(0) >= groupExprList.size(); @@ -1194,14 +1210,16 @@ private void visitAggregation( .filter(nonNullGroupMask::get) .mapToObj(nonNullCandidates::get) .toList(); - context.relBuilder.filter( - PlanUtils.getSelectColumns(nonNullFields).stream() - .map(context.relBuilder::field) - .map(context.relBuilder::isNotNull) - .toList()); + if (!nonNullFields.isEmpty()) { + context.relBuilder.filter( + PlanUtils.getSelectColumns(nonNullFields).stream() + .map(context.relBuilder::field) + .map(context.relBuilder::isNotNull) + .toList()); + } Pair, List> aggregationAttributes = - aggregateWithTrimming(groupExprList, aggExprList, context, toAddHintsOnAggregate); + aggregateWithTrimming(groupExprList, aggExprList, context, hintIgnoreNullBucket); // schema reordering List outputFields = context.relBuilder.fields(); @@ -2329,9 +2347,9 @@ public RelNode visitRareTopN(RareTopN node, CalcitePlanContext context) { // if usenull=false, add a isNotNull before Aggregate and the hint to this Aggregate Boolean bucketNullable = (Boolean) argumentMap.get(RareTopN.Option.useNull.name()).getValue(); - boolean toAddHintsOnAggregate = false; + boolean hintIgnoreNullBucket = false; if (!bucketNullable && !groupExprList.isEmpty()) { - toAddHintsOnAggregate = true; + hintIgnoreNullBucket = true; // add isNotNull filter before aggregation to filter out null bucket List groupByList = groupExprList.stream().map(expr -> rexVisitor.analyze(expr, context)).toList(); @@ -2341,7 +2359,7 @@ public RelNode visitRareTopN(RareTopN node, CalcitePlanContext context) { .map(context.relBuilder::isNotNull) .toList()); } - aggregateWithTrimming(groupExprList, aggExprList, context, toAddHintsOnAggregate); + aggregateWithTrimming(groupExprList, aggExprList, context, hintIgnoreNullBucket); // 2. add count() column with sort direction List partitionKeys = rexVisitor.analyze(node.getGroupExprList(), context); diff --git a/core/src/main/java/org/opensearch/sql/calcite/utils/CalciteToolsHelper.java b/core/src/main/java/org/opensearch/sql/calcite/utils/CalciteToolsHelper.java index a5cdf0f45f0..1769242e3c6 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/utils/CalciteToolsHelper.java +++ b/core/src/main/java/org/opensearch/sql/calcite/utils/CalciteToolsHelper.java @@ -52,7 +52,6 @@ import org.apache.calcite.jdbc.CalcitePrepare; import org.apache.calcite.jdbc.CalciteSchema; import org.apache.calcite.jdbc.Driver; -import org.apache.calcite.linq4j.function.Function0; import org.apache.calcite.plan.Context; import org.apache.calcite.plan.Contexts; import org.apache.calcite.plan.Convention; @@ -175,8 +174,11 @@ public Connection connect( } @Override - protected Function0 createPrepareFactory() { - return OpenSearchPrepareImpl::new; + public CalcitePrepare createPrepare() { + if (prepareFactory != null) { + return prepareFactory.get(); + } + return new OpenSearchPrepareImpl(); } } @@ -298,10 +300,10 @@ public OpenSearchCalcitePreparingStmt( @Override protected PreparedResult implement(RelRoot root) { - Hook.PLAN_BEFORE_IMPLEMENTATION.run(root); - RelDataType resultType = root.rel.getRowType(); - boolean isDml = root.kind.belongsTo(SqlKind.DML); if (root.rel instanceof Scannable scannable) { + Hook.PLAN_BEFORE_IMPLEMENTATION.run(root); + RelDataType resultType = root.rel.getRowType(); + boolean isDml = root.kind.belongsTo(SqlKind.DML); final Bindable bindable = dataContext -> scannable.scan(); return new PreparedResultImpl( diff --git a/core/src/main/java/org/opensearch/sql/calcite/utils/PPLHintStrategyTable.java b/core/src/main/java/org/opensearch/sql/calcite/utils/PPLHintStrategyTable.java deleted file mode 100644 index 84a8b437887..00000000000 --- a/core/src/main/java/org/opensearch/sql/calcite/utils/PPLHintStrategyTable.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.calcite.utils; - -import com.google.common.base.Suppliers; -import java.util.function.Supplier; -import lombok.experimental.UtilityClass; -import org.apache.calcite.rel.hint.HintStrategyTable; -import org.apache.calcite.rel.logical.LogicalAggregate; - -@UtilityClass -public class PPLHintStrategyTable { - - private static final Supplier HINT_STRATEGY_TABLE = - Suppliers.memoize( - () -> - HintStrategyTable.builder() - .hintStrategy( - "stats_args", - (hint, rel) -> { - return rel instanceof LogicalAggregate; - }) - // add more here - .build()); - - /** Update the HINT_STRATEGY_TABLE when you create a new hint. */ - public static HintStrategyTable getHintStrategyTable() { - return HINT_STRATEGY_TABLE.get(); - } -} diff --git a/core/src/main/java/org/opensearch/sql/calcite/utils/PPLHintUtils.java b/core/src/main/java/org/opensearch/sql/calcite/utils/PPLHintUtils.java new file mode 100644 index 00000000000..915c45e7083 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/calcite/utils/PPLHintUtils.java @@ -0,0 +1,84 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.calcite.utils; + +import com.google.common.base.Suppliers; +import java.util.function.Supplier; +import lombok.experimental.UtilityClass; +import org.apache.calcite.rel.core.Aggregate; +import org.apache.calcite.rel.hint.HintStrategyTable; +import org.apache.calcite.rel.hint.RelHint; +import org.apache.calcite.rel.logical.LogicalAggregate; +import org.apache.calcite.tools.RelBuilder; + +@UtilityClass +public class PPLHintUtils { + private static final String HINT_AGG_ARGUMENTS = "AGG_ARGS"; + private static final String KEY_IGNORE_NULL_BUCKET = "ignoreNullBucket"; + private static final String KEY_HAS_NESTED_AGG_CALL = "hasNestedAggCall"; + + private static final Supplier HINT_STRATEGY_TABLE = + Suppliers.memoize( + () -> + HintStrategyTable.builder() + .hintStrategy( + HINT_AGG_ARGUMENTS, + (hint, rel) -> { + return rel instanceof LogicalAggregate; + }) + // add more here + .build()); + + /** + * Add hint to aggregate to indicate that the aggregate will ignore null value bucket. Notice, the + * current peek of relBuilder is expected to be LogicalAggregate. + */ + public static void addIgnoreNullBucketHintToAggregate(RelBuilder relBuilder) { + assert relBuilder.peek() instanceof LogicalAggregate + : "Hint HINT_AGG_ARGUMENTS can be added to LogicalAggregate only"; + final RelHint statHint = + RelHint.builder(HINT_AGG_ARGUMENTS).hintOption(KEY_IGNORE_NULL_BUCKET, "true").build(); + relBuilder.hints(statHint); + if (relBuilder.getCluster().getHintStrategies() == HintStrategyTable.EMPTY) { + relBuilder.getCluster().setHintStrategies(HINT_STRATEGY_TABLE.get()); + } + } + + /** + * Add hint to aggregate to indicate that the aggregate has nested agg call. Notice, the current + * peek of relBuilder is expected to be LogicalAggregate. + */ + public static void addNestedAggCallHintToAggregate(RelBuilder relBuilder) { + assert relBuilder.peek() instanceof LogicalAggregate + : "Hint HINT_AGG_ARGUMENTS can be added to LogicalAggregate only"; + final RelHint statHint = + RelHint.builder(HINT_AGG_ARGUMENTS).hintOption(KEY_HAS_NESTED_AGG_CALL, "true").build(); + relBuilder.hints(statHint); + if (relBuilder.getCluster().getHintStrategies() == HintStrategyTable.EMPTY) { + relBuilder.getCluster().setHintStrategies(HINT_STRATEGY_TABLE.get()); + } + } + + /** Return true if the aggregate will ignore null value bucket. */ + public static boolean ignoreNullBucket(Aggregate aggregate) { + return aggregate.getHints().stream() + .anyMatch( + hint -> + hint.hintName.equals(PPLHintUtils.HINT_AGG_ARGUMENTS) + && hint.kvOptions.getOrDefault(KEY_IGNORE_NULL_BUCKET, "false").equals("true")); + } + + /** Return true if the aggregate has any nested agg call. */ + public static boolean hasNestedAggCall(Aggregate aggregate) { + return aggregate.getHints().stream() + .anyMatch( + hint -> + hint.hintName.equals(PPLHintUtils.HINT_AGG_ARGUMENTS) + && hint.kvOptions + .getOrDefault(KEY_HAS_NESTED_AGG_CALL, "false") + .equals("true")); + } +} diff --git a/core/src/main/java/org/opensearch/sql/calcite/utils/PlanUtils.java b/core/src/main/java/org/opensearch/sql/calcite/utils/PlanUtils.java index d89c36601ef..b5d22a19601 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/utils/PlanUtils.java +++ b/core/src/main/java/org/opensearch/sql/calcite/utils/PlanUtils.java @@ -36,8 +36,6 @@ import org.apache.calcite.rel.core.Project; import org.apache.calcite.rel.core.Sort; import org.apache.calcite.rel.core.TableScan; -import org.apache.calcite.rel.hint.RelHint; -import org.apache.calcite.rel.logical.LogicalAggregate; import org.apache.calcite.rel.logical.LogicalFilter; import org.apache.calcite.rel.logical.LogicalProject; import org.apache.calcite.rel.logical.LogicalSort; @@ -62,7 +60,6 @@ import org.apache.calcite.util.mapping.Mappings; import org.opensearch.sql.ast.AbstractNodeVisitor; import org.opensearch.sql.ast.Node; -import org.opensearch.sql.ast.expression.Argument; import org.opensearch.sql.ast.expression.IntervalUnit; import org.opensearch.sql.ast.expression.SpanUnit; import org.opensearch.sql.ast.expression.WindowBound; @@ -610,15 +607,6 @@ static void replaceTop(RelBuilder relBuilder, RelNode relNode) { } } - static void addIgnoreNullBucketHintToAggregate(RelBuilder relBuilder) { - final RelHint statHits = - RelHint.builder("stats_args").hintOption(Argument.BUCKET_NULLABLE, "false").build(); - assert relBuilder.peek() instanceof LogicalAggregate - : "Stats hits should be added to LogicalAggregate"; - relBuilder.hints(statHits); - relBuilder.getCluster().setHintStrategies(PPLHintStrategyTable.getHintStrategyTable()); - } - /** Extract the RexLiteral from the aggregate call if the aggregate call is a LITERAL_AGG. */ static @Nullable RexLiteral getObjectFromLiteralAgg(AggregateCall aggCall) { if (aggCall.getAggregation().kind == SqlKind.LITERAL_AGG) { @@ -655,13 +643,7 @@ private static boolean isNotNullOnRef(RexNode rex) { && rexCall.getOperands().get(0) instanceof RexInputRef; } - Predicate aggIgnoreNullBucket = - agg -> - agg.getHints().stream() - .anyMatch( - hint -> - hint.hintName.equals("stats_args") - && hint.kvOptions.get(Argument.BUCKET_NULLABLE).equals("false")); + Predicate aggIgnoreNullBucket = PPLHintUtils::ignoreNullBucket; Predicate maybeTimeSpanAgg = agg -> diff --git a/core/src/main/java/org/opensearch/sql/utils/Utils.java b/core/src/main/java/org/opensearch/sql/utils/Utils.java new file mode 100644 index 00000000000..6cee8f28129 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/utils/Utils.java @@ -0,0 +1,59 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.utils; + +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.opensearch.sql.data.type.ExprCoreType; +import org.opensearch.sql.data.type.ExprType; + +public interface Utils { + static List> zipWithIndex(List input) { + LinkedList> result = new LinkedList<>(); + Iterator iter = input.iterator(); + int index = 0; + while (iter.hasNext()) { + result.add(Pair.of(iter.next(), index++)); + } + return result; + } + + /** + * Resolve the nested path from the field name. + * + * @param path the field name + * @param fieldTypes the field types + * @return the nested path if exists, otherwise null + */ + static @Nullable String resolveNestedPath(String path, Map fieldTypes) { + if (path == null || fieldTypes == null || fieldTypes.isEmpty()) { + return null; + } + boolean found = false; + String current = path; + String parent = StringUtils.substringBeforeLast(current, "."); + while (parent != null && !parent.equals(current)) { + ExprType pathType = fieldTypes.get(parent); + // Nested is mapped to ExprCoreType.ARRAY + if (pathType == ExprCoreType.ARRAY) { + found = true; + break; + } + current = parent; + parent = StringUtils.substringBeforeLast(current, "."); + } + if (found) { + return parent; + } else { + return null; + } + } +} diff --git a/core/src/test/java/org/opensearch/sql/utils/UtilsTest.java b/core/src/test/java/org/opensearch/sql/utils/UtilsTest.java new file mode 100644 index 00000000000..7483416b7b2 --- /dev/null +++ b/core/src/test/java/org/opensearch/sql/utils/UtilsTest.java @@ -0,0 +1,92 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.utils; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +import java.util.HashMap; +import java.util.Map; +import org.junit.jupiter.api.Test; +import org.opensearch.sql.data.type.ExprCoreType; +import org.opensearch.sql.data.type.ExprType; + +public class UtilsTest { + + @Test + void testResolveNestedPathWithNullFieldTypes() { + assertNull(Utils.resolveNestedPath("a.b.c", null)); + } + + @Test + void testResolveNestedPathWithEmptyFieldTypes() { + assertNull(Utils.resolveNestedPath("a.b.c", new HashMap<>())); + } + + @Test + void testResolveNestedPathWithArrayType() { + Map fieldTypes = new HashMap<>(); + fieldTypes.put("a", ExprCoreType.ARRAY); + fieldTypes.put("a.b", ExprCoreType.STRING); + + assertEquals("a", Utils.resolveNestedPath("a.b.c", fieldTypes)); + } + + @Test + void testResolveNestedPathWithNestedArrayType() { + Map fieldTypes = new HashMap<>(); + fieldTypes.put("a", ExprCoreType.STRUCT); + fieldTypes.put("a.b", ExprCoreType.ARRAY); + fieldTypes.put("a.b.c", ExprCoreType.STRING); + + assertEquals("a.b", Utils.resolveNestedPath("a.b.c.d", fieldTypes)); + } + + @Test + void testResolveNestedPathWithNoArrayType() { + Map fieldTypes = new HashMap<>(); + fieldTypes.put("a", ExprCoreType.STRUCT); + fieldTypes.put("a.b", ExprCoreType.STRUCT); + fieldTypes.put("a.b.c", ExprCoreType.STRING); + + assertNull(Utils.resolveNestedPath("a.b.c.d", fieldTypes)); + } + + @Test + void testResolveNestedPathWithSingleLevel() { + Map fieldTypes = new HashMap<>(); + fieldTypes.put("a", ExprCoreType.STRING); + + assertNull(Utils.resolveNestedPath("a.b", fieldTypes)); + } + + @Test + void testResolveNestedPathWithTopLevelArray() { + Map fieldTypes = new HashMap<>(); + fieldTypes.put("a", ExprCoreType.ARRAY); + + assertEquals("a", Utils.resolveNestedPath("a.b", fieldTypes)); + } + + @Test + void testResolveNestedPathWithMultipleLevels() { + Map fieldTypes = new HashMap<>(); + fieldTypes.put("a", ExprCoreType.STRUCT); + fieldTypes.put("a.b", ExprCoreType.STRUCT); + fieldTypes.put("a.b.c", ExprCoreType.ARRAY); + fieldTypes.put("a.b.c.d", ExprCoreType.STRING); + + assertEquals("a.b.c", Utils.resolveNestedPath("a.b.c.d.e", fieldTypes)); + } + + @Test + void testResolveNestedPathWithNoParent() { + Map fieldTypes = new HashMap<>(); + fieldTypes.put("x", ExprCoreType.STRING); + + assertNull(Utils.resolveNestedPath("a", fieldTypes)); + } +} diff --git a/docs/user/ppl/cmd/explain.md b/docs/user/ppl/cmd/explain.md index fb60a3b1207..87a2e89bd70 100644 --- a/docs/user/ppl/cmd/explain.md +++ b/docs/user/ppl/cmd/explain.md @@ -10,7 +10,7 @@ explain queryStatement * standard: The default mode. Display logical and physical plan with pushdown information (DSL). * simple: Display the logical plan tree without attributes. * cost: Display the standard information plus plan cost attributes. - * extended: Display the standard information plus generated code. + * extended: Display the standard information plus generated code, if the whole plan is able to pushdown, it equals to standard mode. * queryStatement: mandatory. A PPL query to explain. ## Example 1: Explain a PPL query in v2 engine diff --git a/docs/user/ppl/interfaces/endpoint.md b/docs/user/ppl/interfaces/endpoint.md index e1e9cf705bf..ee6b5f35ebc 100644 --- a/docs/user/ppl/interfaces/endpoint.md +++ b/docs/user/ppl/interfaces/endpoint.md @@ -111,7 +111,7 @@ Explain query ```bash ppl curl -sS -H 'Content-Type: application/json' \ -X POST localhost:9200/_plugins/_ppl/_explain?format=extended \ --d '{"query" : "source=state_country | where age>30"}' +-d '{"query" : "source=state_country | head 10 | where age>30"}' ``` Expected output: @@ -119,9 +119,9 @@ Expected output: ```json { "calcite": { - "logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(name=[$0], country=[$1], state=[$2], month=[$3], year=[$4], age=[$5])\n LogicalFilter(condition=[>($5, 30)])\n CalciteLogicalIndexScan(table=[[OpenSearch, state_country]])\n", - "physical": "CalciteEnumerableIndexScan(table=[[OpenSearch, state_country]], PushDownContext=[[PROJECT->[name, country, state, month, year, age], FILTER->>($5, 30), LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"_source\":{\"includes\":[\"name\",\"country\",\"state\",\"month\",\"year\",\"age\"],\"excludes\":[]}}, requestedTotalSize=10000, pageSize=null, startFrom=0)])\n", - "extended": "public org.apache.calcite.linq4j.Enumerable bind(final org.apache.calcite.DataContext root) {\n final org.opensearch.sql.opensearch.storage.scan.CalciteEnumerableIndexScan v1stashed = (org.opensearch.sql.opensearch.storage.scan.CalciteEnumerableIndexScan) root.get(\"v1stashed\");\n return v1stashed.scan();\n}\n\n\npublic Class getElementType() {\n return java.lang.Object[].class;\n}\n\n\n" + "logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(name=[$0], country=[$1], state=[$2], month=[$3], year=[$4], age=[$5])\n LogicalFilter(condition=[>($5, 30)])\n LogicalSort(fetch=[10])\n CalciteLogicalIndexScan(table=[[OpenSearch, state_country]])\n", + "physical": "EnumerableLimit(fetch=[10000])\n EnumerableCalc(expr#0..5=[{inputs}], expr#6=[30], expr#7=[>($t5, $t6)], proj#0..5=[{exprs}], $condition=[$t7])\n CalciteEnumerableIndexScan(table=[[OpenSearch, state_country]], PushDownContext=[[PROJECT->[name, country, state, month, year, age], LIMIT->10], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":10,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"name\",\"country\",\"state\",\"month\",\"year\",\"age\"],\"excludes\":[]}}, requestedTotalSize=10, pageSize=null, startFrom=0)])\n", + "extended": "public org.apache.calcite.linq4j.Enumerable bind(final org.apache.calcite.DataContext root) {\n final org.opensearch.sql.opensearch.storage.scan.CalciteEnumerableIndexScan v1stashed = (org.opensearch.sql.opensearch.storage.scan.CalciteEnumerableIndexScan) root.get(\"v1stashed\");\n final org.apache.calcite.linq4j.Enumerable _inputEnumerable = v1stashed.scan();\n final org.apache.calcite.linq4j.AbstractEnumerable child = new org.apache.calcite.linq4j.AbstractEnumerable(){\n public org.apache.calcite.linq4j.Enumerator enumerator() {\n return new org.apache.calcite.linq4j.Enumerator(){\n public final org.apache.calcite.linq4j.Enumerator inputEnumerator = _inputEnumerable.enumerator();\n public void reset() {\n inputEnumerator.reset();\n }\n\n public boolean moveNext() {\n while (inputEnumerator.moveNext()) {\n final Long input_value = (Long) ((Object[]) inputEnumerator.current())[5];\n final Boolean binary_call_value = input_value == null ? null : Boolean.valueOf(input_value.longValue() > (long) 30);\n if (binary_call_value != null && org.apache.calcite.runtime.SqlFunctions.toBoolean(binary_call_value)) {\n return true;\n }\n }\n return false;\n }\n\n public void close() {\n inputEnumerator.close();\n }\n\n public Object current() {\n final Object[] current = (Object[]) inputEnumerator.current();\n final Object input_value = current[0];\n final Object input_value0 = current[1];\n final Object input_value1 = current[2];\n final Object input_value2 = current[3];\n final Object input_value3 = current[4];\n final Object input_value4 = current[5];\n return new Object[] {\n input_value,\n input_value0,\n input_value1,\n input_value2,\n input_value3,\n input_value4};\n }\n\n };\n }\n\n };\n return child.take(10000);\n}\n\n\npublic Class getElementType() {\n return java.lang.Object[].class;\n}\n\n\n" } } ``` diff --git a/docs/user/ppl/limitations/limitations.md b/docs/user/ppl/limitations/limitations.md index 53adc1f072d..ac7494386dd 100644 --- a/docs/user/ppl/limitations/limitations.md +++ b/docs/user/ppl/limitations/limitations.md @@ -39,7 +39,7 @@ For a field to be queryable in PPL, the following index settings must be enabled ## Nested Field Behavior -* There are [limitations](https://github.com/opensearch-project/sql/issues/52) regarding the nested levels and query types that needs improvement. +* There are [limitations](https://github.com/opensearch-project/sql/issues/4625) regarding the nested levels and query types that need improvement. ## Multi-value Field Behavior diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteArrayFunctionIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteArrayFunctionIT.java index 1b32b76b611..26e8ecf73f6 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteArrayFunctionIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteArrayFunctionIT.java @@ -66,8 +66,8 @@ public void testArrayWithMix() { verifyErrorMessageContains( e, - "Cannot resolve function: ARRAY, arguments: [INTEGER,BOOLEAN], caused by: fail to create" - + " array with fixed type"); + "fail to create array with fixed type: At line 0, column 0: Cannot infer return type for" + + " array; operand types: [INTEGER, BOOLEAN]"); } @Test diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExpandCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExpandCommandIT.java index 02ac75bb4b3..f7e994223c6 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExpandCommandIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExpandCommandIT.java @@ -46,6 +46,7 @@ public void testExpandOnNested() throws Exception { null, 24, new JSONObject() + .put("area", 300.13) .put("city", "New york city") .put("state", "NY") .put("moveInDate", new JSONObject().put("dateAndTime", "1984-04-12 09:07:42"))), @@ -54,6 +55,7 @@ public void testExpandOnNested() throws Exception { null, 24, new JSONObject() + .put("area", 400.99) .put("city", "bellevue") .put("state", "WA") .put( @@ -66,6 +68,7 @@ public void testExpandOnNested() throws Exception { null, 24, new JSONObject() + .put("area", 127.4) .put("city", "seattle") .put("state", "WA") .put("moveInDate", new JSONObject().put("dateAndTime", "1966-03-19 03:04:55"))), @@ -74,6 +77,7 @@ public void testExpandOnNested() throws Exception { null, 24, new JSONObject() + .put("area", 10.24) .put("city", "chicago") .put("state", "IL") .put("moveInDate", new JSONObject().put("dateAndTime", "2011-06-01 01:01:42"))), @@ -82,6 +86,7 @@ public void testExpandOnNested() throws Exception { null, 32, new JSONObject() + .put("area", 1000.99) .put("city", "Miami") .put("state", "Florida") .put("moveInDate", new JSONObject().put("dateAndTime", "1901-08-11 04:03:33"))), @@ -90,6 +95,7 @@ public void testExpandOnNested() throws Exception { null, 32, new JSONObject() + .put("area", 9.99) .put("city", "los angeles") .put("state", "CA") .put("moveInDate", new JSONObject().put("dateAndTime", "2023-05-03 08:07:42"))), @@ -98,6 +104,7 @@ public void testExpandOnNested() throws Exception { null, 26, new JSONObject() + .put("area", 231.01) .put("city", "san diego") .put("state", "CA") .put("moveInDate", new JSONObject().put("dateAndTime", "2001-11-11 04:07:44"))), @@ -106,6 +113,7 @@ public void testExpandOnNested() throws Exception { null, 26, new JSONObject() + .put("area", 429.79) .put("city", "austin") .put("state", "TX") .put("moveInDate", new JSONObject().put("dateAndTime", "1977-07-13 09:04:41"))), @@ -122,6 +130,7 @@ public void testExpandOnNested() throws Exception { null, 25, new JSONObject() + .put("area", 190.5) .put("city", "raleigh") .put("state", "NC") .put("moveInDate", new JSONObject().put("dateAndTime", "1909-06-17 01:04:21"))), @@ -160,6 +169,7 @@ public void testExpandWithAlias() throws Exception { schema("age", "bigint"), schema("id", "bigint"), schema("addr", "struct")); + System.out.println(response); verifyDataRows( response, rows( @@ -167,6 +177,7 @@ public void testExpandWithAlias() throws Exception { null, 24, new JSONObject() + .put("area", 300.13) .put("city", "New york city") .put("state", "NY") .put("moveInDate", new JSONObject().put("dateAndTime", "1984-04-12 09:07:42"))), @@ -175,6 +186,7 @@ public void testExpandWithAlias() throws Exception { null, 24, new JSONObject() + .put("area", 400.99) .put("city", "bellevue") .put("state", "WA") .put( @@ -187,6 +199,7 @@ public void testExpandWithAlias() throws Exception { null, 24, new JSONObject() + .put("area", 127.4) .put("city", "seattle") .put("state", "WA") .put("moveInDate", new JSONObject().put("dateAndTime", "1966-03-19 03:04:55"))), @@ -195,6 +208,7 @@ public void testExpandWithAlias() throws Exception { null, 24, new JSONObject() + .put("area", 10.24) .put("city", "chicago") .put("state", "IL") .put("moveInDate", new JSONObject().put("dateAndTime", "2011-06-01 01:01:42"))), @@ -203,6 +217,7 @@ public void testExpandWithAlias() throws Exception { null, 32, new JSONObject() + .put("area", 1000.99) .put("city", "Miami") .put("state", "Florida") .put("moveInDate", new JSONObject().put("dateAndTime", "1901-08-11 04:03:33"))), @@ -211,6 +226,7 @@ public void testExpandWithAlias() throws Exception { null, 32, new JSONObject() + .put("area", 9.99) .put("city", "los angeles") .put("state", "CA") .put("moveInDate", new JSONObject().put("dateAndTime", "2023-05-03 08:07:42"))), @@ -219,6 +235,7 @@ public void testExpandWithAlias() throws Exception { null, 26, new JSONObject() + .put("area", 231.01) .put("city", "san diego") .put("state", "CA") .put("moveInDate", new JSONObject().put("dateAndTime", "2001-11-11 04:07:44"))), @@ -227,6 +244,7 @@ public void testExpandWithAlias() throws Exception { null, 26, new JSONObject() + .put("area", 429.79) .put("city", "austin") .put("state", "TX") .put("moveInDate", new JSONObject().put("dateAndTime", "1977-07-13 09:04:41"))), @@ -243,6 +261,7 @@ public void testExpandWithAlias() throws Exception { null, 25, new JSONObject() + .put("area", 190.5) .put("city", "raleigh") .put("state", "NC") .put("moveInDate", new JSONObject().put("dateAndTime", "1909-06-17 01:04:21"))), diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java index cdea1738eb0..e5b07983b6e 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java @@ -19,6 +19,7 @@ import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_WORK_INFORMATION; import static org.opensearch.sql.util.MatcherUtils.assertJsonEqualsIgnoreId; import static org.opensearch.sql.util.MatcherUtils.assertYamlEqualsIgnoreId; +import static org.opensearch.sql.util.MatcherUtils.verifyErrorMessageContains; import java.io.IOException; import java.util.Locale; @@ -2215,4 +2216,95 @@ public void testRexStandardizationForScript() throws IOException { TEST_INDEX_BANK), true)); } + + @Test + public void testNestedAggPushDownExplain() throws Exception { + enabledOnlyWhenPushdownIsEnabled(); + String expected = loadExpectedPlan("explain_nested_agg_push.yaml"); + + assertYamlEqualsIgnoreId( + expected, + explainQueryYaml( + "source=opensearch-sql_test_index_nested_simple | stats count(address.area) as" + + " count_area, min(address.area) as min_area, max(address.area) as max_area," + + " avg(address.area) as avg_area, avg(age) as avg_age by name")); + } + + @Test + public void testNestedSingleCountPushDownExplain() throws Exception { + enabledOnlyWhenPushdownIsEnabled(); + String expected = loadExpectedPlan("explain_nested_agg_single_count_push.yaml"); + assertYamlEqualsIgnoreId( + expected, + explainQueryYaml( + "source=opensearch-sql_test_index_nested_simple | stats count(address.area)")); + } + + @Test + public void testNestedAggPushDownSortExplain() throws Exception { + enabledOnlyWhenPushdownIsEnabled(); + String expected = loadExpectedPlan("explain_nested_agg_sort_push.yaml"); + + assertYamlEqualsIgnoreId( + expected, + explainQueryYaml( + "source=opensearch-sql_test_index_nested_simple | stats count() by address.city | sort" + + " -address.city")); + } + + @Test + public void testNestedAggByPushDownExplain() throws Exception { + enabledOnlyWhenPushdownIsEnabled(); + String expected = loadExpectedPlan("explain_nested_agg_by_push.yaml"); + assertYamlEqualsIgnoreId( + expected, + explainQueryYaml( + "source=opensearch-sql_test_index_nested_simple | stats min(address.area) by" + + " address.city")); + // Whatever bucket_nullable=false or bucket_nullable=true is, the plans should be the same. + // The filter(is_not_null) can be safe removed since nested agg only works when pushdown is + // applied. + expected = loadExpectedPlan("explain_nested_agg_by_bucket_nullable_push.yaml"); + assertYamlEqualsIgnoreId( + expected, + explainQueryYaml( + "source=opensearch-sql_test_index_nested_simple | stats bucket_nullable=false" + + " min(address.area) by address.city")); + } + + @Test + public void testNestedAggTop() throws Exception { + enabledOnlyWhenPushdownIsEnabled(); + String expected = loadExpectedPlan("explain_nested_agg_top_push.yaml"); + + assertYamlEqualsIgnoreId( + expected, + explainQueryYaml( + "source=opensearch-sql_test_index_nested_simple | top usenull=false address.city")); + } + + @Test + public void testNestedAggDedupNotPushed() throws Exception { + enabledOnlyWhenPushdownIsEnabled(); + String expected = loadExpectedPlan("explain_nested_agg_dedup_not_push.yaml"); + + assertYamlEqualsIgnoreId( + expected, + explainQueryYaml("source=opensearch-sql_test_index_nested_simple | dedup address.city")); + } + + @Test + public void testNestedAggExplainWhenPushdownNotApplied() throws Exception { + enabledOnlyWhenPushdownIsEnabled(); + Throwable e = + assertThrowsWithReplace( + UnsupportedOperationException.class, + () -> + explainQueryYaml( + "source=opensearch-sql_test_index_nested_simple | head 10000 | stats" + + " count(address.area) as count_area, min(address.area) as min_area," + + " max(address.area) as max_area, avg(address.area) as avg_area, avg(age)" + + " as avg_age by name")); + verifyErrorMessageContains(e, "Cannot execute nested aggregation on"); + } } diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLNestedAggregationIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLNestedAggregationIT.java new file mode 100644 index 00000000000..faaae541d1e --- /dev/null +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLNestedAggregationIT.java @@ -0,0 +1,186 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.calcite.remote; + +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_NESTED_SIMPLE; +import static org.opensearch.sql.util.MatcherUtils.rows; +import static org.opensearch.sql.util.MatcherUtils.schema; +import static org.opensearch.sql.util.MatcherUtils.verifyDataRows; +import static org.opensearch.sql.util.MatcherUtils.verifyErrorMessageContains; +import static org.opensearch.sql.util.MatcherUtils.verifyNumOfRows; +import static org.opensearch.sql.util.MatcherUtils.verifySchema; +import static org.opensearch.sql.util.MatcherUtils.verifySchemaInOrder; + +import java.io.IOException; +import org.json.JSONObject; +import org.junit.jupiter.api.Test; +import org.opensearch.sql.ppl.PPLIntegTestCase; + +public class CalcitePPLNestedAggregationIT extends PPLIntegTestCase { + @Override + public void init() throws Exception { + super.init(); + enableCalcite(); + + loadIndex(Index.NESTED_SIMPLE); + } + + @Test + public void testNestedAggregation() throws IOException { + enabledOnlyWhenPushdownIsEnabled(); + JSONObject actual = + executeQuery( + String.format( + "source=%s | stats count(address.area) as count_area, min(address.area) as" + + " min_area, max(address.area) as max_area, avg(address.area) as avg_area," + + " avg(age) as avg_age", + TEST_INDEX_NESTED_SIMPLE)); + verifySchemaInOrder( + actual, + isCalciteEnabled() ? schema("count_area", "bigint") : schema("count_area", "int"), + schema("min_area", "double"), + schema("max_area", "double"), + schema("avg_area", "double"), + schema("avg_age", "double")); + verifyDataRows(actual, rows(9, 9.99, 1000.99, 300.11555555555555, 25.2)); + } + + @Test + public void testNestedAggregationBy() throws IOException { + enabledOnlyWhenPushdownIsEnabled(); + JSONObject actual = + executeQuery( + String.format( + "source=%s | stats count(address.area) as count_area, min(address.area) as" + + " min_area, max(address.area) as max_area, avg(address.area) as avg_area," + + " avg(age) as avg_age by name", + TEST_INDEX_NESTED_SIMPLE)); + verifySchemaInOrder( + actual, + isCalciteEnabled() ? schema("count_area", "bigint") : schema("count_area", "int"), + schema("min_area", "double"), + schema("max_area", "double"), + schema("avg_area", "double"), + schema("avg_age", "double"), + schema("name", "string")); + verifyDataRows( + actual, + rows(4, 10.24, 400.99, 209.69, 24, "abbas"), + rows(0, null, null, null, 19, "andy"), + rows(2, 9.99, 1000.99, 505.49, 32, "chen"), + rows(1, 190.5, 190.5, 190.5, 25, "david"), + rows(2, 231.01, 429.79, 330.4, 26, "peng")); + } + + @Test + public void testNestedAggregationBySpan() throws IOException { + enabledOnlyWhenPushdownIsEnabled(); + JSONObject actual = + executeQuery( + String.format( + "source=%s | stats count(address.area) as count_area, min(address.area) as" + + " min_area, max(address.area) as max_area, avg(address.area) as avg_area," + + " avg(age) as avg_age by span(age, 10)", + TEST_INDEX_NESTED_SIMPLE)); + verifyDataRows( + actual, + rows(0, null, null, null, 19, 10), + rows(7, 10.24, 429.79, 241.43714285714285, 25, 20), + rows(2, 9.99, 1000.99, 505.49, 32, 30)); + } + + @Test + public void testNestedAggregationSingleCount() throws IOException { + enabledOnlyWhenPushdownIsEnabled(); + JSONObject actual = + executeQuery( + String.format( + "source=%s | stats count(address.city), count(address.area)", + TEST_INDEX_NESTED_SIMPLE)); + verifyDataRows(actual, rows(11, 9)); + } + + @Test + public void testNestedAggregationByNestedPath() throws IOException { + enabledOnlyWhenPushdownIsEnabled(); + JSONObject actual = + executeQuery( + String.format( + "source=%s | stats count(), min(age), min(address.area) by address.city", + TEST_INDEX_NESTED_SIMPLE)); + verifySchema( + actual, + schema("count()", null, "bigint"), + schema("min(age)", null, "bigint"), + schema("min(address.area)", null, "double"), + schema("address.city", null, "string")); + verifyDataRows( + actual, + rows(1, null, 9.99, "Miami"), + rows(1, null, 10.24, "New york city"), + rows(1, null, null, "austin"), + rows(1, null, null, "bellevue"), + rows(1, null, null, "charlotte"), + rows(1, null, null, "chicago"), + rows(1, null, null, "houston"), + rows(1, null, null, "los angeles"), + rows(1, null, 190.5, "raleigh"), + rows(1, null, 231.01, "san diego"), + rows(1, null, null, "seattle")); + } + + @Test + public void testNestedAggregationByNestedRoot() throws IOException { + enabledOnlyWhenPushdownIsEnabled(); + JSONObject actual = + executeQuery( + String.format( + "source=%s | stats count(), min(age) by address", TEST_INDEX_NESTED_SIMPLE)); + verifySchema( + actual, + schema("count()", null, "bigint"), + schema("min(age)", null, "bigint"), + schema("address", null, "array")); + verifyNumOfRows(actual, 5); + } + + @Test + public void testRareTopNestedRoot() throws IOException { + enabledOnlyWhenPushdownIsEnabled(); + JSONObject actual = + executeQuery(String.format("source=%s | top address", TEST_INDEX_NESTED_SIMPLE)); + verifySchemaInOrder(actual, schema("address", null, "array"), schema("count", null, "bigint")); + verifyNumOfRows(actual, 5); + } + + @Test + public void testDedupNestedRoot() throws IOException { + enabledOnlyWhenPushdownIsEnabled(); + JSONObject actual = + executeQuery(String.format("source=%s | dedup address", TEST_INDEX_NESTED_SIMPLE)); + verifySchemaInOrder( + actual, + schema("name", null, "string"), + schema("address", null, "array"), + schema("id", null, "bigint"), + schema("age", null, "bigint")); + verifyNumOfRows(actual, 5); + } + + @Test + public void testNestedAggregationThrowExceptionIfPushdownCannotApplied() throws IOException { + enabledOnlyWhenPushdownIsEnabled(); + Throwable t = + assertThrowsWithReplace( + UnsupportedOperationException.class, + () -> + executeQuery( + String.format( + "source=%s | stats count(), min(age), min(address.area) by address", + TEST_INDEX_NESTED_SIMPLE))); + verifyErrorMessageContains(t, "Cannot execute nested aggregation"); + } +} diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLStringBuiltinFunctionIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLStringBuiltinFunctionIT.java index 3e0b6cb07aa..f3d3852f938 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLStringBuiltinFunctionIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLStringBuiltinFunctionIT.java @@ -12,6 +12,7 @@ import static org.opensearch.sql.util.MatcherUtils.rows; import java.io.IOException; +import java.util.regex.PatternSyntaxException; import org.json.JSONObject; import org.junit.jupiter.api.Test; import org.opensearch.client.Request; @@ -404,42 +405,39 @@ public void testReplaceWithInvalidRegexPattern() { // Test invalid regex pattern - unclosed character class Throwable e1 = assertThrowsWithReplace( - Exception.class, + PatternSyntaxException.class, () -> executeQuery( String.format( "source=%s | eval result = replace(firstname, '[unclosed', 'X') | fields" + " firstname, result", TEST_INDEX_ACCOUNT))); - verifyErrorMessageContains(e1, "Invalid regex pattern"); verifyErrorMessageContains(e1, "Unclosed character class"); verifyErrorMessageContains(e1, "400 Bad Request"); // Test invalid regex pattern - unclosed group Throwable e2 = assertThrowsWithReplace( - Exception.class, + PatternSyntaxException.class, () -> executeQuery( String.format( "source=%s | eval result = replace(firstname, '(invalid', 'X') | fields" + " firstname, result", TEST_INDEX_ACCOUNT))); - verifyErrorMessageContains(e2, "Invalid regex pattern"); verifyErrorMessageContains(e2, "Unclosed group"); verifyErrorMessageContains(e2, "400 Bad Request"); // Test invalid regex pattern - dangling metacharacter Throwable e3 = assertThrowsWithReplace( - Exception.class, + PatternSyntaxException.class, () -> executeQuery( String.format( "source=%s | eval result = replace(firstname, '?invalid', 'X') | fields" + " firstname, result", TEST_INDEX_ACCOUNT))); - verifyErrorMessageContains(e3, "Invalid regex pattern"); verifyErrorMessageContains(e3, "Dangling meta character"); verifyErrorMessageContains(e3, "400 Bad Request"); } diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/CastFunctionIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/CastFunctionIT.java index fcc0362f057..56e52610aba 100644 --- a/integ-test/src/test/java/org/opensearch/sql/ppl/CastFunctionIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/CastFunctionIT.java @@ -448,8 +448,8 @@ public void testCastToIP() throws IOException { TEST_INDEX_WEBLOGS))); verifyErrorMessageContains( t, - "IP address string 'invalid_ip' is not valid. Error details: invalid_ip IP Address error:" - + " validation options do not allow you to specify a non-segmented single value"); + "invalid_ip IP Address error: validation options do not allow you to specify a" + + " non-segmented single value"); } @Test diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_extended_for_standardization.json b/integ-test/src/test/resources/expectedOutput/calcite/explain_extended_for_standardization.json index 6c60de5dccc..e56186feb95 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite/explain_extended_for_standardization.json +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_extended_for_standardization.json @@ -1,7 +1,6 @@ { "calcite": { "logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(avg_age=[$1], age_range=[$0])\n LogicalAggregate(group=[{0}], avg_age=[AVG($1)])\n LogicalProject(age_range=[CASE(<($10, 30), 'u30':VARCHAR, SEARCH($10, Sarg[[30..40]]), 'u40':VARCHAR, 'u100':VARCHAR)], age=[$10])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n", - "physical": "CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},avg_age=AVG($1)), PROJECT->[avg_age, age_range], LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":0,\"timeout\":\"1m\",\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"age_range\":{\"terms\":{\"script\":{\"source\":\"{\\\"langType\\\":\\\"calcite\\\",\\\"script\\\":\\\"{\\\\n \\\\\\\"op\\\\\\\": {\\\\n \\\\\\\"name\\\\\\\": \\\\\\\"CASE\\\\\\\",\\\\n \\\\\\\"kind\\\\\\\": \\\\\\\"CASE\\\\\\\",\\\\n \\\\\\\"syntax\\\\\\\": \\\\\\\"SPECIAL\\\\\\\"\\\\n },\\\\n \\\\\\\"operands\\\\\\\": [\\\\n {\\\\n \\\\\\\"op\\\\\\\": {\\\\n \\\\\\\"name\\\\\\\": \\\\\\\"<\\\\\\\",\\\\n \\\\\\\"kind\\\\\\\": \\\\\\\"LESS_THAN\\\\\\\",\\\\n \\\\\\\"syntax\\\\\\\": \\\\\\\"BINARY\\\\\\\"\\\\n },\\\\n \\\\\\\"operands\\\\\\\": [\\\\n {\\\\n \\\\\\\"dynamicParam\\\\\\\": 0,\\\\n \\\\\\\"type\\\\\\\": {\\\\n \\\\\\\"type\\\\\\\": \\\\\\\"BIGINT\\\\\\\",\\\\n \\\\\\\"nullable\\\\\\\": true\\\\n }\\\\n },\\\\n {\\\\n \\\\\\\"dynamicParam\\\\\\\": 1,\\\\n \\\\\\\"type\\\\\\\": {\\\\n \\\\\\\"type\\\\\\\": \\\\\\\"BIGINT\\\\\\\",\\\\n \\\\\\\"nullable\\\\\\\": true\\\\n }\\\\n }\\\\n ]\\\\n },\\\\n {\\\\n \\\\\\\"dynamicParam\\\\\\\": 2,\\\\n \\\\\\\"type\\\\\\\": {\\\\n \\\\\\\"type\\\\\\\": \\\\\\\"VARCHAR\\\\\\\",\\\\n \\\\\\\"nullable\\\\\\\": true,\\\\n \\\\\\\"precision\\\\\\\": -1\\\\n }\\\\n },\\\\n {\\\\n \\\\\\\"op\\\\\\\": {\\\\n \\\\\\\"name\\\\\\\": \\\\\\\"AND\\\\\\\",\\\\n \\\\\\\"kind\\\\\\\": \\\\\\\"AND\\\\\\\",\\\\n \\\\\\\"syntax\\\\\\\": \\\\\\\"BINARY\\\\\\\"\\\\n },\\\\n \\\\\\\"operands\\\\\\\": [\\\\n {\\\\n \\\\\\\"op\\\\\\\": {\\\\n \\\\\\\"name\\\\\\\": \\\\\\\"<=\\\\\\\",\\\\n \\\\\\\"kind\\\\\\\": \\\\\\\"LESS_THAN_OR_EQUAL\\\\\\\",\\\\n \\\\\\\"syntax\\\\\\\": \\\\\\\"BINARY\\\\\\\"\\\\n },\\\\n \\\\\\\"operands\\\\\\\": [\\\\n {\\\\n \\\\\\\"dynamicParam\\\\\\\": 3,\\\\n \\\\\\\"type\\\\\\\": {\\\\n \\\\\\\"type\\\\\\\": \\\\\\\"BIGINT\\\\\\\",\\\\n \\\\\\\"nullable\\\\\\\": true\\\\n }\\\\n },\\\\n {\\\\n \\\\\\\"dynamicParam\\\\\\\": 4,\\\\n \\\\\\\"type\\\\\\\": {\\\\n \\\\\\\"type\\\\\\\": \\\\\\\"BIGINT\\\\\\\",\\\\n \\\\\\\"nullable\\\\\\\": true\\\\n }\\\\n }\\\\n ]\\\\n },\\\\n {\\\\n \\\\\\\"op\\\\\\\": {\\\\n \\\\\\\"name\\\\\\\": \\\\\\\"<=\\\\\\\",\\\\n \\\\\\\"kind\\\\\\\": \\\\\\\"LESS_THAN_OR_EQUAL\\\\\\\",\\\\n \\\\\\\"syntax\\\\\\\": \\\\\\\"BINARY\\\\\\\"\\\\n },\\\\n \\\\\\\"operands\\\\\\\": [\\\\n {\\\\n \\\\\\\"dynamicParam\\\\\\\": 5,\\\\n \\\\\\\"type\\\\\\\": {\\\\n \\\\\\\"type\\\\\\\": \\\\\\\"BIGINT\\\\\\\",\\\\n \\\\\\\"nullable\\\\\\\": true\\\\n }\\\\n },\\\\n {\\\\n \\\\\\\"dynamicParam\\\\\\\": 6,\\\\n \\\\\\\"type\\\\\\\": {\\\\n \\\\\\\"type\\\\\\\": \\\\\\\"BIGINT\\\\\\\",\\\\n \\\\\\\"nullable\\\\\\\": true\\\\n }\\\\n }\\\\n ]\\\\n }\\\\n ]\\\\n },\\\\n {\\\\n \\\\\\\"dynamicParam\\\\\\\": 7,\\\\n \\\\\\\"type\\\\\\\": {\\\\n \\\\\\\"type\\\\\\\": \\\\\\\"VARCHAR\\\\\\\",\\\\n \\\\\\\"nullable\\\\\\\": true,\\\\n \\\\\\\"precision\\\\\\\": -1\\\\n }\\\\n },\\\\n {\\\\n \\\\\\\"dynamicParam\\\\\\\": 8,\\\\n \\\\\\\"type\\\\\\\": {\\\\n \\\\\\\"type\\\\\\\": \\\\\\\"VARCHAR\\\\\\\",\\\\n \\\\\\\"nullable\\\\\\\": true,\\\\n \\\\\\\"precision\\\\\\\": -1\\\\n }\\\\n }\\\\n ]\\\\n}\\\"}\",\"lang\":\"opensearch_compounded_script\",\"params\":{\"utcTimestamp\":1765429962288092000,\"SOURCES\":[0,2,2,2,0,0,2,2,2],\"DIGESTS\":[\"age\",30,\"u30\",30,\"age\",\"age\",40,\"u40\",\"u100\"]}},\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"avg_age\":{\"avg\":{\"field\":\"age\"}}}}}}, requestedTotalSize=10000, pageSize=null, startFrom=0)])\n", - "extended": "public org.apache.calcite.linq4j.Enumerable bind(final org.apache.calcite.DataContext root) {\n final org.opensearch.sql.opensearch.storage.scan.CalciteEnumerableIndexScan v1stashed = (org.opensearch.sql.opensearch.storage.scan.CalciteEnumerableIndexScan) root.get(\"v1stashed\");\n return v1stashed.scan();\n}\n\n\npublic Class getElementType() {\n return java.lang.Object[].class;\n}\n\n\n" + "physical": "CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},avg_age=AVG($1)), PROJECT->[avg_age, age_range], LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":0,\"timeout\":\"1m\",\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"age_range\":{\"terms\":{\"script\":{\"source\":\"{\\\"langType\\\":\\\"calcite\\\",\\\"script\\\":\\\"{\\\\n \\\\\\\"op\\\\\\\": {\\\\n \\\\\\\"name\\\\\\\": \\\\\\\"CASE\\\\\\\",\\\\n \\\\\\\"kind\\\\\\\": \\\\\\\"CASE\\\\\\\",\\\\n \\\\\\\"syntax\\\\\\\": \\\\\\\"SPECIAL\\\\\\\"\\\\n },\\\\n \\\\\\\"operands\\\\\\\": [\\\\n {\\\\n \\\\\\\"op\\\\\\\": {\\\\n \\\\\\\"name\\\\\\\": \\\\\\\"<\\\\\\\",\\\\n \\\\\\\"kind\\\\\\\": \\\\\\\"LESS_THAN\\\\\\\",\\\\n \\\\\\\"syntax\\\\\\\": \\\\\\\"BINARY\\\\\\\"\\\\n },\\\\n \\\\\\\"operands\\\\\\\": [\\\\n {\\\\n \\\\\\\"dynamicParam\\\\\\\": 0,\\\\n \\\\\\\"type\\\\\\\": {\\\\n \\\\\\\"type\\\\\\\": \\\\\\\"BIGINT\\\\\\\",\\\\n \\\\\\\"nullable\\\\\\\": true\\\\n }\\\\n },\\\\n {\\\\n \\\\\\\"dynamicParam\\\\\\\": 1,\\\\n \\\\\\\"type\\\\\\\": {\\\\n \\\\\\\"type\\\\\\\": \\\\\\\"BIGINT\\\\\\\",\\\\n \\\\\\\"nullable\\\\\\\": true\\\\n }\\\\n }\\\\n ]\\\\n },\\\\n {\\\\n \\\\\\\"dynamicParam\\\\\\\": 2,\\\\n \\\\\\\"type\\\\\\\": {\\\\n \\\\\\\"type\\\\\\\": \\\\\\\"VARCHAR\\\\\\\",\\\\n \\\\\\\"nullable\\\\\\\": true,\\\\n \\\\\\\"precision\\\\\\\": -1\\\\n }\\\\n },\\\\n {\\\\n \\\\\\\"op\\\\\\\": {\\\\n \\\\\\\"name\\\\\\\": \\\\\\\"AND\\\\\\\",\\\\n \\\\\\\"kind\\\\\\\": \\\\\\\"AND\\\\\\\",\\\\n \\\\\\\"syntax\\\\\\\": \\\\\\\"BINARY\\\\\\\"\\\\n },\\\\n \\\\\\\"operands\\\\\\\": [\\\\n {\\\\n \\\\\\\"op\\\\\\\": {\\\\n \\\\\\\"name\\\\\\\": \\\\\\\"<=\\\\\\\",\\\\n \\\\\\\"kind\\\\\\\": \\\\\\\"LESS_THAN_OR_EQUAL\\\\\\\",\\\\n \\\\\\\"syntax\\\\\\\": \\\\\\\"BINARY\\\\\\\"\\\\n },\\\\n \\\\\\\"operands\\\\\\\": [\\\\n {\\\\n \\\\\\\"dynamicParam\\\\\\\": 3,\\\\n \\\\\\\"type\\\\\\\": {\\\\n \\\\\\\"type\\\\\\\": \\\\\\\"BIGINT\\\\\\\",\\\\n \\\\\\\"nullable\\\\\\\": true\\\\n }\\\\n },\\\\n {\\\\n \\\\\\\"dynamicParam\\\\\\\": 4,\\\\n \\\\\\\"type\\\\\\\": {\\\\n \\\\\\\"type\\\\\\\": \\\\\\\"BIGINT\\\\\\\",\\\\n \\\\\\\"nullable\\\\\\\": true\\\\n }\\\\n }\\\\n ]\\\\n },\\\\n {\\\\n \\\\\\\"op\\\\\\\": {\\\\n \\\\\\\"name\\\\\\\": \\\\\\\"<=\\\\\\\",\\\\n \\\\\\\"kind\\\\\\\": \\\\\\\"LESS_THAN_OR_EQUAL\\\\\\\",\\\\n \\\\\\\"syntax\\\\\\\": \\\\\\\"BINARY\\\\\\\"\\\\n },\\\\n \\\\\\\"operands\\\\\\\": [\\\\n {\\\\n \\\\\\\"dynamicParam\\\\\\\": 5,\\\\n \\\\\\\"type\\\\\\\": {\\\\n \\\\\\\"type\\\\\\\": \\\\\\\"BIGINT\\\\\\\",\\\\n \\\\\\\"nullable\\\\\\\": true\\\\n }\\\\n },\\\\n {\\\\n \\\\\\\"dynamicParam\\\\\\\": 6,\\\\n \\\\\\\"type\\\\\\\": {\\\\n \\\\\\\"type\\\\\\\": \\\\\\\"BIGINT\\\\\\\",\\\\n \\\\\\\"nullable\\\\\\\": true\\\\n }\\\\n }\\\\n ]\\\\n }\\\\n ]\\\\n },\\\\n {\\\\n \\\\\\\"dynamicParam\\\\\\\": 7,\\\\n \\\\\\\"type\\\\\\\": {\\\\n \\\\\\\"type\\\\\\\": \\\\\\\"VARCHAR\\\\\\\",\\\\n \\\\\\\"nullable\\\\\\\": true,\\\\n \\\\\\\"precision\\\\\\\": -1\\\\n }\\\\n },\\\\n {\\\\n \\\\\\\"dynamicParam\\\\\\\": 8,\\\\n \\\\\\\"type\\\\\\\": {\\\\n \\\\\\\"type\\\\\\\": \\\\\\\"VARCHAR\\\\\\\",\\\\n \\\\\\\"nullable\\\\\\\": true,\\\\n \\\\\\\"precision\\\\\\\": -1\\\\n }\\\\n }\\\\n ]\\\\n}\\\"}\",\"lang\":\"opensearch_compounded_script\",\"params\":{\"utcTimestamp\":*,\"SOURCES\":[0,2,2,2,0,0,2,2,2],\"DIGESTS\":[\"age\",30,\"u30\",30,\"age\",\"age\",40,\"u40\",\"u100\"]}},\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"avg_age\":{\"avg\":{\"field\":\"age\"}}}}}}, requestedTotalSize=10000, pageSize=null, startFrom=0)])\n" } } diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_nested_agg_by_bucket_nullable_push.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_nested_agg_by_bucket_nullable_push.yaml new file mode 100644 index 00000000000..7653a0337ff --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_nested_agg_by_bucket_nullable_push.yaml @@ -0,0 +1,10 @@ +calcite: + logical: | + LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalProject(min(address.area)=[$1], address.city=[$0]) + LogicalAggregate(group=[{0}], min(address.area)=[MIN($1)]) + LogicalProject(address.city=[$3], address.area=[$2]) + LogicalFilter(condition=[IS NOT NULL($3)]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_nested_simple]]) + physical: | + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_nested_simple]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={1},min(address.area)=MIN($0)), PROJECT->[min(address.area), address.city], LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"nested_composite_buckets":{"nested":{"path":"address"},"aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"address.city":{"terms":{"field":"address.city.keyword","missing_bucket":false,"order":"asc"}}}]},"aggregations":{"nested_min(address.area)":{"nested":{"path":"address"},"aggregations":{"min(address.area)":{"min":{"field":"address.area"}}}}}}}}}}, requestedTotalSize=10000, pageSize=null, startFrom=0)]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_nested_agg_by_push.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_nested_agg_by_push.yaml new file mode 100644 index 00000000000..03acf0a409f --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_nested_agg_by_push.yaml @@ -0,0 +1,9 @@ +calcite: + logical: | + LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalProject(min(address.area)=[$1], address.city=[$0]) + LogicalAggregate(group=[{0}], min(address.area)=[MIN($1)]) + LogicalProject(address.city=[$3], address.area=[$2]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_nested_simple]]) + physical: | + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_nested_simple]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},min(address.area)=MIN($1)), PROJECT->[min(address.area), address.city], LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"nested_composite_buckets":{"nested":{"path":"address"},"aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"address.city":{"terms":{"field":"address.city.keyword","missing_bucket":true,"missing_order":"first","order":"asc"}}}]},"aggregations":{"nested_min(address.area)":{"nested":{"path":"address"},"aggregations":{"min(address.area)":{"min":{"field":"address.area"}}}}}}}}}}, requestedTotalSize=10000, pageSize=null, startFrom=0)]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_nested_agg_dedup_not_push.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_nested_agg_dedup_not_push.yaml new file mode 100644 index 00000000000..e5a235e9f98 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_nested_agg_dedup_not_push.yaml @@ -0,0 +1,13 @@ +calcite: + logical: | + LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalProject(name=[$0], address=[$1], id=[$7], age=[$8]) + LogicalFilter(condition=[<=($15, 1)]) + LogicalProject(name=[$0], address=[$1], address.area=[$2], address.city=[$3], address.moveInDate=[$4], address.moveInDate.dateAndTime=[$5], address.state=[$6], id=[$7], age=[$8], _id=[$9], _index=[$10], _score=[$11], _maxscore=[$12], _sort=[$13], _routing=[$14], _row_number_dedup_=[ROW_NUMBER() OVER (PARTITION BY $3)]) + LogicalFilter(condition=[IS NOT NULL($3)]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_nested_simple]]) + physical: | + EnumerableLimit(fetch=[10000]) + EnumerableCalc(expr#0..5=[{inputs}], expr#6=[1], expr#7=[<=($t5, $t6)], proj#0..1=[{exprs}], id=[$t3], age=[$t4], $condition=[$t7]) + EnumerableWindow(window#0=[window(partition {2} rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_nested_simple]], PushDownContext=[[PROJECT->[name, address, address.city, id, age], FILTER->IS NOT NULL($2)], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","query":{"nested":{"query":{"exists":{"field":"address.city","boost":1.0}},"path":"address","ignore_unmapped":false,"score_mode":"none","boost":1.0}},"_source":{"includes":["name","address","address.city","id","age"],"excludes":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_nested_agg_push.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_nested_agg_push.yaml new file mode 100644 index 00000000000..233773bf0d1 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_nested_agg_push.yaml @@ -0,0 +1,9 @@ +calcite: + logical: | + LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalProject(count_area=[$1], min_area=[$2], max_area=[$3], avg_area=[$4], avg_age=[$5], name=[$0]) + LogicalAggregate(group=[{0}], count_area=[COUNT($1)], min_area=[MIN($1)], max_area=[MAX($1)], avg_area=[AVG($1)], avg_age=[AVG($2)]) + LogicalProject(name=[$0], address.area=[$2], age=[$8]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_nested_simple]]) + physical: | + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_nested_simple]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},count_area=COUNT($1),min_area=MIN($1),max_area=MAX($1),avg_area=AVG($1),avg_age=AVG($2)), PROJECT->[count_area, min_area, max_area, avg_area, avg_age, name], LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"name":{"terms":{"field":"name.keyword","missing_bucket":true,"missing_order":"first","order":"asc"}}}]},"aggregations":{"nested_count_area":{"nested":{"path":"address"},"aggregations":{"count_area":{"value_count":{"field":"address.area"}}}},"nested_min_area":{"nested":{"path":"address"},"aggregations":{"min_area":{"min":{"field":"address.area"}}}},"nested_max_area":{"nested":{"path":"address"},"aggregations":{"max_area":{"max":{"field":"address.area"}}}},"nested_avg_area":{"nested":{"path":"address"},"aggregations":{"avg_area":{"avg":{"field":"address.area"}}}},"avg_age":{"avg":{"field":"age"}}}}}}, requestedTotalSize=10000, pageSize=null, startFrom=0)]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_nested_agg_single_count_push.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_nested_agg_single_count_push.yaml new file mode 100644 index 00000000000..8fa4a02b204 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_nested_agg_single_count_push.yaml @@ -0,0 +1,9 @@ +calcite: + logical: | + LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalAggregate(group=[{}], count(address.area)=[COUNT($0)]) + LogicalProject(address.area=[$2]) + LogicalFilter(condition=[IS NOT NULL($2)]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_nested_simple]]) + physical: | + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_nested_simple]], PushDownContext=[[FILTER->IS NOT NULL($0), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={},count(address.area)=COUNT($0)), LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"nested":{"query":{"exists":{"field":"address.area","boost":1.0}},"path":"address","ignore_unmapped":false,"score_mode":"none","boost":1.0}},"aggregations":{"nested_count(address.area)":{"nested":{"path":"address"},"aggregations":{"count(address.area)":{"value_count":{"field":"address.area"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_nested_agg_sort_push.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_nested_agg_sort_push.yaml new file mode 100644 index 00000000000..0f2d5b397ee --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_nested_agg_sort_push.yaml @@ -0,0 +1,10 @@ +calcite: + logical: | + LogicalSystemLimit(sort0=[$1], dir0=[DESC-nulls-last], fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalSort(sort0=[$1], dir0=[DESC-nulls-last]) + LogicalProject(count()=[$1], address.city=[$0]) + LogicalAggregate(group=[{0}], count()=[COUNT()]) + LogicalProject(address.city=[$3]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_nested_simple]]) + physical: | + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_nested_simple]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},count()=COUNT()), PROJECT->[count(), address.city], SORT->[1 DESC LAST], LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"nested_composite_buckets":{"nested":{"path":"address"},"aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"address.city":{"terms":{"field":"address.city.keyword","missing_bucket":true,"missing_order":"last","order":"desc"}}}]}}}}}}, requestedTotalSize=10000, pageSize=null, startFrom=0)]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_nested_agg_top_push.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_nested_agg_top_push.yaml new file mode 100644 index 00000000000..65f3ed320fc --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_nested_agg_top_push.yaml @@ -0,0 +1,12 @@ +calcite: + logical: | + LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalProject(address.city=[$0], count=[$1]) + LogicalFilter(condition=[<=($2, 10)]) + LogicalProject(address.city=[$0], count=[$1], _row_number_rare_top_=[ROW_NUMBER() OVER (ORDER BY $1 DESC)]) + LogicalAggregate(group=[{0}], count=[COUNT()]) + LogicalProject(address.city=[$3]) + LogicalFilter(condition=[IS NOT NULL($3)]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_nested_simple]]) + physical: | + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_nested_simple]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},count=COUNT()), RARE_TOP->top 10 address.city, PROJECT->[address.city, count], LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"nested_composite_buckets":{"nested":{"path":"address"},"aggregations":{"address.city":{"terms":{"field":"address.city.keyword","size":10,"min_doc_count":1,"shard_min_doc_count":0,"show_term_doc_count_error":false,"order":[{"_count":"desc"},{"_key":"asc"}]}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_partial_filter_isnull.json b/integ-test/src/test/resources/expectedOutput/calcite/explain_partial_filter_isnull.json index 52c53337d61..590f5e273b1 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite/explain_partial_filter_isnull.json +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_partial_filter_isnull.json @@ -1,6 +1,6 @@ { "calcite": { - "logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(name=[$0], address=[$1], id=[$6], age=[$7])\n LogicalFilter(condition=[AND(IS NULL($1), =($0, 'david'))])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_nested_simple]])\n", + "logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(name=[$0], address=[$1], id=[$7], age=[$8])\n LogicalFilter(condition=[AND(IS NULL($1), =($0, 'david'))])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_nested_simple]])\n", "physical": "EnumerableLimit(fetch=[10000])\n EnumerableCalc(expr#0..3=[{inputs}], expr#4=[IS NULL($t1)], proj#0..3=[{exprs}], $condition=[$t4])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_nested_simple]], PushDownContext=[[PROJECT->[name, address, id, age], FILTER->=($0, 'david')], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"query\":{\"bool\":{\"must\":[{\"term\":{\"name.keyword\":{\"value\":\"david\",\"boost\":1.0}}}],\"adjust_pure_negative\":true,\"boost\":1.0}},\"_source\":{\"includes\":[\"name\",\"address\",\"id\",\"age\"],\"excludes\":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n" } -} +} \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_skip_script_encoding.json b/integ-test/src/test/resources/expectedOutput/calcite/explain_skip_script_encoding.json index 99aa2e60259..978d6cf1bcc 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite/explain_skip_script_encoding.json +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_skip_script_encoding.json @@ -1 +1 @@ -{"calcite":{"logical":"LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(firstname=[$1], age=[$8], address=[$2])\n LogicalFilter(condition=[AND(=($2, '671 Bristol Street'), =(-($8, 2), 30))])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n","physical":"CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[firstname, address, age], SCRIPT->AND(=($1, '671 Bristol Street'), =(-($2, 2), 30)), PROJECT->[firstname, age, address], LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"bool\":{\"must\":[{\"script\":{\"script\":{\"source\":\"{\\\"langType\\\":\\\"calcite\\\",\\\"script\\\":\\\"{\\\\n \\\\\\\"op\\\\\\\": {\\\\n \\\\\\\"name\\\\\\\": \\\\\\\"=\\\\\\\",\\\\n \\\\\\\"kind\\\\\\\": \\\\\\\"EQUALS\\\\\\\",\\\\n \\\\\\\"syntax\\\\\\\": \\\\\\\"BINARY\\\\\\\"\\\\n },\\\\n \\\\\\\"operands\\\\\\\": [\\\\n {\\\\n \\\\\\\"dynamicParam\\\\\\\": 0,\\\\n \\\\\\\"type\\\\\\\": {\\\\n \\\\\\\"type\\\\\\\": \\\\\\\"VARCHAR\\\\\\\",\\\\n \\\\\\\"nullable\\\\\\\": true,\\\\n \\\\\\\"precision\\\\\\\": -1\\\\n }\\\\n },\\\\n {\\\\n \\\\\\\"dynamicParam\\\\\\\": 1,\\\\n \\\\\\\"type\\\\\\\": {\\\\n \\\\\\\"type\\\\\\\": \\\\\\\"VARCHAR\\\\\\\",\\\\n \\\\\\\"nullable\\\\\\\": true,\\\\n \\\\\\\"precision\\\\\\\": -1\\\\n }\\\\n }\\\\n ]\\\\n}\\\"}\",\"lang\":\"opensearch_compounded_script\",\"params\":{\"utcTimestamp\":*,\"SOURCES\":[1,2],\"DIGESTS\":[\"address\",\"671 Bristol Street\"]}},\"boost\":1.0}},{\"script\":{\"script\":{\"source\":\"{\\\"langType\\\":\\\"calcite\\\",\\\"script\\\":\\\"{\\\\n \\\\\\\"op\\\\\\\": {\\\\n \\\\\\\"name\\\\\\\": \\\\\\\"=\\\\\\\",\\\\n \\\\\\\"kind\\\\\\\": \\\\\\\"EQUALS\\\\\\\",\\\\n \\\\\\\"syntax\\\\\\\": \\\\\\\"BINARY\\\\\\\"\\\\n },\\\\n \\\\\\\"operands\\\\\\\": [\\\\n {\\\\n \\\\\\\"op\\\\\\\": {\\\\n \\\\\\\"name\\\\\\\": \\\\\\\"-\\\\\\\",\\\\n \\\\\\\"kind\\\\\\\": \\\\\\\"MINUS\\\\\\\",\\\\n \\\\\\\"syntax\\\\\\\": \\\\\\\"BINARY\\\\\\\"\\\\n },\\\\n \\\\\\\"operands\\\\\\\": [\\\\n {\\\\n \\\\\\\"dynamicParam\\\\\\\": 0,\\\\n \\\\\\\"type\\\\\\\": {\\\\n \\\\\\\"type\\\\\\\": \\\\\\\"BIGINT\\\\\\\",\\\\n \\\\\\\"nullable\\\\\\\": true\\\\n }\\\\n },\\\\n {\\\\n \\\\\\\"dynamicParam\\\\\\\": 1,\\\\n \\\\\\\"type\\\\\\\": {\\\\n \\\\\\\"type\\\\\\\": \\\\\\\"BIGINT\\\\\\\",\\\\n \\\\\\\"nullable\\\\\\\": true\\\\n }\\\\n }\\\\n ],\\\\n \\\\\\\"type\\\\\\\": {\\\\n \\\\\\\"type\\\\\\\": \\\\\\\"BIGINT\\\\\\\",\\\\n \\\\\\\"nullable\\\\\\\": true\\\\n }\\\\n },\\\\n {\\\\n \\\\\\\"dynamicParam\\\\\\\": 2,\\\\n \\\\\\\"type\\\\\\\": {\\\\n \\\\\\\"type\\\\\\\": \\\\\\\"BIGINT\\\\\\\",\\\\n \\\\\\\"nullable\\\\\\\": true\\\\n }\\\\n }\\\\n ]\\\\n}\\\"}\",\"lang\":\"opensearch_compounded_script\",\"params\":{\"utcTimestamp\":*,\"SOURCES\":[0,2,2],\"DIGESTS\":[\"age\",2,30]}},\"boost\":1.0}}],\"adjust_pure_negative\":true,\"boost\":1.0}},\"_source\":{\"includes\":[\"firstname\",\"age\",\"address\"],\"excludes\":[]}}, requestedTotalSize=10000, pageSize=null, startFrom=0)])\n","extended":"public org.apache.calcite.linq4j.Enumerable bind(final org.apache.calcite.DataContext root) {\n final org.opensearch.sql.opensearch.storage.scan.CalciteEnumerableIndexScan v1stashed = (org.opensearch.sql.opensearch.storage.scan.CalciteEnumerableIndexScan) root.get(\"v1stashed\");\n return v1stashed.scan();\n}\n\n\npublic Class getElementType() {\n return java.lang.Object[].class;\n}\n\n\n"}} +{"calcite":{"logical":"LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(firstname=[$1], age=[$8], address=[$2])\n LogicalFilter(condition=[AND(=($2, '671 Bristol Street'), =(-($8, 2), 30))])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n","physical":"CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[firstname, address, age], SCRIPT->AND(=($1, '671 Bristol Street'), =(-($2, 2), 30)), PROJECT->[firstname, age, address], LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"bool\":{\"must\":[{\"script\":{\"script\":{\"source\":\"{\\\"langType\\\":\\\"calcite\\\",\\\"script\\\":\\\"{\\\\n \\\\\\\"op\\\\\\\": {\\\\n \\\\\\\"name\\\\\\\": \\\\\\\"=\\\\\\\",\\\\n \\\\\\\"kind\\\\\\\": \\\\\\\"EQUALS\\\\\\\",\\\\n \\\\\\\"syntax\\\\\\\": \\\\\\\"BINARY\\\\\\\"\\\\n },\\\\n \\\\\\\"operands\\\\\\\": [\\\\n {\\\\n \\\\\\\"dynamicParam\\\\\\\": 0,\\\\n \\\\\\\"type\\\\\\\": {\\\\n \\\\\\\"type\\\\\\\": \\\\\\\"VARCHAR\\\\\\\",\\\\n \\\\\\\"nullable\\\\\\\": true,\\\\n \\\\\\\"precision\\\\\\\": -1\\\\n }\\\\n },\\\\n {\\\\n \\\\\\\"dynamicParam\\\\\\\": 1,\\\\n \\\\\\\"type\\\\\\\": {\\\\n \\\\\\\"type\\\\\\\": \\\\\\\"VARCHAR\\\\\\\",\\\\n \\\\\\\"nullable\\\\\\\": true,\\\\n \\\\\\\"precision\\\\\\\": -1\\\\n }\\\\n }\\\\n ]\\\\n}\\\"}\",\"lang\":\"opensearch_compounded_script\",\"params\":{\"utcTimestamp\":*,\"SOURCES\":[1,2],\"DIGESTS\":[\"address\",\"671 Bristol Street\"]}},\"boost\":1.0}},{\"script\":{\"script\":{\"source\":\"{\\\"langType\\\":\\\"calcite\\\",\\\"script\\\":\\\"{\\\\n \\\\\\\"op\\\\\\\": {\\\\n \\\\\\\"name\\\\\\\": \\\\\\\"=\\\\\\\",\\\\n \\\\\\\"kind\\\\\\\": \\\\\\\"EQUALS\\\\\\\",\\\\n \\\\\\\"syntax\\\\\\\": \\\\\\\"BINARY\\\\\\\"\\\\n },\\\\n \\\\\\\"operands\\\\\\\": [\\\\n {\\\\n \\\\\\\"op\\\\\\\": {\\\\n \\\\\\\"name\\\\\\\": \\\\\\\"-\\\\\\\",\\\\n \\\\\\\"kind\\\\\\\": \\\\\\\"MINUS\\\\\\\",\\\\n \\\\\\\"syntax\\\\\\\": \\\\\\\"BINARY\\\\\\\"\\\\n },\\\\n \\\\\\\"operands\\\\\\\": [\\\\n {\\\\n \\\\\\\"dynamicParam\\\\\\\": 0,\\\\n \\\\\\\"type\\\\\\\": {\\\\n \\\\\\\"type\\\\\\\": \\\\\\\"BIGINT\\\\\\\",\\\\n \\\\\\\"nullable\\\\\\\": true\\\\n }\\\\n },\\\\n {\\\\n \\\\\\\"dynamicParam\\\\\\\": 1,\\\\n \\\\\\\"type\\\\\\\": {\\\\n \\\\\\\"type\\\\\\\": \\\\\\\"BIGINT\\\\\\\",\\\\n \\\\\\\"nullable\\\\\\\": true\\\\n }\\\\n }\\\\n ],\\\\n \\\\\\\"type\\\\\\\": {\\\\n \\\\\\\"type\\\\\\\": \\\\\\\"BIGINT\\\\\\\",\\\\n \\\\\\\"nullable\\\\\\\": true\\\\n }\\\\n },\\\\n {\\\\n \\\\\\\"dynamicParam\\\\\\\": 2,\\\\n \\\\\\\"type\\\\\\\": {\\\\n \\\\\\\"type\\\\\\\": \\\\\\\"BIGINT\\\\\\\",\\\\n \\\\\\\"nullable\\\\\\\": true\\\\n }\\\\n }\\\\n ]\\\\n}\\\"}\",\"lang\":\"opensearch_compounded_script\",\"params\":{\"utcTimestamp\":*,\"SOURCES\":[0,2,2],\"DIGESTS\":[\"age\",2,30]}},\"boost\":1.0}}],\"adjust_pure_negative\":true,\"boost\":1.0}},\"_source\":{\"includes\":[\"firstname\",\"age\",\"address\"],\"excludes\":[]}}, requestedTotalSize=10000, pageSize=null, startFrom=0)])\n"}} diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_partial_filter_isnull.json b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_partial_filter_isnull.json index 75c84e80b6c..c2d6da05cde 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_partial_filter_isnull.json +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_partial_filter_isnull.json @@ -1,6 +1,6 @@ { "calcite": { - "logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(name=[$0], address=[$1], id=[$6], age=[$7])\n LogicalFilter(condition=[AND(IS NULL($1), =($0, 'david'))])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_nested_simple]])\n", - "physical": "EnumerableLimit(fetch=[10000])\n EnumerableCalc(expr#0..13=[{inputs}], expr#14=[IS NULL($t1)], expr#15=['david':VARCHAR], expr#16=[=($t0, $t15)], expr#17=[AND($t14, $t16)], proj#0..1=[{exprs}], id=[$t6], age=[$t7], $condition=[$t17])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_nested_simple]])\n" + "logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(name=[$0], address=[$1], id=[$7], age=[$8])\n LogicalFilter(condition=[AND(IS NULL($1), =($0, 'david'))])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_nested_simple]])\n", + "physical": "EnumerableLimit(fetch=[10000])\n EnumerableCalc(expr#0..14=[{inputs}], expr#15=[IS NULL($t1)], expr#16=['david':VARCHAR], expr#17=[=($t0, $t16)], expr#18=[AND($t15, $t17)], proj#0..1=[{exprs}], id=[$t7], age=[$t8], $condition=[$t18])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_nested_simple]])\n" } } diff --git a/integ-test/src/test/resources/indexDefinitions/nested_simple_index_mapping.json b/integ-test/src/test/resources/indexDefinitions/nested_simple_index_mapping.json index 7e521cdd44d..e5c26e2dfb6 100644 --- a/integ-test/src/test/resources/indexDefinitions/nested_simple_index_mapping.json +++ b/integ-test/src/test/resources/indexDefinitions/nested_simple_index_mapping.json @@ -29,6 +29,9 @@ "format": "basic_date_time" } } + }, + "area" : { + "type": "double" } } }, diff --git a/integ-test/src/test/resources/nested_simple.json b/integ-test/src/test/resources/nested_simple.json index f3cb1a5ebe4..6dd4d294aa0 100644 --- a/integ-test/src/test/resources/nested_simple.json +++ b/integ-test/src/test/resources/nested_simple.json @@ -1,10 +1,10 @@ {"index":{"_id":"1"}} -{"name":"abbas","age":24,"address":[{"city":"New york city","state":"NY","moveInDate":{"dateAndTime":"19840412T090742.000Z"}},{"city":"bellevue","state":"WA","moveInDate":[{"dateAndTime":"20230503T080742.000Z"},{"dateAndTime":"20011111T040744.000Z"}]},{"city":"seattle","state":"WA","moveInDate":{"dateAndTime":"19660319T030455.000Z"}},{"city":"chicago","state":"IL","moveInDate":{"dateAndTime":"20110601T010142.000Z"}}]} +{"name":"abbas","age":24,"address":[{"city":"New york city","state":"NY","moveInDate":{"dateAndTime":"19840412T090742.000Z"},"area":300.13},{"city":"bellevue","state":"WA","moveInDate":[{"dateAndTime":"20230503T080742.000Z"},{"dateAndTime":"20011111T040744.000Z"}],"area":400.99},{"city":"seattle","state":"WA","moveInDate":{"dateAndTime":"19660319T030455.000Z"},"area":127.4},{"city":"chicago","state":"IL","moveInDate":{"dateAndTime":"20110601T010142.000Z"},"area":10.24}]} {"index":{"_id":"2"}} -{"name":"chen","age":32,"address":[{"city":"Miami","state":"Florida","moveInDate":{"dateAndTime":"19010811T040333.000Z"}},{"city":"los angeles","state":"CA","moveInDate":{"dateAndTime":"20230503T080742.000Z"}}]} +{"name":"chen","age":32,"address":[{"city":"Miami","state":"Florida","moveInDate":{"dateAndTime":"19010811T040333.000Z"},"area":1000.99},{"city":"los angeles","state":"CA","moveInDate":{"dateAndTime":"20230503T080742.000Z"},"area":9.99}]} {"index":{"_id":"3"}} -{"name":"peng","age":26,"address":[{"city":"san diego","state":"CA","moveInDate":{"dateAndTime":"20011111T040744.000Z"}},{"city":"austin","state":"TX","moveInDate":{"dateAndTime":"19770713T090441.000Z"}}]} +{"name":"peng","age":26,"address":[{"city":"san diego","state":"CA","moveInDate":{"dateAndTime":"20011111T040744.000Z"},"area":231.01},{"city":"austin","state":"TX","moveInDate":{"dateAndTime":"19770713T090441.000Z"},"area":429.79}]} {"index":{"_id":"4"}} {"name":"andy","age":19,"id":4,"address":[{"city":"houston","state":"TX","moveInDate":{"dateAndTime":"19331212T050545.000Z"}}]} {"index":{"_id":"5"}} -{"name":"david","age":25,"address":[{"city":"raleigh","state":"NC","moveInDate":{"dateAndTime":"19090617T010421.000Z"}},{"city":"charlotte","state":"SC","moveInDate":[{"dateAndTime":"20011111T040744.000Z"}]}]} +{"name":"david","age":25,"address":[{"city":"raleigh","state":"NC","moveInDate":{"dateAndTime":"19090617T010421.000Z"},"area":190.5},{"city":"charlotte","state":"SC","moveInDate":[{"dateAndTime":"20011111T040744.000Z"}]}]} diff --git a/integ-test/src/yamlRestTest/resources/rest-api-spec/test/issues/4949.yml b/integ-test/src/yamlRestTest/resources/rest-api-spec/test/issues/4949.yml new file mode 100644 index 00000000000..977ea6dfd5a --- /dev/null +++ b/integ-test/src/yamlRestTest/resources/rest-api-spec/test/issues/4949.yml @@ -0,0 +1,93 @@ +setup: + - do: + query.settings: + body: + transient: + plugins.calcite.enabled: true + - do: + indices.create: + index: test + body: + mappings: + properties: + "rec": + type: nested + properties: + "inner": + type: integer + - do: + bulk: + index: test + refresh: true + body: + - '{"index": {"_id": "1"}}' + - '{"rec":{"inner": 100}}' + - '{"index": {"_id": "2"}}' + - '{"rec":{"inner": 101}}' + - '{"index": {"_id": "3"}}' + - '{"rec":{"inner": 101}}' + +--- +teardown: + - do: + query.settings: + body: + transient: + plugins.calcite.enabled : false + +--- +"nested top": + - skip: + features: + - headers + - allowed_warnings + - do: + allowed_warnings: + - 'Loading the fielddata on the _id field is deprecated and will be removed in future versions. If you require sorting or aggregating on this field you should also include the id in the body of your documents, and map this field as a keyword field that has [doc_values] enabled' + headers: + Content-Type: 'application/json' + ppl: + body: + query: source=test | top rec.inner + + - match: { total: 2 } + - match: {"schema": [{"name": "rec.inner", "type": "int"},{"name": "count", "type": "bigint"}]} + - match: {"datarows": [[101, 2], [100, 1]]} + +--- +"nested agg": + - skip: + features: + - headers + - allowed_warnings + - do: + allowed_warnings: + - 'Loading the fielddata on the _id field is deprecated and will be removed in future versions. If you require sorting or aggregating on this field you should also include the id in the body of your documents, and map this field as a keyword field that has [doc_values] enabled' + headers: + Content-Type: 'application/json' + ppl: + body: + query: source=test | stats count() by rec.inner | sort - rec.inner + + - match: { total: 2 } + - match: {"schema": [{"name": "count()", "type": "bigint"}, {"name": "rec.inner", "type": "int"}]} + - match: {"datarows": [[2, 101], [1, 100]]} + +--- +"nested dedup": + - skip: + features: + - headers + - allowed_warnings + - do: + allowed_warnings: + - 'Loading the fielddata on the _id field is deprecated and will be removed in future versions. If you require sorting or aggregating on this field you should also include the id in the body of your documents, and map this field as a keyword field that has [doc_values] enabled' + headers: + Content-Type: 'application/json' + ppl: + body: + query: source=test | dedup rec.inner + + - match: { total: 2 } + - match: {"schema": [{"name": "rec", "type": "array"}]} + - match: {"datarows": [[[{"inner": 100}]],[[{"inner": 101}]]]} diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/CalciteEnumerableNestedAggregate.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/CalciteEnumerableNestedAggregate.java new file mode 100644 index 00000000000..ef569fc2989 --- /dev/null +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/CalciteEnumerableNestedAggregate.java @@ -0,0 +1,89 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.opensearch.planner.physical; + +import java.util.List; +import org.apache.calcite.adapter.enumerable.AggImplementor; +import org.apache.calcite.adapter.enumerable.EnumerableAggregateBase; +import org.apache.calcite.adapter.enumerable.EnumerableConvention; +import org.apache.calcite.adapter.enumerable.EnumerableRel; +import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor; +import org.apache.calcite.adapter.enumerable.RexImpTable; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.InvalidRelException; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.rel.hint.RelHint; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.util.ImmutableBitSet; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** + * The enumerable aggregate physical implementation for OpenSearch nested aggregation. + * https://docs.opensearch.org/latest/aggregations/bucket/nested/ + */ +public class CalciteEnumerableNestedAggregate extends EnumerableAggregateBase + implements EnumerableRel { + + public CalciteEnumerableNestedAggregate( + RelOptCluster cluster, + RelTraitSet traitSet, + List hints, + RelNode input, + ImmutableBitSet groupSet, + @Nullable List groupSets, + List aggCalls) + throws InvalidRelException { + super(cluster, traitSet, hints, input, groupSet, groupSets, aggCalls); + assert getConvention() instanceof EnumerableConvention; + + for (AggregateCall aggCall : aggCalls) { + if (aggCall.isDistinct()) { + throw new InvalidRelException("distinct aggregation not supported"); + } + if (aggCall.distinctKeys != null) { + throw new InvalidRelException("within-distinct aggregation not supported"); + } + AggImplementor implementor2 = RexImpTable.INSTANCE.get(aggCall.getAggregation(), false); + if (implementor2 == null) { + throw new InvalidRelException("aggregation " + aggCall.getAggregation() + " not supported"); + } + } + } + + @Override + public CalciteEnumerableNestedAggregate copy( + RelTraitSet traitSet, + RelNode input, + ImmutableBitSet groupSet, + @Nullable List groupSets, + List aggCalls) { + try { + return new CalciteEnumerableNestedAggregate( + getCluster(), traitSet, getHints(), input, groupSet, groupSets, aggCalls); + } catch (InvalidRelException e) { + // Semantic error not possible. Must be a bug. Convert to + // internal error. + throw new AssertionError(e); + } + } + + @Override + public Result implement(EnumerableRelImplementor implementor, Prefer pref) { + // TODO implement an enumerable nested aggregate + throw new UnsupportedOperationException( + String.format( + "Cannot execute nested aggregation on %s since pushdown cannot be applied.", aggCalls)); + } + + @Override + public @Nullable RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) { + return super.computeSelfCost(planner, mq).multiplyBy(.9); + } +} diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/DedupPushdownRule.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/DedupPushdownRule.java index 38059099662..57e8045f2fe 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/DedupPushdownRule.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/DedupPushdownRule.java @@ -27,9 +27,11 @@ import org.apache.logging.log4j.Logger; import org.immutables.value.Value; import org.opensearch.sql.calcite.plan.OpenSearchRuleConfig; +import org.opensearch.sql.calcite.utils.PPLHintUtils; import org.opensearch.sql.calcite.utils.PlanUtils; import org.opensearch.sql.opensearch.storage.scan.AbstractCalciteIndexScan; import org.opensearch.sql.opensearch.storage.scan.CalciteLogicalIndexScan; +import org.opensearch.sql.utils.Utils; @Value.Enclosing public class DedupPushdownRule extends InterruptibleRelRule @@ -66,9 +68,13 @@ protected void apply( List dedupColumns = windows.get(0).partitionKeys; if (dedupColumns.stream() .filter(rex -> rex.isA(SqlKind.INPUT_REF)) - .anyMatch(rex -> rex.getType().getSqlTypeName() == SqlTypeName.MAP)) { - LOG.debug("Cannot pushdown the dedup since the dedup fields contains MAP type"); - // TODO https://github.com/opensearch-project/sql/issues/4564 + .anyMatch( + rex -> + rex.getType().getSqlTypeName() == SqlTypeName.MAP + || rex.getType().getSqlTypeName() == SqlTypeName.ARRAY)) { + // TODO https://github.com/opensearch-project/sql/issues/5006 + LOG.debug("Cannot pushdown the dedup since the dedup fields contains MAP/ARRAY type"); + // fallback to non-pushdown return; } // must be row_number <= number @@ -101,6 +107,14 @@ protected void apply( return; } } + if (targetProjections.stream() + .anyMatch( + pair -> + Utils.resolveNestedPath(pair.getValue(), scan.getOsIndex().getFieldTypes()) + != null)) { + // fallback to non-pushdown if the dedup columns contain nested fields. + return; + } for (Pair project : projectWithWindow.getNamedProjects()) { if (!project.getKey().isA(SqlKind.ROW_NUMBER) && !targetProjections.contains(project)) { targetProjections.add(project); @@ -122,7 +136,7 @@ protected void apply( relBuilder.groupKey(relBuilder.fields(newGroupByList)), relBuilder.literalAgg(dedupNumer)); // add bucket_nullable = false hint - PlanUtils.addIgnoreNullBucketHintToAggregate(relBuilder); + PPLHintUtils.addIgnoreNullBucketHintToAggregate(relBuilder); // peek the aggregate after hint being added LogicalAggregate aggregate = (LogicalAggregate) relBuilder.build(); assert aggregate.getGroupSet().asList().equals(newGroupByList) diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/EnumerableNestedAggregateRule.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/EnumerableNestedAggregateRule.java new file mode 100644 index 00000000000..3dee39d9b13 --- /dev/null +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/EnumerableNestedAggregateRule.java @@ -0,0 +1,65 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.opensearch.planner.rules; + +import org.apache.calcite.adapter.enumerable.EnumerableConvention; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.InvalidRelException; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.Aggregate; +import org.apache.calcite.rel.logical.LogicalAggregate; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.opensearch.sql.calcite.utils.PPLHintUtils; +import org.opensearch.sql.opensearch.planner.physical.CalciteEnumerableNestedAggregate; + +/** Rule to convert {@link LogicalAggregate} to {@link CalciteEnumerableNestedAggregate}. */ +public class EnumerableNestedAggregateRule extends ConverterRule { + private static final Logger LOG = LogManager.getLogger(); + + /** Default configuration. */ + public static final Config DEFAULT_CONFIG = + Config.INSTANCE + .withConversion( + LogicalAggregate.class, + Convention.NONE, + EnumerableConvention.INSTANCE, + "EnumerableNestedAggregateRule") + .withRuleFactory(EnumerableNestedAggregateRule::new); + + /** Called from the Config. */ + protected EnumerableNestedAggregateRule(Config config) { + super(config); + } + + @Override + public @Nullable RelNode convert(RelNode rel) { + final Aggregate agg = (Aggregate) rel; + if (PPLHintUtils.hasNestedAggCall(agg)) { + final RelTraitSet traitSet = + rel.getCluster().traitSet().replace(EnumerableConvention.INSTANCE); + try { + return new CalciteEnumerableNestedAggregate( + rel.getCluster(), + traitSet, + agg.getHints(), + convert(agg.getInput(), traitSet), + agg.getGroupSet(), + agg.getGroupSets(), + agg.getAggCallList()); + } catch (InvalidRelException e) { + if (LOG.isDebugEnabled()) { + LOG.debug(e.toString()); + } + return null; + } + } + return null; + } +} diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/OpenSearchIndexRules.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/OpenSearchIndexRules.java index 0fa726072f0..db65bb51a80 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/OpenSearchIndexRules.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/OpenSearchIndexRules.java @@ -10,6 +10,21 @@ import org.apache.calcite.plan.RelOptRule; public class OpenSearchIndexRules { + + private static final RelOptRule INDEX_SCAN_RULE = EnumerableIndexScanRule.DEFAULT_CONFIG.toRule(); + private static final RelOptRule SYSTEM_INDEX_SCAN_RULE = + EnumerableSystemIndexScanRule.DEFAULT_CONFIG.toRule(); + private static final RelOptRule NESTED_AGGREGATE_RULE = + EnumerableNestedAggregateRule.DEFAULT_CONFIG.toRule(); + // Rule that always pushes down relevance functions regardless of pushdown settings + private static final RelevanceFunctionPushdownRule RELEVANCE_FUNCTION_RULE = + RelevanceFunctionPushdownRule.Config.DEFAULT.toRule(); + + /** The rules will apply whatever the pushdown setting is. */ + public static final List OPEN_SEARCH_NON_PUSHDOWN_RULES = + ImmutableList.of( + INDEX_SCAN_RULE, SYSTEM_INDEX_SCAN_RULE, NESTED_AGGREGATE_RULE, RELEVANCE_FUNCTION_RULE); + private static final ProjectIndexScanRule PROJECT_INDEX_SCAN = ProjectIndexScanRule.Config.DEFAULT.toRule(); private static final FilterIndexScanRule FILTER_INDEX_SCAN = @@ -42,11 +57,8 @@ public class OpenSearchIndexRules { private static final EnumerableTopKMergeRule ENUMERABLE_TOP_K_MERGE_RULE = EnumerableTopKMergeRule.Config.DEFAULT.toRule(); - // Rule that always pushes down relevance functions regardless of pushdown settings - public static final RelevanceFunctionPushdownRule RELEVANCE_FUNCTION_PUSHDOWN = - RelevanceFunctionPushdownRule.Config.DEFAULT.toRule(); - - public static final List OPEN_SEARCH_INDEX_SCAN_RULES = + /** The rules will apply only when the pushdown is enabled. */ + public static final List OPEN_SEARCH_PUSHDOWN_RULES = ImmutableList.of( PROJECT_INDEX_SCAN, FILTER_INDEX_SCAN, diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/AggregateAnalyzer.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/AggregateAnalyzer.java index 924b2ad1918..247f40b3733 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/AggregateAnalyzer.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/AggregateAnalyzer.java @@ -40,6 +40,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.function.Function; @@ -97,6 +98,7 @@ import org.opensearch.sql.opensearch.response.agg.StatsParser; import org.opensearch.sql.opensearch.response.agg.TopHitsParser; import org.opensearch.sql.opensearch.storage.script.aggregation.dsl.CompositeAggregationBuilder; +import org.opensearch.sql.utils.Utils; /** * Aggregate analyzer. Convert aggregate to AggregationBuilder {@link AggregationBuilder} and its @@ -236,13 +238,13 @@ public static Pair, OpenSearchAggregationResponseParser } else { // Used to track the current sub-builder as analysis progresses Builder subBuilder = newMetricBuilder; - // Push auto date span & case in group-by list into nested aggregations + // Push auto date span & case in group-by list into structured aggregations List> groupNameAndIndexList = IntStream.range(0, groupList.size()) .mapToObj(i -> Pair.of(outputFields.get(i), groupList.get(i))) .toList(); Pair, AggregationBuilder> aggPushedAndAggBuilder = - createNestedAggregation(groupNameAndIndexList, project, subBuilder, helper); + createStructuredAggregation(groupNameAndIndexList, project, subBuilder, helper); Set aggPushed = aggPushedAndAggBuilder.getLeft(); AggregationBuilder pushedAggBuilder = aggPushedAndAggBuilder.getRight(); // The group-by list after removing pushed aggregations @@ -255,7 +257,7 @@ public static Pair, OpenSearchAggregationResponseParser } // No composite aggregation at top-level -- auto date span & case in group-by list are - // pushed into nested aggregations: + // pushed into structured aggregations: // - stats avg() by range_field // - stats count() by auto_date_span // - stats count() by ...auto_date_spans, ...range_fields @@ -288,6 +290,23 @@ public static Pair, OpenSearchAggregationResponseParser if (subBuilder != null) { compositeBuilder.subAggregations(subBuilder); } + List nestedPathFromBuckets = + groupNameAndIndexList.stream() + .map(b -> Utils.resolveNestedPath(b.getLeft(), helper.fieldTypes)) + .toList(); + boolean validNestedAgg = + nestedPathFromBuckets.stream().noneMatch(Objects::isNull) + && nestedPathFromBuckets.stream().distinct().count() == 1; + if (validNestedAgg) { + String nestedPath = nestedPathFromBuckets.getFirst(); + AggregationBuilder nestedAggBuilder = + AggregationBuilders.nested( + String.format("nested_%s", compositeBuilder.getName()), nestedPath) + .subAggregation(compositeBuilder); + return Pair.of( + Collections.singletonList(nestedAggBuilder), + new BucketAggregationParser(metricParsers, countAggNames)); + } return Pair.of( Collections.singletonList(compositeBuilder), new BucketAggregationParser(metricParsers, countAggNames)); @@ -345,7 +364,7 @@ private static boolean supportCountFiled( } private static Pair> processAggregateCalls( - List aggFieldNames, + List aggNames, List aggCalls, Project project, AggregateAnalyzer.AggregateBuilderHelper helper) @@ -357,12 +376,23 @@ private static Pair> processAggregateCalls( for (int i = 0; i < aggCalls.size(); i++) { AggregateCall aggCall = aggCalls.get(i); List> args = convertAggArgThroughProject(aggCall, project, helper); - String aggFieldName = aggFieldNames.get(i); + String aggName = aggNames.get(i); Pair builderAndParser = - createAggregationBuilderAndParser(aggCall, args, aggFieldName, helper); - builderAndParser = aggFilterAnalyzer.analyze(builderAndParser, aggCall, aggFieldName); - metricBuilder.addAggregator(builderAndParser.getLeft()); + createAggregationBuilderAndParser(aggCall, args, aggName, helper); + builderAndParser = aggFilterAnalyzer.analyze(builderAndParser, aggCall, aggName); + // Nested aggregation (https://docs.opensearch.org/docs/latest/aggregations/bucket/nested/) + String nestedPath = + args.isEmpty() + ? null + : Utils.resolveNestedPath(args.getFirst().getRight(), helper.fieldTypes); + if (nestedPath != null) { + metricBuilder.addAggregator( + AggregationBuilders.nested(String.format("nested_%s", aggCall.getName()), nestedPath) + .subAggregation(builderAndParser.getLeft())); + } else { + metricBuilder.addAggregator(builderAndParser.getLeft()); + } metricParserList.add(builderAndParser.getRight()); } return Pair.of(metricBuilder, metricParserList); @@ -402,19 +432,19 @@ private static List> convertAggArgThroughProject( private static Pair createAggregationBuilderAndParser( AggregateCall aggCall, List> args, - String aggFieldName, + String aggName, AggregateAnalyzer.AggregateBuilderHelper helper) { if (aggCall.isDistinct()) { - return createDistinctAggregation(aggCall, args, aggFieldName, helper); + return createDistinctAggregation(aggCall, args, aggName, helper); } else { - return createRegularAggregation(aggCall, args, aggFieldName, helper); + return createRegularAggregation(aggCall, args, aggName, helper); } } private static Pair createDistinctAggregation( AggregateCall aggCall, List> args, - String aggFieldName, + String aggName, AggregateBuilderHelper helper) { return switch (aggCall.getAggregation().kind) { @@ -422,8 +452,8 @@ private static Pair createDistinctAggregation( Pair.of( helper.build( !args.isEmpty() ? args.getFirst().getKey() : null, - AggregationBuilders.cardinality(aggFieldName)), - new SingleValueParser(aggFieldName)); + AggregationBuilders.cardinality(aggName)), + new SingleValueParser(aggName)); default -> throw new AggregateAnalyzer.AggregateAnalyzerException( String.format("unsupported distinct aggregator %s", aggCall.getAggregation())); @@ -433,36 +463,36 @@ private static Pair createDistinctAggregation( private static Pair createRegularAggregation( AggregateCall aggCall, List> args, - String aggFieldName, + String aggName, AggregateBuilderHelper helper) { return switch (aggCall.getAggregation().kind) { case AVG -> Pair.of( - helper.build(args.getFirst().getKey(), AggregationBuilders.avg(aggFieldName)), - new SingleValueParser(aggFieldName)); + helper.build(args.getFirst().getKey(), AggregationBuilders.avg(aggName)), + new SingleValueParser(aggName)); // 1. Only case SUM, skip SUM0 / COUNT since calling avg() in DSL should be faster. // 2. To align with databases, SUM0 is not preferred now. case SUM -> Pair.of( - helper.build(args.getFirst().getKey(), AggregationBuilders.sum(aggFieldName)), - new SingleValueParser(aggFieldName)); + helper.build(args.getFirst().getKey(), AggregationBuilders.sum(aggName)), + new SingleValueParser(aggName)); case COUNT -> Pair.of( helper.build( !args.isEmpty() ? args.getFirst().getKey() : null, - AggregationBuilders.count(aggFieldName)), - new SingleValueParser(aggFieldName)); + AggregationBuilders.count(aggName)), + new SingleValueParser(aggName)); case MIN -> { ExprType fieldType = OpenSearchTypeFactory.convertRelDataTypeToExprType(args.getFirst().getKey().getType()); if (supportsMaxMinAggregation(fieldType)) { yield Pair.of( - helper.build(args.getFirst().getKey(), AggregationBuilders.min(aggFieldName)), - new SingleValueParser(aggFieldName)); + helper.build(args.getFirst().getKey(), AggregationBuilders.min(aggName)), + new SingleValueParser(aggName)); } else { yield Pair.of( - AggregationBuilders.topHits(aggFieldName) + AggregationBuilders.topHits(aggName) .fetchField( helper.inferNamedField(args.getFirst().getKey()).getReferenceForTermQuery()) .size(1) @@ -470,7 +500,7 @@ private static Pair createRegularAggregation( .sort( helper.inferNamedField(args.getFirst().getKey()).getReferenceForTermQuery(), SortOrder.ASC), - new TopHitsParser(aggFieldName, true, false)); + new TopHitsParser(aggName, true, false)); } } case MAX -> { @@ -478,11 +508,11 @@ private static Pair createRegularAggregation( OpenSearchTypeFactory.convertRelDataTypeToExprType(args.getFirst().getKey().getType()); if (supportsMaxMinAggregation(fieldType)) { yield Pair.of( - helper.build(args.getFirst().getKey(), AggregationBuilders.max(aggFieldName)), - new SingleValueParser(aggFieldName)); + helper.build(args.getFirst().getKey(), AggregationBuilders.max(aggName)), + new SingleValueParser(aggName)); } else { yield Pair.of( - AggregationBuilders.topHits(aggFieldName) + AggregationBuilders.topHits(aggName) .fetchField( helper.inferNamedField(args.getFirst().getKey()).getReferenceForTermQuery()) .size(1) @@ -490,32 +520,28 @@ private static Pair createRegularAggregation( .sort( helper.inferNamedField(args.getFirst().getKey()).getReferenceForTermQuery(), SortOrder.DESC), - new TopHitsParser(aggFieldName, true, false)); + new TopHitsParser(aggName, true, false)); } } case VAR_SAMP -> Pair.of( - helper.build( - args.getFirst().getKey(), AggregationBuilders.extendedStats(aggFieldName)), - new StatsParser(ExtendedStats::getVarianceSampling, aggFieldName)); + helper.build(args.getFirst().getKey(), AggregationBuilders.extendedStats(aggName)), + new StatsParser(ExtendedStats::getVarianceSampling, aggName)); case VAR_POP -> Pair.of( - helper.build( - args.getFirst().getKey(), AggregationBuilders.extendedStats(aggFieldName)), - new StatsParser(ExtendedStats::getVariancePopulation, aggFieldName)); + helper.build(args.getFirst().getKey(), AggregationBuilders.extendedStats(aggName)), + new StatsParser(ExtendedStats::getVariancePopulation, aggName)); case STDDEV_SAMP -> Pair.of( - helper.build( - args.getFirst().getKey(), AggregationBuilders.extendedStats(aggFieldName)), - new StatsParser(ExtendedStats::getStdDeviationSampling, aggFieldName)); + helper.build(args.getFirst().getKey(), AggregationBuilders.extendedStats(aggName)), + new StatsParser(ExtendedStats::getStdDeviationSampling, aggName)); case STDDEV_POP -> Pair.of( - helper.build( - args.getFirst().getKey(), AggregationBuilders.extendedStats(aggFieldName)), - new StatsParser(ExtendedStats::getStdDeviationPopulation, aggFieldName)); + helper.build(args.getFirst().getKey(), AggregationBuilders.extendedStats(aggName)), + new StatsParser(ExtendedStats::getStdDeviationPopulation, aggName)); case ARG_MAX -> Pair.of( - AggregationBuilders.topHits(aggFieldName) + AggregationBuilders.topHits(aggName) .fetchField( helper.inferNamedField(args.getFirst().getKey()).getReferenceForTermQuery()) .size(1) @@ -523,10 +549,10 @@ private static Pair createRegularAggregation( .sort( helper.inferNamedField(args.get(1).getKey()).getReferenceForTermQuery(), org.opensearch.search.sort.SortOrder.DESC), - new ArgMaxMinParser(aggFieldName)); + new ArgMaxMinParser(aggName)); case ARG_MIN -> Pair.of( - AggregationBuilders.topHits(aggFieldName) + AggregationBuilders.topHits(aggName) .fetchField( helper.inferNamedField(args.getFirst().getKey()).getReferenceForTermQuery()) .size(1) @@ -534,33 +560,33 @@ private static Pair createRegularAggregation( .sort( helper.inferNamedField(args.get(1).getKey()).getReferenceForTermQuery(), org.opensearch.search.sort.SortOrder.ASC), - new ArgMaxMinParser(aggFieldName)); + new ArgMaxMinParser(aggName)); case OTHER_FUNCTION -> { BuiltinFunctionName functionName = BuiltinFunctionName.ofAggregation(aggCall.getAggregation().getName()).get(); yield switch (functionName) { case TAKE -> Pair.of( - AggregationBuilders.topHits(aggFieldName) + AggregationBuilders.topHits(aggName) .fetchField( helper .inferNamedField(args.getFirst().getKey()) .getReferenceForTermQuery()) .size(helper.inferValue(args.getLast().getKey(), Integer.class)) .from(0), - new TopHitsParser(aggFieldName, false, true)); + new TopHitsParser(aggName, false, true)); case FIRST -> { TopHitsAggregationBuilder firstBuilder = - AggregationBuilders.topHits(aggFieldName).size(1).from(0); + AggregationBuilders.topHits(aggName).size(1).from(0); if (!args.isEmpty()) { firstBuilder.fetchField( helper.inferNamedField(args.getFirst().getKey()).getReferenceForTermQuery()); } - yield Pair.of(firstBuilder, new TopHitsParser(aggFieldName, true, false)); + yield Pair.of(firstBuilder, new TopHitsParser(aggName, true, false)); } case LAST -> { TopHitsAggregationBuilder lastBuilder = - AggregationBuilders.topHits(aggFieldName) + AggregationBuilders.topHits(aggName) .size(1) .from(0) .sort("_doc", org.opensearch.search.sort.SortOrder.DESC); @@ -568,25 +594,25 @@ yield switch (functionName) { lastBuilder.fetchField( helper.inferNamedField(args.getFirst().getKey()).getReferenceForTermQuery()); } - yield Pair.of(lastBuilder, new TopHitsParser(aggFieldName, true, false)); + yield Pair.of(lastBuilder, new TopHitsParser(aggName, true, false)); } case PERCENTILE_APPROX -> { PercentilesAggregationBuilder aggBuilder = helper - .build(args.getFirst().getKey(), AggregationBuilders.percentiles(aggFieldName)) + .build(args.getFirst().getKey(), AggregationBuilders.percentiles(aggName)) .percentiles(helper.inferValue(args.get(1).getKey(), Double.class)); /* See {@link PercentileApproxFunction}, PERCENTILE_APPROX accepts args of [FIELD, PERCENTILE, TYPE, COMPRESSION(optional)] */ if (args.size() > 3) { aggBuilder.compression(helper.inferValue(args.getLast().getKey(), Double.class)); } - yield Pair.of(aggBuilder, new SinglePercentileParser(aggFieldName)); + yield Pair.of(aggBuilder, new SinglePercentileParser(aggName)); } case DISTINCT_COUNT_APPROX -> Pair.of( helper.build( !args.isEmpty() ? args.getFirst().getKey() : null, - AggregationBuilders.cardinality(aggFieldName)), - new SingleValueParser(aggFieldName)); + AggregationBuilders.cardinality(aggName)), + new SingleValueParser(aggName)); default -> throw new AggregateAnalyzer.AggregateAnalyzerException( String.format("Unsupported push-down aggregator %s", aggCall.getAggregation())); @@ -601,7 +627,7 @@ yield switch (functionName) { Integer dedupNumber = literal.getValueAs(Integer.class); // Disable fetchSource since TopHitsParser only parses fetchField currently. TopHitsAggregationBuilder topHitsAggregationBuilder = - AggregationBuilders.topHits(aggFieldName).from(0).size(dedupNumber); + AggregationBuilders.topHits(aggName).from(0).size(dedupNumber); List sources = new ArrayList<>(); List scripts = new ArrayList<>(); args.forEach( @@ -622,7 +648,7 @@ yield switch (functionName) { topHitsAggregationBuilder.fetchSource( sources.stream().distinct().toArray(String[]::new), new String[0]); topHitsAggregationBuilder.scriptFields(scripts); - yield Pair.of(topHitsAggregationBuilder, new TopHitsParser(aggFieldName, false, false)); + yield Pair.of(topHitsAggregationBuilder, new TopHitsParser(aggName, false, false)); } default -> throw new AggregateAnalyzer.AggregateAnalyzerException( @@ -656,8 +682,8 @@ private static List> createCompositeBuckets( } /** - * Creates nested bucket aggregations for expressions that are not qualified as value sources for - * composite aggregations. + * Creates structured bucket aggregations for expressions that are not qualified as value sources + * for composite aggregations. * *

This method processes a list of group by expressions and identifies those that cannot be * used as value sources in composite aggregations but can be pushed down as sub-aggregations, @@ -667,7 +693,7 @@ private static List> createCompositeBuckets( * *

    * AutoDateHistogram | RangeAggregation
-   *   └── AutoDateHistogram | RangeAggregation (nested)
+   *   └── AutoDateHistogram | RangeAggregation (structured)
    *       └── ... (more composite-incompatible aggregations)
    *           └── Metric Aggregation (at the bottom)
    * 
@@ -683,7 +709,7 @@ private static List> createCompositeBuckets( *
  • The root aggregation builder, or null if no such expressions were found * */ - private static Pair, AggregationBuilder> createNestedAggregation( + private static Pair, AggregationBuilder> createStructuredAggregation( List> groupNameAndIndexList, @Nullable Project project, Builder metricBuilder, diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/AggregateFilterAnalyzer.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/AggregateFilterAnalyzer.java index 11fa9117693..9fbe0293522 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/AggregateFilterAnalyzer.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/AggregateFilterAnalyzer.java @@ -31,13 +31,13 @@ public class AggregateFilterAnalyzer { * * @param aggResult the base aggregation and parser to potentially wrap with filter * @param aggCall the aggregate call which may contain filter information - * @param aggFieldName name for the filtered aggregation + * @param aggName name for the filtered aggregation * @return wrapped aggregation with filter if present, otherwise the original result * @throws PredicateAnalyzer.ExpressionNotAnalyzableException if filter condition cannot be * analyzed */ public Pair analyze( - Pair aggResult, AggregateCall aggCall, String aggFieldName) + Pair aggResult, AggregateCall aggCall, String aggName) throws PredicateAnalyzer.ExpressionNotAnalyzableException { if (project == null || !aggCall.hasFilter()) { return aggResult; @@ -45,8 +45,8 @@ public Pair analyze( QueryExpression queryExpression = analyzeAggregateFilter(aggCall); return Pair.of( - buildFilterAggregation(aggResult.getLeft(), aggFieldName, queryExpression), - buildFilterParser(aggResult.getRight(), aggFieldName)); + buildFilterAggregation(aggResult.getLeft(), aggName, queryExpression), + buildFilterParser(aggResult.getRight(), aggName)); } private QueryExpression analyzeAggregateFilter(AggregateCall aggCall) @@ -61,12 +61,12 @@ private QueryExpression analyzeAggregateFilter(AggregateCall aggCall) } private AggregationBuilder buildFilterAggregation( - AggregationBuilder aggBuilder, String aggFieldName, QueryExpression queryExpression) { - return AggregationBuilders.filter(aggFieldName, queryExpression.builder()) + AggregationBuilder aggBuilder, String aggName, QueryExpression queryExpression) { + return AggregationBuilders.filter(aggName, queryExpression.builder()) .subAggregation(aggBuilder); } - private MetricParser buildFilterParser(MetricParser aggParser, String aggFieldName) { - return FilterParser.builder().name(aggFieldName).metricsParser(aggParser).build(); + private MetricParser buildFilterParser(MetricParser aggParser, String aggName) { + return FilterParser.builder().name(aggName).metricsParser(aggParser).build(); } } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/PredicateAnalyzer.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/PredicateAnalyzer.java index 90d81cd02b6..87922bf4262 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/PredicateAnalyzer.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/PredicateAnalyzer.java @@ -34,6 +34,7 @@ import static org.opensearch.index.query.QueryBuilders.boolQuery; import static org.opensearch.index.query.QueryBuilders.existsQuery; import static org.opensearch.index.query.QueryBuilders.matchQuery; +import static org.opensearch.index.query.QueryBuilders.nestedQuery; import static org.opensearch.index.query.QueryBuilders.rangeQuery; import static org.opensearch.index.query.QueryBuilders.regexpQuery; import static org.opensearch.index.query.QueryBuilders.termQuery; @@ -57,6 +58,7 @@ import java.util.Map; import java.util.Set; import java.util.function.Supplier; +import javax.annotation.Nullable; import lombok.Getter; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.rel.RelNode; @@ -78,6 +80,7 @@ import org.apache.calcite.util.NlsString; import org.apache.calcite.util.RangeSets; import org.apache.calcite.util.Sarg; +import org.apache.lucene.search.join.ScoreMode; import org.opensearch.index.mapper.DateFieldMapper; import org.opensearch.index.query.BoolQueryBuilder; import org.opensearch.index.query.QueryBuilder; @@ -108,6 +111,7 @@ import org.opensearch.sql.opensearch.storage.serde.RelJsonSerializer; import org.opensearch.sql.opensearch.storage.serde.ScriptParameterHelper; import org.opensearch.sql.opensearch.storage.serde.SerializationWrapper; +import org.opensearch.sql.utils.Utils; /** * Query predicate analyzer. Uses visitor pattern to traverse existing expression and convert it to @@ -1201,6 +1205,9 @@ public QueryBuilder builder() { if (builder == null) { throw new IllegalStateException("Builder was not initialized"); } + if (rel != null && rel.nestedPath != null) { + return nestedQuery(rel.nestedPath, builder, ScoreMode.None); + } return builder; } @@ -1570,21 +1577,19 @@ public static final class NamedFieldExpression implements TerminalExpression { private final String name; private final ExprType type; + private final String nestedPath; public NamedFieldExpression( int refIndex, List schema, Map filedTypes) { this.name = refIndex >= schema.size() ? null : schema.get(refIndex); this.type = filedTypes.get(name); - } - - public NamedFieldExpression(String name, ExprType type) { - this.name = name; - this.type = type; + this.nestedPath = Utils.resolveNestedPath(this.name, filedTypes); } private NamedFieldExpression() { this.name = null; this.type = null; + this.nestedPath = null; } private NamedFieldExpression( @@ -1592,11 +1597,17 @@ private NamedFieldExpression( this.name = (ref == null || ref.getIndex() >= schema.size()) ? null : schema.get(ref.getIndex()); this.type = filedTypes.get(name); + this.nestedPath = Utils.resolveNestedPath(this.name, filedTypes); } private NamedFieldExpression(RexLiteral literal) { this.name = literal == null ? null : RexLiteral.stringValue(literal); this.type = null; + this.nestedPath = null; + } + + public @Nullable String getNestedPath() { + return nestedPath; } public String getRootName() { diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/BucketAggregationParser.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/BucketAggregationParser.java index 761aef4fd98..3d8e07e3c5e 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/BucketAggregationParser.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/BucketAggregationParser.java @@ -21,6 +21,7 @@ import org.opensearch.search.aggregations.bucket.MultiBucketsAggregation; import org.opensearch.search.aggregations.bucket.composite.CompositeAggregation; import org.opensearch.search.aggregations.bucket.histogram.InternalAutoDateHistogram; +import org.opensearch.search.aggregations.bucket.nested.InternalNested; import org.opensearch.search.aggregations.bucket.range.Range; import org.opensearch.search.aggregations.bucket.terms.InternalMultiTerms; @@ -50,7 +51,12 @@ public BucketAggregationParser( @Override public List> parse(Aggregations aggregations) { - Aggregation agg = aggregations.asList().getFirst(); + Aggregation agg; + if (aggregations.asList().getFirst() instanceof InternalNested) { + agg = ((InternalNested) aggregations.asList().getFirst()).getAggregations().iterator().next(); + } else { + agg = aggregations.asList().getFirst(); + } return ((MultiBucketsAggregation) agg) .getBuckets().stream() .map(b -> parseBucket(b, agg.getName())) diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/MetricParserHelper.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/MetricParserHelper.java index cc85df5bf7b..6e741bdf0b1 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/MetricParserHelper.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/MetricParserHelper.java @@ -15,6 +15,7 @@ import lombok.RequiredArgsConstructor; import org.opensearch.search.aggregations.Aggregation; import org.opensearch.search.aggregations.Aggregations; +import org.opensearch.search.aggregations.bucket.nested.Nested; import org.opensearch.sql.common.utils.StringUtils; /** Parse multiple metrics in one bucket. */ @@ -50,6 +51,9 @@ public List> parse(Aggregations aggregations) { List> resultMapList = new ArrayList<>(); Map mergeMap = new LinkedHashMap<>(); for (Aggregation aggregation : aggregations) { + if (aggregation instanceof Nested) { + aggregation = ((Nested) aggregation).getAggregations().asList().getFirst(); + } MetricParser parser = metricParserMap.get(aggregation.getName()); if (parser == null) { throw new RuntimeException( diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/response/error/ErrorMessageFactory.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/response/error/ErrorMessageFactory.java index 901bfc30c8d..8617f264f06 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/response/error/ErrorMessageFactory.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/response/error/ErrorMessageFactory.java @@ -25,7 +25,7 @@ public static ErrorMessage createErrorMessage(Throwable e, int status) { OpenSearchException exception = (OpenSearchException) cause; return new OpenSearchErrorMessage(exception, exception.status().getStatus()); } - return new ErrorMessage(e, status); + return new ErrorMessage(cause, status); } protected static Throwable unwrapCause(Throwable t) { @@ -34,6 +34,9 @@ protected static Throwable unwrapCause(Throwable t) { return result; } if (result.getCause() == null) { + if (result.getSuppressed().length > 0) { + return result.getSuppressed()[0]; + } return result; } result = unwrapCause(result.getCause()); diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java index 105fc14ea97..e50e1d4fc85 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java @@ -6,6 +6,7 @@ package org.opensearch.sql.opensearch.storage.scan; import com.google.common.collect.ImmutableList; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Objects; @@ -39,14 +40,13 @@ import org.apache.logging.log4j.Logger; import org.opensearch.search.aggregations.AggregationBuilder; import org.opensearch.search.aggregations.bucket.composite.CompositeAggregationBuilder; -import org.opensearch.sql.ast.expression.Argument; import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory; +import org.opensearch.sql.calcite.utils.PPLHintUtils; import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.data.type.ExprCoreType; import org.opensearch.sql.data.type.ExprType; import org.opensearch.sql.opensearch.data.type.OpenSearchDataType; import org.opensearch.sql.opensearch.data.type.OpenSearchTextType; -import org.opensearch.sql.opensearch.planner.rules.EnumerableIndexScanRule; import org.opensearch.sql.opensearch.planner.rules.OpenSearchIndexRules; import org.opensearch.sql.opensearch.request.AggregateAnalyzer; import org.opensearch.sql.opensearch.request.PredicateAnalyzer; @@ -63,6 +63,7 @@ import org.opensearch.sql.opensearch.storage.scan.context.PushDownContext; import org.opensearch.sql.opensearch.storage.scan.context.PushDownType; import org.opensearch.sql.opensearch.storage.scan.context.RareTopDigest; +import org.opensearch.sql.utils.Utils; /** The logical relational operator representing a scan of an OpenSearchIndex type. */ @Getter @@ -126,22 +127,20 @@ public CalciteLogicalIndexScan copyWithNewTraitSet(RelTraitSet traitSet) { @Override public void register(RelOptPlanner planner) { super.register(planner); - planner.addRule(EnumerableIndexScanRule.DEFAULT_CONFIG.toRule()); + for (RelOptRule rule : OpenSearchIndexRules.OPEN_SEARCH_NON_PUSHDOWN_RULES) { + planner.addRule(rule); + } if ((Boolean) osIndex.getSettings().getSettingValue(Settings.Key.CALCITE_PUSHDOWN_ENABLED)) { - // When pushdown is enabled, use normal rules (they handle everything including relevance - // functions) - for (RelOptRule rule : OpenSearchIndexRules.OPEN_SEARCH_INDEX_SCAN_RULES) { + for (RelOptRule rule : OpenSearchIndexRules.OPEN_SEARCH_PUSHDOWN_RULES) { planner.addRule(rule); } - } else { - planner.addRule(OpenSearchIndexRules.RELEVANCE_FUNCTION_PUSHDOWN); } } public AbstractRelNode pushDownFilter(Filter filter) { try { RelDataType rowType = this.getRowType(); - List schema = this.getRowType().getFieldNames(); + List schema = buildSchema(); Map fieldTypes = this.osIndex.getAllFieldTypes().entrySet().stream() .filter(entry -> schema.contains(entry.getKey())) @@ -179,6 +178,25 @@ public AbstractRelNode pushDownFilter(Filter filter) { return null; } + /** + * Build schema for the current scan. Schema is the combination of all index fields and nested + * fields in index fields. + * + * @return All current outputs fields plus nested paths. + */ + private List buildSchema() { + List schema = new ArrayList<>(this.getRowType().getFieldNames()); + // Add nested paths to schema if it has + List nestedPaths = + schema.stream() + .map(field -> Utils.resolveNestedPath(field, this.osIndex.getAllFieldTypes())) + .filter(Objects::nonNull) + .distinct() + .toList(); + schema.addAll(nestedPaths); + return schema; + } + private static RexNode constructCondition(List conditions, RexBuilder rexBuilder) { return conditions.size() > 1 ? rexBuilder.makeCall(SqlStdOperatorTable.AND, conditions) @@ -352,20 +370,25 @@ public AbstractRelNode pushDownAggregate(Aggregate aggregate, @Nullable Project osIndex, aggregate.getRowType(), pushDownContext.cloneForAggregate(aggregate, project)); - List schema = this.getRowType().getFieldNames(); + List schema = buildSchema(); Map fieldTypes = this.osIndex.getAllFieldTypes().entrySet().stream() .filter(entry -> schema.contains(entry.getKey())) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); List outputFields = aggregate.getRowType().getFieldNames(); + List bucketNames = outputFields.subList(0, aggregate.getGroupSet().cardinality()); + if (bucketNames.stream() + .map(b -> fieldTypes.get(b)) + .filter(Objects::nonNull) + .anyMatch(expr -> expr.getOriginalType() == ExprCoreType.ARRAY)) { + // TODO https://github.com/opensearch-project/sql/issues/5006 + if (LOG.isDebugEnabled()) { + LOG.debug("Cannot pushdown the aggregate due to bucket contains array (nested) type"); + } + return null; + } int queryBucketSize = osIndex.getQueryBucketSize(); - boolean bucketNullable = - Boolean.parseBoolean( - aggregate.getHints().stream() - .filter(hits -> hits.hintName.equals("stats_args")) - .map(hint -> hint.kvOptions.getOrDefault(Argument.BUCKET_NULLABLE, "true")) - .findFirst() - .orElseGet(() -> "true")); + boolean bucketNullable = !PPLHintUtils.ignoreNullBucket(aggregate); AggregateAnalyzer.AggregateBuilderHelper helper = new AggregateAnalyzer.AggregateBuilderHelper( getRowType(), fieldTypes, getCluster(), bucketNullable, queryBucketSize); @@ -381,10 +404,7 @@ public AbstractRelNode pushDownAggregate(Aggregate aggregate, @Nullable Project OpenSearchTypeFactory.convertRelDataTypeToExprType( field.getType())))); AggPushDownAction action = - new AggPushDownAction( - builderAndParser, - extendedTypeMapping, - outputFields.subList(0, aggregate.getGroupSet().cardinality())); + new AggPushDownAction(builderAndParser, extendedTypeMapping, bucketNames); newScan.pushDownContext.add(PushDownType.AGGREGATION, aggregate, action); return newScan; } catch (Exception e) { diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/context/AggPushDownAction.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/context/AggPushDownAction.java index 81595cc56fd..ba827cbc6e9 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/context/AggPushDownAction.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/context/AggPushDownAction.java @@ -29,6 +29,7 @@ import org.opensearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder; import org.opensearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder; import org.opensearch.search.aggregations.bucket.missing.MissingOrder; +import org.opensearch.search.aggregations.bucket.nested.NestedAggregationBuilder; import org.opensearch.search.aggregations.bucket.terms.MultiTermsAggregationBuilder; import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder; import org.opensearch.search.aggregations.support.MultiTermsValuesSourceConfig; @@ -63,6 +64,9 @@ public AggPushDownAction( } private static int getScriptCount(AggregationBuilder aggBuilder) { + if (aggBuilder instanceof NestedAggregationBuilder) { + aggBuilder = aggBuilder.getSubAggregations().iterator().next(); + } if (aggBuilder instanceof ValuesSourceAggregationBuilder && ((ValuesSourceAggregationBuilder) aggBuilder).script() != null) return 1; if (aggBuilder instanceof CompositeAggregationBuilder) { @@ -110,7 +114,14 @@ private String multiTermsBucketNameAsString(CompositeAggregationBuilder composit public void rePushDownSortAggMeasure( List collations, List fieldNames) { if (builderAndParser.getLeft().isEmpty()) return; - if (builderAndParser.getLeft().getFirst() instanceof CompositeAggregationBuilder composite) { + AggregationBuilder original = builderAndParser.getLeft().getFirst(); + AggregationBuilder builder; + if (original instanceof NestedAggregationBuilder) { + builder = original.getSubAggregations().iterator().next(); + } else { + builder = original; + } + if (builder instanceof CompositeAggregationBuilder composite) { boolean asc = collations.get(0).getDirection() == RelFieldCollation.Direction.ASCENDING; String path = getAggregationPath(collations, fieldNames, composite); BucketOrder bucketOrder = @@ -147,6 +158,11 @@ public void rePushDownSortAggMeasure( "Cannot pushdown sort aggregate measure"); } } + if (original instanceof NestedAggregationBuilder nested) { + aggregationBuilder = + AggregationBuilders.nested(nested.getName(), nested.path()) + .subAggregation(aggregationBuilder); + } builderAndParser = Pair.of( Collections.singletonList(aggregationBuilder), @@ -157,7 +173,14 @@ public void rePushDownSortAggMeasure( /** Re-pushdown a nested aggregation for rare/top to replace the pushed composite aggregation */ public void rePushDownRareTop(RareTopDigest digest) { if (builderAndParser.getLeft().isEmpty()) return; - if (builderAndParser.getLeft().getFirst() instanceof CompositeAggregationBuilder composite) { + AggregationBuilder original = builderAndParser.getLeft().getFirst(); + AggregationBuilder builder; + if (original instanceof NestedAggregationBuilder) { + builder = original.getSubAggregations().iterator().next(); + } else { + builder = original; + } + if (builder instanceof CompositeAggregationBuilder composite) { BucketOrder bucketOrder = digest.direction() == RelFieldCollation.Direction.ASCENDING ? BucketOrder.count(true) @@ -200,6 +223,11 @@ public void rePushDownRareTop(RareTopDigest digest) { "Cannot pushdown " + digest); } } + if (aggregationBuilder != null && original instanceof NestedAggregationBuilder nested) { + aggregationBuilder = + AggregationBuilders.nested(nested.getName(), nested.path()) + .subAggregation(aggregationBuilder); + } builderAndParser = Pair.of( Collections.singletonList(aggregationBuilder), @@ -327,7 +355,13 @@ public void pushDownSortIntoAggBucket( List collations, List fieldNames) { // aggregationBuilder.getLeft() could be empty when count agg optimization works if (builderAndParser.getLeft().isEmpty()) return; - AggregationBuilder builder = builderAndParser.getLeft().getFirst(); + AggregationBuilder original = builderAndParser.getLeft().getFirst(); + AggregationBuilder builder; + if (original instanceof NestedAggregationBuilder) { + builder = original.getSubAggregations().iterator().next(); + } else { + builder = original; + } List selected = new ArrayList<>(collations.size()); if (builder instanceof CompositeAggregationBuilder compositeAggBuilder) { // It will always use a single CompositeAggregationBuilder for the aggregation with GroupBy @@ -375,13 +409,17 @@ public void pushDownSortIntoAggBucket( }); AggregatorFactories.Builder newAggBuilder = new AggregatorFactories.Builder(); compositeAggBuilder.getSubAggregations().forEach(newAggBuilder::addAggregator); + AggregationBuilder finalBuilder = + AggregationBuilders.composite("composite_buckets", newBuckets) + .subAggregations(newAggBuilder) + .size(compositeAggBuilder.size()); + if (original instanceof NestedAggregationBuilder nested) { + finalBuilder = + AggregationBuilders.nested(nested.getName(), nested.path()) + .subAggregation(finalBuilder); + } builderAndParser = - Pair.of( - Collections.singletonList( - AggregationBuilders.composite("composite_buckets", newBuckets) - .subAggregations(newAggBuilder) - .size(compositeAggBuilder.size())), - builderAndParser.getRight()); + Pair.of(Collections.singletonList(finalBuilder), builderAndParser.getRight()); bucketNames = newBucketNames; } if (builder instanceof TermsAggregationBuilder termsAggBuilder) { @@ -392,7 +430,12 @@ public void pushDownSortIntoAggBucket( public boolean isCompositeAggregation() { return builderAndParser.getLeft().stream() - .anyMatch(builder -> builder instanceof CompositeAggregationBuilder); + .anyMatch( + builder -> + builder instanceof CompositeAggregationBuilder + || (builder instanceof NestedAggregationBuilder + && builder.getSubAggregations().iterator().next() + instanceof CompositeAggregationBuilder)); } /** @@ -403,6 +446,9 @@ public boolean pushDownLimitIntoBucketSize(Integer size) { // aggregationBuilder.getLeft() could be empty when count agg optimization works if (builderAndParser.getLeft().isEmpty()) return false; AggregationBuilder builder = builderAndParser.getLeft().getFirst(); + if (builder instanceof NestedAggregationBuilder) { + builder = builder.getSubAggregations().iterator().next(); + } if (builder instanceof CompositeAggregationBuilder compositeAggBuilder) { if (size < compositeAggBuilder.size()) { compositeAggBuilder.size(size); diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/response/AggregationResponseUtils.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/response/AggregationResponseUtils.java index ccdfdce7a48..f230bae5a8a 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/response/AggregationResponseUtils.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/response/AggregationResponseUtils.java @@ -26,6 +26,8 @@ import org.opensearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder; import org.opensearch.search.aggregations.bucket.histogram.ParsedDateHistogram; import org.opensearch.search.aggregations.bucket.histogram.ParsedHistogram; +import org.opensearch.search.aggregations.bucket.nested.NestedAggregationBuilder; +import org.opensearch.search.aggregations.bucket.nested.ParsedNested; import org.opensearch.search.aggregations.bucket.terms.DoubleTerms; import org.opensearch.search.aggregations.bucket.terms.LongTerms; import org.opensearch.search.aggregations.bucket.terms.ParsedDoubleTerms; @@ -87,6 +89,8 @@ public class AggregationResponseUtils { .put( TopHitsAggregationBuilder.NAME, (p, c) -> ParsedTopHits.fromXContent(p, (String) c)) + .put( + NestedAggregationBuilder.NAME, (p, c) -> ParsedNested.fromXContent(p, (String) c)) .build() .entrySet() .stream()