blob: 90edf4a058bf12b3d1385c30889491cbfe385c21 [file] [log] [blame]
/*
* Copyright 2009-2010 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.hyracks.dataflow.std.aggregators;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
/**
 * Factory for aggregators that concatenate the string value of one input field
 * across all tuples aggregated into the same group.
 * <p>
 * The running concatenation of each group is kept in a byte-array table owned by
 * the aggregator; the aggregation-state field written by {@code init} holds only
 * the integer index of the group's slot in that table.
 */
public class ConcatAggregatorDescriptorFactory implements IAggregatorDescriptorFactory {
    private static final long serialVersionUID = 1L;

    /** Initial number of slots in the accumulator table; the table doubles when full. */
    private static final int INIT_ACCUMULATORS_SIZE = 8;

    /** Width in bytes of the integer reference (or -1 marker) that prefixes the state field. */
    private static final int REF_SIZE = 4;

    /** Index of the input field whose values are concatenated. */
    private final int concatField;

    /** Index of the field holding the aggregation state; negative means "use keyFields.length". */
    private final int outField;

    /**
     * Initialize the aggregator factory with the field to be concatenated. The
     * field holding the aggregation result defaults to the one right after the
     * key fields (index {@code keyFields.length}).
     *
     * @param concatField
     *            index of the input field to be concatenated
     */
    public ConcatAggregatorDescriptorFactory(int concatField) {
        this(concatField, -1);
    }

    /**
     * Initialize the aggregator factory with the field index to be concatenated,
     * and also the field where the aggregation result will be outputted.
     *
     * @param concatField
     *            index of the input field to be concatenated
     * @param outField
     *            index of the field holding the aggregation result; a negative
     *            value selects the default, {@code keyFields.length}
     */
    public ConcatAggregatorDescriptorFactory(int concatField, int outField) {
        this.concatField = concatField;
        this.outField = outField;
    }

    /**
     * Create a concatenation aggregator. A byte buffer will be allocated inside of the
     * aggregator to contain the partial aggregation results. A reference will be written
     * onto the output frame for indexing the aggregation result from the buffer.
     *
     * @param ctx
     *            stagelet context (not used by this aggregator)
     * @param inRecordDescriptor
     *            descriptor of the input records (not used by this aggregator)
     * @param outRecordDescriptor
     *            descriptor of the output records (not used by this aggregator)
     * @param keyFields
     *            grouping key fields; only the length is used, to derive the
     *            default result field index
     * @return a new aggregator with its own, initially empty, accumulator table
     */
    @Override
    public IAggregatorDescriptor createAggregator(IHyracksStageletContext ctx, RecordDescriptor inRecordDescriptor,
            RecordDescriptor outRecordDescriptor, final int[] keyFields) throws HyracksDataException {
        // Resolve the default per call instead of writing it back into the factory, so a
        // later call with a different number of key fields is not stuck with a stale index.
        final int resultField = outField < 0 ? keyFields.length : outField;

        return new IAggregatorDescriptor() {
            /** Accumulator table: one serialized string per group, indexed by the group's reference. */
            private byte[][] buf = new byte[INIT_ACCUMULATORS_SIZE][];

            /** Index of the most recently allocated slot in {@code buf}; -1 when none is in use. */
            private int currentAggregatorIndex = -1;

            /**
             * Start a new group: copy the raw bytes of the concat field into a fresh
             * accumulator slot and append the slot index to the tuple builder as an
             * integer field.
             */
            @Override
            public void init(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
                    throws HyracksDataException {
                int fieldLength = accessor.getFieldLength(tIndex, concatField);
                int fieldOffset = absoluteFieldOffset(accessor, tIndex, concatField);

                // Allocate the next slot, doubling the table when it is full.
                currentAggregatorIndex++;
                if (currentAggregatorIndex >= buf.length) {
                    byte[][] grown = new byte[buf.length * 2][];
                    System.arraycopy(buf, 0, grown, 0, buf.length);
                    buf = grown;
                }

                // The initial value is the field's bytes, copied verbatim from the frame.
                byte[] initialValue = new byte[fieldLength];
                System.arraycopy(accessor.getBuffer().array(), fieldOffset, initialValue, 0, fieldLength);
                buf[currentAggregatorIndex] = initialValue;

                try {
                    tupleBuilder.addField(IntegerSerializerDeserializer.INSTANCE, currentAggregatorIndex);
                } catch (IOException e) {
                    throw new HyracksDataException(e);
                }
            }

            /**
             * Restart slot allocation from the beginning. Existing slot contents are
             * left in place and are overwritten as slots get re-allocated.
             */
            @Override
            public void reset() {
                currentAggregatorIndex = -1;
            }

            /** Restart slot allocation and drop the references to all accumulated values. */
            @Override
            public void close() {
                currentAggregatorIndex = -1;
                for (int i = 0; i < buf.length; i++) {
                    buf[i] = null;
                }
            }

            /**
             * Append the concat field of tuple {@code tIndex} to the accumulator that
             * the state at {@code data[offset..]} refers to.
             *
             * @return {@code REF_SIZE} (4), the width of the integer reference read from
             *         {@code data}
             */
            @Override
            public int aggregate(IFrameTupleAccessor accessor, int tIndex, byte[] data, int offset, int length)
                    throws HyracksDataException {
                int refIndex = IntegerSerializerDeserializer.getInt(data, offset);
                // FIXME Should be done in binary way
                StringBuilder sbder = new StringBuilder();
                // Current accumulated value of the group
                sbder.append(UTF8StringSerializerDeserializer.INSTANCE.deserialize(new DataInputStream(
                        new ByteArrayInputStream(buf[refIndex]))));
                // Get the new data
                int fieldLength = accessor.getFieldLength(tIndex, concatField);
                int fieldOffset = absoluteFieldOffset(accessor, tIndex, concatField);
                sbder.append(UTF8StringSerializerDeserializer.INSTANCE.deserialize(new DataInputStream(
                        new ByteArrayInputStream(accessor.getBuffer().array(), fieldOffset, fieldLength))));
                // Re-serialize the concatenation and replace the accumulator slot
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                UTF8StringSerializerDeserializer.INSTANCE.serialize(sbder.toString(), new DataOutputStream(baos));
                buf[refIndex] = baos.toByteArray();
                return REF_SIZE;
            }

            /**
             * Write the final value for tuple {@code tIndex} as one field of the tuple
             * builder. A non-negative leading integer in the result field is a slot index
             * into the accumulator table; a negative one marks a field that carries the
             * value inline after the integer (the layout produced by
             * {@code outputPartialResult}).
             */
            @Override
            public void outputResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
                    throws HyracksDataException {
                byte[] frame = accessor.getBuffer().array();
                int fieldOffset = absoluteFieldOffset(accessor, tIndex, resultField);
                int refIndex = IntegerSerializerDeserializer.getInt(frame, fieldOffset);
                try {
                    if (refIndex >= 0) {
                        tupleBuilder.getDataOutput().write(buf[refIndex]);
                    } else {
                        // Inline value: skip the leading marker and copy the rest of the field.
                        int fieldLength = accessor.getFieldLength(tIndex, resultField);
                        tupleBuilder.getDataOutput().write(frame, fieldOffset + REF_SIZE, fieldLength - REF_SIZE);
                    }
                    tupleBuilder.addFieldEndOffset();
                } catch (IOException e) {
                    throw new HyracksDataException(e);
                }
            }

            /**
             * Write a partial result for tuple {@code tIndex} as one field of the tuple
             * builder: the integer -1 followed by the value bytes, so the value travels
             * inline instead of as an index into this aggregator's table.
             */
            @Override
            public void outputPartialResult(IFrameTupleAccessor accessor, int tIndex, ArrayTupleBuilder tupleBuilder)
                    throws HyracksDataException {
                byte[] frame = accessor.getBuffer().array();
                int fieldOffset = absoluteFieldOffset(accessor, tIndex, resultField);
                int fieldLength = accessor.getFieldLength(tIndex, resultField);
                int refIndex = IntegerSerializerDeserializer.getInt(frame, fieldOffset);
                try {
                    tupleBuilder.getDataOutput().writeInt(-1);
                    if (refIndex < 0) {
                        // Already inline: forward the bytes that follow the marker.
                        tupleBuilder.getDataOutput().write(frame, fieldOffset + REF_SIZE, fieldLength - REF_SIZE);
                    } else {
                        tupleBuilder.getDataOutput().write(buf[refIndex], 0, buf[refIndex].length);
                    }
                    tupleBuilder.addFieldEndOffset();
                } catch (IOException e) {
                    throw new HyracksDataException(e);
                }
            }

            /** Offset, within the accessor's backing array, of the first byte of the given field. */
            private int absoluteFieldOffset(IFrameTupleAccessor accessor, int tIndex, int fieldIndex) {
                return accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
                        + accessor.getFieldStartOffset(tIndex, fieldIndex);
            }
        };
    }
}