blob: e6248dc1e276b56c099b289408eedbc59c143bd2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.manager;
import java.io.IOException;
import java.net.DatagramSocket;
import java.util.List;
import java.util.Map.Entry;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.gossip.LocalGossipMember;
import org.apache.gossip.model.ActiveGossipOk;
import org.apache.gossip.model.GossipDataMessage;
import org.apache.gossip.model.GossipMember;
import org.apache.gossip.model.Response;
import org.apache.gossip.model.SharedGossipDataMessage;
import org.apache.gossip.udp.UdpActiveGossipMessage;
import org.apache.gossip.udp.UdpGossipDataMessage;
import org.apache.gossip.udp.UdpSharedGossipDataMessage;
import org.apache.log4j.Logger;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
 * The active gossip thread: periodically sends gossip requests. The class handles gossiping the
 * membership list. This information is important to maintaining a common state among all the
 * nodes, and is important for detecting failures.
 *
 * <p>{@link #init()} schedules four recurring tasks at the configured gossip interval: sending
 * the membership list to a random live member, sending it to a random dead member, sending the
 * per-node data and sending the shared data. {@link #shutdown()} stops both executors owned by
 * this class.
 */
public class ActiveGossipThread {

  private static final Logger LOGGER = Logger.getLogger(ActiveGossipThread.class);

  /** Only used to measure the serialized size of a message before it is handed to the core. */
  private static final ObjectMapper MAPPER = new ObjectMapper();

  private final GossipManager gossipManager;
  private final Random random;
  private final GossipCore gossipCore;
  /** Fires the periodic gossip tasks. */
  private final ScheduledExecutorService scheduledExecutorService;
  /** Backlog of live-member gossip rounds waiting for a worker thread. */
  private final BlockingQueue<Runnable> workQueue;
  /** Runs the live-member gossip rounds so that a slow round does not delay the scheduler. */
  private final ThreadPoolExecutor threadService;

  /**
   * Creates the gossiper and its executors. Nothing is scheduled until {@link #init()} is called.
   *
   * @param gossipManager source of the settings, the local member and the member lists
   * @param gossipCore used to send the messages and to read the per-node and shared data
   */
  public ActiveGossipThread(GossipManager gossipManager, GossipCore gossipCore) {
    this.gossipManager = gossipManager;
    this.random = new Random();
    this.gossipCore = gossipCore;
    this.scheduledExecutorService = Executors.newScheduledThreadPool(2);
    this.workQueue = new ArrayBlockingQueue<Runnable>(1024);
    // When the queue is full the oldest pending round is dropped in favour of the newest one.
    this.threadService = new ThreadPoolExecutor(1, 30, 1, TimeUnit.SECONDS, workQueue,
        new ThreadPoolExecutor.DiscardOldestPolicy());
  }

  /**
   * Schedules the four periodic gossip tasks, starting immediately and repeating at the gossip
   * interval (milliseconds) taken from the manager's settings.
   */
  public void init() {
    final long gossipInterval = gossipManager.getSettings().getGossipInterval();
    final Runnable liveMemberRound = guarded("sendToALiveMember", this::sendToALiveMember);

    // The live-member round is handed off to the worker pool instead of running on the
    // scheduler thread.
    scheduledExecutorService.scheduleAtFixedRate(
        () -> threadService.execute(liveMemberRound),
        0, gossipInterval, TimeUnit.MILLISECONDS);
    scheduledExecutorService.scheduleAtFixedRate(
        guarded("sendToDeadMember", this::sendToDeadMember),
        0, gossipInterval, TimeUnit.MILLISECONDS);
    scheduledExecutorService.scheduleAtFixedRate(
        guarded("sendPerNodeData",
            () -> sendPerNodeData(gossipManager.getMyself(), gossipManager.getLiveMembers())),
        0, gossipInterval, TimeUnit.MILLISECONDS);
    scheduledExecutorService.scheduleAtFixedRate(
        guarded("sendSharedData",
            () -> sendSharedData(gossipManager.getMyself(), gossipManager.getLiveMembers())),
        0, gossipInterval, TimeUnit.MILLISECONDS);
  }

  /**
   * Stops the scheduler and the worker pool, waiting up to five seconds for each of them to
   * finish the work already in progress. If the calling thread is interrupted while waiting, the
   * wait is abandoned and the interrupt status is restored for the caller.
   */
  public void shutdown() {
    scheduledExecutorService.shutdown();
    try {
      scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS);
      // The worker pool must be stopped as well, otherwise its worker thread outlives this object.
      threadService.shutdown();
      threadService.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      LOGGER.debug("Issue during shutdown", e);
      // Still request the worker pool to stop; shutdown() is a no-op if it was already called.
      threadService.shutdown();
      Thread.currentThread().interrupt();
    }
  }

  /**
   * Sends every shared data entry known to the gossip core to one randomly chosen member.
   * Entries whose serialized form does not fit in a packet are logged and skipped.
   *
   * @param me the local member; its id is used as the sender of each message
   * @param memberList the candidates to pick the receiver from; nothing is sent when it is empty
   */
  public void sendSharedData(LocalGossipMember me, List<LocalGossipMember> memberList) {
    LocalGossipMember member = selectPartner(memberList);
    if (member == null) {
      LOGGER.debug("Send sendSharedData() is called without action");
      return;
    }
    try {
      for (Entry<String, SharedGossipDataMessage> innerEntry
          : this.gossipCore.getSharedData().entrySet()) {
        SharedGossipDataMessage data = innerEntry.getValue();
        UdpSharedGossipDataMessage message = new UdpSharedGossipDataMessage();
        message.setUuid(UUID.randomUUID().toString());
        message.setUriFrom(me.getId());
        message.setExpireAt(data.getExpireAt());
        message.setKey(data.getKey());
        message.setNodeId(data.getNodeId());
        message.setTimestamp(data.getTimestamp());
        message.setPayload(data.getPayload());
        if (fitsInPacket(message)) {
          gossipCore.sendOneWay(message, member.getUri());
        }
      }
    } catch (IOException e1) {
      LOGGER.warn(e1);
    }
  }

  /**
   * Sends every per-node data entry known to the gossip core to one randomly chosen member.
   * Entries whose serialized form does not fit in a packet are logged and skipped.
   *
   * @param me the local member; its id is used as the sender of each message
   * @param memberList the candidates to pick the receiver from; nothing is sent when it is empty
   */
  public void sendPerNodeData(LocalGossipMember me, List<LocalGossipMember> memberList) {
    LocalGossipMember member = selectPartner(memberList);
    if (member == null) {
      LOGGER.debug("Send sendPerNodeData() is called without action");
      return;
    }
    try {
      for (Entry<String, ConcurrentHashMap<String, GossipDataMessage>> entry
          : gossipCore.getPerNodeData().entrySet()) {
        for (Entry<String, GossipDataMessage> innerEntry : entry.getValue().entrySet()) {
          GossipDataMessage data = innerEntry.getValue();
          UdpGossipDataMessage message = new UdpGossipDataMessage();
          message.setUuid(UUID.randomUUID().toString());
          message.setUriFrom(me.getId());
          message.setExpireAt(data.getExpireAt());
          message.setKey(data.getKey());
          message.setNodeId(data.getNodeId());
          message.setTimestamp(data.getTimestamp());
          message.setPayload(data.getPayload());
          if (fitsInPacket(message)) {
            gossipCore.sendOneWay(message, member.getUri());
          }
        }
      }
    } catch (IOException e1) {
      LOGGER.warn(e1);
    }
  }

  /** Gossips the membership list to a randomly chosen live member, if there is one. */
  protected void sendToALiveMember() {
    LocalGossipMember member = selectPartner(gossipManager.getLiveMembers());
    sendMembershipList(gossipManager.getMyself(), member);
  }

  /** Gossips the membership list to a randomly chosen dead member, if there is one. */
  protected void sendToDeadMember() {
    LocalGossipMember member = selectPartner(gossipManager.getDeadMembers());
    sendMembershipList(gossipManager.getMyself(), member);
  }

  /**
   * Performs the sending of the membership list, after we have incremented our own heartbeat.
   * The heartbeat is refreshed even when there is no member to send to.
   *
   * @param me the local member, sent as the first entry of the list
   * @param member the receiver, or {@code null} to only refresh the heartbeat
   */
  protected void sendMembershipList(LocalGossipMember me, LocalGossipMember member) {
    me.setHeartbeat(System.nanoTime());
    if (member == null) {
      LOGGER.debug("Send sendMembershipList() is called without action");
      return;
    }
    LOGGER.debug("Send sendMembershipList() is called to " + member.toString());
    try {
      UdpActiveGossipMessage message = new UdpActiveGossipMessage();
      message.setUriFrom(gossipManager.getMyself().getUri().toASCIIString());
      message.setUuid(UUID.randomUUID().toString());
      message.getMembers().add(convert(me));
      for (LocalGossipMember other : gossipManager.getMembers().keySet()) {
        message.getMembers().add(convert(other));
      }
      if (fitsInPacket(message)) {
        Response r = gossipCore.send(message, member.getUri());
        // An ActiveGossipOk response needs no action (maybe count metrics here).
        if (!(r instanceof ActiveGossipOk)) {
          LOGGER.debug("Message " + message + " generated response " + r);
        }
      }
    } catch (IOException e1) {
      LOGGER.warn(e1);
    }
  }

  /**
   * Picks a uniformly random member from the given list.
   *
   * @param memberList
   *          The list of members which are stored in the local list of members.
   * @return The chosen LocalGossipMember to gossip with, or {@code null} when the list is
   *         {@code null} or empty.
   */
  protected LocalGossipMember selectPartner(List<LocalGossipMember> memberList) {
    if (memberList == null || memberList.isEmpty()) {
      LOGGER.debug("I am alone in this world.");
      return null;
    }
    return memberList.get(random.nextInt(memberList.size()));
  }

  /**
   * Checks that the JSON form of the message is strictly smaller than
   * {@link GossipManager#MAX_PACKET_SIZE}, logging an error when it is not.
   *
   * <p>NOTE(review): the size is measured in the platform default charset, as before; whether the
   * transport serializes with the same encoding is not visible from this class.
   *
   * @param message the message about to be sent
   * @return {@code true} when the message may be sent
   * @throws IOException when the message cannot be serialized
   */
  private boolean fitsInPacket(Object message) throws IOException {
    int packetLength = MAPPER.writeValueAsString(message).getBytes().length;
    if (packetLength < GossipManager.MAX_PACKET_SIZE) {
      return true;
    }
    LOGGER.error("The length of the to be send message is too large ("
        + packetLength + " > " + GossipManager.MAX_PACKET_SIZE + ").");
    return false;
  }

  /**
   * Wraps a periodic task so that an unchecked exception is logged instead of escaping. An
   * exception escaping from a task scheduled with {@code scheduleAtFixedRate} suppresses all of
   * its subsequent executions, which would silently stop that kind of gossip.
   *
   * @param taskName name used in the log message
   * @param task the work to run
   * @return a runnable that never throws a {@link RuntimeException}
   */
  private Runnable guarded(String taskName, Runnable task) {
    return () -> {
      try {
        task.run();
      } catch (RuntimeException e) {
        LOGGER.warn("Gossip task " + taskName + " failed; it will run again at the next interval",
            e);
      }
    };
  }

  /** Copies the cluster name, heartbeat, URI and id of a local member into a wire-level member. */
  private GossipMember convert(LocalGossipMember member) {
    GossipMember gm = new GossipMember();
    gm.setCluster(member.getClusterName());
    gm.setHeartbeat(member.getHeartbeat());
    gm.setUri(member.getUri().toASCIIString());
    gm.setId(member.getId());
    return gm;
  }
}