// blob: 449dd2784591b142de9d8bf85777cce330c0b326 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.asterix.app.replication;
import static org.apache.asterix.api.http.server.ServletConstants.SYS_AUTH_HEADER;
import static org.apache.asterix.common.config.ExternalProperties.Option.NC_API_PORT;
import static org.apache.hyracks.api.exceptions.ErrorCode.NODE_FAILED;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.asterix.app.nc.task.BindMetadataNodeTask;
import org.apache.asterix.app.nc.task.CheckpointTask;
import org.apache.asterix.app.nc.task.ExportMetadataNodeTask;
import org.apache.asterix.app.nc.task.LocalRecoveryTask;
import org.apache.asterix.app.nc.task.LocalStorageCleanupTask;
import org.apache.asterix.app.nc.task.MetadataBootstrapTask;
import org.apache.asterix.app.nc.task.RetrieveLibrariesTask;
import org.apache.asterix.app.nc.task.StartLifecycleComponentsTask;
import org.apache.asterix.app.nc.task.StartReplicationServiceTask;
import org.apache.asterix.app.nc.task.UpdateNodeStatusTask;
import org.apache.asterix.app.replication.message.MetadataNodeRequestMessage;
import org.apache.asterix.app.replication.message.MetadataNodeResponseMessage;
import org.apache.asterix.app.replication.message.NCLifecycleTaskReportMessage;
import org.apache.asterix.app.replication.message.RegistrationTasksRequestMessage;
import org.apache.asterix.app.replication.message.RegistrationTasksResponseMessage;
import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
import org.apache.asterix.common.api.INCLifecycleTask;
import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.common.messaging.api.ICCMessageBroker;
import org.apache.asterix.common.replication.INCLifecycleMessage;
import org.apache.asterix.common.replication.INcLifecycleCoordinator;
import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.replication.messaging.ReplicaFailedMessage;
import org.apache.http.client.utils.URIBuilder;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.application.ICCServiceContext;
import org.apache.hyracks.api.client.NodeStatus;
import org.apache.hyracks.api.config.IOption;
import org.apache.hyracks.api.control.IGatekeeper;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.common.controllers.NCConfig;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import io.netty.handler.codec.http.HttpScheme;
/**
 * Cluster-controller-side coordinator of node controller (NC) lifecycle events.
 * <p>
 * It tracks nodes that joined but have not yet reported startup completion, answers registration
 * requests with the list of {@link INCLifecycleTask}s a node must run, applies task reports to the
 * bound {@link IClusterStateManager}, and drives metadata node changes through messages sent via the
 * {@link ICCMessageBroker}.
 * <p>
 * {@link #bindTo(IClusterStateManager)} must be called before any method that uses
 * {@link #clusterManager}; the constructor does not initialize it.
 */
public class NcLifecycleCoordinator implements INcLifecycleCoordinator {

    private static final Logger LOGGER = LogManager.getLogger();
    /** Cluster state manager this coordinator reports to; assigned in {@link #bindTo(IClusterStateManager)}. */
    protected IClusterStateManager clusterManager;
    /** Id of the node currently considered the metadata node by this coordinator. */
    protected volatile String metadataNodeId;
    /** Nodes that joined (see {@link #notifyNodeJoin(String)}) and have not yet failed or reported. */
    protected Set<String> pendingStartupCompletionNodes = Collections.synchronizedSet(new HashSet<>());
    protected final ICCMessageBroker messageBroker;
    private final boolean replicationEnabled;
    private final IGatekeeper gatekeeper;
    /** Secrets supplied by each node in its registration request, keyed by node id. */
    Map<String, Map<String, Object>> nodeSecretsMap;
    private final ICCServiceContext serviceContext;

    /**
     * Creates a coordinator bound to the given CC service context.
     *
     * @param serviceCtx
     *            the CC service context; its message broker must be an {@link ICCMessageBroker} and its
     *            controller service a {@link ClusterControllerService}
     * @param replicationEnabled
     *            whether a {@link StartReplicationServiceTask} is added to idle-node registration tasks
     */
    public NcLifecycleCoordinator(ICCServiceContext serviceCtx, boolean replicationEnabled) {
        this.serviceContext = serviceCtx;
        this.messageBroker = (ICCMessageBroker) serviceCtx.getMessageBroker();
        this.replicationEnabled = replicationEnabled;
        this.gatekeeper =
                ((ClusterControllerService) serviceCtx.getControllerService()).getApplication().getGatekeeper();
        // Synchronized wrapper, matching pendingStartupCompletionNodes: entries are written while
        // handling registration requests and read while building tasks for other nodes. Unlike
        // ConcurrentHashMap, the wrapper keeps HashMap's tolerance for null values.
        this.nodeSecretsMap = Collections.synchronizedMap(new HashMap<>());
    }

    /**
     * Records that {@code nodeId} joined and is expected to report startup completion.
     */
    @Override
    public void notifyNodeJoin(String nodeId) {
        pendingStartupCompletionNodes.add(nodeId);
    }

    /**
     * Handles the failure of {@code nodeId}: stops waiting for its startup report, marks it inactive in
     * the cluster manager (and the metadata node inactive if it was the metadata node), optionally
     * notifies participants about the failed replica, then refreshes the cluster state.
     *
     * @param nodeId
     *            the failed node
     * @param replicaAddress
     *            the failed node's replica address; when {@code null} no replica-failure notification is sent
     */
    @Override
    public void notifyNodeFailure(String nodeId, InetSocketAddress replicaAddress) throws HyracksDataException {
        pendingStartupCompletionNodes.remove(nodeId);
        clusterManager.updateNodeState(nodeId, false, null, null);
        if (nodeId.equals(metadataNodeId)) {
            clusterManager.updateMetadataNode(metadataNodeId, false);
        }
        if (replicaAddress != null) {
            notifyFailedReplica(clusterManager, nodeId, replicaAddress);
        }
        clusterManager.refreshState();
    }

    /**
     * Dispatches an NC lifecycle message to the handler matching its type.
     *
     * @throws RuntimeDataException
     *             with {@link ErrorCode#UNSUPPORTED_MESSAGE_TYPE} for any type not handled here
     */
    @Override
    public void process(INCLifecycleMessage message) throws HyracksDataException {
        switch (message.getType()) {
            case REGISTRATION_TASKS_REQUEST:
                process((RegistrationTasksRequestMessage) message);
                break;
            case REGISTRATION_TASKS_RESULT:
                process((NCLifecycleTaskReportMessage) message);
                break;
            case METADATA_NODE_RESPONSE:
                process((MetadataNodeResponseMessage) message);
                break;
            default:
                throw new RuntimeDataException(ErrorCode.UNSUPPORTED_MESSAGE_TYPE, message.getType().name());
        }
    }

    /**
     * Binds this coordinator to {@code clusterManager} and adopts its current metadata node id.
     */
    @Override
    public void bindTo(IClusterStateManager clusterManager) {
        this.clusterManager = clusterManager;
        metadataNodeId = clusterManager.getCurrentMetadataNodeId();
    }

    /**
     * Stores the requesting node's secrets, builds its registration tasks and sends them back to it.
     */
    private void process(RegistrationTasksRequestMessage msg) throws HyracksDataException {
        final String nodeId = msg.getNodeId();
        nodeSecretsMap.put(nodeId, msg.getSecrets());
        List<INCLifecycleTask> tasks =
                buildNCRegTasks(nodeId, msg.getNodeStatus(), msg.getState(), msg.getActivePartitions());
        RegistrationTasksResponseMessage response = new RegistrationTasksResponseMessage(nodeId, tasks);
        try {
            messageBroker.sendApplicationMessageToNC(response, nodeId);
        } catch (Exception e) {
            throw HyracksDataException.create(e);
        }
    }

    /**
     * Applies a node's registration-task report. The node is removed from the pending set (a warning is
     * logged if it was not pending). The report is ignored if the gatekeeper no longer authorizes the
     * node. On success the node (and the metadata node, if it is this node) is marked active and the
     * cluster state is refreshed; on failure the reported exception is logged.
     */
    private void process(NCLifecycleTaskReportMessage msg) throws HyracksDataException {
        if (!pendingStartupCompletionNodes.remove(msg.getNodeId())) {
            LOGGER.warn("Received unexpected startup completion message from node {}", msg.getNodeId());
        }
        if (!gatekeeper.isAuthorized(msg.getNodeId())) {
            LOGGER.warn("Node {} lost authorization before startup completed; ignoring registration result",
                    msg.getNodeId());
            return;
        }
        if (msg.isSuccess()) {
            clusterManager.updateNodeState(msg.getNodeId(), true, msg.getLocalCounters(), msg.getActivePartitions());
            if (msg.getNodeId().equals(metadataNodeId)) {
                clusterManager.updateMetadataNode(metadataNodeId, true);
            }
            clusterManager.refreshState();
        } else {
            LOGGER.error("Node {} failed to complete startup", msg.getNodeId(), msg.getException());
        }
    }

    /**
     * Builds the registration tasks for a node according to its reported status.
     *
     * @return the tasks for an {@code ACTIVE} or {@code IDLE} node; an empty list for any other status
     */
    protected List<INCLifecycleTask> buildNCRegTasks(String nodeId, NodeStatus nodeStatus, SystemState state,
            Set<Integer> activePartitions) {
        LOGGER.info(
                "Building registration tasks for node {} with status {} and system state: {} and active partitions {}",
                nodeId, nodeStatus, state, activePartitions);
        final boolean isMetadataNode = nodeId.equals(metadataNodeId);
        switch (nodeStatus) {
            case ACTIVE:
                return buildActiveNCRegTasks(isMetadataNode);
            case IDLE:
                return buildIdleNcRegTasks(nodeId, isMetadataNode, state, activePartitions);
            default:
                return new ArrayList<>();
        }
    }

    /**
     * Builds the tasks for a node that is already active: only a {@link BindMetadataNodeTask}, and only
     * when the node is the metadata node.
     */
    protected List<INCLifecycleTask> buildActiveNCRegTasks(boolean metadataNode) {
        final List<INCLifecycleTask> tasks = new ArrayList<>();
        if (metadataNode) {
            tasks.add(new BindMetadataNodeTask());
        }
        return tasks;
    }

    /**
     * Requests that {@code node} become the metadata node. Does nothing if it already is.
     * <p>
     * If the cluster manager reports the current metadata node as active, a
     * {@link MetadataNodeRequestMessage} built with {@code false} is sent to the current metadata node
     * and, once the send succeeds, {@link #metadataNodeId} is switched to {@code node}; the takeover
     * request is then issued when the response is processed. Otherwise the takeover request is sent to
     * {@code node} right away.
     * <p>
     * NOTE(review): the "not active" branch does not update {@link #metadataNodeId}; confirm this is
     * intended.
     */
    @Override
    public synchronized void notifyMetadataNodeChange(String node) throws HyracksDataException {
        if (metadataNodeId.equals(node)) {
            return;
        }
        // if current metadata node is active, we need to unbind its metadata proxy objects
        if (clusterManager.isMetadataNodeActive()) {
            MetadataNodeRequestMessage msg =
                    new MetadataNodeRequestMessage(false, clusterManager.getMetadataPartition().getPartitionId());
            try {
                messageBroker.sendApplicationMessageToNC(msg, metadataNodeId);
                // when the current node responds, we will bind to the new one
                metadataNodeId = node;
            } catch (Exception e) {
                throw HyracksDataException.create(e);
            }
        } else {
            requestMetadataNodeTakeover(node);
        }
    }

    /**
     * Builds the ordered startup tasks for an idle node. The list order is the order in which the tasks
     * are added below; it starts with a status update to {@code BOOTING} and ends with a status update
     * to {@code ACTIVE}.
     *
     * @param newNodeId
     *            the registering node
     * @param metadataNode
     *            whether the registering node is the metadata node
     * @param state
     *            the node's reported system state; {@code CORRUPTED} adds a {@link LocalRecoveryTask}
     * @param activePartitions
     *            the node's reported active partitions (passed through {@link #getNodeActivePartitions})
     */
    protected List<INCLifecycleTask> buildIdleNcRegTasks(String newNodeId, boolean metadataNode, SystemState state,
            Set<Integer> activePartitions) {
        final List<INCLifecycleTask> tasks = new ArrayList<>();
        Set<Integer> nodeActivePartitions = getNodeActivePartitions(newNodeId, activePartitions, metadataNode);
        tasks.add(new UpdateNodeStatusTask(NodeStatus.BOOTING, nodeActivePartitions));
        int metadataPartitionId = clusterManager.getMetadataPartition().getPartitionId();
        tasks.add(new LocalStorageCleanupTask(metadataPartitionId));
        if (state == SystemState.CORRUPTED) {
            // need to perform local recovery for node active partitions
            LocalRecoveryTask rt = new LocalRecoveryTask(nodeActivePartitions);
            tasks.add(rt);
        }
        if (replicationEnabled) {
            tasks.add(new StartReplicationServiceTask());
        }
        if (metadataNode) {
            tasks.add(new MetadataBootstrapTask(metadataPartitionId));
        }
        tasks.add(new CheckpointTask());
        tasks.add(new StartLifecycleComponentsTask());
        if (isLibraryFetchEnabled() && clusterManager.getState() == ClusterState.ACTIVE) {
            Set<String> nodes = clusterManager.getParticipantNodes(true);
            if (!nodes.isEmpty()) {
                try {
                    tasks.add(nodesToLibraryTask(newNodeId, nodes));
                } catch (HyracksDataException e) {
                    // best effort: the node still starts, just without the library retrieval task
                    LOGGER.error("Could not construct library recovery task", e);
                }
            }
        }
        if (metadataNode) {
            tasks.add(new ExportMetadataNodeTask(true));
            tasks.add(new BindMetadataNodeTask());
        }
        tasks.add(new UpdateNodeStatusTask(NodeStatus.ACTIVE, nodeActivePartitions));
        return tasks;
    }

    /**
     * Handles a metadata node's response: rebinds the metadata node, records the responder's exported
     * state in the cluster manager and, if the responder reports it is not exported, asks the current
     * {@link #metadataNodeId} to take over.
     */
    private synchronized void process(MetadataNodeResponseMessage response) throws HyracksDataException {
        // rebind metadata node since it might be changing
        MetadataManager.INSTANCE.rebindMetadataNode();
        clusterManager.updateMetadataNode(response.getNodeId(), response.isExported());
        if (!response.isExported()) {
            requestMetadataNodeTakeover(metadataNodeId);
        }
    }

    /**
     * Returns the {@code SYS_AUTH_HEADER} secret that {@code node} supplied at registration.
     * <p>
     * Throws {@link NullPointerException} if no secrets are stored for {@code node}.
     */
    protected String getNCAuthToken(String node) {
        return (String) nodeSecretsMap.get(node).get(SYS_AUTH_HEADER);
    }

    /**
     * Builds an {@code http} URI from the public address and NC API port found in the configuration of
     * {@code nodeId}.
     *
     * @throws HyracksDataException
     *             if the URI cannot be built
     */
    protected URI constructNCRecoveryUri(String nodeId) throws HyracksDataException {
        Map<IOption, Object> nodeConfig = clusterManager.getNcConfiguration().get(nodeId);
        String host = (String) nodeConfig.get(NCConfig.Option.PUBLIC_ADDRESS);
        int port = (Integer) nodeConfig.get(NC_API_PORT);
        URIBuilder builder = new URIBuilder().setScheme(HttpScheme.HTTP.toString()).setHost(host).setPort(port);
        try {
            return builder.build();
        } catch (URISyntaxException e) {
            LOGGER.error("Could not find URL for NC recovery", e);
            throw HyracksDataException.create(e);
        }
    }

    /**
     * Sends {@code node} a {@link MetadataNodeRequestMessage} built with {@code true} for the metadata
     * partition (presumably a request to export the metadata node; confirm against the message class).
     */
    private void requestMetadataNodeTakeover(String node) throws HyracksDataException {
        MetadataNodeRequestMessage msg =
                new MetadataNodeRequestMessage(true, clusterManager.getMetadataPartition().getPartitionId());
        try {
            messageBroker.sendApplicationMessageToNC(msg, node);
        } catch (Exception e) {
            throw HyracksDataException.create(e);
        }
    }

    /**
     * Builds a {@link RetrieveLibrariesTask} listing, for each reference node, its recovery URI and auth
     * token.
     *
     * @param newNodeId
     *            the registering node (not used by this implementation)
     * @param referenceNodes
     *            nodes to list as library sources
     */
    protected RetrieveLibrariesTask nodesToLibraryTask(String newNodeId, Set<String> referenceNodes)
            throws HyracksDataException {
        List<Pair<URI, String>> referenceNodeLocAndAuth = new ArrayList<>();
        for (String node : referenceNodes) {
            referenceNodeLocAndAuth.add(new Pair<>(constructNCRecoveryUri(node), getNCAuthToken(node)));
        }
        return getRetrieveLibrariesTask(referenceNodeLocAndAuth);
    }

    /**
     * Factory hook for the library retrieval task; subclasses may override it.
     */
    protected RetrieveLibrariesTask getRetrieveLibrariesTask(List<Pair<URI, String>> referenceNodeLocAndAuth) {
        return new RetrieveLibrariesTask(referenceNodeLocAndAuth);
    }

    /**
     * Whether a library retrieval task may be added to idle-node registration tasks. Always {@code true}
     * here; subclasses may override it.
     */
    protected boolean isLibraryFetchEnabled() {
        return true;
    }

    /**
     * Returns the partitions the node should consider active. For the metadata node, the metadata
     * partition id is added to {@code nodePartitions}.
     * <p>
     * Note that the passed-in set itself is modified and returned; no copy is made.
     */
    protected Set<Integer> getNodeActivePartitions(String nodeId, Set<Integer> nodePartitions, boolean metadataNode) {
        if (metadataNode) {
            nodePartitions.add(clusterManager.getMetadataPartition().getPartitionId());
        }
        return nodePartitions;
    }

    /**
     * Sends a {@link ReplicaFailedMessage} about the failed node to every participant node returned by
     * the cluster manager. Delivery is best effort: a failure to reach one participant is logged and the
     * remaining participants are still notified.
     *
     * @param clusterManager
     *            source of the participant node list
     * @param nodeID
     *            the node that failed
     * @param replicaAddress
     *            the failed node's replica address
     */
    private void notifyFailedReplica(IClusterStateManager clusterManager, String nodeID,
            InetSocketAddress replicaAddress) {
        LOGGER.info("notify replica failure of nodeId {} at {}", nodeID, replicaAddress);
        Set<String> ncs = clusterManager.getParticipantNodes(true);
        ReplicaFailedMessage message =
                new ReplicaFailedMessage(replicaAddress, HyracksDataException.create(NODE_FAILED, nodeID));
        for (String participantId : ncs) {
            try {
                messageBroker.sendApplicationMessageToNC(message, participantId);
            } catch (Exception e) {
                // report the participant that could not be reached (not the failed node) and keep the cause
                LOGGER.warn("failed to notify replica failure to node {}", participantId, e);
            }
        }
    }
}