blob: db805f19edab4c66fd56bbcd991c507112574feb [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.bam.model;
import java.util.Date;
import javax.persistence.CascadeType;
import javax.persistence.Entity;
import javax.persistence.FetchType;
import javax.persistence.GeneratedValue;
import javax.persistence.Id;
import javax.persistence.ManyToOne;
import javax.persistence.Table;
import javax.persistence.Temporal;
import javax.persistence.TemporalType;
import javax.persistence.Transient;
import org.apache.camel.bam.processor.ProcessContext;
import org.apache.camel.bam.rules.ActivityRules;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The default state for a specific activity within a process
*
* @version
*/
@Entity
@Table(
name = "CAMEL_ACTIVITYSTATE"
)
public class ActivityState extends TemporalEntity {
private static final transient Logger LOG = LoggerFactory.getLogger(ActivityState.class);
private ProcessInstance processInstance;
private Integer receivedMessageCount = 0;
private ActivityDefinition activityDefinition;
private Date timeExpected;
private Date timeOverdue;
private Integer escalationLevel = 0;
@Override
public String toString() {
return "ActivityState[" + getId() + " on " + getProcessInstance() + " " + getActivityDefinition() + "]";
}
public synchronized void processExchange(ActivityRules activityRules, ProcessContext context) throws Exception {
int messageCount = 0;
Integer count = getReceivedMessageCount();
if (count != null) {
messageCount = count.intValue();
}
setReceivedMessageCount(++messageCount);
if (messageCount == 1) {
onFirstMessage(context);
}
int expectedMessages = activityRules.getExpectedMessages();
if (messageCount == expectedMessages) {
onExpectedMessage(context);
} else if (messageCount > expectedMessages) {
onExcessMessage(context);
}
}
/**
* Returns true if this state is for the given activity
*/
public boolean isActivity(ActivityRules activityRules) {
return ObjectHelper.equal(getActivityDefinition(), activityRules.getActivityDefinition());
}
// Properties
// -----------------------------------------------------------------------
@ManyToOne(fetch = FetchType.LAZY, cascade = {CascadeType.PERSIST, CascadeType.MERGE })
public ProcessInstance getProcessInstance() {
return processInstance;
}
public void setProcessInstance(ProcessInstance processInstance) {
this.processInstance = processInstance;
processInstance.getActivityStates().add(this);
}
@ManyToOne(fetch = FetchType.LAZY, cascade = {CascadeType.PERSIST, CascadeType.MERGE })
public ActivityDefinition getActivityDefinition() {
return activityDefinition;
}
public void setActivityDefinition(ActivityDefinition activityDefinition) {
this.activityDefinition = activityDefinition;
}
public Integer getEscalationLevel() {
return escalationLevel;
}
public void setEscalationLevel(Integer escalationLevel) {
this.escalationLevel = escalationLevel;
}
public Integer getReceivedMessageCount() {
return receivedMessageCount;
}
public void setReceivedMessageCount(Integer receivedMessageCount) {
this.receivedMessageCount = receivedMessageCount;
}
@Temporal(TemporalType.TIME)
public Date getTimeExpected() {
return timeExpected;
}
public void setTimeExpected(Date timeExpected) {
this.timeExpected = timeExpected;
}
@Temporal(TemporalType.TIME)
public Date getTimeOverdue() {
return timeOverdue;
}
public void setTimeOverdue(Date timeOverdue) {
this.timeOverdue = timeOverdue;
}
public void setTimeCompleted(Date timeCompleted) {
super.setTimeCompleted(timeCompleted);
if (timeCompleted != null) {
setEscalationLevel(-1);
}
}
@Transient
public String getCorrelationKey() {
ProcessInstance pi = getProcessInstance();
if (pi == null) {
return null;
}
return pi.getCorrelationKey();
}
// Implementation methods
// -----------------------------------------------------------------------
/**
* Called when the first message is reached
*/
protected void onFirstMessage(ProcessContext context) {
if (!isStarted()) {
setTimeStarted(currentTime());
context.onStarted(this);
LOG.debug("Activity first message: {}", this);
}
}
/**
* Called when the expected number of messages are is reached
*/
protected void onExpectedMessage(ProcessContext context) {
if (!isCompleted()) {
setTimeCompleted(currentTime());
// must also clear overdue otherwise we will get failures
setTimeOverdue(null);
context.onCompleted(this);
LOG.debug("Activity complete: {}", this);
}
}
/**
* Called when an excess message (after the expected number of messages) are
* received
*/
protected void onExcessMessage(ProcessContext context) {
// TODO
}
protected Date currentTime() {
return new Date();
}
}