blob: 401771a2e6bfa6a2a9f224ed84931209de52de97 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.cluster;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import org.apache.storm.callback.ZKStateChangedCallback;
import org.apache.storm.generated.ClusterWorkerHeartbeat;
import org.apache.storm.generated.HBExecutionException;
import org.apache.storm.generated.HBMessage;
import org.apache.storm.generated.HBMessageData;
import org.apache.storm.generated.HBPulse;
import org.apache.storm.generated.HBServerMessageType;
import org.apache.storm.pacemaker.PacemakerClientPool;
import org.apache.storm.pacemaker.PacemakerConnectionException;
import org.apache.storm.shade.org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.storm.shade.org.apache.zookeeper.data.ACL;
import org.apache.storm.utils.Utils;
import org.apache.storm.utils.WrappedHBExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * {@link IStateStorage} implementation that sends worker heartbeat operations
 * ({@code set_worker_hb}, {@code get_worker_hb}, {@code get_worker_hb_children},
 * {@code delete_worker_hb}) through a {@link PacemakerClientPool} and delegates every
 * other operation to a wrapped {@link IStateStorage}.
 *
 * <p>Each heartbeat operation is attempted once and then retried up to
 * {@code MAX_RETRIES} more times when the pacemaker call fails. Once the retries are
 * exhausted the last failure is rethrown wrapped in a {@link RuntimeException}.
 * If the calling thread is interrupted, the interrupt status is restored and a
 * {@link RuntimeException} wrapping the {@link InterruptedException} is thrown.
 */
public class PaceMakerStateStorage implements IStateStorage {

    /** Number of retries made after the first failed attempt of a heartbeat operation. */
    private static final int MAX_RETRIES = 10;
    private static final Logger LOG = LoggerFactory.getLogger(PaceMakerStateStorage.class);

    private final PacemakerClientPool pacemakerClientPool;
    private final IStateStorage stateStorage;

    /**
     * Creates a storage that uses the given pool for heartbeats and the given storage for everything else.
     *
     * @param pacemakerClientPool pool used for all {@code *_worker_hb*} operations
     * @param stateStorage        storage that every non-heartbeat operation is delegated to
     * @throws Exception declared for compatibility with existing callers; this constructor
     *                   itself only stores its arguments
     */
    public PaceMakerStateStorage(PacemakerClientPool pacemakerClientPool, IStateStorage stateStorage) throws Exception {
        this.pacemakerClientPool = pacemakerClientPool;
        this.stateStorage = stateStorage;
    }

    /**
     * Restores the interrupt status of the current thread, logs the interruption and
     * builds the exception that the interrupted operation should throw.
     *
     * @param operation name of the interrupted operation, used in the log message
     * @param e         the interruption that was caught
     * @return a {@link RuntimeException} wrapping {@code e}
     */
    private static RuntimeException interrupted(String operation, InterruptedException e) {
        // Wrapping the exception would otherwise hide the interruption from code
        // further up the stack that checks the thread's interrupt status.
        Thread.currentThread().interrupt();
        LOG.debug("{} got interrupted", operation, e);
        return new RuntimeException(e);
    }

    /** Delegates to the wrapped state storage. */
    @Override
    public String register(ZKStateChangedCallback callback) {
        return stateStorage.register(callback);
    }

    /** Delegates to the wrapped state storage. */
    @Override
    public void unregister(String id) {
        stateStorage.unregister(id);
    }

    /** Delegates to the wrapped state storage. */
    @Override
    public String create_sequential(String path, byte[] data, List<ACL> acls) {
        return stateStorage.create_sequential(path, data, acls);
    }

    /** Delegates to the wrapped state storage. */
    @Override
    public void mkdirs(String path, List<ACL> acls) {
        stateStorage.mkdirs(path, acls);
    }

    /** Delegates to the wrapped state storage. */
    @Override
    public void delete_node(String path) {
        stateStorage.delete_node(path);
    }

    /** Delegates to the wrapped state storage. */
    @Override
    public void set_ephemeral_node(String path, byte[] data, List<ACL> acls) {
        stateStorage.set_ephemeral_node(path, data, acls);
    }

    /** Delegates to the wrapped state storage. */
    @Override
    public Integer get_version(String path, boolean watch) throws Exception {
        return stateStorage.get_version(path, watch);
    }

    /** Delegates to the wrapped state storage. */
    @Override
    public boolean node_exists(String path, boolean watch) {
        return stateStorage.node_exists(path, watch);
    }

    /** Delegates to the wrapped state storage. */
    @Override
    public List<String> get_children(String path, boolean watch) {
        return stateStorage.get_children(path, watch);
    }

    /** Closes the wrapped state storage first, then the pacemaker client pool. */
    @Override
    public void close() {
        stateStorage.close();
        pacemakerClientPool.close();
    }

    /** Delegates to the wrapped state storage. */
    @Override
    public void set_data(String path, byte[] data, List<ACL> acls) {
        stateStorage.set_data(path, data, acls);
    }

    /** Delegates to the wrapped state storage. */
    @Override
    public byte[] get_data(String path, boolean watch) {
        return stateStorage.get_data(path, watch);
    }

    /** Delegates to the wrapped state storage. */
    @Override
    public VersionedData<byte[]> get_data_with_version(String path, boolean watch) {
        return stateStorage.get_data_with_version(path, watch);
    }

    /**
     * Sends a heartbeat pulse for {@code path} through the pacemaker client pool.
     *
     * @param path id of the pulse
     * @param data details stored in the pulse
     * @param acls not used by this implementation
     * @throws RuntimeException if every attempt fails or the thread is interrupted
     */
    @Override
    public void set_worker_hb(String path, byte[] data, List<ACL> acls) {
        int retry = MAX_RETRIES;
        while (true) {
            try {
                HBPulse hbPulse = new HBPulse();
                hbPulse.set_id(path);
                hbPulse.set_details(data);
                HBMessage message = new HBMessage(HBServerMessageType.SEND_PULSE, HBMessageData.pulse(hbPulse));
                HBMessage response = pacemakerClientPool.send(message);
                if (response.get_type() != HBServerMessageType.SEND_PULSE_RESPONSE) {
                    throw new WrappedHBExecutionException("Invalid Response Type");
                }
                LOG.debug("Successful set_worker_hb");
                return;
            } catch (HBExecutionException | PacemakerConnectionException e) {
                if (retry <= 0) {
                    throw new RuntimeException(e);
                }
                // Log before decrementing so the reported count equals the attempts still to come.
                LOG.error("{} Failed to set_worker_hb. Will make {} more attempts.", e.getMessage(), retry);
                retry--;
            } catch (InterruptedException e) {
                throw interrupted("set_worker_hb", e);
            }
        }
    }

    /**
     * Requests the heartbeat for {@code path} via {@code sendAll} and returns the
     * details of the response with the greatest {@code time_secs}.
     *
     * @param path  path of the heartbeat to read
     * @param watch not used by this implementation
     * @return the serialized heartbeat with the largest timestamp, or {@code null} when no
     *         valid response carried details with a timestamp greater than zero
     * @throws RuntimeException if every attempt fails (including the case where no response
     *                          had the expected type) or the thread is interrupted
     */
    @Override
    public byte[] get_worker_hb(String path, boolean watch) {
        int retry = MAX_RETRIES;
        while (true) {
            try {
                byte[] ret = null;
                int latestTimeSecs = 0;
                boolean gotResponse = false;

                HBMessage message = new HBMessage(HBServerMessageType.GET_PULSE, HBMessageData.path(path));
                List<HBMessage> responses = pacemakerClientPool.sendAll(message);
                for (HBMessage response : responses) {
                    if (response.get_type() != HBServerMessageType.GET_PULSE_RESPONSE) {
                        LOG.error("get_worker_hb: Invalid Response Type");
                        continue;
                    }
                    // We got at least one GET_PULSE_RESPONSE message.
                    gotResponse = true;
                    byte[] details = response.get_data().get_pulse().get_details();
                    if (details == null) {
                        continue;
                    }
                    ClusterWorkerHeartbeat cwh = Utils.deserialize(details, ClusterWorkerHeartbeat.class);
                    if (cwh != null && cwh.get_time_secs() > latestTimeSecs) {
                        latestTimeSecs = cwh.get_time_secs();
                        ret = details;
                    }
                }
                if (!gotResponse) {
                    throw new WrappedHBExecutionException("Failed to get a response.");
                }
                return ret;
            } catch (HBExecutionException | PacemakerConnectionException e) {
                if (retry <= 0) {
                    throw new RuntimeException(e);
                }
                // Log before decrementing so the reported count equals the attempts still to come.
                LOG.error("{} Failed to get_worker_hb. Will make {} more attempts.", e.getMessage(), retry);
                retry--;
            } catch (InterruptedException e) {
                throw interrupted("get_worker_hb", e);
            }
        }
    }

    /**
     * Requests all nodes under {@code path} via {@code sendAll} and returns the union
     * of the pulse ids found in the valid responses.
     *
     * @param path  path whose child nodes are requested
     * @param watch not used by this implementation
     * @return the distinct pulse ids from all valid responses, in no particular order;
     *         empty when no valid response carried any ids
     * @throws RuntimeException if every attempt fails with a connection error or the
     *                          thread is interrupted
     */
    @Override
    public List<String> get_worker_hb_children(String path, boolean watch) {
        int retry = MAX_RETRIES;
        while (true) {
            try {
                // A set removes ids reported by more than one response.
                HashSet<String> retSet = new HashSet<>();

                HBMessage message = new HBMessage(HBServerMessageType.GET_ALL_NODES_FOR_PATH, HBMessageData.path(path));
                List<HBMessage> responses = pacemakerClientPool.sendAll(message);
                for (HBMessage response : responses) {
                    if (response.get_type() != HBServerMessageType.GET_ALL_NODES_FOR_PATH_RESPONSE) {
                        LOG.error("get_worker_hb_children: Invalid Response Type");
                        continue;
                    }
                    if (response.get_data().get_nodes().get_pulseIds() != null) {
                        retSet.addAll(response.get_data().get_nodes().get_pulseIds());
                    }
                }
                LOG.debug("Successful get_worker_hb_children");
                return new ArrayList<>(retSet);
            } catch (PacemakerConnectionException e) {
                if (retry <= 0) {
                    throw new RuntimeException(e);
                }
                // Log before decrementing so the reported count equals the attempts still to come.
                LOG.error("{} Failed to get_worker_hb_children. Will make {} more attempts.", e.getMessage(), retry);
                retry--;
            } catch (InterruptedException e) {
                throw interrupted("get_worker_hb_children", e);
            }
        }
    }

    /**
     * Sends a delete request for {@code path} via {@code sendAll}.
     *
     * <p>An attempt succeeds when every response has the delete-response type. When the
     * retries are exhausted, a partially successful final attempt (at least one valid
     * response) is only logged as a warning; a final attempt with no valid response fails.
     *
     * @param path path of the heartbeat to delete
     * @throws RuntimeException if the final attempt produced no successful response, or
     *                          the thread is interrupted
     */
    @Override
    public void delete_worker_hb(String path) {
        int retry = MAX_RETRIES;
        while (true) {
            // Tracks the current attempt only; it is read in the catch block below.
            boolean someSucceeded = false;
            try {
                HBMessage message = new HBMessage(HBServerMessageType.DELETE_PATH, HBMessageData.path(path));
                List<HBMessage> responses = pacemakerClientPool.sendAll(message);
                boolean allSucceeded = true;
                for (HBMessage response : responses) {
                    if (response.get_type() != HBServerMessageType.DELETE_PATH_RESPONSE) {
                        LOG.debug("Failed to delete heartbeat {}", response);
                        allSucceeded = false;
                    } else {
                        someSucceeded = true;
                    }
                }
                if (allSucceeded) {
                    return;
                }
                throw new WrappedHBExecutionException("Failed to delete from all pacemakers.");
            } catch (HBExecutionException | PacemakerConnectionException e) {
                if (retry <= 0) {
                    if (someSucceeded) {
                        LOG.warn("Unable to delete_worker_hb from every pacemaker.");
                        return;
                    }
                    LOG.error("Unable to delete_worker_hb from any pacemaker.");
                    throw new RuntimeException(e);
                }
                // Log before decrementing so the reported count equals the attempts still to come.
                LOG.debug("{} Failed to delete_worker_hb. Will make {} more attempts.", e.getMessage(), retry);
                retry--;
            } catch (InterruptedException e) {
                throw interrupted("delete_worker_hb", e);
            }
        }
    }

    /** Delegates to the wrapped state storage. */
    @Override
    public void add_listener(ConnectionStateListener listener) {
        stateStorage.add_listener(listener);
    }

    /** Delegates to the wrapped state storage. */
    @Override
    public void sync_path(String path) {
        stateStorage.sync_path(path);
    }

    /** Delegates to the wrapped state storage. */
    @Override
    public void delete_node_blobstore(String path, String nimbusHostPortInfo) {
        stateStorage.delete_node_blobstore(path, nimbusHostPortInfo);
    }
}