blob: 5c28c05b3ec03159ec61c93c718068c6b5563c21 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.queryengine.execution.operator.window;
import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.read.common.block.TsBlock;
import org.apache.tsfile.read.common.block.column.TimeColumn;
/**
 * Session window over the time column: a point stays in the current window as long as the gap
 * between its timestamp and the preceding timestamp does not exceed {@code timeInterval}.
 *
 * <p>The window is stateful. {@link #mergeOnePoint(Column[], int)} and {@link #contains(Column)}
 * update the tracked time range; {@link #setInitializedTimeValue(boolean)} with {@code false}
 * makes the next merged point (or contained block) start a fresh range.
 *
 * <p>All gap comparisons are overflow-safe: two timestamps whose distance does not fit in a
 * {@code long} are always treated as being further apart than any interval.
 */
public class SessionWindow implements IWindow {

  /** Largest gap allowed between two adjacent timestamps of the same session. */
  private final long timeInterval;

  /** Selects whether {@link #timeValue} tracks the maximum (true) or minimum (false) time. */
  private final boolean ascending;

  /** In ascending mode the largest merged timestamp, otherwise the smallest one. */
  private long timeValue;

  /** Smallest timestamp merged into the current window. */
  private long startTime;

  /** Largest timestamp merged into the current window. */
  private long endTime;

  /** Timestamp the first row of a block is compared against in {@code satisfy}/{@code contains}. */
  private long lastTsBlockTime;

  /** False until the first point (or block) of the current window has been merged. */
  private boolean initializedTimeValue;

  /**
   * @param timeInterval maximum gap between adjacent timestamps inside one session
   * @param ascending true to track the latest timestamp as the window's time value, false to
   *     track the earliest
   */
  public SessionWindow(long timeInterval, boolean ascending) {
    this.timeInterval = timeInterval;
    this.ascending = ascending;
  }

  /** Returns the time column of {@code tsBlock}; session windows are controlled by time only. */
  @Override
  public Column getControlColumn(TsBlock tsBlock) {
    return tsBlock.getTimeColumn();
  }

  /**
   * Tells whether the point at {@code index} belongs to the current window.
   *
   * <p>An uninitialised window accepts any point. Otherwise the point is compared with the
   * previous row of the same column, or with {@code lastTsBlockTime} when it is row 0.
   *
   * @param column time column being scanned
   * @param index row to test
   * @return true if the gap to the preceding timestamp is at most {@code timeInterval}
   */
  @Override
  public boolean satisfy(Column column, int index) {
    if (!initializedTimeValue) {
      return true;
    }
    long currentTime = column.getLong(index);
    long previousTime = index == 0 ? lastTsBlockTime : column.getLong(index - 1);
    long gap = distance(currentTime, previousTime);
    // a negative gap means the real distance overflowed a long, i.e. it is out of range
    return gap >= 0 && gap <= timeInterval;
  }

  /**
   * Merges the point at {@code index} into the window state.
   *
   * <p>The first point after (re-)initialisation seeds {@code startTime}, {@code endTime} and
   * {@code timeValue} with its own timestamp and {@code lastTsBlockTime} with row 0 of the
   * column. Later points widen the range, advance {@code timeValue} and copy it into
   * {@code lastTsBlockTime}.
   *
   * @param controlTimeAndValueColumn columns of the current block; element 0 is read as the time
   *     column
   * @param index row to merge
   */
  @Override
  public void mergeOnePoint(Column[] controlTimeAndValueColumn, int index) {
    Column timeColumn = controlTimeAndValueColumn[0];
    long currentTime = timeColumn.getLong(index);

    if (!initializedTimeValue) {
      // first point of a new window: every tracked bound starts at this timestamp
      startTime = currentTime;
      endTime = currentTime;
      lastTsBlockTime = timeColumn.getLong(0);
      timeValue = currentTime;
      initializedTimeValue = true;
      return;
    }

    startTime = Math.min(startTime, currentTime);
    endTime = Math.max(endTime, currentTime);
    // keep the most recent time of the session according to the scan direction
    timeValue = ascending ? Math.max(timeValue, currentTime) : Math.min(timeValue, currentTime);
    setLastTsBlockTime(timeValue);
  }

  /**
   * Tells whether the whole block fits into the current window and, if so, merges its time range.
   *
   * <p>The block fits when its first timestamp is strictly closer than {@code timeInterval} to
   * {@code lastTsBlockTime} and the block's total time span is at most {@code timeInterval}.
   *
   * @param column time column of the block; must be a {@link TimeColumn}
   * @return true if the block was accepted and merged
   */
  @Override
  public boolean contains(Column column) {
    TimeColumn timeColumn = (TimeColumn) column;
    long minTime = Math.min(timeColumn.getStartTime(), timeColumn.getEndTime());
    long maxTime = Math.max(timeColumn.getStartTime(), timeColumn.getEndTime());

    long leadingGap = distance(column.getLong(0), lastTsBlockTime);
    long blockSpan = distance(maxTime, minTime);
    // negative distances signal overflow and therefore can never be within the interval
    boolean contains =
        leadingGap >= 0 && leadingGap < timeInterval && blockSpan >= 0 && blockSpan <= timeInterval;

    if (contains) {
      if (!initializedTimeValue) {
        // neutral bounds so that the min/max updates below adopt the block's range
        startTime = Long.MAX_VALUE;
        endTime = Long.MIN_VALUE;
        lastTsBlockTime = column.getLong(0);
        timeValue = ascending ? maxTime : minTime;
        initializedTimeValue = true;
      }
      timeValue = ascending ? Math.max(timeValue, maxTime) : Math.min(timeValue, minTime);
      startTime = Math.min(startTime, minTime);
      endTime = Math.max(endTime, maxTime);
    }
    return contains;
  }

  /**
   * Absolute distance between two timestamps.
   *
   * @return the distance, or a negative number when the true distance exceeds
   *     {@link Long#MAX_VALUE} (larger minus smaller can only turn negative by wrapping around)
   */
  private static long distance(long first, long second) {
    return Math.max(first, second) - Math.min(first, second);
  }

  /** Returns the maximum gap allowed between adjacent timestamps of one session. */
  public long getTimeInterval() {
    return timeInterval;
  }

  /** Returns the tracked time value of the window (latest or earliest, per scan direction). */
  public long getTimeValue() {
    return timeValue;
  }

  /** Overrides the tracked time value of the window. */
  public void setTimeValue(long timeValue) {
    this.timeValue = timeValue;
  }

  /** Returns the smallest timestamp merged into the current window. */
  public long getStartTime() {
    return startTime;
  }

  /** Overrides the smallest timestamp of the current window. */
  public void setStartTime(long startTime) {
    this.startTime = startTime;
  }

  /** Returns the largest timestamp merged into the current window. */
  public long getEndTime() {
    return endTime;
  }

  /** Overrides the largest timestamp of the current window. */
  public void setEndTime(long endTime) {
    this.endTime = endTime;
  }

  /**
   * Sets the initialisation flag; passing {@code false} makes the next merged point or contained
   * block start a new time range.
   */
  public void setInitializedTimeValue(boolean initializedTimeValue) {
    this.initializedTimeValue = initializedTimeValue;
  }

  /** Sets the timestamp that row 0 of the next block is compared against. */
  public void setLastTsBlockTime(long lastTsBlockTime) {
    this.lastTsBlockTime = lastTsBlockTime;
  }
}