blob: eab4f7623588235798c95a6a71af02a3564998bf [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.test.junit.rules;
import static org.apache.geode.distributed.ConfigurationProperties.HTTP_SERVICE_BIND_ADDRESS;
import static org.apache.geode.distributed.ConfigurationProperties.HTTP_SERVICE_PORT;
import static org.apache.geode.distributed.ConfigurationProperties.START_DEV_REST_API;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.RegionFactory;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.internal.AvailablePortHelper;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.tier.sockets.CacheServerHelper;
import org.apache.geode.pdx.PdxSerializer;
/**
* This is a rule to start up a server in your current VM. It's useful for your Integration Tests.
*
* <p>
* This rules allows you to create/start a server using any @ConfigurationProperties, you can chain
* the configuration of the rule like this: ServerStarterRule server = new ServerStarterRule()
* .withProperty(key, value) .withName(name) .withProperties(properties) .withSecurityManager(class)
* .withJmxManager() .withRestService() .withEmbeddedLocator() .withRegion(type, name) etc, etc. If
* your rule calls withAutoStart(), the cache and server will be started before your test code.
*
* <p>
* In your test code, you can use the rule to access the server's attributes, like the port
* information, working dir, name, and the cache and cacheServer it creates.
*
* <p>
* If you need a rule to start a server/locator in different VMs for Distributed tests, You should
* use {@code ClusterStartupRule}.
*/
public class ServerStarterRule extends MemberStarterRule<ServerStarterRule> implements Server {
private transient InternalCache cache;
private transient List<CacheServer> servers = new ArrayList<>();
private int embeddedLocatorPort = -1;
private boolean pdxPersistent = false;
private boolean pdxPersistentUserSet = false;
private PdxSerializer pdxSerializer = null;
private boolean pdxReadSerialized = false;
private boolean pdxReadSerializedUserSet = false;
// By default we start one server per jvm
private int serverCount = 1;
private Map<String, RegionShortcut> regions = new HashMap<>();
@Override
public InternalCache getCache() {
return cache;
}
@Override
public CacheServer getServer() {
return servers.get(0);
}
public List<CacheServer> getServers() {
return servers;
}
@Override
public void before() {
super.before();
if (autoStart) {
startServer();
regions.forEach((regionName, regionType) -> {
RegionFactory rf = getCache().createRegionFactory(regionType);
rf.create(regionName);
});
}
}
@Override
public void stopMember() {
for (CacheServer server : servers) {
server.stop();
}
// make sure this cache is the one currently open. A server cache can be recreated due to
// importing a new set of cluster configuration.
cache = GemFireCacheImpl.getInstance();
if (cache != null) {
try {
cache.close();
} catch (Exception e) {
} finally {
cache = null;
}
}
servers.clear();
}
public ServerStarterRule withPDXPersistent() {
pdxPersistent = true;
pdxPersistentUserSet = true;
return this;
}
public ServerStarterRule withPDXReadSerialized() {
pdxReadSerialized = true;
pdxReadSerializedUserSet = true;
return this;
}
public ServerStarterRule withPdxSerializer(PdxSerializer pdxSerializer) {
this.pdxSerializer = pdxSerializer;
return this;
}
/**
* If your only needs a cache and does not need a server for clients to connect
*/
public ServerStarterRule withNoCacheServer() {
this.serverCount = 0;
return this;
}
public ServerStarterRule withServerCount(int serverCount) {
this.serverCount = serverCount;
return this;
}
public ServerStarterRule withEmbeddedLocator() {
embeddedLocatorPort = AvailablePortHelper.getRandomAvailableTCPPort();
properties.setProperty("start-locator", "localhost[" + embeddedLocatorPort + "]");
return this;
}
public ServerStarterRule withRestService() {
return withRestService(false);
}
public ServerStarterRule withRestService(boolean useDefaultPort) {
properties.setProperty(START_DEV_REST_API, "true");
properties.setProperty(HTTP_SERVICE_BIND_ADDRESS, "localhost");
if (!useDefaultPort) {
httpPort = AvailablePortHelper.getRandomAvailableTCPPort();
properties.setProperty(HTTP_SERVICE_PORT, httpPort + "");
} else {
httpPort = 0;
}
return this;
}
@Override
protected void normalizeProperties() {
super.normalizeProperties();
if (httpPort < 0 && "true".equalsIgnoreCase(properties.getProperty(START_DEV_REST_API))) {
withRestService();
}
}
public ServerStarterRule withRegion(RegionShortcut type, String name) {
this.autoStart = true;
regions.put(name, type);
return this;
}
public void startServer(Properties properties, int locatorPort) {
withProperties(properties).withConnectionToLocator(locatorPort).startServer();
}
public void startServer() {
CacheFactory cf = new CacheFactory(this.properties);
if (pdxPersistentUserSet) {
cf.setPdxPersistent(pdxPersistent);
}
if (pdxReadSerializedUserSet) {
cf.setPdxReadSerialized(pdxReadSerialized);
}
if (pdxSerializer != null) {
cf.setPdxSerializer(pdxSerializer);
}
cache = (InternalCache) cf.create();
DistributionConfig config =
((InternalDistributedSystem) cache.getDistributedSystem()).getConfig();
jmxPort = config.getJmxManagerPort();
httpPort = config.getHttpServicePort();
if (serverCount > 1 && memberPort != 0) {
throw new IllegalStateException("can't specify a member port when you have multiple port");
}
for (int i = 0; i < serverCount; i++) {
CacheServer server = cache.addCacheServer();
if (i == 0) {
CacheServerHelper.setIsDefaultServer(server);
}
// memberPort is by default zero, which translates to "randomly select an available port,"
// which is why it is updated after this try block
if (serverCount == 1) {
server.setPort(memberPort);
} else {
server.setPort(0);
}
try {
server.start();
} catch (IOException e) {
throw new RuntimeException("unable to start server", e);
}
// if this member has multiple cache servers, the memberPort will be the last server's port
// started.
memberPort = server.getPort();
servers.add(server);
}
}
@Override
public int getEmbeddedLocatorPort() {
return embeddedLocatorPort;
}
@Override
public void waitTilFullyReconnected() {
try {
await().until(() -> {
InternalDistributedSystem internalDistributedSystem =
InternalDistributedSystem.getConnectedInstance();
return internalDistributedSystem != null
&& internalDistributedSystem.getCache() != null
&& !internalDistributedSystem.getCache().getCacheServers().isEmpty();
});
} catch (Exception e) {
// provide more information when condition is not satisfied after awaitility timeout
InternalDistributedSystem ids = InternalDistributedSystem.getConnectedInstance();
System.out.println("ds is: " + (ids != null ? "not null" : "null"));
System.out.println("cache is: " + (ids.getCache() != null ? "not null" : "null"));
System.out.println("has cache server: "
+ (!ids.getCache().getCacheServers().isEmpty()));
throw e;
}
InternalDistributedSystem dm = InternalDistributedSystem.getConnectedInstance();
cache = dm.getCache();
}
}