blob: 5b516acca90e05fdd23c4e5e6f4dd104c0af40a5 [file] [log] [blame]
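/*
* The CqQuery Example.
*
* This example takes the following steps:
*
* 1. Create a GemFire Cache from clientCqQuery.xml with subscriptions enabled.
* 2. Get the Portfolios Region and attach a cache listener to it.
* 3. Register the Portfolio and Position serializable types, call RegisterAllKeys,
*    and populate the Region with a few Portfolio objects.
* 4. Create a continuous query (CQ) with a listener, execute it with initial
*    results, print those results, then stop and restart it.
* 5. Create the remaining CQs, copy the listeners of one CQ onto another,
*    and execute all CQs.
* 6. Print per-CQ and service-wide CQ statistics, then wait until the user types 'q'.
* 7. Stop all CQs, print their listener counts and statistics, close all CQs,
*    and close the Cache.
*/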
// Use standard namespaces
using System;
// Use the GemFire namespace
using GemStone.GemFire.Cache;
// Use the "Tests" namespace for the query objects.
using GemStone.GemFire.Cache.Tests;
namespace GemStone.GemFire.Cache.Examples
{
// The CqQuery example.
//User Listener
/// <summary>
/// CQ listener used by the example. It counts the events it receives and,
/// every 5000th event, prints either the details of that event (verbose)
/// or the running counters (non-verbose).
/// </summary>
public class MyCqListener : ICqListener
{
    // Running counters; the per-operation ones are updated in OnEvent.
    private int m_updateCnt;
    private int m_createCnt;
    private int m_destroyCnt;
    private int m_errorCnt;
    private int m_eventCnt;
    // Identifier printed in every message so listeners can be told apart.
    private int m_id;
    private bool m_verbose;

    /// <summary>Creates a listener with all counters at zero.</summary>
    /// <param name="id">Identifier included in this listener's output.</param>
    /// <param name="verbose">
    /// True to print key/value details of the sampled event; false to print counters.
    /// </param>
    public MyCqListener(int id, bool verbose)
    {
        m_updateCnt = 0;
        m_createCnt = 0;
        m_destroyCnt = 0;
        m_errorCnt = 0;
        m_eventCnt = 0;
        m_id = id;
        m_verbose = verbose;
    }

    /// <summary>
    /// Counts the event by operation type and prints a progress line on
    /// every 5000th event.
    /// </summary>
    /// <param name="ev">The CQ event being delivered.</param>
    public virtual void OnEvent(CqEvent ev)
    {
        m_eventCnt++;
        Portfolio val = ev.getNewValue() as Portfolio;
        CacheableString key = ev.getKey() as CacheableString;
        CqOperationType opType = ev.getQueryOperation();
        CqQuery cq = ev.getCq();

        // Operations other than create/update/destroy are not counted and
        // must not be reported as a destroy.
        string opStr = "OTHER";
        if (opType == CqOperationType.OP_TYPE_CREATE) {
            m_createCnt++;
            opStr = "CREATE";
        }
        else if (opType == CqOperationType.OP_TYPE_UPDATE) {
            m_updateCnt++;
            opStr = "UPDATE";
        }
        else if (opType == CqOperationType.OP_TYPE_DESTROY) {
            m_destroyCnt++;
            opStr = "DESTROY";
        }

        if (m_eventCnt % 5000 == 0) {
            if (m_verbose) {
                // The "as" casts above yield null when the key/value is
                // absent or of a different type, so guard before dereferencing.
                object keyText = (key != null) ? (object)key.Value : "null";
                object idText = (val != null) ? (object)val.ID : "null";
                object pkidText = (val != null) ? (object)val.Pkid : "null";
                Console.WriteLine("MyCqListener{0}::OnEvent called with key {1}, value ({2},{3}), op {4}.", m_id, keyText, idText, pkidText, opStr);
            }
            else {
                Console.WriteLine("cq{0}, listener{1}::OnEvent update count={2}, create Count={3}, destroy Count={4}, total count={5}", cq.Name, m_id, m_updateCnt, m_createCnt, m_destroyCnt, m_eventCnt);
            }
            Console.WriteLine("*******Type \'q\' to quit !!!! ******");
        }
    }

    /// <summary>Counts the error (and the event) and reports the call.</summary>
    /// <param name="ev">The CQ event associated with the error; not inspected.</param>
    public virtual void OnError(CqEvent ev)
    {
        m_errorCnt++;
        m_eventCnt++;
        Console.WriteLine("MyCqListener{0}::OnError called", m_id);
    }

    /// <summary>Reports that the listener was closed; also bumps the event counter.</summary>
    public virtual void Close()
    {
        m_eventCnt++;
        Console.WriteLine("MyCqListener{0}::close called", m_id);
    }
}
/// <summary>
/// Region cache listener used by the example. It counts entry events and,
/// every 3000th event, prints either the details of that event (verbose)
/// or the running event count (non-verbose).
/// </summary>
class MyCacheListener : CacheListenerAdapter
{
    private int m_eventCount;
    private bool m_verbose;

    /// <summary>
    /// Shared handler for all entry callbacks: counts the event and prints
    /// a progress line on every 3000th event.
    /// </summary>
    /// <param name="ev">The entry event being delivered.</param>
    /// <param name="opStr">Name of the callback that received the event.</param>
    private void check(EntryEvent ev, string opStr)
    {
        m_eventCount++;
        Portfolio val = ev.NewValue as Portfolio;
        CacheableString key = ev.Key as CacheableString;
        if (m_eventCount % 3000 == 0) {
            if (m_verbose) {
                // The "as" casts yield null when the key/value is absent or
                // of a different type, so guard before dereferencing.
                object keyText = (key != null) ? (object)key.Value : "null";
                object idText = (val != null) ? (object)val.ID : "null";
                object pkidText = (val != null) ? (object)val.Pkid : "null";
                Console.WriteLine("MyCacheListener called with key {0}, value ({1},{2}), op {3}.", keyText, idText, pkidText, opStr);
            }
            else {
                Console.WriteLine("MyCacheListener::event count={0}", m_eventCount);
            }
            Console.WriteLine("*******Type \'q\' to quit !!!! ******");
        }
    }

    /// <summary>Creates a listener with the event count at zero.</summary>
    /// <param name="verbose">
    /// True to print key/value details of the sampled event; false to print the count.
    /// </param>
    public MyCacheListener(bool verbose)
    {
        m_eventCount = 0;
        m_verbose = verbose;
    }

    /// <summary>Forwards the create event to <c>check</c>.</summary>
    public override void AfterCreate(EntryEvent ev)
    {
        check(ev, "AfterCreate");
    }

    /// <summary>Forwards the update event to <c>check</c>.</summary>
    public override void AfterUpdate(EntryEvent ev)
    {
        check(ev, "AfterUpdate");
    }

    /// <summary>Forwards the destroy event to <c>check</c>.</summary>
    public override void AfterDestroy(EntryEvent ev)
    {
        check(ev, "AfterDestroy");
    }

    /// <summary>Forwards the invalidate event to <c>check</c>.</summary>
    public override void AfterInvalidate(EntryEvent ev)
    {
        check(ev, "AfterInvalidate");
    }
}
/// <summary>
/// Entry point of the CqQuery example: creates a cache, registers eight
/// continuous queries against the Portfolios region, prints their
/// statistics, and shuts everything down when the user types 'q'.
/// </summary>
class ContinuousQuery
{
    // Names of the continuous queries; index-aligned with queryStrings.
    private static string[] cqNames = new string[8]{
        "MyCq_0",
        "MyCq_1",
        "MyCq_2",
        "MyCq_3",
        "MyCq_4",
        "MyCq_5",
        "MyCq_6",
        "MyCq_7"
    };

    // Query text for each continuous query; index-aligned with cqNames.
    private static string[] queryStrings = new string[8]{
        "select * from /Portfolios p where p.ID < 4",
        "select * from /Portfolios p where p.ID < 9",
        "select * from /Portfolios p where p.ID < 12",
        "select * from /Portfolios p where p.ID < 3",
        "select * from /Portfolios p where p.ID < 14",
        "select * from /Portfolios p where p.ID < 5",
        "select * from /Portfolios p where p.ID < 6",
        "select * from /Portfolios p where p.ID < 7"
    };

    /// <summary>Prints the insert/delete/update/event counters of one CQ.</summary>
    private static void PrintCqStatistics(string cqName, CqQuery cq)
    {
        CqStatistics cqStats = cq.GetStatistics();
        Console.WriteLine("Cq[{0}]: CqStatistics: numInserts[{1}], numDeletes[{2}], numUpdates[{3}], numEvents[{4}]",
            cqName, cqStats.numInserts(), cqStats.numDeletes(), cqStats.numUpdates(), cqStats.numEvents());
    }

    /// <summary>Prints the service-wide CQ counters.</summary>
    private static void PrintServiceStatistics(CqServiceStatistics serviceStats)
    {
        Console.WriteLine("numCqsActive={0}, numCqsCreated={1}, numCqsClosed={2}, numCqsStopped={3}, numCqsOnClient={4}",
            serviceStats.numCqsActive(), serviceStats.numCqsCreated(), serviceStats.numCqsClosed(),
            serviceStats.numCqsStopped(), serviceStats.numCqsOnClient());
    }

    /// <summary>Prints the key and a type-specific description of every initial CQ result.</summary>
    private static void PrintInitialResults(ICqResults results)
    {
        Console.WriteLine("ResultSet Query returned {0} rows", results.Size);
        SelectResultsIterator iter = results.GetIterator();
        while (iter.HasNext) {
            IGFSerializable item = iter.Next();
            if (item == null) {
                continue;
            }

            Struct st = item as Struct;
            if (st == null) {
                // Not a key/value struct: nothing to index into.
                Console.WriteLine("Query got other/unknown object.");
                continue;
            }

            CacheableString key = st["key"] as CacheableString;
            Console.WriteLine("Got key " + ((key != null) ? (object)key.Value : "null"));

            Portfolio port = st["value"] as Portfolio;
            if (port != null) {
                Console.WriteLine("Query got Portfolio object with ID {0}, pkid {1}.", port.ID, port.Pkid);
                continue;
            }

            Position pos = st["value"] as Position;
            if (pos != null) {
                Console.WriteLine("Query got Position object with secId {0}, shares {1}.", pos.SecId, pos.SharesOutstanding);
                continue;
            }

            CacheableString cs = st["value"] as CacheableString;
            if (cs != null) {
                Console.WriteLine("Query got string : {0}.", cs.Value);
            }
            else {
                Console.WriteLine("Query got other/unknown object.");
            }
        }
    }

    /// <summary>
    /// Runs the example. Pass "-v" as the only argument to make the
    /// listeners print per-event details instead of counters.
    /// </summary>
    /// <param name="args">Command-line arguments.</param>
    static void Main(string[] args)
    {
        bool verbose = false;
        if (args.Length == 1 && args[0] == "-v") {
            verbose = true;
        }

        try {
            // Connect to the GemFire Distributed System using the settings from the gfcpp.properties file by default.
            Properties prop = Properties.Create();
            prop.Insert("cache-xml-file", "clientCqQuery.xml");
            CacheFactory cacheFactory = CacheFactory.CreateCacheFactory(prop);
            Cache cache = cacheFactory.SetSubscriptionEnabled(true)
                                      .Create();
            Console.WriteLine("Created the GemFire Cache");

            // Get the Portfolios Region from the Cache which is declared in the Cache XML file.
            Region region = cache.GetRegion("Portfolios");
            Console.WriteLine("Obtained the Region from the Cache");
            region.GetAttributesMutator().SetCacheListener(new MyCacheListener(verbose));

            // Register our Serializable/Cacheable Query objects, viz. Portfolio and Position.
            Serializable.RegisterType(Portfolio.CreateDeserializable);
            Serializable.RegisterType(Position.CreateDeserializable);

            // Register all keys
            region.RegisterAllKeys();
            Console.WriteLine("Registered Serializable Query Objects");

            // Populate the Region with some Portfolio objects.
            Portfolio port1 = new Portfolio(1 /*ID*/, 10 /*size*/);
            Portfolio port2 = new Portfolio(2 /*ID*/, 20 /*size*/);
            Portfolio port3 = new Portfolio(3 /*ID*/, 30 /*size*/);
            region.Put("Key1", port1);
            region.Put("Key2", port2);
            region.Put("Key3", port3);
            Console.WriteLine("Populated some Portfolio Objects");

            // Get the QueryService from the Cache.
            QueryService qrySvc = cache.GetQueryService();
            Console.WriteLine("Got the QueryService from the Cache");

            // Create CqAttributes with listener
            CqAttributesFactory cqFac = new CqAttributesFactory();
            ICqListener cqLstner = new MyCqListener(0, verbose);
            cqFac.AddCqListener(cqLstner);
            CqAttributes cqAttr = cqFac.Create();

            // Create a new cqQuery
            CqQuery qry = qrySvc.NewCq(cqNames[0], queryStrings[0], cqAttr, true);

            // Execute a CqQuery with Initial Results
            ICqResults results = qry.ExecuteWithInitialResults();
            PrintInitialResults(results);

            // Stop the cq
            qry.Stop();
            // Restart the cq
            qry.Execute();

            // Create the remaining CQs. The same factory is reused, so each
            // AddCqListener call adds one more listener to it before Create().
            for (int i = 1; i < cqNames.Length; i++) {
                ICqListener cqLstner1 = new MyCqListener(i, verbose);
                cqFac.AddCqListener(cqLstner1);
                cqAttr = cqFac.Create();
                qry = qrySvc.NewCq(cqNames[i], queryStrings[i], cqAttr, true);
            }

            // Copy the listeners of cq 6 onto cq 0 through its attributes mutator.
            qry = qrySvc.GetCq(cqNames[6]);
            cqAttr = qry.GetCqAttributes();
            ICqListener[] vl = cqAttr.getCqListeners();
            Console.WriteLine("number of listeners for cq[{0}] is {1}", cqNames[6], vl.Length);
            qry = qrySvc.GetCq(cqNames[0]);
            CqAttributesMutator cqam = qry.GetCqAttributesMutator();
            for (int i = 0; i < vl.Length; i++) {
                cqam.AddCqListener(vl[i]);
            }

            // Stop the cq
            qry.Stop();
            // Start all Cq Query
            qrySvc.ExecuteCqs();

            for (int i = 0; i < cqNames.Length; i++) {
                Console.WriteLine("get info for cq[{0}]:", cqNames[i]);
                CqQuery cqy = qrySvc.GetCq(cqNames[i]);
                PrintCqStatistics(cqNames[i], cqy);
            }

            CqServiceStatistics serviceStats = qrySvc.GetCqStatistics();
            PrintServiceStatistics(serviceStats);

            // Block until the user presses 'q'; listeners keep printing meanwhile.
            while (true) {
                Console.WriteLine("*******Type \'q\' to quit !!!! ******");
                ConsoleKeyInfo ckey = Console.ReadKey(true);
                if (ckey.Key == ConsoleKey.Q) {
                    break;
                }
            }

            // Stop all cqs
            qrySvc.StopCqs();

            for (int i = 0; i < cqNames.Length; i++) {
                Console.WriteLine("get info for cq[{0}]:", cqNames[i]);
                CqQuery cqy = qrySvc.GetCq(cqNames[i]);
                // Read the listeners of the CQ being reported (cqy), not of
                // whichever CQ "qry" last referred to.
                CqAttributes attrs = cqy.GetCqAttributes();
                ICqListener[] listeners = attrs.getCqListeners();
                Console.WriteLine("number of listeners for cq[{0}] is {1}", cqNames[i], listeners.Length);
                PrintCqStatistics(cqNames[i], cqy);
            }

            // Close all cqs
            qrySvc.CloseCqs();
            PrintServiceStatistics(serviceStats);

            // Close the GemFire Cache.
            cache.Close();
            Console.WriteLine("Closed the GemFire Cache");
        }
        // An exception should not occur
        catch (GemFireException gfex) {
            Console.WriteLine("CqQuery GemFire Exception: {0}", gfex.Message);
        }
    }
}
}