/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db;
import java.io.DataInputStream;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.*;
import com.google.common.util.concurrent.RateLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.marshal.UUIDType;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.WriteFailureException;
import org.apache.cassandra.exceptions.WriteTimeoutException;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.WriteResponseHandler;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.WrappedRunnable;
import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
public class BatchlogManager implements BatchlogManagerMBean
{
private static final String MBEAN_NAME = "org.apache.cassandra.db:type=BatchlogManager";
private static final long REPLAY_INTERVAL = 60 * 1000; // milliseconds
private static final int PAGE_SIZE = 128; // same as HHOM, for now, w/out using any heuristics. TODO: set based on avg batch size.
private static final Logger logger = LoggerFactory.getLogger(BatchlogManager.class);
public static final BatchlogManager instance = new BatchlogManager();
private final AtomicLong totalBatchesReplayed = new AtomicLong();
// Single-thread executor service for scheduling and serializing log replay.
private static final ScheduledExecutorService batchlogTasks = new DebuggableScheduledThreadPoolExecutor("BatchlogTasks");
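/**
 * Registers this manager as an MBean and schedules periodic batchlog replay,
 * starting after RING_DELAY and repeating every REPLAY_INTERVAL milliseconds.
 */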
public void start()
{
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
try
{
mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
}
catch (Exception e)
{
throw new RuntimeException(e);
}
Runnable runnable = new WrappedRunnable()
{
public void runMayThrow() throws ExecutionException, InterruptedException
{
replayAllFailedBatches();
}
};
batchlogTasks.scheduleWithFixedDelay(runnable, StorageService.RING_DELAY, REPLAY_INTERVAL, TimeUnit.MILLISECONDS);
}
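/** Stops the replay executor and waits up to 60 seconds for in-flight replay tasks to finish. */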
public static void shutdown() throws InterruptedException
{
batchlogTasks.shutdown();
batchlogTasks.awaitTermination(60, TimeUnit.SECONDS);
}
public int countAllBatches()
{
String query = String.format("SELECT count(*) FROM %s.%s", SystemKeyspace.NAME, SystemKeyspace.BATCHLOG);
return (int) executeInternal(query).one().getLong("count");
}
public long getTotalBatchesReplayed()
{
return totalBatchesReplayed.longValue();
}
public void forceBatchlogReplay()
{
startBatchlogReplay();
}
public Future<?> startBatchlogReplay()
{
Runnable runnable = new WrappedRunnable()
{
public void runMayThrow() throws ExecutionException, InterruptedException
{
replayAllFailedBatches();
}
};
// If a replay is already in progress, this request will be executed after it completes.
return batchlogTasks.submit(runnable);
}
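/**
 * Builds the mutation that records a batch in the system batchlog table:
 * the serialized mutations, the write timestamp and the messaging version,
 * keyed by the batch's UUID.
 */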
public static Mutation getBatchlogMutationFor(Collection<Mutation> mutations, UUID uuid, int version)
{
return getBatchlogMutationFor(mutations, uuid, version, FBUtilities.timestampMicros());
}
@VisibleForTesting
static Mutation getBatchlogMutationFor(Collection<Mutation> mutations, UUID uuid, int version, long now)
{
ColumnFamily cf = ArrayBackedSortedColumns.factory.create(SystemKeyspace.Batchlog);
CFRowAdder adder = new CFRowAdder(cf, SystemKeyspace.Batchlog.comparator.builder().build(), now);
adder.add("data", serializeMutations(mutations, version))
.add("written_at", new Date(now / 1000))
.add("version", version);
return new Mutation(SystemKeyspace.NAME, UUIDType.instance.decompose(uuid), cf);
}
private static ByteBuffer serializeMutations(Collection<Mutation> mutations, int version)
{
try (DataOutputBuffer buf = new DataOutputBuffer())
{
buf.writeInt(mutations.size());
for (Mutation mutation : mutations)
Mutation.serializer.serialize(mutation, buf, version);
return buf.buffer();
}
catch (IOException e)
{
throw new AssertionError(); // cannot happen.
}
}
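// Pages through the batchlog table (PAGE_SIZE rows at a time), replays every batch that is
// old enough to have timed out, then flushes and compacts the table to reclaim space.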
private void replayAllFailedBatches() throws ExecutionException, InterruptedException
{
logger.trace("Started replayAllFailedBatches");
// rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml).
// max rate is scaled by the number of nodes in the cluster (same as for HHOM - see CASSANDRA-5272).
int throttleInKB = DatabaseDescriptor.getBatchlogReplayThrottleInKB() / StorageService.instance.getTokenMetadata().getAllEndpoints().size();
RateLimiter rateLimiter = RateLimiter.create(throttleInKB == 0 ? Double.MAX_VALUE : throttleInKB * 1024);
UntypedResultSet page = executeInternal(String.format("SELECT id, data, written_at, version FROM %s.%s LIMIT %d",
SystemKeyspace.NAME,
SystemKeyspace.BATCHLOG,
PAGE_SIZE));
while (!page.isEmpty())
{
UUID id = processBatchlogPage(page, rateLimiter);
if (page.size() < PAGE_SIZE)
break; // we've exhausted the batchlog, next query would be empty.
page = executeInternal(String.format("SELECT id, data, written_at, version FROM %s.%s WHERE token(id) > token(?) LIMIT %d",
SystemKeyspace.NAME,
SystemKeyspace.BATCHLOG,
PAGE_SIZE),
id);
}
cleanup();
logger.trace("Finished replayAllFailedBatches");
}
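// Removes a batch from the batchlog table once it has been replayed (or skipped).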
private void deleteBatch(UUID id)
{
Mutation mutation = new Mutation(SystemKeyspace.NAME, UUIDType.instance.decompose(id));
mutation.delete(SystemKeyspace.BATCHLOG, FBUtilities.timestampMicros());
mutation.apply();
}
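// Replays each batch on the page that is older than the batchlog timeout, without blocking per
// batch; then waits for all of them, writing hints for deliveries that timed out, and deletes
// the replayed entries. Returns the id of the last row seen, for paging.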
private UUID processBatchlogPage(UntypedResultSet page, RateLimiter rateLimiter)
{
UUID id = null;
ArrayList<Batch> batches = new ArrayList<>(page.size());
// Sending out batches for replay without waiting for them, so that one stuck batch doesn't affect others
for (UntypedResultSet.Row row : page)
{
id = row.getUUID("id");
long writtenAt = row.getLong("written_at");
// enough time for the actual write + batchlog entry mutation delivery (two separate requests).
long timeout = getBatchlogTimeout();
if (System.currentTimeMillis() < writtenAt + timeout)
continue; // not ready to replay yet, might still get a deletion.
int version = row.has("version") ? row.getInt("version") : MessagingService.VERSION_12;
Batch batch = new Batch(id, writtenAt, row.getBytes("data"), version);
try
{
if (batch.replay(rateLimiter) > 0)
{
batches.add(batch);
}
else
{
deleteBatch(id); // no write mutations were sent (either expired or all CFs involved truncated).
totalBatchesReplayed.incrementAndGet();
}
}
catch (IOException e)
{
logger.warn("Skipped batch replay of {} due to {}", id, e);
deleteBatch(id);
}
}
// Now wait for all batches to finish processing and schedule hints for any timed-out deliveries.
for (Batch batch : batches)
{
batch.finish();
deleteBatch(batch.id);
}
totalBatchesReplayed.addAndGet(batches.size());
return id;
}
public long getBatchlogTimeout()
{
return DatabaseDescriptor.getWriteRpcTimeout() * 2; // enough time for the actual write + BM removal mutation
}
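/**
 * A single batchlog entry: deserializes its mutations, sends them to live replicas,
 * and falls back to hints for replicas that are down or fail to respond in time.
 */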
private static class Batch
{
private final UUID id;
private final long writtenAt;
private final ByteBuffer data;
private final int version;
private List<ReplayWriteResponseHandler<Mutation>> replayHandlers;
public Batch(UUID id, long writtenAt, ByteBuffer data, int version)
{
this.id = id;
this.writtenAt = writtenAt;
this.data = data;
this.version = version;
}
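// Sends the batch's mutations to their replicas. Returns the number of in-flight handlers to
// wait on, or 0 if nothing needed to be sent (mutations expired or all involved CFs truncated).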
public int replay(RateLimiter rateLimiter) throws IOException
{
logger.trace("Replaying batch {}", id);
List<Mutation> mutations = replayingMutations();
if (mutations.isEmpty())
return 0;
int ttl = calculateHintTTL(mutations);
if (ttl <= 0)
return 0;
replayHandlers = sendReplays(mutations, writtenAt, ttl);
rateLimiter.acquire(data.remaining()); // acquire afterwards, so the wait doesn't skew the hint TTL calculation.
return replayHandlers.size();
}
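// Waits on every replay handler; on the first timeout or failure it writes hints for all
// not-yet-acknowledged deliveries from that point on and stops waiting.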
public void finish()
{
for (int i = 0; i < replayHandlers.size(); i++)
{
ReplayWriteResponseHandler<Mutation> handler = replayHandlers.get(i);
try
{
handler.get();
}
catch (WriteTimeoutException|WriteFailureException e)
{
logger.trace("Failed replaying a batched mutation to a node, will write a hint");
logger.trace("Failure was : {}", e.getMessage());
// write hints for the remaining mutations, starting from index i
writeHintsForUndeliveredEndpoints(i);
return;
}
}
}
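// Deserializes the stored mutations, dropping any column family that has been truncated
// since the batch was written.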
private List<Mutation> replayingMutations() throws IOException
{
DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(data));
int size = in.readInt();
List<Mutation> mutations = new ArrayList<>(size);
for (int i = 0; i < size; i++)
{
Mutation mutation = Mutation.serializer.deserialize(in, version);
// Remove CFs that have been truncated since. writtenAt and SystemKeyspace#getTruncatedAt() both return millis.
// We don't abort the replay entirely, because this can be considered a success (truncated is the same as
// delivered and then truncated).
for (UUID cfId : mutation.getColumnFamilyIds())
if (writtenAt <= SystemKeyspace.getTruncatedAt(cfId))
mutation = mutation.without(cfId);
if (!mutation.isEmpty())
mutations.add(mutation);
}
return mutations;
}
private void writeHintsForUndeliveredEndpoints(int startFrom)
{
try
{
// Here we deserialize the mutations a second time from the byte buffer,
// but that is fine: a timeout on direct batch delivery is rare
// (it can only happen in the few seconds before a node is marked dead),
// so we trade a little CPU to keep fewer objects around.
List<Mutation> replayingMutations = replayingMutations();
for (int i = startFrom; i < replayHandlers.size(); i++)
{
Mutation undeliveredMutation = replayingMutations.get(i);
int ttl = calculateHintTTL(replayingMutations);
ReplayWriteResponseHandler<Mutation> handler = replayHandlers.get(i);
if (ttl > 0 && handler != null)
for (InetAddress endpoint : handler.undelivered)
StorageProxy.writeHintForMutation(undeliveredMutation, writtenAt, ttl, endpoint);
}
}
catch (IOException e)
{
logger.error("Cannot schedule hints for undelivered batch", e);
}
}
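// Sends each mutation to its replicas and collects the handlers for those that were
// actually dispatched to live endpoints.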
private List<ReplayWriteResponseHandler<Mutation>> sendReplays(List<Mutation> mutations, long writtenAt, int ttl)
{
List<ReplayWriteResponseHandler<Mutation>> handlers = new ArrayList<>(mutations.size());
for (Mutation mutation : mutations)
{
ReplayWriteResponseHandler<Mutation> handler = sendSingleReplayMutation(mutation, writtenAt, ttl);
if (handler != null)
handlers.add(handler);
}
return handlers;
}
/**
* We try to deliver the mutations to the replicas ourselves if they are alive and only resort to writing hints
* when a replica is down or a write request times out.
*
* @return the direct delivery handler to wait on, or null if no live nodes were found
*/
private ReplayWriteResponseHandler<Mutation> sendSingleReplayMutation(final Mutation mutation, long writtenAt, int ttl)
{
Set<InetAddress> liveEndpoints = new HashSet<>();
String ks = mutation.getKeyspaceName();
Token tk = StorageService.getPartitioner().getToken(mutation.key());
for (InetAddress endpoint : Iterables.concat(StorageService.instance.getNaturalEndpoints(ks, tk),
StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, ks)))
{
if (endpoint.equals(FBUtilities.getBroadcastAddress()))
mutation.apply();
else if (FailureDetector.instance.isAlive(endpoint))
liveEndpoints.add(endpoint); // will try delivering directly instead of writing a hint.
else
StorageProxy.writeHintForMutation(mutation, writtenAt, ttl, endpoint);
}
if (liveEndpoints.isEmpty())
return null;
ReplayWriteResponseHandler<Mutation> handler = new ReplayWriteResponseHandler<>(liveEndpoints);
MessageOut<Mutation> message = mutation.createMessage();
for (InetAddress endpoint : liveEndpoints)
MessagingService.instance().sendRR(message, endpoint, handler, false);
return handler;
}
/**
 * Calculate the TTL for the mutations' hints (and reduce the TTL by the time the mutations spent in the batchlog).
 * This ensures that deletes aren't "undone" by an old batch replay.
 */
private int calculateHintTTL(Collection<Mutation> mutations)
{
int unadjustedTTL = Integer.MAX_VALUE;
for (Mutation mutation : mutations)
unadjustedTTL = Math.min(unadjustedTTL, HintedHandOffManager.calculateHintTTL(mutation));
return unadjustedTTL - (int) TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - writtenAt);
}
/**
* A wrapper of WriteResponseHandler that stores the addresses of the endpoints from
* which we did not receive a successful reply.
*/
private static class ReplayWriteResponseHandler<T> extends WriteResponseHandler<T>
{
private final Set<InetAddress> undelivered = Collections.newSetFromMap(new ConcurrentHashMap<InetAddress, Boolean>());
public ReplayWriteResponseHandler(Collection<InetAddress> writeEndpoints)
{
super(writeEndpoints, Collections.<InetAddress>emptySet(), null, null, null, WriteType.UNLOGGED_BATCH);
undelivered.addAll(writeEndpoints);
}
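// Block for a response from every targeted endpoint, so any replica that does not reply
// remains in the undelivered set.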
@Override
protected int totalBlockFor()
{
return this.naturalEndpoints.size();
}
@Override
public void response(MessageIn<T> m)
{
boolean removed = undelivered.remove(m == null ? FBUtilities.getBroadcastAddress() : m.from);
assert removed;
super.response(m);
}
}
}
// force flush + compaction to reclaim space from the replayed batches
private void cleanup() throws ExecutionException, InterruptedException
{
ColumnFamilyStore cfs = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHLOG);
cfs.forceBlockingFlush();
Collection<Descriptor> descriptors = new ArrayList<>();
for (SSTableReader sstr : cfs.getSSTables())
descriptors.add(sstr.descriptor);
if (!descriptors.isEmpty()) // don't pollute the logs if there is nothing to compact.
CompactionManager.instance.submitUserDefined(cfs, descriptors, Integer.MAX_VALUE).get();
}
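/**
 * Picks which endpoints in the local datacenter should store a batchlog entry:
 * live, non-local nodes, preferring two nodes in two different racks when possible.
 */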
public static class EndpointFilter
{
private final String localRack;
private final Multimap<String, InetAddress> endpoints;
public EndpointFilter(String localRack, Multimap<String, InetAddress> endpoints)
{
this.localRack = localRack;
this.endpoints = endpoints;
}
/**
* @return list of candidates for batchlog hosting. If possible, these will be two nodes from different racks.
*/
public Collection<InetAddress> filter()
{
// special case for single-node data centers
if (endpoints.values().size() == 1)
return endpoints.values();
// strip out dead endpoints and localhost
ListMultimap<String, InetAddress> validated = ArrayListMultimap.create();
for (Map.Entry<String, InetAddress> entry : endpoints.entries())
if (isValid(entry.getValue()))
validated.put(entry.getKey(), entry.getValue());
if (validated.size() <= 2)
return validated.values();
if (validated.size() - validated.get(localRack).size() >= 2)
{
// we have enough endpoints in other racks
validated.removeAll(localRack);
}
if (validated.keySet().size() == 1)
{
// we have only one other rack; pick up to two random nodes from it
List<InetAddress> otherRack = validated.get(validated.keySet().iterator().next());
Collections.shuffle(otherRack);
return Lists.newArrayList(Iterables.limit(otherRack, 2));
}
// randomize which racks we pick from if more than 2 remaining
Collection<String> racks;
if (validated.keySet().size() == 2)
{
racks = validated.keySet();
}
else
{
racks = Lists.newArrayList(validated.keySet());
Collections.shuffle((List<String>) racks);
}
// grab a random member of up to two racks
List<InetAddress> result = new ArrayList<>(2);
for (String rack : Iterables.limit(racks, 2))
{
List<InetAddress> rackMembers = validated.get(rack);
result.add(rackMembers.get(getRandomInt(rackMembers.size())));
}
return result;
}
@VisibleForTesting
protected boolean isValid(InetAddress input)
{
return !input.equals(FBUtilities.getBroadcastAddress()) && FailureDetector.instance.isAlive(input);
}
@VisibleForTesting
protected int getRandomInt(int bound)
{
return ThreadLocalRandom.current().nextInt(bound);
}
}
}