blob: 5b81819b9f35fa62d96d253a943eb9aed78f58bc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.unomi.services.impl.events;
import com.fasterxml.jackson.databind.JsonNode;
import inet.ipaddr.IPAddress;
import inet.ipaddr.IPAddressString;
import org.apache.commons.lang3.StringUtils;
import org.apache.unomi.api.Event;
import org.apache.unomi.api.EventProperty;
import org.apache.unomi.api.Metadata;
import org.apache.unomi.api.PartialList;
import org.apache.unomi.api.PropertyType;
import org.apache.unomi.api.Session;
import org.apache.unomi.api.ValueType;
import org.apache.unomi.api.actions.ActionPostExecutor;
import org.apache.unomi.api.conditions.Condition;
import org.apache.unomi.api.query.Query;
import org.apache.unomi.api.services.*;
import org.apache.unomi.persistence.spi.CustomObjectMapper;
import org.apache.unomi.persistence.spi.PersistenceService;
import org.apache.unomi.persistence.spi.aggregate.TermsAggregate;
import org.apache.unomi.api.utils.ParserHelper;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
public class EventServiceImpl implements EventService {
private static final Logger logger = LoggerFactory.getLogger(EventServiceImpl.class.getName());
private static final int MAX_RECURSION_DEPTH = 10;
private List<EventListenerService> eventListeners = new CopyOnWriteArrayList<EventListenerService>();
private PersistenceService persistenceService;
private DefinitionsService definitionsService;
private SourceService sourceService;
private BundleContext bundleContext;
private Set<String> predefinedEventTypeIds = new LinkedHashSet<String>();
private Set<String> restrictedEventTypeIds = new LinkedHashSet<String>();
private Map<String, ThirdPartyServer> thirdPartyServers = new HashMap<>();
private Boolean shouldBeCheckedEventSourceId;
public void setThirdPartyConfiguration(Map<String, String> thirdPartyConfiguration) {
this.thirdPartyServers = new HashMap<>();
for (Map.Entry<String, String> entry : thirdPartyConfiguration.entrySet()) {
String[] keys = StringUtils.split(entry.getKey(),'.');
if (keys[0].equals("thirdparty")) {
if (!thirdPartyServers.containsKey(keys[1])) {
thirdPartyServers.put(keys[1], new ThirdPartyServer(keys[1]));
}
ThirdPartyServer thirdPartyServer = thirdPartyServers.get(keys[1]);
if (keys[2].equals("allowedEvents")) {
HashSet<String> allowedEvents = new HashSet<>(Arrays.asList(StringUtils.split(entry.getValue(), ',')));
restrictedEventTypeIds.addAll(allowedEvents);
thirdPartyServer.setAllowedEvents(allowedEvents);
} else if (keys[2].equals("key")) {
thirdPartyServer.setKey(entry.getValue());
} else if (keys[2].equals("ipAddresses")) {
Set<IPAddress> ipAddresses = new HashSet<>();
for (String ip : StringUtils.split(entry.getValue(), ',')) {
IPAddress ipAddress = new IPAddressString(ip.trim()).getAddress();
ipAddresses.add(ipAddress);
}
thirdPartyServer.setIpAddresses(ipAddresses);
}
}
}
}
public void setPredefinedEventTypeIds(Set<String> predefinedEventTypeIds) {
this.predefinedEventTypeIds = predefinedEventTypeIds;
}
public void setRestrictedEventTypeIds(Set<String> restrictedEventTypeIds) {
this.restrictedEventTypeIds = restrictedEventTypeIds;
}
public void setShouldBeCheckedEventSourceId(boolean shouldBeCheckedEventSourceId) {
this.shouldBeCheckedEventSourceId = shouldBeCheckedEventSourceId;
}
public void setPersistenceService(PersistenceService persistenceService) {
this.persistenceService = persistenceService;
}
public void setDefinitionsService(DefinitionsService definitionsService) {
this.definitionsService = definitionsService;
}
public void setSourceService(SourceService sourceService) {
this.sourceService = sourceService;
}
public void setBundleContext(BundleContext bundleContext) {
this.bundleContext = bundleContext;
}
public boolean isEventAllowed(Event event, String thirdPartyId) {
if (restrictedEventTypeIds.contains(event.getEventType())) {
return thirdPartyServers.containsKey(thirdPartyId) && thirdPartyServers.get(thirdPartyId).getAllowedEvents().contains(event.getEventType());
}
return true;
}
public String authenticateThirdPartyServer(String key, String ip) {
logger.debug("Authenticating third party server with key: " + key + " and IP: " + ip);
if (key != null) {
for (Map.Entry<String, ThirdPartyServer> entry : thirdPartyServers.entrySet()) {
ThirdPartyServer server = entry.getValue();
if (server.getKey().equals(key)) {
IPAddress ipAddress = new IPAddressString(ip).getAddress();
for (IPAddress serverIpAddress : server.getIpAddresses()) {
if (serverIpAddress.contains(ipAddress)) {
return server.getId();
}
}
}
}
logger.warn("Could not authenticate any third party servers for key: {}", key);
}
return null;
}
public int send(Event event) {
return send(event, 0);
}
private int send(Event event, int depth) {
if (shouldBeCheckedEventSourceId == Boolean.TRUE && sourceService.load(event.getSourceId()) == null) {
logger.warn("Event sending was rejected, because source with sourceId=\"{}\" does not registered in the system.", event.getSourceId());
return NO_CHANGE;
}
if (depth > MAX_RECURSION_DEPTH) {
logger.warn("Max recursion depth reached");
return NO_CHANGE;
}
boolean saveSucceeded = true;
if (event.isPersistent()) {
saveSucceeded = persistenceService.save(event, null, true);
}
int changes;
if (saveSucceeded) {
changes = NO_CHANGE;
final Session session = event.getSession();
if (event.isPersistent() && session != null) {
session.setLastEventDate(event.getTimeStamp());
}
if (event.getProfile() != null) {
for (EventListenerService eventListenerService : eventListeners) {
if (eventListenerService.canHandle(event)) {
changes |= eventListenerService.onEvent(event);
}
}
// At the end of the processing event execute the post executor actions
for (ActionPostExecutor actionPostExecutor : event.getActionPostExecutors()) {
changes |= actionPostExecutor.execute() ? changes : NO_CHANGE;
}
if ((changes & PROFILE_UPDATED) == PROFILE_UPDATED) {
Event profileUpdated = new Event("profileUpdated", session, event.getProfile(), event.getSourceId(), event.getSource(), event.getProfile(), event.getTimeStamp());
profileUpdated.setPersistent(false);
profileUpdated.getAttributes().putAll(event.getAttributes());
changes |= send(profileUpdated, depth + 1);
if (session != null && session.getProfileId() != null) {
changes |= SESSION_UPDATED;
session.setProfile(event.getProfile());
}
}
}
} else {
changes = ERROR;
}
return changes;
}
@Override
public List<EventProperty> getEventProperties() {
Map<String, Map<String, Object>> mappings = persistenceService.getPropertiesMapping(Event.ITEM_TYPE);
List<EventProperty> props = new ArrayList<>(mappings.size());
getEventProperties(mappings, props, "");
return props;
}
@SuppressWarnings("unchecked")
private void getEventProperties(Map<String, Map<String, Object>> mappings, List<EventProperty> props, String prefix) {
for (Map.Entry<String, Map<String, Object>> e : mappings.entrySet()) {
if (e.getValue().get("properties") != null) {
getEventProperties((Map<String, Map<String, Object>>) e.getValue().get("properties"), props, prefix + e.getKey() + ".");
} else {
props.add(new EventProperty(prefix + e.getKey(), (String) e.getValue().get("type")));
}
}
}
private List<PropertyType> getEventPropertyTypes() {
Map<String, Map<String, Object>> mappings = persistenceService.getPropertiesMapping(Event.ITEM_TYPE);
return new ArrayList<>(getEventPropertyTypes(mappings));
}
@SuppressWarnings("unchecked")
private Set<PropertyType> getEventPropertyTypes(Map<String, Map<String, Object>> mappings) {
Set<PropertyType> properties = new LinkedHashSet<>();
for (Map.Entry<String, Map<String, Object>> e : mappings.entrySet()) {
Set<PropertyType> childProperties = null;
Metadata propertyMetadata = new Metadata(null, e.getKey(), e.getKey(), null);
Set<String> systemTags = new HashSet<>();
propertyMetadata.setSystemTags(systemTags);
PropertyType propertyType = new PropertyType(propertyMetadata);
propertyType.setTarget("event");
ValueType valueType = null;
if (e.getValue().get("properties") != null) {
childProperties = getEventPropertyTypes((Map<String, Map<String, Object>>) e.getValue().get("properties"));
valueType = definitionsService.getValueType("set");
if (childProperties != null && childProperties.size() > 0) {
propertyType.setChildPropertyTypes(childProperties);
}
} else {
valueType = mappingTypeToValueType( (String) e.getValue().get("type"));
}
propertyType.setValueTypeId(valueType.getId());
propertyType.setValueType(valueType);
properties.add(propertyType);
}
return properties;
}
private ValueType mappingTypeToValueType(String mappingType) {
if ("text".equals(mappingType)) {
return definitionsService.getValueType("string");
} else if ("date".equals(mappingType)) {
return definitionsService.getValueType("date");
} else if ("long".equals(mappingType)) {
return definitionsService.getValueType("integer");
} else if ("boolean".equals(mappingType)) {
return definitionsService.getValueType("boolean");
} else if ("set".equals(mappingType)) {
return definitionsService.getValueType("set");
} else if ("object".equals(mappingType)) {
return definitionsService.getValueType("set");
} else {
return definitionsService.getValueType("unknown");
}
}
public Set<String> getEventTypeIds() {
Map<String, Long> dynamicEventTypeIds = persistenceService.aggregateWithOptimizedQuery(null, new TermsAggregate("eventType"), Event.ITEM_TYPE);
Set<String> eventTypeIds = new LinkedHashSet<String>(predefinedEventTypeIds);
eventTypeIds.addAll(dynamicEventTypeIds.keySet());
eventTypeIds.remove("_filtered");
return eventTypeIds;
}
@Override
public PartialList<Event> searchEvents(Condition condition, int offset, int size) {
ParserHelper.resolveConditionType(definitionsService, condition, "event search");
return persistenceService.query(condition, "timeStamp", Event.class, offset, size);
}
@Override
public PartialList<Event> searchEvents(String sessionId, String[] eventTypes, String query, int offset, int size, String sortBy) {
List<Condition> conditions = new ArrayList<Condition>();
Condition condition = new Condition(definitionsService.getConditionType("eventPropertyCondition"));
condition.setParameter("propertyName", "sessionId");
condition.setParameter("propertyValue", sessionId);
condition.setParameter("comparisonOperator", "equals");
conditions.add(condition);
condition = new Condition(definitionsService.getConditionType("booleanCondition"));
condition.setParameter("operator", "or");
List<Condition> subConditions = new ArrayList<Condition>();
for (String eventType : eventTypes) {
Condition subCondition = new Condition(definitionsService.getConditionType("eventPropertyCondition"));
subCondition.setParameter("propertyName", "eventType");
subCondition.setParameter("propertyValue", eventType);
subCondition.setParameter("comparisonOperator", "equals");
subConditions.add(subCondition);
}
condition.setParameter("subConditions", subConditions);
conditions.add(condition);
condition = new Condition(definitionsService.getConditionType("booleanCondition"));
condition.setParameter("operator", "and");
condition.setParameter("subConditions", conditions);
if (StringUtils.isNotBlank(query)) {
return persistenceService.queryFullText(query, condition, sortBy, Event.class, offset, size);
} else {
return persistenceService.query(condition, sortBy, Event.class, offset, size);
}
}
@Override
public PartialList<Event> search(Query query) {
if (query.getCondition() != null && definitionsService.resolveConditionType(query.getCondition())) {
if (StringUtils.isNotBlank(query.getText())) {
return persistenceService.queryFullText(query.getText(), query.getCondition(), query.getSortby(), Event.class, query.getOffset(), query.getLimit());
} else {
return persistenceService.query(query.getCondition(), query.getSortby(), Event.class, query.getOffset(), query.getLimit());
}
} else {
if (StringUtils.isNotBlank(query.getText())) {
return persistenceService.queryFullText(query.getText(), query.getSortby(), Event.class, query.getOffset(), query.getLimit());
} else {
return persistenceService.getAllItems(Event.class, query.getOffset(), query.getLimit(), query.getSortby());
}
}
}
@Override
public Event getEvent(String id) {
return persistenceService.load(id, Event.class);
}
public boolean hasEventAlreadyBeenRaised(Event event) {
Event pastEvent = this.persistenceService.load(event.getItemId(), Event.class);
if (pastEvent != null && pastEvent.getVersion() >= 1) {
if ((pastEvent.getSessionId() != null && pastEvent.getSessionId().equals(event.getSessionId())) ||
(pastEvent.getProfileId() != null && pastEvent.getProfileId().equals(event.getProfileId()))) {
return true;
}
}
return false;
}
public boolean hasEventAlreadyBeenRaised(Event event, boolean session) {
List<Condition> conditions = new ArrayList<Condition>();
Condition profileIdCondition = new Condition(definitionsService.getConditionType("eventPropertyCondition"));
if (session) {
profileIdCondition.setParameter("propertyName", "sessionId");
profileIdCondition.setParameter("propertyValue", event.getSessionId());
} else {
profileIdCondition.setParameter("propertyName", "profileId");
profileIdCondition.setParameter("propertyValue", event.getProfileId());
}
profileIdCondition.setParameter("comparisonOperator", "equals");
conditions.add(profileIdCondition);
Condition condition = new Condition(definitionsService.getConditionType("eventPropertyCondition"));
condition.setParameter("propertyName", "eventType");
condition.setParameter("propertyValue", event.getEventType());
condition.setParameter("comparisonOperator", "equals");
conditions.add(condition);
condition = new Condition(definitionsService.getConditionType("eventPropertyCondition"));
condition.setParameter("propertyName", "target.itemId");
condition.setParameter("propertyValue", event.getTarget().getItemId());
condition.setParameter("comparisonOperator", "equals");
conditions.add(condition);
condition = new Condition(definitionsService.getConditionType("eventPropertyCondition"));
condition.setParameter("propertyName", "target.itemType");
condition.setParameter("propertyValue", event.getTarget().getItemType());
condition.setParameter("comparisonOperator", "equals");
conditions.add(condition);
Condition andCondition = new Condition(definitionsService.getConditionType("booleanCondition"));
andCondition.setParameter("operator", "and");
andCondition.setParameter("subConditions", conditions);
long size = persistenceService.queryCount(andCondition, Event.ITEM_TYPE);
return size > 0;
}
public void bind(ServiceReference<EventListenerService> serviceReference) {
EventListenerService eventListenerService = bundleContext.getService(serviceReference);
eventListeners.add(eventListenerService);
}
public void unbind(ServiceReference<EventListenerService> serviceReference) {
if (serviceReference != null) {
EventListenerService eventListenerService = bundleContext.getService(serviceReference);
eventListeners.remove(eventListenerService);
}
}
public void removeProfileEvents(String profileId){
Condition profileCondition = new Condition();
profileCondition.setConditionType(definitionsService.getConditionType("eventPropertyCondition"));
profileCondition.setParameter("propertyName", "profileId");
profileCondition.setParameter("comparisonOperator", "equals");
profileCondition.setParameter("propertyValue", profileId);
persistenceService.removeByQuery(profileCondition,Event.class);
}
}