blob: 3e6196eadcbeca2821f6bb2669c8a757665f1188 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexer;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import java.nio.ByteBuffer;
import java.util.Arrays;
/**
 * A composite key consisting of a "group" part and a "sort" part, together with helpers for
 * encoding it into a {@link BytesWritable} and for comparing encoded keys without decoding them.
 *
 * <p>Encoded layout produced by {@link #toBytes()}:
 * <pre>
 *   [4-byte groupKey length (ByteBuffer default order, i.e. big-endian)][groupKey bytes][sortKey bytes]
 * </pre>
 *
 * <p>The nested raw comparators additionally skip 4 bytes at the start of every serialized record
 * before the layout above begins; this matches the int length prefix that {@link BytesWritable}
 * writes ahead of its payload.
 */
public class SortableBytes
{
  /** Size in bytes of the int that precedes the group key in the encoded form. */
  private static final int GROUP_LENGTH_BYTES = 4;

  /** Bytes skipped by the raw comparators at the start of a serialized record (the writable's own length prefix). */
  private static final int WRITABLE_HEADER_BYTES = 4;

  /** Offset, relative to the start of a serialized record, at which the group key bytes begin. */
  private static final int SERIALIZED_GROUP_KEY_OFFSET = WRITABLE_HEADER_BYTES + GROUP_LENGTH_BYTES;

  private final byte[] groupKey;
  private final byte[] sortKey;

  /**
   * Creates a key from its two parts. The arrays are stored by reference; they are not copied.
   *
   * @param groupKey bytes identifying the group
   * @param sortKey  bytes that order keys within a group
   */
  public SortableBytes(
      byte[] groupKey,
      byte[] sortKey
  )
  {
    this.groupKey = groupKey;
    this.sortKey = sortKey;
  }

  /**
   * @return the group part of this key (the internal array, not a copy)
   */
  public byte[] getGroupKey()
  {
    return groupKey;
  }

  /**
   * @return the sort part of this key (the internal array, not a copy)
   */
  public byte[] getSortKey()
  {
    return sortKey;
  }

  /**
   * Encodes this key as {@code [int groupKey.length][groupKey][sortKey]}.
   *
   * @return a newly allocated array holding the encoded key
   */
  public byte[] toBytes()
  {
    ByteBuffer outBytes = ByteBuffer.wrap(new byte[GROUP_LENGTH_BYTES + groupKey.length + sortKey.length]);
    outBytes.putInt(groupKey.length);
    outBytes.put(groupKey);
    outBytes.put(sortKey);
    return outBytes.array();
  }

  /**
   * @return the result of {@link #toBytes()} wrapped in a {@link BytesWritable}
   */
  public BytesWritable toBytesWritable()
  {
    return new BytesWritable(toBytes());
  }

  @Override
  public String toString()
  {
    return "SortableBytes{" +
           "groupKey='" + StringUtils.fromUtf8(groupKey) + '\'' +
           ", sortKey='" + StringUtils.fromUtf8(sortKey) + '\'' +
           '}';
  }

  /**
   * Decodes a key from the valid region of a {@link BytesWritable}, i.e. the first
   * {@code bytes.getLength()} bytes of its backing array.
   *
   * @param bytes writable holding a key in the layout written by {@link #toBytes()}
   * @return the decoded key
   */
  public static SortableBytes fromBytesWritable(BytesWritable bytes)
  {
    return fromBytes(bytes.getBytes(), 0, bytes.getLength());
  }

  /**
   * Decodes a key from {@code length} bytes of {@code bytes} starting at {@code offset}.
   *
   * @param bytes  array containing the encoded key
   * @param offset index of the first byte of the encoded key
   * @param length number of bytes occupied by the encoded key
   * @return a key whose group and sort parts are fresh copies of the corresponding regions
   * @throws IllegalArgumentException if the encoded group key length is negative or extends past
   *                                  the end of the encoded region
   */
  public static SortableBytes fromBytes(byte[] bytes, int offset, int length)
  {
    final int groupKeySize = ByteBuffer.wrap(bytes, offset, length).getInt();

    // Reject a corrupt length prefix before allocating anything: Arrays.copyOfRange would otherwise
    // allocate (and zero-pad) an array of the bogus size and only fail afterwards with "from > to".
    if (groupKeySize < 0 || groupKeySize > length - GROUP_LENGTH_BYTES) {
      throw new IllegalArgumentException(
          "Invalid groupKey size[" + groupKeySize + "] for encoded key of length[" + length + "]"
      );
    }

    final int groupKeyOffset = offset + GROUP_LENGTH_BYTES;
    final int sortKeyOffset = groupKeyOffset + groupKeySize;
    return new SortableBytes(
        Arrays.copyOfRange(bytes, groupKeyOffset, sortKeyOffset),
        Arrays.copyOfRange(bytes, sortKeyOffset, offset + length)
    );
  }

  /**
   * Configures {@code job} to use {@link BytesWritable} map output keys that are grouped by
   * {@link SortableBytesGroupingComparator}, sorted by {@link SortableBytesSortingComparator} and
   * partitioned by {@code partitionerClass}.
   *
   * @param job              job to configure
   * @param partitionerClass partitioner to install on the job
   */
  public static void useSortableBytesAsMapOutputKey(Job job, Class<? extends Partitioner> partitionerClass)
  {
    job.setMapOutputKeyClass(BytesWritable.class);
    job.setGroupingComparatorClass(SortableBytesGroupingComparator.class);
    job.setSortComparatorClass(SortableBytesSortingComparator.class);
    job.setPartitionerClass(partitionerClass);
  }

  /**
   * Reads the encoded group key length from a serialized record, skipping the leading
   * {@link #WRITABLE_HEADER_BYTES} bytes of the record.
   */
  private static int readGroupKeyLength(byte[] buf, int start, int len)
  {
    return ByteBuffer.wrap(buf, start + WRITABLE_HEADER_BYTES, len - WRITABLE_HEADER_BYTES).getInt();
  }

  /**
   * Raw comparator that considers only the group key portion of two serialized keys, so keys that
   * share a group key compare as equal regardless of their sort keys.
   */
  public static class SortableBytesGroupingComparator extends WritableComparator
  {
    protected SortableBytesGroupingComparator()
    {
      super(BytesWritable.class);
    }

    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2)
    {
      final int groupLength1 = readGroupKeyLength(b1, s1, l1);
      final int groupLength2 = readGroupKeyLength(b2, s2, l2);
      return compareBytes(
          b1,
          s1 + SERIALIZED_GROUP_KEY_OFFSET,
          groupLength1,
          b2,
          s2 + SERIALIZED_GROUP_KEY_OFFSET,
          groupLength2
      );
    }
  }

  /**
   * Raw comparator that orders serialized keys by group key first and, when the group keys are
   * equal, by the remaining sort key bytes.
   */
  public static class SortableBytesSortingComparator extends WritableComparator
  {
    protected SortableBytesSortingComparator()
    {
      super(BytesWritable.class);
    }

    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2)
    {
      final int groupLength1 = readGroupKeyLength(b1, s1, l1);
      final int groupLength2 = readGroupKeyLength(b2, s2, l2);

      final int groupComparison = compareBytes(
          b1,
          s1 + SERIALIZED_GROUP_KEY_OFFSET,
          groupLength1,
          b2,
          s2 + SERIALIZED_GROUP_KEY_OFFSET,
          groupLength2
      );
      if (groupComparison != 0) {
        return groupComparison;
      }

      // Same group: everything after the group key bytes is the sort key.
      return compareBytes(
          b1,
          s1 + SERIALIZED_GROUP_KEY_OFFSET + groupLength1,
          l1 - SERIALIZED_GROUP_KEY_OFFSET - groupLength1,
          b2,
          s2 + SERIALIZED_GROUP_KEY_OFFSET + groupLength2,
          l2 - SERIALIZED_GROUP_KEY_OFFSET - groupLength2
      );
    }
  }
}