blob: 5feca5ab5ab6136ba3b04cad5446e7b7f79361bb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.backend.hadoop.executionengine.spark.converter;
import java.io.Serializable;
import java.util.Comparator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
/**
 * Utility comparator that handles the secondary key for sorting in the Spark backend.
 *
 * <p>Records passed to {@link #compare(Object, Object)} are tuples laid out as
 * {@code (index, compoundKey, value)}, where {@code compoundKey} is itself a tuple
 * {@code (mainKey, secondaryKey)}. The sort order given to the constructor is applied to the
 * secondary key only.
 *
 * <p>The sort order is an instance field (not static). Each comparator therefore keeps its own
 * order, and the order is carried along when the comparator is Java-serialized. Static fields
 * are never serialized.
 */
class PigSecondaryKeyComparatorSpark implements Comparator, Serializable {
    private static final Log LOG = LogFactory.getLog(PigSecondaryKeyComparatorSpark.class);
    private static final long serialVersionUID = 1L;

    /**
     * Ascending flags for the secondary key, or {@code null} for plain ascending order.
     * A one-element array applies to the whole key. A longer array applies column by column
     * when the secondary key is a tuple.
     */
    private final boolean[] secondarySortOrder;

    /**
     * @param pSecondarySortOrder ascending flags for the secondary key, or {@code null}. The
     *     array is copied, so later changes by the caller do not affect this comparator.
     */
    public PigSecondaryKeyComparatorSpark(boolean[] pSecondarySortOrder) {
        this.secondarySortOrder = (pSecondarySortOrder == null) ? null : pSecondarySortOrder.clone();
    }

    /**
     * Compares two records by their secondary key only.
     *
     * <p>Per the original design note, IndexedKeyPartitioner puts tuples with the same main key
     * together, so this method only compares the secondary key.
     *
     * @throws RuntimeException if a record has fewer than 3 fields, if a compound key has fewer
     *     than 2 fields, or if a field cannot be read
     */
    @Override
    public int compare(Object o1, Object o2) {
        Tuple t1 = (Tuple) o1;
        Tuple t2 = (Tuple) o2;
        try {
            if ((t1.size() < 3) || (t2.size() < 3)) {
                throw new RuntimeException("tuple size must be at least 3, tuple[0] stands for index, tuple[1] "
                        + "stands for the compound key, tuple[2] stands for the value");
            }
            Tuple compoundKey1 = (Tuple) t1.get(1);
            Tuple compoundKey2 = (Tuple) t2.get(1);
            checkCompoundKeySize(compoundKey1, compoundKey2);
            Object secondaryKey1 = compoundKey1.get(1);
            Object secondaryKey2 = compoundKey2.get(1);
            int res = compareKeys(secondaryKey1, secondaryKey2, secondarySortOrder);
            if (LOG.isDebugEnabled()) {
                LOG.debug("t1:" + t1 + "t2:" + t2 + " res:" + res);
            }
            return res;
        } catch (ExecException e) {
            throw new RuntimeException("Fail to get the compoundKey", e);
        }
    }

    /**
     * Compares two compound keys.
     *
     * <p>The main key is compared in ascending order first. If the main keys are equal, the
     * secondary keys are compared using the configured secondary sort order.
     *
     * @throws RuntimeException if a compound key has fewer than 2 fields or a field cannot be read
     */
    public int compareCompoundKey(Tuple compoundKey1, Tuple compoundKey2) {
        try {
            checkCompoundKeySize(compoundKey1, compoundKey2);
            Object mainKey1 = compoundKey1.get(0);
            Object mainKey2 = compoundKey2.get(0);
            int res = compareKeys(mainKey1, mainKey2, null);
            if (res != 0) {
                return res;
            }
            Object secondaryKey1 = compoundKey1.get(1);
            Object secondaryKey2 = compoundKey2.get(1);
            res = compareKeys(secondaryKey1, secondaryKey2, secondarySortOrder);
            if (LOG.isDebugEnabled()) {
                LOG.debug("compoundKey1:" + compoundKey1 + "compoundKey2:" + compoundKey2 + " res:" + res);
            }
            return res;
        } catch (ExecException e) {
            throw new RuntimeException("Fail to get the compoundKey", e);
        }
    }

    /** Throws if either compound key lacks the (mainKey, secondaryKey) fields. */
    private static void checkCompoundKeySize(Tuple compoundKey1, Tuple compoundKey2) {
        if ((compoundKey1.size() < 2) || (compoundKey2.size() < 2)) {
            throw new RuntimeException("compoundKey size must be at least 2, compoundKey[0] stands for firstKey, "
                    + "compoundKey[1] stands for secondaryKey");
        }
    }

    /**
     * Compares two keys, applying the given sort order.
     *
     * <p>If both keys are tuples, they are compared column by column with
     * {@link DataType#compare(Object, Object)}. When {@code asc} has more than one element,
     * {@code asc[i]} controls column {@code i}; columns beyond the end of {@code asc} are treated
     * as ascending. Tuples of different sizes are ordered by size (smaller first), and that result
     * is returned without applying {@code asc}.
     *
     * <p>Otherwise the keys are compared directly with {@code DataType.compare}.
     *
     * <p>When {@code asc} has exactly one element and it is {@code false}, the final result is
     * negated.
     *
     * @param asc ascending flags, or {@code null} for ascending order
     */
    private static int compareKeys(Object o1, Object o2, boolean[] asc) {
        int rc = 0;
        if (o1 instanceof Tuple && o2 instanceof Tuple) {
            // Objects are Tuples, so we may need to apply the sort order inside them.
            Tuple t1 = (Tuple) o1;
            Tuple t2 = (Tuple) o2;
            int sz1 = t1.size();
            int sz2 = t2.size();
            if (sz1 != sz2) {
                return (sz1 < sz2) ? -1 : 1;
            }
            for (int i = 0; i < sz1; i++) {
                Object field1;
                Object field2;
                try {
                    field1 = t1.get(i);
                    field2 = t2.get(i);
                } catch (ExecException e) {
                    throw new RuntimeException("Unable to compare tuples", e);
                }
                rc = DataType.compare(field1, field2);
                // Per-column order is only used when more than one flag was supplied;
                // the i < asc.length guard avoids reading past a short order array.
                if (rc != 0 && asc != null && asc.length > 1 && i < asc.length && !asc[i]) {
                    rc *= -1;
                }
                if ((field1 == null || field2 == null) && LOG.isDebugEnabled()) {
                    LOG.debug("t1.get(i) is:" + field1 + " t2.get(i) is:" + field2);
                }
                if (rc != 0) {
                    break;
                }
            }
        } else {
            // Objects are NOT Tuples (or at least one is null), so delegate to DataType.compare().
            rc = DataType.compare(o1, o2);
        }
        // Apply the sort order for keys that are not tuples, or for whole tuples.
        if (asc != null && asc.length == 1 && !asc[0]) {
            rc *= -1;
        }
        return rc;
    }
}