blob: a11f5e3b5baeec04cd8f181f2a1c5169975d0187 [file]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
config { hasOutput: true, tags: ["req", "udfs"] }
CREATE OR REPLACE AGGREGATE FUNCTION ${self()}(sketch BYTES, params STRUCT<k INT, hra BOOL> NOT AGGREGATE)
RETURNS BYTES
LANGUAGE js
OPTIONS (
library=["${dataform.projectConfig.vars.jsBucket}/req_sketch_float.mjs"],
description = '''Merges sketches from the given column.
Param sketch: the column of serialized REQ sketches (BYTES) to merge.
Param k: the sketch accuracy/size parameter as an even INT in the range [4, 65534]. If NULL, the library default is used.
Param hra: if true, the high ranks are prioritized for better accuracy. Otherwise the low ranks are prioritized for better accuracy. If NULL, defaults to true.
Returns: a serialized REQ sketch as BYTES.
For more information:
- https://datasketches.apache.org/docs/REQ/ReqSketch.html
'''
) AS R"""
import ModuleFactory from "${dataform.projectConfig.vars.jsBucket}/req_sketch_float.mjs";
// Instantiate the WASM module once, at module load. Top-level await guarantees
// it is ready before any exported UDAF entry point can be called.
const Module = await ModuleFactory();
// Default sketch size used when k is not supplied. Number() normalizes the raw
// constant, which may not be a plain JS number (presumably a BigInt - confirm).
const default_k = Number(Module.DEFAULT_K);
// UDAF interface
/**
 * Builds the initial aggregation state from the NOT AGGREGATE parameters.
 * No sketch is allocated here; aggregate() and merge() create it lazily.
 * @param {?{k: *, hra: ?boolean}} params - the STRUCT<k INT, hra BOOL> argument.
 *     The struct itself or any of its fields may be NULL.
 * @returns {{k: number, hra: boolean}} k falls back to default_k, hra to true.
 */
export function initialState(params) {
  // Optional chaining: a NULL params struct now gets the same defaults as
  // NULL fields instead of throwing a TypeError on property access.
  const k = params?.k;
  const hra = params?.hra;
  return {
    // Number() because the INT argument may not arrive as a JS number
    // (assumption - e.g. BigInt; confirm against BigQuery type mapping).
    k: k == null ? default_k : Number(k),
    hra: hra ?? true
  };
}
/**
 * Folds one input row (a serialized REQ sketch) into the aggregation state.
 * The live sketch is created on the first non-NULL input from state.k and
 * state.hra.
 * @param {{k: number, hra: boolean, sketch?: Object}} state - mutable state.
 * @param {*} sketch - serialized REQ sketch (BYTES value) from the input
 *     column; NULL rows are skipped.
 * @throws {Error} WASM-side exceptions are rethrown as Error with a decoded message.
 */
export function aggregate(state, sketch) {
  // Skip NULL rows rather than handing NULL to the native merge() (presumably
  // an error) or allocating an empty sketch. A group with only NULLs then
  // reaches serialize() without a sketch and produces NULL.
  if (sketch == null) return;
  try {
    if (state.sketch == null) {
      state.sketch = new Module.req_sketch_float(state.k, state.hra);
    }
    state.sketch.merge(sketch);
  } catch (e) {
    // Errors without a message are presumably raw exceptions from the WASM
    // module; getExceptionMessage decodes them into readable text.
    if (e.message != null) throw e;
    throw new Error(Module.getExceptionMessage(e));
  }
}
/**
 * Turns the aggregation state into a plain object holding only k, hra and the
 * serialized sketch bytes. Any live sketch is combined with bytes already
 * carried in state.serialized, written out, and then freed.
 * @param {{k: number, hra: boolean, sketch?: Object, serialized?: *}} state
 * @returns {{k: number, hra: boolean, serialized: *}} serialized is null when
 *     there was neither a live sketch nor carried-over bytes.
 * @throws {Error} WASM-side exceptions are rethrown as Error with a decoded message.
 */
export function serialize(state) {
  const hasSketch = state.sketch != null;
  // deserialize -> serialize with no aggregation in between: state is already
  // in serialized form, hand it back untouched.
  if (!hasSketch && state.serialized != null) return state;
  try {
    if (!hasSketch) {
      state.serialized = null;
    } else {
      // deserialize -> aggregate happened earlier: the carried bytes must be
      // folded into the live sketch before it is written out.
      if (state.serialized != null) state.sketch.merge(state.serialized);
      state.serialized = state.sketch.serializeAsUint8Array();
    }
    const { k, hra, serialized } = state;
    return { k, hra, serialized };
  } catch (e) {
    // Message-less errors presumably come from the WASM module; decode them.
    if (e.message == null) throw new Error(Module.getExceptionMessage(e));
    throw e;
  } finally {
    // Free the native sketch on every path (success or error); only the
    // bytes survive in state.
    if (state.sketch != null) {
      state.sketch.delete();
      delete state.sketch;
    }
  }
}
/**
 * Restores a state produced by serialize().
 * That object ({k, hra, serialized}) is consumed directly by merge() and
 * serialize(), so no conversion is needed; the live sketch is rebuilt lazily
 * by whichever of those runs next.
 * @param {*} serializedState - value previously returned by serialize().
 * @returns {*} the same object.
 */
export function deserialize(serializedState) {
  const state = serializedState;
  return state;
}
/**
 * Absorbs another partial state into this one.
 * A live sketch is ensured on state (built from state.k / state.hra). Then the
 * serialized bytes of state and of other_state, in that order, are merged into
 * it and removed from their objects. The live sketch is deliberately kept; it
 * is written out and freed later by serialize().
 * @param {{k: number, hra: boolean, sketch?: Object, serialized?: *}} state - receiving state.
 * @param {{serialized?: *}} other_state - partial state to absorb (mutated).
 * @throws {Error} WASM-side exceptions are rethrown as Error with a decoded message.
 */
export function merge(state, other_state) {
  try {
    state.sketch ??= new Module.req_sketch_float(state.k, state.hra);
    // state's own carried bytes first, then the incoming partial state.
    for (const part of [state, other_state]) {
      if (part.serialized == null) continue;
      state.sketch.merge(part.serialized);
      delete part.serialized;
    }
  } catch (e) {
    // Message-less errors presumably come from the WASM module; decode them.
    if (e.message == null) throw new Error(Module.getExceptionMessage(e));
    throw e;
  }
}
/**
 * Produces the aggregate result: the serialized REQ sketch bytes.
 * Delegates to serialize(), which also frees any live sketch. The result is
 * null when the state never held a sketch or carried bytes.
 * @param {{k: number, hra: boolean, sketch?: Object, serialized?: *}} state
 * @returns {*} serialized sketch bytes, or null.
 */
export function finalize(state) {
  const { serialized } = serialize(state);
  return serialized;
}
""";