blob: 51441512c73a17ca3cea034abbe7b2c738a451c7 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package org.apache.storm.pacemaker;
import com.codahale.metrics.ExponentiallyDecayingReservoir;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.storm.generated.HBMessage;
import org.apache.storm.generated.HBMessageData;
import org.apache.storm.generated.HBNodes;
import org.apache.storm.generated.HBPulse;
import org.apache.storm.generated.HBServerMessageType;
import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.shade.uk.org.lidalia.sysoutslf4j.context.SysOutOverSLF4J;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.Utils;
import org.apache.storm.utils.VersionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * In-memory heartbeat store that answers {@link HBMessage} requests.
 *
 * <p>Heartbeats are kept in a {@link ConcurrentHashMap} keyed by the pulse id / path string. Read-style requests
 * (exists, get pulse, list nodes, list pulses) are only served when the caller passes {@code authenticated == true};
 * otherwise a {@code NOT_AUTHORIZED} response is returned. Write-style requests (create path, send pulse, delete)
 * do not consult the flag in this class.
 */
public class Pacemaker implements IServerMessageHandler {
    private static final Logger LOG = LoggerFactory.getLogger(Pacemaker.class);

    private final Meter meterSendPulseCount;
    private final Meter meterTotalReceivedSize;
    private final Meter meterGetPulseCount;
    private final Meter meterTotalSentSize;
    private final Histogram histogramHeartbeatSize;
    /** Heartbeat payloads keyed by pulse id (a slash-separated path string). */
    private final Map<String, byte[]> heartbeats;
    private final Map<String, Object> conf;

    /**
     * Creates the handler and registers its meters, histogram and key-count gauge.
     *
     * @param conf            configuration map, later handed to {@link PacemakerServer}
     * @param metricsRegistry registry on which the pacemaker metrics are registered
     */
    public Pacemaker(Map<String, Object> conf, StormMetricsRegistry metricsRegistry) {
        heartbeats = new ConcurrentHashMap<>();
        this.conf = conf;
        this.meterSendPulseCount = metricsRegistry.registerMeter("pacemaker:send-pulse-count");
        this.meterTotalReceivedSize = metricsRegistry.registerMeter("pacemaker:total-receive-size");
        // NOTE(review): "get-pulse=count" looks like a typo for "get-pulse-count"; the name is kept unchanged
        // because external dashboards/alerts may already reference it.
        this.meterGetPulseCount = metricsRegistry.registerMeter("pacemaker:get-pulse=count");
        this.meterTotalSentSize = metricsRegistry.registerMeter("pacemaker:total-sent-size");
        this.histogramHeartbeatSize = metricsRegistry.registerHistogram("pacemaker:heartbeat-size", new ExponentiallyDecayingReservoir());
        metricsRegistry.registerGauge("pacemaker:size-total-keys", heartbeats::size);
    }

    /**
     * Process entry point: reads the configuration, creates the handler, launches the server, starts the metrics
     * reporters and registers a shutdown hook that stops them.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        SysOutOverSLF4J.sendSystemOutAndErrToSLF4J();
        Map<String, Object> conf = ConfigUtils.overrideLoginConfigWithSystemProperty(ConfigUtils.readStormConfig());
        StormMetricsRegistry metricsRegistry = new StormMetricsRegistry();
        final Pacemaker serverHandler = new Pacemaker(conf, metricsRegistry);
        serverHandler.launchServer();
        metricsRegistry.startMetricsReporters(conf);
        Utils.addShutdownHookWithForceKillIn1Sec(metricsRegistry::stopMetricsReporters);
    }

    /**
     * Dispatches a request to the matching handler and copies the request's message id onto the response.
     *
     * @param m             the incoming request
     * @param authenticated whether the caller is allowed to perform read-style operations
     * @return the response, or {@code null} when the message type is missing or not handled here
     */
    @Override
    public HBMessage handleMessage(HBMessage m, boolean authenticated) {
        HBServerMessageType type = m.get_type();
        if (type == null) {
            // Switching on a null enum throws NullPointerException; treat it like any other unexpected type.
            LOG.info("Got Unexpected Type: {}", type);
            return null;
        }

        HBMessage response = null;
        HBMessageData data = m.get_data();
        switch (type) {
            case CREATE_PATH:
                response = createPath(data.get_path());
                break;
            case EXISTS:
                response = pathExists(data.get_path(), authenticated);
                break;
            case SEND_PULSE:
                response = sendPulse(data.get_pulse());
                break;
            case GET_ALL_PULSE_FOR_PATH:
                response = getAllPulseForPath(data.get_path(), authenticated);
                break;
            case GET_ALL_NODES_FOR_PATH:
                response = getAllNodesForPath(data.get_path(), authenticated);
                break;
            case GET_PULSE:
                response = getPulse(data.get_path(), authenticated);
                break;
            case DELETE_PATH:
                response = deletePath(data.get_path());
                break;
            case DELETE_PULSE_ID:
                response = deletePulseId(data.get_path());
                break;
            default:
                LOG.info("Got Unexpected Type: {}", type);
                break;
        }
        if (response != null) {
            response.set_message_id(m.get_message_id());
        }
        return response;
    }

    /** Acknowledges a create-path request; nothing is stored because paths exist only as heartbeat keys. */
    private HBMessage createPath(String path) {
        return new HBMessage(HBServerMessageType.CREATE_PATH_RESPONSE, null);
    }

    /** Reports whether a heartbeat is stored under exactly {@code path}; requires {@code authenticated}. */
    private HBMessage pathExists(String path, boolean authenticated) {
        if (!authenticated) {
            return notAuthorized();
        }
        boolean itDoes = heartbeats.containsKey(path);
        LOG.debug("Checking if path [ {} ] exists... {} .", path, itDoes);
        return new HBMessage(HBServerMessageType.EXISTS_RESPONSE, HBMessageData.boolval(itDoes));
    }

    /** Builds the response returned to callers that are not allowed to read. */
    private HBMessage notAuthorized() {
        return new HBMessage(HBServerMessageType.NOT_AUTHORIZED, null);
    }

    /** Stores (or overwrites) the heartbeat carried by {@code pulse} and updates the receive metrics. */
    private HBMessage sendPulse(HBPulse pulse) {
        String id = pulse.get_id();
        byte[] details = pulse.get_details();
        LOG.debug("Saving Pulse for id [ {} ] data [ {} ].", id, details);
        meterSendPulseCount.mark();
        meterTotalReceivedSize.mark(details.length);
        histogramHeartbeatSize.update(details.length);
        heartbeats.put(id, details);
        return new HBMessage(HBServerMessageType.SEND_PULSE_RESPONSE, null);
    }

    /** Returns an empty (null-data) response for authenticated callers; no pulses are collected here. */
    private HBMessage getAllPulseForPath(String path, boolean authenticated) {
        if (authenticated) {
            return new HBMessage(HBServerMessageType.GET_ALL_PULSE_FOR_PATH_RESPONSE, null);
        } else {
            return notAuthorized();
        }
    }

    /**
     * Lists the distinct first non-empty path segments that follow {@code path} among the stored heartbeat keys.
     *
     * <p>The prefix is compared literally. Only keys that start with {@code path} contribute, and a key that
     * equals {@code path} (no remaining segment) contributes nothing.
     */
    private HBMessage getAllNodesForPath(String path, boolean authenticated) {
        LOG.debug("List all nodes for path {}", path);
        if (!authenticated) {
            return notAuthorized();
        }
        Set<String> pulseIds = new HashSet<>();
        for (String key : heartbeats.keySet()) {
            if (!key.startsWith(path)) {
                continue;
            }
            // Strip the prefix literally; a regex-based replace would misinterpret metacharacters in the path.
            String remainder = key.substring(path.length());
            for (String segment : remainder.split("/")) {
                if (!segment.isEmpty()) {
                    pulseIds.add(segment);
                    break;
                }
            }
        }
        HBMessageData hbMessageData = HBMessageData.nodes(new HBNodes(new ArrayList<>(pulseIds)));
        return new HBMessage(HBServerMessageType.GET_ALL_NODES_FOR_PATH_RESPONSE, hbMessageData);
    }

    /**
     * Returns the heartbeat stored under {@code path}; requires {@code authenticated}. When nothing is stored,
     * the returned pulse carries {@code null} details and the sent-size meter is not marked.
     */
    private HBMessage getPulse(String path, boolean authenticated) {
        if (!authenticated) {
            return notAuthorized();
        }
        byte[] details = heartbeats.get(path);
        LOG.debug("Getting Pulse for path [ {} ]...data [ {} ].", path, details);
        meterGetPulseCount.mark();
        if (details != null) {
            meterTotalSentSize.mark(details.length);
        }
        HBPulse hbPulse = new HBPulse();
        hbPulse.set_id(path);
        hbPulse.set_details(details);
        return new HBMessage(HBServerMessageType.GET_PULSE_RESPONSE, HBMessageData.pulse(hbPulse));
    }

    /**
     * Removes the heartbeat stored at {@code path} and every heartbeat whose key lies beneath it.
     *
     * <p>Both the prefix and each key are compared with a trailing slash, so {@code /a} removes {@code /a} and
     * {@code /a/b} but not {@code /ab}.
     */
    private HBMessage deletePath(String path) {
        String prefix = path.endsWith("/") ? path : (path + "/");
        for (String key : heartbeats.keySet()) {
            String checkKey = key + "/";
            if (checkKey.startsWith(prefix)) {
                deletePulseId(key);
            }
        }
        return new HBMessage(HBServerMessageType.DELETE_PATH_RESPONSE, null);
    }

    /** Removes the heartbeat stored under exactly {@code path}, if any. */
    private HBMessage deletePulseId(String path) {
        LOG.debug("Deleting Pulse for id [ {} ].", path);
        heartbeats.remove(path);
        return new HBMessage(HBServerMessageType.DELETE_PULSE_ID_RESPONSE, null);
    }

    /** Creates the {@link PacemakerServer} that delivers requests to this handler. */
    private PacemakerServer launchServer() {
        LOG.info("Starting pacemaker server for storm version '{}'", VersionInfo.getVersion());
        return new PacemakerServer(this, conf);
    }
}