blob: bd61fc8ccfd59646964fc613f8d2136f2892261b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.rya.accumulo.mr.merge.driver;
import static org.apache.rya.accumulo.mr.merge.util.TestUtils.LAST_MONTH;
import static org.apache.rya.accumulo.mr.merge.util.TestUtils.TODAY;
import static org.apache.rya.accumulo.mr.merge.util.TestUtils.createRyaStatement;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.accumulo.minicluster.MiniAccumuloCluster;
import org.apache.log4j.Logger;
import org.apache.rya.accumulo.mr.merge.util.AccumuloRyaUtils;
import org.apache.rya.accumulo.mr.merge.util.TestUtils;
import org.apache.rya.api.RdfCloudTripleStoreConstants;
import org.apache.rya.api.domain.RyaStatement;
import org.apache.rya.api.persist.RyaDAOException;
/**
 * Runs a {@link MiniAccumuloCluster}.
 * <p>
 * When launched through {@link #main(String[])} the driver calls {@code setUp()}, writes a
 * fixed set of sample statements to the parent instance (and to the child instance when
 * configured for merging), and then idles until the JVM shuts down. A shutdown hook calls
 * {@code tearDown()} at that point.
 */
public class MiniAccumuloClusterDriver extends AccumuloDualInstanceDriver {
    private static final Logger log = Logger.getLogger(MiniAccumuloClusterDriver.class);

    /**
     * {@code true} to configure the cluster for merging. {@code false} to configure
     * the cluster for copying.
     */
    private static final boolean IS_MERGE_SETUP = true;

    /**
     * Controls the idle loop in {@link #main(String[])}. It is cleared by the shutdown hook
     * thread and polled by the main thread, so it is {@code volatile} to guarantee the
     * main thread observes the update.
     */
    private static volatile boolean keepRunning = true;

    /**
     * Creates a new instance of {@link MiniAccumuloClusterDriver}.
     * <p>
     * The flags handed to the superclass are all derived from {@link #IS_MERGE_SETUP};
     * see {@link AccumuloDualInstanceDriver} for the meaning of each argument.
     */
    public MiniAccumuloClusterDriver() {
        super(false, !IS_MERGE_SETUP, !IS_MERGE_SETUP, IS_MERGE_SETUP, IS_MERGE_SETUP);
    }

    /**
     * Populates the parent instance, and the child instance when {@link #IS_MERGE_SETUP} is
     * {@code true}, with sample statements covering the different merge scenarios, then
     * prints the SPO table of both instances.
     *
     * @throws RyaDAOException declared by the statement/table helpers this method calls.
     * @throws IOException declared by the statement/table helpers this method calls.
     */
    private void writeStatements() throws RyaDAOException, IOException {
        // This statement was in both parent/child instances a month ago and is before the start time of yesterday
        // but it was left alone. It should remain in the parent after merging.
        final RyaStatement ryaStatementOutOfTimeRange = createRyaStatement("coach", "called", "timeout", LAST_MONTH);

        // This statement was in both parent/child instances a month ago but after the start time of yesterday
        // the parent deleted it and the child still has it. It should stay deleted in the parent after merging.
        final RyaStatement ryaStatementParentDeletedAfter = createRyaStatement("parent", "deleted", "after", LAST_MONTH);

        // This statement was added by the parent after the start time of yesterday and doesn't exist in the child.
        // It should stay in the parent after merging.
        final RyaStatement ryaStatementParentAddedAfter = createRyaStatement("parent", "added", "after", TODAY);

        // This statement was in both parent/child instances a month ago but after the start time of yesterday
        // the child deleted it and the parent still has it. It should be deleted from the parent after merging.
        final RyaStatement ryaStatementChildDeletedAfter = createRyaStatement("child", "deleted", "after", LAST_MONTH);

        // This statement was added by the child after the start time of yesterday and doesn't exist in the parent.
        // It should be added to the parent after merging.
        final RyaStatement ryaStatementChildAddedAfter = createRyaStatement("child", "added", "after", TODAY);

        // This statement was modified by the child after the start of yesterday (The timestamp changes after updating)
        // It should be updated in the parent to match the child.
        final RyaStatement ryaStatementUpdatedByChild = createRyaStatement("bob", "catches", "ball", LAST_MONTH);

        // Present, unmodified, in both the parent and child lists below.
        final RyaStatement ryaStatementUntouchedByChild = createRyaStatement("bill", "talks to", "john", LAST_MONTH);

        // Present only in the parent list below (the child no longer has it).
        final RyaStatement ryaStatementDeletedByChild = createRyaStatement("susan", "eats", "burgers", LAST_MONTH);

        // Present only in the child list below (the parent never had it).
        final RyaStatement ryaStatementAddedByChild = createRyaStatement("ronnie", "plays", "guitar", TODAY);

        // This statement was modified by the child to change the column visibility.
        // The parent should combine the child's visibility with its visibility.
        final RyaStatement ryaStatementVisibilityDifferent = createRyaStatement("I", "see", "you", LAST_MONTH);
        ryaStatementVisibilityDifferent.setColumnVisibility(PARENT_COLUMN_VISIBILITY.getExpression());

        final List<RyaStatement> parentStatements = new ArrayList<>();
        final List<RyaStatement> childStatements = new ArrayList<>();

        // Setup initial parent instance with 7 rows
        // This is the state of the parent data (as it is today) before merging occurs which will use the specified start time of yesterday.
        parentStatements.add(ryaStatementOutOfTimeRange);      // Merging should keep statement
        parentStatements.add(ryaStatementUpdatedByChild);      // Merging should update statement
        parentStatements.add(ryaStatementUntouchedByChild);    // Merging should keep statement
        parentStatements.add(ryaStatementDeletedByChild);      // Merging should delete statement
        parentStatements.add(ryaStatementVisibilityDifferent); // Merging should update statement
        parentStatements.add(ryaStatementParentAddedAfter);    // Merging should keep statement
        parentStatements.add(ryaStatementChildDeletedAfter);   // Merging should delete statement

        addParentRyaStatements(parentStatements);

        // Simulate the child coming back with a modified data set before the merging occurs.
        // (1 updated row, 1 row left alone because it was unchanged, 1 row outside time range,
        // 1 modified visibility, 1 row the parent has since deleted, 2 new rows added by the child;
        // the 2 rows deleted by the child are simply omitted).
        // There should be 7 rows in the child instance.
        // NOTE(review): how many of these fall inside the merge time window depends on how
        // timestamps are applied when the statements are written - confirm against the DAO.
        //
        // The statement objects below are the same instances already handed to the parent, so
        // these mutations must stay after the addParentRyaStatements(...) call above.
        ryaStatementUpdatedByChild.setObject(TestUtils.createRyaIri("football"));
        ryaStatementUpdatedByChild.setTimestamp(TODAY.getTime());
        ryaStatementVisibilityDifferent.setColumnVisibility(CHILD_COLUMN_VISIBILITY.getExpression());

        childStatements.add(ryaStatementOutOfTimeRange);
        childStatements.add(ryaStatementUpdatedByChild);
        childStatements.add(ryaStatementUntouchedByChild);
        childStatements.add(ryaStatementAddedByChild);         // Merging should add statement
        childStatements.add(ryaStatementVisibilityDifferent);
        childStatements.add(ryaStatementParentDeletedAfter);
        childStatements.add(ryaStatementChildAddedAfter);      // Merging should add statement

        // The child is only populated for a merge setup.
        if (IS_MERGE_SETUP) {
            addChildRyaStatements(childStatements);
        }

        AccumuloRyaUtils.printTable(PARENT_TABLE_PREFIX + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX, getParentConfig());
        AccumuloRyaUtils.printTable(CHILD_TABLE_PREFIX + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX, getChildConfig());
    }

    /**
     * Sets up the driver, writes the sample statements, and then keeps the process alive
     * until the JVM is shut down or the main thread is interrupted. A shutdown hook calls
     * {@code tearDown()} on the driver.
     *
     * @param args the command line arguments (unused).
     */
    public static void main(final String[] args) {
        log.info("Setting up MiniAccumulo Cluster");

        Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(final Thread thread, final Throwable throwable) {
                log.fatal("Uncaught exception in " + thread.getName(), throwable);
            }
        });

        final MiniAccumuloClusterDriver macDriver = new MiniAccumuloClusterDriver();
        try {
            macDriver.setUp();
            log.info("Populating clusters");
            macDriver.writeStatements();
            log.info("MiniAccumuloClusters running and populated");
        } catch (final Exception e) {
            log.error("Error setting up and writing statements", e);
            // Skip the idle loop below; the shutdown hook is still registered for clean up.
            keepRunning = false;
        }

        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                log.info("Shutting down...");
                try {
                    macDriver.tearDown();
                } catch (final Exception e) {
                    log.error("Error while shutting down", e);
                } finally {
                    keepRunning = false;
                    log.info("Done shutting down");
                }
            }
        });

        // Idle until the shutdown hook (or an interrupt) clears the flag.
        while (keepRunning) {
            try {
                Thread.sleep(2000);
            } catch (final InterruptedException e) {
                log.error("Interrupted exception while running MiniAccumuloClusterDriver", e);
                // Restore the interrupt status so code further up the stack can see it.
                Thread.currentThread().interrupt();
                keepRunning = false;
            }
        }
    }
}