// blob: beb9fd61857e917de0b531d709f910e2f13f0f60 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.test.functional;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map.Entry;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.fate.util.UtilWaitThread;
import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
/**
 * Integration test for scan-time classloader contexts.
 *
 * <p>
 * Each test copies {@code TestJar-Iterators.jar} to {@code target/Test.jar}, registers that jar as
 * a named classloader context through an instance property, and then verifies that
 * {@code ValueReversingIterator} (which the tests expect to be available only from that jar) can
 * be used by a {@link Scanner} or {@link BatchScanner} only when the scanner explicitly selects
 * the context.
 *
 * <p>
 * The tests are skipped unless the cluster type is {@code ClusterType.MINI}.
 */
public class ScannerContextIT extends AccumuloClusterHarness {

  /** Name of the classloader context used by the scanners in these tests. */
  private static final String CONTEXT = ScannerContextIT.class.getSimpleName();

  @SuppressWarnings("removal")
  private static final Property VFS_CONTEXT_CLASSPATH_PROPERTY =
      Property.VFS_CONTEXT_CLASSPATH_PROPERTY;

  /** Instance property that defines the classpath of {@link #CONTEXT}. */
  private static final String CONTEXT_PROPERTY = VFS_CONTEXT_CLASSPATH_PROPERTY + CONTEXT;

  /** Directory (as a file URI) that holds the jars referenced by the contexts. */
  private static final String CONTEXT_DIR = "file://" + System.getProperty("user.dir") + "/target";

  /** Classpath of {@link #CONTEXT}: the copy of the test iterators jar. */
  private static final String CONTEXT_CLASSPATH = CONTEXT_DIR + "/Test.jar";

  /** Iterator that the tests expect to be loadable only through {@link #CONTEXT}. */
  private static final String REVERSING_ITERATOR_CLASS =
      "org.apache.accumulo.test.functional.ValueReversingIterator";

  /** Number of rows written to, and expected back from, each test table. */
  private static final int ITERATIONS = 10;

  /**
   * Duration handed to {@code UtilWaitThread.sleep} after the jar is copied (presumably
   * milliseconds - confirm against UtilWaitThread).
   */
  private static final long WAIT = 7000;

  /** File system obtained from the cluster's Hadoop configuration; set in {@link #checkCluster}. */
  private FileSystem fs;

  @Override
  protected int defaultTimeoutSeconds() {
    return 2 * 60;
  }

  /**
   * Skips the test unless it runs against a mini cluster, then looks up the file system used to
   * copy and delete the test jar.
   *
   * @throws Exception
   *           if the file system cannot be obtained
   */
  @Before
  public void checkCluster() throws Exception {
    Assume.assumeTrue(getClusterType() == ClusterType.MINI);
    // The result is intentionally unused: the cast only asserts the cluster implementation type
    // and throws ClassCastException otherwise.
    MiniAccumuloClusterImpl.class.cast(getCluster());
    fs = FileSystem.get(cluster.getServerContext().getHadoopConf());
  }

  /**
   * Copies {@code target/TestJar-Iterators.jar} to the location referenced by
   * {@link #CONTEXT_CLASSPATH} and then waits for {@link #WAIT}.
   *
   * @return the destination path, so the caller can delete it when the test is done
   * @throws IOException
   *           if the copy fails
   */
  private Path copyTestIteratorsJarToTmp() throws IOException {
    Path baseDir = new Path(System.getProperty("user.dir"));
    Path targetDir = new Path(baseDir, "target");
    Path jarPath = new Path(targetDir, "TestJar-Iterators.jar");
    Path dstPath = new Path(CONTEXT_CLASSPATH);
    fs.copyFromLocalFile(jarPath, dstPath);
    // Sleep to ensure jar change gets picked up
    UtilWaitThread.sleep(WAIT);
    return dstPath;
  }

  /**
   * Verifies that the reversing iterator fails without a scan context and works once the scanner
   * selects {@link #CONTEXT}.
   */
  @Test
  public void test() throws Exception {
    Path dstPath = copyTestIteratorsJarToTmp();
    try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
      // Define the classloader context as an instance property that points to the test iterators
      // jar file.
      c.instanceOperations().setProperty(CONTEXT_PROPERTY, CONTEXT_CLASSPATH);
      String tableName = getUniqueNames(1)[0];
      c.tableOperations().create(tableName);
      writeTestRows(c, tableName);
      // Ensure that we can get the data back
      scanCheck(c, tableName, null, null, "Test");
      batchCheck(c, tableName, null, null, "Test");
      // This iterator is in the test iterators jar file
      IteratorSetting cfg = newReversingIterator();
      // Check that ValueReversingIterator is not already on the classpath by not setting the
      // context. This should fail.
      assertFailsWithoutContext(c, tableName, cfg);
      // Ensure that the value is reversed using the iterator config and classloader context
      scanCheck(c, tableName, cfg, CONTEXT, "tseT");
      batchCheck(c, tableName, cfg, CONTEXT, "tseT");
    } finally {
      // Remove the copied jar
      fs.delete(dstPath, true);
    }
  }

  /**
   * Verifies that a context set on the scanner is used even though the table itself is configured
   * with a different context ("FOO").
   */
  @Test
  public void testScanContextOverridesTableContext() throws Exception {
    Path dstPath = copyTestIteratorsJarToTmp();
    try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
      // Create two contexts FOO and ScannerContextIT. The FOO context will point to a classpath
      // that contains nothing. The ScannerContextIT context will point to the test iterators jar
      String tableContext = "FOO";
      String tableContextProperty = VFS_CONTEXT_CLASSPATH_PROPERTY + tableContext;
      String tableContextClasspath = CONTEXT_DIR + "/TestFoo.jar";
      // Define both contexts
      c.instanceOperations().setProperty(tableContextProperty, tableContextClasspath);
      c.instanceOperations().setProperty(CONTEXT_PROPERTY, CONTEXT_CLASSPATH);
      String tableName = getUniqueNames(1)[0];
      c.tableOperations().create(tableName);
      // Set the FOO context on the table
      c.tableOperations().setProperty(tableName, Property.TABLE_CLASSLOADER_CONTEXT.getKey(),
          tableContext);
      writeTestRows(c, tableName);
      scanCheck(c, tableName, null, null, "Test");
      batchCheck(c, tableName, null, null, "Test");
      // This iterator is in the test iterators jar file
      IteratorSetting cfg = newReversingIterator();
      // Without a scan-level context the iterator must not be loadable. This should fail.
      assertFailsWithoutContext(c, tableName, cfg);
      // Ensure that the value is reversed using the iterator config and classloader context
      scanCheck(c, tableName, cfg, CONTEXT, "tseT");
      batchCheck(c, tableName, cfg, CONTEXT, "tseT");
    } finally {
      // Remove the copied jar
      fs.delete(dstPath, true);
    }
  }

  /**
   * Verifies that a context and iterator configured on one scanner do not affect a second scanner
   * created from the same client on the same table.
   */
  @Test
  public void testOneScannerDoesntInterfereWithAnother() throws Exception {
    Path dstPath = copyTestIteratorsJarToTmp();
    try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
      // Define the classloader context as an instance property that points to the test iterators
      // jar file.
      c.instanceOperations().setProperty(CONTEXT_PROPERTY, CONTEXT_CLASSPATH);
      String tableName = getUniqueNames(1)[0];
      c.tableOperations().create(tableName);
      writeTestRows(c, tableName);
      try (Scanner one = c.createScanner(tableName, Authorizations.EMPTY);
          Scanner two = c.createScanner(tableName, Authorizations.EMPTY)) {
        one.addScanIterator(newReversingIterator());
        one.setClassLoaderContext(CONTEXT);
        // Scanner one sees reversed values ...
        assertAllValues(one.iterator(), "tseT");
        // ... while scanner two, which has no iterator or context, sees the stored values.
        assertAllValues(two.iterator(), "Test");
      }
    } finally {
      // Remove the copied jar
      fs.delete(dstPath, true);
    }
  }

  /**
   * Verifies that after removing the iterator and clearing the classloader context, the same
   * scanner returns the stored values again.
   */
  @Test
  public void testClearContext() throws Exception {
    Path dstPath = copyTestIteratorsJarToTmp();
    try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
      // Define the classloader context as an instance property that points to the test iterators
      // jar file.
      c.instanceOperations().setProperty(CONTEXT_PROPERTY, CONTEXT_CLASSPATH);
      String tableName = getUniqueNames(1)[0];
      c.tableOperations().create(tableName);
      writeTestRows(c, tableName);
      try (Scanner one = c.createScanner(tableName, Authorizations.EMPTY)) {
        one.addScanIterator(newReversingIterator());
        one.setClassLoaderContext(CONTEXT);
        assertAllValues(one.iterator(), "tseT");
        one.removeScanIterator("reverse");
        one.clearClassLoaderContext();
        assertAllValues(one.iterator(), "Test");
      }
    } finally {
      // Remove the copied jar
      fs.delete(dstPath, true);
    }
  }

  /**
   * Scans the whole table with a {@link Scanner} and asserts that it returns exactly
   * {@link #ITERATIONS} entries whose values all equal {@code expected}.
   *
   * @param c
   *          client used to create the scanner
   * @param tableName
   *          table to scan
   * @param cfg
   *          scan iterator to add, or {@code null} for none
   * @param context
   *          classloader context to set on the scanner, or {@code null} for none
   * @param expected
   *          value every returned entry must have
   */
  private void scanCheck(AccumuloClient c, String tableName, IteratorSetting cfg, String context,
      String expected) throws Exception {
    try (Scanner bs = c.createScanner(tableName, Authorizations.EMPTY)) {
      if (context != null) {
        bs.setClassLoaderContext(context);
      }
      if (cfg != null) {
        bs.addScanIterator(cfg);
      }
      assertAllValues(bs.iterator(), expected);
    }
  }

  /**
   * Same check as {@link #scanCheck}, but performed with a {@link BatchScanner} over a single
   * unbounded {@link Range}.
   *
   * @param c
   *          client used to create the batch scanner
   * @param tableName
   *          table to scan
   * @param cfg
   *          scan iterator to add, or {@code null} for none
   * @param context
   *          classloader context to set on the scanner, or {@code null} for none
   * @param expected
   *          value every returned entry must have
   */
  private void batchCheck(AccumuloClient c, String tableName, IteratorSetting cfg, String context,
      String expected) throws Exception {
    try (BatchScanner bs = c.createBatchScanner(tableName)) {
      bs.setRanges(Collections.singleton(new Range()));
      if (context != null) {
        bs.setClassLoaderContext(context);
      }
      if (cfg != null) {
        bs.addScanIterator(cfg);
      }
      assertAllValues(bs.iterator(), expected);
    }
  }

  /**
   * Writes {@link #ITERATIONS} rows ("row0", "row1", ...) to the table, each with a single
   * {@code cf:col1} entry whose value is "Test".
   */
  private void writeTestRows(AccumuloClient c, String tableName) throws Exception {
    try (BatchWriter bw = c.createBatchWriter(tableName)) {
      for (int i = 0; i < ITERATIONS; i++) {
        Mutation m = new Mutation("row" + i);
        m.put("cf", "col1", "Test");
        bw.addMutation(m);
      }
    }
  }

  /** Creates the scan iterator setting (priority 21, name "reverse") used by all tests. */
  private static IteratorSetting newReversingIterator() {
    return new IteratorSetting(21, "reverse", REVERSING_ITERATOR_CLASS);
  }

  /**
   * Asserts that scanning with {@code cfg} but without a classloader context throws, for both the
   * regular and the batch scanner.
   *
   * <p>
   * Only {@link Exception} is caught. The {@link AssertionError} raised by {@code fail} or by a
   * value mismatch is not an {@code Exception}, so it still propagates and fails the test.
   */
  private void assertFailsWithoutContext(AccumuloClient c, String tableName, IteratorSetting cfg) {
    try {
      scanCheck(c, tableName, cfg, null, "tseT");
      fail("This should have failed because context was not set");
    } catch (Exception expected) {
      // Do nothing, this should fail as the classloader context is not set.
    }
    try {
      batchCheck(c, tableName, cfg, null, "tseT");
      fail("This should have failed because context was not set");
    } catch (Exception expected) {
      // Do nothing, this should fail as the classloader context is not set.
    }
  }

  /**
   * Asserts that the iterator yields exactly {@link #ITERATIONS} entries, each with the value
   * {@code expected}, and nothing after them.
   */
  private static void assertAllValues(Iterator<Entry<Key,Value>> iterator, String expected) {
    for (int i = 0; i < ITERATIONS; i++) {
      assertTrue(iterator.hasNext());
      Entry<Key,Value> next = iterator.next();
      assertEquals(expected, next.getValue().toString());
    }
    assertFalse(iterator.hasNext());
  }
}