From 57d0f8f7b29107ee17053ac6d47d2c4d476e8cb6 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Tue, 8 Jul 2025 10:34:08 +0200 Subject: [PATCH 1/2] Remove copy of the reduce operator --- packages/d2mini/src/operators/reduce copy.ts | 103 ------------------- 1 file changed, 103 deletions(-) delete mode 100644 packages/d2mini/src/operators/reduce copy.ts diff --git a/packages/d2mini/src/operators/reduce copy.ts b/packages/d2mini/src/operators/reduce copy.ts deleted file mode 100644 index 6704a38..0000000 --- a/packages/d2mini/src/operators/reduce copy.ts +++ /dev/null @@ -1,103 +0,0 @@ -import { IStreamBuilder, KeyValue } from '../types.js' -import { - DifferenceStreamReader, - DifferenceStreamWriter, - UnaryOperator, -} from '../graph.js' -import { StreamBuilder } from '../d2.js' -import { MultiSet } from '../multiset.js' -import { Index } from '../indexes.js' -import { hash } from '../utils.js' - -/** - * Base operator for reduction operations (version-free) - */ -export class ReduceOperator extends UnaryOperator<[K, V1], [K, V2]> { - #index = new Index() - #indexOut = new Index() - #f: (values: [V1, number][]) => [V2, number][] - - constructor( - id: number, - inputA: DifferenceStreamReader<[K, V1]>, - output: DifferenceStreamWriter<[K, V2]>, - f: (values: [V1, number][]) => [V2, number][], - ) { - super(id, inputA, output) - this.#f = f - } - - run(): void { - const keysTodo = new Set() - - // Collect all input messages and update the index - for (const message of this.inputMessages()) { - for (const [item, multiplicity] of message.getInner()) { - const [key, value] = item - this.#index.addValue(key, [value, multiplicity]) - keysTodo.add(key) - } - } - - // For each key, compute the reduction and delta - const result: [[K, V2], number][] = [] - for (const key of keysTodo) { - const curr = this.#index.get(key) - const currOut = this.#indexOut.get(key) - const out = this.#f(curr) - - // Calculate delta between current and previous output - const delta = new Map() - const values = new Map() - for (const [value, multiplicity] of out) { - const valueKey = hash(value) - values.set(valueKey, value) - delta.set(valueKey, (delta.get(valueKey) || 0) + multiplicity) - } - for (const [value, multiplicity] of currOut) { - const valueKey = hash(value) - values.set(valueKey, value) - delta.set(valueKey, (delta.get(valueKey) || 0) - multiplicity) - } - - // Add non-zero deltas to result - for (const [valueKey, multiplicity] of delta) { - const value = values.get(valueKey)! - if (multiplicity !== 0) { - result.push([[key, value], multiplicity]) - this.#indexOut.addValue(key, [value, multiplicity]) - } - } - } - - if (result.length > 0) { - this.output.sendData(new MultiSet(result)) - } - } -} - -/** - * Reduces the elements in the stream by key (version-free) - */ -export function reduce< - K extends T extends KeyValue ? K : never, - V1 extends T extends KeyValue ? V : never, - R, - T, ->(f: (values: [V1, number][]) => [R, number][]) { - return (stream: IStreamBuilder): IStreamBuilder> => { - const output = new StreamBuilder>( - stream.graph, - new DifferenceStreamWriter>(), - ) - const operator = new ReduceOperator( - stream.graph.getNextOperatorId(), - stream.connectReader() as DifferenceStreamReader>, - output.writer, - f, - ) - stream.graph.addOperator(operator) - stream.graph.addStream(output.connectReader()) - return output - } -} From b23c4240b65f29cbaf302b813de587ae7357672e Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Tue, 8 Jul 2025 10:35:07 +0200 Subject: [PATCH 2/2] Changeset --- .changeset/new-colts-doubt.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/new-colts-doubt.md diff --git a/.changeset/new-colts-doubt.md b/.changeset/new-colts-doubt.md new file mode 100644 index 0000000..4a6a5dc --- /dev/null +++ b/.changeset/new-colts-doubt.md @@ -0,0 +1,5 @@ +--- +'@electric-sql/d2mini': patch +--- + +Remove file that was pushed accidentally.