Skip to content

Commit f7971fc

Browse files
committed
reduce memory
1 parent 3cbcfcc commit f7971fc

File tree

9 files changed

+235
-153
lines changed

9 files changed

+235
-153
lines changed

src/query/expression/src/aggregate/aggregate_hashtable.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -189,8 +189,8 @@ impl AggregateHashTable {
189189
group_hash_columns(group_columns, &mut state.group_hashes);
190190

191191
let new_group_count = if self.direct_append {
192-
for idx in 0..row_count {
193-
state.empty_vector[idx] = idx;
192+
for i in 0..row_count {
193+
state.empty_vector[i] = i.into();
194194
}
195195
self.payload.append_rows(state, row_count, group_columns);
196196
row_count

src/query/expression/src/aggregate/hash_index.rs

Lines changed: 57 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use super::payload_row::CompareState;
16+
use super::CompareItem;
1517
use super::PartitionedPayload;
1618
use super::ProbeState;
1719
use super::RowPtr;
@@ -151,17 +153,16 @@ impl HashIndex {
151153
row_count: usize,
152154
mut adapter: impl TableAdapter,
153155
) -> usize {
154-
for (i, row) in state.no_match_vector[..row_count].iter_mut().enumerate() {
155-
*row = i;
156+
for (i, item) in state.no_match_vector[..row_count].iter_mut().enumerate() {
157+
let hash = state.group_hashes[i];
158+
*item = CompareItem {
159+
row: i.into(),
160+
salt: Entry::hash_to_salt(hash),
161+
slot: self.init_slot(hash),
162+
row_ptr: RowPtr::null(),
163+
};
156164
}
157165

158-
let mut slots = state.get_temp();
159-
slots.extend(
160-
state.group_hashes[..row_count]
161-
.iter()
162-
.map(|hash| self.init_slot(*hash)),
163-
);
164-
165166
let mut new_group_count = 0;
166167
let mut remaining_entries = row_count;
167168

@@ -171,18 +172,21 @@ impl HashIndex {
171172
let mut no_match_count = 0;
172173

173174
// 1. inject new_group_count, new_entry_count, need_compare_count, no_match_count
174-
for row in state.no_match_vector[..remaining_entries].iter().copied() {
175-
let slot = &mut slots[row];
176-
let hash = state.group_hashes[row];
177-
178-
let is_new;
179-
(*slot, is_new) = self.find_or_insert(*slot, Entry::hash_to_salt(hash));
175+
for item in state.no_match_vector[..remaining_entries].iter_mut() {
176+
let (slot, is_new) = self.find_or_insert(item.slot, item.salt);
177+
item.slot = slot;
180178

181179
if is_new {
182-
state.empty_vector[new_entry_count] = row;
180+
state.empty_vector[new_entry_count] = item.row;
181+
state.slots[new_entry_count] = slot;
183182
new_entry_count += 1;
184183
} else {
185-
state.group_compare_vector[need_compare_count] = row;
184+
state.group_compare_vector[need_compare_count] = CompareItem {
185+
row: item.row,
186+
slot: item.slot,
187+
salt: item.salt,
188+
row_ptr: self.mut_entry(slot).get_pointer(),
189+
};
186190
need_compare_count += 1;
187191
}
188192
}
@@ -193,42 +197,45 @@ impl HashIndex {
193197

194198
adapter.append_rows(state, new_entry_count);
195199

196-
for row in state.empty_vector[..new_entry_count].iter().copied() {
197-
let entry = self.mut_entry(slots[row]);
200+
for (i, row) in state.empty_vector[..new_entry_count]
201+
.iter()
202+
.copied()
203+
.enumerate()
204+
{
205+
let entry = self.mut_entry(state.slots[i]);
198206
entry.set_pointer(state.addresses[row]);
199207
debug_assert_eq!(entry.get_pointer(), state.addresses[row]);
200208
}
201209
}
202210

203211
// 3. set address of compare vector
204212
if need_compare_count > 0 {
205-
for row in state.group_compare_vector[..need_compare_count]
206-
.iter()
207-
.copied()
208-
{
209-
let entry = self.mut_entry(slots[row]);
213+
for item in &mut state.group_compare_vector[..need_compare_count] {
214+
let entry = self.mut_entry(item.slot);
210215

211216
debug_assert!(entry.is_occupied());
212-
debug_assert_eq!(entry.get_salt(), (state.group_hashes[row] >> 48) as u16);
213-
state.addresses[row] = entry.get_pointer();
217+
debug_assert_eq!(
218+
entry.get_salt(),
219+
(state.group_hashes[item.row] >> 48) as u16
220+
);
221+
item.row_ptr = entry.get_pointer();
222+
state.addresses[item.row] = item.row_ptr;
214223
}
215224

216225
// 4. compare
217226
no_match_count = adapter.compare(state, need_compare_count, no_match_count);
218227
}
219228

220229
// 5. Linear probing, just increase iter_times
221-
for row in state.no_match_vector[..no_match_count].iter().copied() {
222-
let slot = &mut slots[row];
223-
*slot += 1;
224-
if *slot >= self.capacity {
225-
*slot = 0;
230+
for item in &mut state.no_match_vector[..no_match_count] {
231+
item.slot += 1;
232+
if item.slot >= self.capacity {
233+
item.slot = 0;
226234
}
227235
}
228236
remaining_entries = no_match_count;
229237
}
230238

231-
state.save_temp(slots);
232239
self.count += new_group_count;
233240

234241
new_group_count
@@ -251,7 +258,12 @@ impl<'a> TableAdapter for AdapterImpl<'a> {
251258
need_compare_count: usize,
252259
no_match_count: usize,
253260
) -> usize {
254-
state.row_match_columns(
261+
CompareState {
262+
compare: &mut state.group_compare_vector,
263+
matched: &mut state.match_vector,
264+
no_matched: &mut state.no_match_vector,
265+
}
266+
.row_match_columns(
255267
self.group_columns,
256268
&self.payload.row_layout,
257269
(need_compare_count, no_match_count),
@@ -284,8 +296,10 @@ mod tests {
284296
}
285297

286298
fn init_state(&self) -> ProbeState {
287-
let mut state = ProbeState::default();
288-
state.row_count = self.incoming.len();
299+
let mut state = ProbeState {
300+
row_count: self.incoming.len(),
301+
..Default::default()
302+
};
289303

290304
for (i, (_, hash)) in self.incoming.iter().enumerate() {
291305
state.group_hashes[i] = *hash
@@ -323,12 +337,13 @@ mod tests {
323337

324338
impl TableAdapter for &mut TestTableAdapter {
325339
fn append_rows(&mut self, state: &mut ProbeState, new_entry_count: usize) {
326-
for row in state.empty_vector[..new_entry_count].iter().copied() {
327-
let (key, hash) = self.incoming[row];
340+
for row in state.empty_vector[..new_entry_count].iter() {
341+
let row_index = row.to_index();
342+
let (key, hash) = self.incoming[row_index];
328343
let value = key + 20;
329344

330345
self.payload.push((key, hash, value));
331-
state.addresses[row] = self.get_row_ptr(true, row);
346+
state.addresses[*row] = self.get_row_ptr(true, row_index);
332347
}
333348
}
334349

@@ -338,23 +353,18 @@ mod tests {
338353
need_compare_count: usize,
339354
mut no_match_count: usize,
340355
) -> usize {
341-
for row in state.group_compare_vector[..need_compare_count]
342-
.iter()
343-
.copied()
344-
{
345-
let incoming = self.incoming[row];
346-
347-
let row_ptr = state.addresses[row];
356+
for item in &state.group_compare_vector[..need_compare_count] {
357+
let incoming = self.incoming[item.row.to_index()];
348358

349-
let (key, hash, _) = self.get_payload(row_ptr);
359+
let (key, hash, _) = self.get_payload(item.row_ptr);
350360

351361
const POINTER_MASK: u64 = 0x0000FFFFFFFFFFFF;
352362
assert_eq!(incoming.1 | POINTER_MASK, hash | POINTER_MASK);
353363
if incoming.0 == key {
354364
continue;
355365
}
356366

357-
state.no_match_vector[no_match_count] = row;
367+
state.no_match_vector[no_match_count] = item.clone();
358368
no_match_count += 1;
359369
}
360370

src/query/expression/src/aggregate/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,9 @@ use hash_index::Entry;
3939
pub use partitioned_payload::*;
4040
pub use payload::*;
4141
pub use payload_flush::*;
42-
pub use probe_state::*;
43-
use row_ptr::RowPtr;
42+
pub use probe_state::ProbeState;
43+
use probe_state::*;
44+
use row_ptr::*;
4445

4546
// A batch size to probe, flush, repartition, etc.
4647
pub(crate) const BATCH_SIZE: usize = 2048;

src/query/expression/src/aggregate/partitioned_payload.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ impl PartitionedPayload {
122122
let partition_idx = ((hash & self.mask_v) >> self.shift_v) as usize;
123123
let (count, sel) = &mut state.partition_entries[partition_idx];
124124

125-
sel[*count] = row;
125+
sel[*count as usize] = row;
126126
*count += 1;
127127
}
128128

@@ -133,7 +133,7 @@ impl PartitionedPayload {
133133
{
134134
if *count > 0 {
135135
payload.reserve_append_rows(
136-
&sel[..*count],
136+
&sel[..*count as _],
137137
&state.group_hashes,
138138
&mut state.addresses,
139139
&mut state.page_index,
@@ -200,7 +200,7 @@ impl PartitionedPayload {
200200
let (count, sel) = &state.partition_entries[partition];
201201
if *count > 0 {
202202
let payload = &mut self.payloads[partition];
203-
payload.copy_rows(&sel[..*count], &flush_state.addresses);
203+
payload.copy_rows(&sel[..*count as _], &flush_state.addresses);
204204
}
205205
}
206206
}
@@ -239,7 +239,7 @@ impl PartitionedPayload {
239239
let partition_idx = ((hash & self.mask_v) >> self.shift_v) as usize;
240240

241241
let (count, sel) = &mut state.partition_entries[partition_idx];
242-
sel[*count] = idx;
242+
sel[*count as usize] = idx.into();
243243
*count += 1;
244244
}
245245
flush_state.flush_page_row = end;

src/query/expression/src/aggregate/payload.rs

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use super::payload_row::rowformat_size;
2424
use super::payload_row::serialize_column_to_rowformat;
2525
use super::row_ptr::RowLayout;
2626
use super::row_ptr::RowPtr;
27+
use super::RowID;
2728
use crate::types::DataType;
2829
use crate::AggrState;
2930
use crate::AggregateFunctionRef;
@@ -197,17 +198,17 @@ impl Payload {
197198

198199
pub(super) fn reserve_append_rows(
199200
&mut self,
200-
select_vector: &[usize],
201+
select_vector: &[RowID],
201202
group_hashes: &[u64; BATCH_SIZE],
202203
address: &mut [RowPtr; BATCH_SIZE],
203204
page_index: &mut [usize],
204205
group_columns: ProjectedBlock,
205206
) {
206207
let tuple_size = self.tuple_size;
207208
let (mut page, mut page_index_value) = self.writable_page();
208-
for idx in select_vector.iter().copied() {
209-
address[idx] = page.data_ptr(page.rows, tuple_size);
210-
page_index[idx] = page_index_value;
209+
for row in select_vector {
210+
address[*row] = page.data_ptr(page.rows, tuple_size);
211+
page_index[*row] = page_index_value;
211212
page.rows += 1;
212213

213214
if page.rows == page.capacity {
@@ -226,7 +227,7 @@ impl Payload {
226227

227228
fn append_rows(
228229
&mut self,
229-
select_vector: &[usize],
230+
select_vector: &[RowID],
230231
group_hashes: &[u64; BATCH_SIZE],
231232
address: &mut [RowPtr; BATCH_SIZE],
232233
page_index: &mut [usize],
@@ -240,15 +241,16 @@ impl Payload {
240241
if bitmap.null_count() == 0 || bitmap.null_count() == bitmap.len() {
241242
let val: u8 = if bitmap.null_count() == 0 { 1 } else { 0 };
242243
// faster path
243-
for idx in select_vector.iter().copied() {
244+
for row in select_vector {
244245
unsafe {
245-
address[idx].write_u8(write_offset, val);
246+
address[*row].write_u8(write_offset, val);
246247
}
247248
}
248249
} else {
249-
for idx in select_vector.iter().copied() {
250+
for row in select_vector {
250251
unsafe {
251-
address[idx].write_u8(write_offset, bitmap.get_bit(idx) as u8);
252+
address[*row]
253+
.write_u8(write_offset, bitmap.get_bit(row.to_index()) as u8);
252254
}
253255
}
254256
}
@@ -276,8 +278,8 @@ impl Payload {
276278

277279
// write group hashes
278280
debug_assert!(write_offset == self.row_layout.hash_offset);
279-
for idx in select_vector.iter().copied() {
280-
address[idx].set_hash(&self.row_layout, group_hashes[idx]);
281+
for row in select_vector {
282+
address[*row].set_hash(&self.row_layout, group_hashes[*row]);
281283
}
282284

283285
debug_assert!(write_offset + 8 == self.row_layout.state_offset);
@@ -293,7 +295,7 @@ impl Payload {
293295
.iter()
294296
.copied()
295297
.enumerate()
296-
.map(|(i, idx)| (idx, unsafe { place.add(padded_size * i) }))
298+
.map(|(i, row)| (row.to_index(), unsafe { place.add(padded_size * i) }))
297299
{
298300
let place = StateAddr::from(place);
299301
address[idx].set_state_addr(&self.row_layout, &place);
@@ -336,7 +338,7 @@ impl Payload {
336338
}
337339
}
338340

339-
pub fn copy_rows(&mut self, select_vector: &[usize], address: &[RowPtr; BATCH_SIZE]) {
341+
pub fn copy_rows(&mut self, select_vector: &[RowID], address: &[RowPtr; BATCH_SIZE]) {
340342
let tuple_size = self.tuple_size;
341343
let agg_len = self.aggrs.len();
342344
let (mut page, _) = self.writable_page();
@@ -395,7 +397,7 @@ impl Payload {
395397
let partition_idx = (hash % mods) as usize;
396398

397399
let (count, sel) = &mut state.partition_entries[partition_idx];
398-
sel[*count] = idx;
400+
sel[*count as usize] = idx.into();
399401
*count += 1;
400402
}
401403
flush_state.flush_page_row = end;

0 commit comments

Comments
 (0)