/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.vxquery.hdfs2;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringReader;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.mapred.SplitLocationInfo;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hyracks.api.client.NodeControllerInfo;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.hdfs.ContextFactory;
import org.apache.hyracks.hdfs2.dataflow.FileSplitsFactory;
import org.w3c.dom.Document;
import org.xml.sax.InputSource;
import org.xml.sax.SAXException;
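/**
* Helper class for reading XML collections from HDFS: it locates the Hadoop configuration,
* builds input splits for a collection, schedules the splits across the available node
* controllers, and creates record readers for those splits.
*/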
public class HDFSFunctions {
private Configuration conf;
private FileSystem fs;
private String confPath;
private Job job;
private InputFormat inputFormat;
private List<InputSplit> splits;
private ArrayList<ArrayList<String>> nodes;
private HashMap<Integer, String> schedule;
private static final String TEMP = "java.io.tmpdir";
private static final String DFS_PATH = "vxquery_splits_schedule.txt";
private static final String FILEPATH = System.getProperty(TEMP) + File.separator + "splits_schedule.txt";
protected static final Logger LOGGER = Logger.getLogger(HDFSFunctions.class.getName());
private final Map<String, NodeControllerInfo> nodeControllerInfos;
/**
* Create the Hadoop configuration and remember the directory whose core-site and hdfs-site files
* will later be added as resources. The HDFS FileSystem itself is initialized by getFileSystem().
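* <p>
* A minimal usage sketch (the configuration directory, input path, and tag below are
* illustrative placeholders; exception handling is omitted):
* </p>
* <pre>{@code
* HDFSFunctions hdfs = new HDFSFunctions(nodeControllerInfos, "/etc/hadoop/conf");
* FileSystem fs = hdfs.getFileSystem();
* if (fs != null) {
*     hdfs.setJob("/user/vxquery/books.xml", "book");
*     hdfs.scheduleSplits();
* }
* }</pre>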
*
* @param nodeControllerInfos
* Map of the node to its attributes
* @param hdfsConf
* Path to the directory that contains the HDFS configuration files.
*/
public HDFSFunctions(Map<String, NodeControllerInfo> nodeControllerInfos, String hdfsConf) {
this.conf = new Configuration();
this.nodeControllerInfos = nodeControllerInfos;
this.confPath = hdfsConf;
}
/**
* Create the job, input format, and input splits needed for reading the file at the given path.
* This method should run before the scheduleSplits method.
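* <p>
* Illustrative call (the path and tag are placeholders):
* </p>
* <pre>{@code
* hdfs.setJob("/user/vxquery/books.xml", "book");
* List<InputSplit> splits = hdfs.getSplits();
* }</pre>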
*
* @param filepath
* HDFS path of the XML collection to read.
* @param tag
* Element tag that delimits each item in the collection.
*/
@SuppressWarnings({ "deprecation", "unchecked" })
public void setJob(String filepath, String tag) {
try {
conf.set("start_tag", "<" + tag + ">");
conf.set("end_tag", "</" + tag + ">");
job = new Job(conf, "Read from HDFS");
Path input = new Path(filepath);
FileInputFormat.addInputPath(job, input);
job.setInputFormatClass(XmlCollectionWithTagInputFormat.class);
inputFormat = ReflectionUtils.newInstance(job.getInputFormatClass(), job.getConfiguration());
splits = inputFormat.getSplits(job);
} catch (IOException | ClassNotFoundException | InterruptedException e) {
if (LOGGER.isLoggable(Level.SEVERE)) {
LOGGER.severe(e.getMessage());
}
}
}
/**
* Returns true if the given file path exists in HDFS, or if a file with that name is found
* anywhere under the calling user's home directory (subdirectories included).
*
* @param filename
* HDFS file path.
* @return true if the file is located in HDFS; false otherwise.
* @throws IOException
* If searching for the filepath throws {@link IOException}
*/
public boolean isLocatedInHDFS(String filename) throws IllegalArgumentException, IOException {
//search file path
if (fs.exists(new Path(filename))) {
return true;
}
return searchInDirectory(fs.getHomeDirectory(), filename) != null;
}
/**
* Searches the given directory for the file.
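* <p>
* Illustrative call (the filename is a placeholder; fs is the FileSystem returned by getFileSystem()):
* </p>
* <pre>{@code
* Path found = hdfs.searchInDirectory(fs.getHomeDirectory(), "books.xml");
* }</pre>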
*
* @param directory
* Directory to search recursively.
* @param filename
* Name of the file to look for.
* @return the path of the file if it exists under the directory; otherwise null.
*/
public Path searchInDirectory(Path directory, String filename) {
//Search the files and folder in this Path to find the one matching the filename.
try {
RemoteIterator<LocatedFileStatus> it = fs.listFiles(directory, true);
String[] parts;
Path path;
while (it.hasNext()) {
path = it.next().getPath();
parts = path.toString().split("/");
if (parts[parts.length - 1].equals(filename)) {
return path;
}
}
} catch (IOException e) {
if (LOGGER.isLoggable(Level.SEVERE)) {
LOGGER.severe(e.getMessage());
}
}
return null;
}
/**
* Locate the directory that holds the HDFS configuration. If no configuration path was supplied
* to the constructor, fall back to the HADOOP_CONF_DIR environment variable.
*
* @return true if a configuration directory path is available.
*/
private boolean locateConf() {
if (this.confPath == null) {
//As a last resort, try getting the configuration from the system environment
//Some systems won't have this set.
this.confPath = System.getenv("HADOOP_CONF_DIR");
}
return this.confPath != null;
}
/**
* Upload a file or directory to HDFS. filepath is the path in the local file system and dir is
* the destination path in HDFS. Any existing destination is deleted first.
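* <p>
* Illustrative call (the local and HDFS paths are placeholders):
* </p>
* <pre>{@code
* boolean uploaded = hdfs.put("/tmp/splits_schedule.txt", "vxquery_splits_schedule.txt");
* }</pre>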
*
* @param filepath
* file to upload
* @param dir
* HDFS directory to save the file
* @return true if the upload succeeded; false otherwise.
*/
public boolean put(String filepath, String dir) {
if (this.fs != null) {
Path path = new Path(filepath);
Path dest = new Path(dir);
try {
if (fs.exists(dest)) {
fs.delete(dest, true); //recursive delete
}
} catch (IOException e) {
if (LOGGER.isLoggable(Level.SEVERE)) {
LOGGER.severe(e.getMessage());
}
}
try {
fs.copyFromLocalFile(path, dest);
// Report success only after the copy completes without throwing.
return true;
} catch (IOException e) {
if (LOGGER.isLoggable(Level.SEVERE)) {
LOGGER.severe(e.getMessage());
}
}
}
return false;
}
/**
* Get an instance of the HDFS file system if it is configured correctly.
* Return null if there is no instance.
*
* @return FileSystem
*/
public FileSystem getFileSystem() {
if (locateConf()) {
conf.addResource(new Path(this.confPath + "/core-site.xml"));
conf.addResource(new Path(this.confPath + "/hdfs-site.xml"));
try {
fs = FileSystem.get(conf);
return this.fs;
} catch (IOException e) {
if (LOGGER.isLoggable(Level.SEVERE)) {
LOGGER.severe(e.getMessage());
}
}
} else {
if (LOGGER.isLoggable(Level.SEVERE)) {
LOGGER.severe("Could not locate HDFS configuration folder.");
}
}
return null;
}
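/**
* Group the input splits by the host that stores them, using each split's location info.
*
* @return map from hostname to the indexes of the splits stored on that host.
* @throws IOException
* If reading the split location info fails.
*/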
public HashMap<String, ArrayList<Integer>> getLocationsOfSplits() throws IOException {
HashMap<String, ArrayList<Integer>> splitsMap = new HashMap<>();
ArrayList<Integer> temp;
int i = 0;
String hostname;
for (InputSplit s : this.splits) {
SplitLocationInfo[] info = s.getLocationInfo();
hostname = info[0].getLocation();
if (splitsMap.containsKey(hostname)) {
temp = splitsMap.get(hostname);
temp.add(i);
} else {
temp = new ArrayList<>();
temp.add(i);
splitsMap.put(hostname, temp);
}
i++;
}
return splitsMap;
}
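/**
* Build the split schedule: each split is first assigned to a node that stores it locally, and
* any remaining splits are distributed round-robin over the nodes that received no local splits.
*/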
public void scheduleSplits() throws IOException, ParserConfigurationException, SAXException {
schedule = new HashMap<>();
ArrayList<String> empty = new ArrayList<>();
HashMap<String, ArrayList<Integer>> splitsMap = this.getLocationsOfSplits();
readNodesFromXML();
int count = this.splits.size();
String node;
for (ArrayList<String> info : this.nodes) {
node = info.get(1);
if (splitsMap.containsKey(node)) {
for (Integer split : splitsMap.get(node)) {
schedule.put(split, node);
count--;
}
splitsMap.remove(node);
} else {
empty.add(node);
}
}
//Check if every split got assigned to a node
if (count != 0) {
ArrayList<Integer> remaining = new ArrayList<>();
// Find the splits that have not been assigned to a node yet.
for (int i = 0; i < this.splits.size(); i++) {
if (!schedule.containsKey(i)) {
remaining.add(i);
}
}
if (!empty.isEmpty()) {
int nodeNumber = 0;
for (int split : remaining) {
if (nodeNumber == empty.size()) {
nodeNumber = 0;
}
schedule.put(split, empty.get(nodeNumber));
nodeNumber++;
}
}
}
}
/**
* Collect the node id and IP address of every node controller from the node controller info map.
* Save the information inside nodes.
*/
public void readNodesFromXML() {
nodes = new ArrayList<>();
for (NodeControllerInfo ncInfo : nodeControllerInfos.values()) {
//Will this include the master node? Is that bad?
ArrayList<String> info = new ArrayList<>();
info.add(ncInfo.getNodeId());
info.add(ncInfo.getNetworkAddress().getAddress());
nodes.add(info);
}
}
/**
* Writes the schedule to a temporary file, then uploads the file to HDFS.
*
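* Each schedule entry is written as "splitIndex,nodeId", one entry per line.
*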
* @throws UnsupportedEncodingException
* If the UTF-8 encoding is not supported.
* @throws FileNotFoundException
* If the temporary schedule file cannot be created.
*/
public void addScheduleToDistributedCache() throws FileNotFoundException, UnsupportedEncodingException {
PrintWriter writer = new PrintWriter(FILEPATH, "UTF-8");
for (int split : this.schedule.keySet()) {
// Write each entry on its own line so the schedule can be parsed back entry by entry.
writer.println(split + "," + this.schedule.get(split));
}
writer.close();
// Add file to HDFS
this.put(FILEPATH, DFS_PATH);
}
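/**
* Create and initialize a RecordReader for the first input split that can be read successfully.
*
* @return an initialized reader, or null if no reader could be created.
*/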
public RecordReader getReader() {
List<FileSplit> fileSplits = new ArrayList<>();
for (int i = 0; i < splits.size(); i++) {
fileSplits.add((FileSplit) splits.get(i));
}
FileSplitsFactory splitsFactory;
try {
splitsFactory = new FileSplitsFactory(fileSplits);
List<FileSplit> inputSplits = splitsFactory.getSplits();
ContextFactory ctxFactory = new ContextFactory();
int size = inputSplits.size();
for (int i = 0; i < size; i++) {
// Read the split.
TaskAttemptContext context;
try {
context = ctxFactory.createContext(job.getConfiguration(), i);
RecordReader reader = inputFormat.createRecordReader(inputSplits.get(i), context);
reader.initialize(inputSplits.get(i), context);
return reader;
} catch (IOException | InterruptedException e) {
if (LOGGER.isLoggable(Level.SEVERE)) {
LOGGER.severe(e.getMessage());
}
}
}
} catch (HyracksDataException e) {
if (LOGGER.isLoggable(Level.SEVERE)) {
LOGGER.severe(e.getMessage());
}
}
return null;
}
/**
* @return schedule.
*/
public HashMap<Integer, String> getSchedule() {
return this.schedule;
}
/**
* Return the splits belonging to this node for the existing schedule.
*
* @param node
* Node identifier as stored in the schedule.
* @return list of the split indexes assigned to the given node.
*/
public ArrayList<Integer> getScheduleForNode(String node) {
ArrayList<Integer> nodeSchedule = new ArrayList<>();
for (int split : this.schedule.keySet()) {
if (node.equals(this.schedule.get(split))) {
nodeSchedule.add(split);
}
}
return nodeSchedule;
}
public List<InputSplit> getSplits() {
return this.splits;
}
public Job getJob() {
return this.job;
}
public InputFormat getinputFormat() {
return this.inputFormat;
}
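/**
* Parse the given XML string into a DOM Document.
*
* @param xmlStr
* XML content to parse.
* @return the parsed Document, or null if parsing fails.
*/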
public Document convertStringToDocument(String xmlStr) {
DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
DocumentBuilder builder;
try {
builder = factory.newDocumentBuilder();
Document doc = builder.parse(new InputSource(new StringReader(xmlStr)));
return doc;
} catch (Exception e) {
if (LOGGER.isLoggable(Level.SEVERE)) {
LOGGER.severe(e.getMessage());
}
}
return null;
}
}