blob: 79e420fa2e37044da1cff49b19fb9602c3e23a31 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.servicemix.core
import org.apache.camel.spi.{RouteContext, ProcessorFactory}
import org.apache.camel.model.ProcessorDefinition
import java.util.concurrent.atomic.AtomicInteger
import collection.mutable.ListBuffer
import org.apache.camel._
import processor.DelegateAsyncProcessor
import GlobalProcessorFactory._
/**
* Global ServiceMix ProcessorFactory implementation, which will take care of wrapping processors with the additional
* functionality provided by the {@link DelegateProcessorFactory} instances
*/
class GlobalProcessorFactory extends ProcessorFactory {
val factories = new ListBuffer[DelegateProcessorFactory]
val version = new AtomicInteger(1)
def addFactory(factory: DelegateProcessorFactory) = triggerUpdate(factories += factory);
def removeFactory(factory: DelegateProcessorFactory) = triggerUpdate(factories -= factory);
def createChildProcessor(context: RouteContext, definition: ProcessorDefinition[_], mandatory: Boolean) = {
nullOrElse(context.createProcessor(definition))(new GlobalDelegateProcessor(context, definition, _))
}
def createProcessor(context: RouteContext, definition: ProcessorDefinition[_]) = {
nullOrElse(definition.createProcessor(context))(new GlobalDelegateProcessor(context, definition, _))
}
def triggerUpdate(block: => Unit) = {
block
version.incrementAndGet()
}
class GlobalDelegateProcessor(routeContext: RouteContext, definition: ProcessorDefinition[_], target: Processor) extends DelegateAsyncProcessor(target) {
var currentProcessor = configure(getProcessor)
var version = GlobalProcessorFactory.this.version.get()
override def process(exchange: Exchange, callback: AsyncCallback) = {
// let's check if processor factories have changed and reconfigure things if necessary
if (version < GlobalProcessorFactory.this.version.get) {
currentProcessor = configure(getProcessor)
}
currentProcessor.process(exchange, callback)
}
override def toString = "ServiceMix Wrapper[" + processor + "]"
def configure(original: AsyncProcessor) : AsyncProcessor = {
factories.foldLeft(original) { (delegate: AsyncProcessor, factory: DelegateProcessorFactory) => {
factory.create(delegate)
}
}
}
}
}
object GlobalProcessorFactory {
private def nullOrElse[S,T](value: S)(function: S => T) = if (value == null) {
null.asInstanceOf[T]
} else {
function(value)
}
}