Skip to content

Commit 8ad2bb1

Browse files
IGNITE-12692 SQL Calcite: Distributed table modify
1 parent 4d7cd80 commit 8ad2bb1

File tree

8 files changed

+689
-355
lines changed

8 files changed

+689
-355
lines changed
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.benchmarks.jmh.sql;
19+
20+
import java.util.List;
21+
import java.util.concurrent.TimeUnit;
22+
import org.apache.ignite.Ignite;
23+
import org.apache.ignite.IgniteCache;
24+
import org.apache.ignite.IgniteDataStreamer;
25+
import org.apache.ignite.Ignition;
26+
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
27+
import org.apache.ignite.cache.query.SqlFieldsQuery;
28+
import org.apache.ignite.cache.query.annotations.QuerySqlField;
29+
import org.apache.ignite.calcite.CalciteQueryEngineConfiguration;
30+
import org.apache.ignite.configuration.CacheConfiguration;
31+
import org.apache.ignite.configuration.IgniteConfiguration;
32+
import org.apache.ignite.configuration.SqlConfiguration;
33+
import org.apache.ignite.indexing.IndexingQueryEngineConfiguration;
34+
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
35+
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
36+
import org.openjdk.jmh.annotations.BenchmarkMode;
37+
import org.openjdk.jmh.annotations.Fork;
38+
import org.openjdk.jmh.annotations.Level;
39+
import org.openjdk.jmh.annotations.Measurement;
40+
import org.openjdk.jmh.annotations.Mode;
41+
import org.openjdk.jmh.annotations.OutputTimeUnit;
42+
import org.openjdk.jmh.annotations.Param;
43+
import org.openjdk.jmh.annotations.Scope;
44+
import org.openjdk.jmh.annotations.Setup;
45+
import org.openjdk.jmh.annotations.State;
46+
import org.openjdk.jmh.annotations.TearDown;
47+
import org.openjdk.jmh.annotations.Warmup;
48+
49+
/**
50+
* Abstract SQL queries benchmark.
51+
*/
52+
@Fork(1)
53+
@BenchmarkMode(Mode.Throughput)
54+
@OutputTimeUnit(TimeUnit.SECONDS)
55+
@Warmup(iterations = 3, time = 5)
56+
@Measurement(iterations = 3, time = 5)
57+
@State(Scope.Benchmark)
58+
public abstract class JmhSqlAbstractBenchmark {
59+
/** Count of server nodes. */
60+
protected static final int SRV_NODES_CNT = 3;
61+
62+
/** Keys count. */
63+
protected static final int KEYS_CNT = 100000;
64+
65+
/** Size of batch. */
66+
protected static final int BATCH_SIZE = 1000;
67+
68+
/** Partitions count. */
69+
protected static final int PARTS_CNT = 1024;
70+
71+
/** IP finder shared across nodes. */
72+
private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
73+
74+
/** Query engine. */
75+
@Param({"H2", "CALCITE"})
76+
protected String engine;
77+
78+
/** Ignite client. */
79+
protected Ignite client;
80+
81+
/** Servers. */
82+
protected final Ignite[] servers = new Ignite[SRV_NODES_CNT];
83+
84+
/** Cache. */
85+
private IgniteCache<Integer, Item> cache;
86+
87+
/**
88+
* Create Ignite configuration.
89+
*
90+
* @param igniteInstanceName Ignite instance name.
91+
* @return Configuration.
92+
*/
93+
protected IgniteConfiguration configuration(String igniteInstanceName) {
94+
IgniteConfiguration cfg = new IgniteConfiguration();
95+
96+
cfg.setIgniteInstanceName(igniteInstanceName);
97+
cfg.setLocalHost("127.0.0.1");
98+
cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER));
99+
cfg.setSqlConfiguration(new SqlConfiguration().setQueryEnginesConfiguration(
100+
"CALCITE".equals(engine) ? new CalciteQueryEngineConfiguration() : new IndexingQueryEngineConfiguration()
101+
));
102+
103+
return cfg;
104+
}
105+
106+
/**
107+
* Initiate Ignite and caches.
108+
*/
109+
@Setup(Level.Trial)
110+
public void setup() {
111+
for (int i = 0; i < SRV_NODES_CNT; i++)
112+
servers[i] = Ignition.start(configuration("server" + i));
113+
114+
client = Ignition.start(configuration("client").setClientMode(true));
115+
116+
cache = client.getOrCreateCache(new CacheConfiguration<Integer, Item>("CACHE")
117+
.setIndexedTypes(Integer.class, Item.class)
118+
.setAffinity(new RendezvousAffinityFunction(false, PARTS_CNT))
119+
);
120+
121+
try (IgniteDataStreamer<Integer, Item> ds = client.dataStreamer("CACHE")) {
122+
for (int i = 0; i < KEYS_CNT; i++)
123+
ds.addData(i, new Item(i));
124+
}
125+
}
126+
127+
/**
128+
* Stop Ignite instance.
129+
*/
130+
@TearDown
131+
public void tearDown() {
132+
client.close();
133+
134+
for (Ignite ignite : servers)
135+
ignite.close();
136+
}
137+
138+
/** */
139+
protected List<List<?>> executeSql(String sql, Object... args) {
140+
return cache.query(new SqlFieldsQuery(sql).setArgs(args)).getAll();
141+
}
142+
143+
/** */
144+
protected static class Item {
145+
/** */
146+
@QuerySqlField
147+
private final String name;
148+
149+
/** */
150+
@QuerySqlField
151+
private final int fld;
152+
153+
/** */
154+
@QuerySqlField
155+
private final int fldBatch;
156+
157+
/** */
158+
@QuerySqlField(index = true)
159+
private final int fldIdx;
160+
161+
/** */
162+
@QuerySqlField(index = true)
163+
private final int fldIdxBatch;
164+
165+
/** */
166+
public Item(int val) {
167+
name = "name" + val;
168+
fld = val;
169+
fldBatch = val / BATCH_SIZE;
170+
fldIdx = val;
171+
fldIdxBatch = val / BATCH_SIZE;
172+
}
173+
}
174+
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.benchmarks.jmh.sql;
19+
20+
import java.util.List;
21+
import org.openjdk.jmh.annotations.Benchmark;
22+
import org.openjdk.jmh.runner.Runner;
23+
import org.openjdk.jmh.runner.options.Options;
24+
import org.openjdk.jmh.runner.options.OptionsBuilder;
25+
26+
/**
27+
* Benchmark aggregate SQL queries.
28+
*/
29+
public class JmhSqlAggBenchmark extends JmhSqlAbstractBenchmark {
30+
/**
31+
* Query with group by and aggregate.
32+
*/
33+
@Benchmark
34+
public void queryGroupBy() {
35+
List<?> res = executeSql("SELECT fldBatch, AVG(fld) FROM Item GROUP BY fldBatch");
36+
37+
if (res.size() != KEYS_CNT / BATCH_SIZE)
38+
throw new AssertionError("Unexpected result size: " + res.size());
39+
}
40+
41+
/**
42+
* Query with indexed field group by and aggregate.
43+
*/
44+
@Benchmark
45+
public void queryGroupByIndexed() {
46+
List<?> res = executeSql("SELECT fldIdxBatch, AVG(fld) FROM Item GROUP BY fldIdxBatch");
47+
48+
if (res.size() != KEYS_CNT / BATCH_SIZE)
49+
throw new AssertionError("Unexpected result size: " + res.size());
50+
}
51+
52+
/**
53+
* Query sum of indexed field.
54+
*/
55+
@Benchmark
56+
public void querySumIndexed() {
57+
List<List<?>> res = executeSql("SELECT sum(fldIdx) FROM Item");
58+
59+
Long expRes = ((long)KEYS_CNT) * (KEYS_CNT - 1) / 2;
60+
61+
if (!expRes.equals(res.get(0).get(0)))
62+
throw new AssertionError("Unexpected result: " + res.get(0));
63+
}
64+
65+
/**
66+
* Run benchmarks.
67+
*
68+
* @param args Args.
69+
* @throws Exception Exception.
70+
*/
71+
public static void main(String[] args) throws Exception {
72+
final Options options = new OptionsBuilder()
73+
.include(JmhSqlAggBenchmark.class.getSimpleName())
74+
.build();
75+
76+
new Runner(options).run();
77+
}
78+
}

0 commit comments

Comments
 (0)