blob: fac92418585d55b15d8c8ffdbaafd99e349d6c7a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.console.demo;
import java.io.File;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteServices;
import org.apache.ignite.Ignition;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.console.demo.service.DemoCachesLoadService;
import org.apache.ignite.console.demo.service.DemoComputeLoadService;
import org.apache.ignite.console.demo.service.DemoRandomCacheLoadService;
import org.apache.ignite.console.demo.service.DemoServiceClusterSingleton;
import org.apache.ignite.console.demo.service.DemoServiceKeyAffinity;
import org.apache.ignite.console.demo.service.DemoServiceMultipleInstances;
import org.apache.ignite.console.demo.service.DemoServiceNodeSingleton;
import org.apache.ignite.internal.processors.cache.persistence.filename.PdsConsistentIdProcessor;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.logger.slf4j.Slf4jLogger;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.spi.eventstorage.memory.MemoryEventStorageSpi;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_JETTY_PORT;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_NO_ASCII;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_PERFORMANCE_SUGGESTIONS_DISABLED;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_QUIET;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_UPDATE_NOTIFIER;
import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_DATA_REGION_INITIAL_SIZE;
import static org.apache.ignite.configuration.WALMode.LOG_ONLY;
import static org.apache.ignite.console.demo.AgentDemoUtils.newScheduledThreadPool;
import static org.apache.ignite.events.EventType.EVTS_DISCOVERY;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_REST_JETTY_ADDRS;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_REST_JETTY_PORT;
import static org.apache.ignite.internal.visor.util.VisorTaskUtils.VISOR_TASK_EVTS;
/**
* Demo for cluster features like SQL and Monitoring.
*
* Cache will be created and populated with data to query.
*/
public class AgentClusterDemo {
/** */
private static final Logger log = LoggerFactory.getLogger(AgentClusterDemo.class);
/** */
private static final AtomicBoolean initGuard = new AtomicBoolean();
/** */
private static final String SRV_NODE_NAME = "demo-server-";
/** */
private static final String CLN_NODE_NAME = "demo-client-";
/** */
private static final int NODE_CNT = 3;
/** */
private static final int WAL_SEGMENTS = 5;
/** WAL file segment size, 16MBytes. */
private static final int WAL_SEGMENT_SZ = 16 * 1024 * 1024;
/** */
private static CountDownLatch initLatch = new CountDownLatch(1);
/** */
private static volatile List<String> demoUrl;
/**
* Configure node.
*
* @param basePort Base port.
* @param gridIdx Ignite instance name index.
* @param client If {@code true} then start client node.
* @return IgniteConfiguration
*/
private static IgniteConfiguration igniteConfiguration(int basePort, int gridIdx, boolean client)
throws IgniteCheckedException {
IgniteConfiguration cfg = new IgniteConfiguration();
cfg.setGridLogger(new Slf4jLogger());
cfg.setIgniteInstanceName((client ? CLN_NODE_NAME : SRV_NODE_NAME) + gridIdx);
cfg.setLocalHost("127.0.0.1");
cfg.setEventStorageSpi(new MemoryEventStorageSpi());
cfg.setConsistentId(cfg.getIgniteInstanceName());
File workDir = new File(U.workDirectory(null, null), "demo-work");
cfg.setWorkDirectory(workDir.getAbsolutePath());
int[] evts = new int[EVTS_DISCOVERY.length + VISOR_TASK_EVTS.length];
System.arraycopy(EVTS_DISCOVERY, 0, evts, 0, EVTS_DISCOVERY.length);
System.arraycopy(VISOR_TASK_EVTS, 0, evts, EVTS_DISCOVERY.length, VISOR_TASK_EVTS.length);
cfg.setIncludeEventTypes(evts);
cfg.getConnectorConfiguration().setPort(basePort);
System.setProperty(IGNITE_JETTY_PORT, String.valueOf(basePort + 10));
TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
int discoPort = basePort + 20;
ipFinder.setAddresses(Collections.singletonList("127.0.0.1:" + discoPort + ".." + (discoPort + NODE_CNT - 1)));
// Configure discovery SPI.
TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
discoSpi.setLocalPort(discoPort);
discoSpi.setIpFinder(ipFinder);
cfg.setDiscoverySpi(discoSpi);
TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
commSpi.setSharedMemoryPort(-1);
commSpi.setMessageQueueLimit(10);
int commPort = basePort + 30;
commSpi.setLocalPort(commPort);
cfg.setCommunicationSpi(commSpi);
cfg.setGridLogger(new Slf4jLogger(log));
cfg.setMetricsLogFrequency(0);
DataRegionConfiguration dataRegCfg = new DataRegionConfiguration();
dataRegCfg.setName("demo");
dataRegCfg.setMetricsEnabled(true);
dataRegCfg.setMaxSize(DFLT_DATA_REGION_INITIAL_SIZE);
dataRegCfg.setPersistenceEnabled(true);
DataStorageConfiguration dataStorageCfg = new DataStorageConfiguration();
dataStorageCfg.setMetricsEnabled(true);
dataStorageCfg.setStoragePath(PdsConsistentIdProcessor.DB_DEFAULT_FOLDER);
dataStorageCfg.setDefaultDataRegionConfiguration(dataRegCfg);
dataStorageCfg.setSystemRegionMaxSize(DFLT_DATA_REGION_INITIAL_SIZE);
dataStorageCfg.setWalMode(LOG_ONLY);
dataStorageCfg.setWalSegments(WAL_SEGMENTS);
dataStorageCfg.setWalSegmentSize(WAL_SEGMENT_SZ);
cfg.setDataStorageConfiguration(dataStorageCfg);
cfg.setClientMode(client);
return cfg;
}
/**
* Starts read and write from cache in background.
*
* @param services Distributed services on the grid.
*/
private static void deployServices(IgniteServices services) {
services.deployMultiple("Demo service: Multiple instances", new DemoServiceMultipleInstances(), 7, 3);
services.deployNodeSingleton("Demo service: Node singleton", new DemoServiceNodeSingleton());
services.deployClusterSingleton("Demo service: Cluster singleton", new DemoServiceClusterSingleton());
services.deployClusterSingleton("Demo caches load service", new DemoCachesLoadService(20));
services.deployKeyAffinitySingleton("Demo service: Key affinity singleton",
new DemoServiceKeyAffinity(), DemoCachesLoadService.CAR_CACHE_NAME, "id");
services.deployNodeSingleton("RandomCache load service", new DemoRandomCacheLoadService(20));
services.deployMultiple("Demo service: Compute load", new DemoComputeLoadService(), 2, 1);
}
/** */
public static List<String> getDemoUrl() {
return demoUrl;
}
/**
* Start ignite node with cacheEmployee and populate it with data.
*/
public static CountDownLatch tryStart() {
if (initGuard.compareAndSet(false, true)) {
log.info("DEMO: Starting embedded nodes for demo...");
System.setProperty(IGNITE_NO_ASCII, "true");
System.setProperty(IGNITE_QUIET, "false");
System.setProperty(IGNITE_UPDATE_NOTIFIER, "false");
System.setProperty(IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE, "20");
System.setProperty(IGNITE_PERFORMANCE_SUGGESTIONS_DISABLED, "true");
final AtomicInteger basePort = new AtomicInteger(60700);
final AtomicInteger cnt = new AtomicInteger(-1);
final ScheduledExecutorService execSrv = newScheduledThreadPool(1, "demo-nodes-start");
execSrv.scheduleAtFixedRate(new Runnable() {
@Override public void run() {
int idx = cnt.incrementAndGet();
int port = basePort.get();
boolean first = idx == 0;
try {
IgniteConfiguration cfg = igniteConfiguration(port, idx, false);
if (first) {
U.delete(Paths.get(cfg.getWorkDirectory()));
U.resolveWorkDirectory(
cfg.getWorkDirectory(),
cfg.getDataStorageConfiguration().getStoragePath(),
true
);
}
Ignite ignite = Ignition.start(cfg);
if (first) {
ClusterNode node = ignite.cluster().localNode();
Collection<String> jettyAddrs = node.attribute(ATTR_REST_JETTY_ADDRS);
if (jettyAddrs == null) {
Ignition.stopAll(true);
throw new IgniteException("DEMO: Failed to start Jetty REST server on embedded node");
}
String jettyHost = jettyAddrs.iterator().next();
Integer jettyPort = node.attribute(ATTR_REST_JETTY_PORT);
if (F.isEmpty(jettyHost) || jettyPort == null)
throw new IgniteException("DEMO: Failed to start Jetty REST handler on embedded node");
log.info("DEMO: Started embedded node for demo purpose [TCP binary port={}, Jetty REST port={}]", port, jettyPort);
demoUrl = Collections.singletonList(String.format("http://%s:%d", jettyHost, jettyPort));
initLatch.countDown();
}
}
catch (Throwable e) {
if (first) {
basePort.getAndAdd(50);
log.warn("DEMO: Failed to start embedded node.", e);
}
else
log.error("DEMO: Failed to start embedded node.", e);
}
finally {
if (idx == NODE_CNT) {
Ignite ignite = Ignition.ignite(SRV_NODE_NAME + 0);
if (ignite != null) {
ignite.cluster().active(true);
deployServices(ignite.services(ignite.cluster().forServers()));
}
log.info("DEMO: All embedded nodes for demo successfully started");
execSrv.shutdown();
}
}
}
}, 1, 5, TimeUnit.SECONDS);
}
return initLatch;
}
/** */
public static void stop() {
demoUrl = null;
Ignition.stopAll(true);
initLatch = new CountDownLatch(1);
initGuard.compareAndSet(true, false);
}
}