blob: 344a718e4e70bcd77c699e4bb347e506072f54cc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sling.discovery.base.connectors.announcement;
import java.io.StringReader;
import java.io.StringWriter;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import javax.json.Json;
import javax.json.JsonArray;
import javax.json.JsonArrayBuilder;
import javax.json.JsonObject;
import javax.json.JsonObjectBuilder;
import javax.json.JsonReaderFactory;
import javax.json.JsonValue;
import javax.json.JsonValue.ValueType;
import org.apache.sling.api.resource.ModifiableValueMap;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.discovery.ClusterView;
import org.apache.sling.discovery.InstanceDescription;
import org.apache.sling.discovery.commons.providers.DefaultClusterView;
import org.apache.sling.discovery.commons.providers.DefaultInstanceDescription;
import org.apache.sling.discovery.commons.providers.NonLocalInstanceDescription;
/**
* An announcement is the information exchanged by the topology connector and
* contains all clusters and instances which both the topology connector client
* and servlet see (in their part before joining the two worlds).
* <p>
* An announcement is exchanged in json format and carries a timeout.
*/
public class Announcement {
/** the protocol version this announcement currently represents. Mismatching protocol versions are
* used to detect incompatible topology connectors
*/
private final static int PROTOCOL_VERSION = 1;
/** the sling id of the owner of this announcement. the owner is where this announcement comes from **/
private final String ownerId;
/** announcement protocol version **/
private final int protocolVersion;
/** the local cluster view **/
private ClusterView localCluster;
/** the incoming instances **/
private List<Announcement> incomings = new LinkedList<Announcement>();
/** whether or not this annoucement was inherited (response of a connect) or incoming (the connect) **/
private boolean inherited = false;
/** some information about the server where this announcement came from **/
private String serverInfo;
/** whether or not this announcement represents a loop detected in the topology connectors **/
private boolean loop = false;
/** SLING-3382: Sets the backoffInterval which the connector servlets passes back to the client to use as the next heartbeatInterval **/
private long backoffInterval = -1;
/** SLING-3382: the resetBackoff flag is sent from client to server and indicates that the client wants to start from (backoff) scratch **/
private boolean resetBackoff = false;
private long originallyCreatedAt = -1;
private long receivedAt = System.currentTimeMillis();
private static final JsonReaderFactory jsonReaderFactory;
static {
Map<String, Object> config = new HashMap<String, Object>();
config.put("org.apache.johnzon.supports-comments", true);
jsonReaderFactory = Json.createReaderFactory(config);
}
public Announcement(final String ownerId) {
this(ownerId, PROTOCOL_VERSION);
}
public Announcement(final String ownerId, int protocolVersion) {
if (ownerId==null || ownerId.length()==0) {
throw new IllegalArgumentException("ownerId must not be null or empty");
}
this.ownerId = ownerId;
this.protocolVersion = protocolVersion;
}
@Override
public String toString() {
StringBuilder incomingList = new StringBuilder();
for (Iterator<Announcement> it = incomings.iterator(); it.hasNext();) {
Announcement anIncomingAnnouncement = it.next();
if (incomingList.length()!=0) {
incomingList.append(", ");
}
incomingList.append(anIncomingAnnouncement);
}
return "Announcement[ownerId="+getOwnerId()+
", protocolVersion="+protocolVersion+
", inherited="+isInherited()+
", loop="+loop+
", incomings="+incomingList+"]";
}
/** check whether this is announcement contains the valid protocol version **/
public boolean isCorrectVersion() {
return (protocolVersion==PROTOCOL_VERSION);
}
/** check whether this is a valid announcement, containing the minimal information **/
public boolean isValid() {
if (ownerId==null || ownerId.length()==0) {
return false;
}
if (loop) {
return true;
}
if (!isCorrectVersion()) {
return false;
}
if (localCluster==null) {
return false;
}
try{
List<InstanceDescription> instances = localCluster.getInstances();
if (instances==null || instances.size()==0) {
return false;
}
boolean isOwnerMemberOfLocalCluster = false;
for (Iterator<InstanceDescription> it = instances.iterator(); it.hasNext();) {
InstanceDescription instanceDescription = it.next();
if (instanceDescription.getSlingId().equals(ownerId)) {
isOwnerMemberOfLocalCluster = true;
}
}
if (!isOwnerMemberOfLocalCluster) {
return false;
}
} catch(Exception ise) {
return false;
}
return true;
}
/** set the inherited flag - if true this means this announcement is the response of a topology connect **/
public void setInherited(final boolean inherited) {
this.inherited = inherited;
}
/** Returns the inherited flag - if true this means that this announcement is the response of a topology connect **/
public boolean isInherited() {
return inherited;
}
/** Sets the loop falg - set true when this announcement should represent a loop detected in the topology connectors **/
public void setLoop(final boolean loop) {
this.loop = loop;
}
/** Sets the backoffInterval which the connector servlets passes back to the client to use as the next heartbeatInterval **/
public void setBackoffInterval(long backoffInterval) {
this.backoffInterval = backoffInterval;
}
/** Gets the backoffInterval which the connector servlets passes back to the client to use as the next heartbeatInterval **/
public long getBackoffInterval() {
return this.backoffInterval;
}
public long getOriginallyCreatedAt() {
return this.originallyCreatedAt;
}
public long getReceivedAt() {
return this.receivedAt;
}
/** sets the resetBackoff flag **/
public void setResetBackoff(boolean resetBackoff) {
this.resetBackoff = resetBackoff;
}
/** gets the resetBackoff flag **/
public boolean getResetBackoff() {
return resetBackoff;
}
/** Returns the loop flag - set when this announcement represents a loop detected in the topology connectors **/
public boolean isLoop() {
return loop;
}
/** Returns the protocolVersion of this announcement **/
public int getProtocolVersion() {
return protocolVersion;
}
/** sets the information about the server where this announcement came from **/
public void setServerInfo(final String serverInfo) {
this.serverInfo = serverInfo;
}
/** the information about the server where this announcement came from **/
public String getServerInfo() {
return serverInfo;
}
/**
* Returns the slingid of the owner of this announcement.
* <p>
* The owner is the instance which initiated the topology connection
*/
public String getOwnerId() {
return ownerId;
}
/** Convert this announcement into a json object **/
private JsonObject asJSONObject(boolean filterTimes) {
JsonObjectBuilder announcement = Json.createObjectBuilder();
announcement.add("ownerId", ownerId);
announcement.add("protocolVersion", protocolVersion);
// SLING-3389: leaving the 'created' property in the announcement
// for backwards compatibility!
if (!filterTimes) {
announcement.add("created", System.currentTimeMillis());
}
announcement.add("inherited", inherited);
if (loop) {
announcement.add("loop", loop);
}
if (serverInfo != null) {
announcement.add("serverInfo", serverInfo);
}
if (localCluster!=null) {
announcement.add("localClusterView", asJSON(localCluster));
}
if (!filterTimes && backoffInterval>0) {
announcement.add("backoffInterval", backoffInterval);
}
if (resetBackoff) {
announcement.add("resetBackoff", resetBackoff);
}
JsonArrayBuilder incomingAnnouncements = Json.createArrayBuilder();
for (Iterator<Announcement> it = incomings.iterator(); it.hasNext();) {
Announcement incoming = it.next();
incomingAnnouncements.add(incoming.asJSONObject(filterTimes));
}
announcement.add("topologyAnnouncements", incomingAnnouncements);
return announcement.build();
}
/** Create an announcement form json **/
public static Announcement fromJSON(final String topologyAnnouncementJSON) {
JsonObject announcement = jsonReaderFactory.createReader(new StringReader(topologyAnnouncementJSON)).readObject();
final String ownerId = announcement.getString("ownerId");
final int protocolVersion;
if (!announcement.containsKey("protocolVersion")) {
protocolVersion = -1;
} else {
protocolVersion = announcement.getInt("protocolVersion");
}
final Announcement result = new Announcement(ownerId, protocolVersion);
if (announcement.containsKey("created")) {
result.originallyCreatedAt = announcement.getJsonNumber("created").longValue();
}
if (announcement.containsKey("backoffInterval")) {
long backoffInterval = announcement.getJsonNumber("backoffInterval").longValue();
result.backoffInterval = backoffInterval;
}
if (announcement.containsKey("resetBackoff")) {
boolean resetBackoff = announcement.getBoolean("resetBackoff");
result.resetBackoff = resetBackoff;
}
if (announcement.containsKey("loop") && announcement.getBoolean("loop")) {
result.setLoop(true);
return result;
}
if (announcement.containsKey("localClusterView"))
{
final String localClusterViewJSON = asJSON(announcement
.getJsonObject("localClusterView"));
final ClusterView localClusterView = asClusterView(localClusterViewJSON);
result.setLocalCluster(localClusterView);
}
if (announcement.containsKey("inherited")) {
final Boolean inherited = announcement.getBoolean("inherited");
result.inherited = inherited;
}
if (announcement.containsKey("serverInfo")) {
String serverInfo = announcement.getString("serverInfo");
result.serverInfo = serverInfo;
}
final JsonArray subAnnouncements = announcement
.getJsonArray("topologyAnnouncements");
for (int i = 0; i < subAnnouncements.size(); i++) {
String subAnnouncementJSON = subAnnouncements.get(i).toString();
result.addIncomingTopologyAnnouncement(fromJSON(subAnnouncementJSON));
}
return result;
}
/** create a clusterview from json **/
private static ClusterView asClusterView(final String localClusterViewJSON) {
JsonObject obj = jsonReaderFactory.createReader(new StringReader(localClusterViewJSON)).readObject();
DefaultClusterView clusterView = new DefaultClusterView(
obj.getString("id"));
JsonArray instancesObj = obj.getJsonArray("instances");
for (int i = 0; i < instancesObj.size(); i++) {
JsonObject anInstance = instancesObj.getJsonObject(i);
clusterView.addInstanceDescription(asInstance(anInstance));
}
return clusterView;
}
/** convert a clusterview into json **/
private static JsonObject asJSON(final ClusterView clusterView) {
JsonObjectBuilder obj = Json.createObjectBuilder();
obj.add("id", clusterView.getId());
JsonArrayBuilder instancesObj = Json.createArrayBuilder();
List<InstanceDescription> instances = clusterView.getInstances();
for (Iterator<InstanceDescription> it = instances.iterator(); it
.hasNext();) {
InstanceDescription instanceDescription = it.next();
instancesObj.add(asJSON(instanceDescription));
}
obj.add("instances", instancesObj);
return obj.build();
}
/** create an instancedescription from json **/
private static DefaultInstanceDescription asInstance(final JsonObject anInstance) {
final boolean isLeader = anInstance.getBoolean("isLeader");
final String slingId = anInstance.getString("slingId");
final JsonObject propertiesObj = anInstance.getJsonObject("properties");
Iterator<String> it = propertiesObj.keySet().iterator();
Map<String, String> properties = new HashMap<String, String>();
while (it.hasNext()) {
String key = it.next();
properties.put(key, propertiesObj.getString(key));
}
NonLocalInstanceDescription instance = new NonLocalInstanceDescription(
null, isLeader, slingId, properties);
return instance;
}
/** convert an instance description into a json object **/
private static JsonObject asJSON(final InstanceDescription instanceDescription) {
JsonObjectBuilder obj = Json.createObjectBuilder();
obj.add("slingId", instanceDescription.getSlingId());
obj.add("isLeader", instanceDescription.isLeader());
ClusterView cluster = instanceDescription.getClusterView();
if (cluster != null) {
obj.add("cluster", cluster.getId());
}
JsonObjectBuilder propertiesObj = Json.createObjectBuilder();
Map<String, String> propertiesMap = instanceDescription.getProperties();
for (Iterator<Entry<String, String>> it = propertiesMap.entrySet()
.iterator(); it.hasNext();) {
Entry<String, String> entry = it.next();
propertiesObj.add(entry.getKey(), entry.getValue());
}
obj.add("properties", propertiesObj);
return obj.build();
}
/** sets the local clusterview **/
public void setLocalCluster(ClusterView localCluster) {
this.localCluster = localCluster;
}
/** adds an incoming announcement to this announcement **/
public void addIncomingTopologyAnnouncement(
Announcement incomingTopologyAnnouncement) {
incomings.add(incomingTopologyAnnouncement);
}
/** Convert this announcement into json **/
public String asJSON() {
return asJSON(asJSONObject(false));
}
private static String asJSON(JsonValue json) {
StringWriter writer = new StringWriter();
Json.createGenerator(writer).write(json).close();
return writer.toString();
}
/** the key which is unique to this announcement **/
public String getPrimaryKey() {
return ownerId;
}
/** Returns the list of instances that are contained in this announcement **/
public Collection<InstanceDescription> listInstances() {
Collection<InstanceDescription> instances = new LinkedList<InstanceDescription>();
instances.addAll(localCluster.getInstances());
for (Iterator<Announcement> it = incomings.iterator(); it.hasNext();) {
Announcement incomingAnnouncement = it.next();
instances.addAll(incomingAnnouncement.listInstances());
}
return instances;
}
/**
* Persists this announcement using the given 'announcements' resource,
* under which a node with the primary key is created
**/
public void persistTo(Resource announcementsResource)
throws PersistenceException {
Resource announcementChildResource = announcementsResource.getChild(getPrimaryKey());
// SLING-2967 used to introduce 'resetting the created time' here
// in order to become machine-clock independent.
// With introduction of SLING-3389, where we dont store any
// announcement-heartbeat-dates anymore at all, this resetting here
// became unnecessary.
final String announcementJson = asJSON();
if (announcementChildResource==null) {
final ResourceResolver resourceResolver = announcementsResource.getResourceResolver();
Map<String, Object> properties = new HashMap<String, Object>();
properties.put("topologyAnnouncement", announcementJson);
resourceResolver.create(announcementsResource, getPrimaryKey(), properties);
} else {
final ModifiableValueMap announcementChildMap = announcementChildResource.adaptTo(ModifiableValueMap.class);
announcementChildMap.put("topologyAnnouncement", announcementJson);
}
}
/**
* Remove all announcements that match the given owner Id
*/
public void removeInherited(final String ownerId) {
for (Iterator<Announcement> it = incomings.iterator(); it.hasNext();) {
Announcement anIncomingAnnouncement = it.next();
if (anIncomingAnnouncement.isInherited()
&& anIncomingAnnouncement.getOwnerId().equals(ownerId)) {
// then filter this
it.remove();
}
}
}
/**
* Compare this Announcement with another one, ignoring the 'created'
* property - which gets added to the JSON object automatically due
* to SLING-3389 wire-backwards-compatibility - and backoffInterval
* introduced as part of SLING-3382
*/
public boolean correspondsTo(Announcement announcement) {
final JsonObject myJson = asJSONObject(true);
final JsonObject otherJson = announcement.asJSONObject(true);
return asJSON(myJson).equals(asJSON(otherJson));
}
public void registerPing(Announcement incomingAnnouncement) {
originallyCreatedAt = incomingAnnouncement.originallyCreatedAt;
receivedAt = incomingAnnouncement.receivedAt;
}
}