blob: 0208a5276b4949d0aeac8da56684c0655f7bef2f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.test;
import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.accumulo.cluster.ClusterUser;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.ConditionalWriter;
import org.apache.accumulo.core.client.ConditionalWriter.Result;
import org.apache.accumulo.core.client.ConditionalWriter.Status;
import org.apache.accumulo.core.client.ConditionalWriterConfig;
import org.apache.accumulo.core.client.IsolatedScanner;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.RowIterator;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableDeletedException;
import org.apache.accumulo.core.client.TableExistsException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.TableOfflineException;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.ArrayByteSequence;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Condition;
import org.apache.accumulo.core.data.ConditionalMutation;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
import org.apache.accumulo.core.iterators.LongCombiner.Type;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iterators.WrappingIterator;
import org.apache.accumulo.core.iterators.user.SummingCombiner;
import org.apache.accumulo.core.iterators.user.VersioningIterator;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.accumulo.core.security.SystemPermission;
import org.apache.accumulo.core.security.TablePermission;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.util.FastFormat;
import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
import org.apache.accumulo.harness.SharedMiniClusterBase;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.test.constraints.AlphaNumKeyConstraint;
import org.apache.accumulo.test.functional.BadIterator;
import org.apache.accumulo.test.functional.SlowIterator;
import org.apache.accumulo.tracer.TraceDump;
import org.apache.accumulo.tracer.TraceServer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.htrace.Sampler;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Iterables;
public class ConditionalWriterIT extends SharedMiniClusterBase {
// Class-wide SLF4J logger, named after this integration test class.
private static final Logger log = LoggerFactory.getLogger(ConditionalWriterIT.class);
/**
 * Supplies the default timeout, in seconds, for this test class.
 *
 * @return 120 (two minutes)
 */
@Override
protected int defaultTimeoutSeconds() {
  final int timeoutSeconds = 120;
  return timeoutSeconds;
}
/**
 * Starts the shared mini cluster once before any test in this class runs, passing a
 * {@link Callback} so the cluster configuration can be adjusted first.
 *
 * @throws Exception propagated from {@code startMiniClusterWithConfig}
 */
@BeforeClass
public static void setup() throws Exception {
  Callback clusterConfigurer = new Callback();
  SharedMiniClusterBase.startMiniClusterWithConfig(clusterConfigurer);
}
/**
 * Stops the shared mini cluster once all tests in this class have run.
 */
@AfterClass
public static void teardown() {
SharedMiniClusterBase.stopMiniCluster();
}
/**
 * Mini cluster configuration hook that sets the tracer's minimum span duration to zero in the
 * site configuration.
 */
private static class Callback implements MiniClusterConfigurationCallback {
  @Override
  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreSite) {
    // A minimum span of 0 ms means no trace is dropped for being too short. See ACCUMULO-4365
    Map<String,String> updatedSite = cfg.getSiteConfig();
    String minSpanKey = Property.TRACE_SPAN_RECEIVER_PREFIX.getKey() + "tracer.span.min.ms";
    updatedSite.put(minSpanKey, "0");
    cfg.setSiteConfig(updatedSite);
  }
}
/**
 * Returns the absolute value of {@code l}, guaranteed to be non-negative.
 *
 * @param l any long value
 * @return {@code Math.abs(l)}, except that {@code Long.MIN_VALUE} maps to {@code 0}
 */
public static long abs(long l) {
  // Math.abs(Long.MIN_VALUE) is still Long.MIN_VALUE (negative), so clamp the result at zero.
  return Math.max(0L, Math.abs(l));
}
/**
 * Runs before each test and drops the local user belonging to {@code getUser(0)} when that
 * principal is currently listed, so a test that creates the user starts from a clean state.
 *
 * @throws Exception on any failure reported by the security operations
 */
@Before
public void deleteUsers() throws Exception {
  try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
    Set<String> existingUsers = client.securityOperations().listLocalUsers();
    String principal = getUser(0).getPrincipal();
    if (!existingUsers.contains(principal)) {
      // Nothing left over from an earlier test.
      return;
    }
    client.securityOperations().dropLocalUser(principal);
  }
}
/**
 * Walks a single row ("99006") through a sequence of conditional mutations and checks the
 * {@link Status} returned for each one, scanning in between to confirm that only the accepted
 * mutations changed the table.
 *
 * @throws Exception on any failure reported by the client, writer, or scanner
 */
@Test
public void testBasic() throws Exception {
try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
String tableName = getUniqueNames(1)[0];
client.tableOperations().create(tableName);
try (
ConditionalWriter cw =
client.createConditionalWriter(tableName, new ConditionalWriterConfig());
Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY)) {
// mutation conditional on column tx:seq not existing
ConditionalMutation cm0 = new ConditionalMutation("99006", new Condition("tx", "seq"));
cm0.put("name", "last", "doe");
cm0.put("name", "first", "john");
cm0.put("tx", "seq", "1");
// the first write creates tx:seq, so replaying the same mutation must be rejected
assertEquals(Status.ACCEPTED, cw.write(cm0).getStatus());
assertEquals(Status.REJECTED, cw.write(cm0).getStatus());
// mutation conditional on column tx:seq being 1
ConditionalMutation cm1 =
new ConditionalMutation("99006", new Condition("tx", "seq").setValue("1"));
cm1.put("name", "last", "Doe");
cm1.put("tx", "seq", "2");
assertEquals(Status.ACCEPTED, cw.write(cm1).getStatus());
// test condition where the value differs (tx:seq is now 2, not 1)
ConditionalMutation cm2 =
new ConditionalMutation("99006", new Condition("tx", "seq").setValue("1"));
cm2.put("name", "last", "DOE");
cm2.put("tx", "seq", "2");
assertEquals(Status.REJECTED, cw.write(cm2).getStatus());
// test condition where the column does not exist
ConditionalMutation cm3 =
new ConditionalMutation("99006", new Condition("txtypo", "seq").setValue("1"));
cm3.put("name", "last", "deo");
cm3.put("tx", "seq", "2");
assertEquals(Status.REJECTED, cw.write(cm3).getStatus());
// test two conditions where the second fails (name:last is "Doe", not "doe")
ConditionalMutation cm4 =
new ConditionalMutation("99006", new Condition("tx", "seq").setValue("2"),
new Condition("name", "last").setValue("doe"));
cm4.put("name", "last", "deo");
cm4.put("tx", "seq", "3");
assertEquals(Status.REJECTED, cw.write(cm4).getStatus());
// test two conditions where the first fails (tx:seq is 2, not 1)
ConditionalMutation cm5 =
new ConditionalMutation("99006", new Condition("tx", "seq").setValue("1"),
new Condition("name", "last").setValue("Doe"));
cm5.put("name", "last", "deo");
cm5.put("tx", "seq", "3");
assertEquals(Status.REJECTED, cw.write(cm5).getStatus());
// ensure rejected mutations did not write
scanner.fetchColumn("name", "last");
scanner.setRange(new Range("99006"));
Entry<Key,Value> entry = Iterables.getOnlyElement(scanner);
assertEquals("Doe", entry.getValue().toString());
// test w/ two conditions that are met
ConditionalMutation cm6 =
new ConditionalMutation("99006", new Condition("tx", "seq").setValue("2"),
new Condition("name", "last").setValue("Doe"));
cm6.put("name", "last", "DOE");
cm6.put("tx", "seq", "3");
assertEquals(Status.ACCEPTED, cw.write(cm6).getStatus());
entry = Iterables.getOnlyElement(scanner);
assertEquals("DOE", entry.getValue().toString());
// test a conditional mutation that deletes
ConditionalMutation cm7 =
new ConditionalMutation("99006", new Condition("tx", "seq").setValue("3"));
cm7.putDelete("name", "last");
cm7.putDelete("name", "first");
cm7.putDelete("tx", "seq");
assertEquals(Status.ACCEPTED, cw.write(cm7).getStatus());
assertFalse("Did not expect to find any results", scanner.iterator().hasNext());
// add the row back: tx:seq is absent again, so cm0 is accepted once and then rejected
assertEquals(Status.ACCEPTED, cw.write(cm0).getStatus());
assertEquals(Status.REJECTED, cw.write(cm0).getStatus());
entry = Iterables.getOnlyElement(scanner);
assertEquals("doe", entry.getValue().toString());
}
}
}
/**
 * Checks that a {@link Condition} must match on every field of the existing entry: column family,
 * column qualifier, column visibility, timestamp and value. Conditions that are wrong in exactly
 * one field are expected to be rejected; only the condition that matches on all fields is
 * accepted.
 * <p>
 * The test creates a local user with authorizations "A" and "B" and the CREATE_TABLE system
 * permission, then performs all table work through a second client built for that user.
 *
 * @throws Exception on any failure reported by the client, writer, or scanner
 */
@Test
public void testFields() throws Exception {
try (AccumuloClient client1 = Accumulo.newClient().from(getClientProps()).build()) {
String tableName = getUniqueNames(1)[0];
String user = null;
ClusterUser user1 = getUser(0);
user = user1.getPrincipal();
if (saslEnabled()) {
// The token is pointless for kerberos
client1.securityOperations().createLocalUser(user, null);
} else {
client1.securityOperations().createLocalUser(user, new PasswordToken(user1.getPassword()));
}
Authorizations auths = new Authorizations("A", "B");
client1.securityOperations().changeUserAuthorizations(user, auths);
client1.securityOperations().grantSystemPermission(user, SystemPermission.CREATE_TABLE);
// client2 reuses client1's properties but authenticates as the newly created user
try (AccumuloClient client2 =
Accumulo.newClient().from(client1.properties()).as(user, user1.getToken()).build()) {
client2.tableOperations().create(tableName);
try (
ConditionalWriter cw = client2.createConditionalWriter(tableName,
new ConditionalWriterConfig().setAuthorizations(auths));
Scanner scanner = client2.createScanner(tableName, auths)) {
ColumnVisibility cva = new ColumnVisibility("A");
ColumnVisibility cvb = new ColumnVisibility("B");
// initial insert, conditional on tx:seq (visibility A) not existing
ConditionalMutation cm0 =
new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cva));
cm0.put("name", "last", cva, "doe");
cm0.put("name", "first", cva, "john");
cm0.put("tx", "seq", cva, "1");
assertEquals(Status.ACCEPTED, cw.write(cm0).getStatus());
scanner.setRange(new Range("99006"));
// TODO verify all columns
scanner.fetchColumn("tx", "seq");
Entry<Key,Value> entry = Iterables.getOnlyElement(scanner);
assertEquals("1", entry.getValue().toString());
// remember the timestamp of tx:seq so the timestamp conditions below can be built around it
long ts = entry.getKey().getTimestamp();
// test wrong colf
ConditionalMutation cm1 = new ConditionalMutation("99006",
new Condition("txA", "seq").setVisibility(cva).setValue("1"));
cm1.put("name", "last", cva, "Doe");
cm1.put("name", "first", cva, "John");
cm1.put("tx", "seq", cva, "2");
assertEquals(Status.REJECTED, cw.write(cm1).getStatus());
// test wrong colq
ConditionalMutation cm2 = new ConditionalMutation("99006",
new Condition("tx", "seqA").setVisibility(cva).setValue("1"));
cm2.put("name", "last", cva, "Doe");
cm2.put("name", "first", cva, "John");
cm2.put("tx", "seq", cva, "2");
assertEquals(Status.REJECTED, cw.write(cm2).getStatus());
// test wrong colv
ConditionalMutation cm3 = new ConditionalMutation("99006",
new Condition("tx", "seq").setVisibility(cvb).setValue("1"));
cm3.put("name", "last", cva, "Doe");
cm3.put("name", "first", cva, "John");
cm3.put("tx", "seq", cva, "2");
assertEquals(Status.REJECTED, cw.write(cm3).getStatus());
// test wrong timestamp (one greater than the stored timestamp)
ConditionalMutation cm4 = new ConditionalMutation("99006",
new Condition("tx", "seq").setVisibility(cva).setTimestamp(ts + 1).setValue("1"));
cm4.put("name", "last", cva, "Doe");
cm4.put("name", "first", cva, "John");
cm4.put("tx", "seq", cva, "2");
assertEquals(Status.REJECTED, cw.write(cm4).getStatus());
// test wrong timestamp (one less than the stored timestamp)
ConditionalMutation cm5 = new ConditionalMutation("99006",
new Condition("tx", "seq").setVisibility(cva).setTimestamp(ts - 1).setValue("1"));
cm5.put("name", "last", cva, "Doe");
cm5.put("name", "first", cva, "John");
cm5.put("tx", "seq", cva, "2");
assertEquals(Status.REJECTED, cw.write(cm5).getStatus());
// ensure no updates were made
entry = Iterables.getOnlyElement(scanner);
assertEquals("1", entry.getValue().toString());
// set all columns correctly
ConditionalMutation cm6 = new ConditionalMutation("99006",
new Condition("tx", "seq").setVisibility(cva).setTimestamp(ts).setValue("1"));
cm6.put("name", "last", cva, "Doe");
cm6.put("name", "first", cva, "John");
cm6.put("tx", "seq", cva, "2");
assertEquals(Status.ACCEPTED, cw.write(cm6).getStatus());
entry = Iterables.getOnlyElement(scanner);
assertEquals("2", entry.getValue().toString());
}
}
}
}
/**
 * Checks how the conditional writer reports conditions whose column visibility cannot be seen
 * with the authorizations the writer was configured with.
 * <p>
 * The admin user is given authorizations "A" and "B". The first writer is configured with only
 * "A", and every mutation whose conditions include visibility "B" (held by the user but not passed
 * to the writer) or "C" (not held by the user) is expected to return
 * {@link Status#INVISIBLE_VISIBILITY}. The second writer is configured with authorizations that
 * include "D", which the user does not hold, and reading the result status is expected to throw
 * {@link AccumuloSecurityException}.
 *
 * @throws Exception on any unexpected failure reported by the client or writer
 */
@Test
public void testBadColVis() throws Exception {
// test when a user sets a col vis in a condition that can never be seen
try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
String tableName = getUniqueNames(1)[0];
client.tableOperations().create(tableName);
Authorizations auths = new Authorizations("A", "B");
client.securityOperations().changeUserAuthorizations(getAdminPrincipal(), auths);
// the writer below is deliberately given a subset ("A") of the user's authorizations
Authorizations filteredAuths = new Authorizations("A");
ColumnVisibility cva = new ColumnVisibility("A");
ColumnVisibility cvb = new ColumnVisibility("B");
ColumnVisibility cvc = new ColumnVisibility("C");
try (ConditionalWriter cw = client.createConditionalWriter(tableName,
new ConditionalWriterConfig().setAuthorizations(filteredAuths))) {
// User has authorization, but didn't include it in the writer
ConditionalMutation cm0 =
new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvb));
cm0.put("name", "last", cva, "doe");
cm0.put("name", "first", cva, "john");
cm0.put("tx", "seq", cva, "1");
assertEquals(Status.INVISIBLE_VISIBILITY, cw.write(cm0).getStatus());
// same visibility problem, this time with a value set on the condition
ConditionalMutation cm1 = new ConditionalMutation("99006",
new Condition("tx", "seq").setVisibility(cvb).setValue("1"));
cm1.put("name", "last", cva, "doe");
cm1.put("name", "first", cva, "john");
cm1.put("tx", "seq", cva, "1");
assertEquals(Status.INVISIBLE_VISIBILITY, cw.write(cm1).getStatus());
// User does not have the authorization
ConditionalMutation cm2 =
new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvc));
cm2.put("name", "last", cva, "doe");
cm2.put("name", "first", cva, "john");
cm2.put("tx", "seq", cva, "1");
assertEquals(Status.INVISIBLE_VISIBILITY, cw.write(cm2).getStatus());
// same missing authorization, this time with a value set on the condition
ConditionalMutation cm3 = new ConditionalMutation("99006",
new Condition("tx", "seq").setVisibility(cvc).setValue("1"));
cm3.put("name", "last", cva, "doe");
cm3.put("name", "first", cva, "john");
cm3.put("tx", "seq", cva, "1");
assertEquals(Status.INVISIBLE_VISIBILITY, cw.write(cm3).getStatus());
// if any visibility is bad, good visibilities don't override
// (cm4-cm7 cover the four combinations of value / no value on the bad and good conditions)
ConditionalMutation cm4 =
new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvb),
new Condition("tx", "seq").setVisibility(cva));
cm4.put("name", "last", cva, "doe");
cm4.put("name", "first", cva, "john");
cm4.put("tx", "seq", cva, "1");
assertEquals(Status.INVISIBLE_VISIBILITY, cw.write(cm4).getStatus());
ConditionalMutation cm5 = new ConditionalMutation("99006",
new Condition("tx", "seq").setVisibility(cvb).setValue("1"),
new Condition("tx", "seq").setVisibility(cva).setValue("1"));
cm5.put("name", "last", cva, "doe");
cm5.put("name", "first", cva, "john");
cm5.put("tx", "seq", cva, "1");
assertEquals(Status.INVISIBLE_VISIBILITY, cw.write(cm5).getStatus());
ConditionalMutation cm6 = new ConditionalMutation("99006",
new Condition("tx", "seq").setVisibility(cvb).setValue("1"),
new Condition("tx", "seq").setVisibility(cva));
cm6.put("name", "last", cva, "doe");
cm6.put("name", "first", cva, "john");
cm6.put("tx", "seq", cva, "1");
assertEquals(Status.INVISIBLE_VISIBILITY, cw.write(cm6).getStatus());
ConditionalMutation cm7 =
new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvb),
new Condition("tx", "seq").setVisibility(cva).setValue("1"));
cm7.put("name", "last", cva, "doe");
cm7.put("name", "first", cva, "john");
cm7.put("tx", "seq", cva, "1");
assertEquals(Status.INVISIBLE_VISIBILITY, cw.write(cm7).getStatus());
}
// test passing auths that exceed users configured auths
Authorizations exceedingAuths = new Authorizations("A", "B", "D");
try (ConditionalWriter cw2 = client.createConditionalWriter(tableName,
new ConditionalWriterConfig().setAuthorizations(exceedingAuths))) {
ConditionalMutation cm8 =
new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvb),
new Condition("tx", "seq").setVisibility(cva).setValue("1"));
cm8.put("name", "last", cva, "doe");
cm8.put("name", "first", cva, "john");
cm8.put("tx", "seq", cva, "1");
try {
// getStatus() is expected to throw; reaching fail() means a status was returned instead
Status status = cw2.write(cm8).getStatus();
fail(
"Writing mutation with Authorizations the user doesn't have should fail. Got status: "
+ status);
} catch (AccumuloSecurityException ase) {
// expected, check specific failure?
}
}
}
}
/**
 * Ensures constraint violations are reported through the conditional writer.
 * <p>
 * A table is created, {@link AlphaNumKeyConstraint} is added to it, and the table is cloned; all
 * writes and scans then go to the clone. A mutation on row "99006+" is expected to come back as
 * {@link Status#VIOLATED} and leave the clone empty, while the same mutation on row "99006" is
 * expected to be accepted and become visible to a scan.
 *
 * @throws Exception on any failure reported by the client, writer, or scanner
 */
@Test
public void testConstraints() throws Exception {
  try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
    String tableName = getUniqueNames(1)[0];
    String cloneName = tableName + "_clone";

    client.tableOperations().create(tableName);
    client.tableOperations().addConstraint(tableName, AlphaNumKeyConstraint.class.getName());
    client.tableOperations().clone(tableName, cloneName, true, new HashMap<>(), new HashSet<>());

    try (
        ConditionalWriter writer =
            client.createConditionalWriter(cloneName, new ConditionalWriterConfig());
        Scanner cloneScanner = client.createScanner(cloneName, new Authorizations())) {

      // Row containing '+': expected to violate the constraint, so nothing may be written.
      ConditionalMutation violating =
          new ConditionalMutation("99006+", new Condition("tx", "seq"));
      violating.put("tx", "seq", "1");
      Status violatingStatus = writer.write(violating).getStatus();
      assertEquals(Status.VIOLATED, violatingStatus);
      boolean foundAfterViolation = cloneScanner.iterator().hasNext();
      assertFalse("Should find no results in the table is mutation result was violated",
          foundAfterViolation);

      // Same mutation on a purely alphanumeric row: expected to be accepted and readable.
      ConditionalMutation conforming =
          new ConditionalMutation("99006", new Condition("tx", "seq"));
      conforming.put("tx", "seq", "1");
      Status conformingStatus = writer.write(conforming).getStatus();
      assertEquals(Status.ACCEPTED, conformingStatus);
      boolean foundAfterAccept = cloneScanner.iterator().hasNext();
      assertTrue("Accepted result should be returned when reading table", foundAfterAccept);
    }
  }
}
/**
 * Verifies that iterators set on a {@link Condition} are applied when the condition is evaluated,
 * both for single writes and for a batch whose conditions carry different iterator
 * configurations.
 * <p>
 * The table is created without default iterators and the same cell is written several times. The
 * test relies on a {@link SummingCombiner} (on the scanner and on individual conditions) to see
 * the sum of those writes, and on conditions without iterators not seeing that sum.
 *
 * @throws Exception on any failure reported by the client, writer, or scanner
 */
@Test
public void testIterators() throws Exception {
  try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
    String tableName = getUniqueNames(1)[0];
    client.tableOperations().create(tableName,
        new NewTableConfiguration().withoutDefaultIterators());

    // Seed: ACCUMULO-1000 count:comments written 3 times, ACCUMULO-1001 and ACCUMULO-1002
    // count2:comments written twice each, every write carrying the value "1".
    try (BatchWriter bw = client.createBatchWriter(tableName)) {
      Mutation m = new Mutation("ACCUMULO-1000");
      m.put("count", "comments", "1");
      bw.addMutation(m);
      bw.addMutation(m);
      bw.addMutation(m);
      m = new Mutation("ACCUMULO-1001");
      m.put("count2", "comments", "1");
      bw.addMutation(m);
      bw.addMutation(m);
      m = new Mutation("ACCUMULO-1002");
      m.put("count2", "comments", "1");
      bw.addMutation(m);
      bw.addMutation(m);
    }

    // Sums every column in family "count".
    IteratorSetting iterConfig = new IteratorSetting(10, SummingCombiner.class);
    SummingCombiner.setEncodingType(iterConfig, Type.STRING);
    SummingCombiner.setColumns(iterConfig,
        Collections.singletonList(new IteratorSetting.Column("count")));

    // Sums only count2:comments.
    IteratorSetting iterConfig2 = new IteratorSetting(10, SummingCombiner.class);
    SummingCombiner.setEncodingType(iterConfig2, Type.STRING);
    SummingCombiner.setColumns(iterConfig2,
        Collections.singletonList(new IteratorSetting.Column("count2", "comments")));

    // Keeps a single version; its priority (5) is lower than the combiners' (10).
    IteratorSetting iterConfig3 = new IteratorSetting(5, VersioningIterator.class);
    VersioningIterator.setMaxVersions(iterConfig3, 1);

    try (Scanner scanner = client.createScanner(tableName, new Authorizations())) {
      scanner.addScanIterator(iterConfig);
      scanner.setRange(new Range("ACCUMULO-1000"));
      scanner.fetchColumn("count", "comments");
      Entry<Key,Value> entry = Iterables.getOnlyElement(scanner);
      assertEquals("3", entry.getValue().toString());

      try (ConditionalWriter cw =
          client.createConditionalWriter(tableName, new ConditionalWriterConfig())) {
        // No iterator on the condition: it is expected not to see the summed value "3".
        ConditionalMutation cm0 = new ConditionalMutation("ACCUMULO-1000",
            new Condition("count", "comments").setValue("3"));
        cm0.put("count", "comments", "1");
        assertEquals(Status.REJECTED, cw.write(cm0).getStatus());
        entry = Iterables.getOnlyElement(scanner);
        assertEquals("3", entry.getValue().toString());

        // With the summing iterator on the condition the value "3" matches; the sum becomes 4.
        ConditionalMutation cm1 = new ConditionalMutation("ACCUMULO-1000",
            new Condition("count", "comments").setIterators(iterConfig).setValue("3"));
        cm1.put("count", "comments", "1");
        assertEquals(Status.ACCEPTED, cw.write(cm1).getStatus());
        entry = Iterables.getOnlyElement(scanner);
        assertEquals("4", entry.getValue().toString());

        // Again no iterator on the condition, now against the new sum "4": expected to be
        // rejected just like cm0. This must write cm2 (not cm1) so this condition is exercised.
        ConditionalMutation cm2 = new ConditionalMutation("ACCUMULO-1000",
            new Condition("count", "comments").setValue("4"));
        cm2.put("count", "comments", "1");
        assertEquals(Status.REJECTED, cw.write(cm2).getStatus());
        entry = Iterables.getOnlyElement(scanner);
        assertEquals("4", entry.getValue().toString());

        // run test with multiple iterators passed in same batch and condition with two iterators
        ConditionalMutation cm3 = new ConditionalMutation("ACCUMULO-1000",
            new Condition("count", "comments").setIterators(iterConfig).setValue("4"));
        cm3.put("count", "comments", "1");
        ConditionalMutation cm4 = new ConditionalMutation("ACCUMULO-1001",
            new Condition("count2", "comments").setIterators(iterConfig2).setValue("2"));
        cm4.put("count2", "comments", "1");
        // Versioning plus summing on the same condition: the test expects this not to see "2".
        ConditionalMutation cm5 =
            new ConditionalMutation("ACCUMULO-1002", new Condition("count2", "comments")
                .setIterators(iterConfig2, iterConfig3).setValue("2"));
        cm5.put("count2", "comments", "1");

        Iterator<Result> results = cw.write(Arrays.asList(cm3, cm4, cm5).iterator());
        // Collect one status per row; a row appearing twice is a failure.
        Map<String,Status> actual = new HashMap<>();
        while (results.hasNext()) {
          Result result = results.next();
          String k = new String(result.getMutation().getRow());
          assertFalse("Did not expect to see multiple resultus for the row: " + k,
              actual.containsKey(k));
          actual.put(k, result.getStatus());
        }

        Map<String,Status> expected = new HashMap<>();
        expected.put("ACCUMULO-1000", Status.ACCEPTED);
        expected.put("ACCUMULO-1001", Status.ACCEPTED);
        expected.put("ACCUMULO-1002", Status.REJECTED);
        assertEquals(expected, actual);
      }
    }
  }
}
/**
 * Test iterator that reports each top value as the source's numeric value plus a fixed amount.
 * The amount is read from the "amount" option in {@link #init}.
 */
public static class AddingIterator extends WrappingIterator {
  long amount = 0;

  /**
   * Wires up the source iterator and parses the "amount" option as a long.
   */
  @Override
  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options,
      IteratorEnvironment env) {
    this.setSource(source);
    amount = Long.parseLong(options.get("amount"));
  }

  /**
   * Parses the source's top value as a long and returns it increased by {@code amount}, encoded
   * as a decimal string.
   */
  @Override
  public Value getTopValue() {
    long stored = Long.parseLong(super.getTopValue().toString());
    return new Value(Long.toString(stored + amount));
  }
}
/**
 * Test iterator that reports each top value as the source's numeric value times a fixed amount.
 * The amount is read from the "amount" option in {@link #init}.
 */
public static class MultiplyingIterator extends WrappingIterator {
  long amount = 0;

  /**
   * Wires up the source iterator and parses the "amount" option as a long.
   */
  @Override
  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options,
      IteratorEnvironment env) {
    this.setSource(source);
    amount = Long.parseLong(options.get("amount"));
  }

  /**
   * Parses the source's top value as a long and returns it multiplied by {@code amount}, encoded
   * as a decimal string.
   */
  @Override
  public Value getTopValue() {
    long stored = Long.parseLong(super.getTopValue().toString());
    return new Value(Long.toString(stored * amount));
  }
}
/**
 * Exercises conditions on a table that already has a scan-scope iterator configured, combined
 * with additional iterators set on the conditions themselves.
 * <p>
 * The table gets {@link AddingIterator} "AI1" (+2, priority 30) attached at scan scope. Conditions
 * additionally use {@link MultiplyingIterator} "MI1" (x3, priority 35) and {@link AddingIterator}
 * "AI2" (+5, priority 40). The expected values in the conditions below are the stored value run
 * through AI1 and then through whichever condition iterators are set, in priority order.
 *
 * @throws Exception on any failure reported by the client, writer, or scanner
 */
@Test
public void testTableAndConditionIterators() throws Exception {
// test w/ table that has iterators configured
try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
String tableName = getUniqueNames(1)[0];
// note: despite the "ai" prefix, aiConfig2 configures the MultiplyingIterator
IteratorSetting aiConfig1 = new IteratorSetting(30, "AI1", AddingIterator.class);
aiConfig1.addOption("amount", "2");
IteratorSetting aiConfig2 = new IteratorSetting(35, "MI1", MultiplyingIterator.class);
aiConfig2.addOption("amount", "3");
IteratorSetting aiConfig3 = new IteratorSetting(40, "AI2", AddingIterator.class);
aiConfig3.addOption("amount", "5");
client.tableOperations().create(tableName);
// seed stored values: ACCUMULO-1000 -> 6, ACCUMULO-1001 -> 7, ACCUMULO-1002 -> 8
try (BatchWriter bw = client.createBatchWriter(tableName)) {
Mutation m = new Mutation("ACCUMULO-1000");
m.put("count", "comments", "6");
bw.addMutation(m);
m = new Mutation("ACCUMULO-1001");
m.put("count", "comments", "7");
bw.addMutation(m);
m = new Mutation("ACCUMULO-1002");
m.put("count", "comments", "8");
bw.addMutation(m);
}
client.tableOperations().attachIterator(tableName, aiConfig1, EnumSet.of(IteratorScope.scan));
// bounce the table, waiting for each step (presumably so the new iterator setting is in
// effect before the conditional writes start -- confirm)
client.tableOperations().offline(tableName, true);
client.tableOperations().online(tableName, true);
try (
ConditionalWriter cw =
client.createConditionalWriter(tableName, new ConditionalWriterConfig());
Scanner scanner = client.createScanner(tableName, new Authorizations())) {
// table iterator only: stored 6, expected 6 + 2 = 8
ConditionalMutation cm6 = new ConditionalMutation("ACCUMULO-1000",
new Condition("count", "comments").setValue("8"));
cm6.put("count", "comments", "7");
assertEquals(Status.ACCEPTED, cw.write(cm6).getStatus());
scanner.setRange(new Range("ACCUMULO-1000"));
scanner.fetchColumn("count", "comments");
// the scan sees the new stored value 7 as 7 + 2 = 9
Entry<Key,Value> entry = Iterables.getOnlyElement(scanner);
assertEquals("9", entry.getValue().toString());
// table iterator then MI1: stored 7, expected (7 + 2) * 3 = 27
ConditionalMutation cm7 = new ConditionalMutation("ACCUMULO-1000",
new Condition("count", "comments").setIterators(aiConfig2).setValue("27"));
cm7.put("count", "comments", "8");
assertEquals(Status.ACCEPTED, cw.write(cm7).getStatus());
entry = Iterables.getOnlyElement(scanner);
assertEquals("10", entry.getValue().toString());
// table iterator, then MI1, then AI2: stored 8, expected (8 + 2) * 3 + 5 = 35
ConditionalMutation cm8 = new ConditionalMutation("ACCUMULO-1000",
new Condition("count", "comments").setIterators(aiConfig2, aiConfig3).setValue("35"));
cm8.put("count", "comments", "9");
assertEquals(Status.ACCEPTED, cw.write(cm8).getStatus());
entry = Iterables.getOnlyElement(scanner);
assertEquals("11", entry.getValue().toString());
// batch of three rows, each condition with its own iterator configuration
// ACCUMULO-1000: stored 9, (9 + 2) * 3 = 33 -> matches
ConditionalMutation cm3 = new ConditionalMutation("ACCUMULO-1000",
new Condition("count", "comments").setIterators(aiConfig2).setValue("33"));
cm3.put("count", "comments", "3");
// ACCUMULO-1001: stored 7, 7 + 2 + 5 = 14 -> matches
ConditionalMutation cm4 = new ConditionalMutation("ACCUMULO-1001",
new Condition("count", "comments").setIterators(aiConfig3).setValue("14"));
cm4.put("count", "comments", "3");
// ACCUMULO-1002: stored 8, 8 + 2 + 5 = 15, not 10 -> does not match
ConditionalMutation cm5 = new ConditionalMutation("ACCUMULO-1002",
new Condition("count", "comments").setIterators(aiConfig3).setValue("10"));
cm5.put("count", "comments", "3");
Iterator<Result> results = cw.write(Arrays.asList(cm3, cm4, cm5).iterator());
// collect one status per row; a row appearing twice is a failure
Map<String,Status> actual = new HashMap<>();
while (results.hasNext()) {
Result result = results.next();
String k = new String(result.getMutation().getRow());
assertFalse("Did not expect to see multiple resultus for the row: " + k,
actual.containsKey(k));
actual.put(k, result.getStatus());
}
Map<String,Status> expected = new HashMap<>();
expected.put("ACCUMULO-1000", Status.ACCEPTED);
expected.put("ACCUMULO-1001", Status.ACCEPTED);
expected.put("ACCUMULO-1002", Status.REJECTED);
assertEquals(expected, actual);
}
}
}
/**
 * Writes conditional mutations in batches and checks the per-mutation results.
 * <p>
 * The first batch inserts three rows, each conditional on tx:seq not existing, and all three are
 * expected to be accepted. Split points are then added to the table and a second batch is written
 * in which only the mutation for row "99006" has a condition that matches the current data; the
 * other two are expected to be rejected and to leave their rows unchanged.
 *
 * @throws Exception on any failure reported by the client, writer, or scanner
 */
@Test
public void testBatch() throws Exception {
try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
String tableName = getUniqueNames(1)[0];
client.tableOperations().create(tableName);
client.securityOperations().changeUserAuthorizations(getAdminPrincipal(),
new Authorizations("A", "B"));
// all data is written with visibility A|B; the writer and scanner below only pass "A"
ColumnVisibility cvab = new ColumnVisibility("A|B");
ArrayList<ConditionalMutation> mutations = new ArrayList<>();
// first batch: three inserts, each conditional on tx:seq not existing
ConditionalMutation cm0 =
new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvab));
cm0.put("name", "last", cvab, "doe");
cm0.put("name", "first", cvab, "john");
cm0.put("tx", "seq", cvab, "1");
mutations.add(cm0);
ConditionalMutation cm1 =
new ConditionalMutation("59056", new Condition("tx", "seq").setVisibility(cvab));
cm1.put("name", "last", cvab, "doe");
cm1.put("name", "first", cvab, "jane");
cm1.put("tx", "seq", cvab, "1");
mutations.add(cm1);
ConditionalMutation cm2 =
new ConditionalMutation("19059", new Condition("tx", "seq").setVisibility(cvab));
cm2.put("name", "last", cvab, "doe");
cm2.put("name", "first", cvab, "jack");
cm2.put("tx", "seq", cvab, "1");
mutations.add(cm2);
try (
ConditionalWriter cw = client.createConditionalWriter(tableName,
new ConditionalWriterConfig().setAuthorizations(new Authorizations("A")));
Scanner scanner = client.createScanner(tableName, new Authorizations("A"))) {
Iterator<Result> results = cw.write(mutations.iterator());
// every mutation in the first batch must be accepted, and there must be exactly three results
int count = 0;
while (results.hasNext()) {
Result result = results.next();
assertEquals(Status.ACCEPTED, result.getStatus());
count++;
}
assertEquals(3, count);
// each row must now hold tx:seq = 1
scanner.fetchColumn("tx", "seq");
for (String row : new String[] {"99006", "59056", "19059"}) {
scanner.setRange(new Range(row));
Entry<Key,Value> entry = Iterables.getOnlyElement(scanner);
assertEquals("1", entry.getValue().toString());
}
// add split points "3" and "7"; the three rows sort into three different split ranges
TreeSet<Text> splits = new TreeSet<>();
splits.add(new Text("7"));
splits.add(new Text("3"));
client.tableOperations().addSplits(tableName, splits);
mutations.clear();
// second batch: only cm3's condition matches the current data
// 99006: tx:seq == 1 holds -> expected ACCEPTED
ConditionalMutation cm3 = new ConditionalMutation("99006",
new Condition("tx", "seq").setVisibility(cvab).setValue("1"));
cm3.put("name", "last", cvab, "Doe");
cm3.put("tx", "seq", cvab, "2");
mutations.add(cm3);
// 59056: requires tx:seq to be absent, but it exists -> expected REJECTED
ConditionalMutation cm4 =
new ConditionalMutation("59056", new Condition("tx", "seq").setVisibility(cvab));
cm4.put("name", "last", cvab, "Doe");
cm4.put("tx", "seq", cvab, "1");
mutations.add(cm4);
// 19059: requires tx:seq == 2, but it is 1 -> expected REJECTED
ConditionalMutation cm5 = new ConditionalMutation("19059",
new Condition("tx", "seq").setVisibility(cvab).setValue("2"));
cm5.put("name", "last", cvab, "Doe");
cm5.put("tx", "seq", cvab, "3");
mutations.add(cm5);
results = cw.write(mutations.iterator());
int accepted = 0;
int rejected = 0;
// classify each result by its row: only "99006" may be accepted
while (results.hasNext()) {
Result result = results.next();
if (new String(result.getMutation().getRow()).equals("99006")) {
assertEquals(Status.ACCEPTED, result.getStatus());
accepted++;
} else {
assertEquals(Status.REJECTED, result.getStatus());
rejected++;
}
}
assertEquals("Expected only one accepted conditional mutation", 1, accepted);
assertEquals("Expected two rejected conditional mutations", 2, rejected);
// the rejected rows must still hold tx:seq = 1
for (String row : new String[] {"59056", "19059"}) {
scanner.setRange(new Range(row));
Entry<Key,Value> entry = Iterables.getOnlyElement(scanner);
assertEquals("1", entry.getValue().toString());
}
// the accepted row must show both of cm3's updates
scanner.setRange(new Range("99006"));
Entry<Key,Value> entry = Iterables.getOnlyElement(scanner);
assertEquals("2", entry.getValue().toString());
scanner.clearColumns();
scanner.fetchColumn("name", "last");
entry = Iterables.getOnlyElement(scanner);
assertEquals("Doe", entry.getValue().toString());
}
}
}
/**
 * Writes two batches of 100 conditional mutations against a table pre-split at "2", "4" and "6".
 * <p>
 * Rows are random non-negative longs formatted by {@code FastFormat.toZeroPaddedString} with
 * width 16 and radix 16. The first batch inserts each row conditional on meta:seq not existing;
 * the second updates each row conditional on meta:seq being "1". Every result in both batches must
 * be {@link Status#ACCEPTED} and each batch must yield exactly 100 results.
 *
 * @throws Exception on any failure reported by the client or writer
 */
@Test
public void testBigBatch() throws Exception {
  try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
    String tableName = getUniqueNames(1)[0];
    NewTableConfiguration ntc = new NewTableConfiguration().withSplits(nss("2", "4", "6"));
    client.tableOperations().create(tableName, ntc);
    // NOTE(review): fixed 2 second pause after creating the pre-split table; reason not stated
    sleepUninterruptibly(2, TimeUnit.SECONDS);

    int num = 100;

    // Generate all row ids up front.
    Random random = new SecureRandom();
    byte[] emptyBytes = new byte[0];
    ArrayList<byte[]> rows = new ArrayList<>(num);
    while (rows.size() < num) {
      rows.add(FastFormat.toZeroPaddedString(abs(random.nextLong()), 16, 16, emptyBytes));
    }

    // First batch: insert each row only if meta:seq is absent.
    ArrayList<ConditionalMutation> inserts = new ArrayList<>(num);
    for (byte[] row : rows) {
      ConditionalMutation insert = new ConditionalMutation(row, new Condition("meta", "seq"));
      insert.put("meta", "seq", "1");
      insert.put("meta", "tx", UUID.randomUUID().toString());
      inserts.add(insert);
    }

    try (ConditionalWriter writer =
        client.createConditionalWriter(tableName, new ConditionalWriterConfig())) {
      // TODO check got each row back
      int insertResults = 0;
      for (Iterator<Result> it = writer.write(inserts.iterator()); it.hasNext();) {
        assertEquals(Status.ACCEPTED, it.next().getStatus());
        insertResults++;
      }
      assertEquals("Did not receive the expected number of results", num, insertResults);

      // Second batch: bump each row only if meta:seq still equals "1".
      ArrayList<ConditionalMutation> updates = new ArrayList<>(num);
      for (byte[] row : rows) {
        ConditionalMutation update =
            new ConditionalMutation(row, new Condition("meta", "seq").setValue("1"));
        update.put("meta", "seq", "2");
        update.put("meta", "tx", UUID.randomUUID().toString());
        updates.add(update);
      }

      int updateResults = 0;
      for (Iterator<Result> it = writer.write(updates.iterator()); it.hasNext();) {
        assertEquals(Status.ACCEPTED, it.next().getStatus());
        updateResults++;
      }
      assertEquals("Did not receive the expected number of results", num, updateResults);
    }
  }
}
/**
 * Submits a single batch of four conditional mutations that are each expected to finish with a
 * different status, and checks the status reported for every row.
 */
@Test
public void testBatchErrors() throws Exception {
  try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
    String tableName = getUniqueNames(1)[0];
    client.tableOperations().create(tableName);
    client.tableOperations().addConstraint(tableName, AlphaNumKeyConstraint.class.getName());
    client.tableOperations().clone(tableName, tableName + "_clone", true, new HashMap<>(),
        new HashSet<>());
    client.securityOperations().changeUserAuthorizations(getAdminPrincipal(),
        new Authorizations("A", "B"));

    ColumnVisibility cvaob = new ColumnVisibility("A|B");
    ColumnVisibility cvaab = new ColumnVisibility("A&B");

    // Randomly leave the table unsplit, or split it into two or three tablets.
    int splitChoice = new SecureRandom().nextInt(3);
    if (splitChoice == 1) {
      client.tableOperations().addSplits(tableName, nss("6"));
    } else if (splitChoice == 2) {
      client.tableOperations().addSplits(tableName, nss("2", "95"));
    }

    // Column family "name+" is used here; this row is expected to come back VIOLATED.
    ConditionalMutation cm0 =
        new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvaob));
    cm0.put("name+", "last", cvaob, "doe");
    cm0.put("name", "first", cvaob, "john");
    cm0.put("tx", "seq", cvaob, "1");

    // Condition visibility is A&B while the writer below only holds authorization A.
    ConditionalMutation cm1 =
        new ConditionalMutation("59056", new Condition("tx", "seq").setVisibility(cvaab));
    cm1.put("name", "last", cvaab, "doe");
    cm1.put("name", "first", cvaab, "jane");
    cm1.put("tx", "seq", cvaab, "1");

    ConditionalMutation cm2 =
        new ConditionalMutation("19059", new Condition("tx", "seq").setVisibility(cvaob));
    cm2.put("name", "last", cvaob, "doe");
    cm2.put("name", "first", cvaob, "jack");
    cm2.put("tx", "seq", cvaob, "1");

    // Requires tx:seq to already be "1" on a row that has never been written.
    ConditionalMutation cm3 = new ConditionalMutation("90909",
        new Condition("tx", "seq").setVisibility(cvaob).setValue("1"));
    cm3.put("name", "last", cvaob, "doe");
    cm3.put("name", "first", cvaob, "john");
    cm3.put("tx", "seq", cvaob, "2");

    ArrayList<ConditionalMutation> mutations = new ArrayList<>();
    mutations.add(cm0);
    mutations.add(cm1);
    mutations.add(cm2);
    mutations.add(cm3);

    // Status each row is expected to report.
    Map<String,Status> expectedStatus = new HashMap<>();
    expectedStatus.put("19059", Status.ACCEPTED);
    expectedStatus.put("59056", Status.INVISIBLE_VISIBILITY);
    expectedStatus.put("99006", Status.VIOLATED);
    expectedStatus.put("90909", Status.REJECTED);

    try (
        ConditionalWriter cw = client.createConditionalWriter(tableName,
            new ConditionalWriterConfig().setAuthorizations(new Authorizations("A")));
        Scanner scanner = client.createScanner(tableName, new Authorizations("A"))) {
      Set<String> seenRows = new HashSet<>();
      for (Iterator<Result> results = cw.write(mutations.iterator()); results.hasNext();) {
        Result result = results.next();
        String row = new String(result.getMutation().getRow());
        Status wanted = expectedStatus.get(row);
        if (wanted != null) {
          assertEquals(wanted, result.getStatus());
        }
        seenRows.add(row);
      }
      assertEquals(4, seenRows.size());

      // Only the accepted mutation should have left a tx:seq entry behind.
      scanner.fetchColumn("tx", "seq");
      Entry<Key,Value> onlyEntry = Iterables.getOnlyElement(scanner);
      assertEquals("1", onlyEntry.getValue().toString());
    }
  }
}
/**
 * Sends several conditional mutations for the same row, all guarded by the same condition, in one
 * batch and expects exactly one of them to be accepted.
 */
@Test
public void testSameRow() throws Exception {
  try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
    String tableName = getUniqueNames(1)[0];
    client.tableOperations().create(tableName);

    try (ConditionalWriter cw =
        client.createConditionalWriter(tableName, new ConditionalWriterConfig())) {
      // Seed the row so that tx:seq is "1".
      ConditionalMutation seed = new ConditionalMutation("r1", new Condition("tx", "seq"));
      seed.put("tx", "seq", "1");
      seed.put("data", "x", "a");
      assertEquals(Status.ACCEPTED, cw.write(seed).getStatus());

      // Three competing updates, each conditional on tx:seq still being "1".
      ArrayList<ConditionalMutation> contenders = new ArrayList<>();
      for (String dataValue : new String[] {"b", "c", "d"}) {
        ConditionalMutation contender =
            new ConditionalMutation("r1", new Condition("tx", "seq").setValue("1"));
        contender.put("tx", "seq", "2");
        contender.put("data", "x", dataValue);
        contenders.add(contender);
      }

      int accepted = 0;
      int rejected = 0;
      int total = 0;
      for (Iterator<Result> results = cw.write(contenders.iterator()); results.hasNext();) {
        Status status = results.next().getStatus();
        if (status == Status.ACCEPTED) {
          accepted++;
        } else if (status == Status.REJECTED) {
          rejected++;
        }
        total++;
      }

      assertEquals("Expected one accepted result", 1, accepted);
      assertEquals("Expected two rejected results", 2, rejected);
      assertEquals("Expected three total results", 3, total);
    }
  }
}
/**
 * In-memory view of one row: ten integer {@code data} columns plus the {@code meta:sum} and
 * {@code meta:seq} columns. Can be read from a scan of the row and turned back into a conditional
 * mutation guarded by the sequence number.
 */
private static class Stats {
  ByteSequence row = null;
  int seq;
  long sum;
  int[] data = new int[10];

  /**
   * Populates the stats from the entries of a single row and asserts that the stored
   * {@code meta:sum} equals the sum of the {@code data} columns.
   */
  public Stats(Iterator<Entry<Key,Value>> iterator) {
    while (iterator.hasNext()) {
      Entry<Key,Value> kv = iterator.next();
      Key key = kv.getKey();
      if (row == null) {
        row = key.getRowData();
      }
      String family = key.getColumnFamilyData().toString();
      String qualifier = key.getColumnQualifierData().toString();
      if ("data".equals(family)) {
        // the qualifier is the index into the data array
        data[Integer.parseInt(qualifier)] = Integer.parseInt(kv.getValue().toString());
      } else if ("meta".equals(family)) {
        if ("sum".equals(qualifier)) {
          sum = Long.parseLong(kv.getValue().toString());
        } else if ("seq".equals(qualifier)) {
          seq = Integer.parseInt(kv.getValue().toString());
        }
      }
    }
    long computedSum = Arrays.stream(data).asLongStream().sum();
    assertEquals(computedSum, sum);
  }

  /**
   * Creates stats for a row that has not been written yet: all data zero, sum zero, and a
   * sequence of -1 so that {@link #toMutation()} sets no condition value.
   */
  public Stats(ByteSequence row) {
    this.row = row;
    // data is already all zeros from its field initializer
    this.seq = -1;
    this.sum = 0;
  }

  /** Replaces the value at {@code index}, keeping {@code sum} in step with the data array. */
  void set(int index, int value) {
    sum += (long) value - data[index];
    data[index] = value;
  }

  /**
   * Builds a mutation that writes every column with the sequence incremented by one. The
   * condition on {@code meta:seq} carries the current sequence as its value only when that
   * sequence is non-negative.
   */
  ConditionalMutation toMutation() {
    Condition seqCondition = new Condition("meta", "seq");
    if (seq >= 0) {
      seqCondition.setValue(String.valueOf(seq));
    }
    ConditionalMutation mutation = new ConditionalMutation(row, seqCondition);
    mutation.put("meta", "seq", String.valueOf(seq + 1));
    mutation.put("meta", "sum", String.valueOf(sum));
    int index = 0;
    for (int datum : data) {
      mutation.put("data", String.valueOf(index), String.valueOf(datum));
      index++;
    }
    return mutation;
  }

  @Override
  public String toString() {
    return row + " " + seq + " " + sum;
  }
}
/**
 * Worker used by the multi-threaded test: repeatedly reads a random handful of rows, changes one
 * data value in each, writes the resulting conditional mutations through the shared writer, and
 * checks that a result comes back for every row submitted. Any failure is recorded in the shared
 * {@code failed} flag.
 */
private static class MutatorTask implements Runnable {
  String tableName;
  ArrayList<ByteSequence> rows;
  ConditionalWriter cw;
  AccumuloClient client;
  AtomicBoolean failed;

  /**
   * @param tableName
   *          table to scan
   * @param client
   *          client used to create the scanner
   * @param rows
   *          rows to pick from at random
   * @param cw
   *          conditional writer the mutations are written through
   * @param failed
   *          set to true if this task hits any exception or assertion failure
   */
  public MutatorTask(String tableName, AccumuloClient client, ArrayList<ByteSequence> rows,
      ConditionalWriter cw, AtomicBoolean failed) {
    this.tableName = tableName;
    this.rows = rows;
    this.client = client;
    this.cw = cw;
    this.failed = failed;
  }

  @Override
  public void run() {
    try (Scanner scanner =
        new IsolatedScanner(client.createScanner(tableName, Authorizations.EMPTY))) {
      Random rand = new SecureRandom();
      for (int i = 0; i < 20; i++) {
        int numRows = rand.nextInt(10) + 1;
        ArrayList<ByteSequence> changes = new ArrayList<>(numRows);
        ArrayList<ConditionalMutation> mutations = new ArrayList<>();
        for (int j = 0; j < numRows; j++) {
          changes.add(rows.get(rand.nextInt(rows.size())));
        }
        for (ByteSequence row : changes) {
          scanner.setRange(new Range(row.toString()));
          Stats stats = new Stats(scanner.iterator());
          stats.set(rand.nextInt(10), rand.nextInt(Integer.MAX_VALUE));
          mutations.add(stats.toMutation());
        }
        // Statuses are not checked; only that every submitted row is reported back.
        ArrayList<ByteSequence> changed = new ArrayList<>(numRows);
        Iterator<Result> results = cw.write(mutations.iterator());
        while (results.hasNext()) {
          Result result = results.next();
          changed.add(new ArrayByteSequence(result.getMutation().getRow()));
        }
        Collections.sort(changes);
        Collections.sort(changed);
        assertEquals(changes, changed);
      }
    } catch (Throwable e) {
      // Catch Throwable rather than Exception: JUnit assertion failures are AssertionErrors, and
      // anything escaping a task handed to ExecutorService.submit is only stored in the returned
      // Future. Without this the failure would never reach the shared flag.
      log.error("{}", e.getMessage(), e);
      failed.set(true);
    }
  }
}
/**
 * Exercises one conditional writer from several threads at once. The rows are first created
 * through the writer, then five {@link MutatorTask}s update them concurrently, and finally every
 * row is re-read so that {@link Stats} can check its internal consistency.
 */
@Test
public void testThreads() throws Exception {
  String tableName = getUniqueNames(1)[0];
  try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
    NewTableConfiguration ntc = new NewTableConfiguration();
    Random rand = new SecureRandom();

    // Randomly create the table with one, two, or three tablets.
    int splitChoice = rand.nextInt(3);
    if (splitChoice == 1) {
      ntc = ntc.withSplits(nss("4"));
    } else if (splitChoice == 2) {
      ntc = ntc.withSplits(nss("3", "5"));
    }
    client.tableOperations().create(tableName, ntc);

    try (ConditionalWriter cw =
        client.createConditionalWriter(tableName, new ConditionalWriterConfig())) {
      ArrayList<ByteSequence> rows = new ArrayList<>();
      ArrayList<ConditionalMutation> initialMutations = new ArrayList<>();
      for (int i = 0; i < 1000; i++) {
        ByteSequence row = new ArrayByteSequence(
            FastFormat.toZeroPaddedString(abs(rand.nextLong()), 16, 16, new byte[0]));
        rows.add(row);
        initialMutations.add(new Stats(row).toMutation());
      }

      // Every initial write must be accepted and must be reported back.
      ArrayList<ByteSequence> writtenRows = new ArrayList<>();
      for (Iterator<Result> results = cw.write(initialMutations.iterator()); results
          .hasNext();) {
        Result result = results.next();
        assertEquals(Status.ACCEPTED, result.getStatus());
        writtenRows.add(new ArrayByteSequence(result.getMutation().getRow()));
      }
      Collections.sort(rows);
      Collections.sort(writtenRows);
      assertEquals(rows, writtenRows);

      AtomicBoolean failed = new AtomicBoolean(false);
      ExecutorService pool = Executors.newFixedThreadPool(5);
      for (int worker = 0; worker < 5; worker++) {
        pool.submit(new MutatorTask(tableName, client, rows, cw, failed));
      }
      pool.shutdown();
      // Keep waiting in one-minute slices until all workers are done.
      while (!pool.isTerminated()) {
        pool.awaitTermination(1, TimeUnit.MINUTES);
      }
      assertFalse("A MutatorTask failed with an exception", failed.get());
    }

    try (Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY)) {
      // Constructing Stats asserts that meta:sum matches the data columns of the row.
      for (RowIterator rowIter = new RowIterator(scanner); rowIter.hasNext();) {
        new Stats(rowIter.next());
      }
    }
  }
}
/**
 * Converts the given strings into a sorted set of {@link Text}, in the form the split-related
 * table operations in this class take.
 *
 * @param splits
 *          split points as strings
 * @return a new sorted set containing one {@code Text} per distinct argument
 */
private SortedSet<Text> nss(String... splits) {
  SortedSet<Text> splitPoints = new TreeSet<>();
  for (int i = 0; i < splits.length; i++) {
    splitPoints.add(new Text(splits[i]));
  }
  return splitPoints;
}
/**
 * Checks conditional writes made by a user with limited table permissions: the write succeeds on
 * a table with both READ and WRITE granted, and raises {@link AccumuloSecurityException} on
 * tables where only one of the two is granted.
 */
@Test
public void testSecurity() throws Exception {
  try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
    // Create a new user
    ClusterUser user1 = getUser(0);
    String user = user1.getPrincipal();
    if (!saslEnabled()) {
      client.securityOperations().createLocalUser(user, new PasswordToken(user1.getPassword()));
    } else {
      client.securityOperations().createLocalUser(user, null);
    }

    // Create three tables
    String[] tables = getUniqueNames(3);
    String readOnlyTable = tables[0];
    String writeOnlyTable = tables[1];
    String readWriteTable = tables[2];
    client.tableOperations().create(readOnlyTable);
    client.tableOperations().create(writeOnlyTable);
    client.tableOperations().create(readWriteTable);

    // Grant R on the first table, W on the second, R/W on the third
    client.securityOperations().grantTablePermission(user, readOnlyTable, TablePermission.READ);
    client.securityOperations().grantTablePermission(user, writeOnlyTable,
        TablePermission.WRITE);
    client.securityOperations().grantTablePermission(user, readWriteTable,
        TablePermission.READ);
    client.securityOperations().grantTablePermission(user, readWriteTable,
        TablePermission.WRITE);

    ConditionalMutation cm1 = new ConditionalMutation("r1", new Condition("tx", "seq"));
    cm1.put("tx", "seq", "1");
    cm1.put("data", "x", "a");

    try (
        AccumuloClient client2 =
            Accumulo.newClient().from(client.properties()).as(user, user1.getToken()).build();
        ConditionalWriter readOnlyWriter =
            client2.createConditionalWriter(readOnlyTable, new ConditionalWriterConfig());
        ConditionalWriter writeOnlyWriter =
            client2.createConditionalWriter(writeOnlyTable, new ConditionalWriterConfig());
        ConditionalWriter readWriteWriter =
            client2.createConditionalWriter(readWriteTable, new ConditionalWriterConfig())) {

      // Should be able to conditional-update a table we have R/W on
      assertEquals(Status.ACCEPTED, readWriteWriter.write(cm1).getStatus());

      // Conditional-update to a table we only have read on should fail
      try {
        Status status = readOnlyWriter.write(cm1).getStatus();
        fail("Expected exception writing conditional mutation to table"
            + " the user doesn't have write access to, Got status: " + status);
      } catch (AccumuloSecurityException expected) {
        // expected: the user lacks WRITE on this table
      }

      // Conditional-update to a table we only have write on should fail
      try {
        Status status = writeOnlyWriter.write(cm1).getStatus();
        fail("Expected exception writing conditional mutation to table"
            + " the user doesn't have read access to. Got status: " + status);
      } catch (AccumuloSecurityException expected) {
        // expected: the user lacks READ on this table
      }
    }
  }
}
/**
 * Verifies conditional writer timeouts. The writer is configured with a 3 second timeout and a
 * condition is evaluated through a {@link SlowIterator} whose seek sleeps for 5000 ms, so the
 * write is expected to report {@link Status#UNKNOWN}. The table must be left unchanged by that
 * write, and a following write without the slow iterator must still be accepted.
 */
@Test
public void testTimeout() throws Exception {
  try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
    String table = getUniqueNames(1)[0];
    client.tableOperations().create(table);

    try (
        ConditionalWriter cw = client.createConditionalWriter(table,
            new ConditionalWriterConfig().setTimeout(3, TimeUnit.SECONDS));
        Scanner scanner = client.createScanner(table, Authorizations.EMPTY)) {

      ConditionalMutation cm1 = new ConditionalMutation("r1", new Condition("tx", "seq"));
      cm1.put("tx", "seq", "1");
      cm1.put("data", "x", "a");
      assertEquals(Status.ACCEPTED, cw.write(cm1).getStatus());

      // The iterator's seek sleep (5000 ms) exceeds the writer timeout (3 s).
      IteratorSetting is = new IteratorSetting(5, SlowIterator.class);
      SlowIterator.setSeekSleepTime(is, 5000);

      ConditionalMutation cm2 = new ConditionalMutation("r1",
          new Condition("tx", "seq").setValue("1").setIterators(is));
      cm2.put("tx", "seq", "2");
      cm2.put("data", "x", "b");
      assertEquals(Status.UNKNOWN, cw.write(cm2).getStatus());

      // The timed-out mutation must not have been applied.
      for (Entry<Key,Value> entry : scanner) {
        String cf = entry.getKey().getColumnFamilyData().toString();
        String cq = entry.getKey().getColumnQualifierData().toString();
        String val = entry.getValue().toString();
        if (cf.equals("tx") && cq.equals("seq")) {
          assertEquals("Unexpected value in tx:seq", "1", val);
        } else if (cf.equals("data") && cq.equals("x")) {
          assertEquals("Unexpected value in data:x", "a", val);
        } else {
          fail("Saw unexpected column family and qualifier: " + entry);
        }
      }

      // Same update without the slow iterator should go through.
      ConditionalMutation cm3 =
          new ConditionalMutation("r1", new Condition("tx", "seq").setValue("1"));
      cm3.put("tx", "seq", "2");
      cm3.put("data", "x", "b");
      assertEquals(Status.ACCEPTED, cw.write(cm3).getStatus());
    }
  }
}
/**
 * Checks that a conditional writer cannot be created for a missing table, and that writing
 * through a writer whose table was deleted fails with an {@link AccumuloException} caused by a
 * {@link TableDeletedException}.
 */
@Test
public void testDeleteTable() throws Exception {
  String table = getUniqueNames(1)[0];
  try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
    // The table does not exist yet, so creating a writer must fail.
    try {
      client.createConditionalWriter(table, new ConditionalWriterConfig());
      fail("Creating conditional writer for table that doesn't exist should fail");
    } catch (TableNotFoundException expected) {
      // expected
    }

    client.tableOperations().create(table);

    try (ConditionalWriter cw =
        client.createConditionalWriter(table, new ConditionalWriterConfig())) {
      // Delete the table out from under the open writer.
      client.tableOperations().delete(table);

      ConditionalMutation mutation = new ConditionalMutation("r1", new Condition("tx", "seq"));
      mutation.put("tx", "seq", "1");
      mutation.put("data", "x", "a");
      Result writeResult = cw.write(mutation);

      try {
        Status status = writeResult.getStatus();
        fail("Expected exception writing conditional mutation to deleted table. Got status: "
            + status);
      } catch (AccumuloException ae) {
        Class<?> causeType = ae.getCause().getClass();
        assertEquals(TableDeletedException.class, causeType);
      }
    }
  }
}
/**
 * Checks behaviour against an offline table: writing through an already-open writer fails with an
 * {@link AccumuloException} caused by a {@link TableOfflineException}, and creating a new writer
 * throws {@link TableOfflineException}.
 */
@Test
public void testOffline() throws Exception {
  String table = getUniqueNames(1)[0];
  try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
    client.tableOperations().create(table);

    try (ConditionalWriter cw =
        client.createConditionalWriter(table, new ConditionalWriterConfig())) {
      // Take the table offline while the writer is open.
      client.tableOperations().offline(table, true);

      ConditionalMutation mutation = new ConditionalMutation("r1", new Condition("tx", "seq"));
      mutation.put("tx", "seq", "1");
      mutation.put("data", "x", "a");
      Result writeResult = cw.write(mutation);

      try {
        Status status = writeResult.getStatus();
        fail("Expected exception writing conditional mutation to offline table. Got status: "
            + status);
      } catch (AccumuloException ae) {
        Class<?> causeType = ae.getCause().getClass();
        assertEquals(TableOfflineException.class, causeType);
      }

      // A new writer cannot be created for the offline table either.
      try {
        client.createConditionalWriter(table, new ConditionalWriterConfig());
        fail("Expected exception creating conditional writer to offline table");
      } catch (TableOfflineException expected) {
        // expected
      }
    }
  }
}
/**
 * Attaches a {@link BadIterator} to a condition and expects reading the status of the resulting
 * write to throw an {@link AccumuloException}.
 */
@Test
public void testError() throws Exception {
  String table = getUniqueNames(1)[0];
  try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
    client.tableOperations().create(table);

    try (ConditionalWriter cw =
        client.createConditionalWriter(table, new ConditionalWriterConfig())) {
      IteratorSetting badIterator = new IteratorSetting(5, BadIterator.class);
      Condition failingCondition = new Condition("tx", "seq").setIterators(badIterator);

      ConditionalMutation mutation = new ConditionalMutation("r1", failingCondition);
      mutation.put("tx", "seq", "1");
      mutation.put("data", "x", "a");
      Result writeResult = cw.write(mutation);

      try {
        Status status = writeResult.getStatus();
        fail("Expected exception using iterator which throws an error, Got status: " + status);
      } catch (AccumuloException expected) {
        // expected: the failing iterator surfaces as an AccumuloException
      }
    }
  }
}
/**
 * Writes a conditional mutation that carries no conditions; the test passes only if an
 * {@link IllegalArgumentException} is thrown.
 */
@Test(expected = IllegalArgumentException.class)
public void testNoConditions() throws AccumuloException, AccumuloSecurityException,
    TableExistsException, TableNotFoundException {
  String table = getUniqueNames(1)[0];
  try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
    client.tableOperations().create(table);

    try (ConditionalWriter writer =
        client.createConditionalWriter(table, new ConditionalWriterConfig())) {
      // Built from a row only: no Condition is supplied.
      ConditionalMutation unconditioned = new ConditionalMutation("r1");
      unconditioned.put("tx", "seq", "1");
      unconditioned.put("data", "x", "a");
      writer.write(unconditioned);
    }
  }
}
/**
 * Performs a traced conditional write and checks that the dumped trace mentions the expected
 * spans in order. A trace server process is started when the "trace" table does not exist yet;
 * that process is always destroyed when the test finishes, whether it passes or fails.
 */
@Test
public void testTrace() throws Exception {
  // Need to add a getClientConfig() to AccumuloCluster
  Process tracer = null;
  try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
    MiniAccumuloClusterImpl mac = getCluster();
    if (!client.tableOperations().exists("trace")) {
      tracer = mac.exec(TraceServer.class).getProcess();
      while (!client.tableOperations().exists("trace")) {
        sleepUninterruptibly(1, TimeUnit.SECONDS);
      }
    }

    String tableName = getUniqueNames(1)[0];
    client.tableOperations().create(tableName);

    TraceUtil.enableClientTraces("localhost", "testTrace", mac.getClientProperties());
    sleepUninterruptibly(1, TimeUnit.SECONDS);

    long rootTraceId;
    try (TraceScope root = Trace.startSpan("traceTest", Sampler.ALWAYS); ConditionalWriter cw =
        client.createConditionalWriter(tableName, new ConditionalWriterConfig())) {
      rootTraceId = root.getSpan().getTraceId();

      // mutation conditional on column tx:seq not existing
      ConditionalMutation cm0 = new ConditionalMutation("99006", new Condition("tx", "seq"));
      cm0.put("name", "last", "doe");
      cm0.put("name", "first", "john");
      cm0.put("tx", "seq", "1");
      assertEquals(Status.ACCEPTED, cw.write(cm0).getStatus());
    }

    try (Scanner scanner = client.createScanner("trace", Authorizations.EMPTY)) {
      scanner.setRange(new Range(new Text(Long.toHexString(rootTraceId))));
      loop: while (true) {
        final StringBuilder finalBuffer = new StringBuilder();
        int traceCount =
            TraceDump.printTrace(scanner, line -> finalBuffer.append(line).append("\n"));
        String traceOutput = finalBuffer.toString();
        log.info("Trace output:{}", traceOutput);
        if (traceCount > 0) {
          String[] parts = ("traceTest, startScan,startConditionalUpdate,conditionalUpdate"
              + ",Check conditions,apply conditional mutations").split(",");
          int lastPos = 0;
          for (String part : parts) {
            log.info("Looking in trace output for '{}'", part);
            int pos = traceOutput.indexOf(part);
            if (pos == -1) {
              log.info("Trace output doesn't contain '{}'", part);
              Thread.sleep(1000);
              // NOTE(review): this leaves the retry loop, so the test passes even though a span
              // is missing. The preceding sleep suggests "continue loop" (retry) may have been
              // intended - confirm before changing, since a retry here would be unbounded.
              break loop;
            }
            assertTrue("Did not find '" + part + "' in output", pos > 0);
            assertTrue("'" + part + "' occurred earlier than the previous element unexpectedly",
                pos > lastPos);
            lastPos = pos;
          }
          break;
        } else {
          // Nothing in the trace table yet; wait and scan again.
          log.info("Ignoring trace output as traceCount not greater than zero: {}", traceCount);
          Thread.sleep(1000);
        }
      }
    }
  } finally {
    // Destroy the trace server even when an assertion or exception ends the test early, so the
    // child process is not leaked.
    if (tracer != null) {
      tracer.destroy();
    }
  }
}
}