blob: b1215d6bbeb23c20e1f1db2534a8829d503b486e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.soroushbot.component;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import com.fasterxml.jackson.databind.ObjectMapper;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.sse.EventSource;
import okhttp3.sse.EventSourceListener;
import okhttp3.sse.EventSources;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.soroushbot.models.SoroushAction;
import org.apache.camel.component.soroushbot.models.SoroushMessage;
import org.apache.camel.component.soroushbot.service.SoroushService;
import org.apache.camel.support.DefaultConsumer;
import static org.apache.camel.component.soroushbot.utils.StringUtils.ordinal;
/**
 * Base consumer that opens a server-sent-event connection to the Soroush server and,
 * for every message received, builds an {@link Exchange} and hands it to
 * {@link SoroushBotAbstractConsumer#sendExchange(Exchange)}.
 * Each subclass decides how the processing of that exchange is started.
 */
public abstract class SoroushBotAbstractConsumer extends DefaultConsumer implements org.apache.camel.spi.ShutdownPrepared {
    SoroushBotEndpoint endpoint;
    /**
     * {@link ObjectMapper} used to parse the message JSON
     */
    ObjectMapper objectMapper = new ObjectMapper();
    /**
     * set by {@link #prepareShutdown(boolean, boolean)}; read from the listener callbacks,
     * hence volatile
     */
    volatile boolean shutdown;
    /**
     * time (epoch millis) of the last received event; written by the listener callbacks and
     * read by the health-check task, hence volatile
     */
    volatile long lastMessageReceived;
    private volatile ReconnectableEventSourceListener connection;

    public SoroushBotAbstractConsumer(SoroushBotEndpoint endpoint, Processor processor) {
        super(endpoint, processor);
        this.endpoint = endpoint;
    }

    @Override
    public void doStart() {
        run();
    }

    /**
     * Reports an exception raised while building or processing an exchange to the
     * configured exception handler.
     *
     * @param exchange       the exchange that was being created or processed
     * @param soroushMessage the parsed message, stored on the exchange as the {@code OriginalMessage} property
     * @param ex             the exception that was thrown
     */
    protected final void handleExceptionThrownWhileCreatingOrProcessingExchange(Exchange exchange, SoroushMessage soroushMessage, Exception ex) {
        //set originalMessage property to the created soroushMessage to let Error Handler access the message
        exchange.setProperty("OriginalMessage", soroushMessage);
        //use this instead of handleException() to manually set the exchange.
        getExceptionHandler().handleException("message can not be processed due to :" + ex.getMessage(), exchange, ex);
    }

    /**
     * Starts the processing of the given exchange; how this happens is up to the subclass.
     *
     * @param exchange the exchange whose body is the received {@link SoroushMessage}
     * @throws Exception if the processing can not be started or fails
     */
    protected abstract void sendExchange(Exchange exchange) throws Exception;

    /**
     * Builds the getMessage request and HTTP client, opens the reconnecting event-source
     * connection and schedules the periodic idle-connection health check.
     */
    private void run() {
        lastMessageReceived = System.currentTimeMillis();
        Request request = new Request.Builder()
                .url(SoroushService.get().generateUrl(endpoint.getAuthorizationToken(), SoroushAction.getMessage, null))
                .build();
        // read/write timeouts are set to 0 for the long-lived event stream
        OkHttpClient client = new OkHttpClient.Builder()
                .connectTimeout(endpoint.getConnectionTimeout(), TimeUnit.MILLISECONDS)
                .writeTimeout(0L, TimeUnit.MILLISECONDS)
                .readTimeout(0L, TimeUnit.MILLISECONDS)
                .build();
        connection = new ReconnectableEventSourceListener(client, request, endpoint.getMaxConnectionRetry()) {
            @Override
            protected boolean onBeforeConnect() {
                int connectionRetry = getConnectionRetry();
                try {
                    endpoint.waitBeforeRetry(connectionRetry);
                } catch (InterruptedException e) {
                    // keep the interrupt visible to the owner of this thread instead of swallowing it
                    Thread.currentThread().interrupt();
                    return false;
                }
                if (shutdown) {
                    return false;
                }
                if (connectionRetry == 0) {
                    log.info("connecting to getMessage from soroush");
                } else {
                    log.info("connection is closed. retrying for the {} time(s)... ", ordinal(connectionRetry));
                }
                return true;
            }

            @Override
            public void onOpen(EventSource eventSource, Response response) {
                super.onOpen(eventSource, response);
                log.info("connection established");
            }

            @Override
            protected boolean handleClose(EventSource eventSource, boolean manuallyClosed) {
                if (!manuallyClosed) {
                    log.warn("connection got closed");
                } else {
                    log.debug("manually reconnecting to ensure we have live connection");
                }
                return true;
            }

            @Override
            protected boolean handleFailure(EventSource eventSource, boolean manuallyClosed, Throwable t, Response response) {
                if (!manuallyClosed) {
                    log.error("connection failed due to following error", t);
                } else {
                    log.debug("manually reconnecting to ensure we have live connection");
                }
                return true;
            }

            @Override
            public void onEvent(EventSource eventSource, String id, String type, String data) {
                // record the activity so the health check only recycles connections that are really idle
                lastMessageReceived = System.currentTimeMillis();
                SoroushMessage soroushMessage;
                try {
                    soroushMessage = objectMapper.readValue(data, SoroushMessage.class);
                } catch (IOException ex) {
                    log.error("can not parse data due to following error", ex);
                    return;
                }
                // create the exchange only once the payload is known to be parsable
                Exchange exchange = endpoint.createExchange();
                try {
                    exchange.getIn().setBody(soroushMessage);
                    log.debug("event data is: {}", data);
                    // if autoDownload is true, download the resource if provided in the message
                    if (endpoint.isAutoDownload()) {
                        endpoint.handleDownloadFiles(soroushMessage);
                    }
                    //let each subclass decide how to start processing of each exchange
                    sendExchange(exchange);
                } catch (Exception ex) {
                    handleExceptionThrownWhileCreatingOrProcessingExchange(exchange, soroushMessage, ex);
                }
            }

            @Override
            public void onFinishProcess() {
                log.info("max connection retry reached! we are closing the endpoint!");
            }
        };
        connection.connect();
        // health check: first run after 2000 ms, then every reconnectIdleConnectionTimeout ms;
        // when no event arrived within that timeout the connection is closed manually, which the
        // listener above reports as a manual reconnect
        endpoint.getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(this, "health check")
                .scheduleAtFixedRate(() -> {
                    long idleThreshold = System.currentTimeMillis() - endpoint.getReconnectIdleConnectionTimeout();
                    if (lastMessageReceived < idleThreshold) {
                        connection.close();
                    }
                }, 2000, endpoint.getReconnectIdleConnectionTimeout(), TimeUnit.MILLISECONDS);
    }

    /**
     * On a real shutdown (not a suspend) marks the consumer as shut down, so that no new
     * connection attempt is made, and closes the current connection.
     *
     * @param suspendOnly {@code true} if the consumer is only being suspended; nothing is done then
     * @param forced      not used by this implementation
     */
    @Override
    public void prepareShutdown(boolean suspendOnly, boolean forced) {
        if (suspendOnly) {
            return;
        }
        shutdown = true;
        // connection is only assigned in run(); guard against shutdown of a consumer that never started
        ReconnectableEventSourceListener current = connection;
        if (current != null) {
            current.close();
        }
    }
}
/**
 * An {@link EventSourceListener} that re-opens its event source whenever it is closed or
 * fails, until the configured maximum number of consecutive retries is exceeded.
 * <p>
 * Subclasses can hook into the life cycle through {@link #onBeforeConnect()},
 * {@link #handleClose(EventSource, boolean)}, {@link #handleFailure(EventSource, boolean, Throwable, Response)}
 * and {@link #onFinishProcess()}.
 * <p>
 * {@link #close()} may be called from a different thread than the listener callbacks
 * (NOTE(review): presumably the callbacks run on OkHttp's own threads - confirm), so the
 * state shared between them is declared volatile.
 */
class ReconnectableEventSourceListener extends EventSourceListener {
    /** true while a close requested through {@link #close()} has not yet been turned into a reconnect */
    private volatile boolean manuallyClosed;
    private final OkHttpClient client;
    /** maximum number of consecutive retries; a negative value means no limit */
    private final int maxConnectionRetry;
    /** number of consecutive non-manual reconnects; reset to 0 in {@link #onOpen(EventSource, Response)} */
    private volatile int connectionRetry;
    private final Request request;
    private final EventSource.Factory factory;
    /** the most recently created event source, or null if none has been created yet */
    private volatile EventSource eventSource;

    /**
     * @param client             the HTTP client used to create event sources
     * @param request            the request that is (re)issued on every connection attempt
     * @param maxConnectionRetry maximum number of consecutive retries, negative for unlimited
     */
    public ReconnectableEventSourceListener(OkHttpClient client, Request request, int maxConnectionRetry) {
        this.client = client;
        this.maxConnectionRetry = maxConnectionRetry;
        this.request = request;
        factory = EventSources.createFactory(client);
    }

    /**
     * Cancels the current event source, if any, and connects again. A reconnect that follows
     * a manual {@link #close()} does not count as a retry; any other reconnect increments the
     * retry counter.
     */
    public void reconnect() {
        if (manuallyClosed) {
            manuallyClosed = false;
        } else {
            connectionRetry++;
        }
        EventSource current = eventSource;
        if (current != null) {
            current.cancel();
        }
        connect();
    }

    /**
     * Opens a new event source unless {@link #onBeforeConnect()} vetoes it. When the retry
     * counter exceeds a non-negative {@code maxConnectionRetry}, no connection is made and
     * {@link #onFinishProcess()} is called instead.
     */
    public void connect() {
        if (!onBeforeConnect()) {
            return;
        }
        boolean unlimited = maxConnectionRetry < 0;
        if (unlimited || connectionRetry <= maxConnectionRetry) {
            eventSource = factory.newEventSource(request, this);
        } else {
            onFinishProcess();
        }
    }

    /**
     * Manually cancels the current event source and flags the cancellation as manual so the
     * following reconnect is not counted as a retry. Does nothing when no event source has
     * been created yet; in that case the manual flag is left untouched so it can not be
     * wrongly applied to a later, unrelated failure.
     */
    public void close() {
        EventSource current = eventSource;
        if (current == null) {
            return;
        }
        manuallyClosed = true;
        current.cancel();
    }

    /**
     * Called by {@link #connect()} when the maximum number of retries has been exceeded.
     * The default implementation does nothing.
     */
    public void onFinishProcess() {
    }

    /**
     * Called at the start of every {@link #connect()}.
     *
     * @return {@code false} to abort the connection attempt; the default implementation returns {@code true}
     */
    protected boolean onBeforeConnect() {
        return true;
    }

    /**
     * Resets the retry counter once a connection has been opened.
     */
    @Override
    public void onOpen(EventSource eventSource, Response response) {
        connectionRetry = 0;
    }

    @Override
    public final void onClosed(EventSource eventSource) {
        if (handleClose(eventSource, manuallyClosed)) {
            reconnect();
        }
    }

    /**
     * Hook called when the event source is closed.
     *
     * @param eventSource    the event source that was closed
     * @param manuallyClosed whether a manual {@link #close()} is pending
     * @return {@code true} to reconnect (the default), {@code false} to stay disconnected
     */
    protected boolean handleClose(EventSource eventSource, boolean manuallyClosed) {
        return true;
    }

    @Override
    public final void onFailure(EventSource eventSource, Throwable t, Response response) {
        if (handleFailure(eventSource, manuallyClosed, t, response)) {
            reconnect();
        }
    }

    /**
     * Hook called when the event source fails.
     *
     * @param eventSource    the event source that failed
     * @param manuallyClosed whether a manual {@link #close()} is pending
     * @param t              the failure passed to {@link #onFailure(EventSource, Throwable, Response)}
     * @param response       the response passed to {@link #onFailure(EventSource, Throwable, Response)}
     * @return {@code true} to reconnect (the default), {@code false} to stay disconnected
     */
    protected boolean handleFailure(EventSource eventSource, boolean manuallyClosed, Throwable t, Response response) {
        return true;
    }

    /**
     * @return the current number of consecutive retries
     */
    public int getConnectionRetry() {
        return connectionRetry;
    }
}