blob: d8b08df5cd05615490825c8f74f3028a334d338b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache;
import java.io.IOException;
import java.util.Arrays;
import java.util.Set;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.cache.server.ClientSubscriptionConfig;
import org.apache.geode.cache.server.ServerLoadProbe;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.internal.admin.ClientMembershipMessage;
import org.apache.geode.internal.cache.xmlcache.CacheCreation;
import org.apache.geode.management.membership.ClientMembership;
import org.apache.geode.management.membership.ClientMembershipEvent;
import org.apache.geode.management.membership.ClientMembershipListener;
import org.apache.geode.util.internal.GeodeGlossary;
/**
* Abstract class that contains common code that all true implementations of
* {@link InternalCacheServer}
* can use.
*
* @since GemFire 5.7
*/
public abstract class AbstractCacheServer implements InternalCacheServer {
public static final String TEST_OVERRIDE_DEFAULT_PORT_PROPERTY =
GeodeGlossary.GEMFIRE_PREFIX + "test.CacheServer.OVERRIDE_DEFAULT_PORT";
/** The cache that is served by this cache server */
protected final InternalCache cache;
/** The port that the cache server was configured to run on */
protected int port;
/** The maximum number of connections that the BridgeServer will accept */
protected int maxConnections;
/** The maximum number of threads that the BridgeServer will create */
protected int maxThreads;
/** Whether the cache server notifies by subscription */
protected boolean notifyBySubscription = true;
/**
* The buffer size in bytes of the socket for this <code>BridgeServer</code>
*/
protected int socketBufferSize;
/**
* The tcpNoDelay setting for outgoing sockets
*/
protected boolean tcpNoDelay;
/**
* The maximum amount of time between client pings. This value is used by the
* <code>ClientHealthMonitor</code> to determine the health of this <code>BridgeServer</code>'s
* clients.
*/
protected int maximumTimeBetweenPings;
/** the maximum number of messages that can be enqueued in a client-queue. */
protected int maximumMessageCount;
/**
* the time (in seconds) after which a message in the client queue will expire.
*/
protected int messageTimeToLive;
/**
* The groups this server belongs to. Use <code>getGroups</code> to read.
*
* @since GemFire 5.7
*/
protected String[] groups;
protected ServerLoadProbe loadProbe;
/**
* The ip address or host name that this server is to listen on.
*
* @since GemFire 5.7
*/
protected String bindAddress;
/**
* The ip address or host name that will be given to clients so they can connect to this server
*
* @since GemFire 5.7
*/
protected String hostnameForClients;
/**
* How frequency to poll the load on this server.
*/
protected long loadPollInterval;
protected ClientSubscriptionConfig clientSubscriptionConfig;
/**
* Listens to client membership events and notifies any admin members as clients of this server
* leave/crash.
*/
protected final ClientMembershipListener listener;
////////////////////// Constructors //////////////////////
/**
* Creates a new <code>BridgeServer</code> with the default configuration.
*
* @param cache The cache being served
*/
public AbstractCacheServer(InternalCache cache) {
this(cache, true);
}
public AbstractCacheServer(InternalCache cache, boolean attachListener) {
this.cache = cache;
this.port = Integer.getInteger(TEST_OVERRIDE_DEFAULT_PORT_PROPERTY, CacheServer.DEFAULT_PORT);
this.maxConnections = CacheServer.DEFAULT_MAX_CONNECTIONS;
this.maxThreads = CacheServer.DEFAULT_MAX_THREADS;
this.socketBufferSize = CacheServer.DEFAULT_SOCKET_BUFFER_SIZE;
this.tcpNoDelay = CacheServer.DEFAULT_TCP_NO_DELAY;
this.maximumTimeBetweenPings = CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS;
this.maximumMessageCount = CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT;
this.messageTimeToLive = CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE;
this.groups = CacheServer.DEFAULT_GROUPS;
this.bindAddress = CacheServer.DEFAULT_BIND_ADDRESS;
this.hostnameForClients = CacheServer.DEFAULT_HOSTNAME_FOR_CLIENTS;
this.loadProbe = CacheServer.DEFAULT_LOAD_PROBE;
this.loadPollInterval = CacheServer.DEFAULT_LOAD_POLL_INTERVAL;
this.clientSubscriptionConfig = new ClientSubscriptionConfigImpl();
if (!attachListener) {
this.listener = null;
return;
}
listener = new ClientMembershipListener() {
@Override
public void memberJoined(ClientMembershipEvent event) {
if (event.isClient()) {
createAndSendMessage(event, ClientMembershipMessage.JOINED);
}
}
@Override
public void memberLeft(ClientMembershipEvent event) {
if (event.isClient()) {
createAndSendMessage(event, ClientMembershipMessage.LEFT);
}
}
@Override
public void memberCrashed(ClientMembershipEvent event) {
if (event.isClient()) {
createAndSendMessage(event, ClientMembershipMessage.CRASHED);
}
}
/**
* Method to create & send the ClientMembershipMessage to admin members. The message is sent
* only if there are any admin members in the distribution system.
*
* @param event describes a change in client membership
* @param type type of event - one of ClientMembershipMessage.JOINED,
* ClientMembershipMessage.LEFT, ClientMembershipMessage.CRASHED
*/
private void createAndSendMessage(ClientMembershipEvent event, int type) {
InternalDistributedSystem ds = null;
Cache cacheInstance = AbstractCacheServer.this.cache;
if (cacheInstance != null && !(cacheInstance instanceof CacheCreation)) {
ds = (InternalDistributedSystem) cacheInstance.getDistributedSystem();
} else {
ds = InternalDistributedSystem.getAnyInstance();
}
// ds could be null
if (ds != null && ds.isConnected()) {
DistributionManager dm = ds.getDistributionManager();
Set adminMemberSet = dm.getAdminMemberSet();
/* check if there are any admin members at all */
if (!adminMemberSet.isEmpty()) {
DistributedMember member = event.getMember();
ClientMembershipMessage msg = new ClientMembershipMessage(event.getMemberId(),
member == null ? null : member.getHost(), type);
msg.setRecipients(adminMemberSet);
dm.putOutgoing(msg);
}
}
}
};
ClientMembership.registerClientMembershipListener(listener);
}
///////////////////// Instance Methods /////////////////////
@Override
public int getPort() {
return this.port;
}
@Override
public void setPort(int port) {
this.port = port;
}
@Override
public String getBindAddress() {
return this.bindAddress;
}
@Override
public void setBindAddress(String address) {
this.bindAddress = address;
}
@Override
public String getHostnameForClients() {
return this.hostnameForClients;
}
@Override
public void setHostnameForClients(String name) {
this.hostnameForClients = name;
}
@Override
public int getMaxConnections() {
return this.maxConnections;
}
@Override
public void setMaxConnections(int maxCon) {
this.maxConnections = maxCon;
}
@Override
public int getMaxThreads() {
return this.maxThreads;
}
@Override
public void setMaxThreads(int maxThreads) {
this.maxThreads = maxThreads;
}
@Override
public void start() throws IOException {
// This method is invoked during testing, but it is not necessary
// to do anything.
}
@Override
public void setNotifyBySubscription(boolean b) {
// this.notifyBySubscription = true;
}
@Override
public boolean getNotifyBySubscription() {
return this.notifyBySubscription;
}
@Override
public void setSocketBufferSize(int socketBufferSize) {
this.socketBufferSize = socketBufferSize;
}
@Override
public int getSocketBufferSize() {
return this.socketBufferSize;
}
@Override
public void setMaximumTimeBetweenPings(int maximumTimeBetweenPings) {
this.maximumTimeBetweenPings = maximumTimeBetweenPings;
}
@Override
public int getMaximumTimeBetweenPings() {
return this.maximumTimeBetweenPings;
}
@Override
public int getMaximumMessageCount() {
return this.maximumMessageCount;
}
@Override
public void setMaximumMessageCount(int maximumMessageCount) {
this.maximumMessageCount = maximumMessageCount;
}
@Override
public int getMessageTimeToLive() {
return this.messageTimeToLive;
}
@Override
public void setMessageTimeToLive(int messageTimeToLive) {
this.messageTimeToLive = messageTimeToLive;
}
@Override
public void setGroups(String[] groups) {
if (groups == null) {
this.groups = CacheServer.DEFAULT_GROUPS;
} else if (groups.length > 0) {
// copy it for isolation
String[] copy = new String[groups.length];
System.arraycopy(groups, 0, copy, 0, groups.length);
this.groups = copy;
} else {
this.groups = CacheServer.DEFAULT_GROUPS; // keep findbugs happy
}
}
@Override
public String[] getGroups() {
String[] result = this.groups;
if (result.length > 0) {
// copy it for isolation
String[] copy = new String[result.length];
System.arraycopy(result, 0, copy, 0, result.length);
result = copy;
}
return result;
}
@Override
public ServerLoadProbe getLoadProbe() {
return loadProbe;
}
@Override
public void setLoadProbe(ServerLoadProbe loadProbe) {
this.loadProbe = loadProbe;
}
@Override
public long getLoadPollInterval() {
return loadPollInterval;
}
@Override
public void setLoadPollInterval(long loadPollInterval) {
this.loadPollInterval = loadPollInterval;
}
@Override
public void setTcpNoDelay(boolean setting) {
this.tcpNoDelay = setting;
}
@Override
public boolean getTcpNoDelay() {
return this.tcpNoDelay;
}
@Override
public InternalCache getCache() {
return this.cache;
}
private static boolean eq(String s1, String s2) {
if (s1 == null) {
return s2 == null;
} else {
return s1.equals(s2);
}
}
/**
* Returns whether or not this cache server has the same configuration as another cache server.
*/
public boolean sameAs(CacheServer other) {
return getPort() == other.getPort() && eq(getBindAddress(), other.getBindAddress())
&& getSocketBufferSize() == other.getSocketBufferSize()
&& getMaximumTimeBetweenPings() == other.getMaximumTimeBetweenPings()
&& getNotifyBySubscription() == other.getNotifyBySubscription()
&& getMaxConnections() == other.getMaxConnections()
&& getMaxThreads() == other.getMaxThreads()
&& getMaximumMessageCount() == other.getMaximumMessageCount()
&& getMessageTimeToLive() == other.getMessageTimeToLive()
&& Arrays.equals(getGroups(), other.getGroups())
&& getLoadProbe().equals(other.getLoadProbe())
&& getLoadPollInterval() == other.getLoadPollInterval()
&& getTcpNoDelay() == other.getTcpNoDelay();
}
}