blob: c7707e94db23fa99d9b1b7dc406f208b438f152c [file] [log] [blame]
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexer;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.google.common.io.ByteArrayDataOutput;
import com.google.common.io.ByteStreams;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.serde.ComplexMetricSerde;
import io.druid.segment.serde.ComplexMetrics;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
/**
*/
public class InputRowSerde
{
private static final Logger log = new Logger(InputRowSerde.class);
private static final Text[] EMPTY_TEXT_ARRAY = new Text[0];
public static final byte[] toBytes(final InputRow row, AggregatorFactory[] aggs)
{
try {
ByteArrayDataOutput out = ByteStreams.newDataOutput();
//write timestamp
out.writeLong(row.getTimestampFromEpoch());
//writing all dimensions
List<String> dimList = row.getDimensions();
Text[] dims = EMPTY_TEXT_ARRAY;
if(dimList != null) {
dims = new Text[dimList.size()];
for (int i = 0; i < dims.length; i++) {
dims[i] = new Text(dimList.get(i));
}
}
StringArrayWritable sw = new StringArrayWritable(dims);
sw.write(out);
MapWritable mw = new MapWritable();
if(dimList != null) {
for (String dim : dimList) {
List<String> dimValue = row.getDimension(dim);
if (dimValue == null || dimValue.size() == 0) {
continue;
}
if (dimValue.size() == 1) {
mw.put(new Text(dim), new Text(dimValue.get(0)));
} else {
Text[] dimValueArr = new Text[dimValue.size()];
for (int i = 0; i < dimValueArr.length; i++) {
dimValueArr[i] = new Text(dimValue.get(i));
}
mw.put(new Text(dim), new StringArrayWritable(dimValueArr));
}
}
}
//writing all metrics
Supplier<InputRow> supplier = new Supplier<InputRow>()
{
@Override
public InputRow get()
{
return row;
}
};
for (AggregatorFactory aggFactory : aggs) {
String k = aggFactory.getName();
Aggregator agg = aggFactory.factorize(
IncrementalIndex.makeColumnSelectorFactory(
aggFactory,
supplier,
true
)
);
agg.aggregate();
String t = aggFactory.getTypeName();
if (t.equals("float")) {
mw.put(new Text(k), new FloatWritable(agg.getFloat()));
} else if (t.equals("long")) {
mw.put(new Text(k), new LongWritable(agg.getLong()));
} else {
//its a complex metric
Object val = agg.get();
ComplexMetricSerde serde = getComplexMetricSerde(t);
mw.put(new Text(k), new BytesWritable(serde.toBytes(val)));
}
}
mw.write(out);
return out.toByteArray();
} catch(IOException ex) {
throw Throwables.propagate(ex);
}
}
public static final InputRow fromBytes(byte[] data, AggregatorFactory[] aggs)
{
try {
DataInput in = ByteStreams.newDataInput(data);
//Read timestamp
long timestamp = in.readLong();
//Read dimensions
StringArrayWritable sw = new StringArrayWritable();
sw.readFields(in);
List<String> dimensions = Arrays.asList(sw.toStrings());
MapWritable mw = new MapWritable();
mw.readFields(in);
Map<String, Object> event = Maps.newHashMap();
for (String d : dimensions) {
Writable v = mw.get(new Text(d));
if (v == null) {
continue;
}
if (v instanceof Text) {
event.put(d, ((Text) v).toString());
} else if (v instanceof StringArrayWritable) {
event.put(d, Arrays.asList(((StringArrayWritable) v).toStrings()));
} else {
throw new ISE("unknown dim value type %s", v.getClass().getName());
}
}
//Read metrics
for (AggregatorFactory aggFactory : aggs) {
String k = aggFactory.getName();
Writable v = mw.get(new Text(k));
if (v == null) {
continue;
}
String t = aggFactory.getTypeName();
if (t.equals("float")) {
event.put(k, ((FloatWritable) v).get());
} else if (t.equals("long")) {
event.put(k, ((LongWritable) v).get());
} else {
//its a complex metric
ComplexMetricSerde serde = getComplexMetricSerde(t);
BytesWritable bw = (BytesWritable) v;
event.put(k, serde.fromBytes(bw.getBytes(), 0, bw.getLength()));
}
}
return new MapBasedInputRow(timestamp, dimensions, event);
} catch(IOException ex) {
throw Throwables.propagate(ex);
}
}
private static ComplexMetricSerde getComplexMetricSerde(String type)
{
ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(type);
if (serde == null) {
throw new IAE("Unknown type[%s]", type);
}
return serde;
}
}
class StringArrayWritable extends ArrayWritable
{
public StringArrayWritable()
{
super(Text.class);
}
public StringArrayWritable(Text[] strs)
{
super(Text.class, strs);
}
}