package org.apache.samoa.utils;
/*
* #%L
* SAMOA
* %%
* Copyright (C) 2014 - 2015 Apache Software Foundation
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.file.FileSystems;
import java.util.Map;
import java.util.Properties;
import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Utility methods for:
* <ul>
* <li>Kafka (topic creation)</li>
* <li>HDFS (copying files and deserializing objects)</li>
* <li>handling files on the local file system</li>
* </ul>
*
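* <p>
* Typical usage sketch (the ZooKeeper address, config path, and file names below are
* illustrative values, not defaults required by SAMOA):
*
* <pre>{@code
* SystemsUtils.setZookeeper("localhost:2181");
* SystemsUtils.createKafkaTopic("samoa-stream", 4);
*
* SystemsUtils.setHadoopConfigHome("/etc/hadoop/conf");
* SystemsUtils.setSAMOADir("/user/samoa");
* String hdfsPath = SystemsUtils.serializeObjectToHDFS(someSerializableObject, "topology.dat");
* }</pre>
*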
* @author Anh Thu Vu
*/
public class SystemsUtils {
private static final Logger logger = LoggerFactory.getLogger(SystemsUtils.class);
public static final String HDFS = "hdfs";
public static final String LOCAL_FS = "local";
private static final String TEMP_FILE = "samoaTemp";
private static final String TEMP_FILE_SUFFIX = ".dat";
/*
* Kafka
*/
private static class KafkaUtils {
private static ZkClient zkClient;
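// Connects to ZooKeeper with a 30s session timeout and a 30s connection timeout.
// Must be called before createKafkaTopic().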
static void setZookeeper(String zk) {
zkClient = new ZkClient(zk, 30000, 30000, new ZKStringSerializerWrapper());
}
/*
* Create Kafka topic/stream
*/
static void createKafkaTopic(String name, int partitions, int replicas) {
AdminUtils.createTopic(zkClient, name, partitions, replicas, new Properties());
}
static class ZKStringSerializerWrapper implements ZkSerializer {
@Override
public Object deserialize(byte[] byteArray) throws ZkMarshallingError {
return ZKStringSerializer.deserialize(byteArray);
}
@Override
public byte[] serialize(Object obj) throws ZkMarshallingError {
return ZKStringSerializer.serialize(obj);
}
}
}
/*
* HDFS
*/
private static class HDFSUtils {
private static String coreConfPath;
private static String hdfsConfPath;
private static String configHomePath;
private static String samoaDir = null;
static void setHadoopConfigHome(String hadoopConfPath) {
logger.info("Hadoop config home:{}", hadoopConfPath);
configHomePath = hadoopConfPath;
java.nio.file.Path coreSitePath = FileSystems.getDefault().getPath(hadoopConfPath, "core-site.xml");
java.nio.file.Path hdfsSitePath = FileSystems.getDefault().getPath(hadoopConfPath, "hdfs-site.xml");
coreConfPath = coreSitePath.toAbsolutePath().toString();
hdfsConfPath = hdfsSitePath.toAbsolutePath().toString();
}
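// Reads fs.defaultFS (the name node URI) from the configured core-site.xml / hdfs-site.xml.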
static String getNameNodeUri() {
Configuration config = new Configuration();
config.addResource(new Path(coreConfPath));
config.addResource(new Path(hdfsConfPath));
return config.get("fs.defaultFS");
}
static String getHadoopConfigHome() {
return configHomePath;
}
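// Stores the SAMOA directory as an absolute HDFS URI, e.g. "hdfs://namenode:8020/user/samoa"
// (illustrative value). Passing null falls back to the default directory.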
static void setSAMOADir(String dir) {
if (dir != null)
samoaDir = getNameNodeUri() + dir;
else
samoaDir = null;
}
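// The default SAMOA directory is ".samoa" under the user's HDFS home directory.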
static String getDefaultSAMOADir() throws IOException {
Configuration config = new Configuration();
config.addResource(new Path(coreConfPath));
config.addResource(new Path(hdfsConfPath));
FileSystem fs = FileSystem.get(config);
Path defaultDir = new Path(fs.getHomeDirectory(), ".samoa");
return defaultDir.toString();
}
static boolean deleteFileIfExist(String absPath) {
Path p = new Path(absPath);
return deleteFileIfExist(p);
}
static boolean deleteFileIfExist(Path p) {
Configuration config = new Configuration();
config.addResource(new Path(coreConfPath));
config.addResource(new Path(hdfsConfPath));
FileSystem fs;
try {
fs = FileSystem.get(config);
if (fs.exists(p)) {
return fs.delete(p, false);
}
return true;
} catch (IOException e) {
logger.error("Failed to delete {} from HDFS", p, e);
}
return false;
}
/*
* Write to HDFS
*/
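// Copies a local file to <samoaDir>/<dstPath> on HDFS, replacing any existing file,
// and returns the absolute destination path, or null on failure.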
static String writeToHDFS(File file, String dstPath) {
Configuration config = new Configuration();
config.addResource(new Path(coreConfPath));
config.addResource(new Path(hdfsConfPath));
logger.info("Filesystem name:{}", config.get("fs.defaultFS"));
// Default samoaDir
if (samoaDir == null) {
try {
samoaDir = getDefaultSAMOADir();
} catch (IOException e) {
logger.error("Could not determine the default SAMOA directory on HDFS", e);
return null;
}
}
// Setup src and dst paths
Path dst = new Path(samoaDir, dstPath);
Path src = new Path(file.getAbsolutePath());
// Delete file if already exists in HDFS
if (!deleteFileIfExist(dst))
return null;
// Copy to HDFS
FileSystem fs;
try {
fs = FileSystem.get(config);
fs.copyFromLocalFile(src, dst);
} catch (IOException e) {
logger.error("Failed to copy {} to HDFS path {}", src, dst, e);
return null;
}
return dst.toString(); // abs path to file
}
/*
* Read from HDFS
*/
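// Opens the given HDFS path and deserializes a single Java object from it.
// Returns null if the file cannot be read or its class is not on the classpath.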
static Object deserializeObjectFromFile(String filePath) {
logger.info("Deserialize HDFS file:{}", filePath);
Configuration config = new Configuration();
config.addResource(new Path(coreConfPath));
config.addResource(new Path(hdfsConfPath));
Path file = new Path(filePath);
FSDataInputStream dataInputStream = null;
ObjectInputStream ois = null;
Object obj = null;
try {
FileSystem fs = FileSystem.get(config);
dataInputStream = fs.open(file);
ois = new ObjectInputStream(dataInputStream);
obj = ois.readObject();
} catch (IOException e) {
logger.error("Failed to read object from HDFS file {}", filePath, e);
} catch (ClassNotFoundException e) {
logger.error("Class of the object serialized in {} not found", filePath, e);
} finally {
// Close the streams in all cases; closing the ObjectInputStream also closes the underlying stream
try {
if (ois != null)
ois.close();
else if (dataInputStream != null)
dataInputStream.close();
} catch (IOException e) {
logger.warn("Failed to close HDFS input stream for {}", filePath, e);
}
}
return obj;
}
}
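/*
* Local FS
*/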
private static class LocalFileSystemUtils {
static boolean serializObjectToFile(Object obj, String fn) {
FileOutputStream fos = null;
ObjectOutputStream oos = null;
try {
fos = new FileOutputStream(fn);
oos = new ObjectOutputStream(fos);
oos.writeObject(obj);
} catch (IOException e) {
logger.error("Failed to serialize object to local file {}", fn, e);
return false;
} finally {
// Close the ObjectOutputStream first: closing it flushes buffered data and closes the underlying FileOutputStream
try {
if (oos != null)
oos.close();
else if (fos != null)
fos.close();
} catch (IOException e) {
logger.warn("Failed to close output stream for {}", fn, e);
}
}
return true;
}
static Object deserializeObjectFromLocalFile(String filename) {
logger.info("Deserialize local file:{}", filename);
FileInputStream fis = null;
ObjectInputStream ois = null;
Object obj = null;
try {
fis = new FileInputStream(filename);
ois = new ObjectInputStream(fis);
obj = ois.readObject();
} catch (IOException e) {
logger.error("Failed to read object from local file {}", filename, e);
} catch (ClassNotFoundException e) {
logger.error("Class of the object serialized in {} not found", filename, e);
} finally {
// Close the ObjectInputStream first; closing it also closes the underlying FileInputStream
try {
if (ois != null)
ois.close();
else if (fis != null)
fis.close();
} catch (IOException e) {
logger.warn("Failed to close input stream for {}", filename, e);
}
}
return obj;
}
}
/*
* Create streams
*/
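// Creates a Kafka topic with a replication factor of 1; setZookeeper() must have been called first.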
public static void createKafkaTopic(String name, int partitions) {
createKafkaTopic(name, partitions, 1);
}
public static void createKafkaTopic(String name, int partitions, int replicas) {
KafkaUtils.createKafkaTopic(name, partitions, replicas);
}
/*
* Serialize object
*/
public static boolean serializeObjectToLocalFileSystem(Object object, String path) {
return LocalFileSystemUtils.serializObjectToFile(object, path);
}
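// Serializes the object to a temporary local file, then copies it to <samoaDir>/<path> on HDFS.
// Returns the absolute HDFS path of the copy, or null on failure.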
public static String serializeObjectToHDFS(Object object, String path) {
File tmpDatFile;
try {
tmpDatFile = File.createTempFile(TEMP_FILE, TEMP_FILE_SUFFIX);
if (serializeObjectToLocalFileSystem(object, tmpDatFile.getAbsolutePath())) {
return HDFSUtils.writeToHDFS(tmpDatFile, path);
}
} catch (IOException e) {
logger.error("Failed to create a temporary file for serializing to HDFS", e);
}
return null;
}
/*
* Deserialize object
*/
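// Reads a serialized Map<String, Object> from HDFS (filesystem == HDFS) or from the
// local file system (any other value, e.g. LOCAL_FS). Returns null on failure.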
@SuppressWarnings("unchecked")
public static Map<String, Object> deserializeMapFromFile(String filesystem, String filename) {
Map<String, Object> map;
if (filesystem.equals(HDFS)) {
map = (Map<String, Object>) HDFSUtils.deserializeObjectFromFile(filename);
}
else {
map = (Map<String, Object>) LocalFileSystemUtils.deserializeObjectFromLocalFile(filename);
}
return map;
}
public static Object deserializeObjectFromFileAndKey(String filesystem, String filename, String key) {
Map<String, Object> map = deserializeMapFromFile(filesystem, filename);
if (map == null)
return null;
return map.get(key);
}
/*
* Setup
*/
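// The argument is the ZooKeeper connect string, e.g. "host1:2181,host2:2181" (illustrative).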
public static void setZookeeper(String zookeeper) {
KafkaUtils.setZookeeper(zookeeper);
}
public static void setHadoopConfigHome(String hadoopHome) {
HDFSUtils.setHadoopConfigHome(hadoopHome);
}
public static void setSAMOADir(String samoaDir) {
HDFSUtils.setSAMOADir(samoaDir);
}
/*
* Others
*/
public static String getHDFSNameNodeUri() {
return HDFSUtils.getNameNodeUri();
}
public static String getHadoopConfigHome() {
return HDFSUtils.getHadoopConfigHome();
}
public static String copyToHDFS(File file, String dstPath) {
return HDFSUtils.writeToHDFS(file, dstPath);
}
}