blob: 2a902a7fc324814cf16356ccc1e5ad648a606b89 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.rya.export.client;
import static org.apache.rya.export.DBType.ACCUMULO;
import static org.apache.rya.export.MergePolicy.TIMESTAMP;
import java.io.File;
import java.io.IOException;
import java.net.UnknownHostException;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.commons.cli.ParseException;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Logger;
import org.apache.log4j.xml.DOMConfigurator;
import org.apache.rya.api.path.PathUtils;
import org.apache.rya.export.accumulo.AccumuloRyaStatementStore;
import org.apache.rya.export.api.MergerException;
import org.apache.rya.export.api.conf.MergeConfiguration;
import org.apache.rya.export.api.conf.MergeConfigurationException;
import org.apache.rya.export.api.conf.policy.TimestampPolicyMergeConfiguration;
import org.apache.rya.export.api.store.RyaStatementStore;
import org.apache.rya.export.client.conf.MergeConfigurationCLI;
import org.apache.rya.export.client.conf.TimeUtils;
import org.apache.rya.export.client.merge.MemoryTimeMerger;
import org.apache.rya.export.client.merge.StatementStoreFactory;
import org.apache.rya.export.client.merge.VisibilityStatementMerger;
import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
import org.eclipse.rdf4j.query.MalformedQueryException;
import org.eclipse.rdf4j.query.UpdateExecutionException;
import org.eclipse.rdf4j.repository.RepositoryException;
import org.eclipse.rdf4j.sail.SailException;
import com.google.common.base.Optional;
/**
* Drives the MergeTool.
*/
public class MergeDriverClient {
private static final Logger LOG = Logger.getLogger(MergeDriverClient.class);
private static MergeConfiguration configuration;
public static void main(final String [] args) throws ParseException,
MergeConfigurationException, UnknownHostException, MergerException,
java.text.ParseException, SailException, AccumuloException,
AccumuloSecurityException, InferenceEngineException, RepositoryException,
MalformedQueryException, UpdateExecutionException {
final String log4jConfiguration = System.getProperties().getProperty("log4j.configuration");
if (StringUtils.isNotBlank(log4jConfiguration)) {
final String parsedConfiguration = PathUtils.clean(StringUtils.removeStart(log4jConfiguration, "file:"));
final File configFile = new File(parsedConfiguration);
if (configFile.exists()) {
DOMConfigurator.configure(parsedConfiguration);
} else {
BasicConfigurator.configure();
}
}
final MergeConfigurationCLI config = new MergeConfigurationCLI(args);
try {
configuration = config.createConfiguration();
} catch (final MergeConfigurationException e) {
LOG.error("Configuration failed.", e);
}
final boolean useTimeSync = configuration.getUseNtpServer();
Optional<Long> offset = Optional.absent();
if (useTimeSync) {
final String tomcat = configuration.getChildTomcatUrl();
final String ntpHost = configuration.getNtpServerHost();
try {
offset = Optional.fromNullable(TimeUtils.getNtpServerAndMachineTimeDifference(ntpHost, tomcat));
} catch (final IOException e) {
LOG.error("Unable to get time difference between time server: " + ntpHost + " and the server: " + tomcat, e);
}
}
final StatementStoreFactory storeFactory = new StatementStoreFactory(configuration);
try {
final RyaStatementStore parentStore = storeFactory.getParentStatementStore();
final RyaStatementStore childStore = storeFactory.getChildStatementStore();
LOG.info("Starting Merge Tool");
if(configuration.getParentDBType() == ACCUMULO && configuration.getChildDBType() == ACCUMULO) {
final AccumuloRyaStatementStore childAStore = (AccumuloRyaStatementStore) childStore;
final AccumuloRyaStatementStore parentAStore = (AccumuloRyaStatementStore) parentStore;
//do map reduce merging.
//TODO: Run Merger
} else {
if(configuration.getMergePolicy() == TIMESTAMP) {
final TimestampPolicyMergeConfiguration timeConfig = (TimestampPolicyMergeConfiguration) configuration;
final Long timeOffset;
if (offset.isPresent()) {
timeOffset = offset.get();
} else {
timeOffset = 0L;
}
final MemoryTimeMerger merger = new MemoryTimeMerger(parentStore, childStore,
new VisibilityStatementMerger(), timeConfig.getToolStartTime(),
configuration.getParentRyaInstanceName(), timeOffset);
merger.runJob();
}
}
} catch (final Exception e) {
LOG.error("Something went wrong creating a Rya Statement Store connection.", e);
}
Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(final Thread thread, final Throwable throwable) {
LOG.error("Uncaught exception in " + thread.getName(), throwable);
}
});
LOG.info("Finished running Merge Tool");
System.exit(1);
}
}