| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.chukwa.extraction.demux; |
| |
| |
| import java.io.IOException; |
| import java.net.URI; |
| import java.text.SimpleDateFormat; |
| import java.util.ArrayList; |
| import java.util.Calendar; |
| import java.util.List; |
| import org.apache.hadoop.chukwa.conf.ChukwaConfiguration; |
| import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord; |
| import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey; |
| import org.apache.hadoop.chukwa.util.ExceptionUtil; |
| import org.apache.hadoop.chukwa.util.HierarchyDataType; |
| import org.apache.hadoop.conf.Configured; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.FileUtil; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.mapred.FileInputFormat; |
| import org.apache.hadoop.mapred.FileOutputFormat; |
| import org.apache.hadoop.mapred.JobClient; |
| import org.apache.hadoop.mapred.JobConf; |
| import org.apache.hadoop.mapred.JobPriority; |
| import org.apache.hadoop.mapred.SequenceFileInputFormat; |
| import org.apache.hadoop.mapred.SequenceFileOutputFormat; |
| import org.apache.hadoop.mapred.lib.IdentityMapper; |
| import org.apache.hadoop.mapred.lib.IdentityReducer; |
| import org.apache.hadoop.util.Tool; |
| import org.apache.log4j.Logger; |
| |
| // TODO do an abstract class for all rolling |
| public class DailyChukwaRecordRolling extends Configured implements Tool { |
| static Logger log = Logger.getLogger(DailyChukwaRecordRolling.class); |
| |
| static SimpleDateFormat sdf = new java.text.SimpleDateFormat("yyyyMMdd"); |
| static ChukwaConfiguration conf = null; |
| static FileSystem fs = null; |
| static final String HadoopLogDir = "_logs"; |
| static final String hadoopTempDir = "_temporary"; |
| |
| static boolean rollInSequence = true; |
| static boolean deleteRawdata = false; |
| |
| public static void usage() { |
| System.err |
| .println("usage: java org.apache.hadoop.chukwa.extraction.demux.DailyChukwaRecordRolling rollInSequence <True/False> deleteRawdata <True/False>"); |
| } |
| |
| public static boolean hourlyRolling(String dailyStreamDirectory) { |
| |
| Path pHour = null; |
| try { |
| log.info("Checking for HourlyRolling in " + dailyStreamDirectory); |
| |
| for (int i=0;i<24;i++) { |
| pHour = new Path(dailyStreamDirectory + "/" + i); |
| if (! fs.exists(pHour)) { |
| log.info("HourlyData is missing for:" + pHour); |
| continue; |
| } else { |
| FileStatus[] files = fs.listStatus(pHour); |
| boolean containsHourly = false; |
| for(FileStatus file: files) { |
| log.info("Debug checking" + file.getPath()); |
| if (file.getPath().getName().indexOf("_HourlyDone_") > 0) { |
| containsHourly = true; |
| break; |
| } |
| } |
| if (containsHourly == false) { |
| log.info("HourlyDone is missing for : " + pHour); |
| return false; |
| } |
| } |
| } |
| return true; |
| }catch(Exception e) { |
| e.printStackTrace(); |
| return false; |
| } |
| } |
| public static void buildDailyFiles(String chukwaMainRepository, |
| String tempDir, String rollingFolder, int workingDay) throws IOException { |
| // process |
| |
| boolean alldone = true; |
| |
| Path dayPath = new Path(rollingFolder + "/daily/" + workingDay); |
| FileStatus[] clustersFS = fs.listStatus(dayPath); |
| for (FileStatus clusterFs : clustersFS) { |
| String cluster = clusterFs.getPath().getName(); |
| |
| Path dataSourceClusterHourPaths = new Path(rollingFolder + "/daily/" |
| + workingDay + "/" + cluster); |
| FileStatus[] dataSourcesFS = fs.listStatus(dataSourceClusterHourPaths); |
| for (FileStatus dataSourceFS : dataSourcesFS) { |
| //CHUKWA-648: Make Chukwa Reduce Type to support hierarchy format |
| for (FileStatus dataSourcePath : HierarchyDataType.globStatus(fs, |
| dataSourceFS.getPath(), true)) { |
| String dataSource = HierarchyDataType.getDataType( |
| dataSourcePath.getPath(), |
| fs.getFileStatus(dataSourceClusterHourPaths).getPath()); |
| // Repo path = reposRootDirectory/<cluster>/<day>/*/*.evt |
| |
| |
| // put the rotate flag |
| fs.mkdirs(new Path(chukwaMainRepository + "/" + cluster + "/" |
| + dataSource + "/" + workingDay + "/rotateDone")); |
| |
| if (hourlyRolling(chukwaMainRepository + "/" + cluster + "/" + dataSource + "/" + workingDay) == false) { |
| log.warn("Skipping this directory, hourly not done. " + chukwaMainRepository + "/" + cluster + "/" |
| + dataSource + "/" + workingDay ); |
| alldone = false; |
| continue; |
| } |
| |
| log.info("Running Daily rolling for " + chukwaMainRepository + "/" + cluster + "/" |
| + dataSource + "/" + workingDay + "/rotateDone"); |
| |
| // rotate |
| // Merge |
| String[] mergeArgs = new String[5]; |
| // input |
| mergeArgs[0] = chukwaMainRepository + "/" + cluster + "/" + dataSource |
| + "/" + workingDay + "/[0-9]*/*.evt"; |
| // temp dir |
| mergeArgs[1] = tempDir + "/" + cluster + "/" + dataSource + "/" |
| + workingDay + "_" + System.currentTimeMillis(); |
| // final output dir |
| mergeArgs[2] = chukwaMainRepository + "/" + cluster + "/" + dataSource |
| + "/" + workingDay; |
| // final output fileName |
| mergeArgs[3] = dataSource + "_DailyDone_" + workingDay; |
| // delete rolling directory |
| mergeArgs[4] = rollingFolder + "/daily/" + workingDay + "/" + cluster |
| + "/" + dataSource; |
| |
| log.info("DailyChukwaRecordRolling 0: " + mergeArgs[0]); |
| log.info("DailyChukwaRecordRolling 1: " + mergeArgs[1]); |
| log.info("DailyChukwaRecordRolling 2: " + mergeArgs[2]); |
| log.info("DailyChukwaRecordRolling 3: " + mergeArgs[3]); |
| log.info("DailyChukwaRecordRolling 4: " + mergeArgs[4]); |
| |
| RecordMerger merge = new RecordMerger(conf, fs, |
| new DailyChukwaRecordRolling(), mergeArgs, deleteRawdata); |
| List<RecordMerger> allMerge = new ArrayList<RecordMerger>(); |
| if (rollInSequence) { |
| merge.mergeRecords(); |
| } else { |
| allMerge.add(merge); |
| merge.start(); |
| } |
| |
| // join all Threads |
| if (!rollInSequence) { |
| while (allMerge.size() > 0) { |
| RecordMerger m = allMerge.remove(0); |
| try { |
| m.join(); |
| } catch (InterruptedException e) { |
| } |
| } |
| } // End if (!rollInSequence) |
| |
| // Delete the processed dataSourceFS |
| FileUtil.fullyDelete(fs, dataSourceFS.getPath()); |
| |
| } // End for(FileStatus dataSourceFS : dataSourcesFS) |
| |
| // Delete the processed clusterFs |
| if (alldone == true) { |
| FileUtil.fullyDelete(fs, clusterFs.getPath()); |
| } |
| |
| |
| } // End for(FileStatus clusterFs : clustersFS) |
| } |
| // Delete the processed dayPath |
| if (alldone == true) { |
| FileUtil.fullyDelete(fs, dayPath); |
| } |
| |
| } |
| |
| /** |
| * @param args is command line parameters |
| * @throws Exception if unable to process data |
| */ |
| public static void main(String[] args) throws Exception { |
| |
| |
| conf = new ChukwaConfiguration(); |
| String fsName = conf.get("writer.hdfs.filesystem"); |
| fs = FileSystem.get(new URI(fsName), conf); |
| |
| // TODO read from config |
| String rollingFolder = "/chukwa/rolling/"; |
| String chukwaMainRepository = "/chukwa/repos/"; |
| String tempDir = "/chukwa/temp/dailyRolling/"; |
| |
| // TODO do a real parameter parsing |
| if (args.length != 4) { |
| usage(); |
| return; |
| } |
| |
| if (!args[0].equalsIgnoreCase("rollInSequence")) { |
| usage(); |
| return; |
| } |
| |
| if (!args[2].equalsIgnoreCase("deleteRawdata")) { |
| usage(); |
| return; |
| } |
| |
| if (args[1].equalsIgnoreCase("true")) { |
| rollInSequence = true; |
| } else { |
| rollInSequence = false; |
| } |
| |
| if (args[3].equalsIgnoreCase("true")) { |
| deleteRawdata = true; |
| } else { |
| deleteRawdata = false; |
| } |
| |
| log.info("rollInSequence: " + rollInSequence); |
| log.info("deleteRawdata: " + deleteRawdata); |
| |
| Calendar calendar = Calendar.getInstance(); |
| int currentDay = Integer.parseInt(sdf.format(calendar.getTime())); |
| int currentHour = calendar.get(Calendar.HOUR_OF_DAY); |
| log.info("CurrentDay: " + currentDay); |
| log.info("currentHour" + currentHour); |
| |
| Path rootFolder = new Path(rollingFolder + "/daily/"); |
| |
| FileStatus[] daysFS = fs.listStatus(rootFolder); |
| for (FileStatus dayFS : daysFS) { |
| try { |
| int workingDay = Integer.parseInt(dayFS.getPath().getName()); |
| log.info("Daily working on :" + workingDay); |
| if (workingDay < currentDay) { |
| |
| try { |
| buildDailyFiles(chukwaMainRepository, tempDir, rollingFolder, |
| workingDay); |
| } catch(Throwable e) { |
| e.printStackTrace(); |
| log.warn("Daily rolling failed on :" + rollingFolder +"/" + workingDay ) ; |
| } |
| |
| } // End if ( workingDay < currentDay) |
| } // End Try workingDay = |
| // Integer.parseInt(sdf.format(dayFS.getPath().getName())); |
| catch (NumberFormatException e) { /* Not a standard Day directory skip */ |
| log.debug(ExceptionUtil.getStackTrace(e)); |
| } |
| |
| } // for(FileStatus dayFS : daysFS) |
| } |
| |
| public int run(String[] args) throws Exception { |
| JobConf conf = new JobConf(new ChukwaConfiguration(), DailyChukwaRecordRolling.class); |
| |
| conf.setJobName("DailyChukwa-Rolling"); |
| conf.setInputFormat(SequenceFileInputFormat.class); |
| |
| conf.setMapperClass(IdentityMapper.class); |
| conf.setReducerClass(IdentityReducer.class); |
| |
| conf.setOutputKeyClass(ChukwaRecordKey.class); |
| conf.setOutputValueClass(ChukwaRecord.class); |
| conf.setOutputFormat(SequenceFileOutputFormat.class); |
| |
| log.info("DailyChukwaRecordRolling input: " + args[0]); |
| log.info("DailyChukwaRecordRolling output: " + args[1]); |
| |
| FileInputFormat.setInputPaths(conf, args[0]); |
| FileOutputFormat.setOutputPath(conf, new Path(args[1])); |
| conf.setJobPriority(JobPriority.LOW); |
| conf.setNumReduceTasks(1); |
| JobClient.runJob(conf); |
| return 0; |
| } |
| |
| } |