| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.rya.accumulo.mr.merge; |
| |
| import static org.apache.rya.accumulo.mr.merge.util.TestUtils.LAST_MONTH; |
| import static org.apache.rya.accumulo.mr.merge.util.TestUtils.TODAY; |
| import static org.apache.rya.accumulo.mr.merge.util.TestUtils.YESTERDAY; |
| import static org.apache.rya.accumulo.mr.merge.util.TestUtils.createRyaStatement; |
| import static org.apache.rya.accumulo.mr.merge.util.ToolConfigUtils.makeArgument; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.fail; |
| |
| import java.io.File; |
| import java.util.Date; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map.Entry; |
| |
| import org.apache.accumulo.core.client.AccumuloException; |
| import org.apache.accumulo.core.client.AccumuloSecurityException; |
| import org.apache.accumulo.core.client.Connector; |
| import org.apache.accumulo.core.client.Scanner; |
| import org.apache.accumulo.core.client.admin.SecurityOperations; |
| import org.apache.accumulo.core.data.Key; |
| import org.apache.accumulo.core.data.Value; |
| import org.apache.accumulo.core.file.rfile.bcfile.Compression.Algorithm; |
| import org.apache.accumulo.core.security.Authorizations; |
| import org.apache.accumulo.core.security.ColumnVisibility; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.log4j.Logger; |
| import org.apache.rya.accumulo.AccumuloRdfConfiguration; |
| import org.apache.rya.accumulo.AccumuloRyaDAO; |
| import org.apache.rya.accumulo.mr.MRUtils; |
| import org.apache.rya.accumulo.mr.merge.common.InstanceType; |
| import org.apache.rya.accumulo.mr.merge.driver.AccumuloDualInstanceDriver; |
| import org.apache.rya.accumulo.mr.merge.util.AccumuloRyaUtils; |
| import org.apache.rya.accumulo.mr.merge.util.TestUtils; |
| import org.apache.rya.accumulo.mr.merge.util.TimeUtils; |
| import org.apache.rya.api.RdfCloudTripleStoreConfiguration; |
| import org.apache.rya.api.RdfCloudTripleStoreConstants; |
| import org.apache.rya.api.domain.RyaStatement; |
| import org.apache.rya.api.persist.RyaDAOException; |
| import org.apache.rya.indexing.accumulo.ConfigUtils; |
| import org.eclipse.rdf4j.common.iteration.CloseableIteration; |
| import org.junit.After; |
| import org.junit.AfterClass; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.BeforeClass; |
| import org.junit.Test; |
| |
| /** |
| * Tests for {@link CopyTool}. |
| */ |
| public class CopyToolTest { |
| private static final Logger log = Logger.getLogger(CopyToolTest.class); |
| |
| private static final boolean IS_MOCK = true; |
| private static final boolean USE_TIME_SYNC = false; |
| private static final boolean USE_COPY_FILE_OUTPUT = false; |
| |
| private static final String CHILD_SUFFIX = MergeTool.CHILD_SUFFIX; |
| |
| private static final String PARENT_PASSWORD = AccumuloDualInstanceDriver.PARENT_PASSWORD; |
| private static final String PARENT_INSTANCE = AccumuloDualInstanceDriver.PARENT_INSTANCE; |
| private static final String PARENT_TABLE_PREFIX = AccumuloDualInstanceDriver.PARENT_TABLE_PREFIX; |
| private static final String PARENT_AUTH = AccumuloDualInstanceDriver.PARENT_AUTH; |
| private static final ColumnVisibility PARENT_COLUMN_VISIBILITY = new ColumnVisibility(PARENT_AUTH); |
| private static final String PARENT_TOMCAT_URL = "http://rya-example-box:8080"; |
| |
| private static final String CHILD_PASSWORD = AccumuloDualInstanceDriver.CHILD_PASSWORD; |
| private static final String CHILD_INSTANCE = AccumuloDualInstanceDriver.CHILD_INSTANCE; |
| private static final String CHILD_TABLE_PREFIX = AccumuloDualInstanceDriver.CHILD_TABLE_PREFIX; |
| private static final String CHILD_TOMCAT_URL = "http://localhost:8080"; |
| |
| private static AccumuloRyaDAO parentDao; |
| private static AccumuloRyaDAO childDao; |
| |
| private static AccumuloRdfConfiguration parentConfig; |
| private static AccumuloRdfConfiguration childConfig; |
| |
| private static AccumuloDualInstanceDriver accumuloDualInstanceDriver; |
| private static CopyTool copyTool = null; |
| private boolean isImporting = false; |
| |
| public static String getProjectRootDir() { |
| String rootDir = System.getProperty("basedir"); |
| if(rootDir == null) { |
| rootDir = System.getProperty("user.dir"); |
| } |
| if(rootDir == null) { |
| throw new RuntimeException("Expected user.dir to contain a value"); |
| } |
| return rootDir; |
| } |
| |
| private static File getUnitTestScratchDirectory(final String testName) { |
| final File dir = new File(getProjectRootDir() + File.separator + "target" |
| + File.separator + "TestScratch" + File.separator |
| + testName+ "-" + System.currentTimeMillis()); |
| Assert.assertTrue("Unable to make TestScratchDirectory:"+ dir.getAbsolutePath(), dir.mkdirs()); |
| return dir; |
| } |
| |
| @BeforeClass |
| public static void setUp() throws Exception { |
| accumuloDualInstanceDriver = new AccumuloDualInstanceDriver(IS_MOCK, true, true, false, false); |
| accumuloDualInstanceDriver.setUpInstances(); |
| } |
| |
| @Before |
| public void setUpPerTest() throws Exception { |
| accumuloDualInstanceDriver.setUpTables(); |
| |
| accumuloDualInstanceDriver.setUpDaos(); |
| |
| accumuloDualInstanceDriver.setUpConfigs(); |
| |
| parentConfig = accumuloDualInstanceDriver.getParentConfig(); |
| childConfig = accumuloDualInstanceDriver.getChildConfig(); |
| parentDao = accumuloDualInstanceDriver.getParentDao(); |
| childDao = accumuloDualInstanceDriver.getChildDao(); |
| } |
| |
| @After |
| public void tearDownPerTest() throws Exception { |
| log.info("tearDownPerTest(): tearing down now."); |
| accumuloDualInstanceDriver.tearDownTables(); |
| accumuloDualInstanceDriver.tearDownDaos(); |
| if (copyTool != null) { |
| copyTool.shutdown(); |
| } |
| } |
| |
| @AfterClass |
| public static void tearDownPerClass() throws Exception { |
| log.info("tearDownPerClass(): tearing down now."); |
| accumuloDualInstanceDriver.tearDown(); |
| } |
| |
| private void assertStatementInChild(final String description, final int verifyResultCount, final RyaStatement matchStatement) throws RyaDAOException { |
| TestUtils.assertStatementInInstance(description, verifyResultCount, matchStatement, childDao, childConfig); |
| } |
| |
| private void copyToolRun(final Date startDate) throws AccumuloException, AccumuloSecurityException { |
| copyTool = new CopyTool(); |
| copyTool.setupAndRun(new String[] { |
| makeArgument(MRUtils.AC_MOCK_PROP, Boolean.toString(IS_MOCK)), |
| makeArgument(MRUtils.AC_INSTANCE_PROP, PARENT_INSTANCE), |
| makeArgument(MRUtils.AC_USERNAME_PROP, accumuloDualInstanceDriver.getParentUser()), |
| makeArgument(MRUtils.AC_PWD_PROP, PARENT_PASSWORD), |
| makeArgument(MRUtils.TABLE_PREFIX_PROPERTY, PARENT_TABLE_PREFIX), |
| makeArgument(MRUtils.AC_AUTH_PROP, accumuloDualInstanceDriver.getParentAuths().toString()), |
| makeArgument(MRUtils.AC_ZK_PROP, accumuloDualInstanceDriver.getParentZooKeepers()), |
| makeArgument(CopyTool.PARENT_TOMCAT_URL_PROP, PARENT_TOMCAT_URL), |
| makeArgument(MRUtils.AC_MOCK_PROP + CHILD_SUFFIX, Boolean.toString(IS_MOCK)), |
| makeArgument(MRUtils.AC_INSTANCE_PROP + CHILD_SUFFIX, CHILD_INSTANCE), |
| makeArgument(MRUtils.AC_USERNAME_PROP + CHILD_SUFFIX, accumuloDualInstanceDriver.getChildUser()), |
| makeArgument(MRUtils.AC_PWD_PROP + CHILD_SUFFIX, CHILD_PASSWORD), |
| makeArgument(MRUtils.TABLE_PREFIX_PROPERTY + CHILD_SUFFIX, CHILD_TABLE_PREFIX), |
| makeArgument(MRUtils.AC_AUTH_PROP + CHILD_SUFFIX, accumuloDualInstanceDriver.getChildAuths() != null ? accumuloDualInstanceDriver.getChildAuths().toString() : null), |
| makeArgument(MRUtils.AC_ZK_PROP + CHILD_SUFFIX, accumuloDualInstanceDriver.getChildZooKeepers() != null ? accumuloDualInstanceDriver.getChildZooKeepers() : "localhost"), |
| makeArgument(CopyTool.CHILD_TOMCAT_URL_PROP, CHILD_TOMCAT_URL), |
| makeArgument(CopyTool.CREATE_CHILD_INSTANCE_TYPE_PROP, (IS_MOCK ? InstanceType.MOCK : InstanceType.MINI).toString()), |
| makeArgument(CopyTool.NTP_SERVER_HOST_PROP, TimeUtils.DEFAULT_TIME_SERVER_HOST), |
| makeArgument(CopyTool.USE_NTP_SERVER_PROP, Boolean.toString(USE_TIME_SYNC)), |
| makeArgument(CopyTool.USE_COPY_FILE_OUTPUT, Boolean.toString(USE_COPY_FILE_OUTPUT)), |
| makeArgument(CopyTool.COPY_FILE_OUTPUT_PATH, getUnitTestScratchDirectory(CopyToolTest.class.getSimpleName() + "-copyFileOutput").getAbsolutePath()), |
| makeArgument(CopyTool.COPY_FILE_OUTPUT_COMPRESSION_TYPE, Algorithm.GZ.getName()), |
| makeArgument(CopyTool.USE_COPY_FILE_OUTPUT_DIRECTORY_CLEAR, Boolean.toString(true)), |
| makeArgument(CopyTool.COPY_FILE_IMPORT_DIRECTORY, "resources/test/copy_tool_file_output/"), |
| makeArgument(CopyTool.USE_COPY_FILE_IMPORT, Boolean.toString(isImporting)), |
| makeArgument(MergeTool.START_TIME_PROP, MergeTool.getStartTimeString(startDate)), |
| makeArgument("hadoop.tmp.dir", getUnitTestScratchDirectory(CopyToolTest.class.getSimpleName()).getAbsolutePath()) |
| }); |
| |
| final Configuration toolConfig = copyTool.getConf(); |
| final String zooKeepers = toolConfig.get(MRUtils.AC_ZK_PROP + CHILD_SUFFIX); |
| MergeTool.setDuplicateKeysForProperty(childConfig, MRUtils.AC_ZK_PROP, zooKeepers); |
| |
| log.info("Finished running tool."); |
| } |
| |
| @Test |
| public void testCopyTool() throws Exception { |
| final RyaStatement ryaStatementOutOfTimeRange = createRyaStatement("coach", "called", "timeout", LAST_MONTH); |
| |
| final RyaStatement ryaStatementShouldCopy1 = createRyaStatement("bob", "catches", "ball", YESTERDAY); |
| final RyaStatement ryaStatementShouldCopy2 = createRyaStatement("bill", "talks to", "john", YESTERDAY); |
| final RyaStatement ryaStatementShouldCopy3 = createRyaStatement("susan", "eats", "burgers", TODAY); |
| final RyaStatement ryaStatementShouldCopy4 = createRyaStatement("ronnie", "plays", "guitar", TODAY); |
| |
| final RyaStatement ryaStatementDoesNotExist1 = createRyaStatement("nobody", "was", "here", LAST_MONTH); |
| final RyaStatement ryaStatementDoesNotExist2 = createRyaStatement("statement", "not", "found", YESTERDAY); |
| final RyaStatement ryaStatementDoesNotExist3 = createRyaStatement("key", "does not", "exist", TODAY); |
| |
| // This statement was modified by the child to change the column visibility. |
| // The parent should combine the child's visibility with its visibility. |
| final RyaStatement ryaStatementVisibilityDifferent = createRyaStatement("I", "see", "you", YESTERDAY); |
| ryaStatementVisibilityDifferent.setColumnVisibility(PARENT_COLUMN_VISIBILITY.getExpression()); |
| |
| // Setup initial parent instance with 7 rows |
| // This is the state of the parent data (as it is today) before merging occurs which will use the specified start time of yesterday. |
| parentDao.add(ryaStatementOutOfTimeRange); // Process should NOT copy statement |
| parentDao.add(ryaStatementShouldCopy1); // Process should copy statement |
| parentDao.add(ryaStatementShouldCopy2); // Process should copy statement |
| parentDao.add(ryaStatementShouldCopy3); // Process should copy statement |
| parentDao.add(ryaStatementShouldCopy4); // Process should copy statement |
| parentDao.add(ryaStatementVisibilityDifferent); // Process should copy and update statement |
| |
| |
| AccumuloRyaUtils.printTable(PARENT_TABLE_PREFIX + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX, parentConfig); |
| //AccumuloRyaUtils.printTable(CHILD_TABLE_PREFIX + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX, childConfig); |
| |
| log.info("Starting copy tool. Copying all data after the specified start time: " + YESTERDAY); |
| |
| isImporting = false; |
| copyToolRun(YESTERDAY); |
| |
| |
| // Copy Tool made child instance so hook the tables and dao into the driver. |
| final String childUser = accumuloDualInstanceDriver.getChildUser(); |
| final Connector childConnector = ConfigUtils.getConnector(childConfig); |
| accumuloDualInstanceDriver.getChildAccumuloInstanceDriver().setConnector(childConnector); |
| |
| accumuloDualInstanceDriver.getChildAccumuloInstanceDriver().setUpTables(); |
| |
| accumuloDualInstanceDriver.getChildAccumuloInstanceDriver().setUpDao(); |
| childDao = accumuloDualInstanceDriver.getChildDao(); |
| |
| |
| // Update child config to include changes made from copy process |
| final SecurityOperations childSecOps = accumuloDualInstanceDriver.getChildSecOps(); |
| Authorizations newChildAuths = AccumuloRyaUtils.addUserAuths(childUser, childSecOps, PARENT_AUTH); |
| childSecOps.changeUserAuthorizations(childUser, newChildAuths); |
| final String childAuthString = newChildAuths.toString(); |
| final List<String> duplicateKeys = MergeTool.DUPLICATE_KEY_MAP.get(MRUtils.AC_AUTH_PROP); |
| childConfig.set(MRUtils.AC_AUTH_PROP, childAuthString); |
| for (final String key : duplicateKeys) { |
| childConfig.set(key, childAuthString); |
| } |
| AccumuloRyaUtils.printTablePretty(CHILD_TABLE_PREFIX + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX, childConfig); |
| AccumuloRyaUtils.printTablePretty(CHILD_TABLE_PREFIX + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX, childConfig); |
| AccumuloRyaUtils.printTablePretty(CHILD_TABLE_PREFIX + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX, childConfig); |
| |
| final Scanner scanner = AccumuloRyaUtils.getScanner(CHILD_TABLE_PREFIX + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX, childConfig); |
| final Iterator<Entry<Key, Value>> iterator = scanner.iterator(); |
| int count = 0; |
| while (iterator.hasNext()) { |
| iterator.next(); |
| count++; |
| } |
| // Make sure we have all of them in the parent. |
| assertEquals(5, count); |
| |
| |
| assertStatementInChild("Child included statement that was out of time range", 0, ryaStatementOutOfTimeRange); |
| |
| assertStatementInChild("Child missing statement 1 that was in parent", 1, ryaStatementShouldCopy1); |
| |
| assertStatementInChild("Child missing statement 2 that was in parent", 1, ryaStatementShouldCopy2); |
| |
| assertStatementInChild("Child missing statement 3 that was in parent", 1, ryaStatementShouldCopy3); |
| |
| assertStatementInChild("Child missing statement 4 that was in parent", 1, ryaStatementShouldCopy4); |
| |
| assertStatementInChild("Child included statement 1 that was not in parent", 0, ryaStatementDoesNotExist1); |
| |
| assertStatementInChild("Child included statement 2 that was not in parent", 0, ryaStatementDoesNotExist2); |
| |
| assertStatementInChild("Child included statement 3 that was not in parent", 0, ryaStatementDoesNotExist3); |
| |
| // Check that it can be queried with child's visibility |
| assertStatementInChild("Child missing statement with child visibility", 1, ryaStatementVisibilityDifferent); |
| |
| // Check that it can be queried with parent's visibility |
| childConfig.set(RdfCloudTripleStoreConfiguration.CONF_QUERY_AUTH, PARENT_AUTH); |
| final SecurityOperations secOps = IS_MOCK ? accumuloDualInstanceDriver.getChildSecOps() : childSecOps; |
| newChildAuths = AccumuloRyaUtils.addUserAuths(accumuloDualInstanceDriver.getChildUser(), secOps, PARENT_AUTH); |
| secOps.changeUserAuthorizations(accumuloDualInstanceDriver.getChildUser(), newChildAuths); |
| assertStatementInChild("Child missing statement with parent visibility", 1, ryaStatementVisibilityDifferent); |
| |
| // Check that it can NOT be queried with some other visibility |
| childConfig.set(RdfCloudTripleStoreConfiguration.CONF_QUERY_AUTH, "bad_auth"); |
| final CloseableIteration<RyaStatement, RyaDAOException> iter = childDao.getQueryEngine().query(ryaStatementVisibilityDifferent, childConfig); |
| count = 0; |
| try { |
| while (iter.hasNext()) { |
| iter.next(); |
| count++; |
| } |
| } catch (final Exception e) { |
| // Expected |
| if (!(e.getCause() instanceof AccumuloSecurityException)) { |
| fail(); |
| } |
| } |
| iter.close(); |
| assertEquals(0, count); |
| |
| // reset auth |
| childConfig.set(RdfCloudTripleStoreConfiguration.CONF_QUERY_AUTH, childAuthString); |
| |
| log.info("DONE"); |
| } |
| |
| @Test |
| public void testImportDirectoryTool() throws Exception { |
| log.info(""); |
| log.info("Setting up initial state of parent before importing directory to child..."); |
| log.info("Adding data to parent..."); |
| |
| log.info("Starting import directory tool. Importing all data after the specified start time: " + YESTERDAY); |
| log.info(""); |
| |
| isImporting = true; |
| copyToolRun(YESTERDAY); |
| |
| |
| // Import Directory Tool made child instance so hook the tables and dao into the driver. |
| final String childUser = accumuloDualInstanceDriver.getChildUser(); |
| final Connector childConnector = ConfigUtils.getConnector(childConfig); |
| accumuloDualInstanceDriver.getChildAccumuloInstanceDriver().setConnector(childConnector); |
| |
| accumuloDualInstanceDriver.getChildAccumuloInstanceDriver().setUpTables(); |
| |
| accumuloDualInstanceDriver.getChildAccumuloInstanceDriver().setUpDao(); |
| |
| |
| // Update child config to include changes made from import directory process |
| final SecurityOperations childSecOps = accumuloDualInstanceDriver.getChildSecOps(); |
| final Authorizations newChildAuths = AccumuloRyaUtils.addUserAuths(childUser, childSecOps, PARENT_AUTH); |
| childSecOps.changeUserAuthorizations(childUser, newChildAuths); |
| final String childAuthString = newChildAuths.toString(); |
| final List<String> duplicateKeys = MergeTool.DUPLICATE_KEY_MAP.get(MRUtils.AC_AUTH_PROP); |
| childConfig.set(MRUtils.AC_AUTH_PROP, childAuthString); |
| for (final String key : duplicateKeys) { |
| childConfig.set(key, childAuthString); |
| } |
| |
| |
| //AccumuloRyaUtils.printTablePretty(CHILD_TABLE_PREFIX + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX, childConfig); |
| //AccumuloRyaUtils.printTablePretty(CHILD_TABLE_PREFIX + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX, childConfig); |
| AccumuloRyaUtils.printTablePretty(CHILD_TABLE_PREFIX + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX, childConfig); |
| |
| final Scanner scanner = AccumuloRyaUtils.getScanner(CHILD_TABLE_PREFIX + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX, childConfig); |
| final Iterator<Entry<Key, Value>> iterator = scanner.iterator(); |
| int count = 0; |
| while (iterator.hasNext()) { |
| iterator.next(); |
| count++; |
| } |
| log.info(""); |
| log.info("Total rows imported: " + count); |
| log.info(""); |
| |
| assertEquals(20, count); |
| |
| log.info("DONE"); |
| } |
| } |