| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.contrib.failmon; |
| |
| import java.io.BufferedOutputStream; |
| import java.io.InputStream; |
| import java.io.BufferedReader; |
| import java.io.BufferedWriter; |
| import java.io.File; |
| import java.io.FileOutputStream; |
| import java.io.FileReader; |
| import java.io.FileWriter; |
| import java.io.IOException; |
| import java.io.FileNotFoundException; |
| import java.net.InetAddress; |
| import java.util.ArrayList; |
| import java.util.Calendar; |
| import java.util.zip.CRC32; |
| import java.util.zip.CheckedOutputStream; |
| import java.util.zip.ZipEntry; |
| import java.util.zip.ZipInputStream; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FSDataInputStream; |
| |
| public class HDFSMerger { |
| |
| Configuration hadoopConf; |
| FileSystem hdfs; |
| |
| String hdfsDir; |
| |
| FileStatus [] inputFiles; |
| |
| Path outputFilePath; |
| FSDataOutputStream outputFile; |
| |
| boolean compress; |
| |
| FileWriter fw; |
| |
| BufferedWriter writer; |
| |
| public HDFSMerger() throws IOException { |
| |
| String hadoopConfPath; |
| |
| if (Environment.getProperty("hadoop.conf.path") == null) |
| hadoopConfPath = "../../../conf"; |
| else |
| hadoopConfPath = Environment.getProperty("hadoop.conf.path"); |
| |
| // Read the configuration for the Hadoop environment |
| Configuration hadoopConf = new Configuration(); |
| hadoopConf.addResource(new Path(hadoopConfPath + "/hadoop-default.xml")); |
| hadoopConf.addResource(new Path(hadoopConfPath + "/hadoop-site.xml")); |
| |
| // determine the local output file name |
| if (Environment.getProperty("local.tmp.filename") == null) |
| Environment.setProperty("local.tmp.filename", "failmon.dat"); |
| |
| // determine the upload location |
| hdfsDir = Environment.getProperty("hdfs.upload.dir"); |
| if (hdfsDir == null) |
| hdfsDir = "/failmon"; |
| |
| hdfs = FileSystem.get(hadoopConf); |
| |
| Path hdfsDirPath = new Path(hadoopConf.get("fs.default.name") + hdfsDir); |
| |
| try { |
| if (!hdfs.getFileStatus(hdfsDirPath).isDir()) { |
| Environment.logInfo("HDFSMerger: Not an HDFS directory: " + hdfsDirPath.toString()); |
| System.exit(0); |
| } |
| } catch (FileNotFoundException e) { |
| Environment.logInfo("HDFSMerger: Directory not found: " + hdfsDirPath.toString()); |
| } |
| |
| inputFiles = hdfs.listStatus(hdfsDirPath); |
| |
| outputFilePath = new Path(hdfsDirPath.toString() + "/" + "merge-" |
| + Calendar.getInstance().getTimeInMillis() + ".dat"); |
| outputFile = hdfs.create(outputFilePath); |
| |
| for (FileStatus fstatus : inputFiles) { |
| appendFile(fstatus.getPath()); |
| hdfs.delete(fstatus.getPath()); |
| } |
| |
| outputFile.close(); |
| |
| Environment.logInfo("HDFS file merging complete!"); |
| } |
| |
| private void appendFile (Path inputPath) throws IOException { |
| |
| FSDataInputStream anyInputFile = hdfs.open(inputPath); |
| InputStream inputFile; |
| byte buffer[] = new byte[4096]; |
| |
| if (inputPath.toString().endsWith(LocalStore.COMPRESSION_SUFFIX)) { |
| // the file is compressed |
| inputFile = new ZipInputStream(anyInputFile); |
| ((ZipInputStream) inputFile).getNextEntry(); |
| } else { |
| inputFile = anyInputFile; |
| } |
| |
| try { |
| int bytesRead = 0; |
| while ((bytesRead = inputFile.read(buffer)) > 0) { |
| outputFile.write(buffer, 0, bytesRead); |
| } |
| } catch (IOException e) { |
| Environment.logInfo("Error while copying file:" + inputPath.toString()); |
| } finally { |
| inputFile.close(); |
| } |
| } |
| |
| |
| public static void main(String [] args) { |
| |
| Environment.prepare("./conf/failmon.properties"); |
| |
| try { |
| new HDFSMerger(); |
| } catch (IOException e) { |
| e.printStackTrace(); |
| } |
| |
| } |
| } |