blob: c9ecb30b495ac16b38a8a43ab10180576a22e8d7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.beanstalk;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import com.surftools.BeanstalkClient.BeanstalkException;
import com.surftools.BeanstalkClient.Client;
import com.surftools.BeanstalkClient.Job;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Processor;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.component.beanstalk.processors.BuryCommand;
import org.apache.camel.component.beanstalk.processors.Command;
import org.apache.camel.component.beanstalk.processors.DeleteCommand;
import org.apache.camel.component.beanstalk.processors.ReleaseCommand;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.support.ScheduledPollConsumer;
/**
* PollingConsumer to read Beanstalk jobs.
* <p/>
* The consumer may delete the job immediately or based on successful {@link Exchange}
* completion. The behavior is configurable by <code>consumer.awaitJob</code>
* flag (by default <code>true</code>)
* <p/>
* This consumer will add a {@link Synchronization} object to every {@link Exchange}
* object it creates in order to react on successful exchange completion or failure.
* <p/>
* In the case of successful completion, Beanstalk's <code>delete</code> method is
* called upon the job. In the case of failure the default reaction is to call
* <code>bury</code>.
* <p/>
* The reaction on failures is configurable: possible variants are "bury", "release" or "delete"
*/
public class BeanstalkConsumer extends ScheduledPollConsumer {
private static final String[] STATS_KEY_STR = new String[]{"tube", "state"};
private static final String[] STATS_KEY_INT = new String[]{"age", "time-left", "timeouts", "releases", "buries", "kicks"};
private BeanstalkCommand onFailure;
private boolean useBlockIO;
private boolean awaitJob;
private Client client;
private ExecutorService executor;
private Synchronization sync;
private final Runnable initTask = new Runnable() {
@Override
public void run() {
client = getEndpoint().getConnection().newReadingClient(useBlockIO);
}
};
private final Callable<Exchange> pollTask = new Callable<Exchange>() {
final Integer noWait = 0;
@Override
public Exchange call() throws Exception {
if (client == null) {
throw new RuntimeCamelException("Beanstalk client not initialized");
}
try {
final Job job = client.reserve(noWait);
if (job == null) {
return null;
}
if (log.isDebugEnabled()) {
log.debug(String.format("Received job ID %d (data length %d)", job.getJobId(), job.getData().length));
}
final Exchange exchange = getEndpoint().createExchange(ExchangePattern.InOnly);
exchange.getIn().setHeader(Headers.JOB_ID, job.getJobId());
exchange.getIn().setBody(job.getData(), byte[].class);
Map<String, String> jobStats = client.statsJob(job.getJobId());
if (jobStats != null && !jobStats.isEmpty()) {
for (String key : STATS_KEY_STR) {
if (jobStats.containsKey(key)) {
exchange.getIn().setHeader(Headers.PREFIX + key, jobStats.get(key).trim());
}
}
if (jobStats.containsKey("pri")) {
exchange.getIn().setHeader(Headers.PRIORITY, Long.parseLong(jobStats.get("pri").trim()));
}
for (String key : STATS_KEY_INT) {
if (jobStats.containsKey(key)) {
exchange.getIn().setHeader(Headers.PREFIX + key, Integer.parseInt(jobStats.get(key).trim()));
}
}
}
if (!awaitJob) {
client.delete(job.getJobId());
} else {
exchange.addOnCompletion(sync);
}
return exchange;
} catch (BeanstalkException e) {
getExceptionHandler().handleException("Beanstalk client error", e);
resetClient();
return null;
}
}
};
public BeanstalkConsumer(final BeanstalkEndpoint endpoint, final Processor processor) {
super(endpoint, processor);
}
@Override
protected int poll() throws Exception {
int messagesPolled = 0;
while (isPollAllowed()) {
final Exchange exchange = executor.submit(pollTask).get();
if (exchange == null) {
break;
}
++messagesPolled;
getProcessor().process(exchange);
}
return messagesPolled;
}
public BeanstalkCommand getOnFailure() {
return onFailure;
}
public void setOnFailure(BeanstalkCommand onFailure) {
this.onFailure = onFailure;
}
public boolean isUseBlockIO() {
return useBlockIO;
}
public void setUseBlockIO(boolean useBlockIO) {
this.useBlockIO = useBlockIO;
}
public boolean isAwaitJob() {
return awaitJob;
}
public void setAwaitJob(boolean awaitJob) {
this.awaitJob = awaitJob;
}
@Override
public BeanstalkEndpoint getEndpoint() {
return (BeanstalkEndpoint) super.getEndpoint();
}
@Override
protected void doStart() throws Exception {
executor = getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this, "Beanstalk-Consumer");
executor.execute(initTask);
sync = new Sync();
super.doStart();
}
@Override
protected void doStop() throws Exception {
super.doStop();
if (executor != null) {
getEndpoint().getCamelContext().getExecutorServiceManager().shutdown(executor);
}
}
protected void resetClient() {
if (client != null) {
client.close();
}
initTask.run();
}
class Sync implements Synchronization {
protected final Command successCommand;
protected final Command failureCommand;
Sync() {
successCommand = new DeleteCommand(getEndpoint());
if (BeanstalkComponent.COMMAND_BURY.equals(onFailure.name())) {
failureCommand = new BuryCommand(getEndpoint());
} else if (BeanstalkComponent.COMMAND_RELEASE.equals(onFailure.name())) {
failureCommand = new ReleaseCommand(getEndpoint());
} else if (BeanstalkComponent.COMMAND_DELETE.equals(onFailure.name())) {
failureCommand = new DeleteCommand(getEndpoint());
} else {
throw new IllegalArgumentException(String.format("Unknown failure command: %s", onFailure));
}
}
@Override
public void onComplete(final Exchange exchange) {
try {
executor.submit(new RunCommand(successCommand, exchange)).get();
} catch (Exception e) {
log.error(String.format("Could not run completion of exchange %s", exchange), e);
}
}
@Override
public void onFailure(final Exchange exchange) {
try {
executor.submit(new RunCommand(failureCommand, exchange)).get();
} catch (Exception e) {
log.error(String.format("%s could not run failure of exchange %s", failureCommand.getClass().getName(), exchange), e);
}
}
class RunCommand implements Runnable {
private final Command command;
private final Exchange exchange;
RunCommand(final Command command, final Exchange exchange) {
this.command = command;
this.exchange = exchange;
}
@Override
public void run() {
try {
try {
command.act(client, exchange);
} catch (BeanstalkException e) {
log.warn(String.format("Post-processing %s of exchange %s failed, retrying.", command.getClass().getName(), exchange), e);
resetClient();
command.act(client, exchange);
}
} catch (final Exception e) {
log.error(String.format("%s could not post-process exchange %s", command.getClass().getName(), exchange), e);
exchange.setException(e);
}
}
}
}
}