blob: 46f00446d419e1f48005417c74f7f33128076cbf [file] [log] [blame]
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
#ifndef SQLUDR_PREDEF_TIMESERIES_H
#define SQLUDR_PREDEF_TIMESERIES_H
#include "sqludr.h"
using namespace tmudr;
// derive a class from UDR
class TimeSeries : public UDR
{
public:
// determine output columns dynamically at compile time
void describeParamsAndColumns(UDRInvocationInfo &info);
// override the runtime method
void processData(UDRInvocationInfo &info,
UDRPlanInfo &plan);
};
// Factory method
extern "C" UDR * TRAF_CPP_TIMESERIES();
// An object that represents a nullable value of a type at a given
// point in time
template<class T>
class NullableTimedValue
{
public:
NullableTimedValue() { isNull_ = true; t_ = 0; }
NullableTimedValue(bool isNull, time_t t, T v) :
isNull_(isNull), t_(t), v_(v) {}
bool isNull() const { return isNull_; }
time_t getTime() const { return t_; }
T getVal() const { return v_; }
NullableTimedValue<double> interpolateLinear(
time_t t,
const NullableTimedValue &nextHigherVal);
private:
bool isNull_;
time_t t_;
T v_;
};
// One time slice aggregate function
class TimeSeriesAggregate
{
public:
TimeSeriesAggregate(const TupleInfo &inTup,
const TupleInfo &outTup,
int inputColNum,
int outputColNum,
bool isFirstVal,
bool isConstInterpol,
bool isIgnoreNulls);
~TimeSeriesAggregate();
int getOutputColNum() const { return outputColNum_; }
int getInputColNum() const { return inputColNum_; }
bool isFirstVal() const { return isFirstVal_; }
bool isConstInterpol() const { return isConstInterpol_; }
bool isIgnoreNulls() const { return isIgnoreNulls_; }
void initPartition();
int numEntries();
time_t getTime(int entry);
void readInputCol(time_t t, int sliceNum);
void finalizePartition();
void setOutputCol(time_t startTime, int sliceNum, time_t width);
private:
// Description of the aggregate function and its columns
const TupleInfo &inTup_;
const TupleInfo &outTup_;
int inputColNum_;
int outputColNum_;
bool isFirstVal_;
bool isConstInterpol_;
bool isIgnoreNulls_;
bool useLong_;
// vectors of first/last values per time slice, collected for one partition
std::vector<NullableTimedValue<long> > lValues_;
std::vector<NullableTimedValue<double> > dValues_;
// state of the vectors when reading and generating rows
int currIx_;
int currSliceNum_;
int entriesForThisTimeSlice_;
};
// Define data that gets passed between compiler phases and runtime
class InternalColumns : public UDRWriterCompileTimeData
{
public:
InternalColumns(const UDRInvocationInfo &info);
~InternalColumns();
int getNumCols() const { return getFirstAggrCol() + getNumAggrCols(); }
int getTimeSliceInColNum() const { return tsInColNum_; }
int getTimeSliceOutColNum() const { return numPartCols_; }
int getNumPartCols() const { return numPartCols_; }
int getNumAggrCols() const { return columns_.size(); }
int getFirstAggrCol() const { return numTSCols_ + numPartCols_; }
TimeSeriesAggregate &getAggrColumn(int i) { return *(columns_[i]); }
void initializePartition();
bool isSamePartition();
int getNumTimeSlices();
void readInputCols();
void finalizePartition();
void setOutputCols(int sliceNum);
private:
const UDRInvocationInfo &info_;
// number of timestamp and partitioning columns
int tsInColNum_;
int numTSCols_;
int numPartCols_;
// key of the current partition we are processing
std::vector<std::string> currPartKey_;
// null indicators for current partition key
std::vector<bool> currPartKeyNulls_;
// width of one time slice in time_t units (seconds)
time_t timeSliceWidth_;
// start time, rounded down to a multiple of
// the time slice width
time_t partitionStartTime_;
// current end time of the partition, set in
// processRowForPartition()
time_t partitionCurrTime_;
// Array of objects that handle the individual aggregates
std::vector<TimeSeriesAggregate *> columns_;
};
#endif