blob: c3ff9fefa6ff5d988cfd7b8de40d7e50591b18af [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.jcache;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import javax.cache.Cache;
import javax.cache.processor.EntryProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.impl.DefaultProducer;
import org.apache.camel.util.ObjectHelper;
/**
* The JCache producer.
*/
public class JCacheProducer extends DefaultProducer {
private final JCacheConfiguration configuration;
public JCacheProducer(JCacheEndpoint endpoint, JCacheConfiguration configuration) {
super(endpoint);
this.configuration = configuration;
}
@Override
public void process(Exchange exchange) throws Exception {
String actionName = exchange.getIn().getHeader(JCacheConstants.ACTION, String.class);
if (actionName == null) {
actionName = configuration.getAction();
}
ObjectHelper.notEmpty(actionName, JCacheConstants.ACTION);
Action action = Action.fromName(actionName);
if (action != null) {
Cache<Object, Object> cache = getJCacheEndpoint().getManager().getCache();
action.validate(cache, exchange);
action.execute(cache, exchange);
} else {
throw new IllegalArgumentException(
String.format("The value '%s' is not allowed for parameter '%s'", actionName, JCacheConstants.ACTION));
}
}
@Override
protected void doStart() throws Exception {
getCache();
super.doStart();
}
private JCacheEndpoint getJCacheEndpoint() {
return (JCacheEndpoint)getEndpoint();
}
private Cache getCache() throws Exception {
return getJCacheEndpoint().getManager().getCache();
}
private enum Action {
PUT {
@Override
void validate(Cache<Object, Object> cache, Exchange exchange) throws Exception {
headerIsNotNull(exchange, JCacheConstants.KEY);
}
@Override
void execute(Cache<Object, Object> cache, Exchange exchange) {
cache.put(
exchange.getIn().getHeader(JCacheConstants.KEY),
exchange.getIn().getBody()
);
}
},
PUTALL {
@Override
void validate(Cache<Object, Object> cache, Exchange exchange) throws Exception {
}
@Override
void execute(Cache<Object, Object> cache, Exchange exchange) {
cache.putAll(
exchange.getIn().getBody(Map.class)
);
}
},
PUTIFABSENT {
@Override
void validate(Cache<Object, Object> cache, Exchange exchange) throws Exception {
headerIsNotNull(exchange, JCacheConstants.KEY);
}
@Override
void execute(Cache<Object, Object> cache, Exchange exchange) {
exchange.getIn().setHeader(
JCacheConstants.RESULT,
cache.putIfAbsent(
exchange.getIn().getHeader(JCacheConstants.KEY),
exchange.getIn().getBody())
);
}
},
GET {
@Override
void validate(Cache<Object, Object> cache, Exchange exchange) throws Exception {
headerIsNotNull(exchange, JCacheConstants.KEY);
}
@Override
void execute(Cache<Object, Object> cache, Exchange exchange) {
exchange.getIn().setBody(
cache.get(exchange.getIn().getHeader(JCacheConstants.KEY))
);
}
},
GETALL {
@Override
void validate(Cache<Object, Object> cache, Exchange exchange) throws Exception {
headerIsNotNull(exchange, JCacheConstants.KEYS);
}
@Override
void execute(Cache<Object, Object> cache, Exchange exchange) {
exchange.getIn().setBody(
cache.getAll(
exchange.getIn().getHeader(JCacheConstants.KEYS, Set.class))
);
}
},
GETANDREMOVE {
@Override
void validate(Cache<Object, Object> cache, Exchange exchange) throws Exception {
headerIsNotNull(exchange, JCacheConstants.KEY);
}
@Override
void execute(Cache<Object, Object> cache, Exchange exchange) {
exchange.getIn().setBody(
cache.getAndRemove(
exchange.getIn().getHeader(JCacheConstants.KEY))
);
}
},
GETANDREPLACE {
@Override
void validate(Cache<Object, Object> cache, Exchange exchange) throws Exception {
headerIsNotNull(exchange, JCacheConstants.KEY);
}
@Override
void execute(Cache<Object, Object> cache, Exchange exchange) {
exchange.getIn().setBody(
cache.getAndReplace(
exchange.getIn().getHeader(JCacheConstants.KEY),
exchange.getIn().getBody())
);
}
},
GETANDPUT {
@Override
void validate(Cache<Object, Object> cache, Exchange exchange) throws Exception {
headerIsNotNull(exchange, JCacheConstants.KEY);
}
@Override
void execute(Cache<Object, Object> cache, Exchange exchange) {
exchange.getIn().setBody(
cache.getAndPut(
exchange.getIn().getHeader(JCacheConstants.KEY),
exchange.getIn().getBody())
);
}
},
REPLACE {
@Override
void validate(Cache<Object, Object> cache, Exchange exchange) throws Exception {
headerIsNotNull(exchange, JCacheConstants.KEY);
}
@Override
void execute(Cache<Object, Object> cache, Exchange exchange) {
Object oldValue = exchange.getIn().getHeader(JCacheConstants.OLD_VALUE);
exchange.getIn().setHeader(
JCacheConstants.RESULT,
oldValue != null
? cache.replace(
exchange.getIn().getHeader(JCacheConstants.KEY),
oldValue,
exchange.getIn().getBody())
: cache.replace(
exchange.getIn().getHeader(JCacheConstants.KEY),
exchange.getIn().getBody())
);
}
},
REMOVE {
@Override
void validate(Cache<Object, Object> cache, Exchange exchange) throws Exception {
headerIsNotNull(exchange, JCacheConstants.KEY);
}
@Override
void execute(Cache<Object, Object> cache, Exchange exchange) {
Object oldValue = exchange.getIn().getHeader(JCacheConstants.OLD_VALUE);
exchange.getIn().setHeader(
JCacheConstants.RESULT,
oldValue != null
? cache.remove(
exchange.getIn().getHeader(JCacheConstants.KEY),
oldValue)
: cache.remove(
exchange.getIn().getHeader(JCacheConstants.KEY))
);
}
},
REMOVEALL {
@Override
void validate(Cache<Object, Object> cache, Exchange exchange) throws Exception {
}
@Override
void execute(Cache<Object, Object> cache, Exchange exchange) {
Set<Object> keys = exchange.getIn().getHeader(JCacheConstants.KEYS, Set.class);
if (keys != null) {
cache.removeAll(keys);
} else {
cache.removeAll();
}
}
},
INVOKE {
@Override
void validate(Cache<Object, Object> cache, Exchange exchange) throws Exception {
headerIsNotNull(exchange, JCacheConstants.ENTRY_PROCESSOR);
if (exchange.getIn().getHeader(JCacheConstants.KEYS) == null
&& exchange.getIn().getHeader(JCacheConstants.KEY) == null) {
throw new IllegalArgumentException(
String.format("Either %s or %s must be set for action %s",
JCacheConstants.KEYS,
JCacheConstants.KEY,
name())
);
}
}
@Override
void execute(Cache<Object, Object> cache, Exchange exchange) {
Message message = exchange.getIn();
Set<Object> keys = message.getHeader(JCacheConstants.KEYS, Set.class);
EntryProcessor<Object, Object, Object> entryProcessor = message.getHeader(JCacheConstants.ENTRY_PROCESSOR, EntryProcessor.class);
Collection<Object> arguments = message.getHeader(JCacheConstants.ARGUMENTS, Collection.class);
if (arguments == null) {
arguments = Collections.emptyList();
}
message.setBody(
keys != null
? cache.invokeAll(
keys,
entryProcessor,
arguments)
: cache.invoke(
exchange.getIn().getHeader(JCacheConstants.KEY),
entryProcessor,
arguments)
);
}
},
CLEAR {
@Override
void validate(Cache<Object, Object> cache, Exchange exchange) throws Exception {
}
@Override
void execute(Cache<Object, Object> cache, Exchange exchange) {
cache.clear();
}
};
static final Action[] VALUES = values();
static Action fromName(String name) {
if (ObjectHelper.isNotEmpty(name)) {
for (int i = 0; i < VALUES.length; i++) {
if (VALUES[i].name().equalsIgnoreCase(name)) {
return VALUES[i];
}
}
}
return null;
}
void validate(Cache<Object, Object> cache, Exchange exchange) throws Exception {
}
void execute(Cache<Object, Object> cache, Exchange exchange) {
}
protected void headerIsNotNull(Exchange exchange, String... keys) throws Exception {
for (int i = keys.length - 1; i >= 0; i--) {
if (exchange.getIn().getHeader(keys[i]) == null) {
throw new IllegalArgumentException(
String.format("Header %s must be set for action %s", keys[i], name())
);
}
}
}
}
}