blob: 78c46656cdffe25fb407e302a35e35efb94b7419 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.mapreduce.mainframe;
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.ByteBuffer;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.sqoop.lib.SqoopRecord;
import org.apache.sqoop.util.MainframeFTPClientUtils;
/**
* A RecordReader that returns a record from a mainframe dataset.
*/
/**
 * A RecordReader that returns records from a mainframe dataset over FTP.
 *
 * Datasets are transferred either in text mode (one record per line read via
 * {@link #getNextRecord(SqoopRecord)}) or in binary mode (fixed-size byte
 * chunks read via {@link #getNextBinaryRecord(SqoopRecord)}), depending on the
 * configured mainframe FTP transfer mode.
 */
public class MainframeDatasetFTPRecordReader <T extends SqoopRecord>
    extends MainframeDatasetRecordReader<T> {
  private FTPClient ftp = null;
  // Reader used for text-mode transfers; null when no dataset stream is open.
  private BufferedReader datasetReader = null;
  // Stream used for binary-mode transfers; null when no dataset stream is open.
  private BufferedInputStream inputStream = null;

  private static final Log LOG = LogFactory.getLog(
      MainframeDatasetFTPRecordReader.class.getName());

  @Override
  public void initialize(InputSplit inputSplit,
                         TaskAttemptContext taskAttemptContext)
      throws IOException, InterruptedException {
    super.initialize(inputSplit, taskAttemptContext);
    Configuration conf = getConfiguration();
    ftp = MainframeFTPClientUtils.getFTPConnection(conf);
    initialize(ftp, conf);
  }

  /**
   * Initializes the reader with an already-connected FTP client and changes
   * the FTP working directory to the configured dataset folder.
   *
   * @param ftpClient connected FTP client; a null client makes this a no-op
   * @param conf job configuration holding the dataset name and type
   * @throws IOException if changing the FTP working directory fails
   */
  public void initialize(FTPClient ftpClient, Configuration conf)
      throws IOException {
    ftp = ftpClient;
    if (ftp != null) {
      String dsName =
          conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME);
      String dsType =
          conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE);
      MainframeDatasetPath p = null;
      try {
        p = new MainframeDatasetPath(dsName, conf);
      } catch (Exception e) {
        // Log the full stack trace through the logger (instead of
        // printStackTrace) and fall back to the raw dataset name below.
        LOG.error("MainframeDatasetPath helper class incorrectly initialised", e);
      }
      if (dsType != null && p != null) {
        dsName = p.getMainframeDatasetFolder();
      }
      // Mainframe dataset names must be quoted to be treated as absolute.
      ftp.changeWorkingDirectory("'" + dsName + "'");
    }
  }

  @Override
  public void close() throws IOException {
    if (datasetReader != null) {
      datasetReader.close();
    }
    // Also release a binary-mode stream that is still open (e.g. when the
    // reader is closed before reaching EOF); the original code leaked it.
    if (inputStream != null) {
      inputStream.close();
      inputStream = null;
    }
    if (ftp != null) {
      MainframeFTPClientUtils.closeFTPConnection(ftp);
    }
  }

  /**
   * Reads the next record, delegating to the binary path when the configured
   * FTP transfer mode is binary; otherwise reads line-delimited text,
   * transparently advancing to the next dataset at end of stream.
   *
   * @param sqoopRecord record whose single mapped field receives the data
   * @return true if a record was produced, false when all datasets are done
   * @throws IOException on FTP or stream failure
   */
  protected boolean getNextRecord(T sqoopRecord) throws IOException {
    String line = null;
    Configuration conf = getConfiguration();
    if (MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE_BINARY.equals(
        conf.get(MainframeConfiguration.MAINFRAME_FTP_TRANSFER_MODE))) {
      return getNextBinaryRecord(sqoopRecord);
    }
    try {
      do {
        if (datasetReader == null) {
          String dsName = getNextDataset();
          if (dsName == null) {
            break;  // no more datasets in this split
          }
          // retrieveFileStream() returns null on failure; check before
          // wrapping, otherwise the reader constructor throws a bare NPE.
          InputStream rawStream = ftp.retrieveFileStream(dsName);
          if (rawStream == null) {
            throw new IOException("Failed to retrieve FTP file stream for "
                + dsName + ". FTP Response: " + ftp.getReplyString());
          }
          // NOTE(review): uses the platform default charset, as the original
          // code did — confirm whether an explicit charset is required here.
          datasetReader = new BufferedReader(new InputStreamReader(rawStream));
        }
        line = datasetReader.readLine();
        if (line == null) {
          // End of this dataset: close it and finish the FTP transaction
          // before moving on to the next dataset.
          datasetReader.close();
          datasetReader = null;
          if (!ftp.completePendingCommand()) {
            throw new IOException("Failed to complete ftp command.");
          } else {
            LOG.info("Data transfer completed.");
          }
        }
      } while (line == null);
    } catch (IOException ioe) {
      // Re-wrap with context but preserve the original exception as cause.
      throw new IOException("IOException during data transfer: " + ioe, ioe);
    }
    if (line != null) {
      convertToSqoopRecord(line, (SqoopRecord) sqoopRecord);
      return true;
    }
    return false;
  }

  /**
   * Reads the next fixed-size chunk of a binary-mode transfer into the
   * record. A record is emitted when the buffer fills, or at end of stream
   * with whatever partial chunk remains.
   *
   * @param sqoopRecord record whose single mapped field receives the bytes
   * @return true if a chunk was produced, false when all datasets are done
   * @throws IOException on FTP or stream failure
   */
  protected boolean getNextBinaryRecord(T sqoopRecord) throws IOException {
    Configuration conf = getConfiguration();
    // Typical estimated max size for a mainframe record; a local, so it is
    // named like one rather than in constant style.
    int bufferSize =
        MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE;
    if (conf != null) {
      bufferSize = conf.getInt(
          MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_BUFFER_SIZE,
          MainframeConfiguration.MAINFRAME_FTP_TRANSFER_BINARY_DEFAULT_BUFFER_SIZE);
    }
    byte[] buf = new byte[bufferSize];
    int bytesRead;
    int cumulativeBytesRead = 0;
    try {
      if (!initInputStream(bufferSize)) {
        LOG.info("No more datasets to process.");
        return false;
      }
      do {
        bytesRead = inputStream.read(
            buf, cumulativeBytesRead, bufferSize - cumulativeBytesRead);
        if (bytesRead == -1) {
          // EOF: finish the FTP transaction, then flush any partial chunk.
          closeFtpInputStream();
          LOG.info("Data transfer completed.");
          return writeBytesToSqoopRecord(buf, cumulativeBytesRead, sqoopRecord);
        }
        cumulativeBytesRead += bytesRead;
      } while (cumulativeBytesRead < bufferSize);
      // Buffer full: emit exactly one record's worth of bytes.
      return writeBytesToSqoopRecord(buf, cumulativeBytesRead, sqoopRecord);
    } catch (IOException ioe) {
      // Re-wrap with context but preserve the original exception as cause.
      throw new IOException("IOException during data transfer: " + ioe, ioe);
    }
  }

  /**
   * Opens a stream for the next dataset if none is currently open.
   *
   * @param bufferSize configured chunk size, logged for diagnostics
   * @return true if a stream is open, false when there are no more datasets
   * @throws IOException if the FTP file stream cannot be retrieved
   */
  protected Boolean initInputStream(int bufferSize) throws IOException {
    if (inputStream == null) {
      String dsName = getNextDataset();
      if (dsName == null) {
        LOG.info("No more datasets to process. Returning.");
        return false;
      }
      LOG.info("Attempting to retrieve file stream for: " + dsName);
      LOG.info("Buffer size: " + bufferSize);
      // Check the raw FTP stream: retrieveFileStream() returns null on
      // failure, whereas a BufferedInputStream constructor result can never
      // be null (the original post-construction null check was dead code).
      InputStream rawStream = ftp.retrieveFileStream(dsName);
      if (rawStream == null) {
        throw new IOException("Failed to retrieve FTP file stream."
            + " FTP Response: " + ftp.getReplyString());
      }
      inputStream = new BufferedInputStream(rawStream);
    }
    return true;
  }

  /**
   * Closes the current binary-mode stream and completes the pending FTP
   * command for the transfer.
   *
   * @throws IOException if the FTP command did not complete successfully
   */
  protected void closeFtpInputStream() throws IOException {
    inputStream.close();
    inputStream = null;
    if (!ftp.completePendingCommand()) {
      throw new IOException(
          "Failed to complete ftp command. FTP Response: " + ftp.getReplyString());
    }
  }

  /**
   * Copies the first {@code cumulativeBytesRead} bytes of {@code buf} into
   * the record's single mapped field.
   *
   * @param buf source buffer (may be partially filled)
   * @param cumulativeBytesRead number of valid bytes at the start of buf
   * @param sqoopRecord destination record
   * @return true if any bytes were written, false for an empty chunk
   */
  protected Boolean writeBytesToSqoopRecord(byte[] buf, int cumulativeBytesRead,
      SqoopRecord sqoopRecord) {
    if (cumulativeBytesRead <= 0) {
      return false;
    }
    // Copy only the valid prefix so the record never sees stale tail bytes.
    ByteBuffer buffer = ByteBuffer.allocate(cumulativeBytesRead);
    buffer.put(buf, 0, cumulativeBytesRead);
    convertToSqoopRecord(buffer.array(), sqoopRecord);
    return true;
  }

  /** Stores a text line in the record's first (and only) mapped field. */
  private void convertToSqoopRecord(String line, SqoopRecord sqoopRecord) {
    String fieldName =
        sqoopRecord.getFieldMap().entrySet().iterator().next().getKey();
    sqoopRecord.setField(fieldName, line);
  }

  /** Stores raw bytes in the record's first (and only) mapped field. */
  private void convertToSqoopRecord(byte[] buf, SqoopRecord sqoopRecord) {
    String fieldName =
        sqoopRecord.getFieldMap().entrySet().iterator().next().getKey();
    sqoopRecord.setField(fieldName, buf);
  }
}