// blob: 913cc34306317d9fdbcc660ca8e62dc6324a9324 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import java.io.IOException;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.UpgradeManager;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
import org.apache.hadoop.util.Daemon;
/**
 * Upgrade manager for data-nodes.
 *
 * Distributed upgrades for a data-node are performed in a separate thread.
 * The upgrade starts when the data-node receives the start upgrade command
 * from the namenode. At that point the manager looks up the corresponding
 * upgrade object and starts a daemon that performs the upgrade defined by
 * that object.
 */
class UpgradeManagerDatanode extends UpgradeManager {
  /** Data-node this manager works for. */
  DataNode dataNode;

  /**
   * Daemon created by {@link #startUpgrade()} to run the upgrade object;
   * reset to null by {@link #completeUpgrade()}.
   */
  Daemon upgradeDaemon;

  /**
   * Creates a manager bound to the given data-node.
   *
   * @param dataNode the data-node whose upgrades are managed
   */
  UpgradeManagerDatanode(DataNode dataNode) {
    this.dataNode = dataNode;
  }

  /**
   * @return the node type handled by this manager, always
   *         {@link HdfsConstants.NodeType#DATA_NODE}
   */
  public HdfsConstants.NodeType getType() {
    return HdfsConstants.NodeType.DATA_NODE;
  }

  /**
   * Fetches the first element of the current upgrade set as a data-node
   * upgrade object. Callers must make sure the set is not null.
   */
  private UpgradeObjectDatanode firstUpgradeObject() {
    return (UpgradeObjectDatanode) currentUpgrades.first();
  }

  /**
   * Initialize distributed upgrade.
   * Does nothing when the superclass initialization returns false.
   * Otherwise the first upgrade object is attached to the data-node and
   * the outcome of its pre-upgrade action is stored as the upgrade state.
   *
   * @param nsInfo namespace information handed to the pre-upgrade action
   * @throws IOException
   */
  synchronized void initializeUpgrade(NamespaceInfo nsInfo) throws IOException {
    boolean upgradeNeeded = super.initializeUpgrade();
    if (!upgradeNeeded) {
      // distr upgrade is not needed
      return;
    }
    DataNode.LOG.info("\n Distributed upgrade for DataNode "
        + dataNode.dnRegistration.getName()
        + " version " + getUpgradeVersion() + " to current LV "
        + FSConstants.LAYOUT_VERSION + " is initialized.");
    UpgradeObjectDatanode upgradeObject = firstUpgradeObject();
    upgradeObject.setDatanode(dataNode);
    // upgradeState is true if the data-node should start the upgrade itself
    upgradeState = upgradeObject.preUpgradeAction(nsInfo);
  }

  /**
   * Start distributed upgrade.
   * Instantiates distributed upgrade objects.
   *
   * @return true if distributed upgrade is required or false otherwise
   * @throws IOException
   */
  public synchronized boolean startUpgrade() throws IOException {
    // upgrade is already in progress
    if (upgradeState) {
      assert currentUpgrades != null :
        "UpgradeManagerDatanode.currentUpgrades is null.";
      firstUpgradeObject().startUpgrade();
      return true;
    }

    if (broadcastCommand != null) {
      boolean clusterMovedOn =
          broadcastCommand.getVersion() > getUpgradeVersion();
      if (!clusterMovedOn) {
        // the upgrade has been finished by this data-node,
        // but the cluster is still running it,
        // reply with the broadcast command
        assert currentUpgrades == null :
          "UpgradeManagerDatanode.currentUpgrades is not null.";
        assert upgradeDaemon == null :
          "UpgradeManagerDatanode.upgradeDaemon is not null.";
        dataNode.namenode.processUpgradeCommand(broadcastCommand);
        return true;
      }
      // stop broadcasting, the cluster moved on
      // start upgrade for the next version
      broadcastCommand = null;
    }

    // look the upgrade objects up only if none are loaded yet
    if (currentUpgrades == null) {
      currentUpgrades = getDistributedUpgrades();
    }
    if (currentUpgrades == null) {
      DataNode.LOG.info("\n Distributed upgrade for DataNode version "
          + getUpgradeVersion() + " to current LV "
          + FSConstants.LAYOUT_VERSION + " cannot be started. "
          + "The upgrade object is not defined.");
      return false;
    }

    upgradeState = true;
    UpgradeObjectDatanode upgradeObject = firstUpgradeObject();
    upgradeObject.setDatanode(dataNode);
    upgradeObject.startUpgrade();
    // the upgrade object itself is the body of the daemon
    upgradeDaemon = new Daemon(upgradeObject);
    upgradeDaemon.start();
    DataNode.LOG.info("\n Distributed upgrade for DataNode "
        + dataNode.dnRegistration.getName()
        + " version " + getUpgradeVersion() + " to current LV "
        + FSConstants.LAYOUT_VERSION + " is started.");
    return true;
  }

  /**
   * Process an upgrade command.
   * Records the version carried by the command as the upgrade version and
   * then tries to start the distributed upgrade.
   *
   * @param command the upgrade command; only the start upgrade action is
   *                expected here
   * @throws IOException if the upgrade could not be started
   */
  synchronized void processUpgradeCommand(UpgradeCommand command)
      throws IOException {
    assert command.getAction() == UpgradeCommand.UC_ACTION_START_UPGRADE :
      "Only start upgrade action can be processed at this time.";
    this.upgradeVersion = command.getVersion();
    // Start distributed upgrade
    boolean started = startUpgrade();
    if (!started) {
      throw new IOException(
          "Distributed upgrade for DataNode " + dataNode.dnRegistration.getName()
          + " version " + getUpgradeVersion() + " to current LV "
          + FSConstants.LAYOUT_VERSION + " cannot be started. "
          + "The upgrade object is not defined.");
    }
  }

  /**
   * Complete distributed upgrade.
   * Keeps the command returned by the upgrade object as the broadcast
   * command, then clears the upgrade state, the upgrade objects and the
   * daemon reference.
   *
   * @throws IOException
   */
  public synchronized void completeUpgrade() throws IOException {
    assert currentUpgrades != null :
      "UpgradeManagerDatanode.currentUpgrades is null.";
    UpgradeObjectDatanode upgradeObject = firstUpgradeObject();
    broadcastCommand = upgradeObject.completeUpgrade();
    upgradeState = false;
    currentUpgrades = null;
    upgradeDaemon = null;
    DataNode.LOG.info("\n Distributed upgrade for DataNode "
        + dataNode.dnRegistration.getName()
        + " version " + getUpgradeVersion() + " to current LV "
        + FSConstants.LAYOUT_VERSION + " is complete.");
  }

  /**
   * Interrupts the upgrade daemon, if there is one.
   */
  synchronized void shutdownUpgrade() {
    if (upgradeDaemon == null) {
      return;
    }
    upgradeDaemon.interrupt();
  }
}