/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.ql.exec;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.Explain;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.OpTraits;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.Statistics;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Base operator implementation.
**/
public abstract class Operator<T extends OperatorDesc> implements Serializable, Cloneable,
    Node {
// Bean methods
private static final long serialVersionUID = 1L;
public static final String HIVECOUNTERCREATEDFILES = "CREATED_FILES";
public static final String HIVECOUNTERFATAL = "FATAL_ERROR";
public static final String CONTEXT_NAME_KEY = "__hive.context.name";
private transient Configuration configuration;
protected transient CompilationOpContext cContext;
protected List<Operator<? extends OperatorDesc>> childOperators;
protected List<Operator<? extends OperatorDesc>> parentOperators;
protected String operatorId;
protected final AtomicBoolean abortOp;
private transient ExecMapperContext execContext;
private transient boolean rootInitializeCalled = false;
protected final transient Collection<Future<?>> asyncInitOperations = new HashSet<>();
  // It can be optimized later so that an operator operation (init/close) is performed
  // only after that operation has been performed on all the parents. This will require
  // initializing the whole tree in all the mappers (which might be required for mappers
  // spanning multiple files anyway, in the future).
/**
* State.
*
*/
  public static enum State {
    UNINIT, // initialize() has not been called
    INIT, // initialize() has been called and close() has not been called,
    // or close() has been called but one of its parents is not closed.
    CLOSE
    // All of its parent operators are in state CLOSE and it has called close()
    // on its children. Note: close() having been called and the state being
    // CLOSE are different, since close() may have been called while the state
    // is not CLOSE if one of its parents is not in state CLOSE.
  }
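  // Illustrative lifecycle (a sketch; the execution framework drives these
  // transitions by walking the operator DAG):
  //   UNINIT --initialize(hconf, inputOIs)--> INIT
  //   INIT --close(abort), once all initialized parents are closed--> CLOSE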
protected transient State state = State.UNINIT;
private boolean useBucketizedHiveInputFormat;
// dummy operator (for not increasing seqId)
protected Operator(String name, CompilationOpContext cContext) {
this();
this.cContext = cContext;
this.id = name;
initOperatorId();
}
protected Operator() {
childOperators = new ArrayList<Operator<? extends OperatorDesc>>();
parentOperators = new ArrayList<Operator<? extends OperatorDesc>>();
abortOp = new AtomicBoolean(false);
}
public Operator(CompilationOpContext cContext) {
this(String.valueOf(cContext.nextOperatorId()), cContext);
}
public void setChildOperators(
List<Operator<? extends OperatorDesc>> childOperators) {
if (childOperators == null) {
childOperators = new ArrayList<Operator<? extends OperatorDesc>>();
}
this.childOperators = childOperators;
}
public Configuration getConfiguration() {
return configuration;
}
public List<Operator<? extends OperatorDesc>> getChildOperators() {
return childOperators;
}
public int getNumChild() {
return childOperators == null ? 0 : childOperators.size();
}
/**
* Implements the getChildren function for the Node Interface.
*/
@Override
public ArrayList<Node> getChildren() {
if (getChildOperators() == null) {
return null;
}
ArrayList<Node> ret_vec = new ArrayList<Node>();
for (Operator<? extends OperatorDesc> op : getChildOperators()) {
ret_vec.add(op);
}
return ret_vec;
}
public void setParentOperators(
List<Operator<? extends OperatorDesc>> parentOperators) {
if (parentOperators == null) {
parentOperators = new ArrayList<Operator<? extends OperatorDesc>>();
}
this.parentOperators = parentOperators;
}
public List<Operator<? extends OperatorDesc>> getParentOperators() {
return parentOperators;
}
public int getNumParent() {
return parentOperators == null ? 0 : parentOperators.size();
}
protected T conf;
protected boolean done;
public void setConf(T conf) {
this.conf = conf;
}
@Explain
public T getConf() {
return conf;
}
public boolean getDone() {
return done;
}
protected final void setDone(boolean done) {
this.done = done;
}
// non-bean fields needed during compilation
private RowSchema rowSchema;
public void setSchema(RowSchema rowSchema) {
this.rowSchema = rowSchema;
}
public RowSchema getSchema() {
return rowSchema;
}
  // non-bean fields
protected transient Map<String, LongWritable> statsMap = new HashMap<String, LongWritable>();
@SuppressWarnings("rawtypes")
protected transient OutputCollector out;
protected transient final Logger LOG = LoggerFactory.getLogger(getClass().getName());
protected transient final Logger PLOG = LoggerFactory.getLogger(Operator.class.getName()); // for simple disabling logs from all operators
protected transient final boolean isLogInfoEnabled = LOG.isInfoEnabled() && PLOG.isInfoEnabled();
protected transient final boolean isLogDebugEnabled = LOG.isDebugEnabled() && PLOG.isDebugEnabled();
protected transient final boolean isLogTraceEnabled = LOG.isTraceEnabled() && PLOG.isTraceEnabled();
protected transient String alias;
protected transient Reporter reporter;
protected String id;
// object inspectors for input rows
// We will increase the size of the array on demand
protected transient ObjectInspector[] inputObjInspectors = new ObjectInspector[1];
// for output rows of this operator
protected transient ObjectInspector outputObjInspector;
  /**
   * A map from output column name to input expression. It is used by the
   * optimizer, is built during semantic analysis, and contains only key
   * elements for the ReduceSink and GroupBy operators.
   */
protected Map<String, ExprNodeDesc> colExprMap;
public void setId(String id) {
this.id = id;
}
/**
   * This function is not named getId(), to make sure java serialization does
   * NOT serialize it. Some TestParse tests will fail if we serialize this
   * field, since the Operator ID would change depending on how many queries
   * have been executed.
*/
public String getIdentifier() {
return id;
}
public void setReporter(Reporter rep) {
reporter = rep;
for (Operator<? extends OperatorDesc> op : childOperators) {
op.setReporter(rep);
}
}
@SuppressWarnings("rawtypes")
public void setOutputCollector(OutputCollector out) {
this.out = out;
for (Operator<? extends OperatorDesc> op : childOperators) {
op.setOutputCollector(out);
}
}
/**
* Store the alias this operator is working on behalf of.
*/
public void setAlias(String alias) {
this.alias = alias;
for (Operator<? extends OperatorDesc> op : childOperators) {
op.setAlias(alias);
}
}
  public Map<String, Long> getStats() {
    HashMap<String, Long> ret = new HashMap<String, Long>();
    for (Map.Entry<String, LongWritable> e : statsMap.entrySet()) {
      ret.put(e.getKey(), Long.valueOf(e.getValue().get()));
    }
    return ret;
  }
/**
* checks whether all parent operators are initialized or not.
*
* @return true if there are no parents or all parents are initialized. false
* otherwise
*/
protected boolean areAllParentsInitialized() {
for (Operator<? extends OperatorDesc> parent : parentOperators) {
      if (parent == null) {
        continue;
      }
if (parent.state != State.INIT) {
return false;
}
}
return true;
}
/**
* Initializes operators only if all parents have been initialized. Calls
* operator specific initializer which then initializes child ops.
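   *
   * <p>Illustrative call (a sketch; in practice the execution framework, e.g.
   * MapOperator, performs this wiring with the real input object inspectors):
   * <pre>{@code
   *   ObjectInspector[] ois = new ObjectInspector[] { rowObjectInspector };
   *   rootOp.initialize(hconf, ois); // recursively initializes the children too
   * }</pre>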
*
* @param hconf
   * @param inputOIs
   *          input object inspector array indexed by tag id. A null value is
   *          ignored.
* @throws HiveException
*/
@SuppressWarnings("unchecked")
public final void initialize(Configuration hconf, ObjectInspector[] inputOIs)
throws HiveException {
this.done = false;
if (state == State.INIT) {
return;
}
this.configuration = hconf;
if (!areAllParentsInitialized()) {
return;
}
if (isLogInfoEnabled) {
LOG.info("Initializing operator " + this);
}
if (inputOIs != null) {
inputObjInspectors = inputOIs;
}
    // Initialize structure to maintain child op info. The operator tree changes
    // while initializing, so this needs to be done here instead of in the constructor.
childOperatorsArray = new Operator[childOperators.size()];
for (int i = 0; i < childOperatorsArray.length; i++) {
childOperatorsArray[i] = childOperators.get(i);
}
childOperatorsTag = new int[childOperatorsArray.length];
for (int i = 0; i < childOperatorsArray.length; i++) {
List<Operator<? extends OperatorDesc>> parentOperators =
childOperatorsArray[i].getParentOperators();
childOperatorsTag[i] = parentOperators.indexOf(this);
if (childOperatorsTag[i] == -1) {
throw new HiveException("Hive internal error: cannot find parent in the child operator!");
}
}
if (inputObjInspectors.length == 0) {
throw new HiveException("Internal Error during operator initialization.");
}
// derived classes can set this to different object if needed
outputObjInspector = inputObjInspectors[0];
boolean isInitOk = false;
try {
initializeOp(hconf);
// sanity checks
if (!rootInitializeCalled
|| childOperatorsArray.length != childOperators.size()) {
throw new AssertionError("Internal error during operator initialization");
}
if (isLogDebugEnabled) {
LOG.debug("Initialization Done " + id + " " + getName());
}
initializeChildren(hconf);
isInitOk = true;
} finally {
// TODO: ugly hack because Java doesn't have dtors and Tez input hangs on shutdown.
if (!isInitOk) {
cancelAsyncInitOps();
}
}
if (isLogDebugEnabled) {
LOG.debug("Initialization Done " + id + " " + getName() + " done is reset.");
}
// let's wait on the async ops before continuing
completeInitialization(asyncInitOperations);
}
private void cancelAsyncInitOps() {
for (Future<?> f : asyncInitOperations) {
f.cancel(true);
}
asyncInitOperations.clear();
}
private void completeInitialization(Collection<Future<?>> fs) throws HiveException {
Object[] os = new Object[fs.size()];
int i = 0;
Throwable asyncEx = null;
    // Wait for all futures to complete. Check for an abort while waiting for each
    // future. If any of the futures is cancelled / aborted, cancel all subsequent
    // futures.
boolean cancelAll = false;
for (Future<?> f : fs) {
// If aborted - break out of the loop, and cancel all subsequent futures.
if (cancelAll) {
break;
}
if (abortOp.get()) {
cancelAll = true;
break;
} else {
// Wait for the current future.
while (true) {
if (abortOp.get()) {
cancelAll = true;
break;
} else {
try {
// Await future result with a timeout to check the abort field occasionally.
// It's possible that the interrupt which comes in along with an abort, is suppressed
// by some other operator.
              Object futureResult = f.get(200L, TimeUnit.MILLISECONDS);
os[i++] = futureResult;
break;
} catch (TimeoutException e) {
// Expected if the operation takes time. Continue the loop, and wait for op completion.
} catch (InterruptedException | CancellationException e) {
asyncEx = e;
cancelAll = true;
break;
} catch (ExecutionException e) {
if (e.getCause() == null) {
asyncEx = e;
} else {
asyncEx = e.getCause();
}
cancelAll = true;
break;
}
}
}
}
}
if (cancelAll || asyncEx != null) {
for (Future<?> f : fs) {
        // It's ok to send a cancel to an already completed future; it is a no-op.
f.cancel(true);
}
throw new HiveException("Async Initialization failed. abortRequested=" + abortOp.get(), asyncEx);
}
completeInitializationOp(os);
}
/**
* This method can be used to retrieve the results from async operations
* started at init time - before the operator pipeline is started.
*
* @param os
* @throws HiveException
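   *
   * <p>Illustrative pattern (a sketch; the executor, callable and field names
   * here are hypothetical): an operator submits work during initializeOp() and
   * consumes the results in this callback:
   * <pre>{@code
   *   // in initializeOp():
   *   asyncInitOperations.add(executor.submit(hashTableLoader)); // hypothetical
   *   // in completeInitializationOp(Object[] os):
   *   this.hashTable = (HashTable) os[0]; // hypothetical field and type
   * }</pre>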
*/
protected void completeInitializationOp(Object[] os) throws HiveException {
// no-op default
}
public void initializeLocalWork(Configuration hconf) throws HiveException {
if (childOperators != null) {
      for (int i = 0; i < childOperators.size(); i++) {
Operator<? extends OperatorDesc> childOp = this.childOperators.get(i);
childOp.initializeLocalWork(hconf);
}
}
}
/**
* Operator specific initialization.
*/
protected void initializeOp(Configuration hconf) throws HiveException {
rootInitializeCalled = true;
}
  /**
   * Calls initialize on each of the children with outputObjInspector as the
   * output row format.
   */
protected void initializeChildren(Configuration hconf) throws HiveException {
state = State.INIT;
if (isLogDebugEnabled) {
LOG.debug("Operator " + id + " " + getName() + " initialized");
}
if (childOperators == null || childOperators.isEmpty()) {
return;
}
if (isLogDebugEnabled) {
LOG.debug("Initializing children of " + id + " " + getName());
}
for (int i = 0; i < childOperatorsArray.length; i++) {
childOperatorsArray[i].initialize(hconf, outputObjInspector, childOperatorsTag[i]);
if (reporter != null) {
childOperatorsArray[i].setReporter(reporter);
}
}
}
public void abort() {
LOG.info("Received abort in operator: {}", getName());
abortOp.set(true);
}
/**
* Pass the execContext reference to every child operator
*/
public void passExecContext(ExecMapperContext execContext) {
this.setExecContext(execContext);
for (int i = 0; i < childOperators.size(); i++) {
childOperators.get(i).passExecContext(execContext);
}
}
/**
* Collects all the parent's output object inspectors and calls actual
* initialization method.
*
* @param hconf
* @param inputOI
* OI of the row that this parent will pass to this op
* @param parentId
* parent operator id
* @throws HiveException
*/
protected void initialize(Configuration hconf, ObjectInspector inputOI,
int parentId) throws HiveException {
if (isLogDebugEnabled) {
LOG.debug("Initializing child " + id + " " + getName());
}
// Double the size of the array if needed
if (parentId >= inputObjInspectors.length) {
int newLength = inputObjInspectors.length * 2;
while (parentId >= newLength) {
newLength *= 2;
}
inputObjInspectors = Arrays.copyOf(inputObjInspectors, newLength);
}
inputObjInspectors[parentId] = inputOI;
// call the actual operator initialization function
initialize(hconf, null);
}
public ObjectInspector[] getInputObjInspectors() {
return inputObjInspectors;
}
public void setInputObjInspectors(ObjectInspector[] inputObjInspectors) {
this.inputObjInspectors = inputObjInspectors;
}
public ObjectInspector getOutputObjInspector() {
return outputObjInspector;
}
/**
* Process the row.
*
* @param row
* The object representing the row.
   * @param tag
   *          The tag of the row, which usually indicates which parent this row
   *          comes from. Rows with the same tag should always have exactly the
   *          same rowInspector.
*/
public abstract void process(Object row, int tag) throws HiveException;
protected final void defaultStartGroup() throws HiveException {
if (isLogDebugEnabled) {
LOG.debug("Starting group");
}
if (childOperators == null) {
return;
}
if (isLogDebugEnabled) {
LOG.debug("Starting group for children:");
}
for (Operator<? extends OperatorDesc> op : childOperators) {
op.startGroup();
}
if (isLogDebugEnabled) {
LOG.debug("Start group Done");
}
}
protected final void defaultEndGroup() throws HiveException {
if (isLogDebugEnabled) {
LOG.debug("Ending group");
}
if (childOperators == null) {
return;
}
if (isLogDebugEnabled) {
LOG.debug("Ending group for children:");
}
for (Operator<? extends OperatorDesc> op : childOperators) {
op.endGroup();
}
if (isLogDebugEnabled) {
LOG.debug("End group Done");
}
}
  // If an operator wants to do some work at the beginning of a group
public void startGroup() throws HiveException {
defaultStartGroup();
}
// If an operator wants to do some work at the end of a group
public void endGroup() throws HiveException {
defaultEndGroup();
}
  // A blocking operator (e.g. GroupByOperator and JoinOperator) can
  // override this method to forward its outputs.
public void flush() throws HiveException {
}
public void processGroup(int tag) throws HiveException {
if (childOperators == null || childOperators.isEmpty()) {
return;
}
for (int i = 0; i < childOperatorsArray.length; i++) {
childOperatorsArray[i].processGroup(childOperatorsTag[i]);
}
}
protected boolean allInitializedParentsAreClosed() {
if (parentOperators != null) {
for (Operator<? extends OperatorDesc> parent : parentOperators) {
        if (parent == null) {
          continue;
        }
if (isLogDebugEnabled) {
LOG.debug("allInitializedParentsAreClosed? parent.state = " + parent.state);
}
if (!(parent.state == State.CLOSE || parent.state == State.UNINIT)) {
return false;
}
}
}
return true;
}
// This close() function does not need to be synchronized
// since it is called by its parents' main thread, so no
// more than 1 thread should call this close() function.
public void close(boolean abort) throws HiveException {
if (state == State.CLOSE) {
return;
}
// check if all parents are finished
if (!allInitializedParentsAreClosed()) {
if (isLogDebugEnabled) {
LOG.debug("Not all parent operators are closed. Not closing.");
}
return;
}
// set state as CLOSE as long as all parents are closed
// state == CLOSE doesn't mean all children are also in state CLOSE
state = State.CLOSE;
if (isLogDebugEnabled) {
LOG.debug(id + " finished. closing... ");
}
abort |= abortOp.get();
// call the operator specific close routine
closeOp(abort);
reporter = null;
try {
logStats();
if (childOperators == null) {
return;
}
for (Operator<? extends OperatorDesc> op : childOperators) {
if (isLogDebugEnabled) {
LOG.debug("Closing child = " + op);
}
op.close(abort);
}
if (isLogDebugEnabled) {
LOG.debug(id + " Close done");
}
} catch (HiveException e) {
LOG.warn("Caught exception while closing operator: " + e.getMessage(), e);
throw e;
}
}
  /**
   * Operator specific close routine. Operators which inherit from this class
   * should override this function for their specific cleanup routine.
   */
protected void closeOp(boolean abort) throws HiveException {
}
private boolean jobCloseDone = false;
// Operator specific logic goes here
public void jobCloseOp(Configuration conf, boolean success)
throws HiveException {
}
/**
* Unlike other operator interfaces which are called from map or reduce task,
* jobClose is called from the jobclient side once the job has completed.
*
   * @param conf
   *          Configuration with which the job was submitted
* @param success
* whether the job was completed successfully or not
*/
public void jobClose(Configuration conf, boolean success)
throws HiveException {
// JobClose has already been performed on this operator
if (jobCloseDone) {
return;
}
jobCloseOp(conf, success);
jobCloseDone = true;
if (childOperators != null) {
for (Operator<? extends OperatorDesc> op : childOperators) {
op.jobClose(conf, success);
}
}
}
/**
* Cache childOperators in an array for faster access. childOperatorsArray is
* accessed per row, so it's important to make the access efficient.
*/
protected transient Operator<? extends OperatorDesc>[] childOperatorsArray = null;
protected transient int[] childOperatorsTag;
/**
   * Replace one child with another at the same position. The parent list of
   * the child is not changed.
*
* @param child
* the old child
* @param newChild
* the new child
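   *
   * <p>Illustrative use (a sketch): after {@code parent.replaceChild(oldOp, newOp)},
   * {@code newOp} occupies {@code oldOp}'s slot in {@code parent}'s child list,
   * but {@code newOp.getParentOperators()} must still be fixed up by the caller.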
*/
public void replaceChild(Operator<? extends OperatorDesc> child,
Operator<? extends OperatorDesc> newChild) {
int childIndex = childOperators.indexOf(child);
assert childIndex != -1;
childOperators.set(childIndex, newChild);
}
public void removeChild(Operator<? extends OperatorDesc> child) {
int childIndex = childOperators.indexOf(child);
assert childIndex != -1;
if (childOperators.size() == 1) {
setChildOperators(null);
} else {
childOperators.remove(childIndex);
}
int parentIndex = child.getParentOperators().indexOf(this);
assert parentIndex != -1;
if (child.getParentOperators().size() == 1) {
child.setParentOperators(null);
} else {
child.getParentOperators().remove(parentIndex);
}
}
/**
   * Remove a child and splice all of the child's children into the child's
   * position.
   *
   * @param child
   *          the child to remove. If this operator is not the only parent of
   *          the child, the result can be unpredictable.
* @throws SemanticException
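   *
   * <p>Illustrative rewiring (a sketch): for a chain {@code TS -> FIL -> SEL},
   * calling {@code ts.removeChildAndAdoptItsChildren(fil)} yields
   * {@code TS -> SEL}, with SEL's parent pointer updated from FIL to TS.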
*/
public void removeChildAndAdoptItsChildren(
Operator<? extends OperatorDesc> child) throws SemanticException {
int childIndex = childOperators.indexOf(child);
if (childIndex == -1) {
throw new SemanticException(
"Exception when trying to remove partition predicates: fail to find child from parent");
}
childOperators.remove(childIndex);
if (child.getChildOperators() != null &&
child.getChildOperators().size() > 0) {
childOperators.addAll(childIndex, child.getChildOperators());
}
for (Operator<? extends OperatorDesc> gc : child.getChildOperators()) {
List<Operator<? extends OperatorDesc>> parents = gc.getParentOperators();
int index = parents.indexOf(child);
if (index == -1) {
throw new SemanticException(
"Exception when trying to remove partition predicates: fail to find parent from child");
}
parents.set(index, this);
}
}
public void removeParent(Operator<? extends OperatorDesc> parent) {
int parentIndex = parentOperators.indexOf(parent);
assert parentIndex != -1;
if (parentOperators.size() == 1) {
setParentOperators(null);
} else {
parentOperators.remove(parentIndex);
}
int childIndex = parent.getChildOperators().indexOf(this);
assert childIndex != -1;
if (parent.getChildOperators().size() == 1) {
parent.setChildOperators(null);
} else {
parent.getChildOperators().remove(childIndex);
}
}
/**
   * Replace one parent with another at the same position. Children of the new
   * parent are not updated.
*
* @param parent
* the old parent
* @param newParent
* the new parent
*/
public void replaceParent(Operator<? extends OperatorDesc> parent,
Operator<? extends OperatorDesc> newParent) {
int parentIndex = parentOperators.indexOf(parent);
assert parentIndex != -1;
parentOperators.set(parentIndex, newParent);
}
protected long getNextCntr(long cntr) {
    // A very simple schedule for reporting the number of rows processed by an
    // operator: report at exponentially growing intervals (1, 10, 100, ...) up to
    // 1 million rows, and at every additional million rows after that.
if (cntr >= 1000000) {
return cntr + 1000000;
}
return 10 * cntr;
}
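  /**
   * Forwards a row to every child operator that is not yet done, using the tag
   * recorded for this operator in each child's parent list. If all children
   * report done, this operator marks itself done as well.
   */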
protected void forward(Object row, ObjectInspector rowInspector)
throws HiveException {
if (getDone()) {
return;
}
int childrenDone = 0;
for (int i = 0; i < childOperatorsArray.length; i++) {
Operator<? extends OperatorDesc> o = childOperatorsArray[i];
if (o.getDone()) {
childrenDone++;
} else {
o.process(row, childOperatorsTag[i]);
}
}
// if all children are done, this operator is also done
if (childrenDone != 0 && childrenDone == childOperatorsArray.length) {
setDone(true);
}
}
  public void resetStats() {
    for (LongWritable stat : statsMap.values()) {
      stat.set(0L);
    }
  }
  public void reset() {
    this.state = State.INIT;
if (childOperators != null) {
for (Operator<? extends OperatorDesc> o : childOperators) {
o.reset();
}
}
}
/**
* OperatorFunc.
*
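   * <p>Illustrative use (a sketch) with {@link Operator#preorderMap(OperatorFunc)}:
   * <pre>{@code
   *   rootOp.preorderMap(new OperatorFunc() {
   *     public void func(Operator<? extends OperatorDesc> op) {
   *       op.setUseBucketizedHiveInputFormat(true);
   *     }
   *   });
   * }</pre>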
*/
public static interface OperatorFunc {
void func(Operator<? extends OperatorDesc> op);
}
public void preorderMap(OperatorFunc opFunc) {
opFunc.func(this);
if (childOperators != null) {
for (Operator<? extends OperatorDesc> o : childOperators) {
o.preorderMap(opFunc);
}
}
}
public void logStats() {
if (isLogInfoEnabled && !statsMap.isEmpty()) {
StringBuilder sb = new StringBuilder();
for (Map.Entry<String, LongWritable> e : statsMap.entrySet()) {
sb.append(e.getKey()).append(":").append(e.getValue()).append(", ");
}
LOG.info(sb.toString());
}
}
@Override
public abstract String getName();
  public static String getOperatorName() {
return "OP";
}
/**
   * Returns a map of output column name to input expression. Note that
   * currently it returns only key columns for ReduceSink and GroupBy operators.
*
* @return null if the operator doesn't change columns
*/
public Map<String, ExprNodeDesc> getColumnExprMap() {
return colExprMap;
}
public void setColumnExprMap(Map<String, ExprNodeDesc> colExprMap) {
this.colExprMap = colExprMap;
}
private String getLevelString(int level) {
if (level == 0) {
return "\n";
}
StringBuilder s = new StringBuilder();
s.append("\n");
while (level > 0) {
s.append(" ");
level--;
}
return s.toString();
}
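  /**
   * Produces an indented, XML-like textual dump of the operator DAG rooted at
   * this operator, e.g. (an illustrative sketch) {@code <SEL>Id =4} followed
   * by its children and parents; each operator id is expanded only once.
   */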
public String dump(int level) {
return dump(level, new HashSet<Integer>());
}
public String dump(int level, HashSet<Integer> seenOpts) {
    if (seenOpts.contains(Integer.valueOf(id))) {
      return null;
    }
    seenOpts.add(Integer.valueOf(id));
StringBuilder s = new StringBuilder();
String ls = getLevelString(level);
s.append(ls);
s.append("<" + getName() + ">");
s.append("Id =" + id);
if (childOperators != null) {
s.append(ls);
s.append(" <Children>");
for (Operator<? extends OperatorDesc> o : childOperators) {
s.append(o.dump(level + 2, seenOpts));
}
s.append(ls);
s.append(" <\\Children>");
}
if (parentOperators != null) {
s.append(ls);
s.append(" <Parent>");
for (Operator<? extends OperatorDesc> o : parentOperators) {
s.append("Id = " + o.id + " ");
s.append(o.dump(level, seenOpts));
}
s.append("<\\Parent>");
}
s.append(ls);
s.append("<\\" + getName() + ">");
return s.toString();
}
/**
* Initialize an array of ExprNodeEvaluator and return the result
* ObjectInspectors.
*/
protected static ObjectInspector[] initEvaluators(ExprNodeEvaluator<?>[] evals,
ObjectInspector rowInspector) throws HiveException {
ObjectInspector[] result = new ObjectInspector[evals.length];
for (int i = 0; i < evals.length; i++) {
result[i] = evals[i].initialize(rowInspector);
}
return result;
}
/**
* Initialize an array of ExprNodeEvaluator from start, for specified length
* and return the result ObjectInspectors.
*/
protected static ObjectInspector[] initEvaluators(ExprNodeEvaluator<?>[] evals,
int start, int length,
ObjectInspector rowInspector) throws HiveException {
ObjectInspector[] result = new ObjectInspector[length];
for (int i = 0; i < length; i++) {
result[i] = evals[start + i].initialize(rowInspector);
}
return result;
}
/**
   * Initialize an array of ExprNodeEvaluator and put the resulting
   * ObjectInspectors into a StructObjectInspector with the given field names.
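   *
   * <p>Illustrative use (a sketch of the typical pattern in key-producing
   * operators such as GroupByOperator):
   * <pre>{@code
   *   ExprNodeEvaluator<?>[] keyEvals = ...;  // one evaluator per key expression
   *   List<String> keyNames = Arrays.asList("_col0", "_col1");
   *   StructObjectInspector keyOI =
   *       initEvaluatorsAndReturnStruct(keyEvals, keyNames, rowInspector);
   * }</pre>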
*/
protected static StructObjectInspector initEvaluatorsAndReturnStruct(
ExprNodeEvaluator<?>[] evals, List<String> outputColName,
ObjectInspector rowInspector) throws HiveException {
ObjectInspector[] fieldObjectInspectors = initEvaluators(evals,
rowInspector);
return ObjectInspectorFactory.getStandardStructObjectInspector(
outputColName, Arrays.asList(fieldObjectInspectors));
}
protected transient Object groupKeyObject;
public String getOperatorId() {
return operatorId;
}
public void initOperatorId() {
setOperatorId(getName() + "_" + this.id);
}
public void setOperatorId(String operatorId) {
this.operatorId = operatorId;
}
  /*
   * By default there are no additional counters and this returns null. If an
   * operator wants to add more counters, it should override this method and
   * provide the new list. Counter names returned by this method should be
   * wrapped counter names (i.e. the strings should be passed through
   * getWrappedCounterName).
   */
protected List<String> getAdditionalCounters() {
return null;
}
/**
* Return the type of the specific operator among the
* types in OperatorType.
*
* @return OperatorType.*
*/
  public abstract OperatorType getType();
public void setGroupKeyObject(Object keyObject) {
this.groupKeyObject = keyObject;
}
public Object getGroupKeyObject() {
return groupKeyObject;
}
/**
* Called during semantic analysis as operators are being added
* in order to give them a chance to compute any additional plan information
* needed. Does nothing by default.
*/
public void augmentPlan() {
}
public ExecMapperContext getExecContext() {
return execContext;
}
public void setExecContext(ExecMapperContext execContext) {
this.execContext = execContext;
}
// The input file has changed - every operator can invoke specific action
// for each input file
public void cleanUpInputFileChanged() throws HiveException {
this.cleanUpInputFileChangedOp();
    if (this.childOperators != null) {
      for (int i = 0; i < this.childOperators.size(); i++) {
Operator<? extends OperatorDesc> op = this.childOperators.get(i);
op.cleanUpInputFileChanged();
}
}
}
  // If an operator needs to invoke specific cleanup, that operator can override
  // this method.
public void cleanUpInputFileChangedOp() throws HiveException {
}
  // Called by the map operator. Propagated recursively to single-parented descendants.
public void setInputContext(String tableName, String partitionName) {
if (childOperators != null) {
for (Operator<? extends OperatorDesc> child : childOperators) {
if (child.getNumParent() == 1) {
child.setInputContext(tableName, partitionName);
}
}
}
}
public boolean supportSkewJoinOptimization() {
return false;
}
@Override
public Operator<? extends OperatorDesc> clone()
throws CloneNotSupportedException {
List<Operator<? extends OperatorDesc>> parents = getParentOperators();
List<Operator<? extends OperatorDesc>> parentClones =
new ArrayList<Operator<? extends OperatorDesc>>();
if (parents != null) {
for (Operator<? extends OperatorDesc> parent : parents) {
parentClones.add((parent.clone()));
}
}
@SuppressWarnings("unchecked")
T descClone = (T)conf.clone();
// also clone the colExprMap by default
Operator<? extends OperatorDesc> ret = OperatorFactory.getAndMakeChild(
cContext, descClone, getSchema(), getColumnExprMap(), parentClones);
return ret;
}
/**
* Clones only the operator. The children and parent are set
* to null.
* @return Cloned operator
* @throws CloneNotSupportedException
*/
@SuppressWarnings("unchecked")
public Operator<? extends OperatorDesc> cloneOp() throws CloneNotSupportedException {
T descClone = (T) conf.clone();
Operator<? extends OperatorDesc> ret =
OperatorFactory.getAndMakeChild(cContext, descClone, getSchema());
return ret;
}
/**
   * Recursively clones all the children of the tree and fixes the pointers to
   * children, to parents, and to itself coming from the children. It does not
   * fix the pointers to itself coming from parents; parents continue to point
   * to the original child.
* @return Cloned operator
* @throws CloneNotSupportedException
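   *
   * <p>Illustrative effect (a sketch): for a chain {@code A -> B}, calling
   * {@code a.cloneRecursiveChildren()} returns a clone {@code A' -> B'} in which
   * {@code B'}'s parent list points at {@code A'}, while {@code A'} still shares
   * {@code A}'s original parents.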
*/
public Operator<? extends OperatorDesc> cloneRecursiveChildren()
throws CloneNotSupportedException {
Operator<? extends OperatorDesc> newOp = this.cloneOp();
newOp.setParentOperators(this.parentOperators);
List<Operator<? extends OperatorDesc>> newChildren =
new ArrayList<Operator<? extends OperatorDesc>>();
for (Operator<? extends OperatorDesc> childOp : this.getChildOperators()) {
List<Operator<? extends OperatorDesc>> parentList =
new ArrayList<Operator<? extends OperatorDesc>>();
for (Operator<? extends OperatorDesc> parent : childOp.getParentOperators()) {
if (parent.equals(this)) {
parentList.add(newOp);
} else {
parentList.add(parent);
}
}
      // Recursively clone the child and record the clone as a child of newOp
      Operator<? extends OperatorDesc> clonedChildOp = childOp.cloneRecursiveChildren();
      clonedChildOp.setParentOperators(parentList);
      newChildren.add(clonedChildOp);
}
newOp.setChildOperators(newChildren);
return newOp;
}
  /*
   * True only for operators which produce at most 1 output row per input row.
   * This allows the output column names to be directly translated to input
   * column names.
   */
public boolean columnNamesRowResolvedCanBeObtained() {
return false;
}
public boolean isUseBucketizedHiveInputFormat() {
return useBucketizedHiveInputFormat;
}
/**
   * Before setting this to {@code true}, make sure the operator is not reading ACID tables.
* @param useBucketizedHiveInputFormat
*/
public void setUseBucketizedHiveInputFormat(boolean useBucketizedHiveInputFormat) {
this.useBucketizedHiveInputFormat = useBucketizedHiveInputFormat;
}
/**
* Whether this operator supports automatic sort merge join.
* The stack is traversed, and this method is invoked for all the operators.
* @return TRUE if yes, FALSE otherwise.
*/
public boolean supportAutomaticSortMergeJoin() {
return false;
}
public boolean supportUnionRemoveOptimization() {
return false;
}
  /*
   * Whether this operator is allowed before a mapjoin. Eventually the mapjoin
   * hint should be done away with, but it is still needed because bucketized
   * mapjoin and sortmerge join depend on it completely. Check the operators
   * which are allowed before mapjoin.
   */
public boolean opAllowedBeforeMapJoin() {
return true;
}
  /*
   * Whether this operator is allowed after a mapjoin. Eventually the mapjoin
   * hint should be done away with, but it is still needed because bucketized
   * mapjoin and sortmerge join depend on it completely. Check the operators
   * which are allowed after mapjoin.
   */
public boolean opAllowedAfterMapJoin() {
return true;
}
  /*
   * If this task contains a join, it can be converted to a map-join task if this
   * operator is present in the mapper. For example, if a sort-merge join operator
   * is present followed by a regular join, the task cannot be converted to an
   * auto map-join.
   */
public boolean opAllowedConvertMapJoin() {
return true;
}
  /*
   * If this task contains a sortmergejoin, it can be converted to a map-join
   * task if this operator is present in the mapper. For example, if a sort-merge
   * join operator is present followed by a regular join, the task cannot be
   * converted to an auto map-join.
   */
public boolean opAllowedBeforeSortMergeJoin() {
return true;
}
  /**
   * Used by LimitPushdownOptimizer.
   *
   * If none of the operators between the limit and the reduce-sink removes any
   * input rows in the range of the limit count, the limit can be pushed down
   * to the reduce-sink operator: forward, select, etc.
   */
public boolean acceptLimitPushdown() {
return false;
}
@Override
public String toString() {
return getName() + "[" + getIdentifier() + "]";
}
public static String toString(Collection<TableScanOperator> top) {
StringBuilder builder = new StringBuilder();
Set<String> visited = new HashSet<String>();
for (Operator<?> op : top) {
if (builder.length() > 0) {
builder.append('\n');
}
toString(builder, visited, op, 0);
}
return builder.toString();
}
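  /**
   * Appends a rendering of the operator chain rooted at {@code op}, one branch
   * per line, e.g. (an illustrative sketch) {@code TS[0]-SEL[1]-RS[2]}; extra
   * children are indented under their parent, and an operator that was already
   * visited is printed by name only, without re-expanding its children.
   */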
static boolean toString(StringBuilder builder, Set<String> visited, Operator<?> op, int start) {
String name = op.toString();
boolean added = visited.add(name);
if (start > 0) {
builder.append("-");
start++;
}
builder.append(name);
start += name.length();
if (added) {
if (op.getNumChild() > 0) {
List<Operator<?>> children = op.getChildOperators();
for (int i = 0; i < children.size(); i++) {
if (i > 0) {
builder.append('\n');
for (int j = 0; j < start; j++) {
builder.append(' ');
}
}
toString(builder, visited, children.get(i), start);
}
}
return true;
}
return false;
}
public Statistics getStatistics() {
if (conf != null) {
return conf.getStatistics();
}
return null;
}
public OpTraits getOpTraits() {
if (conf != null) {
return conf.getTraits();
}
return null;
}
public void setOpTraits(OpTraits metaInfo) {
if (isLogDebugEnabled) {
LOG.debug("Setting traits (" + metaInfo + ") on " + this);
}
if (conf != null) {
conf.setTraits(metaInfo);
} else {
LOG.warn("Cannot set traits when there's no descriptor: " + this);
}
}
public void setStatistics(Statistics stats) {
if (isLogDebugEnabled) {
LOG.debug("Setting stats (" + stats + ") on " + this);
}
if (conf != null) {
conf.setStatistics(stats);
} else {
LOG.warn("Cannot set stats when there's no descriptor: " + this);
}
}
@SuppressWarnings("rawtypes")
public static Operator createDummy() {
return new DummyOperator();
}
@SuppressWarnings({ "serial", "unchecked", "rawtypes" })
private static class DummyOperator extends Operator {
public DummyOperator() { super("dummy", null); }
@Override
public void process(Object row, int tag) {
}
@Override
public OperatorType getType() {
return null;
}
@Override
public String getName() {
return DummyOperator.getOperatorName();
}
public static String getOperatorName() {
return "DUMMY";
}
@Override
protected void initializeOp(Configuration conf) {
}
}
public void removeParents() {
for (Operator<?> parent : new ArrayList<Operator<?>>(getParentOperators())) {
removeParent(parent);
}
}
public boolean getIsReduceSink() {
return false;
}
public String getReduceOutputName() {
return null;
}
public void setCompilationOpContext(CompilationOpContext ctx) {
cContext = ctx;
}
/** @return Compilation operator context. Only available during compilation. */
public CompilationOpContext getCompilationOpContext() {
return cContext;
}
}