| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.iotdb.commons.udf.builtin; |
| |
| import org.apache.iotdb.commons.exception.MetadataException; |
| import org.apache.iotdb.commons.udf.utils.UDFDataTypeTransformer; |
| import org.apache.iotdb.udf.api.access.Row; |
| import org.apache.iotdb.udf.api.access.RowWindow; |
| import org.apache.iotdb.udf.api.collector.PointCollector; |
| import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations; |
| import org.apache.iotdb.udf.api.customizer.parameter.UDFParameterValidator; |
| import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters; |
| import org.apache.iotdb.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy; |
| import org.apache.iotdb.udf.api.exception.UDFException; |
| import org.apache.iotdb.udf.api.exception.UDFInputSeriesDataTypeNotValidException; |
| import org.apache.iotdb.udf.api.exception.UDFParameterNotValidException; |
| import org.apache.iotdb.udf.api.type.Type; |
| |
| import org.apache.tsfile.utils.Pair; |
| |
| import java.io.IOException; |
| import java.util.Arrays; |
| import java.util.Comparator; |
| import java.util.PriorityQueue; |
| |
| public class UDTFEqualSizeBucketOutlierSample extends UDTFEqualSizeBucketSample { |
| |
| private String type; |
| private int number; |
| private OutlierSampler outlierSampler; |
| |
| private interface OutlierSampler { |
| |
| void outlierSampleInt(RowWindow rowWindow, PointCollector collector) throws IOException; |
| |
| void outlierSampleLong(RowWindow rowWindow, PointCollector collector) throws IOException; |
| |
| void outlierSampleFloat(RowWindow rowWindow, PointCollector collector) throws IOException; |
| |
| void outlierSampleDouble(RowWindow rowWindow, PointCollector collector) throws IOException; |
| } |
| |
| private class AvgOutlierSampler implements OutlierSampler { |
| |
| @Override |
| public void outlierSampleInt(RowWindow rowWindow, PointCollector collector) throws IOException { |
| int windowSize = rowWindow.windowSize(); |
| if (windowSize <= number) { |
| for (int i = 0; i < windowSize; i++) { |
| Row row = rowWindow.getRow(i); |
| collector.putInt(row.getTime(), row.getInt(0)); |
| } |
| return; |
| } |
| |
| double avg = 0; |
| PriorityQueue<Pair<Integer, Double>> pq = |
| new PriorityQueue<>(number, Comparator.comparing(o -> o.right)); |
| for (int i = 0; i < windowSize; i++) { |
| avg += rowWindow.getRow(i).getInt(0); |
| } |
| avg /= windowSize; |
| for (int i = 0; i < windowSize; i++) { |
| double value = Math.abs(rowWindow.getRow(i).getInt(0) - avg); |
| addToMinHeap(pq, i, value); |
| } |
| |
| putPQValueInt(pq, rowWindow, collector); |
| } |
| |
| @Override |
| public void outlierSampleLong(RowWindow rowWindow, PointCollector collector) |
| throws IOException { |
| int windowSize = rowWindow.windowSize(); |
| if (windowSize <= number) { |
| for (int i = 0; i < windowSize; i++) { |
| Row row = rowWindow.getRow(i); |
| collector.putLong(row.getTime(), row.getLong(0)); |
| } |
| return; |
| } |
| |
| double avg = 0; |
| PriorityQueue<Pair<Integer, Double>> pq = |
| new PriorityQueue<>(number, Comparator.comparing(o -> o.right)); |
| for (int i = 0; i < windowSize; i++) { |
| avg += rowWindow.getRow(i).getLong(0); |
| } |
| avg /= windowSize; |
| for (int i = 0; i < windowSize; i++) { |
| double value = Math.abs(rowWindow.getRow(i).getLong(0) - avg); |
| addToMinHeap(pq, i, value); |
| } |
| |
| putPQValueLong(pq, rowWindow, collector); |
| } |
| |
| @Override |
| public void outlierSampleFloat(RowWindow rowWindow, PointCollector collector) |
| throws IOException { |
| int windowSize = rowWindow.windowSize(); |
| if (windowSize <= number) { |
| for (int i = 0; i < windowSize; i++) { |
| Row row = rowWindow.getRow(i); |
| collector.putFloat(row.getTime(), row.getFloat(0)); |
| } |
| return; |
| } |
| |
| double avg = 0; |
| PriorityQueue<Pair<Integer, Double>> pq = |
| new PriorityQueue<>(number, Comparator.comparing(o -> o.right)); |
| for (int i = 0; i < windowSize; i++) { |
| avg += rowWindow.getRow(i).getFloat(0); |
| } |
| avg /= windowSize; |
| for (int i = 0; i < windowSize; i++) { |
| double value = Math.abs(rowWindow.getRow(i).getFloat(0) - avg); |
| addToMinHeap(pq, i, value); |
| } |
| |
| putPQValueFloat(pq, rowWindow, collector); |
| } |
| |
| @Override |
| public void outlierSampleDouble(RowWindow rowWindow, PointCollector collector) |
| throws IOException { |
| int windowSize = rowWindow.windowSize(); |
| if (windowSize <= number) { |
| for (int i = 0; i < windowSize; i++) { |
| Row row = rowWindow.getRow(i); |
| collector.putDouble(row.getTime(), row.getDouble(0)); |
| } |
| return; |
| } |
| |
| double avg = 0; |
| PriorityQueue<Pair<Integer, Double>> pq = |
| new PriorityQueue<>(number, Comparator.comparing(o -> o.right)); |
| for (int i = 0; i < windowSize; i++) { |
| avg += rowWindow.getRow(i).getDouble(0); |
| } |
| avg /= windowSize; |
| for (int i = 0; i < windowSize; i++) { |
| double value = Math.abs(rowWindow.getRow(i).getDouble(0) - avg); |
| addToMinHeap(pq, i, value); |
| } |
| |
| putPQValueDouble(pq, rowWindow, collector); |
| } |
| } |
| |
| private class StendisOutlierSampler implements OutlierSampler { |
| |
| @Override |
| public void outlierSampleInt(RowWindow rowWindow, PointCollector collector) throws IOException { |
| int windowSize = rowWindow.windowSize(); |
| if (isWindowSizeTooSmallInt(rowWindow, collector, windowSize)) { |
| return; |
| } |
| |
| long row0x = rowWindow.getRow(0).getTime(), |
| row1x = rowWindow.getRow(windowSize - 1).getTime(); |
| int row0y = rowWindow.getRow(0).getInt(0), row1y = rowWindow.getRow(windowSize - 1).getInt(0); |
| |
| PriorityQueue<Pair<Integer, Double>> pq = |
| new PriorityQueue<>(number, Comparator.comparing(o -> o.right)); |
| |
| double A = (double) row0y - row1y; |
| double B = (double) row1x - row0x; |
| double C = (double) row0x * row1y - row1x * row0y; |
| double denominator = Math.sqrt(A * A + B * B); |
| |
| for (int i = 1; i < windowSize - 1; i++) { |
| Row row = rowWindow.getRow(i); |
| double value = Math.abs(A * row.getTime() + B * row.getInt(0) + C) / denominator; |
| addToMinHeap(pq, i, value); |
| } |
| |
| putPQValueInt(pq, rowWindow, collector); |
| } |
| |
| @Override |
| public void outlierSampleLong(RowWindow rowWindow, PointCollector collector) |
| throws IOException { |
| int windowSize = rowWindow.windowSize(); |
| if (isWindowSizeTooSmallLong(rowWindow, collector, windowSize)) { |
| return; |
| } |
| |
| long row0x = rowWindow.getRow(0).getTime(), |
| row1x = rowWindow.getRow(windowSize - 1).getTime(); |
| long row0y = rowWindow.getRow(0).getLong(0), |
| row1y = rowWindow.getRow(windowSize - 1).getLong(0); |
| |
| PriorityQueue<Pair<Integer, Double>> pq = |
| new PriorityQueue<>(number, Comparator.comparing(o -> o.right)); |
| |
| double A = (double) row0y - row1y; |
| double B = (double) row1x - row0x; |
| double C = (double) row0x * row1y - row1x * row0y; |
| double denominator = Math.sqrt(A * A + B * B); |
| |
| for (int i = 1; i < windowSize - 1; i++) { |
| Row row = rowWindow.getRow(i); |
| double value = Math.abs(A * row.getTime() + B * row.getLong(0) + C) / denominator; |
| addToMinHeap(pq, i, value); |
| } |
| |
| putPQValueLong(pq, rowWindow, collector); |
| } |
| |
| @Override |
| public void outlierSampleFloat(RowWindow rowWindow, PointCollector collector) |
| throws IOException { |
| int windowSize = rowWindow.windowSize(); |
| if (isWindowSizeTooSmallFloat(rowWindow, collector, windowSize)) { |
| return; |
| } |
| |
| long row0x = rowWindow.getRow(0).getTime(), |
| row1x = rowWindow.getRow(windowSize - 1).getTime(); |
| float row0y = rowWindow.getRow(0).getFloat(0), |
| row1y = rowWindow.getRow(windowSize - 1).getFloat(0); |
| |
| PriorityQueue<Pair<Integer, Double>> pq = |
| new PriorityQueue<>(number, Comparator.comparing(o -> o.right)); |
| |
| double A = (double) row0y - row1y; |
| double B = (double) row1x - row0x; |
| double C = (double) row0x * row1y - row1x * row0y; |
| double denominator = Math.sqrt(A * A + B * B); |
| |
| for (int i = 1; i < windowSize - 1; i++) { |
| Row row = rowWindow.getRow(i); |
| double value = Math.abs(A * row.getTime() + B * row.getFloat(0) + C) / denominator; |
| addToMinHeap(pq, i, value); |
| } |
| |
| putPQValueFloat(pq, rowWindow, collector); |
| } |
| |
| @Override |
| public void outlierSampleDouble(RowWindow rowWindow, PointCollector collector) |
| throws IOException { |
| int windowSize = rowWindow.windowSize(); |
| if (isWindowSizeTooSmallDouble(rowWindow, collector, windowSize)) { |
| return; |
| } |
| |
| long row0x = rowWindow.getRow(0).getTime(), |
| row1x = rowWindow.getRow(windowSize - 1).getTime(); |
| double row0y = rowWindow.getRow(0).getDouble(0), |
| row1y = rowWindow.getRow(windowSize - 1).getDouble(0); |
| |
| PriorityQueue<Pair<Integer, Double>> pq = |
| new PriorityQueue<>(number, Comparator.comparing(o -> o.right)); |
| |
| double A = row0y - row1y; |
| double B = (double) row1x - row0x; |
| double C = row0x * row1y - row1x * row0y; |
| double denominator = Math.sqrt(A * A + B * B); |
| |
| for (int i = 1; i < windowSize - 1; i++) { |
| Row row = rowWindow.getRow(i); |
| double value = Math.abs(A * row.getTime() + B * row.getDouble(0) + C) / denominator; |
| addToMinHeap(pq, i, value); |
| } |
| |
| putPQValueDouble(pq, rowWindow, collector); |
| } |
| } |
| |
| private class CosOutlierSampler implements OutlierSampler { |
| |
| @Override |
| public void outlierSampleInt(RowWindow rowWindow, PointCollector collector) throws IOException { |
| int windowSize = rowWindow.windowSize(); |
| if (isWindowSizeTooSmallInt(rowWindow, collector, windowSize)) { |
| return; |
| } |
| |
| // o -> -o.right, max heap |
| PriorityQueue<Pair<Integer, Double>> pq = |
| new PriorityQueue<>(number, Comparator.comparing(o -> -o.right)); |
| |
| long lastTime, currentTime, nextTime, x1, x2; |
| int lastValue, currentValue, nextValue, y1, y2; |
| double value; |
| |
| for (int i = 1; i < windowSize - 1; i++) { |
| lastTime = rowWindow.getRow(i - 1).getTime(); |
| currentTime = rowWindow.getRow(i).getTime(); |
| nextTime = rowWindow.getRow(i + 1).getTime(); |
| |
| lastValue = rowWindow.getRow(i - 1).getInt(0); |
| currentValue = rowWindow.getRow(i).getInt(0); |
| nextValue = rowWindow.getRow(i + 1).getInt(0); |
| |
| x1 = currentTime - lastTime; |
| x2 = nextTime - currentTime; |
| y1 = currentValue - lastValue; |
| y2 = nextValue - currentValue; |
| |
| value = |
| (x1 * x2 + y1 * y2) |
| / (Math.sqrt((double) x1 * x1 + y1 * y1) * Math.sqrt((double) x2 * x2 + y2 * y2)); |
| |
| addToMaxHeap(pq, i, value); |
| } |
| |
| putPQValueInt(pq, rowWindow, collector); |
| } |
| |
| @Override |
| public void outlierSampleLong(RowWindow rowWindow, PointCollector collector) |
| throws IOException { |
| int windowSize = rowWindow.windowSize(); |
| if (isWindowSizeTooSmallLong(rowWindow, collector, windowSize)) { |
| return; |
| } |
| |
| // o -> -o.right, max heap |
| PriorityQueue<Pair<Integer, Double>> pq = |
| new PriorityQueue<>(number, Comparator.comparing(o -> -o.right)); |
| |
| long lastTime, currentTime, nextTime, x1, x2; |
| long lastValue, currentValue, nextValue, y1, y2; |
| double value; |
| |
| for (int i = 1; i < windowSize - 1; i++) { |
| lastTime = rowWindow.getRow(i - 1).getTime(); |
| currentTime = rowWindow.getRow(i).getTime(); |
| nextTime = rowWindow.getRow(i + 1).getTime(); |
| |
| lastValue = rowWindow.getRow(i - 1).getLong(0); |
| currentValue = rowWindow.getRow(i).getLong(0); |
| nextValue = rowWindow.getRow(i + 1).getLong(0); |
| |
| x1 = currentTime - lastTime; |
| x2 = nextTime - currentTime; |
| y1 = currentValue - lastValue; |
| y2 = nextValue - currentValue; |
| |
| value = |
| (x1 * x2 + y1 * y2) |
| / (Math.sqrt((double) x1 * x1 + y1 * y1) * Math.sqrt((double) x2 * x2 + y2 * y2)); |
| |
| addToMaxHeap(pq, i, value); |
| } |
| |
| putPQValueLong(pq, rowWindow, collector); |
| } |
| |
| @Override |
| public void outlierSampleFloat(RowWindow rowWindow, PointCollector collector) |
| throws IOException { |
| int windowSize = rowWindow.windowSize(); |
| if (isWindowSizeTooSmallFloat(rowWindow, collector, windowSize)) { |
| return; |
| } |
| |
| // o -> -o.right, max heap |
| PriorityQueue<Pair<Integer, Double>> pq = |
| new PriorityQueue<>(number, Comparator.comparing(o -> -o.right)); |
| |
| long lastTime, currentTime, nextTime, x1, x2; |
| float lastValue, currentValue, nextValue, y1, y2; |
| double value; |
| |
| for (int i = 1; i < windowSize - 1; i++) { |
| lastTime = rowWindow.getRow(i - 1).getTime(); |
| currentTime = rowWindow.getRow(i).getTime(); |
| nextTime = rowWindow.getRow(i + 1).getTime(); |
| |
| lastValue = rowWindow.getRow(i - 1).getFloat(0); |
| currentValue = rowWindow.getRow(i).getFloat(0); |
| nextValue = rowWindow.getRow(i + 1).getFloat(0); |
| |
| x1 = currentTime - lastTime; |
| x2 = nextTime - currentTime; |
| y1 = currentValue - lastValue; |
| y2 = nextValue - currentValue; |
| |
| value = (x1 * x2 + y1 * y2) / (Math.sqrt(x1 * x1 + y1 * y1) * Math.sqrt(x2 * x2 + y2 * y2)); |
| |
| addToMaxHeap(pq, i, value); |
| } |
| |
| putPQValueFloat(pq, rowWindow, collector); |
| } |
| |
| @Override |
| public void outlierSampleDouble(RowWindow rowWindow, PointCollector collector) |
| throws IOException { |
| int windowSize = rowWindow.windowSize(); |
| if (isWindowSizeTooSmallDouble(rowWindow, collector, windowSize)) { |
| return; |
| } |
| |
| // o -> -o.right, max heap |
| PriorityQueue<Pair<Integer, Double>> pq = |
| new PriorityQueue<>(number, Comparator.comparing(o -> -o.right)); |
| |
| long lastTime, currentTime, nextTime, x1, x2; |
| double lastValue, currentValue, nextValue, y1, y2; |
| double value; |
| |
| for (int i = 1; i < windowSize - 1; i++) { |
| lastTime = rowWindow.getRow(i - 1).getTime(); |
| currentTime = rowWindow.getRow(i).getTime(); |
| nextTime = rowWindow.getRow(i + 1).getTime(); |
| |
| lastValue = rowWindow.getRow(i - 1).getDouble(0); |
| currentValue = rowWindow.getRow(i).getDouble(0); |
| nextValue = rowWindow.getRow(i + 1).getDouble(0); |
| |
| x1 = currentTime - lastTime; |
| x2 = nextTime - currentTime; |
| y1 = currentValue - lastValue; |
| y2 = nextValue - currentValue; |
| |
| value = (x1 * x2 + y1 * y2) / (Math.sqrt(x1 * x1 + y1 * y1) * Math.sqrt(x2 * x2 + y2 * y2)); |
| |
| addToMaxHeap(pq, i, value); |
| } |
| |
| putPQValueDouble(pq, rowWindow, collector); |
| } |
| } |
| |
| private class PrenextdisOutlierSampler implements OutlierSampler { |
| |
| @Override |
| public void outlierSampleInt(RowWindow rowWindow, PointCollector collector) throws IOException { |
| int windowSize = rowWindow.windowSize(); |
| if (isWindowSizeTooSmallInt(rowWindow, collector, windowSize)) { |
| return; |
| } |
| |
| PriorityQueue<Pair<Integer, Double>> pq = |
| new PriorityQueue<>(number, Comparator.comparing(o -> o.right)); |
| |
| long lastTime, currentTime, nextTime, x1, x2; |
| int lastValue, currentValue, nextValue, y1, y2; |
| double value; |
| |
| for (int i = 1; i < windowSize - 1; i++) { |
| lastTime = rowWindow.getRow(i - 1).getTime(); |
| currentTime = rowWindow.getRow(i).getTime(); |
| nextTime = rowWindow.getRow(i + 1).getTime(); |
| |
| lastValue = rowWindow.getRow(i - 1).getInt(0); |
| currentValue = rowWindow.getRow(i).getInt(0); |
| nextValue = rowWindow.getRow(i + 1).getInt(0); |
| |
| x1 = Math.abs(currentTime - lastTime); |
| x2 = Math.abs(nextTime - currentTime); |
| y1 = Math.abs(currentValue - lastValue); |
| y2 = Math.abs(nextValue - currentValue); |
| |
| value = (double) x1 + y1 + x2 + y2; |
| |
| addToMinHeap(pq, i, value); |
| } |
| |
| putPQValueInt(pq, rowWindow, collector); |
| } |
| |
| @Override |
| public void outlierSampleLong(RowWindow rowWindow, PointCollector collector) |
| throws IOException { |
| int windowSize = rowWindow.windowSize(); |
| if (isWindowSizeTooSmallLong(rowWindow, collector, windowSize)) { |
| return; |
| } |
| |
| PriorityQueue<Pair<Integer, Double>> pq = |
| new PriorityQueue<>(number, Comparator.comparing(o -> o.right)); |
| |
| long lastTime, currentTime, nextTime, x1, x2; |
| long lastValue, currentValue, nextValue, y1, y2; |
| double value; |
| |
| for (int i = 1; i < windowSize - 1; i++) { |
| lastTime = rowWindow.getRow(i - 1).getTime(); |
| currentTime = rowWindow.getRow(i).getTime(); |
| nextTime = rowWindow.getRow(i + 1).getTime(); |
| |
| lastValue = rowWindow.getRow(i - 1).getLong(0); |
| currentValue = rowWindow.getRow(i).getLong(0); |
| nextValue = rowWindow.getRow(i + 1).getLong(0); |
| |
| x1 = Math.abs(currentTime - lastTime); |
| x2 = Math.abs(nextTime - currentTime); |
| y1 = Math.abs(currentValue - lastValue); |
| y2 = Math.abs(nextValue - currentValue); |
| |
| value = (double) x1 + y1 + x2 + y2; |
| |
| addToMinHeap(pq, i, value); |
| } |
| |
| putPQValueLong(pq, rowWindow, collector); |
| } |
| |
| @Override |
| public void outlierSampleFloat(RowWindow rowWindow, PointCollector collector) |
| throws IOException { |
| int windowSize = rowWindow.windowSize(); |
| if (isWindowSizeTooSmallFloat(rowWindow, collector, windowSize)) { |
| return; |
| } |
| |
| PriorityQueue<Pair<Integer, Double>> pq = |
| new PriorityQueue<>(number, Comparator.comparing(o -> o.right)); |
| |
| long lastTime, currentTime, nextTime, x1, x2; |
| float lastValue, currentValue, nextValue, y1, y2; |
| double value; |
| |
| for (int i = 1; i < windowSize - 1; i++) { |
| lastTime = rowWindow.getRow(i - 1).getTime(); |
| currentTime = rowWindow.getRow(i).getTime(); |
| nextTime = rowWindow.getRow(i + 1).getTime(); |
| |
| lastValue = rowWindow.getRow(i - 1).getFloat(0); |
| currentValue = rowWindow.getRow(i).getFloat(0); |
| nextValue = rowWindow.getRow(i + 1).getFloat(0); |
| |
| x1 = Math.abs(currentTime - lastTime); |
| x2 = Math.abs(nextTime - currentTime); |
| y1 = Math.abs(currentValue - lastValue); |
| y2 = Math.abs(nextValue - currentValue); |
| |
| value = x1 + y1 + x2 + y2; |
| |
| addToMinHeap(pq, i, value); |
| } |
| |
| putPQValueFloat(pq, rowWindow, collector); |
| } |
| |
| @Override |
| public void outlierSampleDouble(RowWindow rowWindow, PointCollector collector) |
| throws IOException { |
| int windowSize = rowWindow.windowSize(); |
| if (isWindowSizeTooSmallDouble(rowWindow, collector, windowSize)) { |
| return; |
| } |
| |
| PriorityQueue<Pair<Integer, Double>> pq = |
| new PriorityQueue<>(number, Comparator.comparing(o -> o.right)); |
| |
| long lastTime, currentTime, nextTime, x1, x2; |
| double lastValue, currentValue, nextValue, y1, y2; |
| double value; |
| |
| for (int i = 1; i < windowSize - 1; i++) { |
| lastTime = rowWindow.getRow(i - 1).getTime(); |
| currentTime = rowWindow.getRow(i).getTime(); |
| nextTime = rowWindow.getRow(i + 1).getTime(); |
| |
| lastValue = rowWindow.getRow(i - 1).getDouble(0); |
| currentValue = rowWindow.getRow(i).getDouble(0); |
| nextValue = rowWindow.getRow(i + 1).getDouble(0); |
| |
| x1 = Math.abs(currentTime - lastTime); |
| x2 = Math.abs(nextTime - currentTime); |
| y1 = Math.abs(currentValue - lastValue); |
| y2 = Math.abs(nextValue - currentValue); |
| |
| value = x1 + y1 + x2 + y2; |
| |
| addToMinHeap(pq, i, value); |
| } |
| |
| putPQValueDouble(pq, rowWindow, collector); |
| } |
| } |
| |
| @Override |
| public void validate(UDFParameterValidator validator) throws UDFException, MetadataException { |
| super.validate(validator); |
| type = validator.getParameters().getStringOrDefault("type", "avg").toLowerCase(); |
| number = validator.getParameters().getIntOrDefault("number", 3); |
| validator |
| .validate( |
| type -> |
| "avg".equals(type) |
| || "stendis".equals(type) |
| || "cos".equals(type) |
| || "prenextdis".equals(type), |
| "Illegal outlier method. Outlier type should be avg, stendis, cos or prenextdis.", |
| type) |
| .validate( |
| number -> (int) number > 0, "Illegal number. Number should be greater than 0.", number); |
| } |
| |
| @Override |
| public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations) |
| throws Exception { |
| bucketSize *= number; |
| configurations |
| .setAccessStrategy(new SlidingSizeWindowAccessStrategy(bucketSize)) |
| .setOutputDataType(UDFDataTypeTransformer.transformToUDFDataType(dataType)); |
| switch (type) { |
| case "avg": |
| outlierSampler = new AvgOutlierSampler(); |
| break; |
| case "stendis": |
| outlierSampler = new StendisOutlierSampler(); |
| break; |
| case "cos": |
| outlierSampler = new CosOutlierSampler(); |
| break; |
| case "prenextdis": |
| outlierSampler = new PrenextdisOutlierSampler(); |
| break; |
| default: |
| throw new UDFParameterNotValidException( |
| "Illegal outlier method. Outlier type should be avg, stendis, cos or prenextdis."); |
| } |
| } |
| |
| @Override |
| public void transform(RowWindow rowWindow, PointCollector collector) |
| throws IOException, UDFParameterNotValidException { |
| switch (dataType) { |
| case INT32: |
| outlierSampler.outlierSampleInt(rowWindow, collector); |
| break; |
| case INT64: |
| outlierSampler.outlierSampleLong(rowWindow, collector); |
| break; |
| case FLOAT: |
| outlierSampler.outlierSampleFloat(rowWindow, collector); |
| break; |
| case DOUBLE: |
| outlierSampler.outlierSampleDouble(rowWindow, collector); |
| break; |
| default: |
| // This will not happen |
| throw new UDFInputSeriesDataTypeNotValidException( |
| 0, |
| UDFDataTypeTransformer.transformToUDFDataType(dataType), |
| Type.INT32, |
| Type.INT64, |
| Type.FLOAT, |
| Type.DOUBLE); |
| } |
| } |
| |
| public void addToMinHeap(PriorityQueue<Pair<Integer, Double>> pq, int i, double value) { |
| if (pq.size() < number) { |
| pq.add(new Pair<>(i, value)); |
| } else if (value > pq.peek().right) { |
| pq.poll(); |
| pq.add(new Pair<>(i, value)); |
| } |
| } |
| |
| public void addToMaxHeap(PriorityQueue<Pair<Integer, Double>> pq, int i, double value) { |
| if (pq.size() < number) { |
| pq.add(new Pair<>(i, value)); |
| } else if (value < pq.peek().right) { |
| pq.poll(); |
| pq.add(new Pair<>(i, value)); |
| } |
| } |
| |
| public void putPQValueInt( |
| PriorityQueue<Pair<Integer, Double>> pq, RowWindow rowWindow, PointCollector collector) |
| throws IOException { |
| int[] arr = new int[number]; |
| for (int i = 0; i < number; i++) { |
| arr[i] = pq.peek().left; |
| pq.poll(); |
| } |
| Arrays.sort(arr); |
| for (int i = 0; i < number; i++) { |
| collector.putInt(rowWindow.getRow(arr[i]).getTime(), rowWindow.getRow(arr[i]).getInt(0)); |
| } |
| } |
| |
| public void putPQValueLong( |
| PriorityQueue<Pair<Integer, Double>> pq, RowWindow rowWindow, PointCollector collector) |
| throws IOException { |
| int[] arr = new int[number]; |
| for (int i = 0; i < number; i++) { |
| arr[i] = pq.peek().left; |
| pq.poll(); |
| } |
| Arrays.sort(arr); |
| for (int i = 0; i < number; i++) { |
| collector.putLong(rowWindow.getRow(arr[i]).getTime(), rowWindow.getRow(arr[i]).getLong(0)); |
| } |
| } |
| |
| public void putPQValueFloat( |
| PriorityQueue<Pair<Integer, Double>> pq, RowWindow rowWindow, PointCollector collector) |
| throws IOException { |
| int[] arr = new int[number]; |
| for (int i = 0; i < number; i++) { |
| arr[i] = pq.peek().left; |
| pq.poll(); |
| } |
| Arrays.sort(arr); |
| for (int i = 0; i < number; i++) { |
| collector.putFloat(rowWindow.getRow(arr[i]).getTime(), rowWindow.getRow(arr[i]).getFloat(0)); |
| } |
| } |
| |
| public void putPQValueDouble( |
| PriorityQueue<Pair<Integer, Double>> pq, RowWindow rowWindow, PointCollector collector) |
| throws IOException { |
| int[] arr = new int[number]; |
| for (int i = 0; i < number; i++) { |
| arr[i] = pq.peek().left; |
| pq.poll(); |
| } |
| Arrays.sort(arr); |
| for (int i = 0; i < number; i++) { |
| collector.putDouble( |
| rowWindow.getRow(arr[i]).getTime(), rowWindow.getRow(arr[i]).getDouble(0)); |
| } |
| } |
| |
| public boolean isWindowSizeTooSmallInt( |
| RowWindow rowWindow, PointCollector collector, int windowSize) throws IOException { |
| if (windowSize <= number) { |
| for (int i = 0; i < windowSize; i++) { |
| Row row = rowWindow.getRow(i); |
| collector.putInt(row.getTime(), row.getInt(0)); |
| } |
| return true; |
| } else if (windowSize == number + 1) { |
| for (int i = 0; i < windowSize - 1; i++) { |
| Row row = rowWindow.getRow(i); |
| collector.putInt(row.getTime(), row.getInt(0)); |
| } |
| return true; |
| } else if (windowSize == number + 2) { |
| for (int i = 1; i < windowSize - 1; i++) { |
| Row row = rowWindow.getRow(i); |
| collector.putInt(row.getTime(), row.getInt(0)); |
| } |
| return true; |
| } |
| return false; |
| } |
| |
| public boolean isWindowSizeTooSmallLong( |
| RowWindow rowWindow, PointCollector collector, int windowSize) throws IOException { |
| if (windowSize <= number) { |
| for (int i = 0; i < windowSize; i++) { |
| Row row = rowWindow.getRow(i); |
| collector.putLong(row.getTime(), row.getLong(0)); |
| } |
| return true; |
| } else if (windowSize == number + 1) { |
| for (int i = 0; i < windowSize - 1; i++) { |
| Row row = rowWindow.getRow(i); |
| collector.putLong(row.getTime(), row.getLong(0)); |
| } |
| return true; |
| } else if (windowSize == number + 2) { |
| for (int i = 1; i < windowSize - 1; i++) { |
| Row row = rowWindow.getRow(i); |
| collector.putLong(row.getTime(), row.getLong(0)); |
| } |
| return true; |
| } |
| return false; |
| } |
| |
| public boolean isWindowSizeTooSmallFloat( |
| RowWindow rowWindow, PointCollector collector, int windowSize) throws IOException { |
| if (windowSize <= number) { |
| for (int i = 0; i < windowSize; i++) { |
| Row row = rowWindow.getRow(i); |
| collector.putFloat(row.getTime(), row.getFloat(0)); |
| } |
| return true; |
| } else if (windowSize == number + 1) { |
| for (int i = 0; i < windowSize - 1; i++) { |
| Row row = rowWindow.getRow(i); |
| collector.putFloat(row.getTime(), row.getFloat(0)); |
| } |
| return true; |
| } else if (windowSize == number + 2) { |
| for (int i = 1; i < windowSize - 1; i++) { |
| Row row = rowWindow.getRow(i); |
| collector.putFloat(row.getTime(), row.getFloat(0)); |
| } |
| return true; |
| } |
| return false; |
| } |
| |
| public boolean isWindowSizeTooSmallDouble( |
| RowWindow rowWindow, PointCollector collector, int windowSize) throws IOException { |
| if (windowSize <= number) { |
| for (int i = 0; i < windowSize; i++) { |
| Row row = rowWindow.getRow(i); |
| collector.putDouble(row.getTime(), row.getDouble(0)); |
| } |
| return true; |
| } else if (windowSize == number + 1) { |
| for (int i = 0; i < windowSize - 1; i++) { |
| Row row = rowWindow.getRow(i); |
| collector.putDouble(row.getTime(), row.getDouble(0)); |
| } |
| return true; |
| } else if (windowSize == number + 2) { |
| for (int i = 1; i < windowSize - 1; i++) { |
| Row row = rowWindow.getRow(i); |
| collector.putDouble(row.getTime(), row.getDouble(0)); |
| } |
| return true; |
| } |
| return false; |
| } |
| } |