// blob: da744743b733ab7fbb3dd0c7949a34ecb3ab1425 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.wan;
import static org.apache.geode.internal.AvailablePort.SOCKET;
import static org.apache.geode.internal.AvailablePort.getAddress;
import static org.apache.geode.internal.AvailablePort.getRandomAvailablePortInRange;
import static org.apache.geode.internal.AvailablePort.isPortAvailable;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import org.apache.logging.log4j.Logger;
import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.cache.wan.GatewayReceiver;
import org.apache.geode.cache.wan.GatewayTransportFilter;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.ResourceEvent;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.InternalCacheServer;
import org.apache.geode.internal.net.SocketCreator;
import org.apache.geode.logging.internal.log4j.api.LogService;
/**
 * {@link GatewayReceiver} implementation backed by an {@link InternalCacheServer}. The server is
 * obtained from the cache on the first call to {@link #start()} and is started on a port taken
 * from the configured {@code startPort}-{@code endPort} range.
 *
 * @since GemFire 7.0
 */
public class GatewayReceiverImpl implements GatewayReceiver {

  private static final Logger logger = LogService.getLogger();

  private final InternalCache cache;
  private final String hostnameForSenders;
  private final int startPort;
  private final int endPort;
  private final int maximumTimeBetweenPings;
  private final int socketBufferSize;
  private final boolean manualStart;
  private final List<GatewayTransportFilter> gatewayTransportFilters;
  private final String bindAddress;

  /** Answers whether a given port can currently be used; injectable for tests. */
  private final Function<Integer, Boolean> isPortAvailableFunction;
  /** Proposes the first port to try within a range; injectable for tests. */
  private final Function<PortRange, Integer> getRandomAvailablePortInRangeFunction;

  /** Port the server was last started on; stays 0 until a start succeeds. */
  private volatile int port;
  /** Created lazily by {@link #start()}; never reset to {@code null} afterwards. */
  private volatile InternalCacheServer receiverServer;

  /**
   * Creates a receiver that checks port availability and picks its first candidate port through
   * the static {@code AvailablePort} helpers.
   */
  GatewayReceiverImpl(final InternalCache cache, final int startPort, final int endPort,
      final int maximumTimeBetweenPings, final int socketBufferSize, final String bindAddress,
      final List<GatewayTransportFilter> gatewayTransportFilters, final String hostnameForSenders,
      final boolean manualStart) {
    this(cache, startPort, endPort, maximumTimeBetweenPings, socketBufferSize, bindAddress,
        gatewayTransportFilters, hostnameForSenders, manualStart,
        port -> isPortAvailable(port, SOCKET, getAddress(SOCKET)),
        portRange -> getRandomAvailablePortInRange(portRange.startPort, portRange.endPort, SOCKET));
  }

  /**
   * Test-only constructor that replaces the port helpers with fixed answers.
   *
   * @param isPortAvailableResult value returned for every port-availability check
   * @param getRandomAvailablePortInRangeResult value returned whenever a random port is requested
   */
  @VisibleForTesting
  GatewayReceiverImpl(final InternalCache cache, final int startPort, final int endPort,
      final int maximumTimeBetweenPings, final int socketBufferSize, final String bindAddress,
      final List<GatewayTransportFilter> gatewayTransportFilters, final String hostnameForSenders,
      final boolean manualStart, final boolean isPortAvailableResult,
      final int getRandomAvailablePortInRangeResult) {
    this(cache, startPort, endPort, maximumTimeBetweenPings, socketBufferSize, bindAddress,
        gatewayTransportFilters, hostnameForSenders, manualStart, port -> isPortAvailableResult,
        portRange -> getRandomAvailablePortInRangeResult);
  }

  /** Canonical constructor: stores every setting and the two port-selection functions as given. */
  private GatewayReceiverImpl(final InternalCache cache, final int startPort, final int endPort,
      final int maximumTimeBetweenPings, final int socketBufferSize, final String bindAddress,
      final List<GatewayTransportFilter> gatewayTransportFilters, final String hostnameForSenders,
      final boolean manualStart, final Function<Integer, Boolean> isPortAvailableFunction,
      final Function<PortRange, Integer> getRandomAvailablePortInRangeFunction) {
    this.cache = cache;
    this.hostnameForSenders = hostnameForSenders;
    this.startPort = startPort;
    this.endPort = endPort;
    this.maximumTimeBetweenPings = maximumTimeBetweenPings;
    this.socketBufferSize = socketBufferSize;
    this.bindAddress = bindAddress;
    this.gatewayTransportFilters = gatewayTransportFilters;
    this.manualStart = manualStart;
    this.isPortAvailableFunction = isPortAvailableFunction;
    this.getRandomAvailablePortInRangeFunction = getRandomAvailablePortInRangeFunction;
  }

  @Override
  public String getHostnameForSenders() {
    return hostnameForSenders;
  }

  /**
   * Returns the host of this receiver, using the first source that is available: the external
   * address of the server (once one has been created), the non-empty {@code hostnameForSenders},
   * the non-empty {@code bindAddress}, and finally the local host name.
   *
   * @throws IllegalStateException if the local host name is needed but cannot be resolved
   */
  @Override
  public String getHost() {
    // Single volatile read so the null check and the call see the same reference.
    final InternalCacheServer server = receiverServer;
    if (server != null) {
      return server.getExternalAddress();
    }
    if (hostnameForSenders != null && !hostnameForSenders.isEmpty()) {
      return hostnameForSenders;
    }
    if (bindAddress != null && !bindAddress.isEmpty()) {
      return bindAddress;
    }
    try {
      return SocketCreator.getLocalHost().getHostName();
    } catch (UnknownHostException e) {
      throw new IllegalStateException("Could not get host name", e);
    }
  }

  @Override
  public List<GatewayTransportFilter> getGatewayTransportFilters() {
    return gatewayTransportFilters;
  }

  @Override
  public int getMaximumTimeBetweenPings() {
    return maximumTimeBetweenPings;
  }

  @Override
  public int getPort() {
    return port;
  }

  @Override
  public int getStartPort() {
    return startPort;
  }

  @Override
  public int getEndPort() {
    return endPort;
  }

  @Override
  public int getSocketBufferSize() {
    return socketBufferSize;
  }

  @Override
  public boolean isManualStart() {
    return manualStart;
  }

  @Override
  public CacheServer getServer() {
    return receiverServer;
  }

  /**
   * Configures the server for {@code port} and attempts to start it.
   *
   * @return {@code true} if the server started (and {@link #getPort()} now reports {@code port});
   *         {@code false} if the port was reported unavailable or starting threw an
   *         {@link IOException}
   */
  private boolean tryToStart(int port) {
    if (!isPortAvailableFunction.apply(port)) {
      return false;
    }

    final CacheServer cacheServer = receiverServer;
    cacheServer.setPort(port);
    cacheServer.setSocketBufferSize(socketBufferSize);
    cacheServer.setMaximumTimeBetweenPings(maximumTimeBetweenPings);
    if (hostnameForSenders != null && !hostnameForSenders.isEmpty()) {
      cacheServer.setHostnameForClients(hostnameForSenders);
    }
    cacheServer.setBindAddress(bindAddress);
    cacheServer.setGroups(new String[] {GatewayReceiver.RECEIVER_GROUP});

    try {
      cacheServer.start();
    } catch (IOException e) {
      // Keep the reason in the log: without it a busy port and a bad bind address look the same.
      logger.info("Failed to create server socket on {}[{}]: {}", bindAddress, port,
          e.getMessage());
      return false;
    }
    this.port = port;
    return true;
  }

  /**
   * Starts the receiver. The backing server is requested from the cache if it does not exist yet;
   * if it is already running only a warning is logged. Otherwise ports are tried one after another
   * beginning with {@link #getPortToStart()}, wrapping from {@code endPort} back to
   * {@code startPort}, until one start succeeds. On success a
   * {@link ResourceEvent#GATEWAYRECEIVER_START} event is raised.
   *
   * @throws GatewayReceiverException if no port in the range could be used
   */
  @Override
  public void start() {
    if (receiverServer == null) {
      receiverServer = cache.addGatewayReceiverServer(this);
    }
    if (receiverServer.isRunning()) {
      logger.warn("Gateway Receiver is already running");
      return;
    }

    final int firstPort = getPortToStart();
    int candidate = firstPort;
    while (!tryToStart(candidate)) {
      // Advance to the next port, wrapping around at the top of a multi-port range.
      if (candidate == endPort && startPort != endPort) {
        candidate = startPort;
      } else {
        candidate++;
      }
      // Back at the first port, or past the end of the range: every port has been tried.
      if (candidate == firstPort || candidate > endPort) {
        throw new GatewayReceiverException("No available free port found in the given range (" +
            startPort + "-" + endPort + ")");
      }
    }

    logger.info("The GatewayReceiver started on port : {}", this.port);
    InternalDistributedSystem system = cache.getInternalDistributedSystem();
    system.handleResourceEvent(ResourceEvent.GATEWAYRECEIVER_START, this);
  }

  /**
   * Chooses the first port {@link #start()} should try: {@code startPort} for a single-port range,
   * otherwise the port proposed by the random-port function.
   */
  private int getPortToStart() {
    if (startPort == endPort) {
      return startPort;
    }
    final int proposed =
        getRandomAvailablePortInRangeFunction.apply(new PortRange(startPort, endPort));
    if (proposed < startPort || proposed > endPort) {
      // The proposal is only usable inside the configured range. An out-of-range answer
      // (presumably a "none found" sentinel of the default helper - not visible here, confirm)
      // would otherwise make start() probe, and possibly bind, ports outside the range.
      // Starting at startPort gives a complete in-range scan instead.
      // NOTE(review): assumes startPort <= endPort is validated by the creator of this object.
      return startPort;
    }
    return proposed;
  }

  /**
   * Stops the backing server.
   *
   * @throws GatewayReceiverException if the receiver is not running
   */
  @Override
  public void stop() {
    final InternalCacheServer server = receiverServer;
    if (server == null || !server.isRunning()) {
      throw new GatewayReceiverException("Gateway Receiver is not running");
    }
    server.stop();
  }

  /**
   * Removes this receiver (and its server, if one was created) from the cache and raises a
   * {@link ResourceEvent#GATEWAYRECEIVER_DESTROY} event.
   *
   * @throws GatewayReceiverException if the receiver is still running
   */
  @Override
  public void destroy() {
    logger.info("Destroying Gateway Receiver: {}", this);

    final InternalCacheServer server = receiverServer;
    if (server != null && server.isRunning()) {
      throw new GatewayReceiverException(
          "Gateway Receiver is running and needs to be stopped first");
    }
    cache.removeGatewayReceiver(this);
    if (server != null) {
      // A null server means the receiver was never started, so there is nothing else to remove.
      cache.removeGatewayReceiverServer(server);
    }

    InternalDistributedSystem system = cache.getInternalDistributedSystem();
    system.handleResourceEvent(ResourceEvent.GATEWAYRECEIVER_DESTROY, this);
  }

  @Override
  public String getBindAddress() {
    return bindAddress;
  }

  @Override
  public boolean isRunning() {
    final InternalCacheServer server = receiverServer;
    return server != null && server.isRunning();
  }

  /**
   * Describes this receiver as {@code Gateway Receiver@<identity hash>[key=value; ...]}.
   */
  @Override
  public String toString() {
    return new StringBuilder()
        .append("Gateway Receiver@").append(Integer.toHexString(hashCode()))
        .append("[port=").append(getPort())
        .append("; bindAddress=").append(getBindAddress())
        .append("; hostnameForSenders=").append(getHostnameForSenders())
        .append("; maximumTimeBetweenPings=").append(getMaximumTimeBetweenPings())
        .append("; socketBufferSize=").append(getSocketBufferSize())
        .append("; isManualStart=").append(isManualStart())
        .append("; group=").append(Arrays.toString(new String[] {GatewayReceiver.RECEIVER_GROUP}))
        .append("]")
        .toString();
  }

  /** Inclusive port range handed to the random-port function. */
  private static final class PortRange {

    private final int startPort;
    private final int endPort;

    private PortRange(int startPort, int endPort) {
      this.startPort = startPort;
      this.endPort = endPort;
    }
  }
}