// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
package org.trafodion.sql;

import java.io.IOException;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.log4j.Logger;

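/**
 * Utility class for writing HDFS files from the Trafodion SQL engine.
 * It supports two modes of operation: writing Hadoop SequenceFiles via
 * open()/write()/close(), and writing plain or gzip-compressed byte streams
 * via hdfsCreate()/hdfsWrite()/hdfsClose(). It also provides helper methods
 * for merging, cleaning, checking, and deleting HDFS paths.
 *
 * <p>Typical usage (illustrative only; the path and values below are examples):
 * <pre>
 *   SequenceFileWriter sfw = new SequenceFileWriter();
 *   sfw.open("/user/trafodion/unload/part-000.seq", 2);  // 2 = BLOCK compression
 *   sfw.write("some,record,data");
 *   sfw.close();
 * </pre>
 */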
public class SequenceFileWriter {
static Logger logger = Logger.getLogger(SequenceFileWriter.class.getName());
static Configuration conf = null; // File system configuration
SequenceFile.Writer writer = null; // Writer used by open()/write()/close()
FSDataOutputStream fsOut = null;   // Raw HDFS output stream created by hdfsCreate()
OutputStream outStream = null;     // Stream written to by hdfsWrite(); may wrap fsOut with gzip compression
boolean sameStream = true;         // True when outStream and fsOut refer to the same (uncompressed) stream
FileSystem fs = null;              // File system handle for the file created by hdfsCreate()
// One-time initialization of the shared Hadoop configuration.
static {
conf = new Configuration(true);
}
/**
 * Class Constructor
 */
SequenceFileWriter() throws IOException
{
}
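/**
 * Opens a SequenceFile for writing without compression.
 *
 * @param path HDFS path of the SequenceFile to create
 * @return null on success
 * @throws IOException if the file cannot be created
 */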
public String open(String path) throws IOException {
Path filename = new Path(path);
// Key and value classes must match what write() appends: an empty BytesWritable key and a Text value.
writer = SequenceFile.createWriter(conf,
SequenceFile.Writer.file(filename),
SequenceFile.Writer.keyClass(BytesWritable.class),
SequenceFile.Writer.valueClass(Text.class),
SequenceFile.Writer.compression(CompressionType.NONE));
return null;
}
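/**
 * Opens a SequenceFile for writing with the requested compression type.
 *
 * @param path            HDFS path of the SequenceFile to create
 * @param compressionType 0 = NONE, 1 = RECORD, 2 = BLOCK
 * @return null on success
 * @throws IOException if the compression type is invalid or the file cannot be created
 */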
public String open(String path, int compressionType) throws IOException {
Path filename = new Path(path);
CompressionType compType=null;
switch (compressionType) {
case 0:
compType = CompressionType.NONE;
break;
case 1:
compType = CompressionType.RECORD;
break;
case 2:
compType = CompressionType.BLOCK;
break;
default:
throw new IOException("Wrong argument for compression type.");
}
writer = SequenceFile.createWriter(conf,
SequenceFile.Writer.file(filename),
SequenceFile.Writer.keyClass(BytesWritable.class),
SequenceFile.Writer.valueClass(Text.class),
SequenceFile.Writer.compression(compType));
return null;
}
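/**
 * Appends one record to the open SequenceFile, using an empty key and the
 * supplied string as the value.
 *
 * @param data record value to append
 * @return null on success
 * @throws IOException if open() was not called first or the append fails
 */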
public String write(String data) throws IOException {
if (writer == null)
throw new IOException("open() was not called first.");
// An empty key is used for every record; Text encodes the value as UTF-8.
writer.append(new BytesWritable(), new Text(data));
return null;
}
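/**
 * Closes the SequenceFile writer if one is open.
 *
 * @return null on success
 * @throws IOException if closing the writer fails
 */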
public String close() throws IOException {
if (writer != null) {
writer.close();
writer = null;
}
return null;
}
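/**
 * Creates (or overwrites) an HDFS file for plain byte-stream writes. When
 * compression is requested, the data is written through a gzip output stream
 * and a ".gz" suffix is appended to the file name unless it already has one.
 *
 * @param fname    HDFS path of the file to create
 * @param compress true to gzip-compress the output
 * @return true on success
 * @throws IOException if the file cannot be created
 */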
boolean hdfsCreate(String fname , boolean compress) throws IOException
{
if (logger.isDebugEnabled()) logger.debug("SequenceFileWriter.hdfsCreate() - started" );
Path filePath = null;
// Append a ".gz" suffix when compressing, unless the name already ends with one.
if (!compress || fname.endsWith(".gz"))
filePath = new Path(fname);
else
filePath = new Path(fname + ".gz");
fs = FileSystem.get(filePath.toUri(),conf);
fsOut = fs.create(filePath, true);
outStream = fsOut;
if (logger.isDebugEnabled()) logger.debug("SequenceFileWriter.hdfsCreate() - file created" );
if (compress)
{
GzipCodec gzipCodec = (GzipCodec) ReflectionUtils.newInstance(GzipCodec.class, conf);
Compressor gzipCompressor = CodecPool.getCompressor(gzipCodec);
// Wrap the raw HDFS stream with a gzip compression stream.
outStream = gzipCodec.createOutputStream(fsOut, gzipCompressor);
sameStream = false;
if (logger.isDebugEnabled()) logger.debug("SequenceFileWriter.hdfsCreate() - compressed output stream created" );
}
return true;
}
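/**
 * Writes a buffer to the stream opened by hdfsCreate() and flushes it.
 *
 * @param buff buffer containing the data to write
 * @param len  number of bytes to write
 * @return true on success
 * @throws IOException if the write or flush fails
 */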
boolean hdfsWrite(byte[] buff, long len) throws IOException
{
if (logger.isDebugEnabled()) logger.debug("SequenceFileWriter.hdfsWrite() - started" );
// Write only the first 'len' bytes of the buffer, then flush.
outStream.write(buff, 0, (int) len);
outStream.flush();
if (logger.isDebugEnabled()) logger.debug("SequenceFileWriter.hdfsWrite() - bytes written and flushed:" + len );
return true;
}
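/**
 * Closes the stream(s) opened by hdfsCreate(). When compression is in use,
 * both the compressed wrapper stream and the underlying HDFS stream are closed.
 *
 * @return true on success
 * @throws IOException if closing a stream fails
 */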
boolean hdfsClose() throws IOException
{
if (logger.isDebugEnabled()) logger.debug("SequenceFileWriter.hdfsClose() - started" );
if (sameStream) {
if (outStream != null) {
outStream.close();
outStream = null;
}
fsOut = null;
}
else {
if (outStream != null) {
outStream.close();
outStream = null;
}
if (fsOut != null) {
fsOut.close();
fsOut = null;
}
}
return true;
}
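/**
 * Merges all files under the given source directory into a single destination
 * file. The source files are first moved into a temporary subdirectory, merged
 * with FileUtil.copyMerge(), and the temporary directory is then deleted.
 *
 * @param srcPathStr source directory containing the files to merge
 * @param dstPathStr destination file; deleted first if it already exists
 * @return true on success
 * @throws IOException if any HDFS operation fails
 */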
public boolean hdfsMergeFiles(String srcPathStr, String dstPathStr) throws IOException
{
if (logger.isDebugEnabled()) logger.debug("SequenceFileWriter.hdfsMergeFiles() - start");
if (logger.isDebugEnabled()) logger.debug("SequenceFileWriter.hdfsMergeFiles() - source Path: " + srcPathStr +
", destination File:" + dstPathStr );
Path srcPath = new Path(srcPathStr );
srcPath = srcPath.makeQualified(srcPath.toUri(), null);
FileSystem srcFs = FileSystem.get(srcPath.toUri(),conf);
Path dstPath = new Path(dstPathStr);
dstPath = dstPath.makeQualified(dstPath.toUri(), null);
FileSystem dstFs = FileSystem.get(dstPath.toUri(),conf);
if (dstFs.exists(dstPath))
{
if (logger.isDebugEnabled()) logger.debug("SequenceFileWriter.hdfsMergeFiles() - destination files exists" );
// for this prototype we just delete the file-- will change in next code drops
dstFs.delete(dstPath, false);
// The caller should already have checked existence of file-- throw exception
//throw new FileAlreadyExistsException(dstPath.toString());
}
Path tmpSrcPath = new Path(srcPath, "tmp");
FileSystem.mkdirs(srcFs, tmpSrcPath,srcFs.getFileStatus(srcPath).getPermission());
logger.debug("SequenceFileWriter.hdfsMergeFiles() - tmp folder created." );
Path[] files = FileUtil.stat2Paths(srcFs.listStatus(srcPath));
for (Path f : files)
{
srcFs.rename(f, tmpSrcPath);
}
// copyMerge and use false for the delete option since it removes the whole directory
if (logger.isDebugEnabled()) logger.debug("SequenceFileWriter.hdfsMergeFiles() - copyMerge" );
FileUtil.copyMerge(srcFs, tmpSrcPath, dstFs, dstPath, false, conf, null);
if (logger.isDebugEnabled()) logger.debug("SequenceFileWriter.hdfsMergeFiles() - delete intermediate files" );
srcFs.delete(tmpSrcPath, true);
return true;
}
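/**
 * Deletes all files directly under the given unload path, if the path exists.
 *
 * @param uldPathStr unload directory to clean
 * @return true on success
 * @throws IOException if any HDFS operation fails
 */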
public boolean hdfsCleanUnloadPath(String uldPathStr
/*, boolean checkExistence, String mergeFileStr*/) throws IOException
{
if (logger.isDebugEnabled()) logger.debug("SequenceFileWriter.hdfsCleanUnloadPath() - start");
logger.debug("SequenceFileWriter.hdfsCleanUnloadPath() - unload Path: " + uldPathStr );
Path uldPath = new Path(uldPathStr );
uldPath = uldPath.makeQualified(uldPath.toUri(), null);
FileSystem srcFs = FileSystem.get(uldPath.toUri(),conf);
if (!srcFs.exists(uldPath))
{
// The unload location does not exist yet; hdfsCreate() will create it later,
// so there is nothing to clean up here.
if (logger.isDebugEnabled()) logger.debug("SequenceFileWriter.hdfsCleanUnloadPath() -- unload location does not exist." );
return true;
}
Path[] files = FileUtil.stat2Paths(srcFs.listStatus(uldPath));
logger.debug("SequenceFileWriter.hdfsCleanUnloadPath() - delete files" );
for (Path f : files){
srcFs.delete(f, false);
}
return true;
}
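/**
 * Checks whether the given HDFS path exists.
 *
 * @param filePathStr path to check
 * @return true if the path exists, false otherwise
 * @throws IOException if the file system cannot be accessed
 */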
public boolean hdfsExists(String filePathStr) throws IOException
{
logger.debug("SequenceFileWriter.hdfsExists() - start");
logger.debug("SequenceFileWriter.hdfsExists() - Path: " + filePathStr);
//check existence of the merge Path
Path filePath = new Path(filePathStr );
filePath = filePath.makeQualified(filePath.toUri(), null);
FileSystem mergeFs = FileSystem.get(filePath.toUri(),conf);
if (mergeFs.exists( filePath))
{
logger.debug("SequenceFileWriter.hdfsExists() - Path: "
+ filePath + " exists" );
return true;
}
return false;
}
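/**
 * Recursively deletes the given HDFS path.
 *
 * @param pathStr path to delete
 * @return true on success
 * @throws IOException if the delete fails
 */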
public boolean hdfsDeletePath(String pathStr) throws IOException
{
if (logger.isDebugEnabled()) logger.debug("SequenceFileWriter.hdfsDeletePath() - start - Path: " + pathStr);
Path delPath = new Path(pathStr );
delPath = delPath.makeQualified(delPath.toUri(), null);
// Use a local handle so the class-level 'fs' field is not shadowed.
FileSystem delFs = FileSystem.get(delPath.toUri(), conf);
delFs.delete(delPath, true);
return true;
}
}