Update segments with their own bulk request, and retry failed profile updates (#237)

diff --git a/package/src/main/resources/etc/custom.system.properties b/package/src/main/resources/etc/custom.system.properties
index 138eb0e..77c8d0d 100644
--- a/package/src/main/resources/etc/custom.system.properties
+++ b/package/src/main/resources/etc/custom.system.properties
@@ -151,6 +151,14 @@
 # When performing segment updates, this controls the size of the scrolling query size used to iterate over all the
 # profiles that need updating
 org.apache.unomi.segment.update.batchSize=${env:UNOMI_SEGMENT_UPDATE_BATCHSIZE:-1000}
+# Run a separate bulk request when updating segments on profiles
+org.apache.unomi.segment.batch.update=${env:UNOMI_SEGMENT_BATCH_PROFILE_UPDATE:-false}
+# Send a profileUpdated event for every profile whose segment membership is updated
+org.apache.unomi.segment.send.profile.update.event=${env:UNOMI_SEGMENT_SEND_PROFILE_UPDATE_EVENT:-true}
+# When performing segment updates, maximum number of retries for a single profile update that failed (0 disables retries)
+org.apache.unomi.services.segment.max.retries.update.profile.segment=${env:UNOMI_SEGMENT_UPDATE_MAX_RETRIES:-0}
+# Delay in seconds between retries when a segment update request has failed
+org.apache.unomi.services.segment.update.segment.retry.seconds.delay=${env:UNOMI_SEGMENT_UPDATE_RETRY_DELAY:-1}
 # The interval in milliseconds to use to reload the definitions (condition types and action types)
 org.apache.unomi.definitions.refresh.interval=${env:UNOMI_DEFINITIONS_REFRESH_INTERVAL:-10000}
 # The interval in milliseconds to use to reload the property types
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index 2f5a3bc..ae73987 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -23,6 +23,7 @@
 import org.apache.http.auth.AuthScope;
 import org.apache.http.auth.UsernamePasswordCredentials;
 import org.apache.http.client.CredentialsProvider;
+import org.apache.http.client.config.RequestConfig;
 import org.apache.http.conn.ssl.NoopHostnameVerifier;
 import org.apache.http.impl.client.BasicCredentialsProvider;
 import org.apache.lucene.search.TotalHits;
@@ -159,6 +160,7 @@
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -235,12 +237,12 @@
     private Set<String> itemClassesToCacheSet = new HashSet<>();
     private String itemClassesToCache;
     private boolean useBatchingForSave = false;
+    private boolean useBatchingForUpdate = true;
+    private boolean alwaysOverwrite = true;
     private boolean aggQueryThrowOnMissingDocs = false;
     private Integer aggQueryMaxResponseSizeHttp = null;
     private Integer clientSocketTimeout = null;
 
-    private boolean alwaysOverwrite = true;
-
     private Map<String, Map<String, Map<String, Object>>> knownMappings = new HashMap<>();
 
     public void setBundleContext(BundleContext bundleContext) {
@@ -390,6 +392,10 @@
         this.useBatchingForSave = useBatchingForSave;
     }
 
+    public void setUseBatchingForUpdate(boolean useBatchingForUpdate) {
+        this.useBatchingForUpdate = useBatchingForUpdate;
+    }
+
     public void setUsername(String username) {
         this.username = username;
     }
@@ -919,21 +925,9 @@
         Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".updateItem", this.bundleContext, this.fatalIllegalStateErrors) {
             protected Boolean execute(Object... args) throws Exception {
                 try {
-                    String itemType = Item.getItemType(clazz);
-                    UpdateRequest updateRequest = new UpdateRequest(getIndex(itemType, dateHint), item.getItemId());
-                    updateRequest.doc(source);
+                    UpdateRequest updateRequest = createUpdateRequest(clazz, dateHint, item, source, alwaysOverwrite);
 
-                    if (!alwaysOverwrite) {
-                        Long seqNo = (Long)item.getSystemMetadata(SEQ_NO);
-                        Long primaryTerm = (Long)item.getSystemMetadata(PRIMARY_TERM);
-
-                        if (seqNo != null && primaryTerm != null) {
-                            updateRequest.setIfSeqNo(seqNo);
-                            updateRequest.setIfPrimaryTerm(primaryTerm);
-                        }
-                    }
-
-                    if (bulkProcessor == null) {
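+                    // Bypass the bulk processor when it is absent or when batching of updates is disabled, so the update is applied synchronously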
+                    if (bulkProcessor == null || !useBatchingForUpdate) {
                         UpdateResponse response = client.update(updateRequest, RequestOptions.DEFAULT);
                         setMetadata(item, response.getId(), response.getVersion(), response.getSeqNo(), response.getPrimaryTerm());
                     } else {
@@ -952,6 +946,57 @@
         }
     }
 
+    private UpdateRequest createUpdateRequest(Class clazz, Date dateHint, Item item, Map source, boolean alwaysOverwrite) {
+        String itemType = Item.getItemType(clazz);
+        UpdateRequest updateRequest = new UpdateRequest(getIndex(itemType, dateHint), item.getItemId());
+        updateRequest.doc(source);
+
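+        // When overwrites are not forced, reuse the stored seq_no/primary_term for optimistic concurrency control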
+        if (!alwaysOverwrite) {
+            Long seqNo = (Long) item.getSystemMetadata(SEQ_NO);
+            Long primaryTerm = (Long) item.getSystemMetadata(PRIMARY_TERM);
+
+            if (seqNo != null && primaryTerm != null) {
+                updateRequest.setIfSeqNo(seqNo);
+                updateRequest.setIfPrimaryTerm(primaryTerm);
+            }
+        }
+        return updateRequest;
+    }
+
+    @Override
+    public List<String> update(final Map<Item, Map> items, final Date dateHint, final Class clazz) {
+        if (items.isEmpty()) {
+            return new ArrayList<>();
+        }
+
+        List<String> result = new InClassLoaderExecute<List<String>>(metricsService, this.getClass().getName() + ".updateItems", this.bundleContext, this.fatalIllegalStateErrors) {
+            protected List<String> execute(Object... args) throws Exception {
+                long batchRequestStartTime = System.currentTimeMillis();
+
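+                // Build a single bulk request containing one update action per item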
+                BulkRequest bulkRequest = new BulkRequest();
+                items.forEach((item, source) -> {
+                    UpdateRequest updateRequest = createUpdateRequest(clazz, dateHint, item, source, alwaysOverwrite);
+                    bulkRequest.add(updateRequest);
+                });
+
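+                // Execute the bulk request synchronously so individual failures can be reported back to the caller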
+                BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
+                logger.debug("{} items updated in a single bulk request in {}ms", bulkRequest.numberOfActions(), System.currentTimeMillis() - batchRequestStartTime);
+
+                List<String> failedItemsIds = new ArrayList<>();
+
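+                // Only report the ids of the update actions that actually failed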
+                if (bulkResponse.hasFailures()) {
+                    Iterator<BulkItemResponse> iterator = bulkResponse.iterator();
+                    iterator.forEachRemaining(bulkItemResponse -> {
+                        if (bulkItemResponse.isFailed()) {
+                            failedItemsIds.add(bulkItemResponse.getId());
+                        }
+                    });
+                }
+                return failedItemsIds;
+            }
+        }.catchingExecuteInClassLoader(true);
+
+        return result;
+    }
+
     @Override
     public boolean updateWithQueryAndScript(final Date dateHint, final Class<?> clazz, final String[] scripts, final Map<String, Object>[] scriptParams, final Condition[] conditions) {
         Boolean result = new InClassLoaderExecute<Boolean>(metricsService, this.getClass().getName() + ".updateWithQueryAndScript", this.bundleContext, this.fatalIllegalStateErrors) {
diff --git a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index 5e746ad..5c2d12c 100644
--- a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -58,6 +58,7 @@
             <cm:property name="aggQueryThrowOnMissingDocs" value="false" />
             <cm:property name="itemClassesToCache" value="" />
             <cm:property name="useBatchingForSave" value="false" />
+            <cm:property name="useBatchingForUpdate" value="true" />
 
             <cm:property name="username" value="" />
             <cm:property name="password" value="" />
@@ -134,6 +135,7 @@
         <property name="hazelcastInstance" ref="hazelcastInstance" />
         <property name="itemClassesToCache" value="${es.itemClassesToCache}" />
         <property name="useBatchingForSave" value="${es.useBatchingForSave}" />
+        <property name="useBatchingForUpdate" value="${es.useBatchingForUpdate}" />
 
         <property name="username" value="${es.username}" />
         <property name="password" value="${es.password}" />
diff --git a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
index ac30c91..ce2fb67 100644
--- a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
+++ b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
@@ -74,4 +74,5 @@
 # Errors
 throwExceptions=${org.apache.unomi.elasticsearch.throwExceptions:-false}
 
-alwaysOverwrite=${org.apache.unomi.elasticsearch.alwaysOverwrite:-true}
\ No newline at end of file
+alwaysOverwrite=${org.apache.unomi.elasticsearch.alwaysOverwrite:-true}
+useBatchingForUpdate=${org.apache.unomi.elasticsearch.useBatchingForUpdate:-true}
diff --git a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
index c054ba0..bd66ce7 100644
--- a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
+++ b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java
@@ -26,6 +26,7 @@
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Consumer;
 
 /**
  * A service to provide persistence and retrieval of context server entities.
@@ -176,6 +177,17 @@
     boolean update(Item item, Date dateHint, Class<?> clazz, Map<?, ?> source, final boolean alwaysOverwrite);
 
     /**
+     * Updates the given items of the specified class with the specified property values, using a single bulk request.
+     *
+     * @param items         a map whose keys are the items to update and whose values are the maps of properties to update on each item
+     * @param dateHint      a Date helping in identifying where the items are located
+     * @param clazz         the Item subclass of the items to update
+     * @return the list of ids of the items that failed to update; an empty list if all updates succeeded, or {@code null} if the whole operation failed
+     */
+    List<String> update(Map<Item, Map> items, Date dateHint, Class clazz);
+
+    /**
      * Updates the item of the specified class and identified by the specified identifier with a new property value for the specified property name. Same as
      * {@code update(itemId, dateHint, clazz, Collections.singletonMap(propertyName, propertyValue))}
      *
diff --git a/services/pom.xml b/services/pom.xml
index 8ad7a80..7b277f2 100644
--- a/services/pom.xml
+++ b/services/pom.xml
@@ -75,6 +75,12 @@
         </dependency>
 
         <dependency>
+            <groupId>net.jodah</groupId>
+            <artifactId>failsafe</artifactId>
+            <version>2.4.0</version>
+        </dependency>
+
+        <dependency>
             <groupId>com.github.fge</groupId>
             <artifactId>json-patch</artifactId>
             <version>1.9</version>
diff --git a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
index a0a1a02..363395d 100644
--- a/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
+++ b/services/src/main/java/org/apache/unomi/services/impl/segments/SegmentServiceImpl.java
@@ -18,7 +18,10 @@
 package org.apache.unomi.services.impl.segments;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
+import net.jodah.failsafe.Failsafe;
+import net.jodah.failsafe.RetryPolicy;
 import org.apache.unomi.api.Event;
+import org.apache.unomi.api.Item;
 import org.apache.unomi.api.Metadata;
 import org.apache.unomi.api.PartialList;
 import org.apache.unomi.api.Profile;
@@ -46,6 +49,7 @@
 import java.io.IOException;
 import java.net.URL;
 import java.security.MessageDigest;
+import java.time.Duration;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
 
@@ -65,7 +69,10 @@
     private int segmentUpdateBatchSize = 1000;
     private long segmentRefreshInterval = 1000;
     private int aggregateQueryBucketSize = 5000;
-
+    private int maxRetriesForUpdateProfileSegment = 0;
+    private long secondsDelayForRetryUpdateProfileSegment = 1;
+    private boolean batchSegmentProfileUpdate = false;
+    private boolean sendProfileUpdateEventForSegmentUpdate = true;
     private int maximumIdsQueryCount = 5000;
     private boolean pastEventsDisablePartitions = false;
 
@@ -109,6 +116,22 @@
         this.segmentRefreshInterval = segmentRefreshInterval;
     }
 
+    public void setMaxRetriesForUpdateProfileSegment(int maxRetriesForUpdateProfileSegment) {
+        this.maxRetriesForUpdateProfileSegment = maxRetriesForUpdateProfileSegment;
+    }
+
+    public void setSecondsDelayForRetryUpdateProfileSegment(long secondsDelayForRetryUpdateProfileSegment) {
+        this.secondsDelayForRetryUpdateProfileSegment = secondsDelayForRetryUpdateProfileSegment;
+    }
+
+    public void setBatchSegmentProfileUpdate(boolean batchSegmentProfileUpdate) {
+        this.batchSegmentProfileUpdate = batchSegmentProfileUpdate;
+    }
+
+    public void setSendProfileUpdateEventForSegmentUpdate(boolean sendProfileUpdateEventForSegmentUpdate){
+        this.sendProfileUpdateEventForSegmentUpdate = sendProfileUpdateEventForSegmentUpdate;
+    }
+
     public void postConstruct() {
         logger.debug("postConstruct {" + bundleContext.getBundle() + "}");
         loadPredefinedSegments(bundleContext);
@@ -361,15 +384,17 @@
             List<Profile> previousProfiles = persistenceService.query(segmentCondition, null, Profile.class);
             long updatedProfileCount = 0;
             long profileRemovalStartTime = System.currentTimeMillis();
-            for (Profile profileToRemove : previousProfiles) {
-                profileToRemove.getSegments().remove(segmentId);
-                Map<String,Object> sourceMap = new HashMap<>();
-                sourceMap.put("segments", profileToRemove.getSegments());
-                profileToRemove.setSystemProperty("lastUpdated", new Date());
-                sourceMap.put("systemProperties", profileToRemove.getSystemProperties());
-                persistenceService.update(profileToRemove, null, Profile.class, sourceMap);
-                updatedProfileCount++;
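+            // Remove the deleted segment from the matching profiles, in bulk when batching is enabled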
+            if (batchSegmentProfileUpdate && previousProfiles.size() > 0) {
+                batchUpdateProfilesSegment(segmentId, previousProfiles, false);
-            }
+            } else {
+                for (Profile profileToRemove : previousProfiles) {
+                    Map<String, Object> sourceMap = buildPropertiesMapForUpdateSegment(profileToRemove, segmentId, false);
+                    persistenceService.update(profileToRemove, null, Profile.class, sourceMap);
+                }
+            }
+
+            updatedProfileCount += previousProfiles.size();
             logger.info("Removed segment from {} profiles in {} ms", updatedProfileCount, System.currentTimeMillis() - profileRemovalStartTime);
 
             // update impacted segments
@@ -417,7 +442,6 @@
         return new DependentMetadata(segments, scorings);
     }
 
-
     public PartialList<Profile> getMatchingIndividuals(String segmentID, int offset, int size, String sortBy) {
         Segment segment = getSegmentDefinition(segmentID);
         if (segment == null) {
@@ -889,14 +913,14 @@
 
     private void updateExistingProfilesForSegment(Segment segment) {
         long updateProfilesForSegmentStartTime = System.currentTimeMillis();
-        Condition segmentCondition = new Condition();
-
         long updatedProfileCount = 0;
+        final String segmentId = segment.getItemId();
 
+        Condition segmentCondition = new Condition();
         segmentCondition.setConditionType(definitionsService.getConditionType("profilePropertyCondition"));
         segmentCondition.setParameter("propertyName", "segments");
         segmentCondition.setParameter("comparisonOperator", "equals");
-        segmentCondition.setParameter("propertyValue", segment.getItemId());
+        segmentCondition.setParameter("propertyValue", segmentId);
 
         if (segment.getMetadata().isEnabled()) {
 
@@ -921,76 +945,95 @@
             profilesToRemoveSubConditions.add(notNewSegmentCondition);
             profilesToRemoveCondition.setParameter("subConditions", profilesToRemoveSubConditions);
 
-            PartialList<Profile> profilesToRemove = persistenceService.query(profilesToRemoveCondition, null, Profile.class, 0, segmentUpdateBatchSize, "10m");
-            PartialList<Profile> profilesToAdd = persistenceService.query(profilesToAddCondition, null, Profile.class, 0, segmentUpdateBatchSize, "10m");
-
-            while (profilesToAdd.getList().size() > 0) {
-                long profilesToAddStartTime = System.currentTimeMillis();
-                for (Profile profileToAdd : profilesToAdd.getList()) {
-                    profileToAdd.getSegments().add(segment.getItemId());
-                    Map<String,Object> sourceMap = new HashMap<>();
-                    sourceMap.put("segments", profileToAdd.getSegments());
-                    profileToAdd.setSystemProperty("lastUpdated", new Date());
-                    sourceMap.put("systemProperties", profileToAdd.getSystemProperties());
-                    persistenceService.update(profileToAdd, null, Profile.class, sourceMap);
-                    Event profileUpdated = new Event("profileUpdated", null, profileToAdd, null, null, profileToAdd, new Date());
-                    profileUpdated.setPersistent(false);
-                    eventService.send(profileUpdated);
-                    updatedProfileCount++;
-                }
-                logger.info("{} profiles added to segment in {}ms", profilesToAdd.size(), System.currentTimeMillis() - profilesToAddStartTime);
-                profilesToAdd = persistenceService.continueScrollQuery(Profile.class, profilesToAdd.getScrollIdentifier(), profilesToAdd.getScrollTimeValidity());
-                if (profilesToAdd == null || profilesToAdd.getList().size() == 0) {
-                    break;
-                }
-            }
-            while (profilesToRemove.getList().size() > 0) {
-                long profilesToRemoveStartTime = System.currentTimeMillis();
-                for (Profile profileToRemove : profilesToRemove.getList()) {
-                    profileToRemove.getSegments().remove(segment.getItemId());
-                    Map<String,Object> sourceMap = new HashMap<>();
-                    sourceMap.put("segments", profileToRemove.getSegments());
-                    profileToRemove.setSystemProperty("lastUpdated", new Date());
-                    sourceMap.put("systemProperties", profileToRemove.getSystemProperties());
-                    persistenceService.update(profileToRemove, null, Profile.class, sourceMap);
-                    Event profileUpdated = new Event("profileUpdated", null, profileToRemove, null, null, profileToRemove, new Date());
-                    profileUpdated.setPersistent(false);
-                    eventService.send(profileUpdated);
-                    updatedProfileCount++;
-                }
-                logger.info("{} profiles removed from segment in {}ms", profilesToRemove.size(), System.currentTimeMillis() - profilesToRemoveStartTime );
-                profilesToRemove = persistenceService.continueScrollQuery(Profile.class, profilesToRemove.getScrollIdentifier(), profilesToRemove.getScrollTimeValidity());
-                if (profilesToRemove == null || profilesToRemove.getList().size() == 0) {
-                    break;
-                }
-            }
-
+            updatedProfileCount += updateProfilesSegment(profilesToAddCondition, segmentId, true);
+            updatedProfileCount += updateProfilesSegment(profilesToRemoveCondition, segmentId, false);
         } else {
-            PartialList<Profile> profilesToRemove = persistenceService.query(segmentCondition, null, Profile.class, 0, 200, "10m");
-            while (profilesToRemove.getList().size() > 0) {
-                long profilesToRemoveStartTime = System.currentTimeMillis();
-                for (Profile profileToRemove : profilesToRemove.getList()) {
-                    profileToRemove.getSegments().remove(segment.getItemId());
-                    Map<String,Object> sourceMap = new HashMap<>();
-                    sourceMap.put("segments", profileToRemove.getSegments());
-                    profileToRemove.setSystemProperty("lastUpdated", new Date());
-                    sourceMap.put("systemProperties", profileToRemove.getSystemProperties());
-                    persistenceService.update(profileToRemove, null, Profile.class, sourceMap);
-                    Event profileUpdated = new Event("profileUpdated", null, profileToRemove, null, null, profileToRemove, new Date());
-                    profileUpdated.setPersistent(false);
-                    eventService.send(profileUpdated);
-                    updatedProfileCount++;
-                }
-                logger.info("{} profiles removed from segment in {}ms", profilesToRemove.size(), System.currentTimeMillis() - profilesToRemoveStartTime);
-                profilesToRemove = persistenceService.continueScrollQuery(Profile.class, profilesToRemove.getScrollIdentifier(), profilesToRemove.getScrollTimeValidity());
-                if (profilesToRemove == null || profilesToRemove.getList().size() == 0) {
-                    break;
-                }
-            }
+            updatedProfileCount += updateProfilesSegment(segmentCondition, segmentId, false);
         }
         logger.info("{} profiles updated in {}ms", updatedProfileCount, System.currentTimeMillis() - updateProfilesForSegmentStartTime);
     }
 
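+    // Scroll over all profiles matching the condition and add or remove the segment, either in bulk or one by one depending on configuration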
+    private long updateProfilesSegment(Condition profilesToUpdateCondition, String segmentId, boolean isAdd) {
+        long updatedProfileCount = 0;
+        PartialList<Profile> profiles = persistenceService.query(profilesToUpdateCondition, null, Profile.class, 0, segmentUpdateBatchSize, "10m");
+
+        while (profiles != null && profiles.getList().size() > 0) {
+            long startTime = System.currentTimeMillis();
+            if (batchSegmentProfileUpdate) {
+                batchUpdateProfilesSegment(segmentId, profiles.getList(), isAdd);
+            } else { // update the profiles one by one
+                for (Profile profileToUpdate : profiles.getList()) {
+                    Map<String, Object> sourceMap = buildPropertiesMapForUpdateSegment(profileToUpdate, segmentId, isAdd);
+                    persistenceService.update(profileToUpdate, null, Profile.class, sourceMap);
+                }
+            }
+            if (sendProfileUpdateEventForSegmentUpdate) {
+                sendProfileUpdatedEvent(profiles.getList());
+            }
+
+            updatedProfileCount += profiles.size();
+            logger.info("{} profiles {} to segment {} in {}ms", profiles.size(), isAdd ? "added" : "removed", segmentId, System.currentTimeMillis() - startTime);
+
+            profiles = persistenceService.continueScrollQuery(Profile.class, profiles.getScrollIdentifier(), profiles.getScrollTimeValidity());
+        }
+
+        return updatedProfileCount;
+    }
+
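+    // Update the segment membership of all the given profiles in a single bulk request, then retry any failed profile individually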
+    private void batchUpdateProfilesSegment(String segmentId, List<Profile> profiles, boolean isAdd) {
+        Map<Item, Map> profileToPropertiesMap = new HashMap<>();
+        for (Profile profileToUpdate : profiles) {
+            Map<String,Object> propertiesToUpdate = buildPropertiesMapForUpdateSegment(profileToUpdate, segmentId, isAdd);
+            profileToPropertiesMap.put(profileToUpdate, propertiesToUpdate);
+        }
+        List<String> failedItemsIds = persistenceService.update(profileToPropertiesMap, null, Profile.class);
+        if (failedItemsIds != null) {
+            failedItemsIds.forEach(failedItemId -> retryFailedSegmentUpdate(failedItemId, segmentId, isAdd));
+        }
+    }
+
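+    // Reload the profile and retry the segment update with the configured retry policy (no-op when retries are disabled)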
+    private void retryFailedSegmentUpdate(String profileId, String segmentId, boolean isAdd) {
+        if (maxRetriesForUpdateProfileSegment > 0) {
+            RetryPolicy<Object> retryPolicy = new RetryPolicy<>()
+                    .withDelay(Duration.ofSeconds(secondsDelayForRetryUpdateProfileSegment))
+                    .withMaxRetries(maxRetriesForUpdateProfileSegment);
+
+            Failsafe.with(retryPolicy).run(executionContext -> {
+                logger.warn("retrying update of segment {} for profile {} at {}", segmentId, profileId, new Date());
+                Profile profileToUpdate = persistenceService.load(profileId, Profile.class);
+                Map<String, Object> sourceMapToUpdate = buildPropertiesMapForUpdateSegment(profileToUpdate, segmentId, isAdd);
+                boolean isUpdated = persistenceService.update(profileToUpdate, null, Profile.class, sourceMapToUpdate);
+                if (!isUpdated) {
+                    throw new Exception(String.format("failed to update segment %s for profile %s after retry at %s", segmentId, profileId, new Date()));
+                }
+            });
+        }
+    }
+
+    private void sendProfileUpdatedEvent(List<Profile> profiles) {
+        for (Profile profile : profiles) {
+            sendProfileUpdatedEvent(profile);
+        }
+    }
+
+    private void sendProfileUpdatedEvent(Profile profile) {
+        Event profileUpdated = new Event("profileUpdated", null, profile, null, null, profile, new Date());
+        profileUpdated.setPersistent(false);
+        eventService.send(profileUpdated);
+    }
+
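+    // Apply the segment change to the in-memory profile and build the partial source map (segments + systemProperties) to persist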
+    private Map<String, Object> buildPropertiesMapForUpdateSegment(Profile profile, String segmentId, boolean isAdd) {
+        if (isAdd) {
+            profile.getSegments().add(segmentId);
+        } else {
+            profile.getSegments().remove(segmentId);
+        }
+
+        Map<String, Object> sourceMap = new HashMap<>();
+        sourceMap.put("segments", profile.getSegments());
+        profile.setSystemProperty("lastUpdated", new Date());
+        sourceMap.put("systemProperties", profile.getSystemProperties());
+        return sourceMap;
+    }
+
     private void updateExistingProfilesForScoring(Scoring scoring) {
         long startTime = System.currentTimeMillis();
         Condition scoringCondition = new Condition();
diff --git a/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index f77c4fe..4a9ab5b 100644
--- a/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ b/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -34,7 +34,11 @@
             <cm:property name="definitions.refresh.interval" value="10000"/>
             <cm:property name="properties.refresh.interval" value="10000"/>
             <cm:property name="segment.refresh.interval" value="1000"/>
+            <cm:property name="segment.max.retries.update.profile.segment" value="5"/>
+            <cm:property name="segment.retry.update.segment.seconds.delay" value="1"/>
             <cm:property name="segment.recalculate.period" value="1"/>
+            <cm:property name="segment.batch.update" value="false"/>
+            <cm:property name="segment.send.profile.update.event" value="true"/>
             <cm:property name="rules.refresh.interval" value="1000"/>
             <cm:property name="rules.statistics.refresh.interval" value="10000"/>
         </cm:default-properties>
@@ -180,6 +184,10 @@
         <property name="aggregateQueryBucketSize" value="${es.aggregateQueryBucketSize}"/>
         <property name="pastEventsDisablePartitions" value="${es.pastEventsDisablePartitions}"/>
         <property name="maximumIdsQueryCount" value="${es.maximumIdsQueryCount}"/>
+        <property name="maxRetriesForUpdateProfileSegment" value="${services.segment.max.retries.update.profile.segment}" />
+        <property name="secondsDelayForRetryUpdateProfileSegment" value="${services.segment.retry.update.segment.seconds.delay}" />
+        <property name="batchSegmentProfileUpdate" value="${services.segment.batch.update}" />
+        <property name="sendProfileUpdateEventForSegmentUpdate" value="${services.segment.send.profile.update.event}" />
 
     </bean>
     <service id="segmentService" ref="segmentServiceImpl">
diff --git a/services/src/main/resources/org.apache.unomi.services.cfg b/services/src/main/resources/org.apache.unomi.services.cfg
index 08295cb..37c9881 100644
--- a/services/src/main/resources/org.apache.unomi.services.cfg
+++ b/services/src/main/resources/org.apache.unomi.services.cfg
@@ -34,6 +34,18 @@
 # profiles that need updating
 segment.update.batchSize=${org.apache.unomi.segment.update.batchSize:-1000}
 
+# When performing segment updates, maximum number of retries for a single profile update that failed (0 disables retries)
+segment.max.retries.update.profile.segment=${org.apache.unomi.services.segment.max.retries.update.profile.segment:-0}
+
+# Delay in seconds between retries when a segment update request has failed
+segment.retry.update.segment.seconds.delay=${org.apache.unomi.services.segment.update.segment.retry.seconds.delay:-1}
+
+# Run a separate bulk request when updating segments on profiles
+segment.batch.update=${org.apache.unomi.segment.batch.update:-false}
+
+# Send a profileUpdated event for every profile whose segment membership is updated
+segment.send.profile.update.event=${org.apache.unomi.segment.send.profile.update.event:-true}
+
 # The interval in milliseconds to use to reload the definitions (condition types and action types)
 definitions.refresh.interval=${org.apache.unomi.definitions.refresh.interval:-10000}