/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.falcon.latedata;
import org.apache.commons.cli.*;
import org.apache.falcon.FalconException;
import org.apache.falcon.catalog.CatalogPartition;
import org.apache.falcon.catalog.CatalogServiceFactory;
import org.apache.falcon.entity.CatalogStorage;
import org.apache.falcon.entity.FeedHelper;
import org.apache.falcon.entity.Storage;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.workflow.WorkflowExecutionArgs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.net.URISyntaxException;
import java.util.LinkedHashMap;
import java.util.Map;
/**
* A tool for late data handling.
*/
public class LateDataHandler extends Configured implements Tool {
private static final Logger LOG = LoggerFactory.getLogger(LateDataHandler.class);
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Path confPath = new Path("file:///"
+ System.getProperty("oozie.action.conf.xml"));
LOG.info("{} found ? {}", confPath, confPath.getFileSystem(conf).exists(confPath));
conf.addResource(confPath);
ToolRunner.run(conf, new LateDataHandler(), args);
}
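/*
 * This tool is typically launched from Falcon's Oozie late-data workflow.
 * The argument list below is an illustrative sketch only, assuming
 * WorkflowExecutionArgs.INPUT_NAMES resolves to "falconInputNames" and
 * INPUT_STORAGE_TYPES to "falconInputFeedStorageTypes"; the paths and feed
 * names are made up:
 *
 *   -out hdfs://nn:8020/falcon/staging/latedata
 *   -paths hdfs://nn:8020/data/clicks/2014-01-01#hdfs://nn:8020/data/imps/2014-01-01
 *   -falconInputNames clicks#imps
 *   -falconInputFeedStorageTypes FILESYSTEM#FILESYSTEM
 */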
private static CommandLine getCommand(String[] args) throws ParseException {
Options options = new Options();
Option opt = new Option("out", true, "Out file name");
opt.setRequired(true);
options.addOption(opt);
opt = new Option("paths", true,
"Comma separated path list, further separated by #");
opt.setRequired(true);
options.addOption(opt);
opt = new Option(WorkflowExecutionArgs.INPUT_NAMES.getName(), true,
"Input feed names, further separated by #");
opt.setRequired(true);
options.addOption(opt);
opt = new Option(WorkflowExecutionArgs.INPUT_STORAGE_TYPES.getName(), true,
"Feed storage types corresponding to Input feed names, separated by #");
opt.setRequired(true);
options.addOption(opt);
return new GnuParser().parse(options, args);
}
@Override
public int run(String[] args) throws Exception {
CommandLine command = getCommand(args);
String pathStr = getOptionValue(command, "paths");
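// A literal "null" (mapped to null by getOptionValue below) presumably means
// the workflow resolved no late-input paths, so there is nothing to record.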
if (pathStr == null) {
return 0;
}
String[] inputFeeds = getOptionValue(command, WorkflowExecutionArgs.INPUT_NAMES.getName()).split("#");
String[] pathGroups = pathStr.split("#");
String[] inputFeedStorageTypes =
getOptionValue(command, WorkflowExecutionArgs.INPUT_STORAGE_TYPES.getName()).split("#");
Map<String, Long> metrics = computeMetrics(inputFeeds, pathGroups, inputFeedStorageTypes);
Path file = new Path(command.getOptionValue("out"));
LOG.info("Persisting late data metrics: {} to file: {}", metrics, file);
persistMetrics(metrics, file);
return 0;
}
private String getOptionValue(CommandLine command, String option) {
String value = command.getOptionValue(option);
if ("null".equals(value)) {
return null;
}
return value;
}
private Map<String, Long> computeMetrics(String[] inputFeeds, String[] pathGroups,
String[] inputFeedStorageTypes)
throws IOException, FalconException, URISyntaxException {
Map<String, Long> computedMetrics = new LinkedHashMap<String, Long>();
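// The three arrays are index-aligned: pathGroups[i] and
// inputFeedStorageTypes[i] describe the feed named in inputFeeds[i].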
for (int index = 0; index < pathGroups.length; index++) {
long storageMetric = computeStorageMetric(pathGroups[index], inputFeedStorageTypes[index], getConf());
computedMetrics.put(inputFeeds[index], storageMetric);
}
return computedMetrics;
}
private void persistMetrics(Map<String, Long> metrics, Path file) throws IOException, FalconException {
OutputStream out = null;
try {
FileSystem fs = HadoopClientFactory.get().createFileSystem(file.toUri(), getConf());
out = fs.create(file);
// make sure Falcon can read this file
FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
fs.setPermission(file, permission);
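// One feedName=metric entry per line; detectChanges() parses this format back.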
for (Map.Entry<String, Long> entry : metrics.entrySet()) {
out.write((entry.getKey() + "=" + entry.getValue() + "\n").getBytes());
}
} finally {
if (out != null) {
try {
out.close();
} catch (IOException ignore) {
// ignore
}
}
}
}
/**
* This method computes the storage metrics for a given feed's instance or partition.
* It uses size on disk as the metric for File System Storage.
* It uses create time as the metric for Catalog Table Storage.
*
* The assumption is that if a partition has changed or been reinstated, the underlying
* metric, either size or create time, would change.
*
* @param feedUriTemplate URI for the feed storage, filesystem path or table uri
* @param feedStorageType feed storage type
* @param conf configuration
* @return computed metric
* @throws IOException
* @throws FalconException
* @throws URISyntaxException
*/
public long computeStorageMetric(String feedUriTemplate, String feedStorageType, Configuration conf)
throws IOException, FalconException, URISyntaxException {
Storage.TYPE storageType = Storage.TYPE.valueOf(feedStorageType);
if (storageType == Storage.TYPE.FILESYSTEM) {
// usage on file system is the metric
return getFileSystemUsageMetric(feedUriTemplate, conf);
} else if (storageType == Storage.TYPE.TABLE) {
// todo: this should have been done in oozie mapper but el ${coord:dataIn('input')} returns hcat scheme
feedUriTemplate = feedUriTemplate.replace("hcat", "thrift");
// creation time of the given partition is the metric
return getTablePartitionCreateTimeMetric(feedUriTemplate);
}
throw new IllegalArgumentException("Unknown storage type: " + feedStorageType);
}
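/*
 * Illustrative calls, with made-up URIs (the exact URI shapes that reach this
 * method depend on the workflow and are an assumption here):
 *
 *   computeStorageMetric("hdfs://nn:8020/data/clicks/2014-01-01", "FILESYSTEM", conf)
 *     -> total bytes under the path
 *   computeStorageMetric("hcat://metastore:9083/db/table/ds=2014-01-01", "TABLE", conf)
 *     -> create time of the ds=2014-01-01 partition (hcat is rewritten to thrift)
 */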
/**
* The storage metric for File System Storage is the size of the content that
* this feed's instance, represented by the path, occupies on the file system.
*
* If this instance was reinstated, the assumption is that the size of
* this instance on disk would change.
*
* @param pathGroup comma-separated list of paths on the file system
* @param conf configuration
* @return metric as the size of data on file system
* @throws IOException
*/
private long getFileSystemUsageMetric(String pathGroup, Configuration conf)
throws IOException, FalconException {
long usage = 0;
for (String pathElement : pathGroup.split(",")) {
Path inPath = new Path(pathElement);
usage += usage(inPath, conf);
}
return usage;
}
private long usage(Path inPath, Configuration conf) throws IOException, FalconException {
FileSystem fs = HadoopClientFactory.get().createFileSystem(inPath.toUri(), conf);
FileStatus[] fileStatuses = fs.globStatus(inPath);
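// globStatus() expands any wildcards in the instance path; a null or empty
// result means no data exists for this instance, so it contributes nothing.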
if (fileStatuses == null || fileStatuses.length == 0) {
return 0;
}
long totalSize = 0;
for (FileStatus fileStatus : fileStatuses) {
totalSize += fs.getContentSummary(fileStatus.getPath()).getLength();
}
return totalSize;
}
/**
* The storage metric for Table Storage is the create time of the given partition,
* since neither Hive nor HCatalog has an API to find out if a partition has changed.
*
* If this partition was reinstated, the assumption is that the create time of
* this partition would change.
*
* @param feedUriTemplate catalog table uri
* @return metric as creation time of the given partition
* @throws IOException
* @throws URISyntaxException
* @throws FalconException
*/
private long getTablePartitionCreateTimeMetric(String feedUriTemplate)
throws IOException, URISyntaxException, FalconException {
CatalogStorage storage = (CatalogStorage)
FeedHelper.createStorage(Storage.TYPE.TABLE.name(), feedUriTemplate);
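// Look the partition up in the metastore via the catalog service; a missing
// partition yields a metric of 0.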
CatalogPartition partition = CatalogServiceFactory.getCatalogService().getPartition(
storage.getCatalogUrl(), storage.getDatabase(), storage.getTable(), storage.getPartitions());
return partition == null ? 0 : partition.getCreateTime();
}
/**
* This method compares the recorded metrics persisted in the file against
* the freshly computed metrics and returns the list of feeds that have changed.
*
* @param file persisted metrics from the first run
* @param metrics newly computed metrics
* @param conf configuration
* @return comma-separated list of feed names that have changed, or an empty string if none have changed
* @throws Exception
*/
public String detectChanges(Path file, Map<String, Long> metrics, Configuration conf)
throws Exception {
StringBuilder buffer = new StringBuilder();
FileSystem fs = HadoopClientFactory.get().createFileSystem(file.toUri(), conf);
BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(file)));
String line;
try {
Map<String, Long> recordedMetrics = new LinkedHashMap<String, Long>();
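// Re-read the metrics persisted earlier by persistMetrics(), one
// feedName=metric entry per line.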
while ((line = in.readLine()) != null) {
if (line.isEmpty()) {
continue;
}
int index = line.indexOf('=');
String key = line.substring(0, index);
long size = Long.parseLong(line.substring(index + 1));
recordedMetrics.put(key, size);
}
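// Collect the names of feeds whose freshly computed metric differs from the
// recorded one; feeds missing from the recorded file are skipped.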
for (Map.Entry<String, Long> entry : metrics.entrySet()) {
if (recordedMetrics.get(entry.getKey()) == null) {
LOG.info("No matching key {}", entry.getKey());
continue;
}
if (!recordedMetrics.get(entry.getKey()).equals(entry.getValue())) {
LOG.info("Recorded size: {} is different from new size {}",
recordedMetrics.get(entry.getKey()), entry.getValue());
buffer.append(entry.getKey()).append(',');
}
}
if (buffer.length() == 0) {
return "";
} else {
return buffer.substring(0, buffer.length() - 1);
}
} finally {
in.close();
}
}
}