blob: 774715c83cf86e2c3b624673985e11724c42f45f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.console.demo.service;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.QueryIndex;
import org.apache.ignite.cache.QueryIndexType;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.console.demo.AgentDemoUtils;
import org.apache.ignite.console.demo.model.Car;
import org.apache.ignite.console.demo.model.Country;
import org.apache.ignite.console.demo.model.Department;
import org.apache.ignite.console.demo.model.Employee;
import org.apache.ignite.console.demo.model.Parking;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.services.Service;
import org.apache.ignite.services.ServiceContext;
import org.apache.ignite.transactions.Transaction;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
/**
* Demo service. Create and populate caches. Run demo load on caches.
*/
public class DemoCachesLoadService implements Service {
/** Ignite instance. */
@IgniteInstanceResource
private Ignite ignite;
/** Thread pool to execute cache load operations. */
private ScheduledExecutorService cachePool;
/** */
private static final String COUNTRY_CACHE_NAME = "CountryCache";
/** */
private static final String DEPARTMENT_CACHE_NAME = "DepartmentCache";
/** */
private static final String EMPLOYEE_CACHE_NAME = "EmployeeCache";
/** */
private static final String PARKING_CACHE_NAME = "ParkingCache";
/** */
public static final String CAR_CACHE_NAME = "CarCache";
/** */
static final Set<String> DEMO_CACHES = new HashSet<>(Arrays.asList(COUNTRY_CACHE_NAME,
DEPARTMENT_CACHE_NAME, EMPLOYEE_CACHE_NAME, PARKING_CACHE_NAME, CAR_CACHE_NAME));
/** Countries count. */
private static final int CNTR_CNT = 10;
/** Departments count */
private static final int DEP_CNT = 100;
/** Employees count. */
private static final int EMPL_CNT = 1000;
/** Countries count. */
private static final int CAR_CNT = 100;
/** Departments count */
private static final int PARK_CNT = 10;
/** */
private static final Random rnd = new Random();
/** Maximum count read/write key. */
private final int cnt;
/** Time range in milliseconds. */
private final long range;
/**
* @param cnt Maximum count read/write key.
*/
public DemoCachesLoadService(int cnt) {
this.cnt = cnt;
range = new java.util.Date().getTime();
}
/** {@inheritDoc} */
@Override public void cancel(ServiceContext ctx) {
if (cachePool != null)
cachePool.shutdownNow();
}
/** {@inheritDoc} */
@Override public void init(ServiceContext ctx) throws Exception {
ignite.getOrCreateCaches(Arrays.asList(
cacheCountry(), cacheDepartment(), cacheEmployee(), cacheCar(), cacheParking()
));
populateCacheEmployee();
populateCacheCar();
cachePool = AgentDemoUtils.newScheduledThreadPool(2, "demo-sql-load-cache-tasks");
}
/** {@inheritDoc} */
@Override public void execute(ServiceContext ctx) throws Exception {
cachePool.scheduleWithFixedDelay(new Runnable() {
@Override public void run() {
try {
IgniteCache<Integer, Employee> cacheEmployee = ignite.cache(EMPLOYEE_CACHE_NAME);
if (cacheEmployee != null)
try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
for (int i = 0, n = 1; i < cnt; i++, n++) {
Integer id = rnd.nextInt(EMPL_CNT);
Integer depId = rnd.nextInt(DEP_CNT);
double r = rnd.nextDouble();
cacheEmployee.put(id, new Employee(id, depId, depId, "First name employee #" + n,
"Last name employee #" + n, "Email employee #" + n, "Phone number employee #" + n,
new java.sql.Date((long)(r * range)), "Job employee #" + n,
500 + AgentDemoUtils.round(r * 2000, 2)));
if (rnd.nextBoolean())
cacheEmployee.remove(rnd.nextInt(EMPL_CNT));
cacheEmployee.get(rnd.nextInt(EMPL_CNT));
}
if (rnd.nextInt(100) > 20)
tx.commit();
}
}
catch (Throwable e) {
if (!e.getMessage().contains("cache is stopped"))
ignite.log().error("Cache write task execution error", e);
}
}
}, 10, 3, TimeUnit.SECONDS);
cachePool.scheduleWithFixedDelay(new Runnable() {
@Override public void run() {
try {
IgniteCache<Integer, Car> cache = ignite.cache(CAR_CACHE_NAME);
if (cache != null)
for (int i = 0; i < cnt; i++) {
Integer carId = rnd.nextInt(CAR_CNT);
cache.put(carId, new Car(carId, rnd.nextInt(PARK_CNT), "Car #" + (i + 1)));
if (rnd.nextBoolean())
cache.remove(rnd.nextInt(CAR_CNT));
}
}
catch (IllegalStateException ignored) {
// No-op.
}
catch (Throwable e) {
if (!e.getMessage().contains("cache is stopped"))
ignite.log().error("Cache write task execution error", e);
}
}
}, 10, 3, TimeUnit.SECONDS);
}
/**
* Create base cache configuration.
*
* @param name cache name.
* @return Cache configuration with basic properties set.
*/
private static CacheConfiguration cacheConfiguration(String name) {
CacheConfiguration ccfg = new CacheConfiguration<>(name);
ccfg.setAffinity(new RendezvousAffinityFunction(false, 32));
ccfg.setQueryDetailMetricsSize(10);
ccfg.setStatisticsEnabled(true);
ccfg.setSqlFunctionClasses(SQLFunctions.class);
ccfg.setDataRegionName("demo");
return ccfg;
}
/**
* Configure cacheCountry.
*/
private static CacheConfiguration cacheCountry() {
CacheConfiguration ccfg = cacheConfiguration(COUNTRY_CACHE_NAME);
// Configure cacheCountry types.
Collection<QueryEntity> qryEntities = new ArrayList<>();
// COUNTRY.
QueryEntity type = new QueryEntity();
qryEntities.add(type);
type.setKeyType(Integer.class.getName());
type.setValueType(Country.class.getName());
// Query fields for COUNTRY.
LinkedHashMap<String, String> qryFlds = new LinkedHashMap<>();
qryFlds.put("id", "java.lang.Integer");
qryFlds.put("name", "java.lang.String");
qryFlds.put("population", "java.lang.Integer");
type.setFields(qryFlds);
ccfg.setQueryEntities(qryEntities);
return ccfg;
}
/**
* Configure cacheEmployee.
*/
private static CacheConfiguration cacheDepartment() {
CacheConfiguration ccfg = cacheConfiguration(DEPARTMENT_CACHE_NAME);
// Configure cacheDepartment types.
Collection<QueryEntity> qryEntities = new ArrayList<>();
// DEPARTMENT.
QueryEntity type = new QueryEntity();
qryEntities.add(type);
type.setKeyType(Integer.class.getName());
type.setValueType(Department.class.getName());
// Query fields for DEPARTMENT.
LinkedHashMap<String, String> qryFlds = new LinkedHashMap<>();
qryFlds.put("id", "java.lang.Integer");
qryFlds.put("countryId", "java.lang.Integer");
qryFlds.put("name", "java.lang.String");
type.setFields(qryFlds);
// Indexes for DEPARTMENT.
ArrayList<QueryIndex> indexes = new ArrayList<>();
indexes.add(new QueryIndex("countryId", QueryIndexType.SORTED, false, "DEP_COUNTRY"));
type.setIndexes(indexes);
ccfg.setQueryEntities(qryEntities);
return ccfg;
}
/**
* Configure cacheEmployee.
*/
private static CacheConfiguration cacheEmployee() {
CacheConfiguration ccfg = cacheConfiguration(EMPLOYEE_CACHE_NAME);
ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
ccfg.setBackups(1);
// Configure cacheEmployee types.
Collection<QueryEntity> qryEntities = new ArrayList<>();
// EMPLOYEE.
QueryEntity type = new QueryEntity();
qryEntities.add(type);
type.setKeyType(Integer.class.getName());
type.setValueType(Employee.class.getName());
// Query fields for EMPLOYEE.
LinkedHashMap<String, String> qryFlds = new LinkedHashMap<>();
qryFlds.put("id", "java.lang.Integer");
qryFlds.put("departmentId", "java.lang.Integer");
qryFlds.put("managerId", "java.lang.Integer");
qryFlds.put("firstName", "java.lang.String");
qryFlds.put("lastName", "java.lang.String");
qryFlds.put("email", "java.lang.String");
qryFlds.put("phoneNumber", "java.lang.String");
qryFlds.put("hireDate", "java.sql.Date");
qryFlds.put("job", "java.lang.String");
qryFlds.put("salary", "java.lang.Double");
type.setFields(qryFlds);
// Indexes for EMPLOYEE.
Collection<QueryIndex> indexes = new ArrayList<>();
indexes.add(new QueryIndex("departmentId", QueryIndexType.SORTED, false, "EMP_DEPARTMENT"));
indexes.add(new QueryIndex("managerId", QueryIndexType.SORTED, false, "EMP_MANAGER"));
QueryIndex idx = new QueryIndex();
idx.setName("EMP_NAMES");
idx.setIndexType(QueryIndexType.SORTED);
LinkedHashMap<String, Boolean> indFlds = new LinkedHashMap<>();
indFlds.put("firstName", Boolean.FALSE);
indFlds.put("lastName", Boolean.FALSE);
idx.setFields(indFlds);
indexes.add(idx);
indexes.add(new QueryIndex("salary", QueryIndexType.SORTED, false, "EMP_SALARY"));
type.setIndexes(indexes);
ccfg.setQueryEntities(qryEntities);
return ccfg;
}
/**
* Configure cacheEmployee.
*/
private static CacheConfiguration cacheParking() {
CacheConfiguration ccfg = cacheConfiguration(PARKING_CACHE_NAME);
// Configure cacheParking types.
Collection<QueryEntity> qryEntities = new ArrayList<>();
// PARKING.
QueryEntity type = new QueryEntity();
qryEntities.add(type);
type.setKeyType(Integer.class.getName());
type.setValueType(Parking.class.getName());
// Query fields for PARKING.
LinkedHashMap<String, String> qryFlds = new LinkedHashMap<>();
qryFlds.put("id", "java.lang.Integer");
qryFlds.put("name", "java.lang.String");
qryFlds.put("capacity", "java.lang.Integer");
type.setFields(qryFlds);
ccfg.setQueryEntities(qryEntities);
return ccfg;
}
/**
* Configure cacheEmployee.
*/
private static CacheConfiguration cacheCar() {
CacheConfiguration ccfg = cacheConfiguration(CAR_CACHE_NAME);
// Configure cacheCar types.
Collection<QueryEntity> qryEntities = new ArrayList<>();
// CAR.
QueryEntity type = new QueryEntity();
qryEntities.add(type);
type.setKeyType(Integer.class.getName());
type.setValueType(Car.class.getName());
// Query fields for CAR.
LinkedHashMap<String, String> qryFlds = new LinkedHashMap<>();
qryFlds.put("id", "java.lang.Integer");
qryFlds.put("parkingId", "java.lang.Integer");
qryFlds.put("name", "java.lang.String");
type.setFields(qryFlds);
// Indexes for CAR.
ArrayList<QueryIndex> indexes = new ArrayList<>();
indexes.add(new QueryIndex("parkingId", QueryIndexType.SORTED, false, "CAR_PARKING"));
type.setIndexes(indexes);
ccfg.setQueryEntities(qryEntities);
return ccfg;
}
/** */
private void populateCacheEmployee() {
if (ignite.log().isDebugEnabled())
ignite.log().debug("DEMO: Start employees population with data...");
IgniteCache<Integer, Country> cacheCountry = ignite.cache(COUNTRY_CACHE_NAME);
for (int i = 0, n = 1; i < CNTR_CNT; i++, n++)
cacheCountry.put(i, new Country(i, "Country #" + n, n * 10000000));
IgniteCache<Integer, Department> cacheDepartment = ignite.cache(DEPARTMENT_CACHE_NAME);
IgniteCache<Integer, Employee> cacheEmployee = ignite.cache(EMPLOYEE_CACHE_NAME);
for (int i = 0, n = 1; i < DEP_CNT; i++, n++) {
cacheDepartment.put(i, new Department(n, rnd.nextInt(CNTR_CNT), "Department #" + n));
double r = rnd.nextDouble();
cacheEmployee.put(i, new Employee(i, rnd.nextInt(DEP_CNT), null, "First name manager #" + n,
"Last name manager #" + n, "Email manager #" + n, "Phone number manager #" + n,
new java.sql.Date((long)(r * range)), "Job manager #" + n, 1000 + AgentDemoUtils.round(r * 4000, 2)));
}
for (int i = 0, n = 1; i < EMPL_CNT; i++, n++) {
Integer depId = rnd.nextInt(DEP_CNT);
double r = rnd.nextDouble();
cacheEmployee.put(i, new Employee(i, depId, depId, "First name employee #" + n,
"Last name employee #" + n, "Email employee #" + n, "Phone number employee #" + n,
new java.sql.Date((long)(r * range)), "Job employee #" + n, 500 + AgentDemoUtils.round(r * 2000, 2)));
}
if (ignite.log().isDebugEnabled())
ignite.log().debug("DEMO: Finished employees population.");
}
/** */
private void populateCacheCar() {
if (ignite.log().isDebugEnabled())
ignite.log().debug("DEMO: Start cars population...");
IgniteCache<Integer, Parking> cacheParking = ignite.cache(PARKING_CACHE_NAME);
for (int i = 0, n = 1; i < PARK_CNT; i++, n++)
cacheParking.put(i, new Parking(i, "Parking #" + n, n * 10));
IgniteCache<Integer, Car> cacheCar = ignite.cache(CAR_CACHE_NAME);
for (int i = 0, n = 1; i < CAR_CNT; i++, n++)
cacheCar.put(i, new Car(i, rnd.nextInt(PARK_CNT), "Car #" + n));
if (ignite.log().isDebugEnabled())
ignite.log().debug("DEMO: Finished cars population.");
}
/**
* Utility class with custom SQL functions.
*/
public static class SQLFunctions {
/**
* Sleep function to simulate long running queries.
*
* @param x Time to sleep.
* @return Return specified argument.
*/
@QuerySqlFunction
public static long sleep(long x) {
if (x >= 0)
try {
Thread.sleep(x);
}
catch (InterruptedException ignored) {
// No-op.
}
return x;
}
}
}