blob: 4967e3f0e1cb1bab5e63d9563c766ce975112648 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.clientImpl;
import java.io.IOException;
import java.lang.management.CompilationMXBean;
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Durability;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.TableDeletedException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.TableOfflineException;
import org.apache.accumulo.core.client.TimedOutException;
import org.apache.accumulo.core.clientImpl.TabletLocator.TabletServerMutations;
import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode;
import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
import org.apache.accumulo.core.constraints.Violations;
import org.apache.accumulo.core.data.ConstraintViolationSummary;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.TabletId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.dataImpl.TabletIdImpl;
import org.apache.accumulo.core.dataImpl.thrift.TMutation;
import org.apache.accumulo.core.dataImpl.thrift.UpdateErrors;
import org.apache.accumulo.core.master.state.tables.TableState;
import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException;
import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.trace.thrift.TInfo;
import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.core.util.SimpleThreadPool;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;
import org.apache.thrift.TApplicationException;
import org.apache.thrift.TException;
import org.apache.thrift.TServiceClient;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Joiner;
/*
* Differences from previous TabletServerBatchWriter
* + As background threads finish sending mutations to tablet servers they decrement memory usage
* + Once the queue of unprocessed mutations reaches 50% it is always pushed
* to the background threads, even if they are currently processing... new
* mutations are merged with mutations currently processing in the background
* + Failed mutations are held for 1000ms and then re-added to the unprocessed queue
* + Flush holds adding of new mutations so it does not wait indefinitely
*
* Considerations
* + All background threads must catch and note Throwable
* + mutations for a single tablet server are only processed by one thread
* concurrently (if new mutations come in for a tablet server while one
* thread is processing mutations for it, no other thread should
* start processing those mutations)
*
* Memory accounting
* + when a mutation enters the system memory is incremented
* + when a mutation successfully leaves the system memory is decremented
*/
public class TabletServerBatchWriter implements AutoCloseable {
private static final Logger log = LoggerFactory.getLogger(TabletServerBatchWriter.class);
// basic configuration
private final ClientContext context;
// memory budget; addMutation blocks while totalMemUsed exceeds it
private final long maxMem;
// milliseconds; Long.MAX_VALUE when the configured latency is <= 0 (no latency timer is scheduled)
private final long maxLatency;
// milliseconds; handed to each TimeoutTracker and TimeoutTabletLocator
private final long timeout;
private final Durability durability;
// state
// true while a flush() call is waiting for buffered mutations to drain
private boolean flushing;
private boolean closed;
// mutations accepted but not yet handed to the background writer
private MutationSet mutations;
// background writer
private final MutationWriter writer;
// latency timers
// daemon timer; runs the max-latency check and the FailedMutations requeue task
private final Timer jtimer = new Timer("BatchWriterLatencyTimer", true);
// keyed by tablet server location string
private final Map<String,TimeoutTracker> timeoutTrackers =
Collections.synchronizedMap(new HashMap<>());
// stats
// estimated bytes of mutations added and not yet successfully written
private long totalMemUsed = 0;
private long lastProcessingStartTime;
private long totalAdded = 0;
private final AtomicLong totalSent = new AtomicLong(0);
private final AtomicLong totalBinned = new AtomicLong(0);
private final AtomicLong totalBinTime = new AtomicLong(0);
private final AtomicLong totalSendTime = new AtomicLong(0);
// wall-clock time of the first addMutation call; 0 until then
private long startTime = 0;
private long initialGCTimes;
private long initialCompileTimes;
private double initialSystemLoad;
private AtomicInteger tabletServersBatchSum = new AtomicInteger(0);
private AtomicInteger tabletBatchSum = new AtomicInteger(0);
private AtomicInteger numBatches = new AtomicInteger(0);
private AtomicInteger maxTabletBatch = new AtomicInteger(Integer.MIN_VALUE);
private AtomicInteger minTabletBatch = new AtomicInteger(Integer.MAX_VALUE);
private AtomicInteger minTabletServersBatch = new AtomicInteger(Integer.MAX_VALUE);
private AtomicInteger maxTabletServersBatch = new AtomicInteger(Integer.MIN_VALUE);
// error handling
private final Violations violations = new Violations();
private final Map<KeyExtent,Set<SecurityErrorCode>> authorizationFailures = new HashMap<>();
private final HashSet<String> serverSideErrors = new HashSet<>();
private final FailedMutations failedMutations = new FailedMutations();
private int unknownErrors = 0;
// set by the update*Errors/Violations/Failures methods; nothing in this class resets it
private boolean somethingFailed = false;
private Throwable lastUnknownError = null;
/**
 * Tracks, for one tablet server, how long writes have gone without making progress and raises
 * {@link TimedOutException} once that span exceeds the configured timeout.
 */
private static class TimeoutTracker {
  final String server;
  final long timeOut;
  long activityTime;
  // time of the write attempt that began the current run of failures; null when there is none
  Long firstErrorTime = null;

  TimeoutTracker(String server, long timeOut) {
    this.server = server;
    this.timeOut = timeOut;
  }

  /** Records the moment a write attempt begins. */
  void startingWrite() {
    activityTime = System.currentTimeMillis();
  }

  /** Records successful activity and clears any run of failures. */
  void madeProgress() {
    firstErrorTime = null;
    activityTime = System.currentTimeMillis();
  }

  /**
   * Notes a write attempt that committed nothing. The first such attempt only starts the clock;
   * later ones throw once more than {@code timeOut} milliseconds have passed since then.
   */
  void wroteNothing() {
    if (firstErrorTime == null) {
      firstErrorTime = activityTime;
      return;
    }
    long stalledFor = System.currentTimeMillis() - firstErrorTime;
    if (stalledFor > timeOut) {
      throw new TimedOutException(Collections.singleton(server));
    }
  }

  /** An error is treated exactly like a write that committed nothing. */
  void errorOccured() {
    wroteNothing();
  }

  public long getTimeOut() {
    return timeOut;
  }
}
/**
 * Creates a batch writer and, when a max latency is configured, schedules a periodic task that
 * pushes buffered mutations to the background writer once they have waited longer than that
 * latency.
 *
 * @param context client context used for RPCs and table lookups
 * @param config supplies max memory, max latency, timeout, durability and write-thread count
 */
public TabletServerBatchWriter(ClientContext context, BatchWriterConfig config) {
  this.context = context;
  this.maxMem = config.getMaxMemory();
  long configuredLatency = config.getMaxLatency(TimeUnit.MILLISECONDS);
  // a non-positive latency disables latency-based flushing
  this.maxLatency = configuredLatency <= 0 ? Long.MAX_VALUE : configuredLatency;
  this.timeout = config.getTimeout(TimeUnit.MILLISECONDS);
  this.mutations = new MutationSet();
  this.lastProcessingStartTime = System.currentTimeMillis();
  this.durability = config.getDurability();
  this.writer = new MutationWriter(config.getMaxWriteThreads());
  if (this.maxLatency != Long.MAX_VALUE) {
    // Timer.schedule rejects a non-positive period, and maxLatency / 4 is 0 for latencies
    // below 4ms, so never go under 1ms.
    long checkPeriod = Math.max(1, this.maxLatency / 4);
    jtimer.schedule(new TimerTask() {
      @Override
      public void run() {
        try {
          synchronized (TabletServerBatchWriter.this) {
            if ((System.currentTimeMillis() - lastProcessingStartTime)
                > TabletServerBatchWriter.this.maxLatency)
              startProcessing();
          }
        } catch (Throwable t) {
          // background threads must note every Throwable rather than die silently
          updateUnknownErrors("Max latency task failed " + t.getMessage(), t);
        }
      }
    }, 0, checkPeriod);
  }
}
/**
 * Hands the currently buffered mutations to the background writer and starts a fresh buffer.
 * Does nothing when the buffer is empty.
 */
private synchronized void startProcessing() {
  if (mutations.getMemoryUsed() != 0) {
    lastProcessingStartTime = System.currentTimeMillis();
    writer.queueMutations(mutations);
    mutations = new MutationSet();
  }
}
/**
 * Releases {@code amount} bytes from the memory accounting and wakes every thread waiting on
 * this writer.
 */
private synchronized void decrementMemUsed(long amount) {
  totalMemUsed = totalMemUsed - amount;
  notifyAll();
}
/**
 * Buffers a copy of {@code m} for {@code table}. Blocks while the writer is over its memory
 * budget or a flush is in progress, unless a failure has been recorded. Once the buffer reaches
 * half of the memory budget it is pushed to the background writer.
 *
 * @param table id of the table the mutation targets
 * @param m mutation to add; must contain at least one update
 * @throws IllegalStateException if this writer is closed
 * @throws IllegalArgumentException if {@code m} is empty
 * @throws MutationsRejectedException if a previous write has failed
 */
public synchronized void addMutation(TableId table, Mutation m)
    throws MutationsRejectedException {
  if (closed)
    throw new IllegalStateException("Closed");
  if (m.size() == 0)
    throw new IllegalArgumentException("Can not add empty mutations");
  checkForFailures();
  waitRTE(() -> (totalMemUsed > maxMem || flushing) && !somethingFailed);
  // do checks again since things could have changed while waiting and not holding lock
  if (closed)
    throw new IllegalStateException("Closed");
  checkForFailures();
  if (startTime == 0) {
    // first mutation: capture baselines for the statistics logged at close
    startTime = System.currentTimeMillis();
    for (GarbageCollectorMXBean gcBean : ManagementFactory.getGarbageCollectorMXBeans()) {
      initialGCTimes += gcBean.getCollectionTime();
    }
    // getCompilationMXBean() returns null when the JVM has no compilation system
    CompilationMXBean compMxBean = ManagementFactory.getCompilationMXBean();
    if (compMxBean != null && compMxBean.isCompilationTimeMonitoringSupported()) {
      initialCompileTimes = compMxBean.getTotalCompilationTime();
    }
    initialSystemLoad = ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage();
  }
  // create a copy of mutation so that after this method returns the user
  // is free to reuse the mutation object, like calling readFields... this
  // is important for the case where a mutation is passed from map to reduce
  // to batch writer... the map reduce code will keep passing the same mutation
  // object into the reduce method
  m = new Mutation(m);
  totalMemUsed += m.estimatedMemoryUsed();
  mutations.addMutation(table, m);
  totalAdded++;
  if (mutations.getMemoryUsed() >= maxMem / 2) {
    startProcessing();
    checkForFailures();
  }
}
/**
 * Adds every mutation produced by {@code iterator} to {@code table}, one at a time, stopping at
 * the first one that is rejected.
 */
public void addMutation(TableId table, Iterator<Mutation> iterator)
    throws MutationsRejectedException {
  for (Iterator<Mutation> remaining = iterator; remaining.hasNext();) {
    Mutation next = remaining.next();
    addMutation(table, next);
  }
}
/**
 * Pushes all buffered mutations to the background writer and waits until they have been written
 * or a failure is recorded. If another thread is already flushing, waits for that flush instead.
 *
 * @throws IllegalStateException if this writer is closed
 * @throws MutationsRejectedException if any write has failed
 */
public synchronized void flush() throws MutationsRejectedException {
  if (closed)
    throw new IllegalStateException("Closed");
  try (TraceScope span = Trace.startSpan("flush")) {
    checkForFailures();
    if (flushing) {
      // some other thread is currently flushing, so wait
      waitRTE(() -> flushing && !somethingFailed);
      checkForFailures();
      return;
    }
    flushing = true;
    try {
      startProcessing();
      checkForFailures();
      waitRTE(() -> totalMemUsed > 0 && !somethingFailed);
    } finally {
      // Always clear the flag, even when the wait is interrupted or a failure is reported;
      // otherwise threads blocked in addMutation/flush would wait on it indefinitely.
      flushing = false;
      this.notifyAll();
    }
    checkForFailures();
  }
}
/**
 * Marks the writer closed, pushes any buffered mutations to the background writer, waits for
 * them to drain (or for a failure), logs statistics, and reports accumulated failures. The
 * thread pools and the timer are shut down whether or not the wait completes normally. Calling
 * close on an already closed writer is a no-op.
 *
 * @throws MutationsRejectedException if any write has failed
 */
@Override
public synchronized void close() throws MutationsRejectedException {
if (closed)
return;
try (TraceScope span = Trace.startSpan("close")) {
// set before draining so concurrent addMutation/flush callers fail fast
closed = true;
startProcessing();
waitRTE(() -> totalMemUsed > 0 && !somethingFailed);
logStats();
checkForFailures();
} finally {
// make a best effort to release these resources
writer.binningThreadPool.shutdownNow();
writer.sendThreadPool.shutdownNow();
jtimer.cancel();
}
}
/**
 * Logs throughput, background-writer and JVM statistics at trace level. Does nothing when trace
 * logging is disabled.
 */
private void logStats() {
  if (!log.isTraceEnabled()) {
    return;
  }
  long finishTime = System.currentTimeMillis();
  long finalGCTimes = 0;
  for (GarbageCollectorMXBean gcBean : ManagementFactory.getGarbageCollectorMXBeans()) {
    finalGCTimes += gcBean.getCollectionTime();
  }
  // getCompilationMXBean() returns null when the JVM has no compilation system
  CompilationMXBean compMxBean = ManagementFactory.getCompilationMXBean();
  boolean compileTimeSupported =
      compMxBean != null && compMxBean.isCompilationTimeMonitoringSupported();
  long finalCompileTimes = compileTimeSupported ? compMxBean.getTotalCompilationTime() : 0;
  double averageRate = totalSent.get() / (totalSendTime.get() / 1000.0);
  double overallRate = totalAdded / ((finishTime - startTime) / 1000.0);
  double finalSystemLoad = ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage();
  // divide in floating point so the two-decimal averages keep their fractional part
  int batches = numBatches.get();
  float avgTabletServers = batches != 0 ? (float) tabletServersBatchSum.get() / batches : 0f;
  float avgTablets = batches != 0 ? (float) tabletBatchSum.get() / batches : 0f;
  log.trace("");
  log.trace("TABLET SERVER BATCH WRITER STATISTICS");
  log.trace(String.format("Added : %,10d mutations", totalAdded));
  log.trace(String.format("Sent : %,10d mutations", totalSent.get()));
  log.trace(String.format("Resent percentage : %10.2f%s",
      (totalSent.get() - totalAdded) / (double) totalAdded * 100.0, "%"));
  log.trace(
      String.format("Overall time : %,10.2f secs", (finishTime - startTime) / 1000.0));
  log.trace(String.format("Overall send rate : %,10.2f mutations/sec", overallRate));
  log.trace(
      String.format("Send efficiency : %10.2f%s", overallRate / averageRate * 100.0, "%"));
  log.trace("");
  log.trace("BACKGROUND WRITER PROCESS STATISTICS");
  log.trace(
      String.format("Total send time : %,10.2f secs %6.2f%s", totalSendTime.get() / 1000.0,
          100.0 * totalSendTime.get() / (finishTime - startTime), "%"));
  log.trace(String.format("Average send rate : %,10.2f mutations/sec", averageRate));
  log.trace(String.format("Total bin time : %,10.2f secs %6.2f%s",
      totalBinTime.get() / 1000.0, 100.0 * totalBinTime.get() / (finishTime - startTime), "%"));
  log.trace(String.format("Average bin rate : %,10.2f mutations/sec",
      totalBinned.get() / (totalBinTime.get() / 1000.0)));
  log.trace(String.format("tservers per batch : %,8.2f avg %,6d min %,6d max",
      avgTabletServers, minTabletServersBatch.get(), maxTabletServersBatch.get()));
  log.trace(String.format("tablets per batch : %,8.2f avg %,6d min %,6d max", avgTablets,
      minTabletBatch.get(), maxTabletBatch.get()));
  log.trace("");
  log.trace("SYSTEM STATISTICS");
  log.trace(String.format("JVM GC Time : %,10.2f secs",
      ((finalGCTimes - initialGCTimes) / 1000.0)));
  if (compileTimeSupported) {
    log.trace(String.format("JVM Compile Time : %,10.2f secs",
        (finalCompileTimes - initialCompileTimes) / 1000.0));
  }
  log.trace(String.format("System load average : initial=%6.2f final=%6.2f", initialSystemLoad,
      finalSystemLoad));
}
/** Adds one send's mutation count and elapsed milliseconds to the running totals. */
private void updateSendStats(long count, long time) {
  totalSendTime.addAndGet(time);
  totalSent.addAndGet(count);
}
/**
 * Records the outcome of one binning pass. Statistics are only collected while trace logging is
 * enabled.
 */
public void updateBinningStats(int count, long time,
    Map<String,TabletServerMutations<Mutation>> binnedMutations) {
  if (!log.isTraceEnabled()) {
    return;
  }
  totalBinTime.addAndGet(time);
  totalBinned.addAndGet(count);
  updateBatchStats(binnedMutations);
}
/** Atomically lowers {@code stat} to {@code update} when {@code update} is smaller. */
private static void computeMin(AtomicInteger stat, int update) {
  stat.accumulateAndGet(update, Math::min);
}
/** Atomically raises {@code stat} to {@code update} when {@code update} is larger. */
private static void computeMax(AtomicInteger stat, int update) {
  stat.accumulateAndGet(update, Math::max);
}
/**
 * Folds one binned batch into the per-batch statistics: the number of tablet servers and the
 * number of tablets it touched, plus running minimums and maximums of both.
 */
private void updateBatchStats(Map<String,TabletServerMutations<Mutation>> binnedMutations) {
  final int numServers = binnedMutations.size();
  tabletServersBatchSum.addAndGet(numServers);
  computeMin(minTabletServersBatch, numServers);
  computeMax(maxTabletServersBatch, numServers);

  int numTablets = 0;
  for (TabletServerMutations<Mutation> perServer : binnedMutations.values()) {
    numTablets += perServer.getMutations().size();
  }
  tabletBatchSum.addAndGet(numTablets);
  computeMin(minTabletBatch, numTablets);
  computeMax(maxTabletBatch, numTablets);

  numBatches.incrementAndGet();
}
/** Predicate re-evaluated after every wake-up to decide whether to keep waiting. */
@FunctionalInterface
private interface WaitCondition {
  boolean shouldWait();
}

/**
 * Waits on this object's monitor until {@code condition} reports false. The caller must hold
 * the monitor.
 *
 * @throws RuntimeException wrapping the {@link InterruptedException} if the thread is
 *         interrupted while waiting; the thread's interrupt status is restored first so
 *         callers further up the stack can still observe the interruption
 */
private void waitRTE(WaitCondition condition) {
  try {
    while (condition.shouldWait()) {
      wait();
    }
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    throw new RuntimeException(e);
  }
}
// BEGIN code for handling unrecoverable errors
/**
 * Records constraint violations reported by a tablet server, marks the writer as failed and
 * wakes waiting threads. An empty list is ignored.
 */
private void updatedConstraintViolations(List<ConstraintViolationSummary> cvsList) {
  if (cvsList.isEmpty()) {
    return;
  }
  synchronized (this) {
    somethingFailed = true;
    violations.add(cvsList);
    this.notifyAll();
  }
}
/** Records the same security error code against every extent in {@code keySet}. */
private void updateAuthorizationFailures(Set<KeyExtent> keySet, SecurityErrorCode code) {
  Map<KeyExtent,SecurityErrorCode> perExtent = new HashMap<>();
  keySet.forEach(extent -> perExtent.put(extent, code));
  updateAuthorizationFailures(perExtent);
}
/**
 * Merges per-extent authorization failures into this writer's failure state and wakes waiting
 * threads. If any affected table no longer exists, a {@link TableDeletedException} is thrown
 * instead and nothing is recorded. An empty map is ignored.
 */
private void updateAuthorizationFailures(Map<KeyExtent,SecurityErrorCode> authorizationFailures) {
  if (authorizationFailures.isEmpty()) {
    return;
  }
  // was a table deleted?
  Set<TableId> affectedTables = new HashSet<>();
  authorizationFailures.keySet().forEach(extent -> affectedTables.add(extent.tableId()));
  Tables.clearCache(context);
  for (TableId tableId : affectedTables) {
    if (!Tables.exists(context, tableId)) {
      throw new TableDeletedException(tableId.canonical());
    }
  }
  synchronized (this) {
    somethingFailed = true;
    // add these authorizationFailures to those collected by this batch writer
    for (Entry<KeyExtent,SecurityErrorCode> failure : authorizationFailures.entrySet()) {
      this.authorizationFailures.computeIfAbsent(failure.getKey(), k -> new HashSet<>())
          .add(failure.getValue());
    }
    this.notifyAll();
  }
}
/** Records a server-side error for {@code server}, wakes waiting threads and logs the cause. */
private synchronized void updateServerErrors(String server, Exception e) {
  serverSideErrors.add(server);
  somethingFailed = true;
  notifyAll();
  log.error("Server side error on {}", server, e);
}
/**
 * Records an otherwise unclassified error, remembers it as the most recent one and wakes waiting
 * threads. Deleted-table, offline-table and timeout errors are logged at debug level; everything
 * else at error level.
 */
private synchronized void updateUnknownErrors(String msg, Throwable t) {
  somethingFailed = true;
  unknownErrors++;
  this.lastUnknownError = t;
  this.notifyAll();
  boolean expectedFailure = t instanceof TableDeletedException
      || t instanceof TableOfflineException || t instanceof TimedOutException;
  if (expectedFailure) {
    log.debug("{}", msg, t); // this is not unknown
  } else {
    log.error("{}", msg, t);
  }
}
/**
 * Throws a {@link MutationsRejectedException} summarising every recorded failure if anything
 * has failed; otherwise returns normally.
 */
private void checkForFailures() throws MutationsRejectedException {
  if (!somethingFailed) {
    return;
  }
  List<ConstraintViolationSummary> violationSummaries = violations.asList();
  // translate thrift error codes into their public client API equivalents, matched by name
  HashMap<TabletId,Set<org.apache.accumulo.core.client.security.SecurityErrorCode>> clientCodes =
      new HashMap<>();
  authorizationFailures.forEach((extent, thriftCodes) -> {
    HashSet<org.apache.accumulo.core.client.security.SecurityErrorCode> translated =
        new HashSet<>();
    for (SecurityErrorCode thriftCode : thriftCodes) {
      translated.add(
          org.apache.accumulo.core.client.security.SecurityErrorCode.valueOf(thriftCode.name()));
    }
    clientCodes.put(new TabletIdImpl(extent), translated);
  });
  throw new MutationsRejectedException(context, violationSummaries, clientCodes,
      serverSideErrors, unknownErrors, lastUnknownError);
}
// END code for handling unrecoverable errors
// BEGIN code for handling failed mutations
/**
 * Puts previously failed mutations back into the pending buffer. The buffer is pushed to the
 * background writer straight away when it has reached half of the memory budget, or when the
 * writer is closing or flushing.
 */
private synchronized void addFailedMutations(MutationSet failedMutations) {
  mutations.addAll(failedMutations);
  boolean overThreshold = mutations.getMemoryUsed() >= maxMem / 2;
  if (overThreshold || closed || flushing) {
    startProcessing();
  }
}
/**
 * Timer task that collects mutations which failed to be written and, once the collection is more
 * than a second old, hands them back to the writer for another attempt. Scheduled on the shared
 * timer every 500ms.
 */
private class FailedMutations extends TimerTask {
  // failures collected since the last requeue; null when there are none
  private MutationSet recentFailures = null;
  // when the current collection was started
  private long initTime;

  FailedMutations() {
    jtimer.schedule(this, 0, 500);
  }

  /** Returns the current collection, starting a new one (and its clock) if needed. */
  private MutationSet init() {
    if (recentFailures != null) {
      return recentFailures;
    }
    recentFailures = new MutationSet();
    initTime = System.currentTimeMillis();
    return recentFailures;
  }

  synchronized void add(TableId table, ArrayList<Mutation> tableFailures) {
    MutationSet target = init();
    target.addAll(table, tableFailures);
  }

  synchronized void add(MutationSet failures) {
    MutationSet target = init();
    target.addAll(failures);
  }

  synchronized void add(TabletServerMutations<Mutation> tsm) {
    final MutationSet target = init();
    tsm.getMutations().forEach((extent, muts) -> target.addAll(extent.tableId(), muts));
  }

  @Override
  public void run() {
    try {
      MutationSet toRequeue;
      synchronized (this) {
        boolean ripe =
            recentFailures != null && System.currentTimeMillis() - initTime > 1000;
        if (ripe) {
          toRequeue = recentFailures;
          recentFailures = null;
        } else {
          toRequeue = null;
        }
      }
      if (toRequeue == null) {
        return;
      }
      if (log.isTraceEnabled())
        log.trace("tid={} Requeuing {} failed mutations", Thread.currentThread().getId(),
            toRequeue.size());
      addFailedMutations(toRequeue);
    } catch (Throwable t) {
      updateUnknownErrors("tid=" + Thread.currentThread().getId()
          + " Failed to requeue failed mutations " + t.getMessage(), t);
      cancel();
    }
  }
}
// END code for handling failed mutations
// BEGIN code for sending mutations to tablet servers using background threads
private class MutationWriter {
// upper bound on the summed Mutation.numBytes() gathered before each applyUpdates call
private static final int MUTATION_BATCH_SIZE = 1 << 17;
// runs SendTask instances
private final ExecutorService sendThreadPool;
// single thread that bins mutations; rejected work runs in the submitting thread
private final SimpleThreadPool binningThreadPool;
// binned mutations waiting to be sent, keyed by tablet server location
private final Map<String,TabletServerMutations<Mutation>> serversMutations;
// servers that currently have a SendTask submitted
private final Set<String> queued;
// one locator per table, created on first use
private final Map<TableId,TabletLocator> locators;
public MutationWriter(int numSendThreads) {
serversMutations = new HashMap<>();
queued = new HashSet<>();
sendThreadPool = new SimpleThreadPool(numSendThreads, this.getClass().getName());
locators = new HashMap<>();
binningThreadPool = new SimpleThreadPool(1, "BinMutations", new SynchronousQueue<>());
binningThreadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
}
/** Returns the locator for {@code tableId}, creating and caching it on first use. */
private synchronized TabletLocator getLocator(TableId tableId) {
  return locators.computeIfAbsent(tableId,
      id -> new TimeoutTabletLocator(timeout, context, id));
}
/**
 * Groups the given mutations by tablet server into {@code binnedMutations}. Mutations that could
 * not be binned are handed to the failed-mutation task for a later retry. If any error is
 * caught, it is recorded and {@code binnedMutations} is emptied.
 */
private void binMutations(MutationSet mutationsToProcess,
    Map<String,TabletServerMutations<Mutation>> binnedMutations) {
  TableId tableId = null;
  try {
    for (Entry<TableId,List<Mutation>> perTable : mutationsToProcess.getMutations()
        .entrySet()) {
      tableId = perTable.getKey();
      TabletLocator locator = getLocator(tableId);
      List<Mutation> tableMutations = perTable.getValue();
      if (tableMutations == null) {
        continue;
      }
      ArrayList<Mutation> tableFailures = new ArrayList<>();
      locator.binMutations(context, tableMutations, binnedMutations, tableFailures);
      if (tableFailures.isEmpty()) {
        continue;
      }
      failedMutations.add(tableId, tableFailures);
      // when nothing at all could be binned, check whether the table is gone or offline
      if (tableFailures.size() == tableMutations.size()) {
        if (!Tables.exists(context, tableId)) {
          throw new TableDeletedException(tableId.canonical());
        } else if (Tables.getTableState(context, tableId) == TableState.OFFLINE) {
          throw new TableOfflineException(Tables.getTableOfflineMsg(context, tableId));
        }
      }
    }
    return;
  } catch (AccumuloServerException ase) {
    updateServerErrors(ase.getServer(), ase);
  } catch (AccumuloException ae) {
    // assume an IOError communicating with metadata tablet
    failedMutations.add(mutationsToProcess);
  } catch (AccumuloSecurityException e) {
    updateAuthorizationFailures(Collections.singletonMap(new KeyExtent(tableId, null, null),
        SecurityErrorCode.valueOf(e.getSecurityErrorCode().name())));
  } catch (TableDeletedException | TableNotFoundException | TableOfflineException e) {
    updateUnknownErrors(e.getMessage(), e);
  }
  // an error occurred
  binnedMutations.clear();
}
/**
 * Submits {@code mutationsToSend} to the binning pool. A null set is ignored. Any exception
 * raised while binning is recorded as an unknown error rather than propagated.
 */
void queueMutations(final MutationSet mutationsToSend) {
  if (mutationsToSend == null) {
    return;
  }
  Runnable binTask = () -> {
    try {
      log.trace("{} - binning {} mutations", Thread.currentThread().getName(),
          mutationsToSend.size());
      addMutations(mutationsToSend);
    } catch (Exception e) {
      updateUnknownErrors("Error processing mutation set", e);
    }
  };
  binningThreadPool.execute(Trace.wrap(binTask));
}
/** Bins {@code mutationsToSend} by tablet server, records timing, and queues the result. */
private void addMutations(MutationSet mutationsToSend) {
  Map<String,TabletServerMutations<Mutation>> binnedMutations = new HashMap<>();
  try (TraceScope span = Trace.startSpan("binMutations")) {
    final long started = System.currentTimeMillis();
    binMutations(mutationsToSend, binnedMutations);
    final long elapsed = System.currentTimeMillis() - started;
    updateBinningStats(mutationsToSend.size(), elapsed, binnedMutations);
  }
  addMutations(binnedMutations);
}
/**
 * Merges freshly binned mutations into the per-server pending map and submits a send task for
 * every server in this batch that does not already have one queued.
 */
private synchronized void
    addMutations(Map<String,TabletServerMutations<Mutation>> binnedMutations) {
  int count = 0;
  // merge mutations into existing mutations for a tablet server
  for (Entry<String,TabletServerMutations<Mutation>> binned : binnedMutations.entrySet()) {
    String server = binned.getKey();
    TabletServerMutations<Mutation> incoming = binned.getValue();
    TabletServerMutations<Mutation> pending = serversMutations.get(server);
    if (pending != null) {
      incoming.getMutations()
          .forEach((extent, muts) -> muts.forEach(m -> pending.addMutation(extent, m)));
    } else {
      serversMutations.put(server, incoming);
    }
    if (log.isTraceEnabled()) {
      for (List<Mutation> muts : incoming.getMutations().values()) {
        count += muts.size();
      }
    }
  }
  if (count > 0 && log.isTraceEnabled()) {
    log.trace(String.format("Started sending %,d mutations to %,d tablet servers", count,
        binnedMutations.keySet().size()));
  }
  // randomize order of servers
  ArrayList<String> shuffledServers = new ArrayList<>(binnedMutations.keySet());
  Collections.shuffle(shuffledServers);
  for (String server : shuffledServers) {
    if (queued.contains(server)) {
      continue;
    }
    sendThreadPool.submit(Trace.wrap(new SendTask(server)));
    queued.add(server);
  }
}
/**
 * Removes and returns the mutations pending for {@code server}. When there are none, the server
 * is also dropped from the queued set and null is returned.
 */
private synchronized TabletServerMutations<Mutation> getMutationsToSend(String server) {
  TabletServerMutations<Mutation> pending = serversMutations.remove(server);
  if (pending != null) {
    return pending;
  }
  queued.remove(server);
  return null;
}
/**
 * Background task that repeatedly drains and sends the mutations pending for one tablet server
 * until none remain.
 */
class SendTask implements Runnable {
// tablet server location string this task sends to
private final String location;
SendTask(String server) {
this.location = server;
}
@Override
public void run() {
try {
TabletServerMutations<Mutation> tsmuts = getMutationsToSend(location);
// more mutations may have been merged in for this server while the previous batch was sent
while (tsmuts != null) {
send(tsmuts);
tsmuts = getMutationsToSend(location);
}
return;
} catch (Throwable t) {
// background threads must note every Throwable
updateUnknownErrors(
"Failed to send tablet server " + location + " its batch : " + t.getMessage(), t);
}
}
/**
 * Sends one batch to this task's tablet server. Mutations the server did not accept are handed
 * to the failed-mutation task; the estimated memory of everything else is released. On an
 * IOException the locator caches for the affected tables are invalidated for this server and the
 * whole batch is queued for retry.
 */
public void send(TabletServerMutations<Mutation> tsm)
throws AccumuloServerException, AccumuloSecurityException {
MutationSet failures = null;
// the thread is renamed while sending; the original name is restored in the finally block
String oldName = Thread.currentThread().getName();
Map<KeyExtent,List<Mutation>> mutationBatch = tsm.getMutations();
try {
long count = 0;
Set<TableId> tableIds = new TreeSet<>();
for (Map.Entry<KeyExtent,List<Mutation>> entry : mutationBatch.entrySet()) {
count += entry.getValue().size();
tableIds.add(entry.getKey().tableId());
}
String msg = "sending " + String.format("%,d", count) + " mutations to "
+ String.format("%,d", mutationBatch.size()) + " tablets at " + location + " tids: ["
+ Joiner.on(',').join(tableIds) + ']';
Thread.currentThread().setName(msg);
try (TraceScope span = Trace.startSpan("sendMutations")) {
TimeoutTracker timeoutTracker = timeoutTrackers.get(location);
if (timeoutTracker == null) {
timeoutTracker = new TimeoutTracker(location, timeout);
timeoutTrackers.put(location, timeoutTracker);
}
long st1 = System.currentTimeMillis();
failures = sendMutationsToTabletServer(location, mutationBatch, timeoutTracker);
long st2 = System.currentTimeMillis();
if (log.isTraceEnabled())
log.trace("sent " + String.format("%,d", count) + " mutations to " + location + " in "
+ String.format("%.2f secs (%,.2f mutations/sec) with %,d failures",
(st2 - st1) / 1000.0, count / ((st2 - st1) / 1000.0), failures.size()));
// start from the estimated size of the whole batch, then subtract what failed
long successBytes = 0;
for (Entry<KeyExtent,List<Mutation>> entry : mutationBatch.entrySet()) {
for (Mutation mutation : entry.getValue()) {
successBytes += mutation.estimatedMemoryUsed();
}
}
if (failures.size() > 0) {
failedMutations.add(failures);
successBytes -= failures.getMemoryUsed();
}
updateSendStats(count, st2 - st1);
decrementMemUsed(successBytes);
}
} catch (IOException e) {
if (log.isTraceEnabled())
log.trace("failed to send mutations to {} : {}", location, e.getMessage());
HashSet<TableId> tables = new HashSet<>();
for (KeyExtent ke : mutationBatch.keySet())
tables.add(ke.tableId());
for (TableId table : tables)
getLocator(table).invalidateCache(context, location);
// no memory is released here; the whole batch will be retried
failedMutations.add(tsm);
} finally {
Thread.currentThread().setName(oldName);
}
}
}
/**
 * Writes {@code tabMuts} to the tablet server at {@code location} and returns the mutations that
 * were not written and should be retried. A batch holding exactly one mutation uses a single
 * update call; anything larger goes through an update session.
 *
 * @param location tablet server address, parsed with HostAndPort.fromString
 * @param tabMuts mutations to send, grouped by tablet extent
 * @param timeoutTracker per-server tracker told whether this attempt made progress
 * @return mutations to retry; empty when {@code tabMuts} is empty
 * @throws IOException on transport failures and other thrift errors
 * @throws AccumuloSecurityException when the server reports a security error
 * @throws AccumuloServerException when the server reports an application error
 */
private MutationSet sendMutationsToTabletServer(String location,
Map<KeyExtent,List<Mutation>> tabMuts, TimeoutTracker timeoutTracker)
throws IOException, AccumuloSecurityException, AccumuloServerException {
if (tabMuts.isEmpty()) {
return new MutationSet();
}
TInfo tinfo = TraceUtil.traceInfo();
timeoutTracker.startingWrite();
try {
final HostAndPort parsedServer = HostAndPort.fromString(location);
final TabletClientService.Iface client;
// pass the batch writer timeout only when it is shorter than the client-wide timeout
if (timeoutTracker.getTimeOut() < context.getClientTimeoutInMillis())
client = ThriftUtil.getTServerClient(parsedServer, context, timeoutTracker.getTimeOut());
else
client = ThriftUtil.getTServerClient(parsedServer, context);
try {
MutationSet allFailures = new MutationSet();
// one tablet with one mutation: single update call, no session
if (tabMuts.size() == 1 && tabMuts.values().iterator().next().size() == 1) {
Entry<KeyExtent,List<Mutation>> entry = tabMuts.entrySet().iterator().next();
try {
client.update(tinfo, context.rpcCreds(), entry.getKey().toThrift(),
entry.getValue().get(0).toThrift(), DurabilityImpl.toThrift(durability));
} catch (NotServingTabletException e) {
// tablet is not served here: retry later and drop the stale location
allFailures.addAll(entry.getKey().tableId(), entry.getValue());
getLocator(entry.getKey().tableId()).invalidateCache(entry.getKey());
} catch (ConstraintViolationException e) {
updatedConstraintViolations(
Translator.translate(e.violationSummaries, Translators.TCVST));
}
timeoutTracker.madeProgress();
} else {
long usid =
client.startUpdate(tinfo, context.rpcCreds(), DurabilityImpl.toThrift(durability));
List<TMutation> updates = new ArrayList<>();
for (Entry<KeyExtent,List<Mutation>> entry : tabMuts.entrySet()) {
long size = 0;
Iterator<Mutation> iter = entry.getValue().iterator();
// send this tablet's mutations in chunks; a chunk is closed once its summed
// numBytes() reaches MUTATION_BATCH_SIZE
while (iter.hasNext()) {
while (size < MUTATION_BATCH_SIZE && iter.hasNext()) {
Mutation mutation = iter.next();
updates.add(mutation.toThrift());
size += mutation.numBytes();
}
client.applyUpdates(tinfo, usid, entry.getKey().toThrift(), updates);
updates.clear();
size = 0;
}
}
UpdateErrors updateErrors = client.closeUpdate(tinfo, usid);
Map<KeyExtent,Long> failures =
Translator.translate(updateErrors.failedExtents, Translators.TKET);
updatedConstraintViolations(
Translator.translate(updateErrors.violationSummaries, Translators.TCVST));
updateAuthorizationFailures(
Translator.translate(updateErrors.authorizationFailures, Translators.TKET));
long totalCommitted = 0;
for (Entry<KeyExtent,Long> entry : failures.entrySet()) {
KeyExtent failedExtent = entry.getKey();
// NOTE(review): the value is used as the count of this extent's mutations that were
// committed before the failure; confirm against the UpdateErrors contract
int numCommitted = (int) (long) entry.getValue();
totalCommitted += numCommitted;
TableId tableId = failedExtent.tableId();
getLocator(tableId).invalidateCache(failedExtent);
List<Mutation> mutations = tabMuts.get(failedExtent);
// only the mutations after the committed prefix are retried
allFailures.addAll(tableId, mutations.subList(numCommitted, mutations.size()));
}
if (failures.keySet().containsAll(tabMuts.keySet()) && totalCommitted == 0) {
// nothing was successfully written
timeoutTracker.wroteNothing();
} else {
// successfully wrote something to tablet server
timeoutTracker.madeProgress();
}
}
return allFailures;
} finally {
ThriftUtil.returnClient((TServiceClient) client);
}
} catch (TTransportException e) {
// may throw TimedOutException if this server has been failing for longer than the timeout
timeoutTracker.errorOccured();
throw new IOException(e);
} catch (TApplicationException tae) {
updateServerErrors(location, tae);
throw new AccumuloServerException(location, tae);
} catch (ThriftSecurityException e) {
updateAuthorizationFailures(tabMuts.keySet(), e.code);
throw new AccumuloSecurityException(e.user, e.code, e);
} catch (TException e) {
throw new IOException(e);
}
}
}
// END code for sending mutations to tablet servers using background threads
/**
 * Mutations grouped by table, together with a running total of their estimated memory use. Not
 * synchronized internally.
 */
private static class MutationSet {
  private final HashMap<TableId,List<Mutation>> mutations = new HashMap<>();
  private long memoryUsed = 0;

  MutationSet() {}

  /** Appends {@code mutation} to the list for {@code table} and adds its estimated size. */
  void addMutation(TableId table, Mutation mutation) {
    List<Mutation> perTable = mutations.get(table);
    if (perTable == null) {
      perTable = new ArrayList<>();
      mutations.put(table, perTable);
    }
    perTable.add(mutation);
    memoryUsed += mutation.estimatedMemoryUsed();
  }

  /** Returns the live backing map of table id to mutations. */
  Map<TableId,List<Mutation>> getMutations() {
    return mutations;
  }

  /** Returns the total number of mutations across all tables. */
  int size() {
    return mutations.values().stream().mapToInt(List::size).sum();
  }

  /** Adds every mutation held by {@code failures} to this set. */
  public void addAll(MutationSet failures) {
    failures.getMutations().forEach((table, muts) -> addAll(table, muts));
  }

  /** Adds each of {@code mutations} to this set under {@code table}. */
  public void addAll(TableId table, List<Mutation> mutations) {
    mutations.forEach(mutation -> addMutation(table, mutation));
  }

  /** Returns the summed estimated memory of every mutation added so far. */
  public long getMemoryUsed() {
    return memoryUsed;
  }
}
}