blob: a556c3fea18697d6a00a09c03619670bd360943c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.last;
import org.apache.druid.collections.SerializablePair;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.query.aggregation.VectorAggregator;
import org.apache.druid.segment.vector.VectorObjectSelector;
import org.apache.druid.segment.vector.VectorValueSelector;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
/**
 * Base type for the vectorized 'last' aggregators over primitive numeric column selectors.
 *
 * <p>Every aggregation slot in the buffer is laid out as follows, relative to the slot position:
 * <ul>
 *   <li>offset 0: the timestamp of the latest row aggregated so far (a long)</li>
 *   <li>offset {@link #NULL_OFFSET}: a one byte null marker</li>
 *   <li>offset {@link #VALUE_OFFSET}: the value itself, read and written by the subclass</li>
 * </ul>
 */
public abstract class NumericLastVectorAggregator implements VectorAggregator
{
  static final int NULL_OFFSET = Long.BYTES;
  static final int VALUE_OFFSET = NULL_OFFSET + Byte.BYTES;

  final VectorValueSelector valueSelector;
  final VectorObjectSelector objectSelector;

  private final boolean useDefault = NullHandling.replaceWithDefault();
  private final VectorValueSelector timeSelector;

  NumericLastVectorAggregator(
      VectorValueSelector timeSelector,
      VectorValueSelector valueSelector,
      VectorObjectSelector objectSelector
  )
  {
    this.timeSelector = timeSelector;
    this.valueSelector = valueSelector;
    this.objectSelector = objectSelector;
  }

  /**
   * Initializes a slot: the stored time becomes {@link Long#MIN_VALUE}, the null marker is "not null" when
   * nulls are replaced with defaults and "null" otherwise, and the value bytes are initialized by the subclass.
   *
   * @param buf      byte buffer storing the byte array representation of the aggregate
   * @param position offset within the byte buffer at which the aggregate is stored
   */
  @Override
  public void init(ByteBuffer buf, int position)
  {
    buf.putLong(position, Long.MIN_VALUE);
    buf.put(position + NULL_OFFSET, useDefault ? NullHandling.IS_NOT_NULL_BYTE : NullHandling.IS_NULL_BYTE);
    initValue(buf, position + VALUE_OFFSET);
  }

  /**
   * Aggregates the rows {@code [startRow, endRow)} of the current vector into a single slot.
   *
   * <p>Only one candidate row is examined: the last row of the range or, when nulls are not replaced with
   * defaults and a null vector is available, the last non-null row of the range. The slot is updated when the
   * candidate's time is greater than or equal to the time already stored in the slot.
   *
   * @param buf      byte buffer storing the byte array representation of the aggregate
   * @param position offset within the byte buffer at which the aggregate is stored
   * @param startRow first row of the range, inclusive
   * @param endRow   end of the range, exclusive
   */
  @Override
  public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
  {
    // Nothing to do without a time selector; an empty range has no "last" row to look at.
    if (timeSelector == null || startRow >= endRow) {
      return;
    }

    Object[] objectsWhichMightBeNumeric = null;
    boolean[] nullValueVector = null;
    if (objectSelector != null) {
      objectsWhichMightBeNumeric = objectSelector.getObjectVector();
    } else if (valueSelector != null && !useDefault) {
      // The null vector is only ever consulted when nulls are not replaced with defaults.
      nullValueVector = valueSelector.getNullVector();
    }

    final long lastTime = buf.getLong(position);

    // The time vector is already sorted, so the last element is the latest one.
    // Walk the value vector from the back to find the latest non-null row.
    final int index = findCandidateRow(nullValueVector, startRow, endRow);

    if (objectsWhichMightBeNumeric != null) {
      @SuppressWarnings("unchecked")
      final SerializablePair<Long, Number> inPair =
          (SerializablePair<Long, Number>) objectsWhichMightBeNumeric[index];
      if (inPair != null && inPair.lhs != null && inPair.lhs >= lastTime) {
        if (useDefault || inPair.rhs != null) {
          updateTimeWithValue(buf, position, inPair.lhs, index);
        } else {
          updateTimeWithNull(buf, position, inPair.lhs);
        }
      }
    } else {
      final long latestTime = timeSelector.getLongVector()[index];
      if (latestTime >= lastTime) {
        if (nullValueVector == null || !nullValueVector[index]) {
          updateTimeWithValue(buf, position, latestTime, index);
        } else {
          updateTimeWithNull(buf, position, latestTime);
        }
      }
    }
  }

  /**
   * Returns the last row in {@code [startRow, endRow)} that is not marked null, or {@code endRow - 1} when
   * there is no null vector or every row in the range is null.
   */
  private static int findCandidateRow(@Nullable boolean[] nullValueVector, int startRow, int endRow)
  {
    if (nullValueVector != null) {
      for (int i = endRow - 1; i >= startRow; i--) {
        if (!nullValueVector[i]) {
          return i;
        }
      }
    }
    return endRow - 1;
  }

  /**
   * Checks if the aggregated value at a position in the buffer is null or not.
   *
   * @param buf      byte buffer storing the byte array representation of the aggregate
   * @param position offset within the byte buffer at which the current aggregate value is stored
   * @return true if the null marker of the slot is set to null, false otherwise
   */
  boolean isValueNull(ByteBuffer buf, int position)
  {
    return buf.get(position + NULL_OFFSET) == NullHandling.IS_NULL_BYTE;
  }

  /**
   * Aggregates {@code numRows} rows, each into its own slot.
   *
   * @param buf            byte buffer storing the byte array representation of the aggregates
   * @param numRows        number of rows to aggregate
   * @param positions      per-row slot positions, before {@code positionOffset} is added
   * @param rows           per-row indexes into the current vector, or null to use rows {@code 0..numRows-1}
   * @param positionOffset offset added to every entry of {@code positions}
   */
  @Override
  public void aggregate(
      ByteBuffer buf,
      int numRows,
      int[] positions,
      @Nullable int[] rows,
      int positionOffset
  )
  {
    if (timeSelector == null) {
      return;
    }

    final long[] timeVector = timeSelector.getLongVector();
    Object[] objectsWhichMightBeNumeric = null;
    boolean[] nulls = null;
    if (objectSelector != null) {
      objectsWhichMightBeNumeric = objectSelector.getObjectVector();
    } else if (valueSelector != null) {
      nulls = useDefault ? null : valueSelector.getNullVector();
    }

    for (int i = 0; i < numRows; i++) {
      final int position = positions[i] + positionOffset;
      final int row = rows == null ? i : rows[i];
      final long lastTime = buf.getLong(position);

      if (objectsWhichMightBeNumeric != null) {
        @SuppressWarnings("unchecked")
        final SerializablePair<Long, Number> inPair =
            (SerializablePair<Long, Number>) objectsWhichMightBeNumeric[row];
        // A null pair carries neither a time nor a value, so it must be skipped regardless of the null
        // handling mode; dereferencing it would throw a NullPointerException.
        if (inPair != null && inPair.lhs != null && inPair.lhs >= lastTime) {
          // NOTE(review): the range-based aggregate() writes a value here when useDefault is true even if
          // rhs is null, whereas this path writes a null marker. Confirm which behavior is intended.
          if (inPair.rhs != null) {
            updateTimeWithValue(buf, position, inPair.lhs, row);
          } else {
            updateTimeWithNull(buf, position, inPair.lhs);
          }
        }
      } else {
        if (timeVector[row] >= lastTime) {
          if (useDefault || nulls == null || !nulls[row]) {
            updateTimeWithValue(buf, position, timeVector[row], row);
          } else {
            updateTimeWithNull(buf, position, timeVector[row]);
          }
        }
      }
    }
  }

  /**
   * Stores the given time in the slot, marks the slot as not null and lets the subclass write the value.
   *
   * @param buf      byte buffer storing the byte array representation of the aggregate
   * @param position offset within the byte buffer at which the current aggregate value is stored
   * @param time     the time to be updated in the buffer as the last time
   * @param index    the index within the current vector of the row holding the last value
   */
  void updateTimeWithValue(ByteBuffer buf, int position, long time, int index)
  {
    buf.putLong(position, time);
    buf.put(position + NULL_OFFSET, NullHandling.IS_NOT_NULL_BYTE);
    putValue(buf, position + VALUE_OFFSET, index);
  }

  /**
   * Stores the given time in the slot and marks the slot as null. The value bytes are left untouched.
   *
   * @param buf      byte buffer storing the byte array representation of the aggregate
   * @param position offset within the byte buffer at which the current aggregate value is stored
   * @param time     the time to be updated in the buffer as the last time
   */
  void updateTimeWithNull(ByteBuffer buf, int position, long time)
  {
    buf.putLong(position, time);
    buf.put(position + NULL_OFFSET, NullHandling.IS_NULL_BYTE);
  }

  /**
   * Abstract function which needs to be overridden by subclasses to set the initial value.
   *
   * @param buf      byte buffer storing the byte array representation of the aggregate
   * @param position offset within the byte buffer at which the value (not the slot) starts
   */
  abstract void initValue(ByteBuffer buf, int position);

  /**
   * Abstract function which needs to be overridden by subclasses to set the latest value in the buffer
   * depending on the datatype.
   *
   * @param buf      byte buffer storing the byte array representation of the aggregate
   * @param position offset within the byte buffer at which the value (not the slot) starts
   * @param index    the index within the current vector of the row whose value should be stored
   */
  abstract void putValue(ByteBuffer buf, int position, int index);

  @Override
  public void close()
  {
    // no resources to cleanup
  }
}