Expose refresh policy (#280)

diff --git a/package/src/main/resources/etc/custom.system.properties b/package/src/main/resources/etc/custom.system.properties
index 668ec9f..92a0113 100644
--- a/package/src/main/resources/etc/custom.system.properties
+++ b/package/src/main/resources/etc/custom.system.properties
@@ -100,6 +100,10 @@
 # hostA:9200,hostB:9200
 # Note: the port number must be repeated for each host.
 org.apache.unomi.elasticsearch.addresses=${env:UNOMI_ELASTICSEARCH_ADDRESSES:-localhost:9200}
+# refresh policy per item type in Json.
+# Valid values are WAIT_UNTIL/IMMEDIATE/NONE. The default refresh policy is NONE.
+# Example: "{"event":"WAIT_UNTIL","rule":"NONE"}
+org.apache.unomi.elasticsearch.itemTypeToRefreshPolicy=${env:UNOMI_ELASTICSEARCH_REFRESH_POLICY_PER_ITEM_TYPE:-}
 org.apache.unomi.elasticsearch.fatalIllegalStateErrors=${env:UNOMI_ELASTICSEARCH_FATAL_STATE_ERRORS:-}
 org.apache.unomi.elasticsearch.index.prefix=${env:UNOMI_ELASTICSEARCH_INDEXPREFIX:-context}
 org.apache.unomi.elasticsearch.monthlyIndex.nbShards=${env:UNOMI_ELASTICSEARCH_MONTHLYINDEX_SHARDS:-5}
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index 6ee5be0..eac14c7 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -17,13 +17,14 @@
 
 package org.apache.unomi.persistence.elasticsearch;
 
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.hazelcast.core.HazelcastInstance;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.http.HttpHost;
 import org.apache.http.auth.AuthScope;
 import org.apache.http.auth.UsernamePasswordCredentials;
 import org.apache.http.client.CredentialsProvider;
-import org.apache.http.client.config.RequestConfig;
 import org.apache.http.conn.ssl.NoopHostnameVerifier;
 import org.apache.http.impl.client.BasicCredentialsProvider;
 import org.apache.lucene.search.TotalHits;
@@ -167,6 +168,7 @@
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import static org.elasticsearch.index.query.QueryBuilders.termQuery;
 
@@ -242,6 +244,7 @@
     private boolean aggQueryThrowOnMissingDocs = false;
     private Integer aggQueryMaxResponseSizeHttp = null;
     private Integer clientSocketTimeout = null;
+    private Map<String, WriteRequest.RefreshPolicy> itemTypeToRefreshPolicy = new HashMap<>();
 
     private Map<String, Map<String, Map<String, Object>>> knownMappings = new HashMap<>();
 
@@ -262,6 +265,13 @@
         }
     }
 
+    public void setItemTypeToRefreshPolicy(String itemTypeToRefreshPolicy) throws IOException {
+        if (!itemTypeToRefreshPolicy.isEmpty()) {
+            this.itemTypeToRefreshPolicy = new ObjectMapper().readValue(itemTypeToRefreshPolicy,
+                        new TypeReference<HashMap<String, WriteRequest.RefreshPolicy>>() {});
+        }
+    }
+
     public void setFatalIllegalStateErrors(String fatalIllegalStateErrors) {
         this.fatalIllegalStateErrors = Arrays.stream(fatalIllegalStateErrors.split(","))
                 .map(i -> i.trim()).filter(i -> !i.isEmpty()).toArray(String[]::new);
@@ -842,6 +852,11 @@
     }
 
     @Override
+    public boolean isConsistent(Item item) {
+        return getRefreshPolicy(item.getItemType()) != WriteRequest.RefreshPolicy.NONE;
+    }
+
+    @Override
     public boolean save(final Item item) {
         return save(item, useBatchingForSave, alwaysOverwrite);
     }
@@ -887,6 +902,7 @@
 
                     try {
                         if (bulkProcessor == null || !useBatching) {
+                            indexRequest.setRefreshPolicy(getRefreshPolicy(item.getItemType()));
                             IndexResponse response = client.index(indexRequest, RequestOptions.DEFAULT);
                             setMetadata(item, response.getId(), response.getVersion(), response.getSeqNo(), response.getPrimaryTerm());
                         } else {
@@ -2403,4 +2419,11 @@
         return INDEX_DATE_PREFIX + d;
     }
 
+    private WriteRequest.RefreshPolicy getRefreshPolicy(String itemType) {
+        if (itemTypeToRefreshPolicy.containsKey(itemType)) {
+            return itemTypeToRefreshPolicy.get(itemType);
+        }
+        return WriteRequest.RefreshPolicy.NONE;
+    }
+
 }
diff --git a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index 5c2d12c..62c3cdf 100644
--- a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -56,6 +56,7 @@
             <cm:property name="clientSocketTimeout" value="" />
             <cm:property name="aggQueryMaxResponseSizeHttp" value="" />
             <cm:property name="aggQueryThrowOnMissingDocs" value="false" />
+            <cm:property name="itemTypeToRefreshPolicy" value="" />
             <cm:property name="itemClassesToCache" value="" />
             <cm:property name="useBatchingForSave" value="false" />
             <cm:property name="useBatchingForUpdate" value="true" />
@@ -129,6 +130,8 @@
         <property name="aggregateQueryBucketSize" value="${es.aggregateQueryBucketSize}" />
         <property name="aggQueryMaxResponseSizeHttp" value="${es.aggQueryMaxResponseSizeHttp}" />
         <property name="aggQueryThrowOnMissingDocs" value="${es.aggQueryThrowOnMissingDocs}" />
+        <property name="itemTypeToRefreshPolicy" value="${es.itemTypeToRefreshPolicy}" />
+
         <property name="clientSocketTimeout" value="${es.clientSocketTimeout}" />
 
         <property name="metricsService" ref="metricsService" />
diff --git a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
index ce2fb67..71c2577 100644
--- a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
+++ b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
@@ -60,6 +60,11 @@
 # max socket timeout in millis
 clientSocketTimeout=${org.apache.unomi.elasticsearch.clientSocketTimeout:-}
 
+# refresh policy per item type in Json.
+# Valid values are WAIT_UNTIL/IMMEDIATE/NONE. The default refresh policy is NONE.
+# Example: "{"event":"WAIT_UNTIL","rule":"NONE"}
+itemTypeToRefreshPolicy=${org.apache.unomi.elasticsearch.itemTypeToRefreshPolicy:-}
+
 # Retrun error in docs are missing in es aggregation calculation
 aggQueryThrowOnMissingDocs=${org.apache.unomi.elasticsearch.aggQueryThrowOnMissingDocs:-false}
 
diff --git a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
index 6fde7ee..36a4b50 100644
--- a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
+++ b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
@@ -111,6 +111,14 @@
     Object getSetting(String fieldName) throws NoSuchFieldException, IllegalAccessException;
 
     /**
+     * Return true if the item which is saved in the persistence service is consistent
+     *
+     * @param item the item to the check if consistent
+     * @return {@code true} if the item is consistent, false otherwise
+     */
+    boolean isConsistent(Item item);
+
+    /**
      * Persists the specified Item in the context server.
      *
      * @param item the item to persist
diff --git a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/SetEventOccurenceCountAction.java b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/SetEventOccurenceCountAction.java
index cc04165..19b1afa 100644
--- a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/SetEventOccurenceCountAction.java
+++ b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/SetEventOccurenceCountAction.java
@@ -117,8 +117,10 @@
 
         LocalDateTime eventTime = LocalDateTime.ofInstant(event.getTimeStamp().toInstant(),ZoneId.of("UTC"));
 
-        if (inTimeRange(eventTime, numberOfDays, fromDateTime, toDateTime)) {
-            count++;
+        if (!persistenceService.isConsistent(event)) {
+            if (inTimeRange(eventTime, numberOfDays, fromDateTime, toDateTime)) {
+                count++;
+            }
         }
 
         pastEvents.put((String) pastEventCondition.getParameter("generatedPropertyKey"), count);