blob: 8139b5e6ee9f6214dfb1f67055b4aa7aa8358eca [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.pipe.processor.aggregate.window.datastructure;
import org.apache.iotdb.db.pipe.processor.aggregate.operator.aggregatedresult.AggregatedResultOperator;
import org.apache.iotdb.db.pipe.processor.aggregate.operator.intermediateresult.CustomizedReadableIntermediateResults;
import org.apache.iotdb.db.pipe.processor.aggregate.operator.intermediateresult.IntermediateResultOperator;
import org.apache.iotdb.db.pipe.processor.aggregate.window.processor.AbstractWindowingProcessor;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;
import java.util.stream.Collectors;
/**
 * A window over a single time series that accumulates intermediate results and turns them into
 * aggregated outputs.
 *
 * <p>Each intermediate result name maps to its {@link IntermediateResultOperator}, paired with the
 * {@link TSDataType} of the values it was initialized with ({@link TSDataType#UNKNOWN} until the
 * first calculated value). The window state (calculate / emit / purge) for each incoming point is
 * decided by the owning {@link AbstractWindowingProcessor}.
 *
 * <p>All values calculated in one window must share one data type; a mismatch purges the window.
 */
public class TimeSeriesWindow {

  private static final Logger LOGGER = LoggerFactory.getLogger(TimeSeriesWindow.class);

  // A window is typically with a timestamp. We define it here to avoid
  // boxing/unboxing and simplify the logics.
  private long timestamp = 0;

  // Aggregated outputs still computable in this window. Outputs are removed when an intermediate
  // result they declare turns out to be unsupported for the window's data type.
  private Map<String, AggregatedResultOperator> aggregatedOutputName2OperatorMap;

  // Intermediate result name -> (data type of the values fed so far, operator). The type stays
  // UNKNOWN until the operator has been initialized by the first calculated value.
  private final Map<String, Pair<TSDataType, IntermediateResultOperator>>
      intermediateResultName2tsTypeAndOperatorMap = new HashMap<>();

  // WARNING: Using the customized runtime value may cause performance loss
  // due to boxing/unboxing issues.
  private Object customizedRuntimeValue;

  private final AbstractWindowingProcessor processor;

  public TimeSeriesWindow(
      final AbstractWindowingProcessor processor, final Object customizedRuntimeValue) {
    this.processor = processor;
    this.customizedRuntimeValue = customizedRuntimeValue;
  }

  /////////////////////////////// Getter/Setters for WindowProcessor ///////////////////////////////

  public long getTimestamp() {
    return timestamp;
  }

  public void setTimestamp(final long timestamp) {
    this.timestamp = timestamp;
  }

  public Object getCustomizedRuntimeValue() {
    return customizedRuntimeValue;
  }

  public void setCustomizedRuntimeValue(final Object customizedRuntimeValue) {
    this.customizedRuntimeValue = customizedRuntimeValue;
  }

  /////////////////////////////// Calculation ///////////////////////////////

  /**
   * Creates fresh intermediate result operators for this window and binds the aggregated outputs.
   *
   * @param intermediateResult2OperatorSupplierMap intermediate result name to a supplier of a new
   *     operator; every operator starts with data type {@link TSDataType#UNKNOWN}
   * @param aggregatedResultOperatorMap aggregated output name to its operator; the map is copied
   *     so removals in this window do not affect the caller
   * @param systemParameters passed to {@code configureSystemParameters} of every intermediate
   *     result operator held by this window
   */
  public void initWindow(
      final Map<String, Supplier<IntermediateResultOperator>>
          intermediateResult2OperatorSupplierMap,
      final Map<String, AggregatedResultOperator> aggregatedResultOperatorMap,
      final Map<String, String> systemParameters) {
    for (final Map.Entry<String, Supplier<IntermediateResultOperator>> entry :
        intermediateResult2OperatorSupplierMap.entrySet()) {
      intermediateResultName2tsTypeAndOperatorMap.put(
          entry.getKey(), new Pair<>(TSDataType.UNKNOWN, entry.getValue().get()));
    }

    // Shallow copy (the operators are shared): unsupported aggregated results may be removed
    // from this window's map without touching the caller's map.
    this.aggregatedOutputName2OperatorMap = new HashMap<>(aggregatedResultOperatorMap);

    // Configure system parameters
    this.intermediateResultName2tsTypeAndOperatorMap.values().stream()
        .map(Pair::getRight)
        .forEach(operator -> operator.configureSystemParameters(systemParameters));
  }

  // The update overloads below return the state/output pair when the window emits, a PURGE pair
  // when the window must be purged, and null otherwise (the normal case, to avoid allocation).
  // The per-type logic is repeated because java does not support primitive type templates; only
  // the value-independent parts are shared through private helpers, so values are never boxed.

  /** Feeds a BOOLEAN point into the window. See the contract described above the overloads. */
  public Pair<WindowState, WindowOutput> updateIntermediateResult(
      final long timestamp, final boolean value) {
    final Pair<WindowState, WindowOutput> stateOutputPair =
        processor.updateAndMaySetWindowState(this, timestamp, value);
    final WindowState state = stateOutputPair.getLeft();
    if (state.isEmitWithoutCompute()) {
      stateOutputPair.getRight().setAggregatedResults(getAggregatedResults(TSDataType.BOOLEAN));
    }
    if (state.isCalculate()) {
      final Iterator<Pair<TSDataType, IntermediateResultOperator>> iterator =
          intermediateResultName2tsTypeAndOperatorMap.values().iterator();
      while (iterator.hasNext()) {
        final Pair<TSDataType, IntermediateResultOperator> typeAndOperator = iterator.next();
        final IntermediateResultOperator operator = typeAndOperator.getRight();
        if (typeAndOperator.getLeft() == TSDataType.UNKNOWN) {
          if (operator.initAndGetIsSupport(value, timestamp)) {
            typeAndOperator.setLeft(TSDataType.BOOLEAN);
          } else if (dropUnsupportedAndShouldPurge(iterator, operator)) {
            return purge();
          }
        } else if (typeAndOperator.getLeft() != TSDataType.BOOLEAN) {
          return purgeOnDataTypeMismatch(typeAndOperator.getLeft(), TSDataType.BOOLEAN);
        } else {
          operator.updateValue(value, timestamp);
        }
      }
    }
    return finishUpdate(stateOutputPair, TSDataType.BOOLEAN);
  }

  /** Feeds an INT32 point into the window. See the contract described above the overloads. */
  public Pair<WindowState, WindowOutput> updateIntermediateResult(
      final long timestamp, final int value) {
    final Pair<WindowState, WindowOutput> stateOutputPair =
        processor.updateAndMaySetWindowState(this, timestamp, value);
    final WindowState state = stateOutputPair.getLeft();
    if (state.isEmitWithoutCompute()) {
      stateOutputPair.getRight().setAggregatedResults(getAggregatedResults(TSDataType.INT32));
    }
    if (state.isCalculate()) {
      final Iterator<Pair<TSDataType, IntermediateResultOperator>> iterator =
          intermediateResultName2tsTypeAndOperatorMap.values().iterator();
      while (iterator.hasNext()) {
        final Pair<TSDataType, IntermediateResultOperator> typeAndOperator = iterator.next();
        final IntermediateResultOperator operator = typeAndOperator.getRight();
        if (typeAndOperator.getLeft() == TSDataType.UNKNOWN) {
          if (operator.initAndGetIsSupport(value, timestamp)) {
            typeAndOperator.setLeft(TSDataType.INT32);
          } else if (dropUnsupportedAndShouldPurge(iterator, operator)) {
            return purge();
          }
        } else if (typeAndOperator.getLeft() != TSDataType.INT32) {
          return purgeOnDataTypeMismatch(typeAndOperator.getLeft(), TSDataType.INT32);
        } else {
          operator.updateValue(value, timestamp);
        }
      }
    }
    return finishUpdate(stateOutputPair, TSDataType.INT32);
  }

  /** Feeds an INT64 point into the window. See the contract described above the overloads. */
  public Pair<WindowState, WindowOutput> updateIntermediateResult(
      final long timestamp, final long value) {
    final Pair<WindowState, WindowOutput> stateOutputPair =
        processor.updateAndMaySetWindowState(this, timestamp, value);
    final WindowState state = stateOutputPair.getLeft();
    if (state.isEmitWithoutCompute()) {
      stateOutputPair.getRight().setAggregatedResults(getAggregatedResults(TSDataType.INT64));
    }
    if (state.isCalculate()) {
      final Iterator<Pair<TSDataType, IntermediateResultOperator>> iterator =
          intermediateResultName2tsTypeAndOperatorMap.values().iterator();
      while (iterator.hasNext()) {
        final Pair<TSDataType, IntermediateResultOperator> typeAndOperator = iterator.next();
        final IntermediateResultOperator operator = typeAndOperator.getRight();
        if (typeAndOperator.getLeft() == TSDataType.UNKNOWN) {
          if (operator.initAndGetIsSupport(value, timestamp)) {
            typeAndOperator.setLeft(TSDataType.INT64);
          } else if (dropUnsupportedAndShouldPurge(iterator, operator)) {
            return purge();
          }
        } else if (typeAndOperator.getLeft() != TSDataType.INT64) {
          return purgeOnDataTypeMismatch(typeAndOperator.getLeft(), TSDataType.INT64);
        } else {
          operator.updateValue(value, timestamp);
        }
      }
    }
    return finishUpdate(stateOutputPair, TSDataType.INT64);
  }

  /** Feeds a FLOAT point into the window. See the contract described above the overloads. */
  public Pair<WindowState, WindowOutput> updateIntermediateResult(
      final long timestamp, final float value) {
    final Pair<WindowState, WindowOutput> stateOutputPair =
        processor.updateAndMaySetWindowState(this, timestamp, value);
    final WindowState state = stateOutputPair.getLeft();
    if (state.isEmitWithoutCompute()) {
      stateOutputPair.getRight().setAggregatedResults(getAggregatedResults(TSDataType.FLOAT));
    }
    if (state.isCalculate()) {
      final Iterator<Pair<TSDataType, IntermediateResultOperator>> iterator =
          intermediateResultName2tsTypeAndOperatorMap.values().iterator();
      while (iterator.hasNext()) {
        final Pair<TSDataType, IntermediateResultOperator> typeAndOperator = iterator.next();
        final IntermediateResultOperator operator = typeAndOperator.getRight();
        if (typeAndOperator.getLeft() == TSDataType.UNKNOWN) {
          if (operator.initAndGetIsSupport(value, timestamp)) {
            typeAndOperator.setLeft(TSDataType.FLOAT);
          } else if (dropUnsupportedAndShouldPurge(iterator, operator)) {
            return purge();
          }
        } else if (typeAndOperator.getLeft() != TSDataType.FLOAT) {
          return purgeOnDataTypeMismatch(typeAndOperator.getLeft(), TSDataType.FLOAT);
        } else {
          operator.updateValue(value, timestamp);
        }
      }
    }
    return finishUpdate(stateOutputPair, TSDataType.FLOAT);
  }

  /** Feeds a DOUBLE point into the window. See the contract described above the overloads. */
  public Pair<WindowState, WindowOutput> updateIntermediateResult(
      final long timestamp, final double value) {
    final Pair<WindowState, WindowOutput> stateOutputPair =
        processor.updateAndMaySetWindowState(this, timestamp, value);
    final WindowState state = stateOutputPair.getLeft();
    if (state.isEmitWithoutCompute()) {
      stateOutputPair.getRight().setAggregatedResults(getAggregatedResults(TSDataType.DOUBLE));
    }
    if (state.isCalculate()) {
      final Iterator<Pair<TSDataType, IntermediateResultOperator>> iterator =
          intermediateResultName2tsTypeAndOperatorMap.values().iterator();
      while (iterator.hasNext()) {
        final Pair<TSDataType, IntermediateResultOperator> typeAndOperator = iterator.next();
        final IntermediateResultOperator operator = typeAndOperator.getRight();
        if (typeAndOperator.getLeft() == TSDataType.UNKNOWN) {
          if (operator.initAndGetIsSupport(value, timestamp)) {
            typeAndOperator.setLeft(TSDataType.DOUBLE);
          } else if (dropUnsupportedAndShouldPurge(iterator, operator)) {
            return purge();
          }
        } else if (typeAndOperator.getLeft() != TSDataType.DOUBLE) {
          return purgeOnDataTypeMismatch(typeAndOperator.getLeft(), TSDataType.DOUBLE);
        } else {
          operator.updateValue(value, timestamp);
        }
      }
    }
    return finishUpdate(stateOutputPair, TSDataType.DOUBLE);
  }

  /** Feeds a TEXT point into the window. See the contract described above the overloads. */
  public Pair<WindowState, WindowOutput> updateIntermediateResult(
      final long timestamp, final String value) {
    final Pair<WindowState, WindowOutput> stateOutputPair =
        processor.updateAndMaySetWindowState(this, timestamp, value);
    final WindowState state = stateOutputPair.getLeft();
    if (state.isEmitWithoutCompute()) {
      stateOutputPair.getRight().setAggregatedResults(getAggregatedResults(TSDataType.TEXT));
    }
    if (state.isCalculate()) {
      final Iterator<Pair<TSDataType, IntermediateResultOperator>> iterator =
          intermediateResultName2tsTypeAndOperatorMap.values().iterator();
      while (iterator.hasNext()) {
        final Pair<TSDataType, IntermediateResultOperator> typeAndOperator = iterator.next();
        final IntermediateResultOperator operator = typeAndOperator.getRight();
        if (typeAndOperator.getLeft() == TSDataType.UNKNOWN) {
          if (operator.initAndGetIsSupport(value, timestamp)) {
            typeAndOperator.setLeft(TSDataType.TEXT);
          } else if (dropUnsupportedAndShouldPurge(iterator, operator)) {
            return purge();
          }
        } else if (typeAndOperator.getLeft() != TSDataType.TEXT) {
          return purgeOnDataTypeMismatch(typeAndOperator.getLeft(), TSDataType.TEXT);
        } else {
          operator.updateValue(value, timestamp);
        }
      }
    }
    return finishUpdate(stateOutputPair, TSDataType.TEXT);
  }

  /**
   * Removes every aggregated output that declares the given (unsupported) intermediate result.
   * If some aggregated output remains, the intermediate result itself is also removed through the
   * iterator.
   *
   * @param iterator the iterator currently positioned on the unsupported intermediate result
   * @param operator the operator that rejected the window's data type
   * @return {@code true} if no aggregated output is left and the window must be purged; in that
   *     case the intermediate result is left in place
   */
  private boolean dropUnsupportedAndShouldPurge(
      final Iterator<Pair<TSDataType, IntermediateResultOperator>> iterator,
      final IntermediateResultOperator operator) {
    final String intermediateResultName = operator.getName();
    // Remove unsupported aggregated results
    aggregatedOutputName2OperatorMap
        .values()
        .removeIf(
            aggregator ->
                aggregator.getDeclaredIntermediateValueNames().contains(intermediateResultName));
    // If no aggregated values can be calculated, purge the window
    if (aggregatedOutputName2OperatorMap.isEmpty()) {
      return true;
    }
    // Remove unsupported intermediate values
    iterator.remove();
    return false;
  }

  /** Logs a data type change inside one window and returns the PURGE result. */
  private static Pair<WindowState, WindowOutput> purgeOnDataTypeMismatch(
      final TSDataType previousType, final TSDataType currentType) {
    LOGGER.warn(
        "Different data type encountered in one window, will purge. Previous type: {}, now type: {}",
        previousType,
        currentType);
    return purge();
  }

  /** Builds a fresh PURGE result carrying no output. */
  private static Pair<WindowState, WindowOutput> purge() {
    return new Pair<>(WindowState.PURGE, null);
  }

  /**
   * Attaches computed aggregated results if the state asks for an emit-with-compute.
   *
   * @return the pair if the state emits, {@code null} otherwise
   */
  private Pair<WindowState, WindowOutput> finishUpdate(
      final Pair<WindowState, WindowOutput> stateOutputPair, final TSDataType dataType) {
    final WindowState state = stateOutputPair.getLeft();
    if (state.isEmitWithCompute()) {
      stateOutputPair.getRight().setAggregatedResults(getAggregatedResults(dataType));
    }
    return state.isEmit() ? stateOutputPair : null;
  }

  /**
   * Asks the processor to output this window regardless of its state.
   *
   * <p>The aggregated results are computed with the data type of the first intermediate result in
   * the map's iteration order, or {@link TSDataType#UNKNOWN} (empty results) if there is none.
   */
  public WindowOutput forceOutput() {
    final TSDataType dataType =
        intermediateResultName2tsTypeAndOperatorMap.values().stream()
            .findFirst()
            .map(Pair::getLeft)
            .orElse(TSDataType.UNKNOWN);
    return processor.forceOutput(this).setAggregatedResults(getAggregatedResults(dataType));
  }

  /**
   * Terminates every remaining aggregated output with the current intermediate results.
   *
   * @param dataType the data type all remaining intermediate results are expected to have
   * @return aggregated output name to its result, or an empty map if {@code dataType} is UNKNOWN
   *     or any remaining intermediate result has a different (including still UNKNOWN) data type
   */
  private Map<String, Pair<TSDataType, Object>> getAggregatedResults(final TSDataType dataType) {
    // The remaining intermediate results' datatype shall all be equal to this
    // If not, return nothing
    if (dataType == TSDataType.UNKNOWN
        || intermediateResultName2tsTypeAndOperatorMap.values().stream()
            .anyMatch(typeAndOperator -> typeAndOperator.getLeft() != dataType)) {
      return Collections.emptyMap();
    }

    final CustomizedReadableIntermediateResults readableIntermediateResults =
        new CustomizedReadableIntermediateResults(
            intermediateResultName2tsTypeAndOperatorMap.entrySet().stream()
                .collect(
                    Collectors.toMap(
                        Map.Entry::getKey, entry -> entry.getValue().getRight().getResult())));

    return aggregatedOutputName2OperatorMap.entrySet().stream()
        .collect(
            Collectors.toMap(
                Map.Entry::getKey,
                entry -> entry.getValue().terminateWindow(dataType, readableIntermediateResults)));
  }

  /////////////////////////////// Ser/De logics ///////////////////////////////

  /**
   * Writes this window in the following order:
   *
   * <ol>
   *   <li>the timestamp (long);
   *   <li>the number of intermediate results (int);
   *   <li>for each intermediate result, its name, its data type and its operator state;
   *   <li>the processor's customized attributes.
   * </ol>
   */
  public void serialize(final DataOutputStream outputStream) throws IOException {
    ReadWriteIOUtils.write(timestamp, outputStream);
    ReadWriteIOUtils.write(intermediateResultName2tsTypeAndOperatorMap.size(), outputStream);
    for (final Map.Entry<String, Pair<TSDataType, IntermediateResultOperator>> entry :
        intermediateResultName2tsTypeAndOperatorMap.entrySet()) {
      ReadWriteIOUtils.write(entry.getKey(), outputStream);
      entry.getValue().getLeft().serializeTo(outputStream);
      entry.getValue().getRight().serialize(outputStream);
    }
    processor.serializeCustomizedAttributes(this, outputStream);
  }

  /**
   * Restores the state written by {@link #serialize} into this window's existing operators.
   *
   * <p>Unlike normal deserialization, the pipe may be altered before deserialization, so the set
   * of operators may have changed. The deserialized values are therefore merged into the existing
   * entries. Intermediate results registered now but absent from the buffer keep their current
   * state.
   *
   * <p>WARNING: We do not support removing intermediate values (e.g. fewer intermediate values
   * after the aggregators decreased) or altering the windowing processor when altering the
   * aggregate processor; only adding intermediate values is permitted.
   *
   * <p>NOTE(review): intermediate results dropped as unsupported before serialization are not
   * written. If this window was freshly set up by {@code initWindow}, they come back as UNKNOWN,
   * so {@link #getAggregatedResults} yields an empty map until the next calculated value removes
   * them again. Confirm this is acceptable.
   *
   * @throws IOException if the buffer names an intermediate result that is no longer registered.
   *     Its operator payload has an unknown length and cannot be skipped, so continuing would
   *     mis-parse every following field.
   */
  public void deserialize(final ByteBuffer byteBuffer) throws IOException {
    timestamp = ReadWriteIOUtils.readLong(byteBuffer);
    final int size = ReadWriteIOUtils.readInt(byteBuffer);
    for (int i = 0; i < size; i++) {
      final String intermediateResultName = ReadWriteIOUtils.readString(byteBuffer);
      final Pair<TSDataType, IntermediateResultOperator> typeAndOperator =
          intermediateResultName2tsTypeAndOperatorMap.get(intermediateResultName);
      if (Objects.isNull(typeAndOperator)) {
        throw new IOException(
            String.format(
                "Failed to deserialize window: intermediate result %s is no longer registered. "
                    + "Removing intermediate values when altering the aggregate processor "
                    + "is not supported.",
                intermediateResultName));
      }
      typeAndOperator.setLeft(TSDataType.deserializeFrom(byteBuffer));
      typeAndOperator.getRight().deserialize(byteBuffer);
    }
    processor.deserializeCustomizedAttributes(this, byteBuffer);
  }
}