blob: 5c9e57533d3dd2ddfe1435368de777af9541e19f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.transport;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.ChannelMatcher;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.internal.logging.InternalLoggerFactory;
import io.netty.util.internal.logging.Slf4JLoggerFactory;
import org.apache.cassandra.auth.AuthenticatedUser;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.EncryptionOptions;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.SchemaChangeListener;
import org.apache.cassandra.service.*;
import org.apache.cassandra.transport.messages.EventMessage;
import org.apache.cassandra.utils.FBUtilities;
public class Server implements CassandraDaemon.Server
{
// Install SLF4J as the default factory for Netty's internal loggers when this class is initialized.
static
{
InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory());
}
private static final Logger logger = LoggerFactory.getLogger(Server.class);
// Value reported by NativeTransportService.useEpoll(); the constructor uses it to pick the
// default event loop group implementation and passes it to the default PipelineConfigurator.
private static final boolean useEpoll = NativeTransportService.useEpoll();
// Holds the channels known to this server and the per-event-type registrations.
private final ConnectionTracker connectionTracker = new ConnectionTracker();
// Creates a ServerConnection for each channel, bound to this server's connection tracker.
private final Connection.Factory connectionFactory = new Connection.Factory()
{
public Connection newConnection(Channel channel, ProtocolVersion version)
{
return new ServerConnection(channel, version, connectionTracker);
}
};
// Address handed to the pipeline configurator in start(); obtained from the builder.
public final InetSocketAddress socket;
// TLS policy copied from the builder.
public final EncryptionOptions.TlsEncryptionPolicy tlsEncryptionPolicy;
// Set to true at the end of a successful start() and flipped back to false by stop().
private final AtomicBoolean isRunning = new AtomicBoolean(false);
// Either supplied by the builder or built in the constructor from DatabaseDescriptor settings.
private final PipelineConfigurator pipelineConfigurator;
// Either supplied by the builder or created in the constructor (epoll or NIO).
private final EventLoopGroup workerGroup;
/**
 * Builds a server from the given builder, filling in defaults for every optional
 * collaborator the builder left unset, then registers the event notifier with
 * {@code StorageService} and {@code Schema}.
 *
 * @param builder source of the bind address, TLS policy and optional collaborators
 * @throws IllegalStateException if the builder has no port or no host
 */
private Server (Builder builder)
{
    this.socket = builder.getSocket();
    this.tlsEncryptionPolicy = builder.tlsEncryptionPolicy;

    // Use the supplied event loop group when there is one; otherwise create one
    // matching the selected transport.
    EventLoopGroup suppliedGroup = builder.workerGroup;
    if (suppliedGroup == null)
        this.workerGroup = useEpoll ? new EpollEventLoopGroup() : new NioEventLoopGroup();
    else
        this.workerGroup = suppliedGroup;

    // Same idea for the pipeline configurator: the builder's instance wins.
    if (builder.pipelineConfigurator == null)
    {
        this.pipelineConfigurator = new PipelineConfigurator(useEpoll,
                                                             DatabaseDescriptor.getRpcKeepAlive(),
                                                             DatabaseDescriptor.useNativeTransportLegacyFlusher(),
                                                             builder.tlsEncryptionPolicy);
    }
    else
    {
        this.pipelineConfigurator = builder.pipelineConfigurator;
    }

    EventNotifier notifier = builder.eventNotifier;
    if (notifier == null)
        notifier = new EventNotifier();

    // The notifier needs the tracker before it can deliver anything it is told about.
    notifier.registerConnectionTracker(connectionTracker);
    StorageService.instance.register(notifier);
    Schema.instance.registerListener(notifier);
}
/**
 * Stops the server if it is currently marked as running; otherwise does nothing.
 */
public void stop()
{
    // Only the caller that flips the flag from true to false goes on to close.
    boolean wasRunning = isRunning.compareAndSet(true, false);
    if (!wasRunning)
        return;

    close();
}
/**
 * Reports whether the server is currently marked as running.
 *
 * @return the current value of the running flag
 */
public boolean isRunning()
{
return isRunning.get();
}
/**
 * Binds the listening channel and marks the server as running. Returns immediately
 * when the server is already running.
 *
 * @throws IllegalStateException if the bind does not succeed; the bind failure is the cause
 */
public synchronized void start()
{
    if (isRunning())
        return;

    // Configure the server and wait for the bind to complete.
    ChannelFuture binding = pipelineConfigurator.initializeChannel(workerGroup, socket, connectionFactory);
    boolean bound = binding.awaitUninterruptibly().isSuccess();
    if (!bound)
    {
        String reason = String.format("Failed to bind port %d on %s.", socket.getPort(), socket.getAddress().getHostAddress());
        throw new IllegalStateException(reason, binding.cause());
    }

    // The listening channel is tracked alongside client channels.
    connectionTracker.allChannels.add(binding.channel());
    isRunning.set(true);
}
/**
 * Returns the number of connected clients as counted by the connection tracker.
 *
 * @return the tracker's connected-client count
 */
public int countConnectedClients()
{
return connectionTracker.countConnectedClients();
}
/**
 * Returns the per-user connection counts computed by the connection tracker.
 *
 * @return a map from user name to number of connections, as built by the tracker
 */
public Map<String, Integer> countConnectedClientsByUser()
{
return connectionTracker.countConnectedClientsByUser();
}
/**
 * Builds a snapshot of the currently tracked client connections.
 *
 * @return a new list with one {@code ConnectedClient} per tracked channel whose attached
 *         connection is a {@code ServerConnection}; other channels are skipped
 */
public List<ConnectedClient> getConnectedClients()
{
    List<ConnectedClient> clients = new ArrayList<>();
    for (Channel channel : connectionTracker.allChannels)
    {
        Connection attached = channel.attr(Connection.attributeKey).get();
        // Channels without a ServerConnection attached are not reported.
        if (!(attached instanceof ServerConnection))
            continue;

        clients.add(new ConnectedClient((ServerConnection) attached));
    }
    return clients;
}
/**
 * Returns everything currently held by the connection tracker's protocol version tracker.
 *
 * @return the list produced by the protocol version tracker's {@code getAll()}
 */
public List<ClientStat> recentClientStats()
{
return connectionTracker.protocolVersionTracker.getAll();
}
/**
 * Clears the connection tracker's protocol version tracker.
 */
@Override
public void clearConnectionHistory()
{
connectionTracker.protocolVersionTracker.clear();
}
/**
 * Closes every tracked channel, waiting for the close to complete, and then logs that
 * the server has stopped listening for CQL clients.
 */
private void close()
{
// Close opened connections
connectionTracker.closeAll();
logger.info("Stop listening for CQL clients");
}
/**
 * Fluent builder for {@link Server}. Both a port and a host must be set before
 * {@link #build()} is called; the remaining settings are optional and are defaulted
 * by the server's constructor when left unset.
 */
public static class Builder
{
    private EventLoopGroup workerGroup;
    private EncryptionOptions.TlsEncryptionPolicy tlsEncryptionPolicy = EncryptionOptions.TlsEncryptionPolicy.UNENCRYPTED;
    private InetAddress hostAddr;
    // -1 means "no port configured yet".
    private int port = -1;
    // Lazily built from hostAddr and port; reset whenever either of them changes.
    private InetSocketAddress socket;
    private PipelineConfigurator pipelineConfigurator;
    private EventNotifier eventNotifier;

    /** Sets the TLS policy the server will use. */
    public Builder withTlsEncryptionPolicy(EncryptionOptions.TlsEncryptionPolicy tlsEncryptionPolicy)
    {
        this.tlsEncryptionPolicy = tlsEncryptionPolicy;
        return this;
    }

    /** Supplies the event loop group the server should use instead of creating its own. */
    public Builder withEventLoopGroup(EventLoopGroup eventLoopGroup)
    {
        workerGroup = eventLoopGroup;
        return this;
    }

    /** Sets the host to bind to and discards any previously computed socket address. */
    public Builder withHost(InetAddress host)
    {
        hostAddr = host;
        socket = null;
        return this;
    }

    /** Sets the port to bind to and discards any previously computed socket address. */
    public Builder withPort(int port)
    {
        this.port = port;
        socket = null;
        return this;
    }

    /** Supplies the pipeline configurator the server should use instead of the default one. */
    public Builder withPipelineConfigurator(PipelineConfigurator configurator)
    {
        pipelineConfigurator = configurator;
        return this;
    }

    /** Supplies the event notifier the server should use instead of a freshly created one. */
    public Builder withEventNotifier(EventNotifier eventNotifier)
    {
        this.eventNotifier = eventNotifier;
        return this;
    }

    /**
     * Creates the server from the current settings.
     *
     * @throws IllegalStateException if the port or the host has not been set
     */
    public Server build()
    {
        return new Server(this);
    }

    /**
     * Returns the socket address to bind to, building and caching it on first use.
     *
     * @throws IllegalStateException if the port is still -1, or if no host was provided
     */
    private InetSocketAddress getSocket()
    {
        if (socket == null)
        {
            // The port is validated before the host.
            if (port == -1)
                throw new IllegalStateException("Missing port number");
            if (hostAddr == null)
                throw new IllegalStateException("Missing host");

            socket = new InetSocketAddress(hostAddr, port);
        }
        return socket;
    }
}
/**
 * Tracks the channels known to a server, plus the subsets of those channels that have
 * registered for each {@link Event.Type}, and delivers events to the registered channels.
 */
public static class ConnectionTracker implements Connection.Tracker
{
    // Matches channels whose attached connection uses a protocol version smaller than V5.
    private static final ChannelMatcher PRE_V5_CHANNEL = channel -> channel.attr(Connection.attributeKey)
                                                                           .get()
                                                                           .getVersion()
                                                                           .isSmallerThan(ProtocolVersion.V5);

    // TODO: should we be using the GlobalEventExecutor or defining our own?
    public final ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    // One channel group per event type, created eagerly in the constructor.
    private final EnumMap<Event.Type, ChannelGroup> groups = new EnumMap<>(Event.Type.class);
    private final ProtocolVersionTracker protocolVersionTracker = new ProtocolVersionTracker();

    /** Creates a tracker with an empty channel group for every event type. */
    public ConnectionTracker()
    {
        for (Event.Type type : Event.Type.values())
            groups.put(type, new DefaultChannelGroup(type.toString(), GlobalEventExecutor.INSTANCE));
    }

    /**
     * Starts tracking a channel and, when its remote address is an {@link InetSocketAddress},
     * records the connection's protocol version against that address.
     *
     * @param ch         the channel to track
     * @param connection the connection attached to the channel
     */
    public void addConnection(Channel ch, Connection connection)
    {
        allChannels.add(ch);
        if (ch.remoteAddress() instanceof InetSocketAddress)
            protocolVersionTracker.addConnection(((InetSocketAddress) ch.remoteAddress()).getAddress(), connection.getVersion());
    }

    /**
     * Registers a channel to receive events of the given type.
     *
     * @param type the event type the channel is interested in
     * @param ch   the channel to register
     */
    public void register(Event.Type type, Channel ch)
    {
        groups.get(type).add(ch);
    }

    /**
     * Delivers an event to every channel registered for the event's type.
     *
     * @param event the event to deliver
     */
    public void send(Event event)
    {
        ChannelGroup registered = groups.get(event.type);
        EventMessage message = new EventMessage(event);

        // Deliver event to pre-v5 channels
        registered.writeAndFlush(message, PRE_V5_CHANNEL);

        // Deliver event to post-v5 channels
        for (Channel c : registered)
            if (!PRE_V5_CHANNEL.matches(c))
                c.attr(Dispatcher.EVENT_DISPATCHER).get().accept(message);
    }

    /** Closes every tracked channel and waits for the close to complete. */
    void closeAll()
    {
        allChannels.close().awaitUninterruptibly();
    }

    /**
     * Counts the connected clients.
     *
     * @return the number of tracked channels minus one, never less than zero
     */
    int countConnectedClients()
    {
        /*
          - When server is running: allChannels contains all clients' connections (channels)
            plus one additional channel used for the server's own bootstrap.
          - When server is stopped: the size is 0

          The size is read exactly once and clamped at zero. Reading it twice ("size != 0 ? size - 1 : 0")
          could report -1 if the last channel went away between the two reads.
        */
        return Math.max(0, allChannels.size() - 1);
    }

    /**
     * Counts the tracked {@code ServerConnection}s per authenticated user name.
     *
     * @return a new map from user name to connection count; connections whose client state
     *         has no user are counted under the {@code null} key
     */
    Map<String, Integer> countConnectedClientsByUser()
    {
        Map<String, Integer> result = new HashMap<>();
        for (Channel c : allChannels)
        {
            Connection connection = c.attr(Connection.attributeKey).get();
            if (connection instanceof ServerConnection)
            {
                ServerConnection conn = (ServerConnection) connection;
                AuthenticatedUser user = conn.getClientState().getUser();
                String name = (null != user) ? user.getName() : null;
                // HashMap accepts the null key used for connections without a user.
                result.merge(name, 1, Integer::sum);
            }
        }
        return result;
    }
}
/**
 * Immutable pair of the last status and the last topology change recorded for an endpoint.
 * Either component may be {@code null} when no event of that kind has been recorded.
 */
private static class LatestEvent
{
    public final Event.StatusChange.Status status;
    public final Event.TopologyChange.Change topology;

    private LatestEvent(Event.StatusChange.Status status, Event.TopologyChange.Change topology)
    {
        this.status = status;
        this.topology = topology;
    }

    @Override
    public String toString()
    {
        return String.format("Status %s, Topology %s", status, topology);
    }

    /**
     * Creates an entry for a new status, keeping the topology of the previous entry.
     *
     * @param status the new status
     * @param prev   the previous entry, or {@code null} if there is none
     * @return an entry with the new status and {@code prev}'s topology ({@code null} without {@code prev})
     */
    public static LatestEvent forStatusChange(Event.StatusChange.Status status, LatestEvent prev)
    {
        Event.TopologyChange.Change carriedTopology = null;
        if (prev != null)
            carriedTopology = prev.topology;
        return new LatestEvent(status, carriedTopology);
    }

    /**
     * Creates an entry for a new topology change, keeping the status of the previous entry.
     *
     * @param change the new topology change
     * @param prev   the previous entry, or {@code null} if there is none
     * @return an entry with {@code prev}'s status ({@code null} without {@code prev}) and the new change
     */
    public static LatestEvent forTopologyChange(Event.TopologyChange.Change change, LatestEvent prev)
    {
        Event.StatusChange.Status carriedStatus = null;
        if (prev != null)
            carriedStatus = prev.status;
        return new LatestEvent(carriedStatus, change);
    }
}
/**
 * Turns endpoint lifecycle callbacks and schema change callbacks into {@link Event}s and
 * hands them to the registered {@link ConnectionTracker} for delivery.
 */
public static class EventNotifier extends SchemaChangeListener implements IEndpointLifecycleSubscriber
{
    private ConnectionTracker connectionTracker;

    // We keep track of the latest status change events we have sent to avoid sending duplicates
    // since StorageService may send duplicate notifications (CASSANDRA-7816, CASSANDRA-8236, CASSANDRA-9156)
    private final Map<InetAddressAndPort, LatestEvent> latestEvents = new ConcurrentHashMap<>();

    // We also want to delay delivering a NEW_NODE notification until the new node has set its RPC ready
    // state. This tracks the endpoints which have joined, but not yet signalled they're ready for clients
    private final Set<InetAddressAndPort> endpointsPendingJoinedNotification = ConcurrentHashMap.newKeySet();

    /** Sets the tracker through which all events are delivered. */
    private void registerConnectionTracker(ConnectionTracker connectionTracker)
    {
        this.connectionTracker = connectionTracker;
    }

    /**
     * Resolves the native address of an endpoint.
     *
     * @param endpoint the endpoint to resolve
     * @return the address StorageService reports for the endpoint, or, if that cannot be
     *         resolved, the endpoint's own address combined with the native transport port
     */
    private InetAddressAndPort getNativeAddress(InetAddressAndPort endpoint)
    {
        InetAddressAndPort resolved;
        try
        {
            resolved = InetAddressAndPort.getByName(StorageService.instance.getNativeaddress(endpoint, true));
        }
        catch (UnknownHostException e)
        {
            // That should not happen, so log an error, but fall back to the
            // endpoint address since there's a good chance this is right
            logger.error("Problem retrieving RPC address for {}", endpoint, e);
            resolved = InetAddressAndPort.getByAddressOverrideDefaults(endpoint.address, DatabaseDescriptor.getNativeTransportPort());
        }
        return resolved;
    }

    /**
     * Sends a node event unless it would advertise this node's own native address on behalf
     * of a different endpoint.
     *
     * @param endpoint the endpoint the event is about
     * @param event    the event to send
     */
    private void send(InetAddressAndPort endpoint, Event.NodeEvent event)
    {
        if (logger.isTraceEnabled())
            logger.trace("Sending event for endpoint {}, rpc address {}", endpoint, event.nodeAddressAndPort());

        // If the endpoint is not the local node, extract the node address
        // and if it is the same as our own RPC broadcast address (which defaults to the rpc address)
        // then don't send the notification. This covers the case of rpc_address set to "localhost",
        // which is not useful to any driver and in fact may cause serious problems to some drivers,
        // see CASSANDRA-10052
        boolean remoteEndpoint = !endpoint.equals(FBUtilities.getBroadcastAddressAndPort());
        if (remoteEndpoint && event.nodeAddressAndPort().equals(FBUtilities.getBroadcastNativeAddressAndPort()))
            return;

        send(event);
    }

    /** Hands the event to the connection tracker. */
    private void send(Event event)
    {
        connectionTracker.send(event);
    }

    public void onJoinCluster(InetAddressAndPort endpoint)
    {
        if (StorageService.instance.isRpcReady(endpoint))
        {
            onTopologyChange(endpoint, Event.TopologyChange.newNode(getNativeAddress(endpoint)));
        }
        else
        {
            // Not ready for clients yet: remember the endpoint and notify once it comes up.
            endpointsPendingJoinedNotification.add(endpoint);
        }
    }

    public void onLeaveCluster(InetAddressAndPort endpoint)
    {
        Event.TopologyChange removed = Event.TopologyChange.removedNode(getNativeAddress(endpoint));
        onTopologyChange(endpoint, removed);
    }

    public void onMove(InetAddressAndPort endpoint)
    {
        Event.TopologyChange moved = Event.TopologyChange.movedNode(getNativeAddress(endpoint));
        onTopologyChange(endpoint, moved);
    }

    public void onUp(InetAddressAndPort endpoint)
    {
        // Replay the join that was held back while the endpoint was not ready.
        boolean joinWasPending = endpointsPendingJoinedNotification.remove(endpoint);
        if (joinWasPending)
            onJoinCluster(endpoint);

        onStatusChange(endpoint, Event.StatusChange.nodeUp(getNativeAddress(endpoint)));
    }

    public void onDown(InetAddressAndPort endpoint)
    {
        Event.StatusChange down = Event.StatusChange.nodeDown(getNativeAddress(endpoint));
        onStatusChange(endpoint, down);
    }

    /**
     * Records a topology change for the endpoint and sends it, unless the same change was
     * the last one recorded or another update replaced the entry in the meantime.
     */
    private void onTopologyChange(InetAddressAndPort endpoint, Event.TopologyChange event)
    {
        if (logger.isTraceEnabled())
            logger.trace("Topology changed event : {}, {}", endpoint, event.change);

        LatestEvent previous = latestEvents.get(endpoint);
        if (previous != null && previous.topology == event.change)
            return;

        LatestEvent replaced = latestEvents.put(endpoint, LatestEvent.forTopologyChange(event.change, previous));
        // Only send when the entry we replaced is the one we based the decision on.
        if (replaced == previous)
            send(endpoint, event);
    }

    /**
     * Records a status change for the endpoint and sends it, unless the same status was
     * the last one recorded or another update replaced the entry in the meantime.
     */
    private void onStatusChange(InetAddressAndPort endpoint, Event.StatusChange event)
    {
        if (logger.isTraceEnabled())
            logger.trace("Status changed event : {}, {}", endpoint, event.status);

        LatestEvent previous = latestEvents.get(endpoint);
        if (previous != null && previous.status == event.status)
            return;

        // NOTE(review): null is passed as the previous entry, so the recorded topology is
        // reset here, unlike onTopologyChange which carries the previous status over.
        // Confirm this is intended.
        LatestEvent replaced = latestEvents.put(endpoint, LatestEvent.forStatusChange(event.status, null));
        // Only send when the entry we replaced is the one we based the decision on.
        if (replaced == previous)
            send(endpoint, event);
    }

    public void onCreateKeyspace(String ksName)
    {
        Event.SchemaChange change = new Event.SchemaChange(Event.SchemaChange.Change.CREATED, ksName);
        send(change);
    }

    public void onCreateTable(String ksName, String cfName)
    {
        Event.SchemaChange change = new Event.SchemaChange(Event.SchemaChange.Change.CREATED,
                                                           Event.SchemaChange.Target.TABLE,
                                                           ksName, cfName);
        send(change);
    }

    public void onCreateType(String ksName, String typeName)
    {
        Event.SchemaChange change = new Event.SchemaChange(Event.SchemaChange.Change.CREATED,
                                                           Event.SchemaChange.Target.TYPE,
                                                           ksName, typeName);
        send(change);
    }

    public void onCreateFunction(String ksName, String functionName, List<AbstractType<?>> argTypes)
    {
        Event.SchemaChange change = new Event.SchemaChange(Event.SchemaChange.Change.CREATED,
                                                           Event.SchemaChange.Target.FUNCTION,
                                                           ksName, functionName,
                                                           AbstractType.asCQLTypeStringList(argTypes));
        send(change);
    }

    public void onCreateAggregate(String ksName, String aggregateName, List<AbstractType<?>> argTypes)
    {
        Event.SchemaChange change = new Event.SchemaChange(Event.SchemaChange.Change.CREATED,
                                                           Event.SchemaChange.Target.AGGREGATE,
                                                           ksName, aggregateName,
                                                           AbstractType.asCQLTypeStringList(argTypes));
        send(change);
    }

    public void onAlterKeyspace(String ksName)
    {
        Event.SchemaChange change = new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, ksName);
        send(change);
    }

    public void onAlterTable(String ksName, String cfName, boolean affectsStatements)
    {
        // affectsStatements is not used when building the event.
        Event.SchemaChange change = new Event.SchemaChange(Event.SchemaChange.Change.UPDATED,
                                                           Event.SchemaChange.Target.TABLE,
                                                           ksName, cfName);
        send(change);
    }

    public void onAlterType(String ksName, String typeName)
    {
        Event.SchemaChange change = new Event.SchemaChange(Event.SchemaChange.Change.UPDATED,
                                                           Event.SchemaChange.Target.TYPE,
                                                           ksName, typeName);
        send(change);
    }

    public void onAlterFunction(String ksName, String functionName, List<AbstractType<?>> argTypes)
    {
        Event.SchemaChange change = new Event.SchemaChange(Event.SchemaChange.Change.UPDATED,
                                                           Event.SchemaChange.Target.FUNCTION,
                                                           ksName, functionName,
                                                           AbstractType.asCQLTypeStringList(argTypes));
        send(change);
    }

    public void onAlterAggregate(String ksName, String aggregateName, List<AbstractType<?>> argTypes)
    {
        Event.SchemaChange change = new Event.SchemaChange(Event.SchemaChange.Change.UPDATED,
                                                           Event.SchemaChange.Target.AGGREGATE,
                                                           ksName, aggregateName,
                                                           AbstractType.asCQLTypeStringList(argTypes));
        send(change);
    }

    public void onDropKeyspace(String ksName)
    {
        Event.SchemaChange change = new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, ksName);
        send(change);
    }

    public void onDropTable(String ksName, String cfName)
    {
        Event.SchemaChange change = new Event.SchemaChange(Event.SchemaChange.Change.DROPPED,
                                                           Event.SchemaChange.Target.TABLE,
                                                           ksName, cfName);
        send(change);
    }

    public void onDropType(String ksName, String typeName)
    {
        Event.SchemaChange change = new Event.SchemaChange(Event.SchemaChange.Change.DROPPED,
                                                           Event.SchemaChange.Target.TYPE,
                                                           ksName, typeName);
        send(change);
    }

    public void onDropFunction(String ksName, String functionName, List<AbstractType<?>> argTypes)
    {
        Event.SchemaChange change = new Event.SchemaChange(Event.SchemaChange.Change.DROPPED,
                                                           Event.SchemaChange.Target.FUNCTION,
                                                           ksName, functionName,
                                                           AbstractType.asCQLTypeStringList(argTypes));
        send(change);
    }

    public void onDropAggregate(String ksName, String aggregateName, List<AbstractType<?>> argTypes)
    {
        Event.SchemaChange change = new Event.SchemaChange(Event.SchemaChange.Change.DROPPED,
                                                           Event.SchemaChange.Target.AGGREGATE,
                                                           ksName, aggregateName,
                                                           AbstractType.asCQLTypeStringList(argTypes));
        send(change);
    }
}
}