blob: 90e6787c6f710bc13e9ce18f6bf2f84fc1401e86 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.distributed.impl;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.Function;
import javax.management.ListenerNotFoundException;
import javax.management.Notification;
import javax.management.NotificationListener;
import org.apache.cassandra.batchlog.BatchlogManager;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.concurrent.SharedExecutorPool;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.cql3.CQLStatement;
import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.statements.ParsedStatement;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.Memtable;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.distributed.api.ICluster;
import org.apache.cassandra.distributed.api.ICoordinator;
import org.apache.cassandra.distributed.api.IInstance;
import org.apache.cassandra.distributed.api.IInstanceConfig;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.api.IListen;
import org.apache.cassandra.distributed.api.IMessage;
import org.apache.cassandra.distributed.api.NodeToolResult;
import org.apache.cassandra.distributed.api.SimpleQueryResult;
import org.apache.cassandra.distributed.mock.nodetool.InternalNodeProbe;
import org.apache.cassandra.distributed.mock.nodetool.InternalNodeProbeFactory;
import org.apache.cassandra.distributed.shared.NetworkTopology;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.VersionedValue;
import org.apache.cassandra.hints.HintsService;
import org.apache.cassandra.index.SecondaryIndexManager;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.sstable.IndexSummaryManager;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.net.CompactEndpointSerializationHelper;
import org.apache.cassandra.net.IMessageSink;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.LegacySchemaMigrator;
import org.apache.cassandra.service.CassandraDaemon;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.PendingRangeCalculatorService;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.StorageServiceMBean;
import org.apache.cassandra.streaming.StreamCoordinator;
import org.apache.cassandra.tools.NodeTool;
import org.apache.cassandra.tracing.TraceState;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.utils.DiagnosticSnapshotService;
import org.apache.cassandra.utils.ExecutorUtils;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.NanoTimeToCurrentTimeMillis;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.Throwables;
import org.apache.cassandra.utils.UUIDGen;
import org.apache.cassandra.utils.concurrent.Ref;
import org.apache.cassandra.utils.memory.BufferPool;
import static java.util.concurrent.TimeUnit.MINUTES;
import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
import static org.apache.cassandra.distributed.api.Feature.NETWORK;
public class Instance extends IsolatedExecutor implements IInvokableInstance
{
private static final Map<Class<?>, Function<Object, Object>> mapper = new HashMap<Class<?>, Function<Object, Object>>() {{
this.put(IInstanceConfig.ParameterizedClass.class, (obj) -> {
IInstanceConfig.ParameterizedClass pc = (IInstanceConfig.ParameterizedClass) obj;
return new org.apache.cassandra.config.ParameterizedClass(pc.class_name, pc.parameters);
});
}};
public final IInstanceConfig config;
// should never be invoked directly, so that it is instantiated on other class loader;
// only visible for inheritance
Instance(IInstanceConfig config, ClassLoader classLoader)
{
super("node" + config.num(), classLoader);
this.config = config;
InstanceIDDefiner.setInstanceId(config.num());
FBUtilities.setBroadcastInetAddress(config.broadcastAddress().getAddress());
// Set the config at instance creation, possibly before startup() has run on all other instances.
// setMessagingVersions below will call runOnInstance which will instantiate
// the MessagingService and dependencies preventing later changes to network parameters.
Config.setOverrideLoadConfig(() -> loadConfig(config));
}
@Override
public IInstanceConfig config()
{
return config;
}
@Override
public ICoordinator coordinator()
{
return new Coordinator(this);
}
public IListen listen()
{
return new Listen(this);
}
@Override
public InetSocketAddress broadcastAddress() { return config.broadcastAddress(); }
@Override
public SimpleQueryResult executeInternalWithResult(String query, Object... args)
{
return sync(() -> {
ParsedStatement.Prepared prepared = QueryProcessor.prepareInternal(query);
ResultMessage result = prepared.statement.executeInternal(QueryProcessor.internalQueryState(),
QueryProcessor.makeInternalOptions(prepared, args));
return RowUtil.toQueryResult(result);
}).call();
}
@Override
public UUID schemaVersion()
{
// we do not use method reference syntax here, because we need to sync on the node-local schema instance
//noinspection Convert2MethodRef
return Schema.instance.getVersion();
}
public void startup()
{
throw new UnsupportedOperationException();
}
public boolean isShutdown()
{
return isolatedExecutor.isShutdown();
}
@Override
public void schemaChangeInternal(String query)
{
sync(() -> {
try
{
ClientState state = ClientState.forInternalCalls();
state.setKeyspace(SystemKeyspace.NAME);
QueryState queryState = new QueryState(state);
CQLStatement statement = QueryProcessor.parseStatement(query, queryState).statement;
statement.validate(state);
QueryOptions options = QueryOptions.forInternalCalls(Collections.emptyList());
statement.executeInternal(queryState, options);
}
catch (Exception e)
{
throw new RuntimeException("Error setting schema for test (query was: " + query + ")", e);
}
}).run();
}
private void registerMockMessaging(ICluster<IInstance> cluster)
{
BiConsumer<InetSocketAddress, IMessage> deliverToInstance = (to, message) -> cluster.get(to).receiveMessage(message);
BiConsumer<InetSocketAddress, IMessage> deliverToInstanceIfNotFiltered = (to, message) -> {
int fromNum = config().num();
int toNum = cluster.get(to).config().num();
if (cluster.filters().permitOutbound(fromNum, toNum, message)
&& cluster.filters().permitInbound(fromNum, toNum, message))
deliverToInstance.accept(to, message);
};
Map<InetAddress, InetSocketAddress> addressAndPortMap = new HashMap<>();
cluster.stream().forEach(instance -> {
InetSocketAddress addressAndPort = instance.broadcastAddress();
if (!addressAndPort.equals(instance.config().broadcastAddress()))
throw new IllegalStateException("addressAndPort mismatch: " + addressAndPort + " vs " + instance.config().broadcastAddress());
InetSocketAddress prev = addressAndPortMap.put(addressAndPort.getAddress(),
addressAndPort);
if (null != prev)
throw new IllegalStateException("This version of Cassandra does not support multiple nodes with the same InetAddress: " + addressAndPort + " vs " + prev);
});
MessagingService.instance().addMessageSink(new MessageDeliverySink(deliverToInstanceIfNotFiltered, addressAndPortMap::get));
}
// unnecessary if registerMockMessaging used
private void registerFilters(ICluster cluster)
{
IInstance instance = this;
MessagingService.instance().addMessageSink(new IMessageSink()
{
public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress toAddress)
{
// Port is not passed in, so take a best guess at the destination port from this instance
IInstance to = cluster.get(NetworkTopology.addressAndPort(toAddress,
instance.config().broadcastAddress().getPort()));
int fromNum = config().num();
int toNum = to.config().num();
return cluster.filters().permitOutbound(fromNum, toNum, serializeMessage(message, id,
broadcastAddress(),
to.config().broadcastAddress()));
}
public boolean allowIncomingMessage(MessageIn message, int id)
{
// Port is not passed in, so take a best guess at the destination port from this instance
IInstance from = cluster.get(NetworkTopology.addressAndPort(message.from,
instance.config().broadcastAddress().getPort()));
int fromNum = from.config().num();
int toNum = config().num();
IMessage msg = serializeMessage(message, id, from.broadcastAddress(), broadcastAddress());
return cluster.filters().permitInbound(fromNum, toNum, msg);
}
});
}
public static IMessage serializeMessage(MessageOut messageOut, int id, InetSocketAddress from, InetSocketAddress to)
{
try (DataOutputBuffer out = new DataOutputBuffer(1024))
{
int version = MessagingService.instance().getVersion(to.getAddress());
out.writeInt(MessagingService.PROTOCOL_MAGIC);
out.writeInt(id);
long timestamp = System.currentTimeMillis();
out.writeInt((int) timestamp);
messageOut.serialize(out, version);
return new MessageImpl(messageOut.verb.ordinal(), out.toByteArray(), id, version, from);
}
catch (IOException e)
{
throw new RuntimeException(e);
}
}
public static IMessage serializeMessage(MessageIn<?> messageIn, int id, InetSocketAddress from, InetSocketAddress to)
{
try (DataOutputBuffer out = new DataOutputBuffer(1024))
{
// Serialize header
int version = MessagingService.instance().getVersion(to.getAddress());
out.writeInt(MessagingService.PROTOCOL_MAGIC);
out.writeInt(id);
long timestamp = System.currentTimeMillis();
out.writeInt((int) timestamp);
// Serialize the message itself
IVersionedSerializer serializer = MessagingService.instance().verbSerializers.get(messageIn.verb);
CompactEndpointSerializationHelper.serialize(from.getAddress(), out);
out.writeInt(MessagingService.Verb.convertForMessagingServiceVersion(messageIn.verb, version).ordinal());
out.writeInt(messageIn.parameters.size());
for (Map.Entry<String, byte[]> entry : messageIn.parameters.entrySet())
{
out.writeUTF(entry.getKey());
out.writeInt(entry.getValue().length);
out.write(entry.getValue());
}
if (messageIn.payload != null && serializer != MessagingService.CallbackDeterminedSerializer.instance)
{
try (DataOutputBuffer dob = new DataOutputBuffer())
{
serializer.serialize(messageIn.payload, dob, version);
int size = dob.getLength();
out.writeInt(size);
out.write(dob.getData(), 0, size);
}
}
else
{
out.writeInt(0);
}
return new MessageImpl(messageIn.verb.ordinal(), out.toByteArray(), id, version, from);
}
catch (IOException e)
{
throw new RuntimeException(e);
}
}
private class MessageDeliverySink implements IMessageSink
{
private final BiConsumer<InetSocketAddress, IMessage> deliver;
private final Function<InetAddress, InetSocketAddress> lookupAddressAndPort;
MessageDeliverySink(BiConsumer<InetSocketAddress, IMessage> deliver,
Function<InetAddress, InetSocketAddress> lookupAddressAndPort)
{
this.deliver = deliver;
this.lookupAddressAndPort = lookupAddressAndPort;
}
public boolean allowOutgoingMessage(MessageOut messageOut, int id, InetAddress to)
{
InetSocketAddress from = broadcastAddress();
assert from.equals(lookupAddressAndPort.apply(messageOut.from));
// Tracing logic - similar to org.apache.cassandra.net.OutboundTcpConnection.writeConnected
byte[] sessionBytes = (byte[]) messageOut.parameters.get(Tracing.TRACE_HEADER);
if (sessionBytes != null)
{
UUID sessionId = UUIDGen.getUUID(ByteBuffer.wrap(sessionBytes));
TraceState state = Tracing.instance.get(sessionId);
String traceMessage = String.format("Sending %s message to %s", messageOut.verb, to);
// session may have already finished; see CASSANDRA-5668
if (state == null)
{
byte[] traceTypeBytes = (byte[]) messageOut.parameters.get(Tracing.TRACE_TYPE);
Tracing.TraceType traceType = traceTypeBytes == null ? Tracing.TraceType.QUERY : Tracing.TraceType.deserialize(traceTypeBytes[0]);
TraceState.mutateWithTracing(ByteBuffer.wrap(sessionBytes), traceMessage, -1, traceType.getTTL());
}
else
{
state.trace(traceMessage);
if (messageOut.verb == MessagingService.Verb.REQUEST_RESPONSE)
Tracing.instance.doneWithNonLocalSession(state);
}
}
InetSocketAddress toFull = lookupAddressAndPort.apply(to);
deliver.accept(toFull,
serializeMessage(messageOut, id, broadcastAddress(), toFull));
return false;
}
public boolean allowIncomingMessage(MessageIn message, int id)
{
// we can filter to our heart's content on the outgoing message; no need to worry about incoming
return true;
}
}
public static Pair<MessageIn<Object>, Integer> deserializeMessage(IMessage imessage)
{
// Based on org.apache.cassandra.net.IncomingTcpConnection.receiveMessage
try (DataInputBuffer input = new DataInputBuffer(imessage.bytes()))
{
int version = imessage.version();
if (version > MessagingService.current_version)
{
throw new IllegalStateException(String.format("Received message version %d but current version is %d",
version,
MessagingService.current_version));
}
MessagingService.validateMagic(input.readInt());
int id;
if (version < MessagingService.VERSION_20)
id = Integer.parseInt(input.readUTF());
else
id = input.readInt();
if (imessage.id() != id)
throw new IllegalStateException(String.format("Message id mismatch: %d != %d", imessage.id(), id));
// make sure to readInt, even if cross_node_to is not enabled
int partial = input.readInt();
return Pair.create(MessageIn.read(input, version, id), partial);
//long currentTime = ApproximateTime.currentTimeMillis();
//return MessageIn.read(input, version, id, MessageIn.readConstructionTime(imessage.from().getAddress(), input, currentTime));
}
catch (IOException e)
{
throw new RuntimeException();
}
}
public void receiveMessage(IMessage imessage)
{
sync(() -> {
Pair<MessageIn<Object>, Integer> deserialized = null;
try
{
deserialized = deserializeMessage(imessage);
}
catch (Throwable t)
{
throw new RuntimeException("Exception occurred on node " + broadcastAddress(), t);
}
MessageIn<Object> message = deserialized.left;
int partial = deserialized.right;
long timestamp = System.currentTimeMillis();
boolean isCrossNodeTimestamp = false;
if (DatabaseDescriptor.hasCrossNodeTimeout())
{
long crossNodeTimestamp = (timestamp & 0xFFFFFFFF00000000L) | (((partial & 0xFFFFFFFFL) << 2) >> 2);
isCrossNodeTimestamp = (timestamp != crossNodeTimestamp);
timestamp = crossNodeTimestamp;
}
if (message == null)
{
// callback expired; nothing to do
return;
}
if (message.version <= MessagingService.current_version)
{
MessagingService.instance().receive(message, imessage.id(), timestamp, isCrossNodeTimestamp);
}
}).run();
}
public int getMessagingVersion()
{
return callsOnInstance(() -> MessagingService.current_version).call();
}
public void setMessagingVersion(InetSocketAddress endpoint, int version)
{
runOnInstance(() -> MessagingService.instance().setVersion(endpoint.getAddress(), version));
}
public void flush(String keyspace)
{
runOnInstance(() -> FBUtilities.waitOnFutures(Keyspace.open(keyspace).flush()));
}
public void forceCompact(String keyspace, String table)
{
runOnInstance(() -> {
try
{
Keyspace.open(keyspace).getColumnFamilyStore(table).forceMajorCompaction();
}
catch (Exception e)
{
throw new RuntimeException(e);
}
});
}
@Override
public void startup(ICluster cluster)
{
sync(() -> {
try
{
mkdirs();
assert config.networkTopology().contains(config.broadcastAddress());
DistributedTestSnitch.assign(config.networkTopology());
DatabaseDescriptor.setDaemonInitialized();
DatabaseDescriptor.createAllDirectories();
// We need to persist this as soon as possible after startup checks.
// This should be the first write to SystemKeyspace (CASSANDRA-11742)
SystemKeyspace.persistLocalMetadata();
LegacySchemaMigrator.migrate();
try
{
// load schema from disk
Schema.instance.loadFromDisk();
}
catch (Exception e)
{
throw e;
}
Keyspace.setInitialized();
// Replay any CommitLogSegments found on disk
try
{
CommitLog.instance.recover();
}
catch (IOException e)
{
throw new RuntimeException(e);
}
if (config.has(NETWORK))
{
registerFilters(cluster);
MessagingService.instance().listen();
}
else
{
// Even though we don't use MessagingService, access the static SocketFactory
// instance here so that we start the static event loop state
// -- not sure what that means? SocketFactory.instance.getClass();
registerMockMessaging(cluster);
}
// TODO: this is more than just gossip
if (config.has(GOSSIP))
{
StorageService.instance.initServer();
StorageService.instance.removeShutdownHook();
}
else
{
initializeRing(cluster);
}
StorageService.instance.ensureTraceKeyspace();
SystemKeyspace.finishStartup();
if (config.has(NATIVE_PROTOCOL))
{
CassandraDaemon.getInstanceForTesting().initializeNativeTransport();
CassandraDaemon.getInstanceForTesting().startNativeTransport();
StorageService.instance.setRpcReady(true);
}
if (!FBUtilities.getBroadcastAddress().equals(broadcastAddress().getAddress()))
throw new IllegalStateException();
if (DatabaseDescriptor.getStoragePort() != broadcastAddress().getPort())
throw new IllegalStateException();
}
catch (Throwable t)
{
if (t instanceof RuntimeException)
throw (RuntimeException) t;
throw new RuntimeException(t);
}
}).run();
}
private void mkdirs()
{
new File(config.getString("saved_caches_directory")).mkdirs();
new File(config.getString("hints_directory")).mkdirs();
new File(config.getString("commitlog_directory")).mkdirs();
for (String dir : (String[]) config.get("data_file_directories"))
new File(dir).mkdirs();
}
private static Config loadConfig(IInstanceConfig overrides)
{
Config config = new Config();
overrides.propagate(config, mapper);
return config;
}
private void initializeRing(ICluster cluster)
{
// This should be done outside instance in order to avoid serializing config
String partitionerName = config.getString("partitioner");
List<String> initialTokens = new ArrayList<>();
List<InetSocketAddress> hosts = new ArrayList<>();
List<UUID> hostIds = new ArrayList<>();
for (int i = 1 ; i <= cluster.size() ; ++i)
{
IInstanceConfig config = cluster.get(i).config();
initialTokens.add(config.getString("initial_token"));
hosts.add(config.broadcastAddress());
hostIds.add(config.hostId());
}
try
{
IPartitioner partitioner = FBUtilities.newPartitioner(partitionerName);
StorageService storageService = StorageService.instance;
List<Token> tokens = new ArrayList<>();
for (String token : initialTokens)
tokens.add(partitioner.getTokenFactory().fromString(token));
for (int i = 0; i < tokens.size(); i++)
{
InetSocketAddress ep = hosts.get(i);
UUID hostId = hostIds.get(i);
Token token = tokens.get(i);
Gossiper.runInGossipStageBlocking(() -> {
Gossiper.instance.initializeNodeUnsafe(ep.getAddress(), hostId, 1);
Gossiper.instance.injectApplicationState(ep.getAddress(),
ApplicationState.TOKENS,
new VersionedValue.VersionedValueFactory(partitioner).tokens(Collections.singleton(token)));
storageService.onChange(ep.getAddress(),
ApplicationState.STATUS,
new VersionedValue.VersionedValueFactory(partitioner).normal(Collections.singleton(token)));
Gossiper.instance.realMarkAlive(ep.getAddress(), Gossiper.instance.getEndpointStateForEndpoint(ep.getAddress()));
});
int messagingVersion = cluster.get(ep).isShutdown()
? MessagingService.current_version
: Math.min(MessagingService.current_version, cluster.get(ep).getMessagingVersion());
MessagingService.instance().setVersion(ep.getAddress(), messagingVersion);
}
// check that all nodes are in token metadata
for (int i = 0; i < tokens.size(); ++i)
assert storageService.getTokenMetadata().isMember(hosts.get(i).getAddress());
}
catch (Throwable e) // UnknownHostException
{
throw new RuntimeException(e);
}
}
public Future<Void> shutdown()
{
return shutdown(true);
}
@Override
public Future<Void> shutdown(boolean graceful)
{
if (!graceful)
MessagingService.instance().shutdown(false);
Future<?> future = async((ExecutorService executor) -> {
Throwable error = null;
error = parallelRun(error, executor,
() -> StorageService.instance.setRpcReady(false),
CassandraDaemon.getInstanceForTesting()::destroyNativeTransport);
if (config.has(GOSSIP) || config.has(NETWORK))
{
StorageService.instance.shutdownServer();
error = parallelRun(error, executor,
() -> NanoTimeToCurrentTimeMillis.shutdown(MINUTES.toMillis(1L))
);
}
error = parallelRun(error, executor,
() -> Gossiper.instance.stopShutdownAndWait(1L, MINUTES),
CompactionManager.instance::forceShutdown,
() -> BatchlogManager.instance.shutdownAndWait(1L, MINUTES),
HintsService.instance::shutdownBlocking,
() -> StreamCoordinator.shutdownAndWait(1L, MINUTES),
() -> SecondaryIndexManager.shutdownAndWait(1L, MINUTES),
() -> IndexSummaryManager.instance.shutdownAndWait(1L, MINUTES),
() -> ColumnFamilyStore.shutdownExecutorsAndWait(1L, MINUTES),
() -> PendingRangeCalculatorService.instance.shutdownExecutor(1L, MINUTES),
() -> BufferPool.shutdownLocalCleaner(1L, MINUTES),
() -> StorageService.instance.shutdownBGMonitorAndWait(1L, MINUTES),
() -> Ref.shutdownReferenceReaper(1L, MINUTES),
() -> Memtable.MEMORY_POOL.shutdownAndWait(1L, MINUTES),
() -> SSTableReader.shutdownBlocking(1L, MINUTES),
() -> DiagnosticSnapshotService.instance.shutdownAndWait(1L, MINUTES)
);
error = parallelRun(error, executor,
() -> ScheduledExecutors.shutdownAndWait(1L, MINUTES),
MessagingService.instance()::shutdown
);
error = parallelRun(error, executor,
() -> StageManager.shutdownAndWait(1L, MINUTES),
() -> SharedExecutorPool.SHARED.shutdownAndWait(1L, MINUTES)
);
error = parallelRun(error, executor,
CommitLog.instance::shutdownBlocking
);
Throwables.maybeFail(error);
}).apply(isolatedExecutor);
return CompletableFuture.runAsync(ThrowingRunnable.toRunnable(future::get), isolatedExecutor)
.thenRun(super::shutdown);
}
public int liveMemberCount()
{
return sync(() -> {
if (!DatabaseDescriptor.isDaemonInitialized() || !Gossiper.instance.isEnabled())
return 0;
return Gossiper.instance.getLiveMembers().size();
}).call();
}
public NodeToolResult nodetoolResult(boolean withNotifications, String... commandAndArgs)
{
return sync(() -> {
DTestNodeTool nodetool = new DTestNodeTool(withNotifications);
int rc = nodetool.execute(commandAndArgs);
return new NodeToolResult(commandAndArgs, rc, new ArrayList<>(nodetool.notifications.notifications), nodetool.latestError);
}).call();
}
private static class DTestNodeTool extends NodeTool {
private final StorageServiceMBean storageProxy;
private final CollectingNotificationListener notifications = new CollectingNotificationListener();
private Throwable latestError;
DTestNodeTool(boolean withNotifications) {
super(new InternalNodeProbeFactory(withNotifications));
storageProxy = new InternalNodeProbe(withNotifications).getStorageService();
storageProxy.addNotificationListener(notifications, null, null);
}
public int execute(String... args)
{
try
{
return super.execute(args);
}
finally
{
try
{
storageProxy.removeNotificationListener(notifications, null, null);
}
catch (ListenerNotFoundException e)
{
// ignored
}
}
}
protected void badUse(Exception e)
{
super.badUse(e);
latestError = e;
}
protected void err(Throwable e)
{
super.err(e);
latestError = e;
}
}
private static final class CollectingNotificationListener implements NotificationListener
{
private final List<Notification> notifications = new CopyOnWriteArrayList<>();
public void handleNotification(Notification notification, Object handback)
{
notifications.add(notification);
}
}
public void uncaughtException(Thread thread, Throwable throwable)
{
System.out.println(String.format("Exception %s occurred on thread %s", throwable.getMessage(), thread.getName()));
throwable.printStackTrace();
}
public long killAttempts()
{
return callOnInstance(InstanceKiller::getKillAttempts);
}
private static void shutdownAndWait(List<ExecutorService> executors) throws TimeoutException, InterruptedException
{
ExecutorUtils.shutdownNow(executors);
ExecutorUtils.awaitTermination(1L, MINUTES, executors);
}
private static Throwable parallelRun(Throwable accumulate, ExecutorService runOn, ThrowingRunnable ... runnables)
{
List<Future<Throwable>> results = new ArrayList<>();
for (ThrowingRunnable runnable : runnables)
{
results.add(runOn.submit(() -> {
try
{
runnable.run();
return null;
}
catch (Throwable t)
{
return t;
}
}));
}
for (Future<Throwable> future : results)
{
try
{
Throwable t = future.get();
if (t != null)
throw t;
}
catch (Throwable t)
{
accumulate = Throwables.merge(accumulate, t);
}
}
return accumulate;
}
}