Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteLimit;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSortedIndexSpool;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableModify;
import org.apache.ignite.internal.util.typedef.F;
import org.jetbrains.annotations.Nullable;

Expand Down Expand Up @@ -149,4 +150,11 @@ public double getRowCount(IgniteAggregate rel, RelMetadataQuery mq) {
public double getRowCount(IgniteLimit rel, RelMetadataQuery mq) {
return rel.estimateRowCount(mq);
}

/**
* Estimation of row count for Table modify operator.
*/
public double getRowCount(IgniteTableModify rel, RelMetadataQuery mq) {
return rel.estimateRowCount(mq);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@
import org.apache.ignite.internal.processors.query.calcite.rule.SortAggregateConverterRule;
import org.apache.ignite.internal.processors.query.calcite.rule.SortConverterRule;
import org.apache.ignite.internal.processors.query.calcite.rule.TableFunctionScanConverterRule;
import org.apache.ignite.internal.processors.query.calcite.rule.TableModifyConverterRule;
import org.apache.ignite.internal.processors.query.calcite.rule.TableModifyDistributedConverterRule;
import org.apache.ignite.internal.processors.query.calcite.rule.TableModifySingleNodeConverterRule;
import org.apache.ignite.internal.processors.query.calcite.rule.UncollectConverterRule;
import org.apache.ignite.internal.processors.query.calcite.rule.UnionConverterRule;
import org.apache.ignite.internal.processors.query.calcite.rule.ValuesConverterRule;
Expand Down Expand Up @@ -292,7 +293,8 @@ public enum PlannerPhase {
SetOpConverterRule.MAP_REDUCE_INTERSECT,
ProjectConverterRule.INSTANCE,
FilterConverterRule.INSTANCE,
TableModifyConverterRule.INSTANCE,
TableModifySingleNodeConverterRule.INSTANCE,
TableModifyDistributedConverterRule.INSTANCE,
UnionConverterRule.INSTANCE,
SortConverterRule.INSTANCE,
TableFunctionScanConverterRule.INSTANCE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,25 @@
package org.apache.ignite.internal.processors.query.calcite.rel;

import java.util.List;
import com.google.common.collect.ImmutableList;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelInput;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.TableModify;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.util.Pair;
import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;

/** */
public class IgniteTableModify extends TableModify implements IgniteRel {
/** If table modify can affect data source. */
private final boolean affectsSrc;

/**
* Creates a {@code TableModify}.
*
Expand All @@ -43,8 +51,9 @@ public class IgniteTableModify extends TableModify implements IgniteRel {
* @param input Sub-query or filter condition.
* @param operation Modify operation (INSERT, UPDATE, DELETE, MERGE).
* @param updateColumnList List of column identifiers to be updated (e.g. ident1, ident2); null if not UPDATE.
* @param sourceExpressionList List of value expressions to be set (e.g. exp1, exp2); null if not UPDATE.
* @param srcExpressionList List of value expressions to be set (e.g. exp1, exp2); null if not UPDATE.
* @param flattened Whether set flattens the input row type.
* @param affectsSrc If table modify can affect data source.
*/
public IgniteTableModify(
RelOptCluster cluster,
Expand All @@ -53,12 +62,15 @@ public IgniteTableModify(
RelNode input,
Operation operation,
List<String> updateColumnList,
List<RexNode> sourceExpressionList,
boolean flattened
List<RexNode> srcExpressionList,
boolean flattened,
boolean affectsSrc
) {
super(cluster, traitSet, table, Commons.context(cluster).catalogReader(),
input, operation, updateColumnList,
sourceExpressionList, flattened);
srcExpressionList, flattened);

this.affectsSrc = affectsSrc;
}

/**
Expand All @@ -75,7 +87,8 @@ public IgniteTableModify(RelInput input) {
input.getEnum("operation", Operation.class),
input.getStringList("updateColumnList"),
input.getExpressionList("sourceExpressionList"),
input.getBoolean("flattened", true)
input.getBoolean("flattened", true),
false // Field only for planning.
);
}

Expand All @@ -89,7 +102,8 @@ public IgniteTableModify(RelInput input) {
getOperation(),
getUpdateColumnList(),
getSourceExpressionList(),
isFlattened());
isFlattened(),
affectsSrc);
}

/** {@inheritDoc} */
Expand All @@ -100,6 +114,34 @@ public IgniteTableModify(RelInput input) {
/** {@inheritDoc} */
@Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
return new IgniteTableModify(cluster, getTraitSet(), getTable(), sole(inputs),
getOperation(), getUpdateColumnList(), getSourceExpressionList(), isFlattened());
getOperation(), getUpdateColumnList(), getSourceExpressionList(), isFlattened(), affectsSrc);
}

/** {@inheritDoc} */
@Override public double estimateRowCount(RelMetadataQuery mq) {
return 1.0D;
}

/** {@inheritDoc} */
@Override public Pair<RelTraitSet, List<RelTraitSet>> deriveTraits(RelTraitSet childTraits, int childId) {
// Don't derive traits for single-node table modify.
if (TraitUtils.distribution(traitSet) == IgniteDistributions.single())
return null;

assert childId == 0;

if (childTraits.getConvention() != IgniteConvention.INSTANCE)
return null;

// If modify can affect data source (for example, INSERT contains self table as source) only
// modified table affinity distibution is possible, otherwise inconsistency is possible on remote nodes.
if (affectsSrc)
return null;

// Any distributed (random/hash) trait is accepted if data source is not affected by modify.
if (!TraitUtils.distribution(childTraits).satisfies(IgniteDistributions.random()))
return null;

return Pair.of(traitSet, ImmutableList.of(childTraits));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.calcite.rel.convert.ConverterRule;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
import org.jetbrains.annotations.Nullable;

/** */
public abstract class AbstractIgniteConverterRule<T extends RelNode> extends ConverterRule {
Expand All @@ -39,7 +40,7 @@ protected AbstractIgniteConverterRule(Class<T> clazz, String descriptionPrefix)
}

/** {@inheritDoc} */
@Override public final RelNode convert(RelNode rel) {
@Override public final @Nullable RelNode convert(RelNode rel) {
return convert(rel.getCluster().getPlanner(), rel.getCluster().getMetadataQuery(), (T)rel);
}

Expand All @@ -51,5 +52,5 @@ protected AbstractIgniteConverterRule(Class<T> clazz, String descriptionPrefix)
* @param rel Rel node.
* @return Physical rel.
*/
protected abstract PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq, T rel);
protected abstract @Nullable PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq, T rel);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@

import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.PhysicalNode;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.convert.ConverterRule;
import org.apache.calcite.rel.logical.LogicalAggregate;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.ignite.internal.processors.query.calcite.hint.HintUtils;
Expand All @@ -31,16 +31,17 @@
import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteMapHashAggregate;
import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceHashAggregate;
import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
import org.jetbrains.annotations.Nullable;

/**
*
*/
public class HashAggregateConverterRule {
/** */
public static final RelOptRule COLOCATED = new ColocatedHashAggregateConverterRule();
public static final ConverterRule COLOCATED = new ColocatedHashAggregateConverterRule();

/** */
public static final RelOptRule MAP_REDUCE = new MapReduceHashAggregateConverterRule();
public static final ConverterRule MAP_REDUCE = new MapReduceHashAggregateConverterRule();

/** */
private HashAggregateConverterRule() {
Expand All @@ -55,8 +56,11 @@ private static class ColocatedHashAggregateConverterRule extends AbstractIgniteC
}

/** {@inheritDoc} */
@Override protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq,
LogicalAggregate agg) {
@Override protected @Nullable PhysicalNode convert(
RelOptPlanner planner,
RelMetadataQuery mq,
LogicalAggregate agg
) {
if (HintUtils.isExpandDistinctAggregate(agg))
return null;

Expand Down Expand Up @@ -84,8 +88,11 @@ private static class MapReduceHashAggregateConverterRule extends AbstractIgniteC
}

/** {@inheritDoc} */
@Override protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq,
LogicalAggregate agg) {
@Override protected @Nullable PhysicalNode convert(
RelOptPlanner planner,
RelMetadataQuery mq,
LogicalAggregate agg
) {
if (HintUtils.isExpandDistinctAggregate(agg))
return null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
import java.util.Set;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.PhysicalNode;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.convert.ConverterRule;
import org.apache.calcite.rel.core.CorrelationId;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
Expand All @@ -39,7 +39,7 @@
*/
public class ProjectConverterRule extends AbstractIgniteConverterRule<LogicalProject> {
/** */
public static final RelOptRule INSTANCE = new ProjectConverterRule();
public static final ConverterRule INSTANCE = new ProjectConverterRule();

/** */
public ProjectConverterRule() {
Expand Down
Loading
Loading