blob: 01bd2abdda30c52162809fa1a630c16d0ad4a41f [file] [log] [blame]
---
title: Implementing Continuous Querying
---
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
Use continuous querying in your clients to receive continuous updates to queries run on the servers.
Before you begin, you should be familiar with [Querying](../querying_basics/chapter_overview.html) and have your client/server system configured.
1. Configure the client pools you will use for CQs with `subscription-enabled` set to true.
To have CQ and interest subscription events arrive as closely together as possible, use a single pool for everything. Different pools might use different servers, which can lead to greater differences in event delivery time.
2. Write your OQL query to retrieve the data you need from the server.
The query must satisfy these CQ requirements in addition to the standard GemFire querying specifications:
- The FROM clause must contain only a single region specification, with optional iterator variable.
- The query must be a SELECT expression only, preceded by zero or more IMPORT statements. This means the query cannot be a statement such as <code>"/tradeOrder.name"</code> or <code>"(SELECT \* from /tradeOrder).size".</code>
- The CQ query cannot use:
- Cross region joins
- Drill-downs into nested collections
- DISTINCT
- Projections
- Bind parameters
- ORDER BY
- GROUP BY
- Aggregate Functions (MIN, MAX, AVG, SUM, COUNT)
- The CQ query must be created on a partitioned or replicated region. See [Region Type Restrictions for CQs](how_continuous_querying_works.html#how_continuous_querying_works__section_bfs_llr_gr).
The basic syntax for the CQ query is:
``` pre
SELECT * FROM /fullRegionPath [iterator] [WHERE clause]
```
This example query could be used to get all trade orders where the price is over $100:
``` pre
SELECT * FROM /tradeOrder t WHERE t.price > 100.00
```
3. Write your CQ listeners to handle CQ events from the server.
Implement `org.apache.geode.cache.query.CqListener` in each event handler you need. In addition to your main CQ listeners, you might have listeners that you use for all CQs to track statistics or other general information.
**Note:**
Be especially careful if you choose to update your cache from your `CqListener`. If your listener updates the region that is queried in its own CQ and that region has a `Pool` named, the update will be forwarded to the server. If the update on the server satisfies the same CQ, it may be returned to the same listener that did the update, which could put your application into an infinite loop. This same scenario could be played out with multiple regions and multiple CQs, if the listeners are programmed to update each other's regions.
This example outlines a `CqListener` that might be used to update a display screen with current data from the server. The listener gets the `queryOperation` and entry key and value from the `CqEvent` and then updates the screen according to the type of `queryOperation`.
``` pre
// CqListener class
public class TradeEventListener implements CqListener
{
public void onEvent(CqEvent cqEvent)
{
// org.apache.geode.cache Operation associated with the query op
Operation queryOperation = cqEvent.getQueryOperation();
// key and new value from the event
Object key = cqEvent.getKey();
TradeOrder tradeOrder = (TradeOrder)cqEvent.getNewValue();
if (queryOperation.isUpdate())
{
// update data on the screen for the trade order . . .
}
else if (queryOperation.isCreate())
{
// add the trade order to the screen . . .
}
else if (queryOperation.isDestroy())
{
// remove the trade order from the screen . . .
}
}
public void onError(CqEvent cqEvent)
{
// handle the error
}
// From CacheCallback public void close()
{
// close the output screen for the trades . . .
}
}
```
When you install the listener and run the query, your listener will handle all of the CQ results.
4. If you need your CQs to detect whether they are connected to any of the servers that host its subscription queues, implement a `CqStatusListener` instead of a `CqListener`.
`CqStatusListener` extends the current `CqListener`, allowing a client to detect when a CQ is connected and/or disconnected from the server(s). The `onCqConnected()` method will be invoked when the CQ is connected, and when the CQ has been reconnected after being disconnected. The `onCqDisconnected()` method will be invoked when the CQ is no longer connected to any servers.
Taking the example from step 3, we can instead implement a `CqStatusListener`:
``` pre
public class TradeEventListener implements CqStatusListener
{
public void onEvent(CqEvent cqEvent)
{
// org.apache.geode.cache Operation associated with the query op
Operation queryOperation = cqEvent.getQueryOperation();
// key and new value from the event
Object key = cqEvent.getKey();
TradeOrder tradeOrder = (TradeOrder)cqEvent.getNewValue();
if (queryOperation.isUpdate())
{
// update data on the screen for the trade order . . .
}
else if (queryOperation.isCreate())
{
// add the trade order to the screen . . .
}
else if (queryOperation.isDestroy())
{
// remove the trade order from the screen . . .
}
}
public void onError(CqEvent cqEvent)
{
// handle the error
}
// From CacheCallback public void close()
{
// close the output screen for the trades . . .
}
public void onCqConnected() {
//Display connected symbol
}
public void onCqDisconnected() {
//Display disconnected symbol
}
}
```
When you install the `CqStatusListener`, your listener will be able to detect its connection status to the servers that it is querying.
5. Program your client to run the CQ:
1. Create a `CqAttributesFactory` and use it to set your `CqListener`s and `CqStatusListener`.
2. Pass the attributes factory and the CQ query and its unique name to the `QueryService` to create a new `CqQuery`.
3. Start the query running by calling one of the execute methods on the `CqQuery` object.
You can execute with or without an initial result set.
4. When you are done with the CQ, close it.
## Continuous Query Implementation
``` pre
// Get cache and queryService - refs to local cache and QueryService
// Create client /tradeOrder region configured to talk to the server
// Create CqAttribute using CqAttributeFactory
CqAttributesFactory cqf = new CqAttributesFactory();
// Create a listener and add it to the CQ attributes callback defined below
CqListener tradeEventListener = new TradeEventListener();
cqf.addCqListener(tradeEventListener);
CqAttributes cqa = cqf.create();
// Name of the CQ and its query
String cqName = "priceTracker";
String queryStr = "SELECT * FROM /tradeOrder t where t.price > 100.00";
// Create the CqQuery
CqQuery priceTracker = queryService.newCq(cqName, queryStr, cqa);
try
{ // Execute CQ, getting the optional initial result set
// Without the initial result set, the call is priceTracker.execute();
SelectResults sResults = priceTracker.executeWithInitialResults();
for (Object o : sResults) {
Struct s = (Struct) o;
TradeOrder to = (TradeOrder) s.get("value");
System.out.println("Intial result includes: " + to);
}
}
catch (Exception ex)
{
ex.printStackTrace();
}
// Now the CQ is running on the server, sending CqEvents to the listener
. . .
// End of life for the CQ - clear up resources by closing
priceTracker.close();
```
With continuous queries, you can optionally implement:
- Highly available CQs by configuring your servers for high availability.
- Durable CQs by configuring your clients for durable messaging and indicating which CQs are durable at creation.