blob: 5a9f0ca0ff573159695c3da8487430672152130a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.fluo.mini;
import java.io.File;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.minicluster.MiniAccumuloCluster;
import org.apache.accumulo.minicluster.MiniAccumuloConfig;
import org.apache.fluo.api.client.FluoAdmin;
import org.apache.fluo.api.client.FluoAdmin.InitializationOptions;
import org.apache.fluo.api.client.FluoFactory;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.config.SimpleConfiguration;
import org.apache.fluo.api.mini.MiniFluo;
import org.apache.fluo.core.client.FluoClientImpl;
import org.apache.fluo.core.impl.Environment;
import org.apache.fluo.core.impl.FluoConfigurationImpl;
import org.apache.fluo.core.impl.Notification;
import org.apache.fluo.core.oracle.OracleServer;
import org.apache.fluo.core.worker.NotificationFinder;
import org.apache.fluo.core.worker.NotificationFinderFactory;
import org.apache.fluo.core.worker.NotificationProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Implementation of MiniFluo
*/
public class MiniFluoImpl implements MiniFluo {
private static final Logger log = LoggerFactory.getLogger(MiniFluoImpl.class);
private static final AtomicInteger reporterCounter = new AtomicInteger(1);
private final Environment env;
private OracleServer oserver;
private NotificationProcessor mnp;
private NotificationFinder notificationFinder;
private FluoConfiguration config;
private MiniAccumuloCluster cluster = null;
protected static final String USER = "root";
protected static final String PASSWORD = "secret";
private AutoCloseable reporter;
public static String clientPropsPath(FluoConfiguration config) {
return config.getMiniDataDir() + "/client.properties";
}
@VisibleForTesting
public NotificationProcessor getNotificationProcessor() {
return mnp;
}
public MiniFluoImpl(FluoConfiguration fluoConfig) {
if (!fluoConfig.hasRequiredMiniFluoProps()) {
throw new IllegalArgumentException("MiniFluo configuration is not valid");
}
config = fluoConfig;
try {
if (config.getMiniStartAccumulo()) {
startMiniAccumulo();
}
config.setProperty(FluoConfigurationImpl.NTFY_FINDER_MIN_SLEEP_TIME_PROP, 50);
config.setProperty(FluoConfigurationImpl.NTFY_FINDER_MAX_SLEEP_TIME_PROP, 100);
env = new Environment(config);
reporter = FluoClientImpl.setupReporters(env, "mini", reporterCounter);
oserver = new OracleServer(env);
oserver.start();
mnp = new NotificationProcessor(env);
notificationFinder = NotificationFinderFactory.newNotificationFinder(env.getConfiguration());
notificationFinder.init(env, mnp);
notificationFinder.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private void startMiniAccumulo() {
try {
// start mini accumulo cluster
MiniAccumuloConfig cfg = new MiniAccumuloConfig(new File(config.getMiniDataDir()), PASSWORD);
cluster = new MiniAccumuloCluster(cfg);
cluster.start();
log.debug("Started MiniAccumulo(accumulo=" + cluster.getInstanceName() + " zk="
+ cluster.getZooKeepers() + ")");
// configuration that must overridden
config.setAccumuloInstance(cluster.getInstanceName());
config.setAccumuloUser(USER);
config.setAccumuloPassword(PASSWORD);
config.setAccumuloZookeepers(cluster.getZooKeepers());
config.setInstanceZookeepers(cluster.getZooKeepers() + "/fluo");
// configuration that only needs to be set if not by user
if ((config.containsKey(FluoConfiguration.ACCUMULO_TABLE_PROP) == false)
|| config.getAccumuloTable().trim().isEmpty()) {
config.setAccumuloTable("fluo");
}
InitializationOptions opts = new InitializationOptions();
try (FluoAdmin admin = FluoFactory.newAdmin(config)) {
admin.initialize(opts);
}
File miniProps = new File(clientPropsPath(config));
config.getClientConfiguration().save(miniProps);
log.debug("Wrote MiniFluo client properties to {}", miniProps.getAbsolutePath());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public SimpleConfiguration getClientConfiguration() {
return config.getClientConfiguration();
}
@Override
public void close() {
try {
if (oserver != null) {
notificationFinder.stop();
mnp.close();
oserver.stop();
env.close();
reporter.close();
if (cluster != null) {
cluster.stop();
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public void waitForObservers() {
try {
Scanner scanner =
env.getAccumuloClient().createScanner(env.getTable(), env.getAuthorizations());
Notification.configureScanner(scanner);
while (true) {
long ts1 = env.getSharedResources().getOracleClient().getStamp().getTxTimestamp();
long ntfyCount = Iterables.size(scanner);
long ts2 = env.getSharedResources().getOracleClient().getStamp().getTxTimestamp();
if (ntfyCount == 0 && ts1 == (ts2 - 1)) {
break;
}
long sleepTime = ntfyCount / 2;
sleepTime = Math.min(Math.max(10, sleepTime), 10000);
Uninterruptibles.sleepUninterruptibly(sleepTime, TimeUnit.MILLISECONDS);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}