/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

config { hasOutput: true, tags: ["req", "udfs"] }

CREATE OR REPLACE AGGREGATE FUNCTION ${self()}(sketch BYTES, params STRUCT<k INT, hra BOOL> NOT AGGREGATE)
RETURNS BYTES
LANGUAGE js
OPTIONS (
  library=["${dataform.projectConfig.vars.jsBucket}/req_sketch_float.mjs"],
  description = '''Merges sketches from the given column.

Param sketch: the column of serialized REQ sketches to merge.
Param k: the sketch accuracy/size parameter as an even INT in the range [4, 65534].
Param hra: if true, the high ranks are prioritized for better accuracy. Otherwise the low ranks are prioritized for better accuracy.
Returns: a serialized REQ sketch as BYTES.

For more information:
 - https://datasketches.apache.org/docs/REQ/ReqSketch.html
'''
) AS R"""
import ModuleFactory from "${dataform.projectConfig.vars.jsBucket}/req_sketch_float.mjs";

// Instantiate the sketch library once per module load; every UDAF callback
// below reaches the sketch class and the exception helper through Module.
const Module = await ModuleFactory();
// Value of k used when the caller leaves params.k NULL.
const default_k = Number(Module.DEFAULT_K);

// UDAF interface
/**
 * Builds the initial aggregation state from the non-aggregate params argument.
 *
 * @param {?Object} params - struct with optional fields k and hra. A null
 *     struct is treated the same as a struct whose fields are all null.
 * @returns {{k: number, hra: boolean}} state holding the sketch parameters:
 *     k falls back to default_k and is otherwise coerced with Number();
 *     hra falls back to true.
 */
export function initialState(params) {
  // Guard against a NULL struct so that reading its fields cannot throw.
  const { k, hra } = params ?? {};
  return {
    k: k == null ? default_k : Number(k),
    hra: hra ?? true,
  };
}

/**
 * Folds one input row into the running state.
 *
 * The live sketch is created lazily from state.k and state.hra the first
 * time a non-null row arrives, and the row is handed to its merge() as-is.
 *
 * @param {Object} state - aggregation state; state.sketch is created here
 *     when absent.
 * @param {*} sketch - value of the sketch column for this row. Null rows are
 *     skipped.
 * @throws {Error} errors that carry a message are rethrown unchanged; any
 *     other thrown value is converted with Module.getExceptionMessage().
 */
export function aggregate(state, sketch) {
  // NULL rows contribute nothing, in line with how SQL aggregates treat NULL.
  if (sketch == null) return;
  try {
    state.sketch ??= new Module.req_sketch_float(state.k, state.hra);
    state.sketch.merge(sketch);
  } catch (e) {
    if (e.message != null) throw e;
    throw new Error(Module.getExceptionMessage(e));
  }
}

/**
 * Converts the state into a form without a live sketch object.
 *
 * Three situations are handled:
 *  - no live sketch but serialized bytes present: the state is returned as-is
 *    (deserialize followed directly by serialize);
 *  - live sketch present: any previously serialized bytes are merged into it,
 *    the sketch is serialized with serializeAsUint8Array(), and the sketch is
 *    released;
 *  - neither present: serialized is set to null.
 *
 * @param {Object} state - aggregation state; state.serialized is updated and
 *     state.sketch is deleted and removed before returning.
 * @returns {Object} either the same state object (first case) or a new object
 *     with the fields k, hra and serialized.
 * @throws {Error} errors that carry a message are rethrown unchanged; any
 *     other thrown value is converted with Module.getExceptionMessage().
 */
export function serialize(state) {
  // for transition deserialize-serialize
  if (state.sketch == null && state.serialized != null) return state;
  try {
    if (state.sketch != null) {
      // for prior transition deserialize-aggregate:
      // merge aggregated and serialized state
      if (state.serialized != null) state.sketch.merge(state.serialized);
      state.serialized = state.sketch.serializeAsUint8Array();
    } else {
      state.serialized = null;
    }
    return {
      k: state.k,
      hra: state.hra,
      serialized: state.serialized,
    };
  } catch (e) {
    if (e.message != null) throw e;
    throw new Error(Module.getExceptionMessage(e));
  } finally {
    // Release the sketch on success and on failure alike; calling delete()
    // presumably frees memory held by the library - confirm against it.
    if (state.sketch != null) {
      state.sketch.delete();
      delete state.sketch;
    }
  }
}

/**
 * Counterpart of serialize(): returns its argument unchanged.
 *
 * No sketch object is rebuilt here; aggregate() and merge() create one on
 * demand and fold the serialized field into it.
 *
 * @param {*} serialized - value to turn back into a state.
 * @returns {*} the same value.
 */
export function deserialize(serialized) {
  return serialized;
}

/**
 * Combines other_state into state.
 *
 * A live sketch is created on state when it has none (from state.k and
 * state.hra). The serialized field of state and then of other_state is merged
 * into that sketch and removed from its owner.
 *
 * @param {Object} state - receiving state; gains or keeps state.sketch and
 *     loses its serialized field if it was non-null.
 * @param {Object} other_state - state to absorb; only its serialized field is
 *     read, and it is removed once merged.
 * @throws {Error} errors that carry a message are rethrown unchanged; any
 *     other thrown value is converted with Module.getExceptionMessage().
 */
export function merge(state, other_state) {
  try {
    state.sketch ??= new Module.req_sketch_float(state.k, state.hra);
    // Own pending bytes first, then the other side's. Each is dropped after
    // merging so a later serialize() does not merge it a second time.
    for (const pending of [state, other_state]) {
      if (pending.serialized != null) {
        state.sketch.merge(pending.serialized);
        delete pending.serialized;
      }
    }
  } catch (e) {
    if (e.message != null) throw e;
    throw new Error(Module.getExceptionMessage(e));
  }
}

/**
 * Produces the aggregate's result: the serialized field of serialize(state).
 *
 * @param {Object} state - final aggregation state.
 * @returns {*} the serialized sketch, or null when serialize() found neither
 *     a live sketch nor serialized bytes.
 */
export function finalize(state) {
  return serialize(state).serialized;
}
""";