| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| |
| package org.apache.fluo.cluster.runner; |
| |
| import java.io.IOException; |
| import java.lang.reflect.Method; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.EnumSet; |
| import java.util.List; |
| |
| import javax.inject.Provider; |
| |
| import com.beust.jcommander.JCommander; |
| import com.beust.jcommander.Parameter; |
| import com.beust.jcommander.ParameterException; |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.collect.Iterables; |
| import com.google.inject.AbstractModule; |
| import com.google.inject.Guice; |
| import org.apache.accumulo.core.client.Scanner; |
| import org.apache.accumulo.core.client.TableNotFoundException; |
| import org.apache.fluo.api.config.FluoConfiguration; |
| import org.apache.fluo.api.exceptions.FluoException; |
| import org.apache.fluo.core.impl.Environment; |
| import org.apache.fluo.core.impl.Notification; |
| import org.apache.fluo.core.util.ScanUtil; |
| import org.apache.fluo.core.util.ScanUtil.ScanFlags; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Base class for running a Fluo application |
| */ |
| @Deprecated |
| public abstract class AppRunner { |
| |
| private static final Logger log = LoggerFactory.getLogger(AppRunner.class); |
| private static final long MIN_SLEEP_SEC = 10; |
| private static final long MAX_SLEEP_SEC = 300; |
| |
| private String scriptName; |
| |
| public AppRunner(String scriptName) { |
| this.scriptName = scriptName; |
| } |
| |
| public void scan(FluoConfiguration config, String[] args) { |
| ScanOptions options = new ScanOptions(); |
| JCommander jcommand = new JCommander(options); |
| jcommand.setProgramName(scriptName + " scan <app>"); |
| try { |
| jcommand.parse(args); |
| } catch (ParameterException e) { |
| System.err.println(e.getMessage()); |
| jcommand.usage(); |
| System.exit(-1); |
| } |
| |
| if (options.help) { |
| jcommand.usage(); |
| System.exit(0); |
| } |
| |
| try { |
| if (options.scanAccumuloTable) { |
| ScanUtil.scanAccumulo(options.getScanOpts(), config, System.out); |
| } else { |
| ScanUtil.scanFluo(options.getScanOpts(), config, System.out); |
| } |
| } catch (IOException e) { |
| System.err.println(e.getMessage()); |
| System.exit(-1); |
| } |
| } |
| |
| private long calculateSleep(long notifyCount, long numWorkers) { |
| long sleep = notifyCount / numWorkers / 100; |
| if (sleep < MIN_SLEEP_SEC) { |
| return MIN_SLEEP_SEC; |
| } else if (sleep > MAX_SLEEP_SEC) { |
| return MAX_SLEEP_SEC; |
| } |
| return sleep; |
| } |
| |
| @VisibleForTesting |
| public long countNotifications(Environment env) { |
| Scanner scanner = null; |
| try { |
| scanner = env.getAccumuloClient().createScanner(env.getTable(), env.getAuthorizations()); |
| } catch (TableNotFoundException e) { |
| log.error("An exception was thrown -", e); |
| throw new FluoException(e); |
| } |
| |
| Notification.configureScanner(scanner); |
| |
| return Iterables.size(scanner); |
| } |
| |
| public void waitUntilFinished(FluoConfiguration config) { |
| try (Environment env = new Environment(config)) { |
| log.info("The wait command will exit when all notifications are processed"); |
| while (true) { |
| long ts1 = env.getSharedResources().getOracleClient().getStamp().getTxTimestamp(); |
| long ntfyCount = countNotifications(env); |
| long ts2 = env.getSharedResources().getOracleClient().getStamp().getTxTimestamp(); |
| if (ntfyCount == 0 && ts1 == (ts2 - 1)) { |
| log.info("All processing has finished!"); |
| break; |
| } |
| |
| try { |
| long sleepSec = calculateSleep(ntfyCount, |
| org.apache.fluo.cluster.util.FluoYarnConfig.getWorkerInstances(config)); |
| log.info("{} notifications are still outstanding. Will try again in {} seconds...", |
| ntfyCount, sleepSec); |
| Thread.sleep(1000 * sleepSec); |
| } catch (InterruptedException e) { |
| log.error("Sleep was interrupted! Exiting..."); |
| System.exit(-1); |
| } |
| } |
| } catch (FluoException e) { |
| log.error(e.getMessage()); |
| System.exit(-1); |
| } catch (Exception e) { |
| log.error("An exception was thrown -", e); |
| System.exit(-1); |
| } |
| } |
| |
| private static class FluoConfigModule extends AbstractModule { |
| |
| private Class<?> clazz; |
| private FluoConfiguration fluoConfig; |
| |
| FluoConfigModule(Class<?> clazz, FluoConfiguration fluoConfig) { |
| this.clazz = clazz; |
| this.fluoConfig = fluoConfig; |
| } |
| |
| @Override |
| protected void configure() { |
| requestStaticInjection(clazz); |
| bind(FluoConfiguration.class).toProvider((Provider<FluoConfiguration>) () -> fluoConfig); |
| } |
| } |
| |
| public void exec(FluoConfiguration fluoConfig, String[] args) throws Exception { |
| |
| String className = args[0]; |
| Arrays.copyOfRange(args, 1, args.length); |
| |
| Class<?> clazz = Class.forName(className); |
| |
| // inject fluo configuration |
| Guice.createInjector(new FluoConfigModule(clazz, fluoConfig)); |
| |
| Method method = clazz.getMethod("main", String[].class); |
| method.invoke(null, (Object) Arrays.copyOfRange(args, 1, args.length)); |
| } |
| |
| public static class ScanOptions { |
| |
| @Parameter(names = "-s", description = "Start row (inclusive) of scan") |
| private String startRow; |
| |
| @Parameter(names = "-e", description = "End row (inclusive) of scan") |
| private String endRow; |
| |
| @Parameter(names = "-c", description = "Columns of scan in comma separated format: " |
| + "<<columnfamily>[:<columnqualifier>]{,<columnfamily>[:<columnqualifier>]}> ") |
| private List<String> columns; |
| |
| @Parameter(names = "-r", description = "Exact row to scan") |
| private String exactRow; |
| |
| @Parameter(names = "-p", description = "Row prefix to scan") |
| private String rowPrefix; |
| |
| @Parameter(names = {"-h", "-help", "--help"}, help = true, description = "Prints help") |
| public boolean help; |
| |
| @Parameter(names = {"-esc", "--escape-non-ascii"}, help = true, |
| description = "Hex encode non ascii bytes", arity = 1) |
| public boolean hexEncNonAscii = true; |
| |
| @Parameter(names = "--raw", help = true, |
| description = "Show underlying key/values stored in Accumulo. Interprets the data using Fluo " |
| + "internal schema, making it easier to comprehend.") |
| public boolean scanAccumuloTable = false; |
| |
| public String getStartRow() { |
| return startRow; |
| } |
| |
| public String getEndRow() { |
| return endRow; |
| } |
| |
| public String getExactRow() { |
| return exactRow; |
| } |
| |
| public String getRowPrefix() { |
| return rowPrefix; |
| } |
| |
| public List<String> getColumns() { |
| if (columns == null) { |
| return Collections.emptyList(); |
| } |
| return columns; |
| } |
| |
| public ScanUtil.ScanOpts getScanOpts() { |
| EnumSet<ScanFlags> flags = EnumSet.noneOf(ScanFlags.class); |
| |
| ScanUtil.setFlag(flags, help, ScanFlags.HELP); |
| ScanUtil.setFlag(flags, hexEncNonAscii, ScanFlags.HEX); |
| ScanUtil.setFlag(flags, scanAccumuloTable, ScanFlags.ACCUMULO); |
| |
| return new ScanUtil.ScanOpts(startRow, endRow, columns, exactRow, rowPrefix, flags); |
| } |
| } |
| } |