---
title: Implementing an AsyncEventListener for Write-Behind Cache Event Handling
---
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
An `AsyncEventListener` asynchronously processes batches of events after they have been applied to a region. You can use an `AsyncEventListener` implementation as a write-behind cache event handler to synchronize region updates with a database.
## <a id="implementing_write_behind_cache_event_handling__section_35B3ADC77E1147468A568E49C8C308E1" class="no-quick-link"></a>How an AsyncEventListener Works
An `AsyncEventListener` instance is serviced by its own dedicated thread in which a callback method is invoked. Events that update a region are placed in an internal `AsyncEventQueue`, and one or more threads dispatch batches of events at a time to the listener implementation.
You can configure an `AsyncEventQueue` to be either serial or parallel. A serial queue is deployed to one <%=vars.product_name%> member, and it delivers all of a region's events, in order of occurrence, to a configured `AsyncEventListener` implementation. A parallel queue is deployed to multiple <%=vars.product_name%> members, and each instance of the queue delivers region events, possibly simultaneously, to a local `AsyncEventListener` implementation.
While a parallel queue provides the best throughput for writing events, it provides less control for ordering those events. With a parallel queue, you cannot preserve event ordering for a region as a whole because multiple <%=vars.product_name%> servers queue and deliver the region's events at the same time. However, the ordering of events for a given partition (or for a given queue of a distributed region) can be preserved.
For both serial and parallel queues, you can control the maximum amount of memory that each queue uses, as well as the batch size and frequency for processing batches in the queue. You can also configure queues to persist to disk (instead of simply overflowing to disk) so that write-behind caching can pick up where it left off when a member shuts down and is later restarted.
Optionally, a queue can use multiple threads to dispatch queued events. When you configure multiple threads for a serial queue, the logical queue that is hosted on a <%=vars.product_name%> member is divided into multiple physical queues, each with a dedicated dispatcher thread. You can then configure whether the threads dispatch queued events by key, by thread, or in the same order in which events were added to the queue. When you configure multiple threads for a parallel queue, each queue hosted on a <%=vars.product_name%> member is processed by dispatcher threads; the total number of queues created depends on the number of members that host the region.
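
To illustrate why dispatching by key keeps the events for one key in order even when a serial queue uses several dispatcher threads, the following minimal sketch models the idea in plain Java. It is not the <%=vars.product_name%> implementation: `KeyPartitionedQueue`, `indexFor`, `enqueue`, and `subQueue` are hypothetical names, events are represented as plain strings, and routing by hash code modulo the thread count is an assumption made for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical model, not a product class: one logical queue split into
// several physical sub-queues, each of which would be drained by its own
// dispatcher thread.
class KeyPartitionedQueue {
    private final List<Queue<String>> subQueues = new ArrayList<>();

    KeyPartitionedQueue(int dispatcherThreads) {
        for (int i = 0; i < dispatcherThreads; i++) {
            subQueues.add(new ArrayDeque<>());
        }
    }

    // Assumed routing rule: hash code of the key modulo the number of sub-queues.
    int indexFor(Object key) {
        return Math.floorMod(key.hashCode(), subQueues.size());
    }

    // Every event for a given key lands in the same FIFO sub-queue.
    void enqueue(Object key, String event) {
        subQueues.get(indexFor(key)).add(event);
    }

    Queue<String> subQueue(int index) {
        return subQueues.get(index);
    }
}
```

Because all events for a key share one sub-queue in this model, a single thread delivers them in arrival order, while events for different keys may be delivered concurrently and therefore in no guaranteed relative order.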
A `GatewayEventFilter` can be placed on the `AsyncEventQueue` to control whether a particular event is sent to a selected `AsyncEventListener`. For example, events associated with sensitive data could be detected and not queued. For more detail, see the Javadocs for `GatewayEventFilter`.
A `GatewayEventSubstitutionFilter` can specify whether the event is transmitted in its entirety or in an altered representation. For example, to reduce the size of the data being serialized, it might be more efficient to represent a full object by only its key. For more detail, see the Javadocs for `GatewayEventSubstitutionFilter`.
## Operation Distribution from an AsyncEventQueue
An `AsyncEventQueue` distributes these operations:
- Entry create
- Entry put
- Entry distributed destroy, provided that the operation is not an expiration action
- Expiration destroy, if the `forward-expiration-destroy` attribute is set to `true`. By default, this attribute is `false`, but you can set it to `true` using `cache.xml` or `gfsh`. To set this attribute in the Java API, use `AsyncEventQueueFactory.setForwardExpirationDestroy()`. See the javadocs for details.
These operations are not distributed:
- Get
- Invalidate
- Local destroy
- Region operations
- Expiration actions
- Expiration destroy, if the `forward-expiration-destroy` attribute is set to `false`. The default value is `false`.
## <a id="implementing_write_behind_cache_event_handling__section_6FDBAFCB9C194EB0AF0822A509F2F9F2" class="no-quick-link"></a>Guidelines for Using an AsyncEventListener
Review the following guidelines before using an AsyncEventListener:
- If you use an `AsyncEventListener` to implement a write-behind cache listener, your code should check for the possibility that an existing database connection may have been closed due to an earlier exception. For example, check for `Connection.isClosed()` in a catch block and re-create the connection as needed before performing further operations.
- Use a serial `AsyncEventQueue` if you need to preserve the order of region events within a thread when delivering events to your listener implementation. Use parallel queues when the order of events within a thread is not important, and when you require maximum throughput for processing events. In both cases, serial and parallel, the order of operations on a given key is preserved within the scope of the thread.
- You must install the `AsyncEventListener` implementation on a <%=vars.product_name%> member that hosts the region whose events you want to process.
- If you configure a parallel `AsyncEventQueue`, deploy the queue on each <%=vars.product_name%> member that hosts the region.
- You can install a listener on more than one member to provide high availability and guarantee delivery for events, in the event that a member with the active `AsyncEventListener` shuts down. At any given time only one member has an active listener for dispatching events. The listeners on other members remain on standby for redundancy. For best performance and most efficient use of memory, install only one standby listener (redundancy of at most one).
- To preserve pending events through member shutdowns, configure <%=vars.product_name%> to persist the internal queue of the `AsyncEventListener` to an available disk store. By default, any pending events that reside in the internal queue of an `AsyncEventListener` are lost if the active listener's member shuts down.
- To ensure high availability and reliable delivery of events, configure the event queue to be both persistent and redundant.
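
The first guideline above, checking whether a database connection has been closed, can be sketched as a small helper. This is one possible approach, not a class from the product: `ConnectionGuard` and its `connectionFactory` parameter are hypothetical, and the sketch assumes that the supplied factory knows how to open a new JDBC connection.

```java
import java.sql.Connection;
import java.sql.SQLException;
import java.util.function.Supplier;

// Hypothetical helper, not part of any product API: keeps one JDBC
// connection and re-creates it when an earlier failure has left it closed.
class ConnectionGuard {
    // Assumed to open and return a new connection each time it is called.
    private final Supplier<Connection> connectionFactory;
    private Connection current;

    ConnectionGuard(Supplier<Connection> connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    // Returns a usable connection, opening a new one if none exists yet
    // or if the existing one reports that it is closed.
    Connection get() {
        if (current == null || isUnusable(current)) {
            current = connectionFactory.get();
        }
        return current;
    }

    private static boolean isUnusable(Connection connection) {
        try {
            return connection.isClosed();
        } catch (SQLException e) {
            // If the check itself fails, treat the connection as dead.
            return true;
        }
    }
}
```

A listener could call `get()` at the start of each batch, or from a catch block after a failed write, so that a later batch does not reuse a connection that an earlier exception closed.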
## <a id="implementing_write_behind_cache_event_handling__section_FB3EB382E37945D9895E09B47A64D6B9" class="no-quick-link"></a>Implementing an AsyncEventListener
To receive region events for processing, you create a class that implements the `AsyncEventListener` interface. The `processEvents` method in your listener receives a list of queued `AsyncEvent` objects in each batch.
Each `AsyncEvent` object contains information about a region event, such as the name of the region where the event occurred, the type of region operation, and the affected key and value.
The basic framework for implementing a write-behind event handler involves iterating through the batch of events and writing each event to a database. For example:
``` pre
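import java.util.List;
import org.apache.geode.cache.asyncqueue.AsyncEvent;
import org.apache.geode.cache.asyncqueue.AsyncEventListener;

class MyAsyncEventListener implements AsyncEventListener {
  @Override
  public boolean processEvents(List<AsyncEvent> events) {
    // Process each AsyncEvent
    for (AsyncEvent event : events) {
      // Write the event to a database
    }
    // Report success so that the batch is not delivered again
    return true;
  }
}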
```
## <a id="implementing_write_behind_cache_event_handling__section_AB80262CFB6D4867B52A5D6D880A5294" class="no-quick-link"></a>Processing AsyncEvents
Use the [AsyncEventListener.processEvents](/releases/latest/javadoc/org/apache/geode/cache/asyncqueue/AsyncEventListener.html) method to process `AsyncEvent` objects. This method is called asynchronously when events are queued to be processed. The list contains up to one batch of events, where the batch size is defined in the `AsyncEventQueueFactory`. The `processEvents` method returns a boolean: `true` if the events were processed correctly, and `false` if any event failed processing. As long as `processEvents` returns `false`, <%=vars.product_name%> continues to retry processing the events.
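
The retry behavior can be pictured with a small stand-in for the dispatcher. This sketch is plain Java and does not reproduce how <%=vars.product_name%> dispatches batches internally: `RetryingDispatcher` and `dispatch` are hypothetical names, events are plain strings, and the callback is modeled as a `Predicate` rather than as an `AsyncEventListener`.

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical stand-in for the queue's dispatcher, not product code:
// it re-delivers the same batch until the callback reports success.
class RetryingDispatcher {
    // Returns the number of delivery attempts that were needed.
    // Note: this loops for as long as the callback keeps returning false.
    static int dispatch(List<String> batch, Predicate<List<String>> processEvents) {
        int attempts = 0;
        boolean done = false;
        while (!done) {
            attempts++;
            // A false result means "deliver this batch again".
            done = processEvents.test(batch);
        }
        return attempts;
    }
}
```

Under this model a batch that fails partway through is delivered again in full, so it is prudent to write listener logic whose database operations can be repeated safely.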
You can use the `getDeserializedValue` method to obtain cache values for entries that have been updated or created. Since the `getDeserializedValue` method will return a null value for destroyed entries, you should use the `getKey` method to obtain references to cache objects that have been destroyed. Here's an example of processing AsyncEvents:
``` pre
public boolean processEvents(@SuppressWarnings("rawtypes") List<AsyncEvent> list)
{
  logger.log(Level.INFO, String.format("Size of List<AsyncEvent> = %s", list.size()));
  List<JdbcBatch> newEntries = new ArrayList<JdbcBatch>();
  List<JdbcBatch> updatedEntries = new ArrayList<JdbcBatch>();
  List<String> destroyedEntries = new ArrayList<String>();
  int possibleDuplicates = 0;
  for (@SuppressWarnings("rawtypes") AsyncEvent ge : list)
  {
    // Count events that may already have been delivered once
    if (ge.getPossibleDuplicate())
      possibleDuplicates++;
    if (ge.getOperation().equals(Operation.UPDATE))
    {
      updatedEntries.add((JdbcBatch) ge.getDeserializedValue());
    }
    else if (ge.getOperation().equals(Operation.CREATE))
    {
      newEntries.add((JdbcBatch) ge.getDeserializedValue());
    }
    else if (ge.getOperation().equals(Operation.DESTROY))
    {
      // Destroyed entries have no value, so record the key instead
      destroyedEntries.add(ge.getKey().toString());
    }
  }
  // Write the collected entries to the database here, then report success
  return true;
}
```
## <a id="implementing_write_behind_cache_event_handling__section_9286E8C6B3C54089888E1680B4F43692" class="no-quick-link"></a>Configuring an AsyncEventListener
To configure a write-behind cache listener, you first configure an asynchronous queue to dispatch the region events, and then create the queue with your listener implementation. You then assign the queue to a region in order to process that region's events.
**Procedure**
1. Configure a unique `AsyncEventQueue` with the name of your listener implementation. You can optionally configure the queue for parallel operation, persistence, batch size, and maximum memory size. See [WAN Configuration](../../reference/topics/elements_ref.html#topic_7B1CABCAD056499AA57AF3CFDBF8ABE3) for more information.
**gfsh configuration**
``` pre
gfsh>create async-event-queue --id=sampleQueue --persistent --disk-store=exampleStore --listener=com.myCompany.MyAsyncEventListener --listener-param=url#jdbc:db2:SAMPLE,username#gfeadmin,password#admin1
```
The parameters for this command use the following syntax:
``` pre
create async-event-queue --id=value --listener=value [--group=value] [--batch-size=value]
[--persistent(=value)?] [--disk-store=value] [--max-queue-memory=value] [--listener-param=value(,value)*]
```
For more information, see [create async-event-queue](../../tools_modules/gfsh/command-pages/create.html#topic_ryz_pb1_dk).
**cache.xml Configuration**
``` pre
<cache>
<async-event-queue id="sampleQueue" persistent="true"
disk-store-name="exampleStore" parallel="false">
<async-event-listener>
<class-name>MyAsyncEventListener</class-name>
<parameter name="url">
<string>jdbc:db2:SAMPLE</string>
</parameter>
<parameter name="username">
<string>gfeadmin</string>
</parameter>
<parameter name="password">
<string>admin1</string>
</parameter>
</async-event-listener>
</async-event-queue>
...
</cache>
```
**Java Configuration**
``` pre
Cache cache = new CacheFactory().create();
AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
factory.setPersistent(true);
factory.setDiskStoreName("exampleStore");
factory.setParallel(false);
AsyncEventListener listener = new MyAsyncEventListener();
AsyncEventQueue asyncQueue = factory.create("sampleQueue", listener);
```
2. If you are using a parallel `AsyncEventQueue`, the gfsh example above requires no alteration, as gfsh applies to all members. If using cache.xml or the Java API to configure your `AsyncEventQueue`, repeat the above configuration in each <%=vars.product_name%> member that will host the region. Use the same ID and configuration settings for each queue configuration.
**Note:**
You can ensure that other members use the same configuration by using the cluster configuration service available in gfsh. See [Overview of the Cluster Configuration Service](../../configuring/cluster_config/gfsh_persist.html).
3. On each <%=vars.product_name%> member that hosts the `AsyncEventQueue`, assign the queue to each region that you want to use with the `AsyncEventListener` implementation.
**gfsh Configuration**
``` pre
gfsh>create region --name=Customer --async-event-queue-id=sampleQueue
```
Note that you can specify multiple queues on the command line in a comma-delimited list.
**cache.xml Configuration**
``` pre
<cache>
<region name="Customer">
<region-attributes async-event-queue-ids="sampleQueue">
</region-attributes>
</region>
...
</cache>
```
**Java Configuration**
``` pre
RegionFactory rf1 = cache.createRegionFactory();
// Reference the queue by the ID it was created with
rf1.addAsyncEventQueueId("sampleQueue");
Region customer = rf1.create("Customer");
// Assign the queue to multiple regions as needed
RegionFactory rf2 = cache.createRegionFactory();
rf2.addAsyncEventQueueId("sampleQueue");
Region order = rf2.create("Order");
```
Using the Java API, you can also add queues to, and remove queues from, regions that have already been created:
``` pre
AttributesMutator mutator = order.getAttributesMutator();
mutator.addAsyncEventQueueId("sampleQueue");
```
See the [<%=vars.product_name%> API documentation](/releases/latest/javadoc/org/apache/geode/cache/AttributesMutator.html) for more information.
4. Optionally configure persistence and conflation for the queue.
**Note:**
You must configure your AsyncEventQueue to be persistent if you are using persistent data regions. Using a non-persistent queue with a persistent region is not supported.
5. Optionally configure multiple dispatcher threads and the ordering policy for the queue using the instructions in [Configuring Dispatcher Threads and Order Policy for Event Distribution](configuring_gateway_concurrency_levels.html).
The `AsyncEventListener` receives events from every region configured with the associated `AsyncEventQueue`.