blob: b8525fe84ceb046ca4af4775dd73df37e6ed2c95 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.lib;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Reader;
import java.io.Writer;
import java.sql.Blob;
import java.sql.Clob;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import com.cloudera.sqoop.io.LobFile;
import com.cloudera.sqoop.util.TaskId;
/**
 * Materializes BLOB and CLOB columns read from a JDBC ResultSet.
 *
 * Large objects whose length does not exceed the configured inline limit
 * ({@link #MAX_INLINE_LOB_LEN_KEY}, default {@link #DEFAULT_MAX_LOB_LENGTH})
 * are returned inline inside the BlobRef / ClobRef. Larger objects are
 * spilled into LobFiles created under {@code <workPath>/_lob/}, and the
 * returned reference records the file path (relative to the work path when
 * possible), the record offset within that file, and the length.
 *
 * One BLOB writer and one CLOB writer are opened lazily and reused for all
 * spilled objects until {@link #close()} is called.
 *
 * NOTE(review): earlier documentation described this as a singleton whose
 * lifetime is bounded by the current TaskInputOutputContext; nothing in this
 * class enforces that, so callers are presumably responsible for it. Apart
 * from finalize(), no method here is synchronized.
 */
public class LargeObjectLoader implements Closeable {

  public static final Log LOG =
      LogFactory.getLog(LargeObjectLoader.class.getName());

  // Spill to external storage for BLOB/CLOB objects > 16 MB.
  public static final long DEFAULT_MAX_LOB_LENGTH = 16 * 1024 * 1024;

  /**
   * Configuration key holding the largest LOB length (bytes for BLOBs,
   * characters for CLOBs) that is kept inline rather than spilled to a file.
   */
  public static final String MAX_INLINE_LOB_LEN_KEY =
      "sqoop.inline.lob.length.max";

  /** Configuration key and fallback value for the copy buffer size. */
  private static final String IO_BUFFER_SIZE_KEY = "io.file.buffer.size";
  private static final int DEFAULT_IO_BUFFER_SIZE = 4096;

  private final Configuration conf;
  private final Path workPath;
  private final FileSystem fs;

  // Handles to the open BLOB / CLOB file writers (null until first needed).
  private LobFile.Writer curBlobWriter;
  private LobFile.Writer curClobWriter;

  // Counter that is used with the current task attempt id to
  // generate unique LOB file names.
  private long nextLobFileId = 0;

  /**
   * Create a new LargeObjectLoader.
   * @param conf the Configuration to use
   * @param workPath the directory under which spilled LOB files are created
   *     (inside a {@code _lob} subdirectory); also the base against which
   *     the paths stored in returned references are made relative.
   * @throws IOException if the FileSystem for workPath cannot be obtained.
   */
  public LargeObjectLoader(Configuration conf, Path workPath)
      throws IOException {
    this.conf = conf;
    this.workPath = workPath;
    this.fs = workPath.getFileSystem(conf);
    this.curBlobWriter = null;
    this.curClobWriter = null;
  }

  /**
   * Safety net that closes any writers still open when this object is
   * garbage-collected. The superclass finalizer runs even if close() fails.
   */
  @Override
  protected synchronized void finalize() throws Throwable {
    try {
      close();
    } finally {
      super.finalize();
    }
  }

  /**
   * Closes the BLOB and CLOB writers, if open. The CLOB writer is closed
   * even when closing the BLOB writer throws, and both cached handles are
   * cleared so a later read opens fresh files instead of reusing a writer
   * that failed to close.
   * @throws IOException if either writer fails to close.
   */
  @Override
  public void close() throws IOException {
    try {
      if (null != curBlobWriter) {
        curBlobWriter.close();
      }
    } finally {
      curBlobWriter = null;
      try {
        if (null != curClobWriter) {
          curClobWriter.close();
        }
      } finally {
        curClobWriter = null;
      }
    }
  }

  /**
   * Builds the next LOB file name and advances the counter.
   * Names have the form {@code _lob/large_obj_<taskId><counter>.lob}.
   * NOTE(review): there is no separator between the task id and the
   * counter, so distinct (taskId, counter) pairs could in principle produce
   * the same name; left unchanged because stored references embed the name.
   * @return a filename to use to put an external LOB in.
   */
  private String getNextLobFileName() {
    String file = "_lob/large_obj_" + TaskId.get(conf, "unknown_task_id")
        + nextLobFileId + ".lob";
    nextLobFileId++;
    return file;
  }

  /**
   * Calculates a path to a new LobFile object, creating any
   * missing directories.
   * @return a Path to a LobFile to write
   * @throws IOException if querying or creating the directory fails.
   */
  private Path getNextLobFilePath() throws IOException {
    Path p = new Path(workPath, getNextLobFileName());
    Path parent = p.getParent();
    LOG.info("Using lob file: " + p.toString());
    if (!fs.exists(parent)) {
      fs.mkdirs(parent);
    }
    return p;
  }

  /**
   * @return the current LobFile writer for BLOBs, creating one if necessary.
   */
  private LobFile.Writer getBlobWriter() throws IOException {
    if (null == this.curBlobWriter) {
      this.curBlobWriter = LobFile.create(getNextLobFilePath(), conf, false);
    }
    return this.curBlobWriter;
  }

  /**
   * @return the current LobFile writer for CLOBs, creating one if necessary.
   */
  private LobFile.Writer getClobWriter() throws IOException {
    if (null == this.curClobWriter) {
      this.curClobWriter = LobFile.create(getNextLobFilePath(), conf, true);
    }
    return this.curClobWriter;
  }

  /**
   * Returns the path being written to by a given LobFile.Writer, relative
   * to the working directory of this LargeObjectLoader.
   * Hadoop Path strings always use "/" as the separator regardless of the
   * host OS, so Path.SEPARATOR (not File.separator) is used for the prefix.
   * @param w the LobFile.Writer whose path should be examined.
   * @return the path this is writing to, relative to the current working
   *     dir; the full path string if it does not lie under the working dir.
   */
  private String getRelativePath(LobFile.Writer w) {
    String writerPathStr = w.getPath().toString();
    String workPathStr = workPath.toString();
    if (!workPathStr.endsWith(Path.SEPARATOR)) {
      workPathStr = workPathStr + Path.SEPARATOR;
    }

    if (writerPathStr.startsWith(workPathStr)) {
      return writerPathStr.substring(workPathStr.length());
    }

    // Outside the working dir; return the whole thing.
    return writerPathStr;
  }

  /**
   * @return the copy buffer size from {@code io.file.buffer.size}, falling
   *     back to 4096 when the configured value is not positive (a zero-sized
   *     buffer would make the copy loops spin forever, since reads of length
   *     zero return 0 rather than -1).
   */
  private int getCopyBufferSize() {
    int bufferSize = conf.getInt(IO_BUFFER_SIZE_KEY, DEFAULT_IO_BUFFER_SIZE);
    return bufferSize > 0 ? bufferSize : DEFAULT_IO_BUFFER_SIZE;
  }

  /**
   * Copies all character data from the provided Reader to the provided
   * Writer. Does not close handles when it's done.
   * @param reader data source
   * @param writer data sink
   * @throws IOException if an I/O error occurs either reading or writing.
   */
  private void copyAll(Reader reader, Writer writer) throws IOException {
    char [] buf = new char[getCopyBufferSize()];
    int charsRead;
    while ((charsRead = reader.read(buf, 0, buf.length)) != -1) {
      writer.write(buf, 0, charsRead);
    }
  }

  /**
   * Copies all byte data from the provided InputStream to the provided
   * OutputStream. Does not close handles when it's done.
   * @param input data source
   * @param output data sink
   * @throws IOException if an I/O error occurs either reading or writing.
   */
  private void copyAll(InputStream input, OutputStream output)
      throws IOException {
    byte [] buf = new byte[getCopyBufferSize()];
    int bytesRead;
    while ((bytesRead = input.read(buf, 0, buf.length)) != -1) {
      output.write(buf, 0, bytesRead);
    }
  }

  /**
   * Closes {@code first} and then {@code second}, skipping nulls. The second
   * handle is closed even if closing the first one throws.
   * @throws IOException if either close fails.
   */
  private static void closeBoth(Closeable first, Closeable second)
      throws IOException {
    try {
      if (null != first) {
        first.close();
      }
    } finally {
      if (null != second) {
        second.close();
      }
    }
  }

  /**
   * Actually read a BlobRef instance from the ResultSet and materialize
   * the data either inline or to a file.
   *
   * BLOBs longer than the inline limit, or too long to fit in a byte[],
   * are appended as a record to the current BLOB LobFile.
   *
   * @param colNum the column of the ResultSet's current row to read.
   * @param r the ResultSet to read from.
   * @return a BlobRef encapsulating the data in this field, or null if the
   *     column value is SQL NULL.
   * @throws IOException if an error occurs writing to the FileSystem.
   * @throws SQLException if an error occurs reading from the database.
   */
  public com.cloudera.sqoop.lib.BlobRef readBlobRef(int colNum, ResultSet r)
      throws IOException, InterruptedException, SQLException {
    long maxInlineLobLen = conf.getLong(
        MAX_INLINE_LOB_LEN_KEY,
        DEFAULT_MAX_LOB_LENGTH);
    Blob b = r.getBlob(colNum);
    if (null == b) {
      return null;
    }

    // Query the length once; depending on the driver it may not be cheap.
    long len = b.length();
    if (len > maxInlineLobLen || len > Integer.MAX_VALUE) {
      // Deserialize very large BLOBs into separate files.
      LobFile.Writer lobWriter = getBlobWriter();
      long recordOffset = lobWriter.tell();
      OutputStream os = lobWriter.writeBlobRecord(len);
      InputStream is = null;
      try {
        is = b.getBinaryStream();
        copyAll(is, os);
      } finally {
        try {
          closeBoth(os, is);
        } finally {
          // Mark the record as finished, even if closing a stream failed.
          lobWriter.finishRecord();
        }
      }
      return new com.cloudera.sqoop.lib.BlobRef(
          getRelativePath(lobWriter), recordOffset, len);
    }

    // Blob positions are 1-based; the cast is safe since len fits in an int.
    return new com.cloudera.sqoop.lib.BlobRef(b.getBytes(1, (int) len));
  }

  /**
   * Actually read a ClobRef instance from the ResultSet and materialize
   * the data either inline or to a file.
   *
   * CLOBs longer than the inline limit, or too long to fit in a String,
   * are appended as a record to the current CLOB LobFile.
   *
   * @param colNum the column of the ResultSet's current row to read.
   * @param r the ResultSet to read from.
   * @return a ClobRef encapsulating the data in this field, or null if the
   *     column value is SQL NULL.
   * @throws IOException if an error occurs writing to the FileSystem.
   * @throws SQLException if an error occurs reading from the database.
   */
  public com.cloudera.sqoop.lib.ClobRef readClobRef(int colNum, ResultSet r)
      throws IOException, InterruptedException, SQLException {
    long maxInlineLobLen = conf.getLong(
        MAX_INLINE_LOB_LEN_KEY,
        DEFAULT_MAX_LOB_LENGTH);
    Clob c = r.getClob(colNum);
    if (null == c) {
      return null;
    }

    // Query the length once; depending on the driver it may not be cheap.
    long len = c.length();
    if (len > maxInlineLobLen || len > Integer.MAX_VALUE) {
      // Deserialize large CLOB into separate file.
      LobFile.Writer lobWriter = getClobWriter();
      long recordOffset = lobWriter.tell();
      Writer w = lobWriter.writeClobRecord(len);
      Reader reader = null;
      try {
        reader = c.getCharacterStream();
        copyAll(reader, w);
      } finally {
        try {
          closeBoth(w, reader);
        } finally {
          // Mark the record as finished, even if closing a stream failed.
          lobWriter.finishRecord();
        }
      }
      return new com.cloudera.sqoop.lib.ClobRef(
          getRelativePath(lobWriter), recordOffset, len);
    }

    // Clob positions are 1-based; the cast is safe since len fits in an int.
    return new com.cloudera.sqoop.lib.ClobRef(c.getSubString(1, (int) len));
  }
}