blob: b510c394b499a125c4869ef01266d9539b0d97e1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode_examples.cq;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.client.ClientCacheFactory;
import org.apache.geode.cache.client.ClientRegionShortcut;
import org.apache.geode.cache.query.*;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import com.google.common.base.Stopwatch;
/**
 * Continuous-query (CQ) example client.
 *
 * <p>
 * The program connects to a locator on {@code 127.0.0.1:10334}, creates a client region named
 * {@code example-region}, registers a CQ that selects entries whose value is greater than 70, and
 * then puts random integer key/value pairs into the region for a fixed amount of time. Matching
 * events are delivered to a {@code RandomEventListener} (declared elsewhere in this package).
 */
public class Example {
  /** Name of the region; the CQ query string below refers to the same region. */
  private static final String REGION_NAME = "example-region";

  /** How long {@link #startPuttingData} keeps writing entries, in seconds. */
  private static final long RUN_DURATION_SECONDS = 20;

  /** Pause between two puts, in milliseconds, so the console output is easy to follow. */
  private static final long PUT_DELAY_MILLIS = 500;

  /** Exclusive upper bound for random keys: keys fall in {@code [0, 99]}. */
  private static final int KEY_BOUND_EXCLUSIVE = 100;

  /** Exclusive upper bound for random values: values fall in {@code [0, 100]}. */
  private static final int VALUE_BOUND_EXCLUSIVE = 101;

  private ClientCache cache;
  private Region<Integer, Integer> region;
  private CqQuery randomTracker;

  /**
   * Creates the client cache, the client region and the running CQ, in that order.
   *
   * @throws CqException if the CQ cannot be created or executed
   * @throws RegionNotFoundException if the queried region cannot be found
   * @throws CqExistsException if a CQ with the same name already exists
   */
  private void init() throws CqException, RegionNotFoundException, CqExistsException {
    // connect to the locator using default port 10334
    this.cache = connectToLocallyRunningGeode();

    // create a local region that matches the server region
    this.region = cache.<Integer, Integer>createClientRegionFactory(ClientRegionShortcut.PROXY)
        .create(REGION_NAME);

    this.randomTracker = this.startCQ(this.region);
  }

  /**
   * Runs the data-producing loop against the region created by {@link #init()}.
   *
   * @throws InterruptedException if the thread is interrupted while pausing between puts
   */
  private void run() throws InterruptedException {
    this.startPuttingData(this.region);
  }

  /**
   * Closes the CQ and then the cache. Safe to call after a partially failed {@link #init()}:
   * anything that was never created is skipped, and the cache is closed even when closing the CQ
   * throws.
   *
   * @throws CqException if closing the CQ fails
   */
  private void close() throws CqException {
    try {
      if (this.randomTracker != null) {
        this.randomTracker.close();
      }
    } finally {
      if (this.cache != null) {
        this.cache.close();
      }
    }
  }

  /**
   * Entry point: initializes the example, runs it, and always releases the CQ and cache.
   *
   * @param args ignored
   * @throws Exception if initialization, the put loop or cleanup fails
   */
  public static void main(String[] args) throws Exception {
    Example mExample = new Example();
    try {
      mExample.init();
      mExample.run();
    } finally {
      // Release the CQ and the cache even when init() or run() failed.
      mExample.close();
    }

    System.out.println("\n---- So that is CQ's----\n");
  }

  /**
   * Registers and executes the CQ named {@code randomTracker} on the query service that belongs to
   * the given region.
   *
   * @param region region whose region service provides the {@link QueryService}
   * @return the CQ, already executing
   * @throws CqException if the CQ cannot be created or executed
   * @throws RegionNotFoundException if the queried region cannot be found
   * @throws CqExistsException if a CQ with the same name already exists
   */
  private CqQuery startCQ(Region<Integer, Integer> region)
      throws CqException, RegionNotFoundException, CqExistsException {
    // Attach the listener that receives the events matched by the query.
    CqAttributesFactory cqf = new CqAttributesFactory();
    cqf.addCqListener(new RandomEventListener());
    CqAttributes cqa = cqf.create();

    String cqName = "randomTracker";
    String queryStr = "SELECT * FROM /example-region i where i > 70";

    QueryService queryService = region.getRegionService().getQueryService();
    CqQuery tracker = queryService.newCq(cqName, queryStr, cqa);
    tracker.execute();

    System.out.println("------- CQ is running\n");

    return tracker;
  }

  /**
   * Puts a random key/value pair into the region every {@value #PUT_DELAY_MILLIS} ms until
   * {@value #RUN_DURATION_SECONDS} seconds have elapsed, printing each pair.
   *
   * @param region region to write to
   * @throws InterruptedException if the thread is interrupted while sleeping
   */
  private void startPuttingData(Region<Integer, Integer> region) throws InterruptedException {
    // The example runs for 20 seconds.
    Stopwatch stopWatch = Stopwatch.createStarted();

    while (stopWatch.elapsed(TimeUnit.SECONDS) < RUN_DURATION_SECONDS) {
      // 500ms delay to make this easier to follow
      Thread.sleep(PUT_DELAY_MILLIS);

      int randomKey = ThreadLocalRandom.current().nextInt(0, KEY_BOUND_EXCLUSIVE);
      int randomValue = ThreadLocalRandom.current().nextInt(0, VALUE_BOUND_EXCLUSIVE);
      region.put(randomKey, randomValue);
      System.out.println("Key: " + randomKey + " Value: " + randomValue);
    }

    stopWatch.stop();
  }

  /**
   * Builds a client cache that uses the locator at {@code 127.0.0.1:10334}, with pool subscription
   * enabled and the log level set to {@code WARN}.
   *
   * @return the newly created client cache
   */
  private ClientCache connectToLocallyRunningGeode() {
    return new ClientCacheFactory().addPoolLocator("127.0.0.1", 10334)
        .setPoolSubscriptionEnabled(true).set("log-level", "WARN").create();
  }
}