Skip to content

Commit 4d6f755

Browse files
committed
add support for inbound ports to wasm
1 parent 4793aeb commit 4d6f755

File tree

9 files changed

+89
-71
lines changed

9 files changed

+89
-71
lines changed

nohup.out

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
yarn run v1.22.19
2+
error Command "startLocalRepo" not found.
3+
info Visit https://yarnpkg.com/en/docs/cli/run for documentation about this command.
4+
yarn run v1.22.19
5+
error Command "startLocalRepo" not found.
6+
info Visit https://yarnpkg.com/en/docs/cli/run for documentation about this command.

src/adapters/webassembly/index.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import {
44
wrapWasmAdapter,
55
wrapWasmModelSpec,
66
wrapWasmService
7-
} from './wasm-wrappers'
7+
} from './wasm-wrap'
88

99
export * from './wasm-interop'
1010
export * from './wasm-import'
@@ -22,7 +22,7 @@ export const adapterTypes = {
2222
export const wasmAdapters = {
2323
/**
2424
* @param {WebAssembly.Exports} wasm
25-
* @returns {import('./wasm-wrappers').ModelSpecification}
25+
* @returns {import('./wasm-wrap').ModelSpecification}
2626
*/
2727
[adapterTypes.model]: wasm => wrapWasmModelSpec(wasm),
2828
/**

src/adapters/webassembly/repo-client.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,10 @@ export const RepoClient = {
4040
})
4141
})
4242
.then(function (rest) {
43-
const buf = Buffer.from(rest.data.content, 'base64')
43+
const buf = Buffer.from(rest.data.content)
4444
resolve({
4545
toString: () => buf.toString('utf-8'),
46-
asBase64Buffer: () => buf,
46+
toArrayBuffer: () => buf,
4747
toUint16Array: () =>
4848
new Uint16Array(
4949
buf.buffer,

src/adapters/webassembly/wasm-import.js

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
1-
import { callbackify } from 'node:util'
21
import { wasmAdapters } from '.'
3-
42
import { EventBrokerFactory } from '../../../lib/domain'
3+
import { RepoClient } from './repo-client'
54

65
/**@type {import('../../domain/event-broker').EventBroker} */
76
const broker = EventBrokerFactory.getInstance()
@@ -366,19 +365,19 @@ exports.importWebAssembly = function (remoteEntry) {
366365
return adaptedExports
367366
}
368367

369-
async function compileStream (url) {
368+
async function compileStream (remoteEntry) {
370369
try {
371370
return await globalThis.WebAssembly.compileStreaming(
372-
globalThis.fetch(url)
371+
globalThis.fetch(remoteEntry.url)
373372
)
374373
} catch {
375374
return globalThis.WebAssembly.compile(
376-
await (await import('node:fs/promises')).readFile(url)
375+
(await RepoClient.fetch(remoteEntry)).toArrayBuffer()
377376
)
378377
}
379378
}
380379

381-
const initWasm = async () => instantiate(await compileStream(remoteEntry.url))
380+
const initWasm = async () => instantiate(await compileStream(remoteEntry))
382381

383382
return initWasm().then(wasmExports =>
384383
wasmAdapters[remoteEntry.type](wasmExports)

src/adapters/webassembly/wasm-interop.js

Lines changed: 42 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,11 @@
77

88
/**
99
* WASM interop functions
10-
* - find exported functions
11-
* - call exported functions
12-
* - import command configuration
10+
* - call any exported function
11+
* (no js glue code required)
1312
* - import port configuration
14-
* - decode memory addresses
13+
* - import inbound port functions
14+
* - import commands
1515
* @param {WebAssembly.Exports} wasmExports
1616
* @returns adapter functions
1717
*/
@@ -29,11 +29,23 @@ exports.WasmInterop = function (wasmExports) {
2929
_exports
3030
} = wasmExports
3131

32+
function parse (v) {
33+
return !isNaN(parseInt(v))
34+
? parseInt(v)
35+
: !isNaN(parseFloat(v))
36+
? parseFloat(v)
37+
: /true/i.test(v)
38+
? true
39+
: /false/i.test(v)
40+
? false
41+
: v
42+
}
43+
3244
/**
3345
*
3446
* @param {string} fn function name
3547
* @param {string[][]} kv key-value pairs
36-
* @returns
48+
* @returns {object}
3749
*/
3850
function lift (fn, kv) {
3951
return liftArray(
@@ -46,14 +58,14 @@ exports.WasmInterop = function (wasmExports) {
4658
2,
4759
_exports[fn](kv) >>> 0
4860
)
49-
.map(([k, v]) => ({ [k]: v }))
61+
.map(([k, v]) => ({ [k]: parse(v) }))
5062
.reduce((a, b) => ({ ...a, ...b }))
5163
}
5264

5365
/**
5466
*
5567
* @param {string[][]} kv key-value pairs in a 2 dimensional string array
56-
* @returns
68+
* @returns {string[][]}
5769
*/
5870
function lower (kv) {
5971
return (
@@ -63,7 +75,7 @@ exports.WasmInterop = function (wasmExports) {
6375
pointer,
6476
lowerArray(
6577
(pointer, value) => {
66-
store_ref(pointer, lowerString(value?.toString()) || notnull())
78+
store_ref(pointer, lowerString(value) || notnull())
6779
},
6880
4,
6981
2,
@@ -78,10 +90,15 @@ exports.WasmInterop = function (wasmExports) {
7890
)
7991
}
8092

81-
function cleanse (obj) {
82-
return Object.entries(obj)
83-
.filter(([k, v]) => ['string', 'number', 'boolean'].includes(typeof v))
84-
.map(([k, v]) => [k, v.toString()])
93+
function clean (obj) {
94+
const convert = obj =>
95+
Object.entries(obj)
96+
.filter(([k, v]) => ['string', 'number', 'boolean'].includes(typeof v))
97+
.map(([k, v]) => [k, v.toString()])
98+
99+
// handle custom port format
100+
if (obj.port && obj.args) return convert(obj.args)
101+
return convert(obj)
85102
}
86103

87104
/**
@@ -98,9 +115,7 @@ exports.WasmInterop = function (wasmExports) {
98115
* @returns {object|number} object
99116
*/
100117
function callWasmFunction (fn, obj) {
101-
const props = cleanse(obj)
102-
const kv = lower(props)
103-
return lift(fn, kv)
118+
return lift(fn, lower(clean(obj)))
104119
}
105120

106121
return Object.freeze({
@@ -135,7 +150,6 @@ exports.WasmInterop = function (wasmExports) {
135150
* wasm function.
136151
*/
137152
importWasmPorts () {
138-
/** @type {import("../../domain").ports} */
139153
const ports = getPorts()
140154
return Object.keys(ports)
141155
.map(port => {
@@ -149,32 +163,34 @@ exports.WasmInterop = function (wasmExports) {
149163
inbound
150164
] = ports[port].split(',')
151165
return {
152-
/** @type {import("../../domain").ports[x]} */
153166
[port]: {
154167
service,
155168
type,
156169
consumesEvent,
157170
producesEvent,
158171
callback: data => callWasmFunction(callback, data),
159172
undo: data => callWasmFunction(undo, data),
160-
inbound: function inbound (port, args, id) {
173+
inbound (port, args, id) {
161174
callWasmFunction(inbound, { port, ...args, id })
162175
}
163176
}
164177
}
165178
})
166-
.reduce((p, c) => ({ ...p, ...c }))
179+
.reduce((p, c) => ({ ...p, ...c }), {})
167180
},
168181

182+
/**
183+
*
184+
* @returns {{[x: string]:(x) => any}}
185+
*/
169186
importWasmPortFunctions () {
170-
const ports = getPorts()
171-
return Object.values(ports)
172-
.filter(v => v.inbound)
173-
.reduce((a, b) => [...a, ...b], [])
187+
return Object.entries(getPorts())
188+
.map(([k, v]) => [k, v.split(',')[1]])
189+
.filter(([k, v]) => v === 'inbound')
190+
.map(([k, v]) => ({ [k]: x => callWasmFunction(k, x) }))
191+
.reduce((a, b) => ({ ...a, ...b }), {})
174192
},
175193

176-
constructObject (ptr) {
177-
return constructObject(ptr, false)
178-
}
194+
callWasmFunction
179195
})
180196
}
Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
'use strict'
22

3-
const { dependencies } = require('webpack')
3+
const exports = require('webpack')
44
const { WasmInterop } = require('./wasm-interop')
55

66
/**@typedef {import("../../domain").ModelSpecification} ModelSpecification */
@@ -57,31 +57,33 @@ exports.wrapWasmModelSpec = function (wasmExports) {
5757

5858
/**
5959
*
60-
* @param {WebAssembly.Instance} instance
60+
* @param {WebAssembly.Exports} exports
6161
* @returns {Adapter}
6262
*/
63-
exports.wrapWasmAdapter = function (instance) {
64-
const { invoke } = instance.exports
65-
const interop = WasmInterop(instance)
63+
exports.wrapWasmAdapter = function (exports) {
64+
const interop = WasmInterop(exports)
6665

67-
return function (service) {
68-
return async function (options) {
69-
const { model } = options
70-
const adapter = interop.callWasmFunction(invoke, model)
71-
72-
if (service) {
73-
interop.callWasmFunction(service[adapter.serviceFn], model)
74-
}
75-
}
76-
}
66+
return Object.keys(exports)
67+
.filter(k => typeof exports[k] === 'function' && /adapter/i.test(k))
68+
.map(k => ({
69+
[k.replace('adapter', '')]: service => (model, args = []) =>
70+
service
71+
? interop.callWasmFunction(service[k], args[0] || model)
72+
: interop.callWasmFunction(k, args[0] || model)
73+
}))
74+
.reduce((a, b) => ({ ...a, ...b }))
7775
}
7876

7977
/**
8078
*
81-
* @param {WebAssembly.Instance} instance
82-
* @returns {Service}ww w
79+
* @param {WebAssembly.Exports} exports
80+
* @returns {Service}
8381
*/
84-
exports.wrapWasmService = function (instance) {
85-
const { makeService } = instance.exports
86-
return WasmInterop(instance).callWasmFunction(makeService)
82+
exports.wrapWasmService = function (exports) {
83+
return {
84+
[exports.getName()]: Object.keys(exports)
85+
.filter(k => typeof exports[k] === 'function' && /service/i.test(k))
86+
.map(k => ({ [k]: x => callWasmFunction(k, x) }))
87+
.reduce((a, b) => ({ ...a, ...b }))
88+
}
8789
}

src/domain/circuit-breaker.js

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,6 @@ const DefaultThreshold = {
3838
*/
3939
const logs = new Map()
4040

41-
/** @type {{[x: string]: number[]}} */
42-
let counters = {}
43-
4441
/**
4542
*
4643
* @param {*} id
@@ -255,13 +252,12 @@ const Switch = function (id, thresholds) {
255252
error
256253
})
257254
},
258-
259-
incrementInvocationCounter () {
260-
if (!counters[id]) counters[id] = [Date.Now()]
261-
else counters[id].push(Date.now())
262-
counters[id].filter(time => time > Date.now() - 5000).length > 999
263-
},
264-
255+
/**
256+
*
257+
* @param {*} error
258+
* @param {*} arg
259+
* @returns
260+
*/
265261
async fallbackFn (error, arg) {
266262
try {
267263
return getThreshold(error, thresholds).fallbackFn.apply(this, arg)

src/domain/datasource-factory.js

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ const DsCoreExtensions = superclass =>
128128
ModelFactory.loadModel(broker, this, model, this.name)
129129
)
130130
} catch (error) {
131-
console.error({ fn: this.list.name })
131+
console.error({ fn: this.list.name, error })
132132
throw error
133133
}
134134
}
@@ -254,8 +254,7 @@ const DataSourceFactory = (() => {
254254

255255
if (!options.ephemeral) dataSources.set(name, newDs)
256256

257-
//debug &&
258-
console.debug({ newDs })
257+
debug && console.debug({ newDs })
259258
return newDs
260259
}
261260

src/domain/thread-pool.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -349,8 +349,8 @@ export class ThreadPool extends EventEmitter {
349349
}
350350

351351
async stopThreads (reason) {
352-
for (const thread of this.threads)
353-
console.warn(await this.stopThread(thread, reason))
352+
for await (const thread of this.threads)
353+
console.warn(this.stopThread(thread, reason))
354354
this.freeThreads.splice(0, this.freeThreads.length)
355355
return this
356356
}

0 commit comments

Comments
 (0)