| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.test.junit.rules; |
| |
| import static org.apache.geode.distributed.ConfigurationProperties.HTTP_SERVICE_BIND_ADDRESS; |
| import static org.apache.geode.distributed.ConfigurationProperties.HTTP_SERVICE_PORT; |
| import static org.apache.geode.distributed.ConfigurationProperties.START_DEV_REST_API; |
| import static org.apache.geode.test.awaitility.GeodeAwaitility.await; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| |
| import org.apache.geode.cache.CacheFactory; |
| import org.apache.geode.cache.RegionFactory; |
| import org.apache.geode.cache.RegionShortcut; |
| import org.apache.geode.cache.server.CacheServer; |
| import org.apache.geode.distributed.internal.DistributionConfig; |
| import org.apache.geode.distributed.internal.InternalDistributedSystem; |
| import org.apache.geode.internal.AvailablePortHelper; |
| import org.apache.geode.internal.cache.GemFireCacheImpl; |
| import org.apache.geode.internal.cache.InternalCache; |
| import org.apache.geode.internal.cache.tier.sockets.CacheServerHelper; |
| import org.apache.geode.pdx.PdxSerializer; |
| |
| /** |
| * This is a rule to start up a server in your current VM. It's useful for your Integration Tests. |
| * |
| * <p> |
| * This rules allows you to create/start a server using any @ConfigurationProperties, you can chain |
| * the configuration of the rule like this: ServerStarterRule server = new ServerStarterRule() |
| * .withProperty(key, value) .withName(name) .withProperties(properties) .withSecurityManager(class) |
| * .withJmxManager() .withRestService() .withEmbeddedLocator() .withRegion(type, name) etc, etc. If |
| * your rule calls withAutoStart(), the cache and server will be started before your test code. |
| * |
| * <p> |
| * In your test code, you can use the rule to access the server's attributes, like the port |
| * information, working dir, name, and the cache and cacheServer it creates. |
| * |
| * <p> |
| * If you need a rule to start a server/locator in different VMs for Distributed tests, You should |
| * use {@code ClusterStartupRule}. |
| */ |
| public class ServerStarterRule extends MemberStarterRule<ServerStarterRule> implements Server { |
| private transient InternalCache cache; |
| private transient List<CacheServer> servers = new ArrayList<>(); |
| private int embeddedLocatorPort = -1; |
| private boolean pdxPersistent = false; |
| private boolean pdxPersistentUserSet = false; |
| private PdxSerializer pdxSerializer = null; |
| private boolean pdxReadSerialized = false; |
| private boolean pdxReadSerializedUserSet = false; |
| // By default we start one server per jvm |
| private int serverCount = 1; |
| |
| private Map<String, RegionShortcut> regions = new HashMap<>(); |
| |
| @Override |
| public InternalCache getCache() { |
| return cache; |
| } |
| |
| @Override |
| public CacheServer getServer() { |
| return servers.get(0); |
| } |
| |
| public List<CacheServer> getServers() { |
| return servers; |
| } |
| |
| @Override |
| public void before() { |
| super.before(); |
| if (autoStart) { |
| startServer(); |
| regions.forEach((regionName, regionType) -> { |
| RegionFactory rf = getCache().createRegionFactory(regionType); |
| rf.create(regionName); |
| }); |
| } |
| } |
| |
| @Override |
| public void stopMember() { |
| for (CacheServer server : servers) { |
| server.stop(); |
| } |
| // make sure this cache is the one currently open. A server cache can be recreated due to |
| // importing a new set of cluster configuration. |
| cache = GemFireCacheImpl.getInstance(); |
| if (cache != null) { |
| try { |
| cache.close(); |
| } catch (Exception e) { |
| } finally { |
| cache = null; |
| } |
| } |
| servers.clear(); |
| } |
| |
| public ServerStarterRule withPDXPersistent() { |
| pdxPersistent = true; |
| pdxPersistentUserSet = true; |
| return this; |
| } |
| |
| public ServerStarterRule withPDXReadSerialized() { |
| pdxReadSerialized = true; |
| pdxReadSerializedUserSet = true; |
| return this; |
| } |
| |
| public ServerStarterRule withPdxSerializer(PdxSerializer pdxSerializer) { |
| this.pdxSerializer = pdxSerializer; |
| return this; |
| } |
| |
| /** |
| * If your only needs a cache and does not need a server for clients to connect |
| */ |
| public ServerStarterRule withNoCacheServer() { |
| this.serverCount = 0; |
| return this; |
| } |
| |
| public ServerStarterRule withServerCount(int serverCount) { |
| this.serverCount = serverCount; |
| return this; |
| } |
| |
| public ServerStarterRule withEmbeddedLocator() { |
| embeddedLocatorPort = AvailablePortHelper.getRandomAvailableTCPPort(); |
| properties.setProperty("start-locator", "localhost[" + embeddedLocatorPort + "]"); |
| return this; |
| } |
| |
| public ServerStarterRule withRestService() { |
| return withRestService(false); |
| } |
| |
| public ServerStarterRule withRestService(boolean useDefaultPort) { |
| properties.setProperty(START_DEV_REST_API, "true"); |
| properties.setProperty(HTTP_SERVICE_BIND_ADDRESS, "localhost"); |
| if (!useDefaultPort) { |
| httpPort = AvailablePortHelper.getRandomAvailableTCPPort(); |
| properties.setProperty(HTTP_SERVICE_PORT, httpPort + ""); |
| } else { |
| httpPort = 0; |
| } |
| return this; |
| } |
| |
| @Override |
| protected void normalizeProperties() { |
| super.normalizeProperties(); |
| if (httpPort < 0 && "true".equalsIgnoreCase(properties.getProperty(START_DEV_REST_API))) { |
| withRestService(); |
| } |
| } |
| |
| public ServerStarterRule withRegion(RegionShortcut type, String name) { |
| this.autoStart = true; |
| regions.put(name, type); |
| return this; |
| } |
| |
| public void startServer(Properties properties, int locatorPort) { |
| withProperties(properties).withConnectionToLocator(locatorPort).startServer(); |
| } |
| |
| public void startServer() { |
| CacheFactory cf = new CacheFactory(this.properties); |
| if (pdxPersistentUserSet) { |
| cf.setPdxPersistent(pdxPersistent); |
| } |
| if (pdxReadSerializedUserSet) { |
| cf.setPdxReadSerialized(pdxReadSerialized); |
| } |
| if (pdxSerializer != null) { |
| cf.setPdxSerializer(pdxSerializer); |
| } |
| cache = (InternalCache) cf.create(); |
| DistributionConfig config = |
| ((InternalDistributedSystem) cache.getDistributedSystem()).getConfig(); |
| jmxPort = config.getJmxManagerPort(); |
| httpPort = config.getHttpServicePort(); |
| |
| if (serverCount > 1 && memberPort != 0) { |
| throw new IllegalStateException("can't specify a member port when you have multiple port"); |
| } |
| |
| for (int i = 0; i < serverCount; i++) { |
| CacheServer server = cache.addCacheServer(); |
| if (i == 0) { |
| CacheServerHelper.setIsDefaultServer(server); |
| } |
| // memberPort is by default zero, which translates to "randomly select an available port," |
| // which is why it is updated after this try block |
| if (serverCount == 1) { |
| server.setPort(memberPort); |
| } else { |
| server.setPort(0); |
| } |
| try { |
| server.start(); |
| } catch (IOException e) { |
| throw new RuntimeException("unable to start server", e); |
| } |
| // if this member has multiple cache servers, the memberPort will be the last server's port |
| // started. |
| memberPort = server.getPort(); |
| servers.add(server); |
| } |
| } |
| |
| @Override |
| public int getEmbeddedLocatorPort() { |
| return embeddedLocatorPort; |
| } |
| |
| @Override |
| public void waitTilFullyReconnected() { |
| try { |
| await().until(() -> { |
| InternalDistributedSystem internalDistributedSystem = |
| InternalDistributedSystem.getConnectedInstance(); |
| return internalDistributedSystem != null |
| && internalDistributedSystem.getCache() != null |
| && !internalDistributedSystem.getCache().getCacheServers().isEmpty(); |
| }); |
| |
| } catch (Exception e) { |
| // provide more information when condition is not satisfied after awaitility timeout |
| InternalDistributedSystem ids = InternalDistributedSystem.getConnectedInstance(); |
| System.out.println("ds is: " + (ids != null ? "not null" : "null")); |
| System.out.println("cache is: " + (ids.getCache() != null ? "not null" : "null")); |
| System.out.println("has cache server: " |
| + (!ids.getCache().getCacheServers().isEmpty())); |
| throw e; |
| } |
| InternalDistributedSystem dm = InternalDistributedSystem.getConnectedInstance(); |
| cache = dm.getCache(); |
| } |
| } |