From c8ebe6b73dfec575f765029845bcbc77ed7e1db2 Mon Sep 17 00:00:00 2001 From: "(quasar) nebula" Date: Mon, 6 May 2024 09:59:09 -0300 Subject: aggregate: receiveAggregate, aggregate.receive --- src/util/aggregate.js | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) (limited to 'src/util') diff --git a/src/util/aggregate.js b/src/util/aggregate.js index 5ba80b59..2e4d9de7 100644 --- a/src/util/aggregate.js +++ b/src/util/aggregate.js @@ -91,6 +91,22 @@ export function openAggregate({ return aggregate.callAsync(() => withAggregateAsync(...args)); }; + aggregate.receive = (results) => { + if (!Array.isArray(results)) { + throw new Error(`Expected an array`); + } + + return results.map(({aggregate, result}) => { + try { + aggregate.close(); + } catch (error) { + errors.push(error); + } + + return result; + }); + }; + aggregate.map = (...args) => { const parent = aggregate; const {result, aggregate: child} = mapAggregate(...args); @@ -148,6 +164,21 @@ function _reorganizeAggregateArguments(arg1, arg2, desire = v => typeof v === 'f } } +// Takes a list of {aggregate, result} objects, puts all the aggregates into +// a new aggregate, and puts all the results into an array, returning both on +// a new {aggregate, result} object. This is essentailly the generalized +// composable version of functions like mapAggregate or filterAggregate. +export function receiveAggregate(arg1, arg2) { + const [array, opts] = _reorganizeAggregateArguments(arg1, arg2, Array.isArray); + if (!array) { + throw new Error(`Expected an array`); + } + + const aggregate = openAggregate(opts); + const result = aggregate.receive(array); + return {aggregate, result}; +} + // Performs an ordinary array map with the given function, collating into a // results array (with errored inputs filtered out) and an error aggregate. // -- cgit 1.3.0-6-gf8a5