/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.core.datastore.filesystem;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.exception.CarbonFileException;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.Lz4Codec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.log4j.Logger;
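/**
* Base {@link CarbonFile} implementation backed by a Hadoop {@link FileSystem};
* concrete subclasses adapt it to specific distributed file systems.
*/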
public abstract class AbstractDFSCarbonFile implements CarbonFile {
/**
* LOGGER
*/
private static final Logger LOGGER =
LogServiceFactory.getLogService(AbstractDFSCarbonFile.class.getName());
protected FileSystem fileSystem;
protected Configuration hadoopConf;
protected Path path;
protected FileStatus fileStatus;
AbstractDFSCarbonFile(String filePath) {
this(filePath, FileFactory.getConfiguration());
}
AbstractDFSCarbonFile(String filePath, Configuration hadoopConf) {
this(new Path(filePath), hadoopConf);
}
AbstractDFSCarbonFile(Path path) {
this(path, FileFactory.getConfiguration());
}
AbstractDFSCarbonFile(Path path, Configuration hadoopConf) {
this.hadoopConf = hadoopConf;
try {
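// normalize Windows-style separators so the Hadoop Path resolves consistently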
Path newPath = new Path(path.toString().replace("\\", "/"));
fileSystem = newPath.getFileSystem(this.hadoopConf);
this.path = newPath;
} catch (IOException e) {
throw new CarbonFileException("Error while getting File System: ", e);
}
}
AbstractDFSCarbonFile(FileStatus fileStatus) {
this(fileStatus.getPath());
this.fileStatus = fileStatus;
}
@Override
public boolean createNewFile() {
try {
return fileSystem.createNewFile(path);
} catch (IOException e) {
throw new CarbonFileException("Unable to create file: " + path.toString(), e);
}
}
@Override
public String getAbsolutePath() {
try {
return fileSystem.getFileStatus(path).getPath().toString();
} catch (IOException e) {
throw new CarbonFileException("Unable to get file status: ", e);
}
}
@Override
public String getName() {
return path.getName();
}
@Override
public boolean isDirectory() {
try {
return fileSystem.getFileStatus(path).isDirectory();
} catch (IOException e) {
throw new CarbonFileException("Unable to get file status: ", e);
}
}
@Override
public CarbonFile getParentFile() {
Path parentPath = path.getParent();
return parentPath == null ? null : FileFactory.getCarbonFile(parentPath.toString());
}
@Override
public boolean exists() {
try {
return fileSystem.exists(path);
} catch (IOException e) {
throw new CarbonFileException(e);
}
}
@Override
public String getCanonicalPath() {
return getAbsolutePath();
}
@Override
public String getPath() {
return getAbsolutePath();
}
@Override
public long getSize() {
try {
return fileSystem.getFileStatus(path).getLen();
} catch (IOException e) {
throw new CarbonFileException("Unable to get file status for " + path.toString(), e);
}
}
@Override
public boolean renameTo(String changeToName) {
try {
return fileSystem.rename(path, new Path(changeToName));
} catch (IOException e) {
throw new CarbonFileException("Failed to rename file: ", e);
}
}
public boolean delete() {
try {
return fileSystem.delete(path, true);
} catch (IOException e) {
throw new CarbonFileException("Failed to delete file:", e);
}
}
@Override
public long getLastModifiedTime() {
try {
if (this.fileStatus != null) {
return this.fileStatus.getModificationTime();
}
return fileSystem.getFileStatus(path).getModificationTime();
} catch (IOException e) {
throw new CarbonFileException("Unable to get file status: ", e);
}
}
@Override
public boolean setLastModifiedTime(long timestamp) {
try {
fileSystem.setTimes(path, timestamp, timestamp);
return true;
} catch (IOException e) {
throw new CarbonFileException("Error while setting modified time: ", e);
}
}
/**
* Truncate the file: keep the data up to the given offset and discard the rest.
*
* @param fileName path of the file to truncate
* @param validDataEndOffset offset in bytes up to which the data is valid
* @return true if the file was truncated successfully
*/
@Override
public boolean truncate(String fileName, long validDataEndOffset) {
DataOutputStream dataOutputStream = null;
DataInputStream dataInputStream = null;
boolean fileTruncatedSuccessfully = false;
// if fewer than 1024 bytes are to be read, the buffer size should equal the given offset
int bufferSize = validDataEndOffset > CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR ?
CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR :
(int) validDataEndOffset;
// temporary file name
String tempWriteFilePath = fileName + CarbonCommonConstants.TEMPWRITEFILEEXTENSION;
try {
CarbonFile tempFile;
// delete temporary file if it already exists at a given path
if (FileFactory.isFileExist(tempWriteFilePath)) {
tempFile = FileFactory.getCarbonFile(tempWriteFilePath);
tempFile.delete();
}
// create new temporary file
FileFactory.createNewFile(tempWriteFilePath);
tempFile = FileFactory.getCarbonFile(tempWriteFilePath);
byte[] buff = new byte[bufferSize];
dataInputStream = FileFactory.getDataInputStream(fileName);
// read the data
int read = dataInputStream.read(buff, 0, buff.length);
dataOutputStream = FileFactory.getDataOutputStream(tempWriteFilePath);
dataOutputStream.write(buff, 0, read);
long remaining = validDataEndOffset - read;
// never read past the valid data end offset; stop early on an unexpected EOF,
// since writing a negative read count would throw IndexOutOfBoundsException
while (remaining > 0) {
if (remaining > bufferSize) {
buff = new byte[bufferSize];
} else {
buff = new byte[(int) remaining];
}
read = dataInputStream.read(buff, 0, buff.length);
if (read < 0) {
break;
}
dataOutputStream.write(buff, 0, read);
remaining = remaining - read;
}
CarbonUtil.closeStreams(dataInputStream, dataOutputStream);
// rename the temp file to original file
tempFile.renameForce(fileName);
fileTruncatedSuccessfully = true;
} catch (IOException e) {
LOGGER.error("Exception occurred while truncating the file " + e.getMessage(), e);
} finally {
CarbonUtil.closeStreams(dataOutputStream, dataInputStream);
}
return fileTruncatedSuccessfully;
}
/**
* This method will be used to check whether a file has been modified or not
*
* @param fileTimeStamp time to be compared with latest timestamp of file
* @param endOffset file length to be compared with current length of file
* @return whether a file has been modified or not
*/
@Override
public boolean isFileModified(long fileTimeStamp, long endOffset) {
boolean isFileModified = false;
if (getLastModifiedTime() > fileTimeStamp || getSize() > endOffset) {
isFileModified = true;
}
return isFileModified;
}
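/**
* Open an output stream, appending when requested; since HDFS cannot append to
* a missing file, the file is created instead when it does not exist.
*/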
@Override
public DataOutputStream getDataOutputStream(int bufferSize, boolean append)
throws IOException {
FSDataOutputStream stream = null;
if (append) {
// append only if the file already exists; otherwise HDFS would throw a
// FileNotFoundException, so create the file instead
if (CarbonUtil.isFileExists(path.toString())) {
stream = fileSystem.append(path, bufferSize);
} else {
stream = fileSystem.create(path, true, bufferSize);
}
} else {
stream = fileSystem.create(path, true, bufferSize);
}
return stream;
}
@Override
public DataInputStream getDataInputStream(int bufferSize) throws IOException {
return getDataInputStream(bufferSize,
CarbonUtil.inferCompressorFromFileName(path.toString()));
}
/**
* Return a DataInputStream that is seeked to the given offset of the file.
*
* @return DataInputStream positioned at the offset
* @throws IOException if the file cannot be opened
*/
@Override
public DataInputStream getDataInputStream(int bufferSize, long offset)
throws IOException {
FSDataInputStream stream = fileSystem.open(path, bufferSize);
stream.seek(offset);
return new DataInputStream(new BufferedInputStream(stream));
}
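/**
* Open an input stream, optionally wrapped with a decompression codec.
* A non-positive bufferSize falls back to the file system's default buffer.
*/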
@Override
public DataInputStream getDataInputStream(int bufferSize, String compressor)
throws IOException {
InputStream inputStream;
if (bufferSize <= 0) {
inputStream = fileSystem.open(path);
} else {
inputStream = fileSystem.open(path, bufferSize);
}
String codecName = getCodecNameFromCompressor(compressor);
if (!codecName.isEmpty()) {
CompressionCodec codec = new CompressionCodecFactory(hadoopConf).getCodecByName(codecName);
inputStream = codec.createInputStream(inputStream);
}
if (bufferSize <= 0 && inputStream instanceof FSDataInputStream) {
return (DataInputStream) inputStream;
} else {
return new DataInputStream(new BufferedInputStream(inputStream));
}
}
/**
* Resolve the Hadoop codec class name from a user-specified compressor name.
*
* @param compressorName user-specified compressor name, empty for no compression
* @return fully qualified codec class name, or an empty string when no compressor is given
* @throws IOException if the compressor is not supported
*/
private String getCodecNameFromCompressor(String compressorName) throws IOException {
if (compressorName.isEmpty()) {
return "";
} else if ("GZIP".equalsIgnoreCase(compressorName)) {
return GzipCodec.class.getName();
} else if ("BZIP2".equalsIgnoreCase(compressorName)) {
return BZip2Codec.class.getName();
} else if ("SNAPPY".equalsIgnoreCase(compressorName)) {
return SnappyCodec.class.getName();
} else if ("LZ4".equalsIgnoreCase(compressorName)) {
return Lz4Codec.class.getName();
} else {
throw new IOException("Unsupported compressor: " + compressorName);
}
}
@Override
public DataOutputStream getDataOutputStream() throws IOException {
return fileSystem.create(path, true);
}
@Override
public DataOutputStream getDataOutputStream(int bufferSize, long blockSize)
throws IOException {
short replication = fileSystem.getDefaultReplication(path);
return getDataOutputStream(bufferSize, blockSize, replication);
}
@Override
public DataOutputStream getDataOutputStream(int bufferSize, long blockSize, short replication)
throws IOException {
return fileSystem.create(path, true, bufferSize, replication, blockSize);
}
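/**
* Open an output stream, optionally wrapped with a compression codec
* resolved from the given compressor name.
*/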
@Override
public DataOutputStream getDataOutputStream(int bufferSize, String compressor)
throws IOException {
OutputStream outputStream;
if (bufferSize <= 0) {
outputStream = fileSystem.create(path);
} else {
outputStream = fileSystem.create(path, true, bufferSize);
}
String codecName = getCodecNameFromCompressor(compressor);
if (!codecName.isEmpty()) {
CompressionCodec codec = new CompressionCodecFactory(hadoopConf).getCodecByName(codecName);
outputStream = codec.createOutputStream(outputStream);
}
return new DataOutputStream(new BufferedOutputStream(outputStream));
}
@Override
public boolean isFileExist(boolean performFileCheck) throws IOException {
if (performFileCheck) {
return fileSystem.exists(path) && fileSystem.isFile(path);
} else {
return fileSystem.exists(path);
}
}
@Override
public boolean isFileExist() throws IOException {
return isFileExist(false);
}
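/**
* Create the file with the given permission; when the permission is null, the
* file system's default file permission masked by the configured umask is used.
*/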
@Override
public boolean createNewFile(FsPermission permission) throws IOException {
if (fileSystem.exists(path)) {
return false;
} else {
if (permission == null) {
permission =
FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(fileSystem.getConf()));
}
// Pass the permissions during file creation itself
fileSystem
.create(path, permission, false, fileSystem.getConf().getInt("io.file.buffer.size", 4096),
fileSystem.getDefaultReplication(path), fileSystem.getDefaultBlockSize(path), null)
.close();
return true;
}
}
@Override
public boolean deleteFile() throws IOException {
return fileSystem.delete(path, true);
}
@Override
public boolean mkdirs() throws IOException {
return fileSystem.mkdirs(path);
}
@Override
public DataOutputStream getDataOutputStreamUsingAppend() throws IOException {
return fileSystem.append(path);
}
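/**
* Create a lock file that is readable and writable by everyone; the permission
* is set again after creation because the file system applies its umask on create.
*/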
@Override
public boolean createNewLockFile() throws IOException {
if (fileSystem.exists(path)) {
return false;
} else {
// Pass the permissions during file creation itself
fileSystem.create(path, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL), false,
fileSystem.getConf().getInt("io.file.buffer.size", 4096),
fileSystem.getDefaultReplication(path), fileSystem.getDefaultBlockSize(path), null)
.close();
// Hadoop masks the requested permission with the configured umask on create,
// so the permission must be set explicitly afterwards
fileSystem.setPermission(path, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
return true;
}
}
@Override
public CarbonFile[] listFiles() {
FileStatus[] listStatus;
try {
listStatus = fileSystem.listStatus(path);
} catch (IOException e) {
LOGGER.warn("Exception occurred: " + e.getMessage(), e);
return new CarbonFile[0];
}
return getFiles(listStatus);
}
/**
* Get the CarbonFiles from FileStatus array
*/
protected abstract CarbonFile[] getFiles(FileStatus[] listStatus);
@Override
public List<CarbonFile> listFiles(Boolean recursive) throws IOException {
RemoteIterator<LocatedFileStatus> listStatus = fileSystem.listFiles(path, recursive);
return getFiles(listStatus);
}
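/**
* List located file statuses under this path, keeping only non-empty files
* accepted by the given path filter.
*/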
@Override
public CarbonFile[] locationAwareListFiles(PathFilter pathFilter) throws IOException {
List<FileStatus> listStatus = new ArrayList<>();
RemoteIterator<LocatedFileStatus> iter = fileSystem.listLocatedStatus(path);
while (iter.hasNext()) {
LocatedFileStatus fileStatus = iter.next();
if (pathFilter.accept(fileStatus.getPath()) && fileStatus.getLen() > 0) {
listStatus.add(fileStatus);
}
}
return getFiles(listStatus.toArray(new FileStatus[listStatus.size()]));
}
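/**
* Convert an iterator of located file statuses into CarbonFile instances.
*/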
protected List<CarbonFile> getFiles(RemoteIterator<LocatedFileStatus> listStatus)
throws IOException {
List<CarbonFile> carbonFiles = new ArrayList<>();
while (listStatus.hasNext()) {
Path filePath = listStatus.next().getPath();
carbonFiles.add(FileFactory.getCarbonFile(filePath.toString()));
}
return carbonFiles;
}
/**
* List files under this path, optionally recursively, keeping only those
* accepted by the given file filter.
*/
@Override
public List<CarbonFile> listFiles(boolean recursive, CarbonFileFilter fileFilter)
throws IOException {
List<CarbonFile> carbonFiles = new ArrayList<>();
FileStatus fileStatus = fileSystem.getFileStatus(path);
if (null != fileStatus && fileStatus.isDirectory()) {
RemoteIterator<LocatedFileStatus> listStatus = fileSystem.listFiles(fileStatus.getPath(),
recursive);
while (listStatus.hasNext()) {
LocatedFileStatus locatedFileStatus = listStatus.next();
CarbonFile carbonFile = FileFactory.getCarbonFile(locatedFileStatus.getPath().toString());
if (fileFilter.accept(carbonFile)) {
carbonFiles.add(carbonFile);
}
}
}
return carbonFiles;
}
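/**
* List files under this directory, stopping once maxCount files have been collected.
*/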
@Override
public CarbonFile[] listFiles(boolean recursive, int maxCount)
throws IOException {
List<CarbonFile> carbonFiles = new ArrayList<>();
FileStatus fileStatus = fileSystem.getFileStatus(path);
if (null != fileStatus && fileStatus.isDirectory()) {
RemoteIterator<LocatedFileStatus> listStatus = fileSystem.listFiles(path, recursive);
int counter = 0;
while (counter < maxCount && listStatus.hasNext()) {
LocatedFileStatus locatedFileStatus = listStatus.next();
CarbonFile carbonFile = FileFactory.getCarbonFile(locatedFileStatus.getPath().toString());
carbonFiles.add(carbonFile);
counter++;
}
}
return carbonFiles.toArray(new CarbonFile[0]);
}
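/**
* Return the hosts that hold the first block of this file.
*/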
@Override
public String[] getLocations() throws IOException {
BlockLocation[] blkLocations;
FileStatus fileStatus = fileSystem.getFileStatus(path);
if (fileStatus instanceof LocatedFileStatus) {
blkLocations = ((LocatedFileStatus) fileStatus).getBlockLocations();
} else {
blkLocations =
fileSystem.getFileBlockLocations(fileStatus.getPath(), 0L, fileStatus.getLen());
}
if (blkLocations.length == 0) {
// an empty file has no blocks; return no hosts instead of failing
return new String[0];
}
return blkLocations[0].getHosts();
}
@Override
public boolean setReplication(short replication) throws IOException {
return fileSystem.setReplication(path, replication);
}
@Override
public CarbonFile[] listFiles(final CarbonFileFilter fileFilter) {
CarbonFile[] files = listFiles();
if (files == null || files.length == 0) {
return files;
}
List<CarbonFile> fileList = new ArrayList<>(files.length);
for (CarbonFile file : files) {
if (fileFilter.accept(file)) {
fileList.add(file);
}
}
return fileList.toArray(new CarbonFile[0]);
}
@Override
public short getDefaultReplication() {
return fileSystem.getDefaultReplication(path);
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
AbstractDFSCarbonFile that = (AbstractDFSCarbonFile) o;
if (path == null || that.path == null) {
return false;
}
return path.equals(that.path);
}
@Override
public int hashCode() {
if (path == null) {
return 0;
}
return Objects.hash(path);
}
@Override
public long getLength() throws IOException {
return fileSystem.getFileStatus(path).getLen();
}
}