/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.test;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.clientImpl.ClientInfo;
import org.apache.accumulo.core.clientImpl.TabletLocator;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iterators.WrappingIterator;
import org.apache.accumulo.core.util.cleaner.CleanerUtil;
import org.apache.accumulo.test.util.SerializationUtil;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Iterator that opens a BatchWriter and writes to another table.
* <p>
* For each entry passed to this iterator, it writes a configurable number of entries with the same
* key to another table and then passes the entry downstream with its value replaced by either
* "{@value #SUCCESS_STRING}" or a description of what failed. Success means every entry was
* written to the result table within the timeout period; failure means at least one entry took
* longer than the timeout.
* <p>
* Configure this iterator by calling the static {@link #iteratorSetting} method.
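* <p>
* A minimal usage sketch (illustrative values only; {@code client} and {@code scanner} are assumed
* to already exist, and a plain {@code PasswordToken} stands in for whatever token the caller
* actually uses):
*
* <pre>
* IteratorSetting itset = BatchWriterIterator.iteratorSetting(10, 0, 60L, 50_000L, 5,
*     "resultTable", client, new PasswordToken("secret"), false, false);
* scanner.addScanIterator(itset);
* </pre>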
*/
public class BatchWriterIterator extends WrappingIterator {
private static final Logger log = LoggerFactory.getLogger(BatchWriterIterator.class);
private static final String OPT_sleepAfterFirstWrite = "sleepAfterFirstWrite";
private static final String OPT_numEntriesToWritePerEntry = "numEntriesToWritePerEntry";
private static final String OPT_batchWriterTimeout = "batchWriterTimeout";
private static final String OPT_batchWriterMaxMemory = "batchWriterMaxMemory";
private static final String OPT_clearCacheAfterFirstWrite = "clearCacheAfterFirstWrite";
private static final String OPT_splitAfterFirstWrite = "splitAfterFirstWrite";
private static final String ZOOKEEPERHOST = "zookeeperHost";
private static final String INSTANCENAME = "instanceName";
private static final String TABLENAME = "tableName";
private static final String USERNAME = "username";
private static final String ZOOKEEPERTIMEOUT = "zookeeperTimeout";
// base64 encoding of token
private static final String AUTHENTICATION_TOKEN = "authenticationToken";
// class of token
private static final String AUTHENTICATION_TOKEN_CLASS = "authenticationTokenClass";
private static final String SUCCESS_STRING = "success";
public static final Value SUCCESS_VALUE = new Value(SUCCESS_STRING);
private Map<String,String> originalOptions; // remembered for deepCopy
private int sleepAfterFirstWrite = 0;
private int numEntriesToWritePerEntry = 10;
private long batchWriterTimeout = 0;
private long batchWriterMaxMemory = 0;
private boolean clearCacheAfterFirstWrite = false;
private boolean splitAfterFirstWrite = false;
private String instanceName;
private String tableName;
private String zookeeperHost;
private int zookeeperTimeout = -1;
private String username;
private AuthenticationToken auth = null;
private BatchWriter batchWriter;
private boolean firstWrite = true;
private Value topValue = null;
private AccumuloClient accumuloClient;
public static IteratorSetting iteratorSetting(int priority, int sleepAfterFirstWrite,
long batchWriterTimeout, long batchWriterMaxMemory, int numEntriesToWrite, String tableName,
AccumuloClient accumuloClient, AuthenticationToken token, boolean clearCacheAfterFirstWrite,
boolean splitAfterFirstWrite) {
ClientInfo info = ClientInfo.from(accumuloClient.properties());
return iteratorSetting(priority, sleepAfterFirstWrite, batchWriterTimeout, batchWriterMaxMemory,
numEntriesToWrite, tableName, info.getZooKeepers(), info.getInstanceName(),
info.getZooKeepersSessionTimeOut(), accumuloClient.whoami(), token,
clearCacheAfterFirstWrite, splitAfterFirstWrite);
}
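// variant that takes the connection parameters explicitly instead of deriving them from an
// existing AccumuloClient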
public static IteratorSetting iteratorSetting(int priority, int sleepAfterFirstWrite,
long batchWriterTimeout, long batchWriterMaxMemory, int numEntriesToWrite, String tableName,
String zookeeperHost, String instanceName, int zookeeperTimeout, String username,
AuthenticationToken token, boolean clearCacheAfterFirstWrite, boolean splitAfterFirstWrite) {
IteratorSetting itset = new IteratorSetting(priority, BatchWriterIterator.class);
itset.addOption(OPT_sleepAfterFirstWrite, Integer.toString(sleepAfterFirstWrite));
itset.addOption(OPT_numEntriesToWritePerEntry, Integer.toString(numEntriesToWrite));
itset.addOption(OPT_batchWriterTimeout, Long.toString(batchWriterTimeout));
itset.addOption(OPT_batchWriterMaxMemory, Long.toString(batchWriterMaxMemory));
itset.addOption(OPT_clearCacheAfterFirstWrite, Boolean.toString(clearCacheAfterFirstWrite));
itset.addOption(OPT_splitAfterFirstWrite, Boolean.toString(splitAfterFirstWrite));
itset.addOption(TABLENAME, tableName);
itset.addOption(ZOOKEEPERHOST, zookeeperHost);
itset.addOption(ZOOKEEPERTIMEOUT, Integer.toString(zookeeperTimeout));
itset.addOption(INSTANCENAME, instanceName);
itset.addOption(USERNAME, username);
itset.addOption(AUTHENTICATION_TOKEN_CLASS, token.getClass().getName());
itset.addOption(AUTHENTICATION_TOKEN, SerializationUtil.serializeWritableBase64(token));
return itset;
}
@Override
public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options,
IteratorEnvironment env) throws IOException {
super.init(source, options, env);
parseOptions(options);
initBatchWriter();
}
private void parseOptions(Map<String,String> options) {
this.originalOptions = new HashMap<>(options);
if (options.containsKey(OPT_numEntriesToWritePerEntry))
numEntriesToWritePerEntry = Integer.parseInt(options.get(OPT_numEntriesToWritePerEntry));
if (options.containsKey(OPT_sleepAfterFirstWrite))
sleepAfterFirstWrite = Integer.parseInt(options.get(OPT_sleepAfterFirstWrite));
if (options.containsKey(OPT_batchWriterTimeout))
batchWriterTimeout = Long.parseLong(options.get(OPT_batchWriterTimeout));
if (options.containsKey(OPT_batchWriterMaxMemory))
batchWriterMaxMemory = Long.parseLong(options.get(OPT_batchWriterMaxMemory));
if (options.containsKey(OPT_clearCacheAfterFirstWrite))
clearCacheAfterFirstWrite = Boolean.parseBoolean(options.get(OPT_clearCacheAfterFirstWrite));
if (options.containsKey(OPT_splitAfterFirstWrite))
splitAfterFirstWrite = Boolean.parseBoolean(options.get(OPT_splitAfterFirstWrite));
instanceName = options.get(INSTANCENAME);
tableName = options.get(TABLENAME);
zookeeperHost = options.get(ZOOKEEPERHOST);
zookeeperTimeout = Integer.parseInt(options.get(ZOOKEEPERTIMEOUT));
username = options.get(USERNAME);
String authClass = options.get(AUTHENTICATION_TOKEN_CLASS);
String authString = options.get(AUTHENTICATION_TOKEN);
auth = SerializationUtil.subclassNewInstance(authClass, AuthenticationToken.class);
SerializationUtil.deserializeWritableBase64(auth, authString);
}
private void initBatchWriter() {
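// rebuild a client from the connection details that were serialized into the iterator options,
// then open a BatchWriter on the target table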
accumuloClient = Accumulo.newClient().to(instanceName, zookeeperHost).as(username, auth)
.zkTimeout(zookeeperTimeout).build();
BatchWriterConfig bwc = new BatchWriterConfig();
bwc.setMaxMemory(batchWriterMaxMemory);
bwc.setTimeout(batchWriterTimeout, TimeUnit.SECONDS);
try {
batchWriter = accumuloClient.createBatchWriter(tableName, bwc);
} catch (TableNotFoundException e) {
log.error(tableName + " does not exist in instance " + instanceName, e);
accumuloClient.close();
throw new RuntimeException(e);
} catch (RuntimeException e) {
accumuloClient.close();
throw e;
}
// this is dubious, but necessary since iterators aren't closeable
CleanerUtil.batchWriterAndClientCloser(this, log, batchWriter, accumuloClient);
}
/**
* Write numEntriesToWritePerEntry entries to the target table for the current top entry, flush,
* and set topValue to reflect success or failure.
*/
private void processNext() {
assert hasTop();
Key k = getTopKey();
Text row = k.getRow(), cf = k.getColumnFamily(), cq = k.getColumnQualifier();
Value v = super.getTopValue();
String failure = null;
try {
for (int i = 0; i < numEntriesToWritePerEntry; i++) {
Mutation m = new Mutation(row);
m.put(cf, cq, v);
batchWriter.addMutation(m);
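// after the very first mutation: flush, then optionally clear the tablet location cache,
// split the target table at this row, and sleep before continuing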
if (firstWrite) {
batchWriter.flush();
if (clearCacheAfterFirstWrite)
TabletLocator.clearLocators();
if (splitAfterFirstWrite) {
SortedSet<Text> splits = new TreeSet<>();
splits.add(new Text(row));
accumuloClient.tableOperations().addSplits(tableName, splits);
}
if (sleepAfterFirstWrite > 0)
try {
Thread.sleep(sleepAfterFirstWrite);
} catch (InterruptedException ignored) {}
firstWrite = false;
}
}
batchWriter.flush();
} catch (Exception e) {
// in particular: watching for TimedOutException
log.error("Problem while BatchWriting to target table " + tableName, e);
failure = e.getClass().getSimpleName() + ": " + e.getMessage();
}
topValue = failure == null ? SUCCESS_VALUE : new Value(failure);
}
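// each time the source exposes a new top entry, write it to the target table and replace its
// value with the outcome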
@Override
public void next() throws IOException {
super.next();
if (hasTop())
processNext();
}
@Override
public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive)
throws IOException {
super.seek(range, columnFamilies, inclusive);
if (hasTop())
processNext();
}
@Override
public Value getTopValue() {
return topValue;
}
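// the copy re-runs init() with the remembered options, so it opens its own client and BatchWriter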
@Override
public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
BatchWriterIterator newInstance;
try {
newInstance = this.getClass().getDeclaredConstructor().newInstance();
newInstance.init(getSource().deepCopy(env), originalOptions, env);
return newInstance;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}