blob: b9438aaf1a82fe83bffd28dec8b73c363a570b8a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.knox.gateway.rm.dispatch;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.entity.BufferedHttpEntity;
import org.apache.knox.gateway.dispatch.DefaultDispatch;
import org.apache.knox.gateway.filter.AbstractGatewayFilter;
import org.apache.knox.gateway.ha.provider.HaProvider;
import org.apache.knox.gateway.ha.provider.impl.HaServiceConfigConstants;
import org.apache.knox.gateway.i18n.messages.MessagesFactory;
import org.apache.knox.gateway.rm.i18n.RMMessages;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicInteger;
class RMHaBaseDispatcher extends DefaultDispatch {
private static final String FAILOVER_COUNTER_ATTRIBUTE = "dispatch.ha.failover.counter";
private static final String LOCATION = "Location";
private static final RMMessages LOG = MessagesFactory.get(RMMessages.class);
private int maxFailoverAttempts = HaServiceConfigConstants.DEFAULT_MAX_FAILOVER_ATTEMPTS;
private int failoverSleep = HaServiceConfigConstants.DEFAULT_FAILOVER_SLEEP;
private String resourceRole;
private HttpResponse inboundResponse;
/**
*
* @return HttpReponse used for unit testing so we
* can inject inboundResponse before calling executeRequest method
*/
private HttpResponse getInboundResponse() {
HttpResponse response = this.inboundResponse;
this.setInboundResponse(null);
return response;
}
void setInboundResponse(HttpResponse inboundResponse) {
this.inboundResponse = inboundResponse;
}
void setHaProvider(HaProvider haProvider) {
this.haProvider = haProvider;
}
private HaProvider haProvider;
void setMaxFailoverAttempts(int maxFailoverAttempts) {
this.maxFailoverAttempts = maxFailoverAttempts;
}
void setFailoverSleep(int failoverSleep) {
this.failoverSleep = failoverSleep;
}
void setResourceRole(String resourceRole) {
this.resourceRole = resourceRole;
}
@Override
protected void executeRequest(HttpUriRequest outboundRequest, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse) throws IOException {
HttpResponse inboundResponse = this.getInboundResponse();
try {
if( this.getInboundResponse() == null ) {
inboundResponse = executeOutboundRequest(outboundRequest);
}
writeOutboundResponse(outboundRequest, inboundRequest, outboundResponse, inboundResponse);
} catch (StandbyException e) {
LOG.errorReceivedFromStandbyNode(e);
failoverRequest(outboundRequest, inboundRequest, outboundResponse, inboundResponse, e);
} catch (IOException e) {
LOG.errorConnectingToServer(outboundRequest.getURI().toString(), e);
failoverRequest(outboundRequest, inboundRequest, outboundResponse, inboundResponse, e);
}
}
/**
* Checks for specific outbound response codes/content to trigger a retry or failover
*/
@Override
protected void writeOutboundResponse(HttpUriRequest outboundRequest, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse, HttpResponse inboundResponse) throws IOException {
int status = inboundResponse.getStatusLine().getStatusCode();
if ( status == 403 || status == 307) {
BufferedHttpEntity entity = new BufferedHttpEntity(inboundResponse.getEntity());
inboundResponse.setEntity(entity);
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
inboundResponse.getEntity().writeTo(outputStream);
String body = new String(outputStream.toByteArray(), StandardCharsets.UTF_8);
if (body.contains("This is standby RM")) {
throw new StandbyException();
}
}
super.writeOutboundResponse(outboundRequest, inboundRequest, outboundResponse, inboundResponse);
}
private void failoverRequest(HttpUriRequest outboundRequest, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse, HttpResponse inboundResponse, Exception exception) throws IOException {
LOG.failingOverRequest(outboundRequest.getURI().toString());
URI uri;
String outboundURIs;
AtomicInteger counter = (AtomicInteger) inboundRequest.getAttribute(FAILOVER_COUNTER_ATTRIBUTE);
if (counter == null) {
counter = new AtomicInteger(0);
}
inboundRequest.setAttribute(FAILOVER_COUNTER_ATTRIBUTE, counter);
outboundURIs = outboundRequest.getURI().toString();
if (counter.incrementAndGet() <= maxFailoverAttempts) {
//null out target url so that rewriters run again
inboundRequest.setAttribute(AbstractGatewayFilter.TARGET_REQUEST_URL_ATTRIBUTE_NAME, null);
uri = getUriFromInbound(inboundRequest, inboundResponse, outboundURIs);
((HttpRequestBase) outboundRequest).setURI(uri);
if (failoverSleep > 0) {
try {
Thread.sleep(failoverSleep);
} catch (InterruptedException e) {
LOG.failoverSleepFailed(this.resourceRole, e);
Thread.currentThread().interrupt();
}
}
executeRequest(outboundRequest, inboundRequest, outboundResponse);
} else {
LOG.maxFailoverAttemptsReached(maxFailoverAttempts, this.resourceRole);
if (inboundResponse != null) {
writeOutboundResponse(outboundRequest, inboundRequest, outboundResponse, inboundResponse);
} else {
throw new IOException(exception);
}
}
}
URI getUriFromInbound(HttpServletRequest inboundRequest, HttpResponse inboundResponse, String outboundURIs) {
URI uri;
if( outboundURIs != null ) {
markFailedURL(outboundURIs);
}
try {
if( inboundResponse != null ) {
// We get redirection URI, failover condition we don't
// need to consult list of host.
String host = inboundResponse.getFirstHeader(LOCATION).getValue();
LOG.failoverRedirect(host);
uri = URI.create(host);
} else { // inboundRequest was null previous active node is down
// get next URI in list to try.
uri = getDispatchUrl(inboundRequest);
}
} catch(Exception ex ) {
uri = getDispatchUrl(inboundRequest);
}
haProvider.setActiveURL(this.resourceRole, uri.toString());
return uri;
}
private void markFailedURL(String outboundURIs) {
haProvider.markFailedURL(this.resourceRole, outboundURIs);
}
}