// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
package org.trafodion.sql;
import java.io.IOException;
import java.io.FileNotFoundException;
import java.io.EOFException;
import java.io.OutputStream;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.log4j.PropertyConfigurator;
import org.apache.log4j.Logger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.io.compress.CompressionInputStream;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
//
// To read a range of an HDFS file, use the constructor
// public HDFSClient(int bufNo, int ioByteArraySizeInKB, int rangeNo, String filename, ByteBuffer buffer, long position, int length, short compressionType, CompressionInputStream inStream)
//
// For instance methods like hdfsListDirectory, use the constructor
// public HDFSClient()
//
// For all static methods, use
// HDFSClient::<static_method_name>
//
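// A minimal usage sketch of the range-read path (illustrative only; the
// file name, buffer size and range values below are assumptions, not part
// of this class):
//
//   ByteBuffer buf = ByteBuffer.allocateDirect(1024 * 1024);
//   HDFSClient reader = new HDFSClient(0 /*bufNo*/, 1024 /*ioByteArraySizeInKB*/,
//       0 /*rangeNo*/, "hdfs:///user/trafodion/data/part-00000", buf,
//       0 /*position*/, buf.capacity() /*length*/,
//       HDFSClient.UNCOMPRESSED, null /*inStream*/);
//   int bytesRead = reader.trafHdfsReadBuffer(); // waits for the background read
//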
public class HDFSClient
{
// Keep the constants and string array below in sync with
// enum CompressionMethod at sql/comexe/ComCompressionInfo.h
static final short UNKNOWN_COMPRESSION = 0;
static final short UNCOMPRESSED = 1;
static final short LZOP = 5;
static final String COMPRESSION_TYPE[] = {
"UNKNOWN_COMPRESSION", // unable to determine compression method
"UNCOMPRESSED", // file is not compressed
"LZO_DEFLATE", // using LZO deflate compression
"DEFLATE", // using DEFLATE compression
"GZIP", // using GZIP compression
"LZOP"}; // using LZOP compression
static Logger logger_ = Logger.getLogger(HDFSClient.class.getName());
private static Configuration config_ = null;
private static ExecutorService executorService_ = null;
private static FileSystem defaultFs_ = null;
private static CompressionCodecFactory codecFactory_ = null;
private static boolean alluxioNotInstalled_ = false;
private FileSystem fs_ = null;
private int bufNo_;
private int rangeNo_;
private FSDataInputStream fsdis_;
CompressionInputStream inStream_;
private OutputStream outStream_;
private String filename_;
private ByteBuffer buf_;
private ByteBuffer savedBuf_;
private byte[] bufArray_;
private int bufLen_;
private int bufOffset_ = 0;
private long pos_ = 0;
private int len_ = 0;
private int lenRemain_ = 0;
private int blockSize_;
private int bytesRead_;
private Future future_ = null;
private int isEOF_ = 0;
private int totalBytesWritten_ = 0;
private Path filepath_ = null;
boolean compressed_ = false;
private CompressionCodec codec_ = null;
private short compressionType_;
private int ioByteArraySizeInKB_;
static {
String confFile = System.getProperty("trafodion.log4j.configFile");
System.setProperty("trafodion.root", System.getenv("TRAF_HOME"));
if (confFile == null) {
confFile = System.getenv("TRAF_CONF") + "/log4j.sql.config";
}
PropertyConfigurator.configure(confFile);
config_ = TrafConfiguration.create(TrafConfiguration.HDFS_CONF);
executorService_ = Executors.newCachedThreadPool();
try {
defaultFs_ = FileSystem.get(config_);
}
catch (IOException ioe) {
throw new RuntimeException("Exception in HDFSClient static block", ioe);
}
try {
   // Probe for the alluxio classes; this throws a NoClassDefFoundError
   // when alluxio is not installed
   boolean alluxioFs = defaultFs_ instanceof alluxio.hadoop.FileSystem;
}
catch (Throwable rte)
{
   // Ignore the exception. Alluxio doesn't need to be installed
   // for the methods of this class to work, as long as the
   // alluxio filesystem is NOT required
   alluxioNotInstalled_ = true;
}
codecFactory_ = new CompressionCodecFactory(config_);
System.loadLibrary("executor");
}
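// Note: the static initializer above expects TRAF_HOME to be set in the
// environment (and TRAF_CONF when trafodion.log4j.configFile is not supplied
// as a system property), and it loads the native "executor" library that
// implements copyToByteBuffer() and sendFileStatus().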
// The object instance that runs in the thread pool to read
// the requested chunk of the range.
// The FSDataInputStream.read method may not read the requested length in
// one call; loop until the requested length is read or EOF is reached.
// The requested length can never be larger than the buffer size.
class HDFSRead implements Callable
{
HDFSRead()
{
}
public Object call() throws IOException
{
int bytesRead;
int totalBytesRead = 0;
if (compressed_) {
bufArray_ = new byte[ioByteArraySizeInKB_ * 1024];
}
else {
   // alluxio doesn't support direct ByteBuffer reads.
   // Hence, create a non-direct ByteBuffer, read into the
   // byte array backing this ByteBuffer, and then copy the
   // data read into the direct ByteBuffer for the native
   // layer to process
   if ((! alluxioNotInstalled_) && fs_ instanceof alluxio.hadoop.FileSystem) {
      savedBuf_ = buf_;
      buf_ = ByteBuffer.allocate(savedBuf_.capacity());
   }
if (! buf_.hasArray()) {
try {
fsdis_.seek(pos_);
} catch (EOFException e) {
isEOF_ = 1;
return Integer.valueOf(totalBytesRead);
}
}
}
do
{
if (compressed_) {
bytesRead = compressedFileRead(lenRemain_);
} else {
if (buf_.hasArray())
bytesRead = fsdis_.read(pos_, buf_.array(), bufOffset_, lenRemain_);
else
bytesRead = fsdis_.read(buf_);
}
if (bytesRead == -1) {
isEOF_ = 1;
break;
}
if (bytesRead == 0)
break;
totalBytesRead += bytesRead;
if (totalBytesRead == bufLen_)
break;
bufOffset_ += bytesRead;
pos_ += bytesRead;
lenRemain_ -= bytesRead;
} while (lenRemain_ > 0);
if ((! alluxioNotInstalled_) && fs_ instanceof alluxio.hadoop.FileSystem) {
if (totalBytesRead > 0) {
byte[] temp = buf_.array();
savedBuf_.put(temp, 0, totalBytesRead);
}
}
return Integer.valueOf(totalBytesRead);
}
}
int compressedFileRead(int readLenRemain) throws IOException
{
int totalReadLen = 0;
int readLen;
int offset = 0;
int retcode;
int lenRemain = ((readLenRemain > bufArray_.length) ? bufArray_.length : readLenRemain);
do
{
readLen = inStream_.read(bufArray_, offset, lenRemain);
if (readLen == -1 || readLen == 0)
break;
totalReadLen += readLen;
offset += readLen;
lenRemain -= readLen;
} while (lenRemain > 0);
if (totalReadLen > 0) {
if ((retcode = copyToByteBuffer(buf_, bufOffset_, bufArray_, totalReadLen)) != 0)
throw new IOException("Failure to copy to the DirectByteBuffer in the native layer with error code " + retcode);
}
else
totalReadLen = -1;
return totalReadLen;
}
native int copyToByteBuffer(ByteBuffer buf, int bufOffset, byte[] bufArray, int copyLen);
public HDFSClient()
{
}
// This constructor enables the HDFS data to be read in another thread while
// the previously read buffer is being processed by the SQL engine.
// It opens the file and hands the needed info to an HDFSRead instance to do the read.
// The passed-in length can never be more than the size of the buffer.
// If the range is longer than the buffer, the range is chunked in HdfsScan.
// (A usage sketch follows the constructor body below.)
public HDFSClient(int bufNo, int ioByteArraySizeInKB, int rangeNo, String filename, ByteBuffer buffer, long position,
int length, short compressionType, CompressionInputStream inStream) throws IOException
{
bufNo_ = bufNo;
rangeNo_ = rangeNo;
filename_ = filename;
ioByteArraySizeInKB_ = ioByteArraySizeInKB;
filepath_ = new Path(filename_);
fs_ = FileSystem.get(filepath_.toUri(),config_);
compressionType_ = compressionType;
inStream_ = inStream;
codec_ = codecFactory_.getCodec(filepath_);
if (codec_ != null) {
compressed_ = true;
if (inStream_ == null)
inStream_ = codec_.createInputStream(fs_.open(filepath_));
}
else {
if ((compressionType_ != UNCOMPRESSED) && (compressionType_ != UNKNOWN_COMPRESSION))
throw new IOException(COMPRESSION_TYPE[compressionType_] + " compression codec is not configured in Hadoop");
if (filename_.endsWith(".lzo"))
throw new IOException(COMPRESSION_TYPE[LZOP] + " compression codec is not configured in Hadoop");
fsdis_ = fs_.open(filepath_);
}
blockSize_ = (int)fs_.getDefaultBlockSize(filepath_);
buf_ = buffer;
bufOffset_ = 0;
pos_ = position;
len_ = length;
if (buffer.hasArray())
bufLen_ = buffer.array().length;
else {
bufLen_ = buffer.capacity();
buf_.position(0);
}
lenRemain_ = (len_ > bufLen_) ? bufLen_ : len_;
if (lenRemain_ != 0) {
future_ = executorService_.submit(new HDFSRead());
}
}
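// A sketch of the intended overlap between I/O and processing (illustrative;
// in the engine the chunking and buffer hand-off are driven by HdfsScan and
// the native layer, and the variable names below are assumptions):
//
//   HDFSClient c = new HDFSClient(bufNo, ioSizeKB, rangeNo, fname, buf, pos,
//       len, HDFSClient.UNCOMPRESSED, null); // read starts in the thread pool
//   // ... process the previously filled buffer while the read proceeds ...
//   int n = c.trafHdfsReadBuffer();          // block only when the data is needed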
// This method waits for the read to complete. Read can complete due to one of the following
// a) buffer is full
// b) EOF is reached
// c) An exception is encountered while reading the file
public int trafHdfsReadBuffer() throws IOException, InterruptedException, ExecutionException
{
Integer retObject = 0;
int bytesRead;
retObject = (Integer)future_.get();
bytesRead = retObject.intValue();
if (! compressed_)
fsdis_.close();
fsdis_ = null;
return bytesRead;
}
public int getRangeNo()
{
return rangeNo_;
}
public int isEOF()
{
return isEOF_;
}
boolean hdfsCreate(String fname, boolean overwrite, boolean compress) throws IOException
{
   filename_ = fname;
   if (logger_.isDebugEnabled())
      logger_.debug("HDFSClient.hdfsCreate() - started" );
   if (!compress || fname.endsWith(".gz"))
      filepath_ = new Path(fname);
   else
      filepath_ = new Path(fname + ".gz");
   fs_ = FileSystem.get(filepath_.toUri(), config_);
   compressed_ = compress;
   fsdis_ = null;
   if (fs_.exists(filepath_))
   {
      if (overwrite)
         fs_.delete(filepath_, true);
      else
         throw new IOException(filepath_ + " already exists");
   }
   FSDataOutputStream fsOut = fs_.create(filepath_);
   fsOut.close();
   return true;
}
boolean hdfsOpen(String fname, boolean compress) throws IOException
{
   filename_ = fname;
   if (logger_.isDebugEnabled())
      logger_.debug("HDFSClient.hdfsOpen() - started" );
   if (!compress || fname.endsWith(".gz"))
      filepath_ = new Path(fname);
   else
      filepath_ = new Path(fname + ".gz");
fs_ = FileSystem.get(filepath_.toUri(),config_);
compressed_ = compress;
outStream_ = null;
fsdis_ = null;
return true;
}
long hdfsSize() throws IOException
{
FileStatus filestatus;
try
{
filestatus = fs_.getFileStatus(filepath_);
} catch (java.io.FileNotFoundException e)
{
return 0;
}
if (filestatus.isFile())
return filestatus.getLen();
else
return -1;
}
long hdfsWriteImmediate(byte[] buff) throws IOException
{
if (logger_.isDebugEnabled())
   logger_.debug("HDFSClient.hdfsWriteImmediate() - started" );
FSDataOutputStream fsOut;
FileStatus filestatus;
long writeOffset;
if (fs_.exists(filepath_)) {
filestatus = fs_.getFileStatus(filepath_);
fsOut = fs_.append(filepath_);
writeOffset = filestatus.getLen();
}
else {
fsOut = fs_.create(filepath_);
writeOffset = 0;
}
fsOut.write(buff);
fsOut.close();
return writeOffset;
}
int hdfsWrite(byte[] buff) throws IOException
{
if (logger_.isDebugEnabled())
logger_.debug("HDFSClient.hdfsWrite() - started" );
FSDataOutputStream fsOut;
if (outStream_ == null) {
if (fs_.exists(filepath_))
fsOut = fs_.append(filepath_);
else
fsOut = fs_.create(filepath_);
if (compressed_) {
GzipCodec gzipCodec = (GzipCodec) ReflectionUtils.newInstance( GzipCodec.class, config_);
Compressor gzipCompressor = CodecPool.getCompressor(gzipCodec);
outStream_= gzipCodec.createOutputStream(fsOut, gzipCompressor);
}
else
outStream_ = fsOut;
if (logger_.isDebugEnabled())
logger_.debug("HDFSClient.hdfsWrite() - output stream created" );
}
outStream_.write(buff);
if (outStream_ instanceof FSDataOutputStream)
((FSDataOutputStream)outStream_).hsync();
if (logger_.isDebugEnabled())
logger_.debug("HDFSClient.hdfsWrite() - bytes written " + buff.length);
return buff.length;
}
int hdfsRead(long pos, ByteBuffer buffer) throws IOException
{
if (logger_.isDebugEnabled())
logger_.debug("HDFSClient.hdfsRead() - started" );
if (fsdis_ == null && inStream_ == null ) {
try {
codec_ = codecFactory_.getCodec(filepath_);
if (codec_ != null) {
compressed_ = true;
inStream_ = codec_.createInputStream(fs_.open(filepath_));
}
else
fsdis_ = fs_.open(filepath_);
} catch (java.io.FileNotFoundException e) {
return 0;
}
}
int lenRemain;
int bytesRead;
int totalBytesRead = 0;
int bufLen;
int bufOffset = 0;
if (compressed_) {
   if (pos != 0 && pos != -1)
      throw new IOException("Compressed files cannot be read from a non-zero position");
   else
      pos_ = 0;
   buf_ = buffer; // compressedFileRead() copies into buf_ via the native layer
   if (bufArray_ == null)
      bufArray_ = new byte[ioByteArraySizeInKB_ * 1024];
}
else
   if (pos != -1)
      pos_ = pos;
if (buffer.hasArray())
   bufLen = buffer.array().length;
else
{
   if ((! compressed_) && pos_ != -1)
      fsdis_.seek(pos_);
   bufLen = buffer.capacity();
}
lenRemain = bufLen;
do
{
   if (compressed_) {
      bufOffset_ = bufOffset; // compressedFileRead() copies into the buffer at bufOffset_
      bytesRead = compressedFileRead(lenRemain);
   } else {
      if (buffer.hasArray())
         bytesRead = fsdis_.read(pos_, buffer.array(), bufOffset, lenRemain);
      else
         bytesRead = fsdis_.read(buffer);
   }
   if (bytesRead == -1 || bytesRead == 0)
      break;
   totalBytesRead += bytesRead;
   pos_ += bytesRead;
   bufOffset += bytesRead; // advance the write offset so partial reads don't overwrite
   lenRemain -= bytesRead;
} while (lenRemain > 0);
return totalBytesRead;
}
boolean hdfsClose() throws IOException
{
if (logger_.isDebugEnabled()) logger_.debug("HDFSClient.hdfsClose() - started" );
if (outStream_ != null) {
outStream_.close();
outStream_ = null;
}
if (fsdis_ != null)
fsdis_.close();
return true;
}
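// A typical write sequence with the instance methods above (illustrative;
// the path is an assumption, and hdfsWrite() opens its output stream lazily):
//
//   HDFSClient writer = new HDFSClient();
//   writer.hdfsCreate("/user/trafodion/out/part-0", true /*overwrite*/, false /*compress*/);
//   writer.hdfsWrite("some data".getBytes());
//   writer.hdfsClose();
//
// With compress set to true, a ".gz" suffix is appended when missing and the
// output stream is wrapped in a GzipCodec compressor.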
static long hdfsSize(String filename) throws IOException
{
Path filepath = new Path(filename);
FileSystem fs = FileSystem.get(filepath.toUri(),config_);
FileStatus filestatus;
try
{
filestatus = fs.getFileStatus(filepath);
} catch (java.io.FileNotFoundException e)
{
return 0;
}
if (filestatus.isFile())
return filestatus.getLen();
else
return -1;
}
public static boolean hdfsMergeFiles(String srcPathStr, String dstPathStr) throws IOException
{
if (logger_.isDebugEnabled()) logger_.debug("HDFSClient.hdfsMergeFiles() - start");
if (logger_.isDebugEnabled()) logger_.debug("HDFSClient.hdfsMergeFiles() - source Path: " + srcPathStr +
", destination File:" + dstPathStr );
Path srcPath = new Path(srcPathStr );
srcPath = srcPath.makeQualified(srcPath.toUri(), null);
FileSystem srcFs = FileSystem.get(srcPath.toUri(),config_);
Path dstPath = new Path(dstPathStr);
dstPath = dstPath.makeQualified(dstPath.toUri(), null);
FileSystem dstFs = FileSystem.get(dstPath.toUri(),config_);
if (dstFs.exists(dstPath))
{
   if (logger_.isDebugEnabled()) logger_.debug("HDFSClient.hdfsMergeFiles() - destination file exists" );
   // for this prototype we just delete the file -- this will change in later code drops
   dstFs.delete(dstPath, false);
   // The caller should already have checked for the existence of the file -- throw an exception
   //throw new FileAlreadyExistsException(dstPath.toString());
}
Path tmpSrcPath = new Path(srcPath, "tmp");
FileSystem.mkdirs(srcFs, tmpSrcPath, srcFs.getFileStatus(srcPath).getPermission());
if (logger_.isDebugEnabled()) logger_.debug("HDFSClient.hdfsMergeFiles() - tmp folder created." );
Path[] files = FileUtil.stat2Paths(srcFs.listStatus(srcPath));
for (Path f : files)
{
srcFs.rename(f, tmpSrcPath);
}
// copyMerge with false for the delete option, since true would remove the whole source directory
if (logger_.isDebugEnabled()) logger_.debug("HDFSClient.hdfsMergeFiles() - copyMerge" );
FileUtil.copyMerge(srcFs, tmpSrcPath, dstFs, dstPath, false, config_, null);
if (logger_.isDebugEnabled()) logger_.debug("HDFSClient.hdfsMergeFiles() - delete intermediate files" );
srcFs.delete(tmpSrcPath, true);
return true;
}
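// Example (illustrative paths): merge all files under an unload directory
// into a single destination file:
//
//   HDFSClient.hdfsMergeFiles("hdfs:///user/trafodion/unload_dir",
//                             "hdfs:///user/trafodion/merged_file");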
public static boolean hdfsCleanUnloadPath(String uldPathStr
/*, boolean checkExistence, String mergeFileStr*/) throws IOException
{
if (logger_.isDebugEnabled())
logger_.debug("HDFSClient.hdfsCleanUnloadPath() - unload Path: " + uldPathStr );
Path uldPath = new Path(uldPathStr );
FileSystem fs = FileSystem.get(uldPath.toUri(), config_);
if (!fs.exists(uldPath))
{
// The unload location does not exist; hdfsCreate will create it later.
// Nothing to do here.
return true;
}
Path[] files = FileUtil.stat2Paths(fs.listStatus(uldPath));
if (logger_.isDebugEnabled())
logger_.debug("HDFSClient.hdfsCleanUnloadPath() - delete files" );
for (Path f : files){
fs.delete(f, false);
}
return true;
}
public static boolean hdfsExists(String filePathStr) throws IOException
{
if (logger_.isDebugEnabled())
logger_.debug("HDFSClient.hdfsExists() - Path: " + filePathStr);
Path filePath = new Path(filePathStr );
FileSystem fs = FileSystem.get(filePath.toUri(), config_);
return fs.exists(filePath);
}
public static boolean hdfsDeletePath(String pathStr) throws IOException
{
if (logger_.isDebugEnabled())
logger_.debug("HDFSClient.hdfsDeletePath() - start - Path: " + pathStr);
Path delPath = new Path(pathStr );
FileSystem fs = FileSystem.get(delPath.toUri(), config_);
fs.delete(delPath, true);
return true;
}
public int hdfsListDirectory(String pathStr, long hdfsClientJniObj) throws IOException
{
if (logger_.isDebugEnabled())
logger_.debug("HDFSClient.hdfsListDirectory() - start - Path: " + pathStr);
Path listPath = new Path(pathStr );
FileSystem fs = FileSystem.get(listPath.toUri(), config_);
FileStatus[] fileStatus;
if (fs.isDirectory(listPath))
   fileStatus = fs.listStatus(listPath);
else
   throw new IOException("The path " + listPath + " is not a directory");
FileStatus aFileStatus;
int retcode;
if (fileStatus != null) {
for (int i = 0; i < fileStatus.length; i++)
{
aFileStatus = fileStatus[i];
retcode = sendFileStatus(hdfsClientJniObj, fileStatus.length,
i,
aFileStatus.isDirectory(),
aFileStatus.getPath().toString(),
aFileStatus.getModificationTime(),
aFileStatus.getLen(),
aFileStatus.getReplication(),
aFileStatus.getBlockSize(),
aFileStatus.getOwner(),
aFileStatus.getGroup(),
aFileStatus.getPermission().toShort(),
aFileStatus.getAccessTime());
if (retcode != 0)
throw new IOException("Error " + retcode + " while sending the file status info for file " + aFileStatus.getPath().toString());
}
return fileStatus.length;
}
else
return 0;
}
public void stop() throws IOException
{
if (future_ != null) {
try {
future_.get(30, TimeUnit.SECONDS);
} catch(TimeoutException e) {
   logger_.error("Asynchronous thread of HdfsScan cancelled (timeout)", e);
   future_.cancel(true);
} catch(InterruptedException e) {
   logger_.error("Asynchronous thread of HdfsScan cancelled (interrupt)", e);
   future_.cancel(true); // Interrupt the thread
} catch (ExecutionException ee)
{
   // The read already completed with an exception; nothing to cancel
}
future_ = null;
}
}
public static void shutdown() throws InterruptedException
{
   executorService_.shutdown();
   executorService_.awaitTermination(100, TimeUnit.MILLISECONDS);
}
private static FileSystem getFileSystem() throws IOException
{
return defaultFs_;
}
// if levelDeep = 0, return the max modification timestamp of the passed-in HDFS URIs
// (a tab-separated list of 0 or more paths)
// if levelDeep > 0, also check all directories "levelDeep" levels below. Exclude
// directories that start with a dot (hidden directories)
public static long getHiveTableMaxModificationTs( String stableDirPaths, int levelDeep) throws FileNotFoundException, IOException
{
long result = 0;
if (logger_.isDebugEnabled())
logger_.debug("HDFSClient:getHiveTableMaxModificationTs enter");
String[] tableDirPaths = stableDirPaths.split("\t");
// account for the root dir of each table
for (int i = 0; i < tableDirPaths.length; i++) {
   FileStatus r = getFileSystem().getFileStatus(new Path(tableDirPaths[i])); // super fast API, returns in ~0.2 ms
   if (r != null && r.getModificationTime() > result)
      result = r.getModificationTime();
}
if (levelDeep>0)
{
Path[] paths = new Path[tableDirPaths.length];
for (int i=0; i<tableDirPaths.length; i++)
paths[i] = new Path(tableDirPaths[i]);
long l = getHiveTableMaxModificationTs2(paths,levelDeep);
if (l > result)
result = l;
}
if (logger_.isDebugEnabled())
   logger_.debug("HDFSClient:getHiveTableMaxModificationTs " + stableDirPaths + " levelDeep " + levelDeep + ": " + result);
return result;
}
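// Example (illustrative paths): the newest modification timestamp across two
// table directories, also checking directories one level below each of them:
//
//   long ts = HDFSClient.getHiveTableMaxModificationTs(
//       "/hive/warehouse/t1\t/hive/warehouse/t2", 1);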
private static long getHiveTableMaxModificationTs2(Path[] paths, int levelDeep) throws FileNotFoundException, IOException
{
   long result = 0;
   PathFilter filter = new PathFilter() {
      public boolean accept(Path file) {
         // filter out hidden files and directories
         return !file.getName().startsWith(".");
      }
   };
   FileStatus[] fileStatuses = null;
   if (levelDeep == 1) { // stop condition of the recursive function
      // check the parent level (important for deletes):
      for (Path path : paths) {
         FileStatus r = getFileSystem().getFileStatus(path); // super fast API, returns in ~0.2 ms
         if (r != null && r.getModificationTime() > result)
            result = r.getModificationTime();
      }
      if (paths.length == 1)
         fileStatuses = getFileSystem().listStatus(paths[0], filter); // minor optimization: avoid the list-based API when not needed
      else
         fileStatuses = getFileSystem().listStatus(paths, filter);
      for (int i = 0; i < fileStatuses.length; i++)
         if (fileStatuses[i].isDirectory() && fileStatuses[i].getModificationTime() > result)
            result = fileStatuses[i].getModificationTime();
   } else { // here levelDeep > 1
      List<Path> pathList = new ArrayList<Path>();
      if (paths.length == 1)
         fileStatuses = getFileSystem().listStatus(paths[0], filter); // minor optimization: avoid the list-based API when not needed
      else
         fileStatuses = getFileSystem().listStatus(paths, filter);
      for (int i = 0; i < fileStatuses.length; i++)
         if (fileStatuses[i].isDirectory())
         {
            pathList.add(fileStatuses[i].getPath());
            // make sure level n-1 is accounted for, for the delete-partition case
            if (fileStatuses[i].getModificationTime() > result)
               result = fileStatuses[i].getModificationTime();
         }
      long l = getHiveTableMaxModificationTs2(pathList.toArray(new Path[pathList.size()]), levelDeep - 1);
      if (l > result)
         result = l;
   }
   return result;
}
}
public static String getFsDefaultName()
{
String uri = config_.get("fs.defaultFS");
return uri;
}
public static boolean hdfsCreateDirectory(String pathStr) throws IOException
{
if (logger_.isDebugEnabled())
   logger_.debug("HDFSClient.hdfsCreateDirectory() - Path: " + pathStr);
Path dirPath = new Path(pathStr );
FileSystem fs = FileSystem.get(dirPath.toUri(), config_);
fs.mkdirs(dirPath);
return true;
}
public static boolean hdfsRename(String fromPathStr, String toPathStr) throws IOException
{
if (logger_.isDebugEnabled())
logger_.debug("HDFSClient.hdfsRename(" + fromPathStr + ", " + toPathStr + ")");
Path fromPath = new Path(fromPathStr );
Path toPath = new Path(toPathStr );
FileSystem fs = FileSystem.get(fromPath.toUri(), config_);
fs.rename(fromPath, toPath);
return true;
}
private native int sendFileStatus(long jniObj, int numFiles, int fileNo, boolean isDir,
String filename, long modTime, long len,
short numReplicas, long blockSize, String owner, String group,
short permissions, long accessTime);
}