Expose refresh policy (#280)
diff --git a/package/src/main/resources/etc/custom.system.properties b/package/src/main/resources/etc/custom.system.properties
index 668ec9f..92a0113 100644
--- a/package/src/main/resources/etc/custom.system.properties
+++ b/package/src/main/resources/etc/custom.system.properties
@@ -100,6 +100,10 @@
# hostA:9200,hostB:9200
# Note: the port number must be repeated for each host.
org.apache.unomi.elasticsearch.addresses=${env:UNOMI_ELASTICSEARCH_ADDRESSES:-localhost:9200}
+# refresh policy per item type in Json.
+# Valid values are WAIT_UNTIL/IMMEDIATE/NONE. The default refresh policy is NONE.
+# Example: "{"event":"WAIT_UNTIL","rule":"NONE"}
+org.apache.unomi.elasticsearch.itemTypeToRefreshPolicy=${env:UNOMI_ELASTICSEARCH_REFRESH_POLICY_PER_ITEM_TYPE:-}
org.apache.unomi.elasticsearch.fatalIllegalStateErrors=${env:UNOMI_ELASTICSEARCH_FATAL_STATE_ERRORS:-}
org.apache.unomi.elasticsearch.index.prefix=${env:UNOMI_ELASTICSEARCH_INDEXPREFIX:-context}
org.apache.unomi.elasticsearch.monthlyIndex.nbShards=${env:UNOMI_ELASTICSEARCH_MONTHLYINDEX_SHARDS:-5}
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index 6ee5be0..eac14c7 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -17,13 +17,14 @@
package org.apache.unomi.persistence.elasticsearch;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.hazelcast.core.HazelcastInstance;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
-import org.apache.http.client.config.RequestConfig;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.lucene.search.TotalHits;
@@ -167,6 +168,7 @@
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
@@ -242,6 +244,7 @@
private boolean aggQueryThrowOnMissingDocs = false;
private Integer aggQueryMaxResponseSizeHttp = null;
private Integer clientSocketTimeout = null;
+ private Map<String, WriteRequest.RefreshPolicy> itemTypeToRefreshPolicy = new HashMap<>();
private Map<String, Map<String, Map<String, Object>>> knownMappings = new HashMap<>();
@@ -262,6 +265,13 @@
}
}
+ public void setItemTypeToRefreshPolicy(String itemTypeToRefreshPolicy) throws IOException {
+ if (!itemTypeToRefreshPolicy.isEmpty()) {
+ this.itemTypeToRefreshPolicy = new ObjectMapper().readValue(itemTypeToRefreshPolicy,
+ new TypeReference<HashMap<String, WriteRequest.RefreshPolicy>>() {});
+ }
+ }
+
public void setFatalIllegalStateErrors(String fatalIllegalStateErrors) {
this.fatalIllegalStateErrors = Arrays.stream(fatalIllegalStateErrors.split(","))
.map(i -> i.trim()).filter(i -> !i.isEmpty()).toArray(String[]::new);
@@ -842,6 +852,11 @@
}
@Override
+ public boolean isConsistent(Item item) {
+ return getRefreshPolicy(item.getItemType()) != WriteRequest.RefreshPolicy.NONE;
+ }
+
+ @Override
public boolean save(final Item item) {
return save(item, useBatchingForSave, alwaysOverwrite);
}
@@ -887,6 +902,7 @@
try {
if (bulkProcessor == null || !useBatching) {
+ indexRequest.setRefreshPolicy(getRefreshPolicy(item.getItemType()));
IndexResponse response = client.index(indexRequest, RequestOptions.DEFAULT);
setMetadata(item, response.getId(), response.getVersion(), response.getSeqNo(), response.getPrimaryTerm());
} else {
@@ -2403,4 +2419,11 @@
return INDEX_DATE_PREFIX + d;
}
+ private WriteRequest.RefreshPolicy getRefreshPolicy(String itemType) {
+ if (itemTypeToRefreshPolicy.containsKey(itemType)) {
+ return itemTypeToRefreshPolicy.get(itemType);
+ }
+ return WriteRequest.RefreshPolicy.NONE;
+ }
+
}
diff --git a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index 5c2d12c..62c3cdf 100644
--- a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -56,6 +56,7 @@
<cm:property name="clientSocketTimeout" value="" />
<cm:property name="aggQueryMaxResponseSizeHttp" value="" />
<cm:property name="aggQueryThrowOnMissingDocs" value="false" />
+ <cm:property name="itemTypeToRefreshPolicy" value="" />
<cm:property name="itemClassesToCache" value="" />
<cm:property name="useBatchingForSave" value="false" />
<cm:property name="useBatchingForUpdate" value="true" />
@@ -129,6 +130,8 @@
<property name="aggregateQueryBucketSize" value="${es.aggregateQueryBucketSize}" />
<property name="aggQueryMaxResponseSizeHttp" value="${es.aggQueryMaxResponseSizeHttp}" />
<property name="aggQueryThrowOnMissingDocs" value="${es.aggQueryThrowOnMissingDocs}" />
+ <property name="itemTypeToRefreshPolicy" value="${es.itemTypeToRefreshPolicy}" />
+
<property name="clientSocketTimeout" value="${es.clientSocketTimeout}" />
<property name="metricsService" ref="metricsService" />
diff --git a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
index ce2fb67..71c2577 100644
--- a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
+++ b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
@@ -60,6 +60,11 @@
# max socket timeout in millis
clientSocketTimeout=${org.apache.unomi.elasticsearch.clientSocketTimeout:-}
+# refresh policy per item type in Json.
+# Valid values are WAIT_UNTIL/IMMEDIATE/NONE. The default refresh policy is NONE.
+# Example: "{"event":"WAIT_UNTIL","rule":"NONE"}
+itemTypeToRefreshPolicy=${org.apache.unomi.elasticsearch.itemTypeToRefreshPolicy:-}
+
# Retrun error in docs are missing in es aggregation calculation
aggQueryThrowOnMissingDocs=${org.apache.unomi.elasticsearch.aggQueryThrowOnMissingDocs:-false}
diff --git a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
index 6fde7ee..36a4b50 100644
--- a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
+++ b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
@@ -111,6 +111,14 @@
Object getSetting(String fieldName) throws NoSuchFieldException, IllegalAccessException;
/**
+ * Return true if the item which is saved in the persistence service is consistent
+ *
+ * @param item the item to the check if consistent
+ * @return {@code true} if the item is consistent, false otherwise
+ */
+ boolean isConsistent(Item item);
+
+ /**
* Persists the specified Item in the context server.
*
* @param item the item to persist
diff --git a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/SetEventOccurenceCountAction.java b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/SetEventOccurenceCountAction.java
index cc04165..19b1afa 100644
--- a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/SetEventOccurenceCountAction.java
+++ b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/SetEventOccurenceCountAction.java
@@ -117,8 +117,10 @@
LocalDateTime eventTime = LocalDateTime.ofInstant(event.getTimeStamp().toInstant(),ZoneId.of("UTC"));
- if (inTimeRange(eventTime, numberOfDays, fromDateTime, toDateTime)) {
- count++;
+ if (!persistenceService.isConsistent(event)) {
+ if (inTimeRange(eventTime, numberOfDays, fromDateTime, toDateTime)) {
+ count++;
+ }
}
pastEvents.put((String) pastEventCondition.getParameter("generatedPropertyKey"), count);