// blob: 70049a83a29d2cb96821685620dc45a27a661f07 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.backend.hadoop.executionengine.spark.converter;
import java.io.Serializable;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.joda.time.DateTime;
/**
* IndexedKey records a key together with the index of the table it came from.
* It is used as the key for JOINs. It addresses the case where the key is
* null, or is a tuple with one or more null fields. In these cases
* we must respect the SQL standard, as documented in the equals() method.
*/
public class IndexedKey implements Serializable, Comparable {
    private static final Log LOG = LogFactory.getLog(IndexedKey.class);

    /** Index of the table this key belongs to. */
    private byte index;
    /** The key value; may be null. */
    private Object key;
    /** When true, compareTo() casts the key to a Tuple and uses the secondary-key comparator. */
    private boolean useSecondaryKey;
    /** Sort order passed to PigSecondaryKeyComparatorSpark; only read when useSecondaryKey is true. */
    private boolean[] secondarySortOrder;

    /**
     * Creates a key bound to the given index.
     *
     * @param index index of the table the key belongs to
     * @param key   the key value, may be null
     */
    public IndexedKey(byte index, Object key) {
        this.index = index;
        this.key = key;
    }

    /**
     * @return the index this key was created with
     */
    public byte getIndex() {
        return index;
    }

    /**
     * @return the key value, possibly null
     */
    public Object getKey() {
        return key;
    }

    @Override
    public String toString() {
        return "IndexedKey{" +
                "index=" + index +
                ", key=" + key +
                '}';
    }

    /**
     * If the key is null (or is a tuple containing null fields), equality depends on both key and index.
     * Otherwise equality is based on just the key (like we normally do).
     * There are two possible cases when two tuples are compared:
     * <ol>
     * <li>Compare tuples of the same table (same index)</li>
     * <li>Compare tuples of different tables (different index values)</li>
     * </ol>
     * In 1)
     * <pre>
     * key1    key2    equal?
     * null    null    Y
     * foo     null    N
     * null    foo     N
     * foo     foo     Y
     * (1,1)   (1,1)   Y
     * (1,)    (1,)    Y
     * (1,2)   (1,2)   Y
     * </pre>
     * In 2)
     * <pre>
     * key1    key2    equal?
     * null    null    N
     * foo     null    N
     * null    foo     N
     * foo     foo     Y
     * (1,1)   (1,1)   Y
     * (1,)    (1,)    N
     * (1,2)   (1,2)   Y
     * </pre>
     * In both cases DateTime keys are compared by instant (see {@link #keysEqual(Object, Object)}),
     * which keeps this method consistent with {@link #hashCode()}.
     *
     * @param o the object to compare with
     * @return true if {@code o} is an IndexedKey that matches this one under the rules above
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        IndexedKey that = (IndexedKey) o;
        if (index == that.index) {
            if (key == null || that.key == null) {
                // Same table: two null keys match, a null key never matches a non-null one.
                return key == that.key;
            }
            return keysEqual(key, that.key);
        }
        // Different tables: a null key never matches anything, not even another null.
        if (key == null || that.key == null) {
            return false;
        }
        // Different tables: equal tuples that contain a null field must not match either.
        return keysEqual(key, that.key) && !containNullfields(key);
    }

    /**
     * Compares two non-null key values.
     * For DateTime we use the less strict isEqual method so that
     * e.g. 2017.01.01T10:00:00.000Z and 2017.01.01T11:00:00.000+01:00 are equal.
     * A DateTime is never equal to a value of another type (instead of failing on the cast).
     *
     * @param left  first key, must not be null
     * @param right second key, must not be null
     * @return true if both keys are considered equal
     */
    private static boolean keysEqual(Object left, Object right) {
        if (left instanceof DateTime) {
            return right instanceof DateTime
                    && ((DateTime) left).isEqual((DateTime) right);
        }
        return left.equals(right);
    }

    /**
     * Checks whether the given key is a Tuple with at least one null field.
     *
     * @param key the key to inspect
     * @return true if {@code key} is a Tuple and one of its fields is null; false otherwise
     *         (including when {@code key} is not a Tuple)
     * @throws RuntimeException wrapping the ExecException if a field cannot be read
     */
    private boolean containNullfields(Object key) {
        if (!(key instanceof Tuple)) {
            return false;
        }
        Tuple tuple = (Tuple) key;
        for (int i = 0; i < tuple.size(); i++) {
            try {
                if (tuple.get(i) == null) {
                    return true;
                }
            } catch (ExecException e) {
                throw new RuntimeException("exception found in " +
                        "containNullfields", e);
            }
        }
        return false;
    }

    /**
     * Calculate hashCode by index and key:
     * if key is null, return the index value;
     * if key is not null, return a hash of the key only.
     */
    @Override
    public int hashCode() {
        if (key == null) {
            return (int) index;
        }
        if (key instanceof DateTime) {
            // In case of DateTime we use a custom hashCode() to avoid chronology taking part in the hash value
            long millis = ((DateTime) key).getMillis();
            return (int) (millis ^ (millis >>> 32));
        }
        return key.hashCode();
    }

    /**
     * Orders keys firstly by index, secondly by key (both first and secondary key).
     * When useSecondaryKey is set, both keys are cast to Tuple and compared with
     * PigSecondaryKeyComparatorSpark using the configured secondary sort order;
     * otherwise the keys are compared with DataType.compare.
     *
     * @param o the other key; it is cast to IndexedKey without a type check
     * @return -1 or 1 when the indexes differ, otherwise the result of the key comparison
     */
    @Override
    public int compareTo(Object o) {
        IndexedKey that = (IndexedKey) o;
        byte thatIndex = that.getIndex();
        if (index != thatIndex) {
            return index > thatIndex ? 1 : -1;
        }
        if (useSecondaryKey) {
            Tuple thisCompoundKey = (Tuple) key;
            Tuple thatCompoundKey = (Tuple) that.getKey();
            PigSecondaryKeyComparatorSpark comparator = new PigSecondaryKeyComparatorSpark(secondarySortOrder);
            return comparator.compareCompoundKey(thisCompoundKey, thatCompoundKey);
        }
        return DataType.compare(key, that.getKey());
    }

    /**
     * @param useSecondaryKey whether compareTo() should use the secondary-key comparator
     */
    public void setUseSecondaryKey(boolean useSecondaryKey) {
        this.useSecondaryKey = useSecondaryKey;
    }

    /**
     * @param secondarySortOrder sort order handed to the secondary-key comparator in compareTo()
     */
    public void setSecondarySortOrder(boolean[] secondarySortOrder) {
        this.secondarySortOrder = secondarySortOrder;
    }
}