blob: 7501eb1284bd591350a771c921c5ce3f40bd2920 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.processor.aggregate.hazelcast;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import com.hazelcast.config.Config;
import com.hazelcast.config.XmlConfigBuilder;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import com.hazelcast.core.TransactionalMap;
import com.hazelcast.transaction.TransactionContext;
import com.hazelcast.transaction.TransactionOptions;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.spi.OptimisticLockingAggregationRepository;
import org.apache.camel.spi.RecoverableAggregationRepository;
import org.apache.camel.support.DefaultExchange;
import org.apache.camel.support.DefaultExchangeHolder;
import org.apache.camel.support.service.ServiceSupport;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.StringHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A Hazelcast-based AggregationRepository implementing
* {@link RecoverableAggregationRepository} and {@link OptimisticLockingAggregationRepository}.
* Defaults to thread-safe (non-optimistic) locking and a recoverable strategy.
* Hazelcast settings are left to the end user and can be controlled with repositoryName and persistentRepositoryName;
* both name a {@link com.hazelcast.core.IMap} of {@code <String, DefaultExchangeHolder>}. Alternatively, HazelcastAggregationRepository
* can run its own Hazelcast instance, but obviously no benefits of Hazelcast clustering are gained this way.
* If the {@link HazelcastAggregationRepository} uses its own local {@link HazelcastInstance}, it will DESTROY this
* instance on {@link #doStop()}. You should control the {@link HazelcastInstance} lifecycle yourself whenever you instantiate
* {@link HazelcastAggregationRepository} passing a reference to the instance.
*
*/
public class HazelcastAggregationRepository extends ServiceSupport
implements RecoverableAggregationRepository,
OptimisticLockingAggregationRepository {
private static final Logger LOG = LoggerFactory.getLogger(HazelcastAggregationRepository.class.getName());
private static final String COMPLETED_SUFFIX = "-completed";
private boolean optimistic;
private boolean useLocalHzInstance;
private boolean useRecovery = true;
private IMap<String, DefaultExchangeHolder> cache;
private IMap<String, DefaultExchangeHolder> persistedCache;
private HazelcastInstance hzInstance;
private String mapName;
private String persistenceMapName;
private String deadLetterChannel;
private long recoveryInterval = 5000;
private int maximumRedeliveries = 3;
private boolean allowSerializedHeaders;
/**
* Creates new {@link HazelcastAggregationRepository} that defaults to non-optimistic locking
* with recoverable behavior and a local Hazelcast instance. Recoverable repository name defaults to
* {@code repositoryName} + "-compeleted".
* @param repositoryName {@link IMap} repository name;
*/
public HazelcastAggregationRepository(final String repositoryName) {
mapName = repositoryName;
persistenceMapName = String.format("%s%s", mapName, COMPLETED_SUFFIX);
optimistic = false;
useLocalHzInstance = true;
}
/**
* Creates new {@link HazelcastAggregationRepository} that defaults to non-optimistic locking
* with recoverable behavior and a local Hazelcast instance.
* @param repositoryName {@link IMap} repository name;
* @param persistentRepositoryName {@link IMap} recoverable repository name;
*/
public HazelcastAggregationRepository(final String repositoryName, final String persistentRepositoryName) {
mapName = repositoryName;
persistenceMapName = persistentRepositoryName;
optimistic = false;
useLocalHzInstance = true;
}
/**
* Creates new {@link HazelcastAggregationRepository} with recoverable behavior and a local Hazelcast instance.
* Recoverable repository name defaults to {@code repositoryName} + "-compeleted".
* @param repositoryName {@link IMap} repository name;
* @param optimistic whether to use optimistic locking manner.
*/
public HazelcastAggregationRepository(final String repositoryName, boolean optimistic) {
this(repositoryName);
this.optimistic = optimistic;
useLocalHzInstance = true;
}
/**
* Creates new {@link HazelcastAggregationRepository} with recoverable behavior and a local Hazelcast instance.
* @param repositoryName {@link IMap} repository name;
* @param persistentRepositoryName {@link IMap} recoverable repository name;
* @param optimistic whether to use optimistic locking manner.
*/
public HazelcastAggregationRepository(final String repositoryName, final String persistentRepositoryName, boolean optimistic) {
this(repositoryName, persistentRepositoryName);
this.optimistic = optimistic;
useLocalHzInstance = true;
}
/**
* Creates new {@link HazelcastAggregationRepository} that defaults to non-optimistic locking
* with recoverable behavior. Recoverable repository name defaults to
* {@code repositoryName} + "-compeleted".
* @param repositoryName {@link IMap} repository name;
* @param hzInstanse externally configured {@link HazelcastInstance}.
*/
public HazelcastAggregationRepository(final String repositoryName, HazelcastInstance hzInstanse) {
this (repositoryName, false);
this.hzInstance = hzInstanse;
useLocalHzInstance = false;
}
/**
* Creates new {@link HazelcastAggregationRepository} that defaults to non-optimistic locking
* with recoverable behavior.
* @param repositoryName {@link IMap} repository name;
* @param persistentRepositoryName {@link IMap} recoverable repository name;
* @param hzInstanse externally configured {@link HazelcastInstance}.
*/
public HazelcastAggregationRepository(final String repositoryName, final String persistentRepositoryName, HazelcastInstance hzInstanse) {
this (repositoryName, persistentRepositoryName, false);
this.hzInstance = hzInstanse;
useLocalHzInstance = false;
}
/**
* Creates new {@link HazelcastAggregationRepository} with recoverable behavior.
* Recoverable repository name defaults to {@code repositoryName} + "-compeleted".
* @param repositoryName {@link IMap} repository name;
* @param optimistic whether to use optimistic locking manner;
* @param hzInstance externally configured {@link HazelcastInstance}.
*/
public HazelcastAggregationRepository(final String repositoryName, boolean optimistic, HazelcastInstance hzInstance) {
this(repositoryName, optimistic);
this.hzInstance = hzInstance;
useLocalHzInstance = false;
}
/**
* Creates new {@link HazelcastAggregationRepository} with recoverable behavior.
* @param repositoryName {@link IMap} repository name;
* @param optimistic whether to use optimistic locking manner;
* @param persistentRepositoryName {@link IMap} recoverable repository name;
* @param hzInstance externally configured {@link HazelcastInstance}.
*/
public HazelcastAggregationRepository(final String repositoryName, final String persistentRepositoryName, boolean optimistic, HazelcastInstance hzInstance) {
this(repositoryName, persistentRepositoryName, optimistic);
this.hzInstance = hzInstance;
useLocalHzInstance = false;
}
@Override
public Exchange add(CamelContext camelContext, String key, Exchange oldExchange, Exchange newExchange) throws OptimisticLockingException {
if (!optimistic) {
throw new UnsupportedOperationException();
}
LOG.trace("Adding an Exchange with ID {} for key {} in an optimistic manner.", newExchange.getExchangeId(), key);
if (oldExchange == null) {
DefaultExchangeHolder holder = DefaultExchangeHolder.marshal(newExchange, true, allowSerializedHeaders);
final DefaultExchangeHolder misbehaviorHolder = cache.putIfAbsent(key, holder);
if (misbehaviorHolder != null) {
Exchange misbehaviorEx = unmarshallExchange(camelContext, misbehaviorHolder);
LOG.error("Optimistic locking failed for exchange with key {}: IMap#putIfAbsend returned Exchange with ID {}, while it's expected no exchanges to be returned",
key, misbehaviorEx != null ? misbehaviorEx.getExchangeId() : "<null>");
throw new OptimisticLockingException();
}
} else {
DefaultExchangeHolder oldHolder = DefaultExchangeHolder.marshal(oldExchange, true, allowSerializedHeaders);
DefaultExchangeHolder newHolder = DefaultExchangeHolder.marshal(newExchange, true, allowSerializedHeaders);
if (!cache.replace(key, oldHolder, newHolder)) {
LOG.error("Optimistic locking failed for exchange with key {}: IMap#replace returned no Exchanges, while it's expected to replace one",
key);
throw new OptimisticLockingException();
}
}
LOG.trace("Added an Exchange with ID {} for key {} in optimistic manner.", newExchange.getExchangeId(), key);
return oldExchange;
}
@Override
public Exchange add(CamelContext camelContext, String key, Exchange exchange) {
if (optimistic) {
throw new UnsupportedOperationException();
}
LOG.trace("Adding an Exchange with ID {} for key {} in a thread-safe manner.", exchange.getExchangeId(), key);
Lock l = hzInstance.getLock(mapName);
try {
l.lock();
DefaultExchangeHolder newHolder = DefaultExchangeHolder.marshal(exchange, true, allowSerializedHeaders);
DefaultExchangeHolder oldHolder = cache.put(key, newHolder);
return unmarshallExchange(camelContext, oldHolder);
} finally {
LOG.trace("Added an Exchange with ID {} for key {} in a thread-safe manner.", exchange.getExchangeId(), key);
l.unlock();
}
}
@Override
public Set<String> scan(CamelContext camelContext) {
if (useRecovery) {
LOG.trace("Scanning for exchanges to recover in {} context", camelContext.getName());
Set<String> scanned = Collections.unmodifiableSet(persistedCache.keySet());
LOG.trace("Found {} keys for exchanges to recover in {} context", scanned.size(), camelContext.getName());
return scanned;
} else {
LOG.warn("What for to run recovery scans in {} context while repository {} is running in non-recoverable aggregation repository mode?!",
camelContext.getName(), mapName);
return Collections.emptySet();
}
}
@Override
public Exchange recover(CamelContext camelContext, String exchangeId) {
LOG.trace("Recovering an Exchange with ID {}.", exchangeId);
return useRecovery ? unmarshallExchange(camelContext, persistedCache.get(exchangeId)) : null;
}
@Override
public void setRecoveryInterval(long interval, TimeUnit timeUnit) {
this.recoveryInterval = timeUnit.toMillis(interval);
}
@Override
public void setRecoveryInterval(long interval) {
this.recoveryInterval = interval;
}
@Override
public long getRecoveryIntervalInMillis() {
return recoveryInterval;
}
@Override
public void setUseRecovery(boolean useRecovery) {
this.useRecovery = useRecovery;
}
@Override
public boolean isUseRecovery() {
return useRecovery;
}
@Override
public void setDeadLetterUri(String deadLetterUri) {
this.deadLetterChannel = deadLetterUri;
}
@Override
public String getDeadLetterUri() {
return deadLetterChannel;
}
@Override
public void setMaximumRedeliveries(int maximumRedeliveries) {
this.maximumRedeliveries = maximumRedeliveries;
}
@Override
public int getMaximumRedeliveries() {
return maximumRedeliveries;
}
@Override
public Exchange get(CamelContext camelContext, String key) {
return unmarshallExchange(camelContext, cache.get(key));
}
/**
* Checks if the key in question is in the repository.
*
* @param key Object - key in question
*/
public boolean containsKey(Object key) {
if (cache != null) {
return cache.containsKey(key);
} else {
return false;
}
}
public boolean isAllowSerializedHeaders() {
return allowSerializedHeaders;
}
public void setAllowSerializedHeaders(boolean allowSerializedHeaders) {
this.allowSerializedHeaders = allowSerializedHeaders;
}
/**
* This method performs transactional operation on removing the {@code exchange}
* from the operational storage and moving it into the persistent one if the {@link HazelcastAggregationRepository}
* runs in recoverable mode and {@code optimistic} is false. It will act at <u>your own</u> risk otherwise.
* @param camelContext the current CamelContext
* @param key the correlation key
* @param exchange the exchange to remove
*/
@Override
public void remove(CamelContext camelContext, String key, Exchange exchange) {
DefaultExchangeHolder holder = DefaultExchangeHolder.marshal(exchange, true, allowSerializedHeaders);
if (optimistic) {
LOG.trace("Removing an exchange with ID {} for key {} in an optimistic manner.", exchange.getExchangeId(), key);
if (!cache.remove(key, holder)) {
LOG.error("Optimistic locking failed for exchange with key {}: IMap#remove removed no Exchanges, while it's expected to remove one.",
key);
throw new OptimisticLockingException();
}
LOG.trace("Removed an exchange with ID {} for key {} in an optimistic manner.", exchange.getExchangeId(), key);
if (useRecovery) {
LOG.trace("Putting an exchange with ID {} for key {} into a recoverable storage in an optimistic manner.",
exchange.getExchangeId(), key);
persistedCache.put(exchange.getExchangeId(), holder);
LOG.trace("Put an exchange with ID {} for key {} into a recoverable storage in an optimistic manner.",
exchange.getExchangeId(), key);
}
} else {
if (useRecovery) {
LOG.trace("Removing an exchange with ID {} for key {} in a thread-safe manner.", exchange.getExchangeId(), key);
// The only considerable case for transaction usage is fault tolerance:
// the transaction will be rolled back automatically (default timeout is 2 minutes)
// if no commit occurs during the timeout. So we are still consistent whether local node crashes.
TransactionOptions tOpts = new TransactionOptions();
tOpts.setTransactionType(TransactionOptions.TransactionType.ONE_PHASE);
TransactionContext tCtx = hzInstance.newTransactionContext(tOpts);
try {
tCtx.beginTransaction();
TransactionalMap<String, DefaultExchangeHolder> tCache = tCtx.getMap(cache.getName());
TransactionalMap<String, DefaultExchangeHolder> tPersistentCache = tCtx.getMap(persistedCache.getName());
DefaultExchangeHolder removedHolder = tCache.remove(key);
LOG.trace("Putting an exchange with ID {} for key {} into a recoverable storage in a thread-safe manner.",
exchange.getExchangeId(), key);
tPersistentCache.put(exchange.getExchangeId(), removedHolder);
tCtx.commitTransaction();
LOG.trace("Removed an exchange with ID {} for key {} in a thread-safe manner.", exchange.getExchangeId(), key);
LOG.trace("Put an exchange with ID {} for key {} into a recoverable storage in a thread-safe manner.",
exchange.getExchangeId(), key);
} catch (Throwable throwable) {
tCtx.rollbackTransaction();
final String msg = String.format("Transaction with ID %s was rolled back for remove operation with a key %s and an Exchange ID %s.",
tCtx.getTxnId(), key, exchange.getExchangeId());
LOG.warn(msg, throwable);
throw new RuntimeException(msg, throwable);
}
} else {
cache.remove(key);
}
}
}
@Override
public void confirm(CamelContext camelContext, String exchangeId) {
LOG.trace("Confirming an exchange with ID {}.", exchangeId);
if (useRecovery) {
persistedCache.remove(exchangeId);
}
}
@Override
public Set<String> getKeys() {
return Collections.unmodifiableSet(cache.keySet());
}
/**
* @return Persistent repository {@link IMap} name;
*/
public String getPersistentRepositoryName() {
return persistenceMapName;
}
@Override
protected void doStart() throws Exception {
if (maximumRedeliveries < 0) {
throw new IllegalArgumentException("Maximum redelivery retries must be zero or a positive integer.");
}
if (recoveryInterval < 0) {
throw new IllegalArgumentException("Recovery interval must be zero or a positive integer.");
}
StringHelper.notEmpty(mapName, "repositoryName");
if (useLocalHzInstance) {
Config cfg = new XmlConfigBuilder().build();
cfg.setProperty("hazelcast.version.check.enabled", "false");
hzInstance = Hazelcast.newHazelcastInstance(cfg);
} else {
ObjectHelper.notNull(hzInstance, "hzInstanse");
}
cache = hzInstance.getMap(mapName);
if (useRecovery) {
persistedCache = hzInstance.getMap(persistenceMapName);
}
}
@Override
protected void doStop() throws Exception {
//noop
if (useLocalHzInstance) {
hzInstance.getLifecycleService().shutdown();
}
}
protected Exchange unmarshallExchange(CamelContext camelContext, DefaultExchangeHolder holder) {
Exchange exchange = null;
if (holder != null) {
exchange = new DefaultExchange(camelContext);
DefaultExchangeHolder.unmarshal(exchange, holder);
}
return exchange;
}
}