blob: bf4155b2a062655e9fa2d7a58ab65de44355fc1e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { CombineFn } from "./group_and_combine";
import { Coder } from "../coders/coders";
import { VarIntCoder } from "../coders/standard_coders";
// TODO(cleanup): These reductions only work on Arrays, not Iterables.
export const count: CombineFn<any, number, number> = {
createAccumulator: () => 0,
addInput: (acc, i) => acc + 1,
mergeAccumulators: (accumulators: number[]) =>
accumulators.reduce((prev, current) => prev + current),
extractOutput: (acc) => acc,
accumulatorCoder: () => new VarIntCoder(),
};
export const sum: CombineFn<number, number, number> = {
createAccumulator: () => 0,
addInput: (acc: number, i: number) => acc + i,
mergeAccumulators: (accumulators: number[]) =>
accumulators.reduce((prev, current) => prev + current),
extractOutput: (acc: number) => acc,
accumulatorCoder: (inputCoder: Coder<number>) => inputCoder,
};
export const max: CombineFn<any, any, any> = {
createAccumulator: () => undefined,
addInput: (acc: any, i: any) => (acc === undefined || acc < i ? i : acc),
mergeAccumulators: (accumulators: any[]) =>
accumulators
.filter((x) => x !== undefined)
.reduce((a, b) => (a > b ? a : b), undefined),
extractOutput: (acc: any) => acc,
};
export const min: CombineFn<any, any, any> = {
createAccumulator: () => undefined,
addInput: (acc: any, i: any) => (acc === undefined || acc > i ? i : acc),
mergeAccumulators: (accumulators: any[]) =>
accumulators
.filter((x) => x !== undefined)
.reduce((a, b) => (a < b ? a : b), undefined),
extractOutput: (acc: any) => acc,
};
export const mean: CombineFn<number, [number, number], number> = {
createAccumulator: () => [0, 0],
addInput: ([sum, count]: [number, number], i: number) => [sum + i, count + 1],
mergeAccumulators: (accumulators: [number, number][]) =>
accumulators.reduce(([sum0, count0], [sum1, count1]) => [
sum0 + sum1,
count0 + count1,
]),
extractOutput: ([sum, count]: [number, number]) => sum / count,
};
import { requireForSerialization } from "../serialization";
requireForSerialization("apache-beam/transforms/combiners", exports);