| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.chukwa.extraction.demux; |
| |
| |
| import java.io.IOException; |
| import java.net.URI; |
| import java.text.SimpleDateFormat; |
| import java.util.ArrayList; |
| import java.util.Calendar; |
| import java.util.List; |
| import org.apache.hadoop.chukwa.conf.ChukwaConfiguration; |
| import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord; |
| import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey; |
| import org.apache.hadoop.chukwa.util.DaemonWatcher; |
| import org.apache.hadoop.chukwa.util.PidFile; |
| import org.apache.hadoop.conf.Configured; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.FileUtil; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.mapred.FileInputFormat; |
| import org.apache.hadoop.mapred.FileOutputFormat; |
| import org.apache.hadoop.mapred.JobClient; |
| import org.apache.hadoop.mapred.JobConf; |
| import org.apache.hadoop.mapred.JobPriority; |
| import org.apache.hadoop.mapred.SequenceFileInputFormat; |
| import org.apache.hadoop.mapred.SequenceFileOutputFormat; |
| import org.apache.hadoop.mapred.lib.IdentityMapper; |
| import org.apache.hadoop.mapred.lib.IdentityReducer; |
| import org.apache.hadoop.util.Tool; |
| import org.apache.log4j.Logger; |
| |
| // TODO do an abstract class for all rolling |
| public class HourlyChukwaRecordRolling extends Configured implements Tool { |
| static Logger log = Logger.getLogger(HourlyChukwaRecordRolling.class); |
| |
| static SimpleDateFormat sdf = new java.text.SimpleDateFormat("yyyyMMdd"); |
| static ChukwaConfiguration conf = null; |
| static FileSystem fs = null; |
| static final String HadoopLogDir = "_logs"; |
| static final String hadoopTempDir = "_temporary"; |
| |
| static boolean rollInSequence = true; |
| static boolean deleteRawdata = false; |
| |
| public static void usage() { |
| System.err |
| .println("usage: java org.apache.hadoop.chukwa.extraction.demux.HourlyChukwaRecordRolling rollInSequence <True/False> deleteRawdata <True/False>"); |
| System.exit(-1); |
| } |
| |
| public static void buildHourlyFiles(String chukwaMainRepository, |
| String tempDir, String rollingFolder, int workingDay, int workingHour) |
| throws IOException { |
| // process |
| Path hourPath = new Path(rollingFolder + "/hourly/" + workingDay + "/" |
| + workingHour); |
| FileStatus[] clustersFS = fs.listStatus(hourPath); |
| for (FileStatus clusterFs : clustersFS) { |
| String cluster = clusterFs.getPath().getName(); |
| |
| Path dataSourceClusterHourPaths = new Path(rollingFolder + "/hourly/" |
| + workingDay + "/" + workingHour + "/" + cluster); |
| FileStatus[] dataSourcesFS = fs.listStatus(dataSourceClusterHourPaths); |
| |
| for (FileStatus dataSourceFS : dataSourcesFS) { |
| String dataSource = dataSourceFS.getPath().getName(); |
| // Repo path = reposRootDirectory/<cluster>/<datasource>/<day>/<hour>/*/*.evt |
| |
| // put the rotate flag |
| fs.mkdirs(new Path(chukwaMainRepository + "/" + cluster + "/" |
| + dataSource + "/" + workingDay + "/" + workingHour |
| + "/rotateDone")); |
| |
| // rotate |
| // Merge |
| String[] mergeArgs = new String[5]; |
| // input |
| mergeArgs[0] = chukwaMainRepository + "/" + cluster + "/" + dataSource |
| + "/" + workingDay + "/" + workingHour + "/[0-5]*/*.evt"; |
| // temp dir |
| mergeArgs[1] = tempDir + "/" + cluster + "/" + dataSource + "/" |
| + workingDay + "/" + workingHour + "_" + System.currentTimeMillis(); |
| // final output dir |
| mergeArgs[2] = chukwaMainRepository + "/" + cluster + "/" + dataSource |
| + "/" + workingDay + "/" + workingHour; |
| // final output fileName |
| mergeArgs[3] = dataSource + "_HourlyDone_" + workingDay + "_" + workingHour; |
| // delete rolling directory |
| mergeArgs[4] = rollingFolder + "/hourly/" + workingDay + "/" |
| + workingHour + "/" + cluster + "/" + dataSource; |
| |
| log.info("HourlyChukwaRecordRolling 0: " + mergeArgs[0]); |
| log.info("HourlyChukwaRecordRolling 1: " + mergeArgs[1]); |
| log.info("HourlyChukwaRecordRolling 2: " + mergeArgs[2]); |
| log.info("HourlyChukwaRecordRolling 3: " + mergeArgs[3]); |
| log.info("HourlyChukwaRecordRolling 4: " + mergeArgs[4]); |
| |
| RecordMerger merge = new RecordMerger(conf, fs, |
| new HourlyChukwaRecordRolling(), mergeArgs, deleteRawdata); |
| List<RecordMerger> allMerge = new ArrayList<RecordMerger>(); |
| if (rollInSequence) { |
| merge.run(); |
| } else { |
| allMerge.add(merge); |
| merge.start(); |
| } |
| |
| // join all Threads |
| if (!rollInSequence) { |
| while (allMerge.size() > 0) { |
| RecordMerger m = allMerge.remove(0); |
| try { |
| m.join(); |
| } catch (InterruptedException e) { |
| } |
| } |
| } // End if (!rollInSequence) |
| |
| // Delete the processed dataSourceFS |
| FileUtil.fullyDelete(fs, dataSourceFS.getPath()); |
| |
| } // End for(FileStatus dataSourceFS : dataSourcesFS) |
| |
| // Delete the processed clusterFs |
| FileUtil.fullyDelete(fs, clusterFs.getPath()); |
| |
| } // End for(FileStatus clusterFs : clustersFS) |
| |
| // Delete the processed hour |
| FileUtil.fullyDelete(fs, hourPath); |
| } |
| |
| /** |
| * @param args |
| * @throws Exception |
| */ |
| public static void main(String[] args) throws Exception { |
| DaemonWatcher.createInstance("HourlyChukwaRecordRolling"); |
| |
| conf = new ChukwaConfiguration(); |
| String fsName = conf.get("writer.hdfs.filesystem"); |
| fs = FileSystem.get(new URI(fsName), conf); |
| |
| // TODO read from config |
| String rollingFolder = "/chukwa/rolling/"; |
| String chukwaMainRepository = "/chukwa/repos/"; |
| String tempDir = "/chukwa/temp/hourlyRolling/"; |
| |
| // TODO do a real parameter parsing |
| if (args.length != 4) { |
| usage(); |
| } |
| |
| if (!args[0].equalsIgnoreCase("rollInSequence")) { |
| usage(); |
| } |
| |
| if (!args[2].equalsIgnoreCase("deleteRawdata")) { |
| usage(); |
| } |
| |
| if (args[1].equalsIgnoreCase("true")) { |
| rollInSequence = true; |
| } else { |
| rollInSequence = false; |
| } |
| |
| if (args[3].equalsIgnoreCase("true")) { |
| deleteRawdata = true; |
| } else { |
| deleteRawdata = false; |
| } |
| |
| Calendar calendar = Calendar.getInstance(); |
| int currentDay = Integer.parseInt(sdf.format(calendar.getTime())); |
| int currentHour = calendar.get(Calendar.HOUR_OF_DAY); |
| log.info("CurrentDay: " + currentDay); |
| log.info("currentHour" + currentHour); |
| |
| Path rootFolder = new Path(rollingFolder + "/hourly/"); |
| |
| FileStatus[] daysFS = fs.listStatus(rootFolder); |
| for (FileStatus dayFS : daysFS) { |
| try { |
| log.info("dayFs:" + dayFS.getPath().getName()); |
| int workingDay = Integer.parseInt(dayFS.getPath().getName()); |
| |
| Path hourlySrc = new Path(rollingFolder + "/hourly/" + workingDay); |
| FileStatus[] hoursFS = fs.listStatus(hourlySrc); |
| for (FileStatus hourFS : hoursFS) { |
| String workinhHourStr = hourFS.getPath().getName(); |
| int workingHour = Integer.parseInt(workinhHourStr); |
| if ((workingDay < currentDay) || // all previous days |
| ((workingDay == currentDay) && (workingHour < currentHour)) // Up |
| // to |
| // the |
| // last |
| // hour |
| ) { |
| |
| try { |
| buildHourlyFiles(chukwaMainRepository, tempDir, rollingFolder, |
| workingDay, workingHour); |
| } catch(Throwable e) { |
| e.printStackTrace(); |
| log.warn("Hourly rolling failed on :" + rollingFolder +"/" + workingDay +"/" + workingHour ) ; |
| } |
| |
| } // End if ( (workingDay < currentDay) || ( (workingDay == |
| // currentDay) && (intHour < currentHour) ) ) |
| } // End for(FileStatus hourFS : hoursFS) |
| } // End Try workingDay = |
| // Integer.parseInt(sdf.format(dayFS.getPath().getName())); |
| catch (NumberFormatException e) { /* Not a standard Day directory skip */ |
| log.warn("Exception in hourlyRolling:", e); |
| } |
| |
| } // for(FileStatus dayFS : daysFS) |
| } |
| |
| public int run(String[] args) throws Exception { |
| JobConf conf = new JobConf(new ChukwaConfiguration(), HourlyChukwaRecordRolling.class); |
| |
| conf.setJobName("HourlyChukwa-Rolling"); |
| conf.setInputFormat(SequenceFileInputFormat.class); |
| |
| conf.setMapperClass(IdentityMapper.class); |
| conf.setReducerClass(IdentityReducer.class); |
| |
| conf.setOutputKeyClass(ChukwaRecordKey.class); |
| conf.setOutputValueClass(ChukwaRecord.class); |
| conf.setOutputFormat(SequenceFileOutputFormat.class); |
| |
| log.info("HourlyChukwaRecordRolling input: " + args[0]); |
| log.info("HourlyChukwaRecordRolling output: " + args[1]); |
| |
| FileInputFormat.setInputPaths(conf, args[0]); |
| FileOutputFormat.setOutputPath(conf, new Path(args[1])); |
| conf.setJobPriority(JobPriority.LOW); |
| JobClient.runJob(conf); |
| return 0; |
| } |
| |
| } |