blob: a1087c7c8bc208e123bdbe8a32c5c668e48c542f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.hadoop.impl.fs;
import java.io.Closeable;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
/**
* Maps values by keys.
* Values are created lazily using {@link ValueFactory}.
*
* Despite of the name, does not depend on any Hadoop classes.
*/
public class HadoopLazyConcurrentMap<K, V extends Closeable> {
/** The map storing the actual values. */
private final ConcurrentMap<K, ValueWrapper> map = new ConcurrentHashMap<>();
/** The factory passed in by the client. Will be used for lazy value creation. */
private final ValueFactory<K, V> factory;
/** Lock used to close the objects. */
private final ReadWriteLock closeLock = new ReentrantReadWriteLock();
/** Flag indicating that this map is closed and cleared. */
private boolean closed;
/**
* Constructor.
* @param factory the factory to create new values lazily.
*/
public HadoopLazyConcurrentMap(ValueFactory<K, V> factory) {
this.factory = factory;
}
/**
* Gets cached or creates a new value of V.
* Never returns null.
* @param k the key to associate the value with.
* @return the cached or newly created value, never null.
* @throws IgniteException on error
*/
public V getOrCreate(K k) {
ValueWrapper w = map.get(k);
if (w == null) {
closeLock.readLock().lock();
try {
if (closed)
throw new IllegalStateException("Failed to create value for key [" + k
+ "]: the map is already closed.");
final ValueWrapper wNew = new ValueWrapper(k);
w = map.putIfAbsent(k, wNew);
if (w == null) {
wNew.init();
w = wNew;
}
}
finally {
closeLock.readLock().unlock();
}
}
try {
V v = w.getValue();
assert v != null;
return v;
}
catch (IgniteCheckedException ie) {
throw new IgniteException(ie);
}
}
/**
* Clears the map and closes all the values.
*/
public void close() throws IgniteCheckedException {
closeLock.writeLock().lock();
try {
if (closed)
return;
closed = true;
Exception err = null;
Set<K> keySet = map.keySet();
for (K key : keySet) {
V v = null;
try {
v = map.get(key).getValue();
}
catch (IgniteCheckedException ignore) {
// No-op.
}
if (v != null) {
try {
v.close();
}
catch (Exception err0) {
if (err == null)
err = err0;
}
}
}
map.clear();
if (err != null)
throw new IgniteCheckedException(err);
}
finally {
closeLock.writeLock().unlock();
}
}
/**
* Helper class that drives the lazy value creation.
*/
private class ValueWrapper {
/** Future. */
private final GridFutureAdapter<V> fut = new GridFutureAdapter<>();
/** the key */
private final K key;
/**
* Creates new wrapper.
*/
private ValueWrapper(K key) {
this.key = key;
}
/**
* Initializes the value using the factory.
*/
private void init() {
try {
final V v0 = factory.createValue(key);
if (v0 == null)
throw new IgniteException("Failed to create non-null value. [key=" + key + ']');
fut.onDone(v0);
}
catch (Throwable e) {
fut.onDone(e);
}
}
/**
* Gets the available value or blocks until the value is initialized.
* @return the value, never null.
* @throws IgniteCheckedException on error.
*/
V getValue() throws IgniteCheckedException {
return fut.get();
}
}
/**
* Interface representing the factory that creates map values.
* @param <K> the type of the key.
* @param <V> the type of the value.
*/
public interface ValueFactory<K, V> {
/**
* Creates the new value. Should never return null.
*
* @param key the key to create value for
* @return the value.
* @throws IOException On failure.
*/
public V createValue(K key) throws IOException;
}
}