blob: 7b2b945bed0cd09219978d12012516c88fb53a5d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.newplan.logical.relational;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.streaming.ExecutableManager;
import org.apache.pig.impl.streaming.StreamingCommand;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.PlanVisitor;
public class LOStream extends LogicalRelationalOperator {
private LogicalSchema scriptSchema;
private static final long serialVersionUID = 2L;
//private static Log log = LogFactory.getLog(LOFilter.class);
// the StreamingCommand object for the
// Stream Operator this operator represents
private StreamingCommand command;
transient private ExecutableManager executableManager;
private LogicalSchema uidOnlySchema;
private boolean castInserted = false;
public LOStream(LogicalPlan plan, ExecutableManager exeManager, StreamingCommand cmd, LogicalSchema schema) {
super("LOStream", plan);
command = cmd;
executableManager = exeManager;
scriptSchema = schema;
}
/**
* Get the StreamingCommand object associated
* with this operator
*
* @return the StreamingCommand object
*/
public StreamingCommand getStreamingCommand() {
return command;
}
/**
* @return the ExecutableManager
*/
public ExecutableManager getExecutableManager() {
return executableManager;
}
@Override
public LogicalSchema getSchema() throws FrontendException {
if (schema!=null)
return schema;
if (isCastInserted()) {
schema = new LogicalSchema();
for (int i=0;i<scriptSchema.size();i++) {
LogicalSchema.LogicalFieldSchema fs = scriptSchema.getField(i).deepCopy();
fs.type = DataType.BYTEARRAY;
schema.addField(fs);
}
} else {
if (scriptSchema!=null)
schema = scriptSchema.deepCopy();
}
if (schema!=null)
uidOnlySchema = schema.mergeUid(uidOnlySchema);
return schema;
}
@Override
public void accept(PlanVisitor v) throws FrontendException {
if (!(v instanceof LogicalRelationalNodesVisitor)) {
throw new FrontendException("Expected LogicalPlanVisitor", 2223);
}
((LogicalRelationalNodesVisitor)v).visit(this);
}
@Override
public boolean isEqual(Operator other) throws FrontendException {
if (other != null && other instanceof LOStream) {
return checkEquality((LogicalRelationalOperator)other);
} else {
return false;
}
}
public void setCastInserted(boolean flag) {
castInserted = flag;
}
public boolean isCastInserted() {
return castInserted;
}
@Override
public void resetUid() {
uidOnlySchema = null;
}
public LogicalSchema getScriptSchema() {
return scriptSchema;
}
}