blob: ce2b82ca1caed3adec94d6d3877fcdea484ffad2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.usergrid.services.notifications;
import org.apache.usergrid.persistence.EntityManager;
import org.apache.usergrid.persistence.EntityRef;
import org.apache.usergrid.persistence.SimpleEntityRef;
import org.apache.usergrid.persistence.entities.Device;
import org.apache.usergrid.persistence.entities.Notification;
import org.apache.usergrid.persistence.entities.Notifier;
import org.apache.usergrid.persistence.entities.Receipt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Tracks the per-device outcome of sending one {@link Notification} and writes the
 * accumulated success/failure counts back to that notification.
 *
 * <p>Successes and failures are accumulated in {@link AtomicLong} counters between
 * calls to {@link #finishedBatch(boolean)}, which hands the accumulated values to
 * {@code Notification.updateStatistics} and resets the counters to zero.
 *
 * <p>NOTE(review): the counters are atomic, but the {@code notification} reference and
 * the {@code hasFinished} flag are plain fields; whether instances are shared between
 * threads cannot be determined from this class alone.
 */
public class TaskManager {

    private static final Logger logger = LoggerFactory.getLogger(TaskManager.class);

    /** Notification being processed; replaced by a freshly fetched copy in {@link #finishedBatch(boolean)}. */
    private Notification notification;

    /** Successes counted since the last {@link #finishedBatch(boolean)}. */
    private final AtomicLong successes = new AtomicLong();

    /** Failures counted since the last {@link #finishedBatch(boolean)}. */
    private final AtomicLong failures = new AtomicLong();

    private final EntityManager em;

    /** Set to {@code true} by {@link #finishedBatch(boolean)}; not read anywhere in this class. */
    private boolean hasFinished;

    /**
     * @param em           entity manager used to persist receipts, device properties and the notification
     * @param notification the notification whose delivery is being tracked
     */
    public TaskManager(EntityManager em, Notification notification) {
        this.em = em;
        this.notification = notification;
        hasFinished = false;
    }

    /** @return the number of successes counted since the counters were last reset */
    public long getSuccesses() {
        return successes.get();
    }

    /** @return the number of failures counted since the counters were last reset */
    public long getFailures() {
        return failures.get();
    }

    /**
     * Marks a device as completed without a receipt and without a provider id change.
     * Equivalent to {@code completed(notifier, null, deviceUUID, null)}, which means
     * no success is counted.
     *
     * @param notifier   the notifier that handled the device
     * @param deviceUUID UUID of the device
     * @throws Exception propagated from the four-argument overload
     */
    public void completed(Notifier notifier, UUID deviceUUID) throws Exception {
        completed(notifier, null, deviceUUID, null);
    }

    /**
     * Records that processing of a device has completed.
     *
     * <p>When {@code receipt} is non-null its sent time is set to the current time, it is
     * passed to {@code saveReceipt} (which only persists it when the notification is in
     * debug mode) and the success counter is incremented. When {@code newProviderId} is
     * non-null the device's stored notifier id is replaced with it.
     *
     * @param notifier      the notifier that handled the device
     * @param receipt       receipt for the send, or {@code null} if there is none
     * @param deviceUUID    UUID of the device
     * @param newProviderId replacement provider id for the device, or {@code null} to leave it unchanged
     * @throws Exception if saving the receipt or updating the device property fails
     */
    public void completed(Notifier notifier, Receipt receipt, UUID deviceUUID, String newProviderId) throws Exception {
        if (logger.isTraceEnabled()) {
            logger.trace("REMOVED {}", deviceUUID);
        }
        try {
            if (logger.isTraceEnabled()) {
                logger.trace("notification {} removing device {} from remaining", notification.getUuid(), deviceUUID);
            }

            EntityRef deviceRef = new SimpleEntityRef(Device.ENTITY_TYPE, deviceUUID);

            if (receipt != null) {
                if (logger.isTraceEnabled()) {
                    logger.trace("notification {} sent to device {}. saving receipt.", notification.getUuid(), deviceUUID);
                }
                receipt.setSent(System.currentTimeMillis());
                this.saveReceipt(notification, deviceRef, receipt, false);
                if (logger.isTraceEnabled()) {
                    logger.trace("notification {} receipt saved for device {}", notification.getUuid(), deviceUUID);
                }
                // A success is only counted once the receipt has been handled without error.
                successes.incrementAndGet();
            }

            if (newProviderId != null) {
                if (logger.isTraceEnabled()) {
                    logger.trace("notification {} replacing device {} notifierId", notification.getUuid(), deviceUUID);
                }
                replaceProviderId(deviceRef, notifier, newProviderId);
            }

            if (logger.isTraceEnabled()) {
                logger.trace("notification {} completed device {}", notification.getUuid(), deviceUUID);
            }
        } finally {
            // Emitted even when one of the steps above throws.
            if (logger.isTraceEnabled()) {
                logger.trace("COUNT is: {}", successes.get());
            }
        }
    }

    /**
     * Records that sending to a device failed.
     *
     * <p>The failure counter is always incremented. When a receipt is supplied, the error
     * code and message are stored on it and it is persisted (failed receipts are always
     * persisted, regardless of the notification's debug flag). Whether or not an exception
     * occurs, the device is then marked completed and {@link #finishedBatch()} is called.
     *
     * @param notifier   the notifier that handled the device
     * @param receipt    receipt to annotate with the error, or {@code null}
     * @param deviceUUID UUID of the device
     * @param code       provider error code
     * @param message    provider error message
     * @throws Exception if persisting the receipt or updating the notification fails
     */
    public void failed(Notifier notifier, Receipt receipt, UUID deviceUUID, Object code, String message) throws Exception {
        try {
            if (logger.isDebugEnabled()) {
                logger.debug("notification {} for device {} got error {}", notification.getUuid(), deviceUUID, code);
            }

            failures.incrementAndGet();

            if (receipt != null) {
                // NOTE(review): a receipt that already has a UUID was presumably persisted (and
                // counted as a success) earlier, so that success is taken back - confirm with callers.
                if (receipt.getUuid() != null) {
                    successes.decrementAndGet();
                }
                receipt.setErrorCode(code);
                receipt.setErrorMessage(message);
                this.saveReceipt(notification, new SimpleEntityRef(Device.ENTITY_TYPE, deviceUUID), receipt, true);
                if (logger.isDebugEnabled()) {
                    logger.debug("notification {} receipt saved for device {}", notification.getUuid(), deviceUUID);
                }
            }
        } finally {
            completed(notifier, deviceUUID);
            finishedBatch();
        }
    }

    /*
     * Persists a receipt and links it into the receipts collection of both the
     * notification and the device. Nothing is written unless the tracked
     * notification has its debug flag set or the receipt describes an error.
     * A receipt without a UUID is created first; one that already has a UUID is
     * only (re-)added to the collections.
     */
    private void saveReceipt(EntityRef notification, EntityRef device, Receipt receipt, boolean hasError) throws Exception {
        // The debug flag is read from the tracked notification field, not from the EntityRef parameter.
        boolean debug = false;
        if (this.notification != null) {
            debug = this.notification.getDebug();
        }
        if (!debug && !hasError) {
            return;
        }

        List<EntityRef> entities = Arrays.asList(notification, device);
        if (receipt.getUuid() == null) {
            Receipt savedReceipt = em.create(receipt);
            em.addToCollections(entities, Notification.RECEIPTS_COLLECTION, savedReceipt);
        } else {
            em.addToCollections(entities, Notification.RECEIPTS_COLLECTION, receipt);
        }
    }

    /**
     * Replaces the provider id stored on a device for the given notifier.
     *
     * <p>The id is looked up first under the property keyed by the notifier's name and,
     * if no value is stored there, under the property keyed by the notifier's UUID.
     * Only a property that already holds a value is overwritten; if neither does,
     * nothing is changed.
     *
     * @param device        the device to update
     * @param notifier      the notifier whose provider id is being replaced
     * @param newProviderId the new provider id
     * @throws Exception if reading or writing the device property fails
     */
    protected void replaceProviderId(EntityRef device, Notifier notifier,
                                     String newProviderId) throws Exception {
        String nameKey = notifier.getName() + ApplicationQueueManager.NOTIFIER_ID_POSTFIX;
        if (em.getProperty(device, nameKey) != null) {
            em.setProperty(device, nameKey, newProviderId);
            return;
        }

        String uuidKey = notifier.getUuid() + ApplicationQueueManager.NOTIFIER_ID_POSTFIX;
        if (em.getProperty(device, uuidKey) != null) {
            em.setProperty(device, uuidKey, newProviderId);
        }
    }

    /**
     * Finishes the current batch, re-fetching the notification before updating it.
     *
     * @throws Exception propagated from {@link #finishedBatch(boolean)}
     */
    public void finishedBatch() throws Exception {
        finishedBatch(true);
    }

    /**
     * Flushes the accumulated counters into the notification and persists it.
     *
     * <p>Each counter is read and reset to zero in a single atomic step, so an update
     * made concurrently by {@code completed}/{@code failed} is either included in this
     * batch or left for the next one. The reset also applies when the success counter
     * has gone negative (see {@link #failed}), so the same delta is never reported twice.
     *
     * @param refreshNotification when {@code true}, the notification is re-read through the
     *                            entity manager before being updated
     * @throws Exception if fetching or updating the notification fails
     */
    public void finishedBatch(boolean refreshNotification) throws Exception {
        // Snapshot and reset the counters atomically.
        final long batchSuccesses = this.successes.getAndSet(0);
        final long batchFailures = this.failures.getAndSet(0);

        this.hasFinished = true;

        // force refresh notification by fetching it
        // NOTE(review): if em.get returns null here the next statement throws a NullPointerException.
        if (refreshNotification) {
            notification = em.get(this.notification.getUuid(), Notification.class);
        }

        notification.updateStatistics(batchSuccesses, batchFailures);
        notification.setModified(System.currentTimeMillis());
        notification.setFinished(notification.getModified());
        em.update(notification);
    }
}