blob: e81b45f4d0223fe487726579da38275896127d29 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.upgrade;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.stream.Collectors.toList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.PriorityQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
/**
* Generic factory which stores different instances of Type 'T' sharded by
* a key & version. A single key can be associated with different versions
* of 'T'.
*
* Why does this class exist?
* A typical use case during upgrade is to have multiple versions of a class
* / method / object and chose them based on current layout
* version at runtime. Before finalizing, an older version is typically
* needed, and after finalize, a newer version is needed. This class serves
* this purpose in a generic way.
*
* For example, we can create a Factory to create multiple versions of
* OMRequests sharded by Request Type & Layout Version Supported.
*/
public class LayoutVersionInstanceFactory<T> {
private static final Logger LOG =
LoggerFactory.getLogger(LayoutVersionInstanceFactory.class);
/**
* The factory will maintain ALL instances > MLV and 1 instance <= MLV in a
* priority queue (ordered by version). By doing that it guarantees O(1)
* lookup at all times, since we always would lookup the first element (top
* of the PQ).
* Multiple entries will be there ONLY during pre-finalized state.
* On finalization, we will be removing the entry one by one until we reach
* a single entry. On a regular component instance (finalized), there will
* be a single request version associated with a request always.
*/
private final Map<String, PriorityQueue<VersionedInstance<T>>> instances =
new HashMap<>();
/**
* Register an instance with a given factory key (key + version).
* For safety reasons we dont allow (1) re-registering, (2) registering an
* instance with version > SLV.
*
* @param lvm LayoutVersionManager
* @param key VersionFactoryKey key to associate with instance.
* @param instance instance to register.
*/
public boolean register(LayoutVersionManager lvm, VersionFactoryKey key,
T instance) {
// If version is not passed in, go defensive and set the highest possible
// version (SLV).
int version = key.getVersion() == null ?
lvm.getSoftwareLayoutVersion() : key.getVersion();
checkArgument(lvm.getSoftwareLayoutVersion() >= key.getVersion(),
String.format("Cannot register key %s since the version is greater " +
"than the Software layout version %d",
key, lvm.getSoftwareLayoutVersion()));
// If we reach here, we know that the passed in version belongs to
// [0, SLV].
String primaryKey = key.getKey();
instances.computeIfAbsent(primaryKey, s ->
new PriorityQueue<>(Comparator.comparingInt(o -> o.version)));
PriorityQueue<VersionedInstance<T>> versionedInstances =
instances.get(primaryKey);
Optional<VersionedInstance<T>> existingInstance =
versionedInstances.parallelStream()
.filter(v -> v.version == key.getVersion()).findAny();
if (existingInstance.isPresent()) {
throw new IllegalArgumentException(String.format("Cannot register key " +
"%s since there is an existing entry already.", key));
}
if (!versionedInstances.isEmpty() && isValid(lvm, version)) {
VersionedInstance<T> currentPeek = versionedInstances.peek();
if (currentPeek.version < version) {
// Current peek < passed in version (and <= MLV). Hence, we can
// remove it, since the passed in a better candidate.
versionedInstances.poll();
// Add the passed in instance.
versionedInstances.offer(new VersionedInstance<>(version, instance));
return true;
} else if (currentPeek.version > lvm.getMetadataLayoutVersion()) {
// Current peak is > MLV, hence we don't need to remove that. Just
// add passed in instance.
versionedInstances.offer(new VersionedInstance<>(version, instance));
return true;
} else {
// Current peek <= MLV and > passed in version, and hence a better
// canidate. Retaining the peek, and ignoring the passed in instance.
return false;
}
} else {
// Passed in instance version > MLV (or the first version to be
// registered), hence can be registered.
versionedInstances.offer(new VersionedInstance<>(version, instance));
return true;
}
}
private boolean isValid(LayoutVersionManager lvm, int version) {
return version <= lvm.getMetadataLayoutVersion();
}
/**
* From the list of versioned instances for a given "key", this
* returns the "floor" value corresponding to the given version.
* For example, if we have key = "CreateKey", entry -> [(1, CreateKeyV1),
* (3, CreateKeyV2), and if the passed in key = CreateKey & version = 2, we
* return CreateKeyV1.
* Since this is a priority queue based implementation, we use a O(1) peek()
* lookup to get the current valid version.
* @param lvm LayoutVersionManager
* @param key Key and Version.
* @return instance.
*/
public T get(LayoutVersionManager lvm, VersionFactoryKey key) {
Integer version = key.getVersion();
// If version is not passed in, go defensive and set the highest allowed
// version (MLV).
if (version == null) {
version = lvm.getMetadataLayoutVersion();
}
checkArgument(lvm.getMetadataLayoutVersion() >= version,
String.format("Cannot get key %s since the version is greater " +
"than the Metadata layout version %d",
key, lvm.getMetadataLayoutVersion()));
String primaryKey = key.getKey();
PriorityQueue<VersionedInstance<T>> versionedInstances =
instances.get(primaryKey);
if (versionedInstances == null || versionedInstances.isEmpty()) {
throw new IllegalArgumentException(
"No suitable instance found for request : " + key);
}
VersionedInstance<T> value = versionedInstances.peek();
if (value == null || value.version > version) {
throw new IllegalArgumentException(
"No suitable instance found for request : " + key);
} else {
return value.instance;
}
}
/**
* To be called on finalization of a new LayoutFeature.
* Unregisters all the requests handlers that are there for layout versions
* before the feature's layout version.
* If the feature's layout version does not define a new handler for a
* request type, the previously registered handler remains registered.
*
* @param feature the feature to be finalized.
*/
public void finalizeFeature(LayoutFeature feature) {
Iterator<Map.Entry<String, PriorityQueue<VersionedInstance<T>>>> iterator =
instances.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, PriorityQueue<VersionedInstance<T>>> next =
iterator.next();
PriorityQueue<VersionedInstance<T>> vInstances = next.getValue();
VersionedInstance<T> prevInstance = null;
while (!vInstances.isEmpty() &&
vInstances.peek().version < feature.layoutVersion()) {
prevInstance = vInstances.poll();
LOG.info("Unregistering {} from factory. ", prevInstance.instance);
}
if ((vInstances.isEmpty() ||
vInstances.peek().version > feature.layoutVersion())
&& prevInstance != null) {
vInstances.offer(prevInstance);
}
if (vInstances.isEmpty()) {
LOG.info("Unregistering '{}' from factory since it has no entries.",
next.getKey());
iterator.remove();
}
}
}
@VisibleForTesting
protected Map<String, List<T>> getInstances() {
Map<String, List<T>> instancesCopy = new HashMap<>();
instances.forEach((key, value) -> {
List<T> collect =
value.stream().map(v -> v.instance).collect(toList());
instancesCopy.put(key, collect);
});
return Collections.unmodifiableMap(instancesCopy);
}
/**
* Class to encapsulate a instance with version. Not meant to be exposed
* outside this class.
* @param <T> instance
*/
static class VersionedInstance<T> {
private int version;
private T instance;
VersionedInstance(int version, T instance) {
this.version = version;
this.instance = instance;
}
public long getVersion() {
return version;
}
public T getInstance() {
return instance;
}
}
}