blob: dd095ddd239e42d1e670f4978bb90ea59420379d [file] [log] [blame]
package org.apache.hawq.pxf.service;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import org.apache.hawq.pxf.api.BadRecordException;
import org.apache.hawq.pxf.api.OneRow;
import org.apache.hawq.pxf.api.ReadAccessor;
import org.apache.hawq.pxf.api.ReadResolver;
import org.apache.hawq.pxf.api.utilities.InputData;
import org.apache.hawq.pxf.api.utilities.Plugin;
import org.apache.hawq.pxf.api.utilities.Utilities;
import org.apache.hawq.pxf.service.io.Writable;
import org.apache.hawq.pxf.service.utilities.ProtocolData;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.*;
import java.nio.charset.CharacterCodingException;
import java.util.LinkedList;
import java.util.zip.ZipException;
/**
* ReadBridge class creates the appropriate accessor and resolver. It then
* creates the correct output conversion class (e.g. Text or GPDBWritable),
* gets records from the accessor, lets the resolver deserialize them and
* reserializes them using the output conversion class. <br>
* The class handles BadRecordException and other exception types and marks
* the record as invalid for HAWQ.
*/
public class ReadBridge implements Bridge {
    private static final Log LOG = LogFactory.getLog(ReadBridge.class);

    // Kept package-private and non-final: code in the same package may rely
    // on direct access to these collaborators.
    ReadAccessor fileAccessor = null;
    ReadResolver fieldsResolver = null;
    BridgeOutputBuilder outputBuilder = null;
    // Records already built from a previously read row and not yet returned.
    LinkedList<Writable> outputQueue = null;

    /**
     * Constructor - sets the implementation of the bridge.
     *
     * @param protData input containing accessor and resolver names
     * @throws Exception if accessor or resolver can't be instantiated
     */
    public ReadBridge(ProtocolData protData) throws Exception {
        outputBuilder = new BridgeOutputBuilder(protData);
        outputQueue = new LinkedList<Writable>();
        fileAccessor = getFileAccessor(protData);
        fieldsResolver = getFieldsResolver(protData);
    }

    /**
     * Opens the underlying resource for reading by delegating to the
     * accessor.
     *
     * @return the value returned by the accessor's {@code openForRead()}
     * @throws Exception if the accessor fails to open the resource
     */
    @Override
    public boolean beginIteration() throws Exception {
        return fileAccessor.openForRead();
    }

    /**
     * Fetches the next object from the accessor and turns it into a record
     * that the HAWQ backend can process.
     * <p>
     * Records still pending in the output queue are returned first. Otherwise
     * rows are read from the accessor until the output builder produces at
     * least one record. When the accessor has no more rows, the output
     * builder's partial line (possibly {@code null}) is returned.
     * <p>
     * A {@link BadRecordException}, or an {@link IOException} classified by
     * {@link #isDataException(IOException)} as a data problem, is not
     * propagated; the error output produced by the output builder is
     * returned instead.
     *
     * @return the next record, an error record, or {@code null} when the
     *         accessor is exhausted and no partial line is pending
     * @throws Exception on any failure that is not a data problem
     */
    @Override
    public Writable getNext() throws Exception {
        if (!outputQueue.isEmpty()) {
            return outputQueue.pop();
        }

        OneRow onerow = null;
        try {
            // The queue is known to be empty here, so it is safe to replace
            // it with the builder's output on every iteration.
            while (outputQueue.isEmpty()) {
                onerow = fileAccessor.readNextObject();
                if (onerow == null) {
                    Writable partialLine = outputBuilder.getPartialLine();
                    if (partialLine != null) {
                        LOG.warn("A partial record in the end of the fragment");
                    }
                    // If there is a partial line, return it now; otherwise
                    // this returns null.
                    return partialLine;
                }
                outputQueue = outputBuilder.makeOutput(fieldsResolver.getFields(onerow));
            }
            return outputQueue.pop();
        } catch (IOException ex) {
            if (!isDataException(ex)) {
                throw ex;
            }
            return outputBuilder.getErrorOutput(ex);
        } catch (BadRecordException ex) {
            logBadRecord(ex, onerow);
            return outputBuilder.getErrorOutput(ex);
        }
    }

    /**
     * Writes a debug-level description of a bad record. The row is only
     * rendered to a string when debug logging is enabled.
     *
     * @param ex the exception reported for the record
     * @param onerow the row being processed when the exception was thrown,
     *        or {@code null} if no row had been read yet
     */
    private static void logBadRecord(BadRecordException ex, OneRow onerow) {
        if (!LOG.isDebugEnabled()) {
            return;
        }
        String rowInfo = (onerow != null) ? onerow.toString() : "null";
        if (ex.getCause() != null) {
            LOG.debug("BadRecordException " + ex.getCause().toString()
                    + ": " + rowInfo);
        } else {
            LOG.debug(ex.toString() + ": " + rowInfo);
        }
    }

    /**
     * Closes the underlying resource by delegating to the accessor.
     *
     * @throws Exception if the accessor fails to close; the failure is
     *         logged together with its stack trace and then rethrown
     */
    public void endIteration() throws Exception {
        try {
            fileAccessor.closeForRead();
        } catch (Exception e) {
            // Pass the throwable so the stack trace is not lost in the log.
            LOG.error("Failed to close bridge resources: " + e.getMessage(), e);
            throw e;
        }
    }

    /**
     * Instantiates the accessor named in the input data.
     *
     * @param inputData input containing the accessor name
     * @return the accessor instance
     * @throws Exception if the accessor can't be instantiated
     */
    public static ReadAccessor getFileAccessor(InputData inputData)
            throws Exception {
        return (ReadAccessor) Utilities.createAnyInstance(InputData.class,
                inputData.getAccessor(), inputData);
    }

    /**
     * Instantiates the resolver named in the input data.
     *
     * @param inputData input containing the resolver name
     * @return the resolver instance
     * @throws Exception if the resolver can't be instantiated
     */
    public static ReadResolver getFieldsResolver(InputData inputData)
            throws Exception {
        return (ReadResolver) Utilities.createAnyInstance(InputData.class,
                inputData.getResolver(), inputData);
    }

    /**
     * Tells whether an {@link IOException} indicates a data problem rather
     * than an IO/connection problem.
     * <p>
     * There are many exceptions that inherit IOException. Some of them, like
     * EOFException, are generated due to a data problem and not because of an
     * IO/connection problem as the parent IOException might lead us to
     * believe. For example, an EOFException will be thrown while fetching a
     * record from a sequence file if there is a formatting problem in the
     * record. Fetching a record from the sequence file is the responsibility
     * of the accessor, so the exception will be thrown from the accessor. We
     * identify these cases by analyzing the exception type, and when we
     * discover that the actual problem was a data problem, the caller returns
     * the error output instead of failing.
     *
     * @param ex the exception to classify
     * @return {@code true} if the exception is one of the known data-related
     *         IOException subtypes
     */
    protected boolean isDataException(IOException ex) {
        return (ex instanceof EOFException
                || ex instanceof CharacterCodingException
                || ex instanceof CharConversionException
                || ex instanceof UTFDataFormatException
                || ex instanceof ZipException);
    }

    /**
     * Not supported by the read bridge.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public boolean setNext(DataInputStream inputStream) {
        throw new UnsupportedOperationException("setNext is not implemented");
    }

    /**
     * Reports whether both the accessor and the resolver declare themselves
     * thread safe.
     *
     * @return {@code true} only if both plugins are thread safe
     */
    @Override
    public boolean isThreadSafe() {
        boolean result = ((Plugin) fileAccessor).isThreadSafe()
                && ((Plugin) fieldsResolver).isThreadSafe();
        LOG.debug("Bridge is " + (result ? "" : "not ") + "thread safe");
        return result;
    }
}