UNOMI-204: added configuration parameters
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index 87a46eb..dd8c48f 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -156,7 +156,7 @@
private String minimalElasticSearchVersion = "5.0.0";
private String maximalElasticSearchVersion = "5.7.0";
- private String aggregateQueryBucketSize = "5000";
+ private int aggregateQueryBucketSize = 5000;
private String transportClientClassName = null;
private String transportClientProperties = null;
@@ -259,7 +259,7 @@
this.maximalElasticSearchVersion = maximalElasticSearchVersion;
}
- public void setAggregateQueryBucketSize(String aggregateQueryBucketSize) {
+ public void setAggregateQueryBucketSize(int aggregateQueryBucketSize) {
this.aggregateQueryBucketSize = aggregateQueryBucketSize;
}
@@ -1578,7 +1578,7 @@
fieldName = getPropertyNameWithData(fieldName, itemType);
//default
if (fieldName != null) {
- bucketsAggregation = AggregationBuilders.terms("buckets").field(fieldName).size(Integer.parseInt(aggregateQueryBucketSize));
+ bucketsAggregation = AggregationBuilders.terms("buckets").field(fieldName).size(aggregateQueryBucketSize);
if (aggregate instanceof TermsAggregate) {
TermsAggregate termsAggregate = (TermsAggregate) aggregate;
if (termsAggregate.getPartition() > -1 && termsAggregate.getNumPartitions() > -1) {
diff --git a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
index 324a626..dd0d0ec 100644
--- a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
+++ b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg
@@ -45,3 +45,6 @@
# The following setting is used to set the aggregate query bucket size
aggregateQueryBucketSize=5000
+
+# Maximum number of ids allowed in a single Elasticsearch "ids" query
+maximumIdsQueryCount=5000
\ No newline at end of file
diff --git a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java
index 0265a42..c8aeaca 100644
--- a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java
+++ b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java
@@ -30,8 +30,6 @@
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.*;
@@ -41,8 +39,8 @@
private PersistenceService persistenceService;
private SegmentService segmentService;
- private int maximumIdsQueryCount = 1000;
- private int termsAggregatePartitionSize = 1000;
+ private int maximumIdsQueryCount = 5000;
+ private int aggregateQueryBucketSize = 5000;
public void setDefinitionsService(DefinitionsService definitionsService) {
this.definitionsService = definitionsService;
@@ -56,8 +54,8 @@
this.maximumIdsQueryCount = maximumIdsQueryCount;
}
- public void setTermsAggregatePartitionSize(int termsAggregatePartitionSize) {
- this.termsAggregatePartitionSize = termsAggregatePartitionSize;
+ public void setAggregateQueryBucketSize(int aggregateQueryBucketSize) {
+ this.aggregateQueryBucketSize = aggregateQueryBucketSize;
}
public void setSegmentService(SegmentService segmentService) {
@@ -95,7 +93,7 @@
Map<String, Double> m = persistenceService.getSingleValuesMetrics(eventCondition, new String[]{"card"}, "profileId.keyword", Event.ITEM_TYPE);
long card = m.get("_card").longValue();
- int numParts = (int) (card / termsAggregatePartitionSize);
+ int numParts = (int) (card / aggregateQueryBucketSize) + 2;
for (int i = 0; i < numParts; i++) {
Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(eventCondition, new TermsAggregate("profileId", i, numParts), Event.ITEM_TYPE);
if (eventCountByProfile != null) {
@@ -133,7 +131,7 @@
if (minimumEventCount != 1 || maximumEventCount != Integer.MAX_VALUE) {
// Event count specified, must check occurences count for each profile
int result = 0;
- int numParts = (int) (card / termsAggregatePartitionSize);
+ int numParts = (int) (card / aggregateQueryBucketSize) + 2;
for (int i = 0; i < numParts; i++) {
Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(eventCondition, new TermsAggregate("profileId", i, numParts), Event.ITEM_TYPE);
int j = 0;
diff --git a/plugins/baseplugin/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/plugins/baseplugin/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index 9aa9d6c..5a10214 100644
--- a/plugins/baseplugin/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ b/plugins/baseplugin/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -29,6 +29,14 @@
</cm:default-properties>
</cm:property-placeholder>
+ <cm:property-placeholder persistent-id="org.apache.unomi.persistence.elasticsearch"
+ update-strategy="reload" placeholder-prefix="${es.">
+ <cm:default-properties>
+ <cm:property name="maximumIdsQueryCount" value="5000"/>
+ <cm:property name="aggregateQueryBucketSize" value="5000"/>
+ </cm:default-properties>
+ </cm:property-placeholder>
+
<reference id="definitionsService" interface="org.apache.unomi.api.services.DefinitionsService"/>
<reference id="persistenceService" interface="org.apache.unomi.persistence.spi.PersistenceService"/>
<reference id="profileService" interface="org.apache.unomi.api.services.ProfileService"/>
@@ -94,6 +102,8 @@
<property name="definitionsService" ref="definitionsService"/>
<property name="persistenceService" ref="persistenceService"/>
<property name="segmentService" ref="segmentService"/>
+ <property name="maximumIdsQueryCount" value="${es.maximumIdsQueryCount}"/>
+ <property name="aggregateQueryBucketSize" value="${es.aggregateQueryBucketSize}"/>
</bean>
</service>
diff --git a/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java b/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java
index c68f691..ec74525 100644
--- a/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java
+++ b/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java
@@ -57,7 +57,7 @@
private List<Scoring> allScoring;
private Timer segmentTimer;
private int segmentUpdateBatchSize = 1000;
- private int termsAggregatePartitionSize = 1000;
+ private int aggregateQueryBucketSize = 5000;
public SegmentServiceImpl() {
logger.info("Initializing segment service...");
@@ -83,8 +83,8 @@
this.segmentUpdateBatchSize = segmentUpdateBatchSize;
}
- public void setTermsAggregatePartitionSize(int termsAggregatePartitionSize) {
- this.termsAggregatePartitionSize = termsAggregatePartitionSize;
+ public void setAggregateQueryBucketSize(int aggregateQueryBucketSize) {
+ this.aggregateQueryBucketSize = aggregateQueryBucketSize;
}
public void postConstruct() {
@@ -774,7 +774,7 @@
Map<String, Double> m = persistenceService.getSingleValuesMetrics(andCondition, new String[]{"card"}, "profileId.keyword", Event.ITEM_TYPE);
long card = m.get("_card").longValue();
- int numParts = (int) (card / termsAggregatePartitionSize);
+ int numParts = (int) (card / aggregateQueryBucketSize) + 2;
for (int i = 0; i < numParts; i++) {
Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(andCondition, new TermsAggregate("profileId", i, numParts), Event.ITEM_TYPE);
for (Map.Entry<String, Long> entry : eventCountByProfile.entrySet()) {
diff --git a/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index eed06f2..979c6be 100644
--- a/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ b/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -44,6 +44,13 @@
</cm:default-properties>
</cm:property-placeholder>
+ <cm:property-placeholder persistent-id="org.apache.unomi.persistence.elasticsearch"
+ update-strategy="reload" placeholder-prefix="${es.">
+ <cm:default-properties>
+ <cm:property name="aggregateQueryBucketSize" value="5000"/>
+ </cm:default-properties>
+ </cm:property-placeholder>
+
<reference id="persistenceService"
interface="org.apache.unomi.persistence.spi.PersistenceService"/>
<reference id="httpService" interface="org.osgi.service.http.HttpService"/>
@@ -144,6 +151,7 @@
<property name="bundleContext" ref="blueprintBundleContext"/>
<property name="taskExecutionPeriod" value="86400000"/>
<property name="segmentUpdateBatchSize" value="${services.segment.update.batchSize}" />
+ <property name="aggregateQueryBucketSize" value="${es.aggregateQueryBucketSize}" />
</bean>
<service id="segmentService" ref="segmentServiceImpl">
<interfaces>