blob: fc6d1249b59ffc00502d492ce7f7607b7ab62573 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sling.discovery.base.connectors.ping;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.URL;
import java.util.Date;
import java.util.Iterator;
import java.util.UUID;
import java.util.zip.GZIPOutputStream;
import javax.json.JsonException;
import javax.servlet.http.HttpServletResponse;
import org.apache.http.Header;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.Credentials;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.config.SocketConfig;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultHttpRequestRetryHandler;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.sling.discovery.ClusterView;
import org.apache.sling.discovery.InstanceDescription;
import org.apache.sling.discovery.base.commons.ClusterViewService;
import org.apache.sling.discovery.base.commons.UndefinedClusterViewException;
import org.apache.sling.discovery.base.connectors.BaseConfig;
import org.apache.sling.discovery.base.connectors.announcement.Announcement;
import org.apache.sling.discovery.base.connectors.announcement.AnnouncementFilter;
import org.apache.sling.discovery.base.connectors.announcement.AnnouncementRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * A topology connector client is used for sending (pinging) a remote topology
 * connector servlet and exchanging announcements with it.
 * <p>
 * One instance talks to exactly one connector URL. {@link #ping(boolean)} sends
 * the local announcement via HTTP PUT and records the announcement returned by
 * the peer; {@link #disconnect()} unregisters that announcement and sends an
 * HTTP DELETE. The remaining public methods only expose the state recorded by
 * the most recent ping.
 * <p>
 * NOTE(review): no synchronization is visible in this class - the state fields
 * are plain (non-volatile) fields; confirm the threading model with the callers.
 */
public class TopologyConnectorClient implements
        TopologyConnectorClientInformation {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    /** the endpoint url **/
    private final URL connectorUrl;

    /** the cluster view service **/
    private final ClusterViewService clusterViewService;

    /** the config service to use **/
    private final BaseConfig config;

    /** the id of this connection **/
    private final UUID id;

    /** the announcement registry **/
    private final AnnouncementRegistry announcementRegistry;

    /** the last inherited announcement **/
    private Announcement lastInheritedAnnouncement;

    /** the time when the last ping attempt finished - for webconsole use only **/
    private long lastPingedAt;

    /** the information about this server **/
    private final String serverInfo;

    /** the status code of the last post **/
    private int lastStatusCode = -1;

    /** SLING-3316: whether or not this connector was auto-stopped **/
    private boolean autoStopped = false;

    /** more details about connection failures **/
    private String statusDetails = null;

    /** SLING-2882: whether or not to suppress ping warnings **/
    private boolean suppressPingWarnings_ = false;

    /** encodes/signs outgoing requests and decodes incoming responses **/
    private final TopologyRequestValidator requestValidator;

    /** value of Content-Encoding of the last request **/
    private String lastRequestEncoding;

    /** value of Content-Encoding of the last response **/
    private String lastResponseEncoding;

    /** SLING-3382: unix-time at which point the backoff-period ends and pings can be sent again **/
    private long backoffPeriodEnd = -1;

    /**
     * Creates a client for the given connector url.
     *
     * @param clusterViewService provides the local sling id and cluster view, must not be null
     * @param announcementRegistry registry for outgoing/inherited announcements, must not be null
     * @param config the configuration (timeouts, gzip, auto-stop), must not be null
     * @param connectorUrl the url of the remote connector servlet, must not be null
     * @param serverInfo information about this server, put into each sent announcement
     * @throws IllegalArgumentException if any of the mandatory parameters is null
     */
    TopologyConnectorClient(final ClusterViewService clusterViewService,
            final AnnouncementRegistry announcementRegistry, final BaseConfig config,
            final URL connectorUrl, final String serverInfo) {
        if (clusterViewService == null) {
            throw new IllegalArgumentException(
                    "clusterViewService must not be null");
        }
        if (announcementRegistry == null) {
            throw new IllegalArgumentException(
                    "announcementRegistry must not be null");
        }
        if (config == null) {
            throw new IllegalArgumentException("config must not be null");
        }
        if (connectorUrl == null) {
            throw new IllegalArgumentException("connectorUrl must not be null");
        }
        this.requestValidator = new TopologyRequestValidator(config);
        this.clusterViewService = clusterViewService;
        this.announcementRegistry = announcementRegistry;
        this.config = config;
        this.connectorUrl = connectorUrl;
        this.serverInfo = serverInfo;
        this.id = UUID.randomUUID();
    }

    /**
     * Ping the server and pass the announcements between the two.
     * <p>
     * Nothing is sent when this connector was auto-stopped or while a backoff
     * period requested by the peer is still running (unless forced). Whenever a
     * ping is actually attempted, the last inherited announcement and the
     * last-pinged timestamp are updated - also when the attempt fails.
     *
     * @param force if true, a pending backoff period is discarded and the peer
     *              is asked (via the announcement) to reset its backoff as well
     */
    void ping(final boolean force) {
        if (autoStopped) {
            // then we suppress any further pings!
            logger.debug("ping: autoStopped=true, hence suppressing any further pings.");
            return;
        }
        if (force) {
            backoffPeriodEnd = -1;
        } else if (backoffPeriodEnd > 0) {
            if (System.currentTimeMillis() < backoffPeriodEnd) {
                logger.debug("ping: not issueing a heartbeat due to backoff instruction from peer.");
                return;
            } else {
                logger.debug("ping: backoff period ended, issuing another ping now.");
            }
        }
        final String uri = buildRequestUri();
        if (logger.isDebugEnabled()) {
            logger.debug("ping: connectorUrl=" + connectorUrl + ", complete uri=" + uri);
        }
        final HttpClientContext clientContext = HttpClientContext.create();
        // create the request before the client: an invalid uri makes the
        // request constructor throw and must not leave an unclosed client behind
        final HttpPut putRequest = new HttpPut(uri);
        final CloseableHttpClient httpClient = createHttpClient();

        // setting the connection timeout (idle connection, configured in seconds)
        putRequest.setConfig(RequestConfig.
                custom().
                setConnectTimeout(1000*config.getSocketConnectTimeout()).
                build());

        Announcement resultingAnnouncement = null;
        try {
            applyCredentials(clientContext, putRequest.getURI().getHost(),
                    putRequest.getURI().getPort());

            final Announcement topologyAnnouncement = new Announcement(
                    clusterViewService.getSlingId());
            topologyAnnouncement.setServerInfo(serverInfo);
            final ClusterView clusterView;
            try {
                clusterView = clusterViewService.getLocalClusterView();
            } catch (UndefinedClusterViewException e) {
                // SLING-5030 : then we cannot ping
                logger.warn("ping: no clusterView available at the moment, cannot ping others now: "+e);
                return;
            }
            topologyAnnouncement.setLocalCluster(clusterView);
            if (force) {
                logger.debug("ping: sending a resetBackoff");
                topologyAnnouncement.setResetBackoff(true);
            }
            announcementRegistry.addAllExcept(topologyAnnouncement, clusterView, new AnnouncementFilter() {

                public boolean accept(final String receivingSlingId, final Announcement announcement) {
                    // filter out announcements that are of old cluster instances
                    // which I dont really have in my cluster view at the moment
                    for (final InstanceDescription instance : clusterView.getInstances()) {
                        if (instance.getSlingId().equals(receivingSlingId)) {
                            // then I have the receiving instance in my cluster view
                            // all fine then
                            return true;
                        }
                    }
                    // looks like I dont have the receiving instance in my cluster view
                    // then I should also not propagate that announcement anywhere
                    return false;
                }
            });
            final String p = requestValidator.encodeMessage(topologyAnnouncement.asJSON());

            if (logger.isDebugEnabled()) {
                logger.debug("ping: topologyAnnouncement json is: " + p);
            }
            requestValidator.trustMessage(putRequest, p);
            if (config.isGzipConnectorRequestsEnabled()) {
                // tell the server that the content is gzipped:
                putRequest.addHeader("Content-Encoding", "gzip");
                // and gzip the body:
                putRequest.setEntity(new ByteArrayEntity(gzip(p), ContentType.APPLICATION_JSON));
                lastRequestEncoding = "gzip";
            } else {
                // otherwise plaintext:
                final StringEntity plaintext = new StringEntity(p, "UTF-8");
                plaintext.setContentType(ContentType.APPLICATION_JSON.getMimeType());
                putRequest.setEntity(plaintext);
                lastRequestEncoding = "plaintext";
            }
            // independent of request-gzipping, we do accept the response to be gzipped,
            // so indicate this to the server:
            putRequest.addHeader("Accept-Encoding", "gzip");
            final CloseableHttpResponse response = httpClient.execute(putRequest, clientContext);
            final int statusCode = response.getStatusLine().getStatusCode();
            if (logger.isDebugEnabled()) {
                logger.debug("ping: done. code=" + statusCode + " - "
                        + response.getStatusLine().getReasonPhrase());
            }
            lastStatusCode = statusCode;
            lastResponseEncoding = null;
            if (statusCode == HttpServletResponse.SC_OK) {
                final Header contentEncoding = response.getFirstHeader("Content-Encoding");
                if (contentEncoding != null && contentEncoding.getValue() != null &&
                        contentEncoding.getValue().contains("gzip")) {
                    lastResponseEncoding = "gzip";
                } else {
                    lastResponseEncoding = "plaintext";
                }
                // NOTE(review): the original comment states that decoding is limited
                // to 16MB - that limit is not visible in this class, confirm in the validator
                final String responseBody = requestValidator.decodeMessage(putRequest.getURI().getPath(), response);
                if (logger.isDebugEnabled()) {
                    logger.debug("ping: response body=" + responseBody);
                }
                if (responseBody != null && responseBody.length() > 0) {
                    Announcement inheritedAnnouncement = Announcement
                            .fromJSON(responseBody);
                    final long backoffInterval = inheritedAnnouncement.getBackoffInterval();
                    if (backoffInterval > 0) {
                        // then reset the backoffPeriodEnd:

                        /* minus 1 sec to avoid slipping the interval by a few millis */
                        this.backoffPeriodEnd = System.currentTimeMillis() + (1000 * backoffInterval) - 1000;
                        logger.debug("ping: servlet instructed to backoff: backoffInterval="+backoffInterval+", resulting in period end of "+new Date(backoffPeriodEnd));
                    } else {
                        logger.debug("ping: servlet did not instruct any backoff-ing at this stage");
                        this.backoffPeriodEnd = -1;
                    }
                    if (inheritedAnnouncement.isLoop()) {
                        if (logger.isDebugEnabled()) {
                            logger.debug("ping: connector response indicated a loop detected. not registering this announcement from "+
                                    inheritedAnnouncement.getOwnerId());
                        }
                        if (inheritedAnnouncement.getOwnerId().equals(clusterViewService.getSlingId())) {
                            // SLING-3316 : local-loop detected. Check config to see if we should stop this connector

                            if (config.isAutoStopLocalLoopEnabled()) {
                                inheritedAnnouncement = null; // results in connected -> false and representsloop -> true
                                autoStopped = true; // results in isAutoStopped -> true
                            }
                        }
                    } else {
                        inheritedAnnouncement.setInherited(true);
                        if (announcementRegistry
                                .registerAnnouncement(inheritedAnnouncement) == -1) {
                            if (logger.isDebugEnabled()) {
                                logger.debug("ping: connector response is from an instance which I already see in my topology"
                                        + inheritedAnnouncement);
                            }
                            statusDetails = "receiving side is seeing me via another path (connector or cluster) already (loop)";
                            return;
                        }
                    }
                    resultingAnnouncement = inheritedAnnouncement;
                    statusDetails = null;
                } else {
                    statusDetails = "no response body received";
                }
            } else {
                statusDetails = "got HTTP Status-Code: "+lastStatusCode;
            }
            // SLING-2882 : reset suppressPingWarnings_ flag in success case
            suppressPingWarnings_ = false;
        } catch (IOException e) {
            // SLING-2882 : set/check the suppressPingWarnings_ flag
            if (suppressPingWarnings_) {
                if (logger.isDebugEnabled()) {
                    logger.debug("ping: got IOException: " + e + ", uri=" + uri);
                }
            } else {
                suppressPingWarnings_ = true;
                logger.warn("ping: got IOException [suppressing further warns]: " + e + ", uri=" + uri);
            }
            statusDetails = e.toString();
        } catch (JsonException e) {
            logger.warn("ping: got JSONException: " + e);
            statusDetails = e.toString();
        } catch (RuntimeException re) {
            logger.warn("ping: got RuntimeException: " + re, re);
            statusDetails = re.toString();
        } finally {
            putRequest.releaseConnection();
            // runs on every exit path of the try block, including the early
            // returns: those leave resultingAnnouncement at null
            lastInheritedAnnouncement = resultingAnnouncement;
            lastPingedAt = System.currentTimeMillis();
            closeHttpClient(httpClient, "ping");
        }
    }

    /**
     * Creates a new http client with the configured socket timeout and
     * without any automatic request retries.
     */
    private CloseableHttpClient createHttpClient() {
        final HttpClientBuilder builder = HttpClientBuilder.create();
        // setting the SoTimeout (which is configured in seconds)
        builder.setDefaultSocketConfig(SocketConfig.
                custom().
                setSoTimeout(1000*config.getSoTimeout()).
                build());
        builder.setRetryHandler(new DefaultHttpRequestRetryHandler(0, false));

        return builder.build();
    }

    /** Builds the request uri: connector url + "." + local sling id + ".json" **/
    private String buildRequestUri() {
        return connectorUrl.toString()+"."+clusterViewService.getSlingId()+".json";
    }

    /**
     * Registers the user-info part of the connector url (if there is one) as
     * credentials for the given host/port on the given client context.
     * <p>
     * A context obtained from HttpClientContext.create() has no credentials
     * provider set, so one is created and installed here rather than being
     * fetched from the context.
     */
    private void applyCredentials(final HttpClientContext clientContext,
            final String host, final int port) {
        final String userInfo = connectorUrl.getUserInfo();
        if (userInfo == null) {
            return;
        }
        final Credentials credentials = new UsernamePasswordCredentials(userInfo);
        final BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(new AuthScope(host, port), credentials);
        clientContext.setCredentialsProvider(credentialsProvider);
    }

    /** Gzips the UTF-8 bytes of the given text **/
    private static byte[] gzip(final String text) throws IOException {
        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
        final GZIPOutputStream gzipOut = new GZIPOutputStream(baos);
        try {
            gzipOut.write(text.getBytes("UTF-8"));
        } finally {
            // close() finishes the gzip stream - required before reading the bytes
            gzipOut.close();
        }
        return baos.toByteArray();
    }

    /** Closes the given http client, logging (not propagating) a failure to do so **/
    private void closeHttpClient(final CloseableHttpClient httpClient, final String operation) {
        try {
            httpClient.close();
        } catch (IOException e) {
            logger.error(operation + ": could not close httpClient: "+e, e);
        }
    }

    /** @return the http status code of the last ping that got a response, -1 if there was none yet **/
    public int getStatusCode() {
        return lastStatusCode;
    }

    /** @return the url of the remote connector this client talks to **/
    public URL getConnectorUrl() {
        return connectorUrl;
    }

    /**
     * @return true if this connector was auto-stopped or if the last inherited
     *         announcement is flagged as a loop
     */
    public boolean representsLoop() {
        if (autoStopped) {
            return true;
        }
        return lastInheritedAnnouncement != null && lastInheritedAnnouncement.isLoop();
    }

    /**
     * @return true if this connector is not auto-stopped and the registry still
     *         has an active announcement of the owner of the last inherited announcement
     */
    public boolean isConnected() {
        if (autoStopped) {
            return false;
        }
        if (lastInheritedAnnouncement == null) {
            return false;
        }
        return announcementRegistry.hasActiveAnnouncement(lastInheritedAnnouncement.getOwnerId());
    }

    /**
     * @return a human readable description of the connector's problem state,
     *         or null if there is nothing to report
     */
    public String getStatusDetails() {
        if (autoStopped) {
            return "auto-stopped";
        }
        if (lastInheritedAnnouncement == null) {
            return statusDetails;
        }
        if (announcementRegistry.hasActiveAnnouncement(lastInheritedAnnouncement.getOwnerId())) {
            // still active - so no status details
            return null;
        }
        return "received announcement has expired (it was last renewed "+new Date(lastPingedAt)+") - consider increasing heartbeat timeout";
    }

    /** @return the time (millis) at which the last ping attempt finished **/
    public long getLastPingSent() {
        return lastPingedAt;
    }

    /**
     * @return the number of seconds until the next ping is due, or -1 if that
     *         point in time has already passed
     */
    public int getNextPingDue() {
        final long absDue;
        if (backoffPeriodEnd > 0) {
            absDue = backoffPeriodEnd;
        } else {
            absDue = lastPingedAt + 1000*config.getConnectorPingInterval();
        }
        final int relDue = (int) ((absDue - System.currentTimeMillis()) / 1000);
        return relDue < 0 ? -1 : relDue;
    }

    /** @return whether this connector was auto-stopped (SLING-3316) **/
    public boolean isAutoStopped() {
        return autoStopped;
    }

    /** @return the encoding used for the last request, empty string if unknown **/
    public String getLastRequestEncoding() {
        return lastRequestEncoding == null ? "" : lastRequestEncoding;
    }

    /** @return the encoding of the last response, empty string if unknown **/
    public String getLastResponseEncoding() {
        return lastResponseEncoding == null ? "" : lastResponseEncoding;
    }

    /** @return the owner id of the last inherited announcement, or null if there is none **/
    public String getRemoteSlingId() {
        if (lastInheritedAnnouncement == null) {
            return null;
        }
        return lastInheritedAnnouncement.getOwnerId();
    }

    /** @return the id of this connection **/
    public String getId() {
        return id.toString();
    }

    /**
     * Disconnect this connector: unregisters the last inherited announcement
     * (if any) locally and sends an HTTP DELETE to the remote connector. The
     * outcome of the DELETE is only logged.
     */
    public void disconnect() {
        final String uri = buildRequestUri();
        if (logger.isDebugEnabled()) {
            logger.debug("disconnect: connectorUrl=" + connectorUrl + ", complete uri="+uri);
        }

        if (lastInheritedAnnouncement != null) {
            announcementRegistry
                    .unregisterAnnouncement(lastInheritedAnnouncement
                            .getOwnerId());
        }

        final HttpClientContext clientContext = HttpClientContext.create();
        // create the request before the client: an invalid uri makes the
        // request constructor throw and must not leave an unclosed client behind
        final HttpDelete deleteRequest = new HttpDelete(uri);
        final CloseableHttpClient httpClient = createHttpClient();
        // setting the connection timeout (idle connection, configured in seconds)
        deleteRequest.setConfig(RequestConfig.
                custom().
                setConnectTimeout(1000*config.getSocketConnectTimeout()).
                build());

        try {
            applyCredentials(clientContext, deleteRequest.getURI().getHost(),
                    deleteRequest.getURI().getPort());

            requestValidator.trustMessage(deleteRequest, null);
            final CloseableHttpResponse response = httpClient.execute(deleteRequest, clientContext);
            if (logger.isDebugEnabled()) {
                logger.debug("disconnect: done. code=" + response.getStatusLine().getStatusCode()
                        + " - " + response.getStatusLine().getReasonPhrase());
            }
            // ignoring the actual statuscode though as there's little we can
            // do about it after this point
        } catch (IOException e) {
            logger.warn("disconnect: got IOException: " + e);
        } catch (RuntimeException re) {
            logger.error("disconnect: got RuntimeException: " + re, re);
        } finally {
            deleteRequest.releaseConnection();
            closeHttpClient(httpClient, "disconnect");
        }
    }
}