diff options
Diffstat (limited to 'src/util/aggregate.js')
-rw-r--r-- | src/util/aggregate.js | 108 |
1 files changed, 94 insertions, 14 deletions
diff --git a/src/util/aggregate.js b/src/util/aggregate.js index c5d4198..3ad8bdb 100644 --- a/src/util/aggregate.js +++ b/src/util/aggregate.js @@ -91,6 +91,46 @@ export function openAggregate({ return aggregate.callAsync(() => withAggregateAsync(...args)); }; + aggregate.receive = (results) => { + if (!Array.isArray(results)) { + if (typeof results === 'object' && results.aggregate) { + const {aggregate, result} = results; + + try { + aggregate.close(); + } catch (error) { + errors.push(error); + } + + return result; + } + + throw new Error(`Expected an array or {aggregate, result} object`); + } + + return results.map(({aggregate, result}) => { + if (!aggregate) { + console.log('nope:', results); + throw new Error(`Expected an array of {aggregate, result} objects`); + } + + try { + aggregate.close(); + } catch (error) { + errors.push(error); + } + + return result; + }); + }; + + aggregate.contain = (results) => { + return { + aggregate, + result: aggregate.receive(results), + }; + }; + aggregate.map = (...args) => { const parent = aggregate; const {result, aggregate: child} = mapAggregate(...args); @@ -136,18 +176,33 @@ export function aggregateThrows(errorClass) { return {[openAggregate.errorClassSymbol]: errorClass}; } -// Helper function for allowing both (fn, aggregateOpts) and (aggregateOpts, fn) -// in aggregate utilities. -function _reorganizeAggregateArguments(arg1, arg2) { - if (typeof arg1 === 'function') { - return {fn: arg1, opts: arg2 ?? {}}; - } else if (typeof arg2 === 'function') { - return {fn: arg2, opts: arg1 ?? {}}; +// Helper function for allowing both (fn, opts) and (opts, fn) in aggregate +// utilities (or other shapes besides functions). +function _reorganizeAggregateArguments(arg1, arg2, desire = v => typeof v === 'function') { + if (desire(arg1)) { + return [arg1, arg2 ?? {}]; + } else if (desire(arg2)) { + return [arg2, arg1]; } else { - throw new Error(`Expected a function`); + return [undefined, undefined]; } } +// Takes a list of {aggregate, result} objects, puts all the aggregates into +// a new aggregate, and puts all the results into an array, returning both on +// a new {aggregate, result} object. This is essentailly the generalized +// composable version of functions like mapAggregate or filterAggregate. +export function receiveAggregate(arg1, arg2) { + const [array, opts] = _reorganizeAggregateArguments(arg1, arg2, Array.isArray); + if (!array) { + throw new Error(`Expected an array`); + } + + const aggregate = openAggregate(opts); + const result = aggregate.receive(array); + return {aggregate, result}; +} + // Performs an ordinary array map with the given function, collating into a // results array (with errored inputs filtered out) and an error aggregate. // @@ -158,12 +213,20 @@ function _reorganizeAggregateArguments(arg1, arg2) { // use aggregate.close() to throw the error. (This aggregate may be passed to a // parent aggregate: `parent.call(aggregate.close)`!) export function mapAggregate(array, arg1, arg2) { - const {fn, opts} = _reorganizeAggregateArguments(arg1, arg2); + const [fn, opts] = _reorganizeAggregateArguments(arg1, arg2); + if (!fn) { + throw new Error(`Expected a function`); + } + return _mapAggregate('sync', null, array, fn, opts); } export function mapAggregateAsync(array, arg1, arg2) { - const {fn, opts} = _reorganizeAggregateArguments(arg1, arg2); + const [fn, opts] = _reorganizeAggregateArguments(arg1, arg2); + if (!fn) { + throw new Error(`Expected a function`); + } + const {promiseAll = Promise.all.bind(Promise), ...remainingOpts} = opts; return _mapAggregate('async', promiseAll, array, fn, remainingOpts); } @@ -200,12 +263,20 @@ export function _mapAggregate(mode, promiseAll, array, fn, aggregateOpts) { // // As with mapAggregate, the returned aggregate property is not yet closed. export function filterAggregate(array, arg1, arg2) { - const {fn, opts} = _reorganizeAggregateArguments(arg1, arg2); + const [fn, opts] = _reorganizeAggregateArguments(arg1, arg2); + if (!fn) { + throw new Error(`Expected a function`); + } + return _filterAggregate('sync', null, array, fn, opts); } export async function filterAggregateAsync(array, arg1, arg2) { - const {fn, opts} = _reorganizeAggregateArguments(arg1, arg2); + const [fn, opts] = _reorganizeAggregateArguments(arg1, arg2); + if (!fn) { + throw new Error(`Expected a function`); + } + const {promiseAll = Promise.all.bind(Promise), ...remainingOpts} = opts; return _filterAggregate('async', promiseAll, array, fn, remainingOpts); } @@ -268,12 +339,20 @@ function _filterAggregate(mode, promiseAll, array, fn, aggregateOpts) { // function with it, then closing the function and returning the result (if // there's no throw). export function withAggregate(arg1, arg2) { - const {fn, opts} = _reorganizeAggregateArguments(arg1, arg2); + const [fn, opts] = _reorganizeAggregateArguments(arg1, arg2); + if (!fn) { + throw new Error(`Expected a function`); + } + return _withAggregate('sync', opts, fn); } export function withAggregateAsync(arg1, arg2) { - const {fn, opts} = _reorganizeAggregateArguments(arg1, arg2); + const [fn, opts] = _reorganizeAggregateArguments(arg1, arg2); + if (!fn) { + throw new Error(`Expected a function`); + } + return _withAggregate('async', opts, fn); } @@ -294,6 +373,7 @@ export function _withAggregate(mode, aggregateOpts, fn) { export const unhelpfulTraceLines = [ /sugar/, + /aggregate/, /node:/, /<anonymous>/, ]; |