[SM-2108] Added StoreListener interface. Store and StoreFactory are StoreListener aware. Added base implementations for Store and StoreFactory. All implementations now throw the appropriate events. Added some unit test cases inside MemoryStoreTest that test how StoreListener works.

git-svn-id: https://svn.apache.org/repos/asf/servicemix/utils/trunk@1146047 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/pom.xml b/pom.xml
index 2f4a66d..0d659a2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -164,6 +164,12 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>org.easymock</groupId>
+            <artifactId>easymock</artifactId>
+            <version>3.0</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.springframework</groupId>
             <artifactId>spring-context</artifactId>
             <version>3.0.5.RELEASE</version>
diff --git a/src/main/java/org/apache/servicemix/store/Store.java b/src/main/java/org/apache/servicemix/store/Store.java
index 24e8c04..db750cf 100644
--- a/src/main/java/org/apache/servicemix/store/Store.java
+++ b/src/main/java/org/apache/servicemix/store/Store.java
@@ -17,6 +17,8 @@
 package org.apache.servicemix.store;
 
 import java.io.IOException;
+import java.util.List;
+import java.util.Set;
 
 /**
  * A Store is an interface representing a storage where objects can be
@@ -28,7 +30,7 @@
  *  
  * @author gnodet
  */
-public interface Store {
+public interface Store  {
 
     String PERSISTENT = "Persistent";
     
@@ -83,5 +85,25 @@
      * @throws IOException if an error occurs
      */
     Object peek(String id) throws IOException;
-    
+
+
+
+    /***
+     * Registers a {@link StoreListener}.
+     * @param listener
+     */
+    void addListener(StoreListener listener);
+
+
+    /***
+     * Unregisters a {@link StoreListener}.
+     * @param listener
+     */
+    void removeListener(StoreListener listener);
+
+    /***
+     * Lists all {@link StoreListener}s.
+     */
+    public Set<StoreListener> getStoreListeners();
+
 }
diff --git a/src/main/java/org/apache/servicemix/store/StoreFactory.java b/src/main/java/org/apache/servicemix/store/StoreFactory.java
index 0b179cb..367e8fc 100644
--- a/src/main/java/org/apache/servicemix/store/StoreFactory.java
+++ b/src/main/java/org/apache/servicemix/store/StoreFactory.java
@@ -18,10 +18,21 @@
 
 import java.io.IOException;
 
-public interface StoreFactory {
+public interface StoreFactory  {
 
+    /**
+     * Opens a {@link Store} with the specified name.
+     * @param name
+     * @return
+     * @throws IOException
+     */
     Store open(String name) throws IOException;
-    
+
+    /**
+     * Closes a {@link Store} with the specified name,
+     * @param store
+     * @throws IOException
+     */
     void close(Store store) throws IOException;
     
 }
diff --git a/src/main/java/org/apache/servicemix/store/StoreListener.java b/src/main/java/org/apache/servicemix/store/StoreListener.java
new file mode 100644
index 0000000..347e921
--- /dev/null
+++ b/src/main/java/org/apache/servicemix/store/StoreListener.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.store;
+
+/**
+ * A listener interface for {@link Store} implementations.
+ * @author: iocanel
+ */
+public interface StoreListener {
+
+    /**
+     * Method that is called each time an item is added.
+     * @param id
+     * @param data
+     */
+    public void onAdd(String id, Object data);
+
+    /**
+     * Method that is called each time an item is removed.
+     * @param id
+     * @param data
+     */
+    public void onRemove(String id, Object data);
+
+    /**
+     * Method that is called each time an item is evicted.
+     * Please note that not all {@link Store}s support eviction.
+     * @param id
+     * @param data
+     */
+    public void onEvict(String id, Object data);
+}
diff --git a/src/main/java/org/apache/servicemix/store/base/BaseStore.java b/src/main/java/org/apache/servicemix/store/base/BaseStore.java
new file mode 100644
index 0000000..bffd2a4
--- /dev/null
+++ b/src/main/java/org/apache/servicemix/store/base/BaseStore.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.store.base;
+
+import org.apache.servicemix.store.Store;
+import org.apache.servicemix.store.StoreListener;
+
+import java.io.Serializable;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+/**
+ * @author: iocanel
+ */
+public abstract class BaseStore implements Store, Serializable {
+
+    protected final Set<StoreListener> storeListeners = new LinkedHashSet<StoreListener>();
+
+    /**
+     * Notify all registered {@link StoreListener}s that an item has been added.
+     * @param id
+     * @param data
+     */
+    public void fireAddedEvent(String id, Object data) {
+        for(StoreListener listener:storeListeners) {
+            listener.onAdd(id,data);
+        }
+    }
+
+    /**
+     * Notify all registered {@link StoreListener}s that an item has been removed.
+     * @param id
+     * @param data
+     */
+    public void fireRemovedEvent(String id, Object data) {
+        for(StoreListener listener:storeListeners) {
+            listener.onRemove(id, data);
+        }
+    }
+
+    /**
+     * Notify all registered {@link StoreListener}s that an item has been evicted.
+     * @param id
+     * @param data
+     */
+    public void fireEvictedEvent(String id, Object data) {
+        for(StoreListener listener:storeListeners) {
+            listener.onEvict(id, data);
+        }
+    }
+
+    /***
+     * Registers a {@link StoreListener}.
+     * @param listener
+     */
+    public void addListener(StoreListener listener) {
+        storeListeners.add(listener);
+    }
+
+    /***
+     * Unregisters a {@link StoreListener}.
+     * @param listener
+     */
+    public void removeListener(StoreListener listener) {
+        storeListeners.remove(listener);
+    }
+
+    /***
+     * Lists all {@link StoreListener}s.
+     */
+    public Set<StoreListener> getStoreListeners() {
+        return storeListeners;
+    }
+}
diff --git a/src/main/java/org/apache/servicemix/store/base/BaseStoreFactory.java b/src/main/java/org/apache/servicemix/store/base/BaseStoreFactory.java
new file mode 100644
index 0000000..bc5b0eb
--- /dev/null
+++ b/src/main/java/org/apache/servicemix/store/base/BaseStoreFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.store.base;
+
+import org.apache.servicemix.store.StoreFactory;
+import org.apache.servicemix.store.StoreListener;
+
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+/**
+ * @author: iocanel
+ */
+public abstract class BaseStoreFactory implements StoreFactory {
+
+    protected Set<StoreListener> storeListeners = new LinkedHashSet<StoreListener>();
+
+    public Set<StoreListener> getStoreListeners() {
+        return storeListeners;
+    }
+
+    public void setStoreListeners(Set<StoreListener> storeListeners) {
+        this.storeListeners = storeListeners;
+    }
+}
diff --git a/src/main/java/org/apache/servicemix/store/hazelcast/HazelcastStore.java b/src/main/java/org/apache/servicemix/store/hazelcast/HazelcastStore.java
index 3d82555..54732db 100644
--- a/src/main/java/org/apache/servicemix/store/hazelcast/HazelcastStore.java
+++ b/src/main/java/org/apache/servicemix/store/hazelcast/HazelcastStore.java
@@ -21,10 +21,10 @@
 import com.hazelcast.core.HazelcastInstance;
 import com.hazelcast.core.IdGenerator;
 import java.io.IOException;
-import java.io.Serializable;
 import java.util.Map;
+
+import org.apache.servicemix.store.base.BaseStore;
 import org.apache.servicemix.store.Entry;
-import org.apache.servicemix.store.Store;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -32,7 +32,7 @@
  *
  * @author iocanel
  */
-public class HazelcastStore implements Store, Serializable {
+public class HazelcastStore extends BaseStore {
 
     private static final Logger LOG = LoggerFactory.getLogger(HazelcastStore.class);
 
@@ -106,6 +106,7 @@
     public void store(String id, Object data) throws IOException {
         LOG.debug("Storing object with id: " + id);
         datas.put(id, new Entry(data));
+        fireAddedEvent(id,data);
     }
 
     /**
@@ -138,7 +139,10 @@
             evict();
         }
         Entry entry = datas.remove(id);
-        return entry != null ? entry.getData() : null;
+        if(entry != null) {
+          fireRemovedEvent(id,entry.getData());
+            return entry.getData();
+        } else return null;
     }
 
     /**
@@ -163,7 +167,12 @@
             long age = now - datas.get(key).getTime();
             if (age > timeout) {
                 LOG.debug("Removing object with id " + key + " from store after " + age + " ms");
+                Entry entry = datas.get(key);
                 datas.remove(key);
+
+                if(entry != null) {
+                    fireEvictedEvent(key,entry.getData());
+                }
             }
         }
     }
diff --git a/src/main/java/org/apache/servicemix/store/hazelcast/HazelcastStoreFactory.java b/src/main/java/org/apache/servicemix/store/hazelcast/HazelcastStoreFactory.java
index 4ee5eb3..4b3c19c 100644
--- a/src/main/java/org/apache/servicemix/store/hazelcast/HazelcastStoreFactory.java
+++ b/src/main/java/org/apache/servicemix/store/hazelcast/HazelcastStoreFactory.java
@@ -25,13 +25,14 @@
 import java.util.Map;
 
 import org.apache.servicemix.store.Store;
-import org.apache.servicemix.store.StoreFactory;
+import org.apache.servicemix.store.StoreListener;
+import org.apache.servicemix.store.base.BaseStoreFactory;
 
 
 /**
  * @author iocanel
  */
-public class HazelcastStoreFactory implements StoreFactory {
+public class HazelcastStoreFactory extends BaseStoreFactory {
 
     private Map<String, HazelcastStore> stores;
 
@@ -54,6 +55,10 @@
             } else {
                 store = new HazelcastStore(hazelcastInstance, storeName, timeout);
             }
+
+            for(StoreListener listener:storeListeners) {
+                store.addListener(listener);
+            }
             stores.put(name, store);
         }
         return store;
diff --git a/src/main/java/org/apache/servicemix/store/jdbc/JdbcStore.java b/src/main/java/org/apache/servicemix/store/jdbc/JdbcStore.java
index 74812c3..310b60b 100644
--- a/src/main/java/org/apache/servicemix/store/jdbc/JdbcStore.java
+++ b/src/main/java/org/apache/servicemix/store/jdbc/JdbcStore.java
@@ -23,11 +23,11 @@
 import java.io.ObjectOutputStream;
 import java.sql.Connection;
 
-import org.apache.servicemix.store.Store;
+import org.apache.servicemix.store.base.BaseStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class JdbcStore implements Store {
+public class JdbcStore extends BaseStore {
 
     private static final Logger LOG = LoggerFactory.getLogger(JdbcStore.class);
 
@@ -55,6 +55,7 @@
             out.close();
             connection = factory.getDataSource().getConnection();
             factory.getAdapter().doStoreData(connection, name + ":" + id, buffer.toByteArray());
+            fireAddedEvent(id,data);
         } catch (Exception e) {
             throw (IOException) new IOException("Error storing object").initCause(e);
         } finally {
@@ -79,6 +80,7 @@
                 ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data));
                 result = ois.readObject();
                 factory.getAdapter().doRemoveData(connection, name + ":" + id);
+                fireRemovedEvent(id, data);
             }
             return result;
         } catch (Exception e) {
diff --git a/src/main/java/org/apache/servicemix/store/jdbc/JdbcStoreFactory.java b/src/main/java/org/apache/servicemix/store/jdbc/JdbcStoreFactory.java
index dfedfdd..5faff59 100644
--- a/src/main/java/org/apache/servicemix/store/jdbc/JdbcStoreFactory.java
+++ b/src/main/java/org/apache/servicemix/store/jdbc/JdbcStoreFactory.java
@@ -29,9 +29,10 @@
 import org.apache.servicemix.jdbc.JDBCAdapterFactory;
 import org.apache.servicemix.jdbc.Statements;
 import org.apache.servicemix.store.Store;
-import org.apache.servicemix.store.StoreFactory;
+import org.apache.servicemix.store.StoreListener;
+import org.apache.servicemix.store.base.BaseStoreFactory;
 
-public class JdbcStoreFactory implements StoreFactory {
+public class JdbcStoreFactory extends BaseStoreFactory {
 
     private boolean transactional;
     private boolean clustered;
@@ -77,6 +78,9 @@
         JdbcStore store = stores.get(name);
         if (store == null) {
             store = new JdbcStore(this, name);
+            for(StoreListener listener:storeListeners) {
+                store.addListener(listener);
+            }
             stores.put(name, store);
         }
         return store;
diff --git a/src/main/java/org/apache/servicemix/store/memory/MemoryStore.java b/src/main/java/org/apache/servicemix/store/memory/MemoryStore.java
index 5420d70..d44df12 100644
--- a/src/main/java/org/apache/servicemix/store/memory/MemoryStore.java
+++ b/src/main/java/org/apache/servicemix/store/memory/MemoryStore.java
@@ -21,7 +21,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.servicemix.id.IdGenerator;
-import org.apache.servicemix.store.Store;
+import org.apache.servicemix.store.base.BaseStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -31,7 +31,7 @@
  * 
  * @author gnodet
  */
-public class MemoryStore implements Store {
+public class MemoryStore extends BaseStore {
 
     private static final Logger LOG = LoggerFactory.getLogger(MemoryStore.class);
 
@@ -50,6 +50,7 @@
     public void store(String id, Object data) throws IOException {
         LOG.debug("Storing object with id: " + id);
         datas.put(id, data);
+        fireAddedEvent(id,data);
     }
 
     public String store(Object data) throws IOException {
@@ -60,7 +61,9 @@
 
     public Object load(String id) throws IOException {
         LOG.debug("Loading/Removing object with id: " + id);
-        return datas.remove(id);
+        Object data = datas.remove(id);
+        fireEvictedEvent(id,data);
+        return data;
     }
 
     public Object peek(String id) throws IOException {
diff --git a/src/main/java/org/apache/servicemix/store/memory/MemoryStoreFactory.java b/src/main/java/org/apache/servicemix/store/memory/MemoryStoreFactory.java
index 367d888..d84365f 100644
--- a/src/main/java/org/apache/servicemix/store/memory/MemoryStoreFactory.java
+++ b/src/main/java/org/apache/servicemix/store/memory/MemoryStoreFactory.java
@@ -23,6 +23,8 @@
 import org.apache.servicemix.id.IdGenerator;
 import org.apache.servicemix.store.Store;
 import org.apache.servicemix.store.StoreFactory;
+import org.apache.servicemix.store.StoreListener;
+import org.apache.servicemix.store.base.BaseStoreFactory;
 
 /**
  * {@link StoreFactory} for creating memory-based {@link Store} implementations
@@ -30,7 +32,7 @@
  * If a timeout has been specified, a {@link TimeoutMemoryStore} will be created,
  * otherwise the factory will build a plain {@link MemoryStore}
  */
-public class MemoryStoreFactory implements StoreFactory {
+public class MemoryStoreFactory extends BaseStoreFactory {
 
     private IdGenerator idGenerator = new IdGenerator();
     private Map<String, MemoryStore> stores = new HashMap<String, MemoryStore>();
@@ -47,6 +49,10 @@
             } else {
                 store = new TimeoutMemoryStore(idGenerator, timeout);
             }
+
+            for(StoreListener listener:storeListeners) {
+                store.addListener(listener);
+            }
             stores.put(name, store);
         }
         return store;
diff --git a/src/main/java/org/apache/servicemix/store/memory/TimeoutMemoryStore.java b/src/main/java/org/apache/servicemix/store/memory/TimeoutMemoryStore.java
index 350fc1a..bb09444 100644
--- a/src/main/java/org/apache/servicemix/store/memory/TimeoutMemoryStore.java
+++ b/src/main/java/org/apache/servicemix/store/memory/TimeoutMemoryStore.java
@@ -46,6 +46,7 @@
     public void store(String id, Object data) throws IOException {
         LOG.debug("Storing object with id: " + id);
         datas.put(id, new Entry(data));
+        fireAddedEvent(id,data);
     }
 
     /**
@@ -58,7 +59,11 @@
         evict();
         LOG.debug("Loading object with id:" + id);
         Entry entry = datas.remove(id);
-        return entry == null ? null : entry.getTime();
+        if(entry != null) {
+            Object data = entry.getData();
+            fireRemovedEvent(id,data);
+            return data;
+        } else return null;
     }
 
     private void evict() {
@@ -67,6 +72,10 @@
             long age = now - datas.get(key).getTime();
             if (age > timeout) {
                 LOG.debug("Removing object with id " + key + " from store after " + age + " ms");
+                Entry entry = datas.get(key);
+                if(entry != null) {
+                    fireEvictedEvent(key,entry.getData());
+                }
                 datas.remove(key);
             }
         }
diff --git a/src/main/java/org/apache/servicemix/store/mongo/MongoStore.java b/src/main/java/org/apache/servicemix/store/mongo/MongoStore.java
index dc0f892..3d0c275 100644
--- a/src/main/java/org/apache/servicemix/store/mongo/MongoStore.java
+++ b/src/main/java/org/apache/servicemix/store/mongo/MongoStore.java
@@ -17,7 +17,8 @@
 package org.apache.servicemix.store.mongo;
 
 import com.mongodb.*;
-import org.apache.servicemix.store.Store;
+import org.apache.servicemix.store.base.BaseStore;
+import org.slf4j.LoggerFactory;
 
 import java.io.*;
 
@@ -29,7 +30,9 @@
  * @author iocanel
  * @author jbonofre
  */
-public class MongoStore implements Store {
+public class MongoStore extends BaseStore {
+
+    private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(MongoStore.class);
 
     private static final String ID = "_id";
     private static final String DATA = "data";
@@ -94,22 +97,29 @@
      */
     public void store(String id, Object data) throws IOException {
         DBObject object = new BasicDBObject();
+        ObjectOutputStream out=null;
         try {
             ByteArrayOutputStream buffer = new ByteArrayOutputStream();
-            ObjectOutputStream out = new ObjectOutputStream(buffer);
+            out = new ObjectOutputStream(buffer);
             out.writeObject(data);
-            out.close();
+
             object.put(ID, id);
             object.put(DATA, buffer.toByteArray());
             object.put(TIMESTAMP, System.currentTimeMillis());
         } catch (Exception e) {
             throw (IOException) new IOException("Error storing object").initCause(e);
+        } finally {
+            if(out != null) {
+                out.close();
+            }
         }
         WriteResult result = collection.insert(object);
         // check result for errors
         if (result.getError() != null) {
             throw new IOException(result.getError());
         }
+
+        fireAddedEvent(id,data);
     }
 
     /**
@@ -155,8 +165,13 @@
             byte[] data = (byte[]) item.get(DATA);
             if (data != null) {
                 ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data));
-                obj = ois.readObject();
+                try {
+                    obj = ois.readObject();
+                } finally {
+                    ois.close();
+                }
             }
+            fireRemovedEvent(id,data);
         } catch (Exception e) {
             throw (IOException) new IOException("Error loading object").initCause(e);
         }
@@ -182,7 +197,11 @@
             byte[] data = (byte[]) item.get(DATA);
             if (data != null) {
                 ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data));
-                obj = ois.readObject();
+                try {
+                    obj = ois.readObject();
+                } finally {
+                ois.close();
+                }
             }
         } catch (Exception e) {
             throw (IOException) new IOException("Error loading object").initCause(e);
@@ -199,7 +218,48 @@
         if (timeout != null) {
             DBObject object = new BasicDBObject();
             object.put(TIMESTAMP, new BasicDBObject("&lt", System.currentTimeMillis() - timeout));
-            collection.remove(object);
+            DBCursor items = collection.find(object);
+            WriteResult result = collection.remove(object);
+
+            for (DBObject item : items) {
+                String id = null;
+                Object data = null;
+                if (item != null) {
+
+                    byte[] idBytes = (byte[]) item.get(ID);
+                    byte[] dataBytes = (byte[]) item.get(DATA);
+
+                    if (data != null) {
+                        try {
+                            ObjectInputStream ois = null;
+
+                            ois = new ObjectInputStream(new ByteArrayInputStream(idBytes));
+                            try {
+                                id = (String) ois.readObject();
+                            } finally {
+                                ois.close();
+                            }
+
+                            ois = new ObjectInputStream(new ByteArrayInputStream(dataBytes));
+                            try {
+                                data = ois.readObject();
+                                ois.close();
+                            } finally {
+                                ois.close();
+                            }
+
+                            if(id != null) {
+                                fireEvictedEvent(id,data);
+                            }
+
+                        } catch (IOException e) {
+                            LOG.error("Error evicting object from store",e);
+                        } catch (ClassNotFoundException e) {
+                            LOG.error("Error evicting object from store",e);
+                        }
+                    }
+                }
+            }
         }
     }
 }
diff --git a/src/main/java/org/apache/servicemix/store/mongo/MongoStoreFactory.java b/src/main/java/org/apache/servicemix/store/mongo/MongoStoreFactory.java
index d5eef62..3914d84 100644
--- a/src/main/java/org/apache/servicemix/store/mongo/MongoStoreFactory.java
+++ b/src/main/java/org/apache/servicemix/store/mongo/MongoStoreFactory.java
@@ -20,6 +20,8 @@
 import com.mongodb.Mongo;
 import org.apache.servicemix.store.Store;
 import org.apache.servicemix.store.StoreFactory;
+import org.apache.servicemix.store.StoreListener;
+import org.apache.servicemix.store.base.BaseStoreFactory;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -33,7 +35,7 @@
  * @author iocanel
  * @author jbonofre
  */
-public class MongoStoreFactory implements StoreFactory {
+public class MongoStoreFactory extends BaseStoreFactory {
 
     private Mongo mongo;
     private DB db;
@@ -82,6 +84,10 @@
             if (timeout != null)
                 store = new MongoStore(db, collection, timeout);
             else store = new MongoStore(db, collection);
+
+             for(StoreListener listener:storeListeners) {
+                store.addListener(listener);
+            }
             stores.put(key, store);
         }
         return store;
diff --git a/src/main/java/org/apache/servicemix/store/redis/RedisStore.java b/src/main/java/org/apache/servicemix/store/redis/RedisStore.java
index 0a6e577..273ab93 100644
--- a/src/main/java/org/apache/servicemix/store/redis/RedisStore.java
+++ b/src/main/java/org/apache/servicemix/store/redis/RedisStore.java
@@ -17,8 +17,8 @@
 package org.apache.servicemix.store.redis;
 
 
+import org.apache.servicemix.store.base.BaseStore;
 import org.apache.servicemix.store.Entry;
-import org.apache.servicemix.store.Store;
 import org.idevlab.rjc.RedisNode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -27,7 +27,7 @@
 
 import java.io.*;
 
-public class RedisStore implements Store {
+public class RedisStore extends BaseStore {
 
     private static final Logger LOG = LoggerFactory.getLogger(RedisStore.class);
 
@@ -84,14 +84,20 @@
      */
     public void store(String id, Object data) throws IOException {
         LOG.debug("Storing object with id: " + id);
+        ObjectOutputStream out = null;
         try {
             ByteArrayOutputStream buffer = new ByteArrayOutputStream();
-            ObjectOutputStream out = new ObjectOutputStream(buffer);
+            out = new ObjectOutputStream(buffer);
             out.writeObject(new Entry(data));
             out.close();
             redisNode.set(id, encoder.encode(buffer.toByteArray()));
+            fireAddedEvent(id,data);
         } catch (Exception e) {
             throw (IOException) new IOException("Error storing object").initCause(e);
+        } finally {
+            if(out != null) {
+                out.close();
+            }
         }
     }
 
@@ -121,16 +127,28 @@
      */
     public Object load(String id) throws IOException {
         LOG.debug("Loading/Removing object with id: " + id);
-        Object result = null;
-        if (timeout > 0) {
-            evict();
+        Entry result = removeEntry(id);
+        if(result != null) {
+            fireRemovedEvent(id,result.getData());
         }
-        try {
-            result = parseEntry(redisNode.get(id));
-        } catch (ClassNotFoundException e) {
-            throw new IOException("Could not load object from store", e);
+        return result;
+    }
+
+    /**
+     * <p>
+     * Loads an object that has been previously stored under the specified key.
+     * The object is removed from the store.
+     * </p>
+     * @param id the id of the object
+     * @return the object, or <code>null></code> if the object could not be found
+     * @throws IOException if an error occurs
+     */
+    public Object evict(String id) throws IOException {
+        LOG.debug("Evicting object with id: " + id);
+        Entry result = removeEntry(id);
+        if(result != null) {
+            fireEvictedEvent(id, result.getData());
         }
-        redisNode.del(id);
         return result;
     }
 
@@ -154,6 +172,26 @@
         return result;
     }
 
+    /**
+     * Removes an object with the specified id.
+     * @param id
+     * @return
+     * @throws IOException
+     */
+    private Entry removeEntry(String id) throws IOException {
+        Entry result = null;
+        if (timeout > 0) {
+            evict();
+        }
+        try {
+            result = parseEntry(redisNode.get(id));
+        } catch (ClassNotFoundException e) {
+            throw new IOException("Could not load object from store", e);
+        }
+        redisNode.del(id);
+        return result;
+    }
+
 
     /**
      * Decodes a String to an Entry.
@@ -183,7 +221,7 @@
             }
             if (age > timeout) {
                 LOG.debug("Removing object with id " + key + " from store after " + age + " ms");
-                load(key);
+                evict(key);
             }
         }
     }
diff --git a/src/main/java/org/apache/servicemix/store/redis/RedisStoreFactory.java b/src/main/java/org/apache/servicemix/store/redis/RedisStoreFactory.java
index dba262b..cd021bd 100644
--- a/src/main/java/org/apache/servicemix/store/redis/RedisStoreFactory.java
+++ b/src/main/java/org/apache/servicemix/store/redis/RedisStoreFactory.java
@@ -18,17 +18,15 @@
 
 
 import org.apache.servicemix.store.Store;
-import org.apache.servicemix.store.StoreFactory;
+import org.apache.servicemix.store.StoreListener;
+import org.apache.servicemix.store.base.BaseStoreFactory;
 import org.idevlab.rjc.RedisNode;
-import org.idevlab.rjc.SingleRedisOperations;
-import org.idevlab.rjc.ds.DataSource;
-import org.idevlab.rjc.ds.SimpleDataSource;
 
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
-public class RedisStoreFactory implements StoreFactory {
+public class RedisStoreFactory extends BaseStoreFactory {
 
     private Map<String, RedisStore> stores = new HashMap<String, RedisStore>();
 
@@ -50,6 +48,10 @@
             } else {
                 store = new RedisStore(redisNode, storeName, timeout);
             }
+
+            for(StoreListener listener:storeListeners) {
+                store.addListener(listener);
+            }
             stores.put(name, store);
         }
         return store;
diff --git a/src/test/java/org/apache/servicemix/store/memory/TimeoutMemoryStoreTest.java b/src/test/java/org/apache/servicemix/store/memory/TimeoutMemoryStoreTest.java
index 195a4b5..c973278 100644
--- a/src/test/java/org/apache/servicemix/store/memory/TimeoutMemoryStoreTest.java
+++ b/src/test/java/org/apache/servicemix/store/memory/TimeoutMemoryStoreTest.java
@@ -19,20 +19,34 @@
 import junit.framework.TestCase;
 
 import org.apache.servicemix.store.Store;
+import org.apache.servicemix.store.StoreListener;
+import org.easymock.EasyMock;
+
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+import static org.easymock.EasyMock.*;
+
 
 /**
  * Test case for {@link TimeoutMemoryStore} 
  */
 public class TimeoutMemoryStoreTest extends TestCase {
     
-    private static final long TIMEOUT = 250L; 
+    private static final long TIMEOUT = 250L;
     
     private Store store;
     private final MemoryStoreFactory factory = new MemoryStoreFactory();
+
+    private StoreListener listener = createMock(StoreListener.class);
+
     
     public TimeoutMemoryStoreTest() {
         super();
         factory.setTimeout(TIMEOUT);
+        Set<StoreListener> listeners = new LinkedHashSet<StoreListener>();
+        listeners.add(listener);
+        factory.setStoreListeners(listeners);
     }
     
     @Override
@@ -46,10 +60,37 @@
         super.tearDown();
         factory.close(store);
     }
+
+    public void testAddAndRemove() throws Exception {
+        String id = "1";
+        String data = "Any kind of data...";
+        //Record behavior
+        listener.onAdd(id,data);
+        expectLastCall().times(1);
+        listener.onRemove(id,data);
+        expectLastCall().times(1);
+        replay(listener);
+        store.store(id,data);
+        store.peek(id);
+        store.load(id);
+        verify(listener);
+    }
     
     public void testTimeout() throws Exception {
-        String id = store.store("Any kind of data...");
-        Object data = store.load(id);
+        String data = "Any kind of data...";
+
+        //Record behavior
+        listener.onAdd(EasyMock.<String>anyObject(),EasyMock.<Object>anyObject());
+        expectLastCall().times(2);
+        listener.onRemove(EasyMock.<String>anyObject(), EasyMock.<Object>anyObject());
+        expectLastCall().once();
+        listener.onEvict(EasyMock.<String>anyObject(), EasyMock.<Object>anyObject());
+        expectLastCall().once();
+        replay(listener);
+
+
+        String id = store.store(data);
+        data = (String) store.load(id);
         assertNotNull(data);
         //now store it again and load it after the timeout
         store.store(id, data);
@@ -57,6 +98,6 @@
             wait(TIMEOUT * 2);
         }
         assertNull("Data should have been removed from store after timeout", store.load(id));
+        verify(listener);
     }
-
 }