| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.usergrid.services.notifications.impl; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.Properties; |
| import java.util.UUID; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentLinkedQueue; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.usergrid.batch.JobExecution; |
| import org.apache.usergrid.persistence.Entity; |
| import org.apache.usergrid.persistence.EntityManager; |
| import org.apache.usergrid.persistence.EntityRef; |
| import org.apache.usergrid.persistence.PathQuery; |
| import org.apache.usergrid.persistence.Query; |
| import org.apache.usergrid.persistence.SimpleEntityRef; |
| import org.apache.usergrid.persistence.core.metrics.MetricsFactory; |
| import org.apache.usergrid.persistence.entities.Device; |
| import org.apache.usergrid.persistence.entities.Notification; |
| import org.apache.usergrid.persistence.entities.Notifier; |
| import org.apache.usergrid.persistence.entities.Receipt; |
| import org.apache.usergrid.persistence.entities.User; |
| import org.apache.usergrid.persistence.index.utils.UUIDUtils; |
| import org.apache.usergrid.persistence.queue.LegacyQueueManager; |
| import org.apache.usergrid.persistence.queue.LegacyQueueMessage; |
| import org.apache.usergrid.services.notifications.ApplicationQueueManager; |
| import org.apache.usergrid.services.notifications.ApplicationQueueMessage; |
| import org.apache.usergrid.services.notifications.JobScheduler; |
| import org.apache.usergrid.services.notifications.NotificationsService; |
| import org.apache.usergrid.services.notifications.ProviderAdapter; |
| import org.apache.usergrid.services.notifications.ProviderAdapterFactory; |
| import org.apache.usergrid.services.notifications.TaskManager; |
| import org.apache.usergrid.services.notifications.TaskTracker; |
| |
| import com.codahale.metrics.Meter; |
| |
| import rx.Observable; |
| import rx.Subscriber; |
| import rx.functions.Func1; |
| import rx.schedulers.Schedulers; |
| |
| |
| |
| public class ApplicationQueueManagerImpl implements ApplicationQueueManager { |
| |
| private static final Logger logger = LoggerFactory.getLogger(ApplicationQueueManagerImpl.class); |
| |
| private final EntityManager em; |
| private final LegacyQueueManager qm; |
| private final JobScheduler jobScheduler; |
| private final MetricsFactory metricsFactory; |
| private final String queueName; |
| private final Meter queueMeter; |
| private final Meter sendMeter; |
| private int concurrencyFactor; |
| |
| private final static String PUSH_PROCESSING_MAXTHREADS_PROP = "usergrid.push.async.processing.threads"; |
| private final static String PUSH_PROCESSING_QUEUESIZE_PROP = "usergrid.push.async.processing.queue.size"; |
| private final static String PUSH_PROCESSING_CONCURRENCY_PROP = "usergrid.push.async.processing.concurrency"; |
| |
| HashMap<Object, ProviderAdapter> notifierHashMap; // only retrieve notifiers once |
| |
| |
| |
| //private final Scheduler scheduler; |
| |
| |
| |
| |
| public ApplicationQueueManagerImpl(JobScheduler jobScheduler, EntityManager entityManager, |
| LegacyQueueManager legacyQueueManager, MetricsFactory metricsFactory, |
| Properties properties) { |
| this.em = entityManager; |
| this.qm = legacyQueueManager; |
| this.jobScheduler = jobScheduler; |
| this.metricsFactory = metricsFactory; |
| this.queueName = getQueueNames(properties); |
| this.queueMeter = metricsFactory.getMeter(ApplicationQueueManagerImpl.class, "notification.queue"); |
| this.sendMeter = metricsFactory.getMeter(NotificationsService.class, "queue.send"); |
| this.concurrencyFactor = Integer.valueOf(System.getProperty(PUSH_PROCESSING_CONCURRENCY_PROP, "50")); |
| |
| |
| /** |
| int maxAsyncThreads; |
| int workerQueueSize; |
| |
| try { |
| |
| maxAsyncThreads = Integer.valueOf(System.getProperty(PUSH_PROCESSING_MAXTHREADS_PROP, "200")); |
| workerQueueSize = Integer.valueOf(System.getProperty(PUSH_PROCESSING_QUEUESIZE_PROP, "2000")); |
| |
| } catch (Exception e){ |
| |
| // if junk is passed into the property, just default the values |
| maxAsyncThreads = 200; |
| workerQueueSize = 2000; |
| this.concurrencyFactor = 50; |
| |
| } |
| |
| |
| // create our own executor which has a bounded queue w/ caller runs policy for rejected tasks |
| this.scheduler = Schedulers.from(TaskExecutorFactory |
| .createTaskExecutor( "push-device-io", maxAsyncThreads, workerQueueSize, |
| TaskExecutorFactory.RejectionAction.CALLERRUNS )); |
| **/ |
| |
| } |
| |
| private boolean scheduleQueueJob(Notification notification) throws Exception { |
| return jobScheduler.scheduleQueueJob(notification); |
| } |
| |
| @Override |
| public void queueNotification(final Notification notification, final JobExecution jobExecution) throws Exception { |
| if (scheduleQueueJob(notification)) { |
| em.update(notification); |
| return; |
| } |
| |
| long startTime = System.currentTimeMillis(); |
| |
| if (notification.getCanceled() == Boolean.TRUE) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("notification " + notification.getUuid() + " canceled"); |
| } |
| if (jobExecution != null) { |
| jobExecution.killed(); |
| } |
| return; |
| } |
| |
| final PathQuery<Device> pathQuery = notification.getPathQuery().buildPathQuery(); //devices query |
| final AtomicInteger deviceCount = new AtomicInteger(); //count devices so you can make a judgement on batching |
| final ConcurrentLinkedQueue<String> errorMessages = new ConcurrentLinkedQueue<>(); //build up list of issues |
| |
| // Get devices in querystring, and make sure you have access |
| if (pathQuery != null) { |
| final HashMap<Object, ProviderAdapter> notifierMap = getAdapterMap(); |
| if (logger.isTraceEnabled()) { |
| logger.trace("notification {} start query", notification.getUuid()); |
| } |
| |
| if(logger.isTraceEnabled()) { |
| logger.trace("Notification {} started processing", notification.getUuid()); |
| } |
| |
| |
| |
| // The main iterator can use graph traversal or index querying based on payload property. Default is Index. |
| final Iterator<Device> iterator; |
| if( notification.getUseGraph()){ |
| iterator = pathQuery.graphIterator(em); |
| }else{ |
| iterator = pathQuery.iterator(em); |
| } |
| |
| /**** Old code to scheduler large sets of data, but now the processing is fired off async in the background. |
| Leaving this only a reference of how to do it, if needed in future. |
| |
| //if there are more pages (defined by PAGE_SIZE) you probably want this to be async, |
| //also if this is already a job then don't reschedule |
| |
| if (iterator instanceof ResultsIterator |
| && ((ResultsIterator) iterator).hasPages() && jobExecution == null) { |
| |
| if(logger.isTraceEnabled()){ |
| logger.trace("Scheduling notification job as it has multiple pages of devices."); |
| } |
| jobScheduler.scheduleQueueJob(notification, true); |
| em.update(notification); |
| return; |
| } |
| ****/ |
| |
| final UUID appId = em.getApplication().getUuid(); |
| final Map<String, Object> payloads = notification.getPayloads(); |
| |
| final Func1<EntityRef, Optional<ApplicationQueueMessage>> sendMessageFunction = deviceRef -> { |
| |
| try { |
| |
| //logger.info("Preparing notification queue message for device: {}", deviceRef.getUuid()); |
| |
| long now = System.currentTimeMillis(); |
| |
| String notifierId = null; |
| String notifierKey = null; |
| |
| //find the device notifier info, match it to the payload |
| for (Map.Entry<String, Object> entry : payloads.entrySet()) { |
| ProviderAdapter adapter = notifierMap.get(entry.getKey().toLowerCase()); |
| now = System.currentTimeMillis(); |
| String providerId = getProviderId(deviceRef, adapter.getNotifier()); |
| if (providerId != null) { |
| notifierId = providerId; |
| notifierKey = entry.getKey().toLowerCase(); |
| break; |
| } |
| if (logger.isTraceEnabled()) { |
| logger.trace("Provider query for notification {} device {} took {} ms", notification.getUuid(), deviceRef.getUuid(), (System.currentTimeMillis() - now)); |
| } |
| } |
| |
| if (notifierId == null) { |
| //TODO need to leverage optional here |
| return Optional.empty(); |
| } |
| |
| ApplicationQueueMessage message = new ApplicationQueueMessage(appId, notification.getUuid(), deviceRef.getUuid(), notifierKey, notifierId); |
| if (notification.getQueued() == null) { |
| |
| // update queued time |
| notification.setQueued(System.currentTimeMillis()); |
| |
| } |
| |
| deviceCount.incrementAndGet(); |
| |
| return Optional.of(message); |
| |
| |
| } catch (Exception deviceLoopException) { |
| logger.error("Failed to add device", deviceLoopException); |
| errorMessages.add("Failed to add device: " + deviceRef.getUuid() + ", error:" + deviceLoopException); |
| |
| return Optional.empty(); |
| } |
| |
| }; |
| |
| |
| final Map<String, Object> filters = notification.getFilters(); |
| |
| Observable processMessagesObservable = Observable.create(new IteratorObservable<EntityRef>(iterator)) |
| |
| .flatMap( entityRef -> { |
| |
| return Observable.just(entityRef).flatMap(ref->{ |
| |
| List<Entity> entities = new ArrayList<>(); |
| |
| if( ref.getType().equals(User.ENTITY_TYPE)){ |
| |
| Query devicesQuery = new Query(); |
| devicesQuery.setCollection("devices"); |
| devicesQuery.setResultsLevel(Query.Level.CORE_PROPERTIES); |
| devicesQuery.setLimit(50); // for now, assume a user has no more than 50 devices |
| |
| try { |
| |
| entities = em.searchCollection(new SimpleEntityRef("user", ref.getUuid()), devicesQuery.getCollection(), devicesQuery).getEntities(); |
| |
| }catch (Exception e){ |
| |
| logger.error("Unable to load devices for user: {}", ref.getUuid()); |
| return Observable.empty(); |
| } |
| |
| |
| }else if ( ref.getType().equals(Device.ENTITY_TYPE)){ |
| |
| try{ |
| entities.add(em.get(ref)); |
| |
| }catch(Exception e){ |
| |
| logger.error("Unable to load device: {}", ref.getUuid()); |
| return Observable.empty(); |
| |
| } |
| |
| } |
| return Observable.from(entities); |
| |
| }) |
| .filter( device -> { |
| |
| if(logger.isTraceEnabled()) { |
| logger.trace("Filtering device: {}", device.getUuid()); |
| } |
| |
| if(notification.getUseGraph() && filters.size() > 0 ) { |
| |
| for (Map.Entry<String, Object> entry : filters.entrySet()) { |
| |
| if ((device.getDynamicProperties().get(entry.getKey()) != null && |
| device.getDynamicProperties().get(entry.getKey()).equals(entry.getValue())) || |
| |
| (device.getProperties().get(entry.getKey()) != null && |
| device.getProperties().get(entry.getKey()).equals(entry.getValue())) |
| |
| ) { |
| |
| |
| return true; |
| } |
| |
| } |
| if(logger.isTraceEnabled()) { |
| logger.trace("Push notification filter did not match for notification {}, so removing from notification", |
| device.getUuid(), notification.getUuid()); |
| } |
| return false; |
| |
| |
| } |
| |
| return true; |
| |
| }) |
| .map(sendMessageFunction) |
| .subscribeOn(Schedulers.io()); |
| |
| }, concurrencyFactor) |
| .distinct( queueMessage -> { |
| |
| if(queueMessage.isPresent()) { |
| return queueMessage.get().getDeviceId(); |
| } |
| |
| return UUIDUtils.newTimeUUID(); // this should be distinct, default handling for the Optional.empty() case |
| |
| } ) |
| .doOnNext( message -> { |
| try { |
| |
| if(message.isPresent()){ |
| |
| if(logger.isTraceEnabled()) { |
| logger.trace("Queueing notification message for device: {}", message.get().getDeviceId()); |
| } |
| qm.sendMessageToLocalRegion( message.get(), Boolean.TRUE ); |
| queueMeter.mark(); |
| } |
| |
| } catch (Exception e) { |
| |
| if(message.isPresent()){ |
| logger.error("Unable to queue notification for notification UUID {} and device UUID {} ", |
| message.get().getNotificationId(), message.get().getDeviceId()); |
| } |
| else{ |
| logger.error("Unable to queue notification as it's not present when trying to send to queue"); |
| } |
| |
| } |
| |
| |
| }) |
| .doOnError(throwable -> { |
| |
| logger.error("Error while processing devices for notification : {}, error: {}", notification.getUuid(), throwable.getMessage()); |
| notification.setProcessingFinished(-1L); |
| notification.setDeviceProcessedCount(deviceCount.get()); |
| logger.warn("Partial notification. Only {} devices processed for notification {}", |
| deviceCount.get(), notification.getUuid()); |
| try { |
| em.update(notification); |
| }catch (Exception e){ |
| logger.error("Error updating negative processing status when processing failed."); |
| } |
| |
| }) |
| .doOnCompleted( () -> { |
| |
| try { |
| notification.setProcessingFinished(System.currentTimeMillis()); |
| notification.setDeviceProcessedCount(deviceCount.get()); |
| em.update(notification); |
| if(logger.isTraceEnabled()) { |
| logger.trace("Notification {} finished processing {} device(s)", notification.getUuid(), deviceCount.get()); |
| } |
| |
| } catch (Exception e) { |
| logger.error("Unable to set processing finished timestamp for notification"); |
| } |
| |
| }); |
| |
| processMessagesObservable.subscribeOn(Schedulers.io()).subscribe(); // fire the queuing into the background |
| |
| } |
| |
| // update queued time |
| Map<String, Object> properties = new HashMap<>(2); |
| properties.put("queued", notification.getQueued()); |
| properties.put("state", notification.getState()); |
| if (errorMessages.size() > 0) { |
| if (notification.getErrorMessage() == null) { |
| notification.setErrorMessage("There was a problem delivering all of your notifications. See deliveryErrors in properties"); |
| } |
| } |
| |
| notification.addProperties(properties); |
| em.update(notification); |
| |
| |
| // if no devices, go ahead and mark the batch finished |
| if (deviceCount.get() <= 0 ) { |
| TaskManager taskManager = new TaskManager(em, notification); |
| taskManager.finishedBatch(); |
| } |
| |
| |
| } |
| |
| /** |
| * only need to get notifiers once. will reset on next batch |
| * |
| * @return |
| */ |
| private HashMap<Object, ProviderAdapter> getAdapterMap() { |
| if (notifierHashMap == null) { |
| long now = System.currentTimeMillis(); |
| notifierHashMap = new HashMap<>(); |
| Query query = new Query(); |
| query.setCollection("notifiers"); |
| query.setLimit(100); |
| PathQuery<Notifier> pathQuery = new PathQuery<>( |
| new SimpleEntityRef(em.getApplicationRef()), |
| query |
| ); |
| Iterator<Notifier> notifierIterator = pathQuery.iterator(em); |
| int count = 0; |
| while (notifierIterator.hasNext()) { |
| Notifier notifier = notifierIterator.next(); |
| String name = notifier.getName() != null ? notifier.getName() : ""; |
| UUID uuid = notifier.getUuid() != null ? notifier.getUuid() : UUID.randomUUID(); |
| ProviderAdapter providerAdapter = ProviderAdapterFactory.getProviderAdapter(notifier, em); |
| notifierHashMap.put(name.toLowerCase(), providerAdapter); |
| notifierHashMap.put(uuid, providerAdapter); |
| notifierHashMap.put(uuid.toString(), providerAdapter); |
| if (count++ >= 100) { |
| logger.error("ApplicationQueueManager: too many notifiers...breaking out ", notifierHashMap.size()); |
| break; |
| } |
| } |
| } |
| return notifierHashMap; |
| } |
| |
| /** |
| * send batches of notifications to provider |
| * |
| * @param messages |
| * @throws Exception |
| */ |
| @Override |
| public Observable sendBatchToProviders(final List<LegacyQueueMessage> messages, final String queuePath) { |
| if (logger.isTraceEnabled()) { |
| logger.trace("sending batch of {} notifications.", messages.size()); |
| } |
| |
| final Map<Object, ProviderAdapter> notifierMap = getAdapterMap(); |
| final ApplicationQueueManagerImpl proxy = this; |
| final ConcurrentHashMap<UUID, TaskManager> taskMap = new ConcurrentHashMap<UUID, TaskManager>(messages.size()); |
| final ConcurrentHashMap<UUID, Notification> notificationMap = new ConcurrentHashMap<UUID, Notification>(messages.size()); |
| |
| final Func1<LegacyQueueMessage, ApplicationQueueMessage> func = queueMessage -> { |
| boolean messageCommitted = false; |
| ApplicationQueueMessage message = null; |
| try { |
| message = (ApplicationQueueMessage) queueMessage.getBody(); |
| if (logger.isTraceEnabled()) { |
| logger.trace("start sending notification for device {} for Notification: {} on thread {}", message.getDeviceId(), message.getNotificationId(), Thread.currentThread().getId()); |
| } |
| |
| UUID deviceUUID = message.getDeviceId(); |
| |
| Notification notification = notificationMap.get(message.getNotificationId()); |
| if (notification == null) { |
| notification = em.get(message.getNotificationId(), Notification.class); |
| notificationMap.putIfAbsent(message.getNotificationId(), notification); |
| } |
| TaskManager taskManager = taskMap.get(message.getNotificationId()); |
| if (taskManager == null) { |
| taskManager = new TaskManager(em, notification); |
| taskMap.putIfAbsent(message.getNotificationId(), taskManager); |
| taskManager = taskMap.get(message.getNotificationId()); |
| } |
| |
| final Map<String, Object> payloads = notification.getPayloads(); |
| final Map<String, Object> translatedPayloads = translatePayloads(payloads, notifierMap); |
| if (logger.isTraceEnabled()) { |
| logger.trace("sending notification for device {} for Notification: {}", deviceUUID, notification.getUuid()); |
| } |
| |
| try { |
| String notifierName = message.getNotifierKey().toLowerCase(); |
| ProviderAdapter providerAdapter = notifierMap.get(notifierName.toLowerCase()); |
| Object payload = translatedPayloads.get(notifierName); |
| |
| TaskTracker tracker = null; |
| |
| if(notification.getSaveReceipts()){ |
| |
| final Receipt receipt = |
| new Receipt( notification.getUuid(), message.getNotifierId(), payload, deviceUUID ); |
| tracker = |
| new TaskTracker( providerAdapter.getNotifier(), taskManager, receipt, deviceUUID ); |
| |
| } |
| else { |
| |
| tracker = |
| new TaskTracker( providerAdapter.getNotifier(), taskManager, null, deviceUUID ); |
| } |
| if (!isOkToSend(notification)) { |
| tracker.failed(0, "Notification is duplicate/expired/cancelled."); |
| } else { |
| if (payload == null) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("selected device {} for notification {} doesn't have a valid payload. skipping.", deviceUUID, notification.getUuid()); |
| } |
| tracker.failed(0, "failed to match payload to " + message.getNotifierId() + " notifier"); |
| } else { |
| long now = System.currentTimeMillis(); |
| try { |
| providerAdapter.sendNotification(message.getNotifierId(), payload, notification, tracker); |
| } catch (Exception e) { |
| tracker.failed(0, e.getMessage()); |
| } finally { |
| if (logger.isTraceEnabled()) { |
| logger.trace("sending to device {} for Notification: {} duration {} ms", deviceUUID, notification.getUuid(), (System.currentTimeMillis() - now)); |
| } |
| } |
| } |
| } |
| messageCommitted = true; |
| } finally { |
| sendMeter.mark(); |
| } |
| |
| } catch (Exception e) { |
| logger.error("Failure while sending", e); |
| try { |
| if (!messageCommitted && queuePath != null) { |
| qm.commitMessage(queueMessage); |
| } |
| } catch (Exception queueException) { |
| logger.error("Failed to commit message.", queueException); |
| } |
| } |
| return message; |
| }; |
| |
| //from each queue message, process them in parallel up to 10 at a time |
| Observable queueMessageObservable = Observable.from(messages).flatMap(queueMessage -> { |
| |
| |
| return Observable.just(queueMessage).map(func).buffer(messages.size()).map(queueMessages -> { |
| //for gcm this will actually send notification |
| for (ProviderAdapter providerAdapter : notifierMap.values()) { |
| try { |
| providerAdapter.doneSendingNotifications(); |
| } catch (Exception e) { |
| logger.error("providerAdapter.doneSendingNotifications: ", e); |
| } |
| } |
| //TODO: check if a notification is done and mark it |
| HashMap<UUID, ApplicationQueueMessage> notifications = new HashMap<>(); |
| for (ApplicationQueueMessage message : queueMessages) { |
| if (notifications.get(message.getNotificationId()) == null) { |
| try { |
| TaskManager taskManager = taskMap.get(message.getNotificationId()); |
| notifications.put(message.getNotificationId(), message); |
| taskManager.finishedBatch(); |
| } catch (Exception e) { |
| logger.error("Failed to finish batch", e); |
| } |
| } |
| } |
| return notifications; |
| }).doOnError(throwable -> logger.error("Failed while sending", throwable)); |
| }, 10); |
| |
| return queueMessageObservable; |
| } |
| |
| @Override |
| public void stop() { |
| for (ProviderAdapter adapter : getAdapterMap().values()) { |
| try { |
| adapter.stop(); |
| } catch (Exception e) { |
| logger.error("failed to stop adapter", e); |
| } |
| } |
| } |
| |
| |
| /** |
| * Validates that a notifier and adapter exists to send notifications to. For the example payload |
| * |
| * { "payloads" : {"winphone":"mymessage","apple":"mymessage"} } |
| * |
| * Notifiers with name "winphone" and "apple" must exist. |
| */ |
| private Map<String, Object> translatePayloads(Map<String, Object> payloads, |
| Map<Object, ProviderAdapter> notifierMap) throws Exception { |
| |
| final Map<String, Object> translatedPayloads = new HashMap<String, Object>(payloads.size()); |
| |
| for (Map.Entry<String, Object> entry : payloads.entrySet()) { |
| |
| String payloadKey = entry.getKey().toLowerCase(); |
| Object payloadValue = entry.getValue(); |
| |
| //look for adapter from payload map |
| ProviderAdapter providerAdapter = notifierMap.get(payloadKey); |
| if (providerAdapter != null) { |
| |
| //translate payload to usable information |
| Object translatedPayload = payloadValue != null ? providerAdapter.translatePayload(payloadValue) : null; |
| if (translatedPayload != null) { |
| translatedPayloads.put(payloadKey, translatedPayload); |
| } |
| |
| } |
| } |
| return translatedPayloads; |
| } |
| |
| public static String getQueueNames(Properties properties) { |
| |
| String name = properties.getProperty(ApplicationQueueManagerImpl.DEFAULT_QUEUE_PROPERTY, |
| ApplicationQueueManagerImpl.DEFAULT_QUEUE_NAME); |
| return name; |
| |
| } |
| |
| private static final class IteratorObservable<T> implements rx.Observable.OnSubscribe<T> { |
| private final Iterator<T> input; |
| |
| private IteratorObservable(final Iterator input) { |
| this.input = input; |
| } |
| |
| @Override |
| public void call(final Subscriber<? super T> subscriber) { |
| |
| /** |
| * You would replace this code with your file reading. Instead of emitting from an iterator, |
| * you would create a bean object that represents the entity, and then emit it |
| */ |
| |
| try { |
| while (!subscriber.isUnsubscribed() && input.hasNext()) { |
| |
| //send our input to the next |
| subscriber.onNext((T) input.next()); |
| |
| } |
| |
| //tell the subscriber we don't have any more data |
| subscriber.onCompleted(); |
| |
| } catch (Throwable t) { |
| logger.error("failed on subscriber", t); |
| subscriber.onError(t); |
| } |
| } |
| } |
| |
| @Override |
| public void asyncCheckForInactiveDevices() throws Exception { |
| Collection<ProviderAdapter> providerAdapters = getAdapterMap().values(); |
| for (final ProviderAdapter providerAdapter : providerAdapters) { |
| try { |
| if (providerAdapter != null) { |
| if (logger.isTraceEnabled()) { |
| logger.trace("checking notifier {} for inactive devices", providerAdapter.getNotifier()); |
| } |
| providerAdapter.removeInactiveDevices(); |
| |
| if (logger.isTraceEnabled()) { |
| logger.trace("finished checking notifier {} for inactive devices", providerAdapter.getNotifier()); |
| } |
| } |
| } catch (Exception e) { |
| // not essential so don't fail, but log |
| logger.error("checkForInactiveDevices", e); |
| |
| } |
| } |
| } |
| |
| |
| private boolean isOkToSend(Notification notification) { |
| |
| if (notification.getCanceled() == Boolean.TRUE) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Notification {} canceled. Not sending.", |
| notification.getUuid()); |
| } |
| return false; |
| } |
| if (notification.isExpired()) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Notification {} expired. Not sending.", |
| notification.getUuid()); |
| } |
| return false; |
| } |
| return true; |
| } |
| |
| |
| private String getProviderId(EntityRef device, Notifier notifier) throws Exception { |
| try { |
| Object value = em.getProperty(device, notifier.getName() + NOTIFIER_ID_POSTFIX); |
| if (value == null) { |
| value = em.getProperty(device, notifier.getUuid() + NOTIFIER_ID_POSTFIX); |
| } |
| return value != null ? value.toString() : null; |
| } catch (Exception e) { |
| logger.error("Error getting notifier for device {}, proceeding with rest of batch", device, e); |
| return null; |
| } |
| } |
| |
| |
| } |