update segments with its own bulk request, and use retry for failures (#237)
diff --git a/package/src/main/resources/etc/custom.system.properties b/package/src/main/resources/etc/custom.system.properties
index 138eb0e..77c8d0d 100644
--- a/package/src/main/resources/etc/custom.system.properties
+++ b/package/src/main/resources/etc/custom.system.properties
@@ -151,6 +151,14 @@
# When performing segment updates, this controls the size of the scrolling query size used to iterate over all the
# profiles that need updating
org.apache.unomi.segment.update.batchSize=${env:UNOMI_SEGMENT_UPDATE_BATCHSIZE:-1000}
+# Run Batch request separately for updating segments in profiles
+org.apache.unomi.segment.batch.update=${env:UNOMI_SEGMENT_BATCH_PROFILE_UPDATE:-false}
+# Send Profile Updated Event for every profile segment update
+org.apache.unomi.segment.send.profile.update.event=${env: UNOMI_SEGMENT_SEND_PROFILE_UPDATE_EVENT:-true}
+# When performing segment updates, can retry an update in case of an error to a single profile
+org.apache.unomi.services.segment.max.retries.update.profile.segment=${env:UNOMI_SEGMENT_UPDATE_MAX_RETRIES:-0}
+# When performing retry of segment update after a request was failed, delay of requests
+org.apache.unomi.services.segment.update.segment.retry.seconds.delay=${env:UNOMI_SEGMENT_UPDATE_RETRY_DELAY:-1}
# The interval in milliseconds to use to reload the definitions (condition types and action types)
org.apache.unomi.definitions.refresh.interval=${env:UNOMI_DEFINITIONS_REFRESH_INTERVAL:-10000}
# The interval in milliseconds to use to reload the property types
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index 2f5a3bc..ae73987 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -23,6 +23,7 @@
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
+import org.apache.http.client.config.RequestConfig;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.lucene.search.TotalHits;
@@ -159,6 +160,7 @@
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -235,12 +237,12 @@
private Set<String> itemClassesToCacheSet = new HashSet<>();
private String itemClassesToCache;
private boolean useBatchingForSave = false;
+ private boolean useBatchingForUpdate = true;
+ private boolean alwaysOverwrite = true;
private boolean aggQueryThrowOnMissingDocs = false;
private Integer aggQueryMaxResponseSizeHttp = null;
private Integer clientSocketTimeout = null;
- private boolean alwaysOverwrite = true;
-
private Map<String, Map<String, Map<String, Object>>> knownMappings = new HashMap<>();
public void setBundleContext(BundleContext bundleContext) {
@@ -390,6 +392,10 @@
this.useBatchingForSave = useBatchingForSave;
}
+ public void setUseBatchingForUpdate(boolean useBatchingForUpdate) {
+ this.useBatchingForUpdate = useBatchingForUpdate;
+ }
+
public void setUsername(String username) {
this.username = username;
}
@@ -919,21 +925,9 @@
Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".updateItem", this.bundleContext, this.fatalIllegalStateErrors) {
protected Boolean execute(Object... args) throws Exception {
try {
- String itemType = Item.getItemType(clazz);
- UpdateRequest updateRequest = new UpdateRequest(getIndex(itemType, dateHint), item.getItemId());
- updateRequest.doc(source);
+ UpdateRequest updateRequest = createUpdateRequest(clazz, dateHint, item, source, alwaysOverwrite);
- if (!alwaysOverwrite) {
- Long seqNo = (Long)item.getSystemMetadata(SEQ_NO);
- Long primaryTerm = (Long)item.getSystemMetadata(PRIMARY_TERM);
-
- if (seqNo != null && primaryTerm != null) {
- updateRequest.setIfSeqNo(seqNo);
- updateRequest.setIfPrimaryTerm(primaryTerm);
- }
- }
-
- if (bulkProcessor == null) {
+ if (bulkProcessor == null || !useBatchingForUpdate) {
UpdateResponse response = client.update(updateRequest, RequestOptions.DEFAULT);
setMetadata(item, response.getId(), response.getVersion(), response.getSeqNo(), response.getPrimaryTerm());
} else {
@@ -952,6 +946,57 @@
}
}
+ private UpdateRequest createUpdateRequest(Class clazz, Date dateHint, Item item, Map source, boolean alwaysOverwrite) {
+ String itemType = Item.getItemType(clazz);
+ UpdateRequest updateRequest = new UpdateRequest(getIndex(itemType, dateHint), item.getItemId());
+ updateRequest.doc(source);
+
+ if (!alwaysOverwrite) {
+ Long seqNo = (Long) item.getSystemMetadata(SEQ_NO);
+ Long primaryTerm = (Long) item.getSystemMetadata(PRIMARY_TERM);
+
+ if (seqNo != null && primaryTerm != null) {
+ updateRequest.setIfSeqNo(seqNo);
+ updateRequest.setIfPrimaryTerm(primaryTerm);
+ }
+ }
+ return updateRequest;
+ }
+
+ @Override
+ public List<String> update(final Map<Item, Map> items, final Date dateHint, final Class clazz) {
+ if (items.size() == 0)
+ return new ArrayList<>();
+
+ List<String> result = new InClassLoaderExecute<List<String>>(metricsService, this.getClass().getName() + ".updateItems", this.bundleContext, this.fatalIllegalStateErrors) {
+ protected List<String> execute(Object... args) throws Exception {
+ long batchRequestStartTime = System.currentTimeMillis();
+
+ BulkRequest bulkRequest = new BulkRequest();
+ items.forEach((item, source) -> {
+ UpdateRequest updateRequest = createUpdateRequest(clazz, dateHint, item, source, alwaysOverwrite);
+ bulkRequest.add(updateRequest);
+ });
+
+ BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
+ logger.debug("{} profiles updated with bulk segment in {}ms", bulkRequest.numberOfActions(), System.currentTimeMillis() - batchRequestStartTime);
+
+ List<String> failedItemsIds = new ArrayList<>();
+
+ if (bulkResponse.hasFailures()){
+ Iterator<BulkItemResponse> iterator = bulkResponse.iterator();
+ iterator.forEachRemaining(bulkItemResponse -> {
+ failedItemsIds.add(bulkItemResponse.getId());
+ });
+ }
+ return failedItemsIds;
+ }
+ }.catchingExecuteInClassLoader(true);
+
+ return result;
+ }
+
+
@Override
public boolean updateWithQueryAndScript(final Date dateHint, final Class<?> clazz, final String[] scripts, final Map<String, Object>[] scriptParams, final Condition[] conditions) {
Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".updateWithQueryAndScript", this.bundleContext, this.fatalIllegalStateErrors) {
diff --git a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index 5e746ad..5c2d12c 100644
--- a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -58,6 +58,7 @@
<cm:property name="aggQueryThrowOnMissingDocs" value="false" />
<cm:property name="itemClassesToCache" value="" />
<cm:property name="useBatchingForSave" value="false" />
+ <cm:property name="useBatchingForUpdate" value="true" />
<cm:property name="username" value="" />
<cm:property name="password" value="" />
@@ -134,6 +135,7 @@
<property name="hazelcastInstance" ref="hazelcastInstance" />
<property name="itemClassesToCache" value="${es.itemClassesToCache}" />
<property name="useBatchingForSave" value="${es.useBatchingForSave}" />
+ <property name="useBatchingForUpdate" value="${es.useBatchingForUpdate}" />
<property name="username" value="${es.username}" />
<property name="password" value="${es.password}" />
diff --git a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
index ac30c91..ce2fb67 100644
--- a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
+++ b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
@@ -74,4 +74,5 @@
# Errors
throwExceptions=${org.apache.unomi.elasticsearch.throwExceptions:-false}
-alwaysOverwrite=${org.apache.unomi.elasticsearch.alwaysOverwrite:-true}
\ No newline at end of file
+alwaysOverwrite=${org.apache.unomi.elasticsearch.alwaysOverwrite:-true}
+useBatchingForUpdate=${org.apache.unomi.elasticsearch.useBatchingForUpdate:-true}
diff --git a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
index c054ba0..bd66ce7 100644
--- a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
+++ b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
@@ -26,6 +26,7 @@
import java.util.Date;
import java.util.List;
import java.util.Map;
+import java.util.function.Consumer;
/**
* A service to provide persistence and retrieval of context server entities.
@@ -176,6 +177,17 @@
boolean update(Item item, Date dateHint, Class<?> clazz, Map<?, ?> source, final boolean alwaysOverwrite);
/**
+ * Updates Map of items of the specified class and identified by the specified identifier with a new property value for the specified property name. Same as
+ * {@code update(itemId, dateHint, clazz, Collections.singletonMap(propertyName, propertyValue))}
+ *
+ * @param items A map the consist of item (key) and properties to update (value)
+ * @param dateHint a Date helping in identifying where the item is located
+ * @param clazz the Item subclass of the item to update
+ * @return List of failed Items Ids, if all succesful then returns an empty list. if the whole operation failed then will return null
+ */
+ List<String> update(Map<Item, Map> items, Date dateHint, Class clazz);
+
+ /**
* Updates the item of the specified class and identified by the specified identifier with a new property value for the specified property name. Same as
* {@code update(itemId, dateHint, clazz, Collections.singletonMap(propertyName, propertyValue))}
*
diff --git a/services/pom.xml b/services/pom.xml
index 8ad7a80..7b277f2 100644
--- a/services/pom.xml
+++ b/services/pom.xml
@@ -75,6 +75,12 @@
</dependency>
<dependency>
+ <groupId>net.jodah</groupId>
+ <artifactId>failsafe</artifactId>
+ <version>2.4.0</version>
+ </dependency>
+
+ <dependency>
<groupId>com.github.fge</groupId>
<artifactId>json-patch</artifactId>
<version>1.9</version>
diff --git a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
index a0a1a02..363395d 100644
--- a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
+++ b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
@@ -18,7 +18,10 @@
package org.apache.unomi.services.impl.segments;
import com.fasterxml.jackson.core.JsonProcessingException;
+import net.jodah.failsafe.Failsafe;
+import net.jodah.failsafe.RetryPolicy;
import org.apache.unomi.api.Event;
+import org.apache.unomi.api.Item;
import org.apache.unomi.api.Metadata;
import org.apache.unomi.api.PartialList;
import org.apache.unomi.api.Profile;
@@ -46,6 +49,7 @@
import java.io.IOException;
import java.net.URL;
import java.security.MessageDigest;
+import java.time.Duration;
import java.util.*;
import java.util.concurrent.TimeUnit;
@@ -65,7 +69,10 @@
private int segmentUpdateBatchSize = 1000;
private long segmentRefreshInterval = 1000;
private int aggregateQueryBucketSize = 5000;
-
+ private int maxRetriesForUpdateProfileSegment = 0;
+ private long secondsDelayForRetryUpdateProfileSegment = 1;
+ private boolean batchSegmentProfileUpdate = false;
+ private boolean sendProfileUpdateEventForSegmentUpdate = true;
private int maximumIdsQueryCount = 5000;
private boolean pastEventsDisablePartitions = false;
@@ -109,6 +116,22 @@
this.segmentRefreshInterval = segmentRefreshInterval;
}
+ public void setMaxRetriesForUpdateProfileSegment(int maxRetriesForUpdateProfileSegment) {
+ this.maxRetriesForUpdateProfileSegment = maxRetriesForUpdateProfileSegment;
+ }
+
+ public void setSecondsDelayForRetryUpdateProfileSegment(long secondsDelayForRetryUpdateProfileSegment) {
+ this.secondsDelayForRetryUpdateProfileSegment = secondsDelayForRetryUpdateProfileSegment;
+ }
+
+ public void setBatchSegmentProfileUpdate(boolean batchSegmentProfileUpdate) {
+ this.batchSegmentProfileUpdate = batchSegmentProfileUpdate;
+ }
+
+ public void setSendProfileUpdateEventForSegmentUpdate(boolean sendProfileUpdateEventForSegmentUpdate){
+ this.sendProfileUpdateEventForSegmentUpdate = sendProfileUpdateEventForSegmentUpdate;
+ }
+
public void postConstruct() {
logger.debug("postConstruct {" + bundleContext.getBundle() + "}");
loadPredefinedSegments(bundleContext);
@@ -361,15 +384,17 @@
List<Profile> previousProfiles = persistenceService.query(segmentCondition, null, Profile.class);
long updatedProfileCount = 0;
long profileRemovalStartTime = System.currentTimeMillis();
- for (Profile profileToRemove : previousProfiles) {
- profileToRemove.getSegments().remove(segmentId);
- Map<String,Object> sourceMap = new HashMap<>();
- sourceMap.put("segments", profileToRemove.getSegments());
- profileToRemove.setSystemProperty("lastUpdated", new Date());
- sourceMap.put("systemProperties", profileToRemove.getSystemProperties());
- persistenceService.update(profileToRemove, null, Profile.class, sourceMap);
- updatedProfileCount++;
+ if (batchSegmentProfileUpdate && previousProfiles.size() > 0) {
+ batchUpdateProfilesSegment(segmentId, previousProfiles, false);
}
+ else {
+ for (Profile profileToRemove : previousProfiles) {
+ Map<String, Object> sourceMap = buildPropertiesMapForUpdateSegment(profileToRemove, segmentId, false);
+ persistenceService.update(profileToRemove, null, Profile.class, sourceMap);
+ }
+ }
+
+ updatedProfileCount += previousProfiles.size();
logger.info("Removed segment from {} profiles in {} ms", updatedProfileCount, System.currentTimeMillis() - profileRemovalStartTime);
// update impacted segments
@@ -417,7 +442,6 @@
return new DependentMetadata(segments, scorings);
}
-
public PartialList<Profile> getMatchingIndividuals(String segmentID, int offset, int size, String sortBy) {
Segment segment = getSegmentDefinition(segmentID);
if (segment == null) {
@@ -889,14 +913,14 @@
private void updateExistingProfilesForSegment(Segment segment) {
long updateProfilesForSegmentStartTime = System.currentTimeMillis();
- Condition segmentCondition = new Condition();
-
long updatedProfileCount = 0;
+ final String segmentId = segment.getItemId();
+ Condition segmentCondition = new Condition();
segmentCondition.setConditionType(definitionsService.getConditionType("profilePropertyCondition"));
segmentCondition.setParameter("propertyName", "segments");
segmentCondition.setParameter("comparisonOperator", "equals");
- segmentCondition.setParameter("propertyValue", segment.getItemId());
+ segmentCondition.setParameter("propertyValue", segmentId);
if (segment.getMetadata().isEnabled()) {
@@ -921,76 +945,95 @@
profilesToRemoveSubConditions.add(notNewSegmentCondition);
profilesToRemoveCondition.setParameter("subConditions", profilesToRemoveSubConditions);
- PartialList<Profile> profilesToRemove = persistenceService.query(profilesToRemoveCondition, null, Profile.class, 0, segmentUpdateBatchSize, "10m");
- PartialList<Profile> profilesToAdd = persistenceService.query(profilesToAddCondition, null, Profile.class, 0, segmentUpdateBatchSize, "10m");
-
- while (profilesToAdd.getList().size() > 0) {
- long profilesToAddStartTime = System.currentTimeMillis();
- for (Profile profileToAdd : profilesToAdd.getList()) {
- profileToAdd.getSegments().add(segment.getItemId());
- Map<String,Object> sourceMap = new HashMap<>();
- sourceMap.put("segments", profileToAdd.getSegments());
- profileToAdd.setSystemProperty("lastUpdated", new Date());
- sourceMap.put("systemProperties", profileToAdd.getSystemProperties());
- persistenceService.update(profileToAdd, null, Profile.class, sourceMap);
- Event profileUpdated = new Event("profileUpdated", null, profileToAdd, null, null, profileToAdd, new Date());
- profileUpdated.setPersistent(false);
- eventService.send(profileUpdated);
- updatedProfileCount++;
- }
- logger.info("{} profiles added to segment in {}ms", profilesToAdd.size(), System.currentTimeMillis() - profilesToAddStartTime);
- profilesToAdd = persistenceService.continueScrollQuery(Profile.class, profilesToAdd.getScrollIdentifier(), profilesToAdd.getScrollTimeValidity());
- if (profilesToAdd == null || profilesToAdd.getList().size() == 0) {
- break;
- }
- }
- while (profilesToRemove.getList().size() > 0) {
- long profilesToRemoveStartTime = System.currentTimeMillis();
- for (Profile profileToRemove : profilesToRemove.getList()) {
- profileToRemove.getSegments().remove(segment.getItemId());
- Map<String,Object> sourceMap = new HashMap<>();
- sourceMap.put("segments", profileToRemove.getSegments());
- profileToRemove.setSystemProperty("lastUpdated", new Date());
- sourceMap.put("systemProperties", profileToRemove.getSystemProperties());
- persistenceService.update(profileToRemove, null, Profile.class, sourceMap);
- Event profileUpdated = new Event("profileUpdated", null, profileToRemove, null, null, profileToRemove, new Date());
- profileUpdated.setPersistent(false);
- eventService.send(profileUpdated);
- updatedProfileCount++;
- }
- logger.info("{} profiles removed from segment in {}ms", profilesToRemove.size(), System.currentTimeMillis() - profilesToRemoveStartTime );
- profilesToRemove = persistenceService.continueScrollQuery(Profile.class, profilesToRemove.getScrollIdentifier(), profilesToRemove.getScrollTimeValidity());
- if (profilesToRemove == null || profilesToRemove.getList().size() == 0) {
- break;
- }
- }
-
+ updatedProfileCount += updateProfilesSegment(profilesToAddCondition, segmentId, true);
+ updatedProfileCount += updateProfilesSegment(profilesToRemoveCondition, segmentId, false);
} else {
- PartialList<Profile> profilesToRemove = persistenceService.query(segmentCondition, null, Profile.class, 0, 200, "10m");
- while (profilesToRemove.getList().size() > 0) {
- long profilesToRemoveStartTime = System.currentTimeMillis();
- for (Profile profileToRemove : profilesToRemove.getList()) {
- profileToRemove.getSegments().remove(segment.getItemId());
- Map<String,Object> sourceMap = new HashMap<>();
- sourceMap.put("segments", profileToRemove.getSegments());
- profileToRemove.setSystemProperty("lastUpdated", new Date());
- sourceMap.put("systemProperties", profileToRemove.getSystemProperties());
- persistenceService.update(profileToRemove, null, Profile.class, sourceMap);
- Event profileUpdated = new Event("profileUpdated", null, profileToRemove, null, null, profileToRemove, new Date());
- profileUpdated.setPersistent(false);
- eventService.send(profileUpdated);
- updatedProfileCount++;
- }
- logger.info("{} profiles removed from segment in {}ms", profilesToRemove.size(), System.currentTimeMillis() - profilesToRemoveStartTime);
- profilesToRemove = persistenceService.continueScrollQuery(Profile.class, profilesToRemove.getScrollIdentifier(), profilesToRemove.getScrollTimeValidity());
- if (profilesToRemove == null || profilesToRemove.getList().size() == 0) {
- break;
- }
- }
+ updatedProfileCount += updateProfilesSegment(segmentCondition, segmentId, false);
}
logger.info("{} profiles updated in {}ms", updatedProfileCount, System.currentTimeMillis() - updateProfilesForSegmentStartTime);
}
+ private long updateProfilesSegment(Condition profilesToUpdateCondition, String segmentId, boolean isAdd){
+ long updatedProfileCount= 0;
+ PartialList<Profile> profiles = persistenceService.query(profilesToUpdateCondition, null, Profile.class, 0, segmentUpdateBatchSize, "10m");
+
+ while (profiles != null && profiles.getList().size() > 0) {
+ long startTime = System.currentTimeMillis();
+ if (batchSegmentProfileUpdate) {
+ batchUpdateProfilesSegment(segmentId, profiles.getList(), isAdd);
+ }
+ else { //send update profile one by one
+ for (Profile profileToUpdate : profiles.getList()) {
+ Map<String, Object> sourceMap = buildPropertiesMapForUpdateSegment(profileToUpdate, segmentId, isAdd);
+ persistenceService.update(profileToUpdate, null, Profile.class, sourceMap);
+ }
+ }
+ if (sendProfileUpdateEventForSegmentUpdate)
+ sendProfileUpdatedEvent(profiles.getList());
+
+ updatedProfileCount += profiles.size();
+ logger.info("{} profiles {} to segment {} in {}ms", profiles.size(), isAdd ? "added" : "removed", segmentId, System.currentTimeMillis() - startTime);
+
+ profiles = persistenceService.continueScrollQuery(Profile.class, profiles.getScrollIdentifier(), profiles.getScrollTimeValidity());
+ }
+
+ return updatedProfileCount;
+ }
+
+ private void batchUpdateProfilesSegment(String segmentId, List<Profile> profiles, boolean isAdd) {
+ Map<Item, Map> profileToPropertiesMap = new HashMap<>();
+ for (Profile profileToUpdate : profiles) {
+ Map<String,Object> propertiesToUpdate = buildPropertiesMapForUpdateSegment(profileToUpdate, segmentId, isAdd);
+ profileToPropertiesMap.put(profileToUpdate, propertiesToUpdate);
+ }
+ List<String> failedItemsIds = persistenceService.update(profileToPropertiesMap, null, Profile.class);
+ if (failedItemsIds != null)
+ failedItemsIds.forEach(s -> retryFailedSegmentUpdate(s, segmentId, isAdd));
+ }
+
+ private void retryFailedSegmentUpdate(String profileId, String segmentId, boolean isAdd){
+ if (maxRetriesForUpdateProfileSegment > 0){
+ RetryPolicy retryPolicy = new RetryPolicy()
+ .withDelay(Duration.ofSeconds(secondsDelayForRetryUpdateProfileSegment))
+ .withMaxRetries(maxRetriesForUpdateProfileSegment);
+
+ Failsafe.with(retryPolicy).
+ run(executionContext -> {
+ logger.warn("retry updating profile segment {}, profile {}, time {}", segmentId, profileId, new Date());
+ Profile profileToAddUpdated = persistenceService.load(profileId, Profile.class);
+ Map<String, Object> sourceMapToUpdate = buildPropertiesMapForUpdateSegment(profileToAddUpdated, segmentId, isAdd);
+ boolean isUpdated = persistenceService.update(profileToAddUpdated, null, Profile.class, sourceMapToUpdate);
+ if (isUpdated == false)
+ throw new Exception(String.format("failed retry update profile segment {}, profile {}, time {}", segmentId, profileId, new Date()));
+ });
+ }
+ }
+
+ private void sendProfileUpdatedEvent(List<Profile> profiles) {
+ for (Profile profileToAdd : profiles) {
+ sendProfileUpdatedEvent(profileToAdd);
+ }
+ }
+
+ private void sendProfileUpdatedEvent(Profile profile) {
+ Event profileUpdated = new Event("profileUpdated", null, profile, null, null, profile, new Date());
+ profileUpdated.setPersistent(false);
+ eventService.send(profileUpdated);
+ }
+
+ private Map<String, Object> buildPropertiesMapForUpdateSegment(Profile profile, String segmentId, boolean isAdd) {
+ if (isAdd)
+ profile.getSegments().add(segmentId);
+ else
+ profile.getSegments().remove(segmentId);
+
+ Map<String, Object> sourceMap = new HashMap<>();
+ sourceMap.put("segments", profile.getSegments());
+ profile.setSystemProperty("lastUpdated", new Date());
+ sourceMap.put("systemProperties", profile.getSystemProperties());
+ return sourceMap;
+ }
+
private void updateExistingProfilesForScoring(Scoring scoring) {
long startTime = System.currentTimeMillis();
Condition scoringCondition = new Condition();
diff --git a/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index f77c4fe..4a9ab5b 100644
--- a/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ b/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -34,7 +34,11 @@
<cm:property name="definitions.refresh.interval" value="10000"/>
<cm:property name="properties.refresh.interval" value="10000"/>
<cm:property name="segment.refresh.interval" value="1000"/>
+ <cm:property name="segment.max.retries.update.profile.segment" value="5"/>
+ <cm:property name="segment.retry.update.segment.seconds.delay" value="1"/>
<cm:property name="segment.recalculate.period" value="1"/>
+ <cm:property name="segment.batch.update" value="false"/>
+ <cm:property name="segment.send.profile.update.event" value="true"/>
<cm:property name="rules.refresh.interval" value="1000"/>
<cm:property name="rules.statistics.refresh.interval" value="10000"/>
</cm:default-properties>
@@ -180,6 +184,10 @@
<property name="aggregateQueryBucketSize" value="${es.aggregateQueryBucketSize}"/>
<property name="pastEventsDisablePartitions" value="${es.pastEventsDisablePartitions}"/>
<property name="maximumIdsQueryCount" value="${es.maximumIdsQueryCount}"/>
+ <property name="maxRetriesForUpdateProfileSegment" value="${services.segment.max.retries.update.profile.segment}" />
+ <property name="secondsDelayForRetryUpdateProfileSegment" value="${services.segment.retry.update.segment.seconds.delay}" />
+ <property name="batchSegmentProfileUpdate" value="${services.segment.batch.update}" />
+ <property name="sendProfileUpdateEventForSegmentUpdate" value="${services.segment.send.profile.update.event}" />
</bean>
<service id="segmentService" ref="segmentServiceImpl">
diff --git a/services/src/main/resources/org.apache.unomi.services.cfg b/services/src/main/resources/org.apache.unomi.services.cfg
index 08295cb..37c9881 100644
--- a/services/src/main/resources/org.apache.unomi.services.cfg
+++ b/services/src/main/resources/org.apache.unomi.services.cfg
@@ -34,6 +34,18 @@
# profiles that need updating
segment.update.batchSize=${org.apache.unomi.segment.update.batchSize:-1000}
+# When performing segment updates, can retry an update in case of an error to a single profile
+segment.max.retries.update.profile.segment=${org.apache.unomi.services.segment.max.retries.update.profile.segment:-0}
+
+# When performing retry of segment update after a request was failed, delay of requests
+segment.retry.update.segment.seconds.delay=${org.apache.unomi.services.segment.update.segment.retry.seconds.delay:-1}
+
+# Run Batch request separately for updating segments in profiles
+segment.batch.update=${org.apache.unomi.segment.batch.update:-false}
+
+# Send Profile Updated Event for every profile segment update
+segment.send.profile.update.event=${org.apache.unomi.segment.send.profile.update.event:-true}
+
# The interval in milliseconds to use to reload the definitions (condition types and action types)
definitions.refresh.interval=${org.apache.unomi.definitions.refresh.interval:-10000}