| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.unomi.services.services; |
| |
| import org.apache.unomi.api.*; |
| import org.apache.unomi.api.actions.Action; |
| import org.apache.unomi.api.actions.ActionExecutor; |
| import org.apache.unomi.api.conditions.Condition; |
| import org.apache.unomi.api.query.Query; |
| import org.apache.unomi.api.rules.Rule; |
| import org.apache.unomi.api.rules.RuleStatistics; |
| import org.apache.unomi.api.services.*; |
| import org.apache.unomi.persistence.spi.CustomObjectMapper; |
| import org.apache.unomi.persistence.spi.PersistenceService; |
| import org.apache.unomi.services.actions.ActionExecutorDispatcher; |
| import org.osgi.framework.*; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.IOException; |
| import java.net.URL; |
| import java.util.*; |
| import java.util.concurrent.ConcurrentHashMap; |
| |
| public class RulesServiceImpl implements RulesService, EventListenerService, SynchronousBundleListener { |
| |
| public static final String RULE_QUERY_PREFIX = "rule_"; |
| private static final Logger logger = LoggerFactory.getLogger(RulesServiceImpl.class.getName()); |
| private BundleContext bundleContext; |
| |
| private PersistenceService persistenceService; |
| |
| private DefinitionsService definitionsService; |
| |
| private EventService eventService; |
| |
| private PatchService patchService; |
| |
| private ActionExecutorDispatcher actionExecutorDispatcher; |
| private List<Rule> allRules; |
| |
| private Timer rulesTimer; |
| private Timer ruleStatisticsTimer; |
| |
| private Map<String,RuleStatistics> allRuleStatistics = new ConcurrentHashMap<>(); |
| |
| public void setBundleContext(BundleContext bundleContext) { |
| this.bundleContext = bundleContext; |
| } |
| |
| public void setPersistenceService(PersistenceService persistenceService) { |
| this.persistenceService = persistenceService; |
| } |
| |
| public void setDefinitionsService(DefinitionsService definitionsService) { |
| this.definitionsService = definitionsService; |
| } |
| |
| public void setEventService(EventService eventService) { |
| this.eventService = eventService; |
| } |
| |
| public void setActionExecutorDispatcher(ActionExecutorDispatcher actionExecutorDispatcher) { |
| this.actionExecutorDispatcher = actionExecutorDispatcher; |
| } |
| |
| public void setPatchService(PatchService patchService) { |
| this.patchService = patchService; |
| } |
| |
| public void bindExecutor(ServiceReference<ActionExecutor> actionExecutorServiceReference) { |
| ActionExecutor actionExecutor = bundleContext.getService(actionExecutorServiceReference); |
| actionExecutorDispatcher.addExecutor(actionExecutorServiceReference.getProperty("actionExecutorId").toString(), actionExecutor); |
| } |
| |
| public void unbindExecutor(ServiceReference<ActionExecutor> actionExecutorServiceReference) { |
| if (actionExecutorServiceReference == null) { |
| return; |
| } |
| actionExecutorDispatcher.removeExecutor(actionExecutorServiceReference.getProperty("actionExecutorId").toString()); |
| } |
| |
| public void postConstruct() { |
| logger.debug("postConstruct {" + bundleContext.getBundle() + "}"); |
| |
| loadPredefinedRules(bundleContext); |
| for (Bundle bundle : bundleContext.getBundles()) { |
| if (bundle.getBundleContext() != null && bundle.getBundleId() != bundleContext.getBundle().getBundleId()) { |
| loadPredefinedRules(bundle.getBundleContext()); |
| } |
| } |
| |
| bundleContext.addBundleListener(this); |
| |
| initializeTimers(); |
| logger.info("Rule service initialized."); |
| } |
| |
| public void preDestroy() { |
| bundleContext.removeBundleListener(this); |
| cancelTimers(); |
| logger.info("Rule service shutdown."); |
| } |
| |
| private void cancelTimers() { |
| if(rulesTimer != null) { |
| rulesTimer.cancel(); |
| } |
| if (ruleStatisticsTimer != null) { |
| ruleStatisticsTimer.cancel(); |
| } |
| logger.info("Rule purge: Purge unscheduled"); |
| } |
| |
| private void processBundleStartup(BundleContext bundleContext) { |
| if (bundleContext == null) { |
| return; |
| } |
| loadPredefinedRules(bundleContext); |
| } |
| |
| private void processBundleStop(BundleContext bundleContext) { |
| if (bundleContext == null) { |
| return; |
| } |
| } |
| |
| private void loadPredefinedRules(BundleContext bundleContext) { |
| Enumeration<URL> predefinedRuleEntries = bundleContext.getBundle().findEntries("META-INF/cxs/rules", "*.json", true); |
| if (predefinedRuleEntries == null) { |
| return; |
| } |
| |
| // First apply patches on existing items |
| patchService.patch(bundleContext.getBundle().findEntries("META-INF/cxs/rules", "*-patch.json", true), Rule.class); |
| |
| while (predefinedRuleEntries.hasMoreElements()) { |
| URL predefinedRuleURL = predefinedRuleEntries.nextElement(); |
| if (!predefinedRuleURL.getFile().endsWith("-patch.json")) { |
| logger.debug("Found predefined rule at " + predefinedRuleURL + ", loading... "); |
| |
| try { |
| Rule rule = CustomObjectMapper.getObjectMapper().readValue(predefinedRuleURL, Rule.class); |
| // Register only if rule does not exist yet |
| if (getRule(rule.getMetadata().getId()) == null) { |
| setRule(rule); |
| logger.info("Predefined rule with id {} registered", rule.getMetadata().getId()); |
| } else { |
| logger.info("The predefined rule with id {} is already registered, this rule will be skipped", rule.getMetadata().getId()); |
| } |
| } catch (IOException e) { |
| logger.error("Error while loading rule definition " + predefinedRuleURL, e); |
| } |
| } |
| } |
| } |
| |
| public Set<Rule> getMatchingRules(Event event) { |
| Set<Rule> matchedRules = new LinkedHashSet<Rule>(); |
| |
| Boolean hasEventAlreadyBeenRaisedForSession = null; |
| Boolean hasEventAlreadyBeenRaisedForProfile = null; |
| |
| List<Rule> allItems = allRules; |
| |
| for (Rule rule : allItems) { |
| if (!rule.getMetadata().isEnabled()) { |
| continue; |
| } |
| RuleStatistics ruleStatistics = getLocalRuleStatistics(rule); |
| long ruleConditionStartTime = System.currentTimeMillis(); |
| String scope = rule.getMetadata().getScope(); |
| if (scope.equals(Metadata.SYSTEM_SCOPE) || scope.equals(event.getScope())) { |
| Condition eventCondition = definitionsService.extractConditionBySystemTag(rule.getCondition(), "eventCondition"); |
| |
| if (eventCondition == null) { |
| updateRuleStatistics(ruleStatistics, ruleConditionStartTime); |
| continue; |
| } |
| |
| if (!persistenceService.testMatch(eventCondition, event)) { |
| updateRuleStatistics(ruleStatistics, ruleConditionStartTime); |
| continue; |
| } |
| |
| Condition sourceCondition = definitionsService.extractConditionBySystemTag(rule.getCondition(), "sourceEventCondition"); |
| if (sourceCondition != null && !persistenceService.testMatch(sourceCondition, event.getSource())) { |
| updateRuleStatistics(ruleStatistics, ruleConditionStartTime); |
| continue; |
| } |
| |
| if (rule.isRaiseEventOnlyOnceForProfile()) { |
| hasEventAlreadyBeenRaisedForProfile = hasEventAlreadyBeenRaisedForProfile != null ? hasEventAlreadyBeenRaisedForProfile : eventService.hasEventAlreadyBeenRaised(event, false); |
| if (hasEventAlreadyBeenRaisedForProfile) { |
| updateRuleStatistics(ruleStatistics, ruleConditionStartTime); |
| continue; |
| } |
| } else if (rule.isRaiseEventOnlyOnceForSession()) { |
| hasEventAlreadyBeenRaisedForSession = hasEventAlreadyBeenRaisedForSession != null ? hasEventAlreadyBeenRaisedForSession : eventService.hasEventAlreadyBeenRaised(event, true); |
| if (hasEventAlreadyBeenRaisedForSession) { |
| updateRuleStatistics(ruleStatistics, ruleConditionStartTime); |
| continue; |
| } |
| } |
| |
| Condition profileCondition = definitionsService.extractConditionBySystemTag(rule.getCondition(), "profileCondition"); |
| if (profileCondition != null && !persistenceService.testMatch(profileCondition, event.getProfile())) { |
| updateRuleStatistics(ruleStatistics, ruleConditionStartTime); |
| continue; |
| } |
| Condition sessionCondition = definitionsService.extractConditionBySystemTag(rule.getCondition(), "sessionCondition"); |
| if (sessionCondition != null && !persistenceService.testMatch(sessionCondition, event.getSession())) { |
| updateRuleStatistics(ruleStatistics, ruleConditionStartTime); |
| continue; |
| } |
| matchedRules.add(rule); |
| } |
| } |
| |
| return matchedRules; |
| } |
| |
| private RuleStatistics getLocalRuleStatistics(Rule rule) { |
| RuleStatistics ruleStatistics = this.allRuleStatistics.get(rule.getItemId()); |
| if (ruleStatistics == null) { |
| ruleStatistics = new RuleStatistics(rule.getItemId()); |
| } |
| return ruleStatistics; |
| } |
| |
| private void updateRuleStatistics(RuleStatistics ruleStatistics, long ruleConditionStartTime) { |
| long totalRuleConditionTime = System.currentTimeMillis() - ruleConditionStartTime; |
| ruleStatistics.setLocalConditionsTime(ruleStatistics.getLocalConditionsTime() + totalRuleConditionTime); |
| allRuleStatistics.put(ruleStatistics.getItemId(), ruleStatistics); |
| } |
| |
| private List<Rule> getAllRules() { |
| List<Rule> allItems = persistenceService.getAllItems(Rule.class, 0, -1, "priority").getList(); |
| for (Rule rule : allItems) { |
| ParserHelper.resolveConditionType(definitionsService, rule.getCondition()); |
| ParserHelper.resolveActionTypes(definitionsService, rule.getActions()); |
| } |
| return allItems; |
| } |
| |
| |
| public boolean canHandle(Event event) { |
| return true; |
| } |
| |
| public int onEvent(Event event) { |
| Set<Rule> rules = getMatchingRules(event); |
| |
| int changes = EventService.NO_CHANGE; |
| for (Rule rule : rules) { |
| logger.debug("Fired rule " + rule.getMetadata().getId() + " for " + event.getEventType() + " - " + event.getItemId()); |
| long actionsStartTime = System.currentTimeMillis(); |
| for (Action action : rule.getActions()) { |
| changes |= actionExecutorDispatcher.execute(action, event); |
| } |
| long totalActionsTime = System.currentTimeMillis() - actionsStartTime; |
| Event ruleFired = new Event("ruleFired", event.getSession(), event.getProfile(), event.getScope(), event, rule, event.getTimeStamp()); |
| ruleFired.getAttributes().putAll(event.getAttributes()); |
| ruleFired.setPersistent(false); |
| changes |= eventService.send(ruleFired); |
| |
| RuleStatistics ruleStatistics = getLocalRuleStatistics(rule); |
| ruleStatistics.setLocalExecutionCount(ruleStatistics.getLocalExecutionCount()+1); |
| ruleStatistics.setLocalActionsTime(ruleStatistics.getLocalActionsTime() + totalActionsTime); |
| this.allRuleStatistics.put(rule.getItemId(), ruleStatistics); |
| } |
| return changes; |
| } |
| |
| @Override |
| public RuleStatistics getRuleStatistics(String ruleId) { |
| if (allRuleStatistics.containsKey(ruleId)) { |
| return allRuleStatistics.get(ruleId); |
| } |
| return persistenceService.load(ruleId, RuleStatistics.class); |
| } |
| |
| public Map<String,RuleStatistics> getAllRuleStatistics() { |
| return allRuleStatistics; |
| } |
| |
| @Override |
| public void resetAllRuleStatistics() { |
| Condition matchAllCondition = new Condition(definitionsService.getConditionType("matchAllCondition")); |
| persistenceService.removeByQuery(matchAllCondition,RuleStatistics.class); |
| allRuleStatistics.clear(); |
| } |
| |
| public Set<Metadata> getRuleMetadatas() { |
| Set<Metadata> metadatas = new HashSet<Metadata>(); |
| for (Rule rule : persistenceService.getAllItems(Rule.class, 0, 50, null).getList()) { |
| metadatas.add(rule.getMetadata()); |
| } |
| return metadatas; |
| } |
| |
| public PartialList<Metadata> getRuleMetadatas(Query query) { |
| if(query.isForceRefresh()){ |
| persistenceService.refresh(); |
| } |
| definitionsService.resolveConditionType(query.getCondition()); |
| List<Metadata> descriptions = new LinkedList<>(); |
| PartialList<Rule> rules = persistenceService.query(query.getCondition(), query.getSortby(), Rule.class, query.getOffset(), query.getLimit()); |
| for (Rule definition : rules.getList()) { |
| descriptions.add(definition.getMetadata()); |
| } |
| return new PartialList<>(descriptions, rules.getOffset(), rules.getPageSize(), rules.getTotalSize()); |
| } |
| |
| public PartialList<Rule> getRuleDetails(Query query) { |
| if (query.isForceRefresh()) { |
| persistenceService.refresh(); |
| } |
| definitionsService.resolveConditionType(query.getCondition()); |
| PartialList<Rule> rules = persistenceService.query(query.getCondition(), query.getSortby(), Rule.class, query.getOffset(), query.getLimit()); |
| List<Rule> details = new LinkedList<>(); |
| details.addAll(rules.getList()); |
| return new PartialList<>(details, rules.getOffset(), rules.getPageSize(), rules.getTotalSize()); |
| } |
| |
| public Rule getRule(String ruleId) { |
| Rule rule = persistenceService.load(ruleId, Rule.class); |
| if (rule != null) { |
| if (rule.getCondition() != null) { |
| ParserHelper.resolveConditionType(definitionsService, rule.getCondition()); |
| } |
| if (rule.getActions() != null) { |
| ParserHelper.resolveActionTypes(definitionsService, rule.getActions()); |
| } |
| } |
| return rule; |
| } |
| |
| public void setRule(Rule rule) { |
| if (rule.getMetadata().getScope() == null) { |
| rule.getMetadata().setScope("systemscope"); |
| } |
| Condition condition = rule.getCondition(); |
| if (condition != null) { |
| if (rule.getMetadata().isEnabled() && !rule.getMetadata().isMissingPlugins()) { |
| ParserHelper.resolveConditionType(definitionsService, condition); |
| definitionsService.extractConditionBySystemTag(condition, "eventCondition"); |
| } |
| } |
| persistenceService.save(rule); |
| } |
| |
| public Set<Condition> getTrackedConditions(Item source){ |
| Set<Condition> trackedConditions = new HashSet<>(); |
| for (Rule r : allRules) { |
| if (!r.getMetadata().isEnabled()) { |
| continue; |
| } |
| Condition trackedCondition = definitionsService.extractConditionBySystemTag(r.getCondition(), "trackedCondition"); |
| if(trackedCondition != null){ |
| Condition sourceEventPropertyCondition = definitionsService.extractConditionBySystemTag(r.getCondition(), "sourceEventCondition"); |
| if(source != null && sourceEventPropertyCondition != null) { |
| ParserHelper.resolveConditionType(definitionsService, sourceEventPropertyCondition); |
| if(persistenceService.testMatch(sourceEventPropertyCondition, source)){ |
| trackedConditions.add(trackedCondition); |
| } |
| } else { |
| trackedConditions.add(trackedCondition); |
| } |
| |
| } |
| } |
| return trackedConditions; |
| } |
| |
| public void removeRule(String ruleId) { |
| persistenceService.remove(ruleId, Rule.class); |
| } |
| |
| private void initializeTimers() { |
| rulesTimer = new Timer(); |
| TimerTask task = new TimerTask() { |
| @Override |
| public void run() { |
| allRules = getAllRules(); |
| } |
| }; |
| rulesTimer.schedule(task, 0, 1000); |
| ruleStatisticsTimer = new Timer(); |
| TimerTask statisticsTask = new TimerTask() { |
| @Override |
| public void run() { |
| syncRuleStatistics(); |
| } |
| }; |
| ruleStatisticsTimer.schedule(statisticsTask, 0, 10000); |
| } |
| |
| public void bundleChanged(BundleEvent event) { |
| switch (event.getType()) { |
| case BundleEvent.STARTED: |
| processBundleStartup(event.getBundle().getBundleContext()); |
| break; |
| case BundleEvent.STOPPING: |
| processBundleStop(event.getBundle().getBundleContext()); |
| break; |
| } |
| } |
| |
| private void syncRuleStatistics() { |
| List<RuleStatistics> allPersistedRuleStatisticsList = persistenceService.getAllItems(RuleStatistics.class); |
| Map<String,RuleStatistics> allPersistedRuleStatistics = new HashMap<>(); |
| for (RuleStatistics ruleStatistics : allPersistedRuleStatisticsList) { |
| allPersistedRuleStatistics.put(ruleStatistics.getItemId(), ruleStatistics); |
| } |
| // first we iterate over the rules we have in memory |
| for (RuleStatistics ruleStatistics : allRuleStatistics.values()) { |
| boolean mustPersist = false; |
| if (allPersistedRuleStatistics.containsKey(ruleStatistics.getItemId())) { |
| // we must sync with the data coming from the persistence service. |
| RuleStatistics persistedRuleStatistics = allPersistedRuleStatistics.get(ruleStatistics.getItemId()); |
| ruleStatistics.setExecutionCount(persistedRuleStatistics.getExecutionCount() + ruleStatistics.getLocalExecutionCount()); |
| if (ruleStatistics.getLocalExecutionCount() > 0) { |
| ruleStatistics.setLocalExecutionCount(0); |
| mustPersist = true; |
| } |
| ruleStatistics.setConditionsTime(persistedRuleStatistics.getConditionsTime() + ruleStatistics.getLocalConditionsTime()); |
| if (ruleStatistics.getLocalConditionsTime() > 0) { |
| ruleStatistics.setLocalConditionsTime(0); |
| mustPersist = true; |
| } |
| ruleStatistics.setActionsTime(persistedRuleStatistics.getActionsTime() + ruleStatistics.getLocalActionsTime()); |
| if (ruleStatistics.getLocalActionsTime() > 0) { |
| ruleStatistics.setLocalActionsTime(0); |
| mustPersist = true; |
| } |
| ruleStatistics.setLastSyncDate(new Date()); |
| } else { |
| ruleStatistics.setExecutionCount(ruleStatistics.getExecutionCount() + ruleStatistics.getLocalExecutionCount()); |
| if (ruleStatistics.getLocalExecutionCount() > 0) { |
| ruleStatistics.setLocalExecutionCount(0); |
| mustPersist = true; |
| } |
| ruleStatistics.setConditionsTime(ruleStatistics.getConditionsTime() + ruleStatistics.getLocalConditionsTime()); |
| if (ruleStatistics.getLocalConditionsTime() > 0) { |
| ruleStatistics.setLocalConditionsTime(0); |
| mustPersist = true; |
| } |
| ruleStatistics.setActionsTime(ruleStatistics.getActionsTime() + ruleStatistics.getLocalActionsTime()); |
| if (ruleStatistics.getLocalActionsTime() > 0) { |
| ruleStatistics.setLocalActionsTime(0); |
| mustPersist = true; |
| } |
| ruleStatistics.setLastSyncDate(new Date()); |
| } |
| allRuleStatistics.put(ruleStatistics.getItemId(), ruleStatistics); |
| if (mustPersist) { |
| persistenceService.save(ruleStatistics); |
| } |
| } |
| // now let's iterate over the rules coming from the persistence service, as we may have new ones. |
| for (RuleStatistics ruleStatistics : allPersistedRuleStatistics.values()) { |
| if (!allRuleStatistics.containsKey(ruleStatistics.getItemId())) { |
| allRuleStatistics.put(ruleStatistics.getItemId(), ruleStatistics); |
| } |
| } |
| } |
| } |