blob: f001a3903acf37c3030ec8e947930f347455b262 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.upgrade;
import static org.apache.hadoop.ozone.upgrade.LayoutFeature.UpgradeActionType.ON_FINALIZE;
import static org.apache.hadoop.ozone.upgrade.LayoutFeature.UpgradeActionType.ON_FIRST_UPGRADE_START;
import static org.apache.hadoop.ozone.upgrade.LayoutFeature.UpgradeActionType.VALIDATE_IN_PREFINALIZE;
import static org.apache.hadoop.ozone.upgrade.UpgradeException.ResultCodes.FIRST_UPGRADE_START_ACTION_FAILED;
import static org.apache.hadoop.ozone.upgrade.UpgradeException.ResultCodes.INVALID_REQUEST;
import static org.apache.hadoop.ozone.upgrade.UpgradeException.ResultCodes.LAYOUT_FEATURE_FINALIZATION_FAILED;
import static org.apache.hadoop.ozone.upgrade.UpgradeException.ResultCodes.PREFINALIZE_ACTION_VALIDATION_FAILED;
import static org.apache.hadoop.ozone.upgrade.UpgradeException.ResultCodes.UPDATE_LAYOUT_VERSION_FAILED;
import static org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_DONE;
import static org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_REQUIRED;
import static org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.STARTING_FINALIZATION;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Function;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.hadoop.ozone.common.Storage;
import org.apache.hadoop.ozone.upgrade.LayoutFeature.UpgradeAction;
import org.apache.hadoop.ozone.upgrade.LayoutFeature.UpgradeActionType;
import org.apache.hadoop.ozone.upgrade.UpgradeException.ResultCodes;
import com.google.common.annotations.VisibleForTesting;
/**
* UpgradeFinalizer implementation for the Storage Container Manager service.
*/
public abstract class BasicUpgradeFinalizer
<T, V extends AbstractLayoutVersionManager> implements UpgradeFinalizer<T> {
private V versionManager;
private String clientID;
private T component;
private DefaultUpgradeFinalizationExecutor<T> finalizationExecutor;
private Queue<String> msgs = new ConcurrentLinkedQueue<>();
private boolean isDone = false;
public BasicUpgradeFinalizer(V versionManager) {
this.versionManager = versionManager;
this.finalizationExecutor = new DefaultUpgradeFinalizationExecutor<>();
}
public StatusAndMessages finalize(String upgradeClientID, T service)
throws IOException {
StatusAndMessages response = initFinalize(upgradeClientID, service);
if (response.status() != FINALIZATION_REQUIRED) {
return response;
}
finalizationExecutor.execute(service, this);
return STARTING_MSG;
}
public synchronized StatusAndMessages reportStatus(
String upgradeClientID, boolean takeover) throws UpgradeException {
if (takeover) {
clientID = upgradeClientID;
}
assertClientId(upgradeClientID);
List<String> returningMsgs = new ArrayList<>(msgs.size() + 10);
Status status = versionManager.getUpgradeState();
while (msgs.size() > 0) {
returningMsgs.add(msgs.poll());
}
return new StatusAndMessages(status, returningMsgs);
}
@Override
public synchronized Status getStatus() {
return versionManager.getUpgradeState();
}
protected void preFinalizeUpgrade(T service) throws IOException {
// No Op by default.
}
protected void postFinalizeUpgrade(T service) throws IOException {
// No Op by default.
}
public abstract void finalizeUpgrade(T service) throws UpgradeException;
@Override
public void finalizeAndWaitForCompletion(
String upgradeClientID, T service, long maxTimeToWaitInSeconds)
throws IOException {
StatusAndMessages response = finalize(upgradeClientID, service);
LOG.info("Finalization Messages : {} ", response.msgs());
if (isFinalized(response.status())) {
return;
}
boolean success = false;
long endTime = System.currentTimeMillis() +
TimeUnit.SECONDS.toMillis(maxTimeToWaitInSeconds);
while (System.currentTimeMillis() < endTime) {
try {
response = reportStatus(upgradeClientID, false);
LOG.info("Finalization Messages : {} ", response.msgs());
if (isFinalized(response.status())) {
success = true;
break;
}
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Finalization Wait thread interrupted!");
}
}
if (!success) {
throw new IOException(
String.format("Unable to finalize after waiting for %d seconds",
maxTimeToWaitInSeconds));
}
}
public boolean isFinalizationDone() {
return isDone;
}
public void markFinalizationDone() {
isDone = true;
}
public V getVersionManager() {
return versionManager;
}
private synchronized StatusAndMessages initFinalize(
String upgradeClientID, T id) throws UpgradeException {
switch (versionManager.getUpgradeState()) {
case STARTING_FINALIZATION:
return STARTING_MSG;
case FINALIZATION_IN_PROGRESS:
return FINALIZATION_IN_PROGRESS_MSG;
case FINALIZATION_DONE:
case ALREADY_FINALIZED:
if (versionManager.needsFinalization()) {
throw new UpgradeException("Upgrade found in inconsistent state. " +
"Upgrade state is FINALIZATION Complete while MLV has not been " +
"upgraded to SLV.", INVALID_REQUEST);
}
return FINALIZED_MSG;
default:
if (!versionManager.needsFinalization()) {
throw new UpgradeException("Upgrade found in inconsistent state. " +
"Upgrade state is FINALIZATION_REQUIRED while MLV has been " +
"upgraded to SLV.", INVALID_REQUEST);
}
versionManager.setUpgradeState(STARTING_FINALIZATION);
this.clientID = upgradeClientID;
this.component = id;
return FINALIZATION_REQUIRED_MSG;
}
}
private void assertClientId(String id) throws UpgradeException {
if (this.clientID == null || !this.clientID.equals(id)) {
throw new UpgradeException("Unknown client tries to get finalization " +
"status.\n The requestor is not the initiating client of the " +
"finalization, if you want to take over, and get unsent status " +
"messages, check -takeover option.", INVALID_REQUEST);
}
}
private static boolean isFinalized(Status status) {
return status.equals(Status.ALREADY_FINALIZED)
|| status.equals(FINALIZATION_DONE);
}
protected void finalizeUpgrade(Supplier<Storage> storageSuppplier)
throws UpgradeException {
for (Object obj : versionManager.unfinalizedFeatures()) {
LayoutFeature lf = (LayoutFeature) obj;
Storage layoutStorage = storageSuppplier.get();
Optional<? extends UpgradeAction> action = lf.action(ON_FINALIZE);
runFinalizationAction(lf, action);
updateLayoutVersionInVersionFile(lf, layoutStorage);
versionManager.finalized(lf);
}
versionManager.completeFinalization();
}
protected void runFinalizationAction(LayoutFeature feature,
Optional<?extends UpgradeAction> action) throws UpgradeException {
if (!action.isPresent()) {
emitNOOPMsg(feature.name());
} else {
try {
action.get().execute(component);
} catch (Exception e) {
logFinalizationFailureAndThrow(e, feature.name());
}
}
}
@VisibleForTesting
protected void runPrefinalizeStateActions(Function<LayoutFeature,
Function<UpgradeActionType, Optional<? extends UpgradeAction>>> aFunction,
Storage storage, T service) throws IOException {
if (!versionManager.needsFinalization()) {
return;
}
this.component = service;
LOG.info("Running pre-finalized state validations for unfinalized " +
"layout features.");
for (Object obj : versionManager.unfinalizedFeatures()) {
LayoutFeature lf = (LayoutFeature) obj;
Function<UpgradeActionType, Optional<? extends UpgradeAction>> function =
aFunction.apply(lf);
Optional<? extends UpgradeAction> action =
function.apply(VALIDATE_IN_PREFINALIZE);
if (action.isPresent()) {
runValidationAction(lf, action.get());
}
}
LOG.info("Running first upgrade commands for unfinalized layout features.");
for (Object obj : versionManager.unfinalizedFeatures()) {
LayoutFeature lf = (LayoutFeature) obj;
Function<UpgradeActionType, Optional<? extends UpgradeAction>> function =
aFunction.apply(lf);
Optional<? extends UpgradeAction> action =
function.apply(ON_FIRST_UPGRADE_START);
if (action.isPresent()) {
runFirstUpgradeAction(lf, action.get(), storage);
}
}
}
private void runValidationAction(LayoutFeature f, UpgradeAction action)
throws UpgradeException {
try {
LOG.info("Executing pre finalize state validation {}", action.name());
action.execute(component);
} catch (Exception ex) {
String msg = "Exception while running pre finalize state validation " +
"for feature %s";
LOG.error(String.format(msg, f.name()));
throw new UpgradeException(
String.format(msg, f.name()), ex,
PREFINALIZE_ACTION_VALIDATION_FAILED);
}
}
private void runFirstUpgradeAction(LayoutFeature f, UpgradeAction action,
Storage storage) throws IOException {
try {
int versionOnDisk = storage.getFirstUpgradeActionLayoutVersion();
if (f.layoutVersion() > versionOnDisk) {
LOG.info("Executing first upgrade start action {}", action.name());
action.execute(component);
} else {
LOG.info("Skipping action {} since it has already been run.",
action.name());
}
storage.setFirstUpgradeActionLayoutVersion(f.layoutVersion());
persistStorage(storage);
} catch (Exception ex) {
String msg = "Exception while running first upgrade run actions " +
"for feature %s";
LOG.error(String.format(msg, f.name()));
throw new UpgradeException(
String.format(msg, f.name()), ex, FIRST_UPGRADE_START_ACTION_FAILED);
}
}
protected void updateLayoutVersionInVersionFile(LayoutFeature feature,
Storage config)
throws UpgradeException {
int prevLayoutVersion = currentStoredLayoutVersion(config);
updateStorageLayoutVersion(feature.layoutVersion(), config);
try {
persistStorage(config);
} catch (IOException e) {
updateStorageLayoutVersion(prevLayoutVersion, config);
logLayoutVersionUpdateFailureAndThrow(e);
}
}
private int currentStoredLayoutVersion(Storage config) {
return config.getLayoutVersion();
}
private void updateStorageLayoutVersion(int version, Storage config) {
config.setLayoutVersion(version);
}
private void persistStorage(Storage config) throws IOException {
config.persistCurrentState();
}
protected void emitNOOPMsg(String feature) {
String msg = "No onFinalize work defined for feature: " + feature + ".";
logAndEmit(msg);
}
protected void emitStartingMsg() {
String msg = "Finalization started.";
logAndEmit(msg);
}
protected void emitFinishedMsg() {
String msg = "Finalization is done.";
logAndEmit(msg);
}
protected void logAndEmit(String msg) {
LOG.info(msg);
msgs.offer(msg);
}
protected void logFinalizationFailureAndThrow(Exception e, String feature)
throws UpgradeException {
String msg = "Error during finalization of " + feature + ".";
logAndThrow(e, msg, LAYOUT_FEATURE_FINALIZATION_FAILED);
}
private void logLayoutVersionUpdateFailureAndThrow(IOException e)
throws UpgradeException {
String msg = "Updating the LayoutVersion in the VERSION file failed.";
logAndThrow(e, msg, UPDATE_LAYOUT_VERSION_FAILED);
}
private void logAndThrow(Exception e, String msg, ResultCodes resultCode)
throws UpgradeException {
LOG.error(msg, e);
throw new UpgradeException(msg, e, resultCode);
}
@VisibleForTesting
public void setFinalizationExecutor(DefaultUpgradeFinalizationExecutor
executor) {
finalizationExecutor = executor;
}
}