UNOMI-371 add optional support for optimistic concurrency control (if_seq_no) (#223)
diff --git a/api/src/main/java/org/apache/unomi/api/Item.java b/api/src/main/java/org/apache/unomi/api/Item.java
index 72f0ae6..2d5ff71 100644
--- a/api/src/main/java/org/apache/unomi/api/Item.java
+++ b/api/src/main/java/org/apache/unomi/api/Item.java
@@ -21,6 +21,7 @@
import org.slf4j.LoggerFactory;
import java.io.Serializable;
+import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -63,6 +64,7 @@
protected String itemType;
protected String scope;
protected Long version;
+ protected Map<String, Object> systemMetadata = new HashMap<>();
public Item() {
this.itemType = getItemType(this.getClass());
@@ -140,4 +142,12 @@
public void setVersion(Long version) {
this.version = version;
}
+
+ public Object getSystemMetadata(String key) {
+ return systemMetadata.get(key);
+ }
+
+ public void setSystemMetadata(String key, Object value) {
+ systemMetadata.put(key, value);
+ }
}
diff --git a/extensions/lists-extension/services/src/main/java/org/apache/unomi/services/UserListServiceImpl.java b/extensions/lists-extension/services/src/main/java/org/apache/unomi/services/UserListServiceImpl.java
index dc3bbc8..37ca72e 100644
--- a/extensions/lists-extension/services/src/main/java/org/apache/unomi/services/UserListServiceImpl.java
+++ b/extensions/lists-extension/services/src/main/java/org/apache/unomi/services/UserListServiceImpl.java
@@ -94,7 +94,7 @@
if(index != -1){
((List) profileSystemProperties.get("lists")).remove(index);
profileSystemProperties.put("lastUpdated", new Date());
- persistenceService.update(p.getItemId(), null, Profile.class, "systemProperties", profileSystemProperties);
+ persistenceService.update(p, null, Profile.class, "systemProperties", profileSystemProperties);
}
}
}
diff --git a/extensions/privacy-extension/services/src/main/java/org/apache/unomi/privacy/internal/PrivacyServiceImpl.java b/extensions/privacy-extension/services/src/main/java/org/apache/unomi/privacy/internal/PrivacyServiceImpl.java
index 1675e11..3d3d68d 100644
--- a/extensions/privacy-extension/services/src/main/java/org/apache/unomi/privacy/internal/PrivacyServiceImpl.java
+++ b/extensions/privacy-extension/services/src/main/java/org/apache/unomi/privacy/internal/PrivacyServiceImpl.java
@@ -137,7 +137,7 @@
persistenceService.save(session);
List<Event> events = eventService.searchEvents(session.getItemId(), new String[0], null, 0, -1, null).getList();
for (Event event : events) {
- persistenceService.update(event.getItemId(), event.getTimeStamp(), Event.class, "profileId", newProfile.getItemId());
+ persistenceService.update(event, event.getTimeStamp(), Event.class, "profileId", newProfile.getItemId());
}
}
diff --git a/itests/src/test/java/org/apache/unomi/itests/ProfileServiceWithoutOverwriteIT.java b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceWithoutOverwriteIT.java
new file mode 100644
index 0000000..5c2ed5b
--- /dev/null
+++ b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceWithoutOverwriteIT.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+package org.apache.unomi.itests;
+
+import org.apache.unomi.api.Profile;
+import org.apache.unomi.api.services.DefinitionsService;
+import org.apache.unomi.api.services.ProfileService;
+import org.apache.unomi.persistence.spi.PersistenceService;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.Configuration;
+import org.ops4j.pax.exam.Option;
+import org.ops4j.pax.exam.junit.PaxExam;
+import org.ops4j.pax.exam.spi.reactors.ExamReactorStrategy;
+import org.ops4j.pax.exam.spi.reactors.PerSuite;
+import org.ops4j.pax.exam.util.Filter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.ops4j.pax.exam.CoreOptions.systemProperty;
+
+/**
+ * An integration test for the profile service
+ */
+@RunWith(PaxExam.class)
+@ExamReactorStrategy(PerSuite.class)
+public class ProfileServiceWithoutOverwriteIT extends BaseIT {
+ private final static Logger LOGGER = LoggerFactory.getLogger(ProfileServiceWithoutOverwriteIT.class);
+
+ private final static String TEST_PROFILE_ID = "test-profile-id";
+
+ @Configuration
+ public Option[] config() throws InterruptedException {
+ List<Option> options = new ArrayList<>();
+ options.addAll(Arrays.asList(super.config()));
+ options.add(systemProperty("org.apache.unomi.elasticsearch.throwExceptions").value("true"));
+ options.add(systemProperty("org.apache.unomi.elasticsearch.alwaysOverwrite").value("false"));
+ return options.toArray(new Option[0]);
+ }
+
+ @Inject @Filter(timeout = 600000)
+ protected ProfileService profileService;
+
+ @Inject
+ @Filter(timeout = 600000)
+ protected PersistenceService persistenceService;
+
+ @Inject
+ @Filter(timeout = 600000)
+ protected DefinitionsService definitionsService;
+
+ @Before
+ public void setUp() {
+ TestUtils.removeAllProfiles(definitionsService, persistenceService);
+ }
+
+ private Profile setupWithoutOverwriteTests() {
+ Profile profile = new Profile();
+ profile.setItemId(TEST_PROFILE_ID);
+ profile.setProperty("country", "test-country");
+ profile.setProperty("state", "test-state");
+ profileService.save(profile);
+
+ return profile;
+ }
+
+ @Test(expected = RuntimeException.class)
+ public void testSaveProfileWithoutOverwriteSameProfileThrowsException() {
+ Profile profile = setupWithoutOverwriteTests();
+ profile.setProperty("country", "test2-country");
+ profileService.save(profile);
+ }
+
+ @Test
+ public void testSaveProfileWithoutOverwriteSavesAfterReload() throws InterruptedException {
+ Profile profile = setupWithoutOverwriteTests();
+ String profileId = profile.getItemId();
+ Thread.sleep(4000);
+
+ Profile updatedProfile = profileService.load(profileId);
+ updatedProfile.setProperty("country", "test2-country");
+ profileService.save(updatedProfile);
+
+ Thread.sleep(4000);
+
+ Profile profileWithNewCountry = profileService.load(profileId);
+ assertEquals(profileWithNewCountry.getProperty("country"), "test2-country");
+ }
+
+ @Test(expected = RuntimeException.class)
+ public void testSaveProfileWithoutOverwriteWrongSeqNoThrowsException() throws InterruptedException {
+ Profile profile = setupWithoutOverwriteTests();
+ String profileId = profile.getItemId();
+
+ Thread.sleep(4000);
+
+ Profile updatedProfile = profileService.load(profileId);
+ updatedProfile.setProperty("country", "test2-country");
+ updatedProfile.setMetadata("seq_no", 1L);
+ profileService.save(updatedProfile);
+ }
+}
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index 9a1e6b1..411913d 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -23,7 +23,6 @@
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
-import org.apache.http.client.config.RequestConfig;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.lucene.search.TotalHits;
@@ -50,6 +49,7 @@
import org.apache.unomi.persistence.spi.aggregate.NumericRangeAggregate;
import org.apache.unomi.persistence.spi.aggregate.TermsAggregate;
import org.elasticsearch.ElasticsearchStatusException;
+import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
@@ -63,6 +63,7 @@
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
@@ -77,6 +78,7 @@
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.core.CountRequest;
import org.elasticsearch.client.core.CountResponse;
import org.elasticsearch.client.core.MainResponse;
@@ -180,6 +182,9 @@
public static final String BULK_PROCESSOR_FLUSH_INTERVAL = "bulkProcessor.flushInterval";
public static final String BULK_PROCESSOR_BACKOFF_POLICY = "bulkProcessor.backoffPolicy";
public static final String INDEX_DATE_PREFIX = "date-";
+ public static final String SEQ_NO = "seq_no";
+ public static final String PRIMARY_TERM = "primary_term";
+
private static final Logger logger = LoggerFactory.getLogger(ElasticSearchPersistenceServiceImpl.class.getName());
private static boolean throwExceptions = false;
private RestHighLevelClient client;
@@ -234,6 +239,7 @@
private Integer aggQueryMaxResponseSizeHttp = null;
private Integer clientSocketTimeout = null;
+ private boolean alwaysOverwrite = true;
private Map<String, Map<String, Map<String, Object>>> knownMappings = new HashMap<>();
@@ -408,6 +414,11 @@
public void setThrowExceptions(boolean throwExceptions) {
this.throwExceptions = throwExceptions;
}
+ public void setAlwaysOverwrite(boolean alwaysOverwrite) {
+ this.alwaysOverwrite = alwaysOverwrite;
+ }
+
+
public void start() throws Exception {
// on startup
@@ -793,8 +804,7 @@
if (response.isExists()) {
String sourceAsString = response.getSourceAsString();
final T value = ESCustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz);
- value.setItemId(response.getId());
- value.setVersion(response.getVersion());
+ setMetadata(value, response.getId(), response.getVersion(), response.getSeqNo(), response.getPrimaryTerm());
putInCache(itemId, value);
return value;
} else {
@@ -818,13 +828,28 @@
}
+ private void setMetadata(Item item, String id, long version, long seqNo, long primaryTerm) {
+ item.setItemId(id);
+ item.setVersion(version);
+ item.setSystemMetadata(SEQ_NO, seqNo);
+ item.setSystemMetadata(PRIMARY_TERM, primaryTerm);
+ }
+
@Override
public boolean save(final Item item) {
- return save(item, useBatchingForSave);
+ return save(item, useBatchingForSave, alwaysOverwrite);
}
@Override
public boolean save(final Item item, final boolean useBatching) {
+ return save(item, useBatching, alwaysOverwrite);
+ }
+
+ @Override
+ public boolean save(final Item item, final Boolean useBatchingOption, final Boolean alwaysOverwriteOption) {
+ final boolean useBatching = useBatchingOption == null ? this.useBatchingForSave : useBatchingOption;
+ final boolean alwaysOverwrite = alwaysOverwriteOption == null ? this.alwaysOverwrite : alwaysOverwriteOption;
+
Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".saveItem", this.bundleContext, this.fatalIllegalStateErrors) {
protected Boolean execute(Object... args) throws Exception {
try {
@@ -836,13 +861,28 @@
IndexRequest indexRequest = new IndexRequest(index);
indexRequest.id(itemId);
indexRequest.source(source, XContentType.JSON);
+
+ if (!alwaysOverwrite) {
+ Long seqNo = (Long)item.getSystemMetadata(SEQ_NO);
+ Long primaryTerm = (Long)item.getSystemMetadata(PRIMARY_TERM);
+
+ if (seqNo != null && primaryTerm != null) {
+ indexRequest.setIfSeqNo(seqNo);
+ indexRequest.setIfPrimaryTerm(primaryTerm);
+ }
+ else {
+ indexRequest.opType(DocWriteRequest.OpType.CREATE);
+ }
+ }
+
if (routingByType.containsKey(itemType)) {
indexRequest.routing(routingByType.get(itemType));
}
try {
if (bulkProcessor == null || !useBatching) {
- client.index(indexRequest, RequestOptions.DEFAULT);
+ IndexResponse response = client.index(indexRequest, RequestOptions.DEFAULT);
+ setMetadata(item, response.getId(), response.getVersion(), response.getSeqNo(), response.getPrimaryTerm());
} else {
bulkProcessor.add(indexRequest);
}
@@ -865,26 +905,43 @@
}
@Override
- public boolean update(final String itemId, final Date dateHint, final Class clazz, final String propertyName, final Object propertyValue) {
- return update(itemId, dateHint, clazz, Collections.singletonMap(propertyName, propertyValue));
+ public boolean update(final Item item, final Date dateHint, final Class clazz, final String propertyName, final Object propertyValue) {
+ return update(item, dateHint, clazz, Collections.singletonMap(propertyName, propertyValue));
}
@Override
- public boolean update(final String itemId, final Date dateHint, final Class clazz, final Map source) {
+ public boolean update(final Item item, final Date dateHint, final Class clazz, final Map source) {
+ return update(item, dateHint, clazz, source, alwaysOverwrite);
+ }
+
+ @Override
+ public boolean update(final Item item, final Date dateHint, final Class clazz, final Map source, final boolean alwaysOverwrite) {
Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".updateItem", this.bundleContext, this.fatalIllegalStateErrors) {
protected Boolean execute(Object... args) throws Exception {
try {
String itemType = Item.getItemType(clazz);
- UpdateRequest updateRequest = new UpdateRequest(getIndex(itemType, dateHint), itemId);
+ UpdateRequest updateRequest = new UpdateRequest(getIndex(itemType, dateHint), item.getItemId());
updateRequest.doc(source);
+
+ if (!alwaysOverwrite) {
+ Long seqNo = (Long)item.getSystemMetadata(SEQ_NO);
+ Long primaryTerm = (Long)item.getSystemMetadata(PRIMARY_TERM);
+
+ if (seqNo != null && primaryTerm != null) {
+ updateRequest.setIfSeqNo(seqNo);
+ updateRequest.setIfPrimaryTerm(primaryTerm);
+ }
+ }
+
if (bulkProcessor == null) {
- client.update(updateRequest, RequestOptions.DEFAULT);
+ UpdateResponse response = client.update(updateRequest, RequestOptions.DEFAULT);
+ setMetadata(item, response.getId(), response.getVersion(), response.getSeqNo(), response.getPrimaryTerm());
} else {
bulkProcessor.add(updateRequest);
}
return true;
} catch (IndexNotFoundException e) {
- throw new Exception("No index found for itemType=" + clazz.getName() + "itemId=" + itemId, e);
+ throw new Exception("No index found for itemType=" + clazz.getName() + "itemId=" + item.getItemId(), e);
}
}
}.catchingExecuteInClassLoader(true);
@@ -953,7 +1010,7 @@
}
@Override
- public boolean updateWithScript(final String itemId, final Date dateHint, final Class<?> clazz, final String script, final Map<String, Object> scriptParams) {
+ public boolean updateWithScript(final Item item, final Date dateHint, final Class<?> clazz, final String script, final Map<String, Object> scriptParams) {
Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".updateWithScript", this.bundleContext, this.fatalIllegalStateErrors) {
protected Boolean execute(Object... args) throws Exception {
try {
@@ -963,17 +1020,26 @@
Script actualScript = new Script(ScriptType.INLINE, "painless", script, scriptParams);
- UpdateRequest updateRequest = new UpdateRequest(index, itemId);
+ UpdateRequest updateRequest = new UpdateRequest(index, item.getItemId());
+
+ Long seqNo = (Long)item.getSystemMetadata(SEQ_NO);
+ Long primaryTerm = (Long)item.getSystemMetadata(PRIMARY_TERM);
+
+ if (seqNo != null && primaryTerm != null) {
+ updateRequest.setIfSeqNo(seqNo);
+ updateRequest.setIfPrimaryTerm(primaryTerm);
+ }
updateRequest.script(actualScript);
if (bulkProcessor == null) {
- client.update(updateRequest, RequestOptions.DEFAULT);
+ UpdateResponse response = client.update(updateRequest, RequestOptions.DEFAULT);
+ setMetadata(item, response.getId(), response.getVersion(), response.getSeqNo(), response.getPrimaryTerm());
} else {
bulkProcessor.add(updateRequest);
}
return true;
} catch (IndexNotFoundException e) {
- throw new Exception("No index found for itemType=" + clazz.getName() + "itemId=" + itemId, e);
+ throw new Exception("No index found for itemType=" + clazz.getName() + "itemId=" + item.getItemId(), e);
}
}
}.catchingExecuteInClassLoader(true);
@@ -1596,6 +1662,7 @@
SearchRequest searchRequest = new SearchRequest(getIndexNameForQuery(itemType));
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
.fetchSource(true)
+ .seqNoAndPrimaryTerm(true)
.query(query)
.size(size < 0 ? defaultQueryLimit : size)
.from(offset);
@@ -1653,8 +1720,7 @@
// add hit to results
String sourceAsString = searchHit.getSourceAsString();
final T value = ESCustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz);
- value.setItemId(searchHit.getId());
- value.setVersion(searchHit.getVersion());
+ setMetadata(value, searchHit.getId(), searchHit.getVersion(), searchHit.getSeqNo(), searchHit.getPrimaryTerm());
results.add(value);
}
@@ -1684,8 +1750,7 @@
for (SearchHit searchHit : searchHits) {
String sourceAsString = searchHit.getSourceAsString();
final T value = ESCustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz);
- value.setItemId(searchHit.getId());
- value.setVersion(searchHit.getVersion());
+ setMetadata(value, searchHit.getId(), searchHit.getVersion(), searchHit.getSeqNo(), searchHit.getPrimaryTerm());
results.add(value);
}
}
@@ -1731,8 +1796,7 @@
// add hit to results
String sourceAsString = searchHit.getSourceAsString();
final T value = ESCustomObjectMapper.getObjectMapper().readValue(sourceAsString, clazz);
- value.setItemId(searchHit.getId());
- value.setVersion(searchHit.getVersion());
+ setMetadata(value, searchHit.getId(), searchHit.getVersion(), searchHit.getSeqNo(), searchHit.getPrimaryTerm());
results.add(value);
}
}
diff --git a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index 9f10ae2..5e746ad 100644
--- a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -64,6 +64,8 @@
<cm:property name="sslEnable" value="false" />
<cm:property name="sslTrustAllCertificates" value="false" />
<cm:property name="throwExceptions" value="false" />
+ <cm:property name="alwaysOverwrite" value="true" />
+
</cm:default-properties>
</cm:property-placeholder>
@@ -138,6 +140,7 @@
<property name="sslEnable" value="${es.sslEnable}" />
<property name="sslTrustAllCertificates" value="${es.sslTrustAllCertificates}" />
<property name="throwExceptions" value="${es.throwExceptions}" />
+ <property name="alwaysOverwrite" value="${es.alwaysOverwrite}" />
</bean>
<!-- We use a listener here because using the list directly for listening to proxies coming from the same bundle didn't seem to work -->
diff --git a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
index 147e99f..ac30c91 100644
--- a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
+++ b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
@@ -26,7 +26,6 @@
monthlyIndex.numberOfReplicas=${org.apache.unomi.elasticsearch.monthlyIndex.nbReplicas:-0}
monthlyIndex.indexMappingTotalFieldsLimit=${org.apache.unomi.elasticsearch.monthlyIndex.indexMappingTotalFieldsLimit:-1000}
monthlyIndex.indexMaxDocValueFieldsSearch=${org.apache.unomi.elasticsearch.monthlyIndex.indexMaxDocValueFieldsSearch:-1000}
-monthlyIndex.itemsMonthlyIndexedOverride=${org.apache.unomi.elasticsearch.monthlyIndex.itemsMonthlyIndexedOverride:-event,session}
numberOfShards=${org.apache.unomi.elasticsearch.defaultIndex.nbShards:-5}
numberOfReplicas=${org.apache.unomi.elasticsearch.defaultIndex.nbReplicas:-0}
indexMappingTotalFieldsLimit=${org.apache.unomi.elasticsearch.defaultIndex.indexMappingTotalFieldsLimit:-1000}
@@ -73,4 +72,6 @@
sslTrustAllCertificates=${org.apache.unomi.elasticsearch.sslTrustAllCertificates:-false}
# Errors
-throwExceptions=${org.apache.unomi.elasticsearch.throwExceptions:-false}
\ No newline at end of file
+throwExceptions=${org.apache.unomi.elasticsearch.throwExceptions:-false}
+
+alwaysOverwrite=${org.apache.unomi.elasticsearch.alwaysOverwrite:-true}
\ No newline at end of file
diff --git a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
index 2b91b2f..ef669a0 100644
--- a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
+++ b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
@@ -128,41 +128,65 @@
boolean save(Item item, boolean useBatching);
/**
+ * Persists the specified Item in the context server.
+ *
+ * @param item the item to persist
+ * @param useBatching whether to use batching or not for saving the item. If activating there may be a delay between
+ * the call to this method and the actual saving in the persistence backend
+ * @param alwaysOverwrite whether to overwrite a document even if we are holding an old item when saving
+ *
+ * @return {@code true} if the item was properly persisted, {@code false} otherwise
+ */
+ boolean save(Item item, Boolean useBatching, Boolean alwaysOverwrite);
+
+ /**
* Updates the item of the specified class and identified by the specified identifier with new property values provided as name - value pairs in the specified Map.
*
- * @param itemId the identifier of the item we want to update
+ * @param item the item we want to update
* @param dateHint a Date helping in identifying where the item is located
* @param clazz the Item subclass of the item to update
* @param source a Map with entries specifying as key the property name to update and as value its new value
* @return {@code true} if the update was successful, {@code false} otherwise
*/
- boolean update(String itemId, Date dateHint, Class<?> clazz, Map<?, ?> source);
+ boolean update(Item item, Date dateHint, Class<?> clazz, Map<?, ?> source);
/**
* Updates the item of the specified class and identified by the specified identifier with a new property value for the specified property name. Same as
* {@code update(itemId, dateHint, clazz, Collections.singletonMap(propertyName, propertyValue))}
*
- * @param itemId the identifier of the item we want to update
+ * @param item the item we want to update
* @param dateHint a Date helping in identifying where the item is located
* @param clazz the Item subclass of the item to update
* @param propertyName the name of the property to update
* @param propertyValue the new value of the property
* @return {@code true} if the update was successful, {@code false} otherwise
*/
- boolean update(String itemId, Date dateHint, Class<?> clazz, String propertyName, Object propertyValue);
+ boolean update(Item item, Date dateHint, Class<?> clazz, String propertyName, Object propertyValue);
+
+ /**
+ * Updates the item of the specified class and identified by the specified identifier with new property values provided as name - value pairs in the specified Map.
+ *
+ * @param item the item we want to update
+ * @param dateHint a Date helping in identifying where the item is located
+ * @param clazz the Item subclass of the item to update
+ * @param source a Map with entries specifying as key the property name to update and as value its new value
+ * @param alwaysOverwrite whether to overwrite a document even if we are holding an old item when saving
+ * @return {@code true} if the update was successful, {@code false} otherwise
+ */
+ boolean update(Item item, Date dateHint, Class<?> clazz, Map<?, ?> source, final boolean alwaysOverwrite);
/**
* Updates the item of the specified class and identified by the specified identifier with a new property value for the specified property name. Same as
* {@code update(itemId, dateHint, clazz, Collections.singletonMap(propertyName, propertyValue))}
*
- * @param itemId the identifier of the item we want to update
+ * @param item the item we want to update
* @param dateHint a Date helping in identifying where the item is located
* @param clazz the Item subclass of the item to update
* @param script inline script
* @param scriptParams script params
* @return {@code true} if the update was successful, {@code false} otherwise
*/
- boolean updateWithScript(String itemId, Date dateHint, Class<?> clazz, String script, Map<String, Object> scriptParams);
+ boolean updateWithScript(Item item, Date dateHint, Class<?> clazz, String script, Map<String, Object> scriptParams);
/**
* Updates the items of the specified class by a query with a new property value for the specified property name
diff --git a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java
index a496ddb..ffdf626 100644
--- a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java
+++ b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/actions/MergeProfilesOnPropertyAction.java
@@ -178,12 +178,12 @@
}
for (Session session : sessions) {
- persistenceService.update(session.getItemId(), session.getTimeStamp(), Session.class, "profileId", anonymousBrowsing ? null : masterProfileId);
+ persistenceService.update(session, session.getTimeStamp(), Session.class, "profileId", anonymousBrowsing ? null : masterProfileId);
}
List<Event> events = persistenceService.query("profileId", profileId, null, Event.class);
for (Event event : events) {
- persistenceService.update(event.getItemId(), event.getTimeStamp(), Event.class, "profileId", anonymousBrowsing ? null : masterProfileId);
+ persistenceService.update(event, event.getTimeStamp(), Event.class, "profileId", anonymousBrowsing ? null : masterProfileId);
}
// we must mark all the profiles that we merged into the master as merged with the master, and they will
// be deleted upon next load
@@ -192,7 +192,7 @@
sourceMap.put("mergedWith", masterProfileId);
profile.setSystemProperty("lastUpdated", new Date());
sourceMap.put("systemProperties", profile.getSystemProperties());
- persistenceService.update(profile.getItemId(), null, Profile.class, sourceMap);
+ persistenceService.update(profile, null, Profile.class, sourceMap);
}
}
} catch (Exception e) {
diff --git a/plugins/mail/src/main/java/org/apache/unomi/plugins/mail/actions/SendMailAction.java b/plugins/mail/src/main/java/org/apache/unomi/plugins/mail/actions/SendMailAction.java
index 8e03175..acdb1eb 100644
--- a/plugins/mail/src/main/java/org/apache/unomi/plugins/mail/actions/SendMailAction.java
+++ b/plugins/mail/src/main/java/org/apache/unomi/plugins/mail/actions/SendMailAction.java
@@ -116,7 +116,7 @@
event.getProfile().setSystemProperty("notificationAck", profileNotif);
event.getProfile().setSystemProperty("lastUpdated", new Date());
- persistenceService.update(event.getProfile().getItemId(), null, Profile.class, "systemProperties", event.getProfile().getSystemProperties());
+ persistenceService.update(event.getProfile(), null, Profile.class, "systemProperties", event.getProfile().getSystemProperties());
ST stringTemplate = new ST(template, '$', '$');
stringTemplate.add("profile", event.getProfile());
diff --git a/services/src/main/java/org/apache/unomi/services/impl/events/EventServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/events/EventServiceImpl.java
index 3f5cd23..59da4e0 100644
--- a/services/src/main/java/org/apache/unomi/services/impl/events/EventServiceImpl.java
+++ b/services/src/main/java/org/apache/unomi/services/impl/events/EventServiceImpl.java
@@ -166,7 +166,7 @@
boolean saveSucceeded = true;
if (event.isPersistent()) {
- saveSucceeded = persistenceService.save(event);
+ saveSucceeded = persistenceService.save(event, null, true);
}
int changes;
diff --git a/services/src/main/java/org/apache/unomi/services/impl/rules/RulesServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/rules/RulesServiceImpl.java
index d77173c..734d856 100644
--- a/services/src/main/java/org/apache/unomi/services/impl/rules/RulesServiceImpl.java
+++ b/services/src/main/java/org/apache/unomi/services/impl/rules/RulesServiceImpl.java
@@ -475,7 +475,7 @@
}
allRuleStatistics.put(ruleStatistics.getItemId(), ruleStatistics);
if (mustPersist) {
- persistenceService.save(ruleStatistics);
+ persistenceService.save(ruleStatistics, null, true);
}
}
// now let's iterate over the rules coming from the persistence service, as we may have new ones.
diff --git a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
index 36a2a67..e971432 100644
--- a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
+++ b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
@@ -365,7 +365,7 @@
sourceMap.put("segments", profileToRemove.getSegments());
profileToRemove.setSystemProperty("lastUpdated", new Date());
sourceMap.put("systemProperties", profileToRemove.getSystemProperties());
- persistenceService.update(profileToRemove.getItemId(), null, Profile.class, sourceMap);
+ persistenceService.update(profileToRemove, null, Profile.class, sourceMap);
updatedProfileCount++;
}
logger.info("Removed segment from {} profiles in {} ms", updatedProfileCount, System.currentTimeMillis() - profileRemovalStartTime);
@@ -724,7 +724,7 @@
// todo remove profile properties ?
persistenceService.remove(previousRule.getItemId(), Rule.class);
} else {
- persistenceService.update(previousRule.getItemId(), null, Rule.class, "linkedItems", previousRule.getLinkedItems());
+ persistenceService.update(previousRule, null, Rule.class, "linkedItems", previousRule.getLinkedItems());
}
}
}
@@ -862,7 +862,7 @@
systemProperties.put("lastUpdated", new Date());
Profile profile = new Profile();
profile.setItemId(profileId);
- persistenceService.update(profile.getItemId(), null, Profile.class, "systemProperties", systemProperties);
+ persistenceService.update(profile, null, Profile.class, "systemProperties", systemProperties);
} catch (Exception e) {
logger.error("Error updating profile {} past event system properties", profileId, e);
}
@@ -930,7 +930,7 @@
sourceMap.put("segments", profileToAdd.getSegments());
profileToAdd.setSystemProperty("lastUpdated", new Date());
sourceMap.put("systemProperties", profileToAdd.getSystemProperties());
- persistenceService.update(profileToAdd.getItemId(), null, Profile.class, sourceMap);
+ persistenceService.update(profileToAdd, null, Profile.class, sourceMap);
Event profileUpdated = new Event("profileUpdated", null, profileToAdd, null, null, profileToAdd, new Date());
profileUpdated.setPersistent(false);
eventService.send(profileUpdated);
@@ -950,7 +950,7 @@
sourceMap.put("segments", profileToRemove.getSegments());
profileToRemove.setSystemProperty("lastUpdated", new Date());
sourceMap.put("systemProperties", profileToRemove.getSystemProperties());
- persistenceService.update(profileToRemove.getItemId(), null, Profile.class, sourceMap);
+ persistenceService.update(profileToRemove, null, Profile.class, sourceMap);
Event profileUpdated = new Event("profileUpdated", null, profileToRemove, null, null, profileToRemove, new Date());
profileUpdated.setPersistent(false);
eventService.send(profileUpdated);
@@ -973,7 +973,7 @@
sourceMap.put("segments", profileToRemove.getSegments());
profileToRemove.setSystemProperty("lastUpdated", new Date());
sourceMap.put("systemProperties", profileToRemove.getSystemProperties());
- persistenceService.update(profileToRemove.getItemId(), null, Profile.class, sourceMap);
+ persistenceService.update(profileToRemove, null, Profile.class, sourceMap);
Event profileUpdated = new Event("profileUpdated", null, profileToRemove, null, null, profileToRemove, new Date());
profileUpdated.setPersistent(false);
eventService.send(profileUpdated);