blob: 2e92f476bb33509f54b760e4a30ce59c73eb8385 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.camel.component;
import java.io.File;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.kaha.impl.async.AsyncDataManager;
import org.apache.activemq.kaha.impl.async.Location;
import org.apache.activemq.util.ByteSequence;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.ExchangePattern;
import org.apache.camel.impl.DefaultConsumer;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.impl.DefaultProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class JournalEndpoint extends DefaultEndpoint {
private static final transient Logger LOG = LoggerFactory.getLogger(JournalEndpoint.class);
private final File directory;
private final AtomicReference<DefaultConsumer> consumer = new AtomicReference<DefaultConsumer>();
private final Object activationMutex = new Object();
private int referenceCount;
private AsyncDataManager dataManager;
private Thread thread;
private Location lastReadLocation;
private long idleDelay = 1000;
private boolean syncProduce = true;
private boolean syncConsume;
public JournalEndpoint(String uri, JournalComponent journalComponent, File directory) {
super(uri, journalComponent.getCamelContext());
this.directory = directory;
}
public JournalEndpoint(String endpointUri, File directory) {
super(endpointUri);
this.directory = directory;
}
public boolean isSingleton() {
return true;
}
public File getDirectory() {
return directory;
}
public Consumer createConsumer(Processor processor) throws Exception {
return new DefaultConsumer(this, processor) {
@Override
public void start() throws Exception {
super.start();
activateConsumer(this);
}
@Override
public void stop() throws Exception {
deactivateConsumer(this);
super.stop();
}
};
}
protected void decrementReference() throws IOException {
synchronized (activationMutex) {
referenceCount--;
if (referenceCount == 0) {
LOG.debug("Closing data manager: " + directory);
LOG.debug("Last mark at: " + lastReadLocation);
dataManager.close();
dataManager = null;
}
}
}
protected void incrementReference() throws IOException {
synchronized (activationMutex) {
referenceCount++;
if (referenceCount == 1) {
LOG.debug("Opening data manager: " + directory);
dataManager = new AsyncDataManager();
dataManager.setDirectory(directory);
dataManager.start();
lastReadLocation = dataManager.getMark();
LOG.debug("Last mark at: " + lastReadLocation);
}
}
}
protected void deactivateConsumer(DefaultConsumer consumer) throws IOException {
synchronized (activationMutex) {
if (this.consumer.get() != consumer) {
throw new RuntimeCamelException("Consumer was not active.");
}
this.consumer.set(null);
try {
thread.join();
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
decrementReference();
}
}
protected void activateConsumer(DefaultConsumer consumer) throws IOException {
synchronized (activationMutex) {
if (this.consumer.get() != null) {
throw new RuntimeCamelException("Consumer already active: journal endpoints only support 1 active consumer");
}
incrementReference();
this.consumer.set(consumer);
thread = new Thread() {
@Override
public void run() {
dispatchToConsumer();
}
};
thread.setName("Dipatch thread: " + getEndpointUri());
thread.setDaemon(true);
thread.start();
}
}
protected void dispatchToConsumer() {
try {
DefaultConsumer consumer;
while ((consumer = this.consumer.get()) != null) {
// See if there is a new record to process
Location location = dataManager.getNextLocation(lastReadLocation);
if (location != null) {
// Send it on.
ByteSequence read = dataManager.read(location);
Exchange exchange = createExchange();
exchange.getIn().setBody(read);
exchange.getIn().setHeader("journal", getEndpointUri());
exchange.getIn().setHeader("location", location);
consumer.getProcessor().process(exchange);
// Setting the mark makes the data manager forget about
// everything
// before that record.
if (LOG.isDebugEnabled()) {
LOG.debug("Consumed record at: " + location);
}
dataManager.setMark(location, syncConsume);
lastReadLocation = location;
} else {
// Avoid a tight CPU loop if there is no new record to read.
LOG.debug("Sleeping due to no records being available.");
Thread.sleep(idleDelay);
}
}
} catch (Throwable e) {
e.printStackTrace();
}
}
public Producer createProducer() throws Exception {
return new DefaultProducer(this) {
public void process(Exchange exchange) throws Exception {
incrementReference();
try {
ByteSequence body = exchange.getIn().getBody(ByteSequence.class);
if (body == null) {
byte[] bytes = exchange.getIn().getBody(byte[].class);
if (bytes != null) {
body = new ByteSequence(bytes);
}
}
if (body == null) {
throw new CamelExchangeException("In body message could not be converted to a ByteSequence or a byte array.", exchange);
}
dataManager.write(body, syncProduce);
} finally {
decrementReference();
}
}
};
}
public boolean isSyncConsume() {
return syncConsume;
}
public void setSyncConsume(boolean syncConsume) {
this.syncConsume = syncConsume;
}
public boolean isSyncProduce() {
return syncProduce;
}
public void setSyncProduce(boolean syncProduce) {
this.syncProduce = syncProduce;
}
boolean isOpen() {
synchronized (activationMutex) {
return referenceCount > 0;
}
}
}