// blob: d41e37afab00ff6e6455f263cfb47397b64cb12b [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.edgent.function;
import java.io.File;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;
import java.util.UUID;
/**
* Common functions and functional utilities.
*
*/
public class Functions {

    /**
     * Single instance of the identity function.
     */
    private static final UnaryOperator<Object> IDENTITY = t -> t;

    /**
     * Single instance of constant function that returns zero.
     */
    private static final Function<Object,Integer> ZERO = t -> 0;

    /**
     * Returns the identity function that returns its single argument.
     * The same shared instance is returned on every call.
     * @param <T> tuple type
     * @return Identity function that returns its single argument.
     */
    @SuppressWarnings("unchecked")
    public static <T> UnaryOperator<T> identity() {
        // Safe: the shared instance only ever returns the argument it was given.
        return (UnaryOperator<T>) IDENTITY;
    }

    /**
     * Returns a constant function that returns zero (0).
     * The same shared instance is returned on every call.
     * @param <T> tuple type
     * @return Constant function that returns zero (0).
     */
    @SuppressWarnings("unchecked")
    public static <T> Function<T,Integer> zero() {
        // Safe: the shared instance ignores its argument.
        return (Function<T, Integer>) ZERO;
    }

    /**
     * Returns a constant function that returns zero (0).
     * This is identical to {@link #zero()} but is more
     * readable when applied as a key function.
     * @param <T> tuple type
     * @return Constant function that returns zero (0).
     */
    public static <T> Function<T,Integer> unpartitioned() {
        return zero();
    }

    /**
     * Close the function.
     * {@code function} is passed to
     * {@code WrappedFunction.unwrap(AutoCloseable.class, function)};
     * if that yields a non-null {@code AutoCloseable} its
     * {@code close()} is called, otherwise no action is taken.
     * @param function Function to be closed.
     * @throws Exception Error closing the function.
     */
    public static void closeFunction(Object function) throws Exception {
        AutoCloseable closeable = WrappedFunction.unwrap(AutoCloseable.class, function);
        if (closeable != null)
            closeable.close();
    }

    /**
     * Return a thread-safe version of a {@code Function} function.
     * If the function is guaranteed to be immutable (stateless)
     * then the function is returned, as it is thread safe,
     * otherwise a wrapper is returned that grabs synchronization
     * on {@code function} when calling {@link Function#apply(Object)}.
     * <BR>
     * If {@code function} implements {@code AutoCloseable} then
     * the function is assumed to be stateful and a thread-safe
     * version is returned.
     * @param <T> source tuple type
     * @param <R> result tuple type
     * @param function Function to return a thread-safe version of.
     * @return A thread-safe function
     * @see #isImmutable(Object)
     */
    public static <T,R> Function<T,R> synchronizedFunction(final Function<T,R> function) {
        if (isImmutable(function) && !(function instanceof AutoCloseable))
            return function;

        // Return a function that is synchronized on the passed in function reference.
        return new ThreadSafeFunction<>(function);
    }

    /**
     * Wrapper that holds the monitor of the wrapped {@code Function}
     * for the duration of each {@code apply} call.
     */
    private static class ThreadSafeFunction<T,R> extends WrappedFunction<Function<T,R>> implements Function<T,R> {
        private static final long serialVersionUID = 1L;

        ThreadSafeFunction(Function<T,R> function) {
            super(function);
        }

        @Override
        public R apply(T value) {
            final Function<T,R> function = f();
            // Lock on the wrapped function itself, not on this wrapper.
            synchronized (function) {
                return function.apply(value);
            }
        }
    }

    /**
     * Return a thread-safe version of a {@code Supplier} function.
     * If the function is guaranteed to be immutable (stateless)
     * then the function is returned, as it is thread safe,
     * otherwise a wrapper is returned that grabs synchronization
     * on {@code function} when calling {@link Supplier#get()}.
     * <BR>
     * If {@code function} implements {@code AutoCloseable} then
     * the function is assumed to be stateful and a thread-safe
     * version is returned.
     * @param <T> tuple type
     * @param function Function to return a thread-safe version of.
     * @return A thread-safe function
     * @see #isImmutable(Object)
     */
    public static <T> Supplier<T> synchronizedSupplier(final Supplier<T> function) {
        if (isImmutable(function) && !(function instanceof AutoCloseable))
            return function;

        // Return a function that is synchronized on the passed in function reference.
        return new ThreadSafeSupplier<>(function);
    }

    /**
     * Wrapper that holds the monitor of the wrapped {@code Supplier}
     * for the duration of each {@code get} call.
     */
    private static class ThreadSafeSupplier<T> extends WrappedFunction<Supplier<T>> implements Supplier<T> {
        private static final long serialVersionUID = 1L;

        ThreadSafeSupplier(Supplier<T> function) {
            super(function);
        }

        @Override
        public T get() {
            final Supplier<T> function = f();
            // Lock on the wrapped function itself, not on this wrapper.
            synchronized (function) {
                return function.get();
            }
        }
    }

    /**
     * Return a thread-safe version of a {@code Consumer} function.
     * If the function is guaranteed to be immutable (stateless)
     * then the function is returned, as it is thread safe,
     * otherwise a wrapper is returned that grabs synchronization
     * on {@code function} when calling {@link Consumer#accept(Object)}.
     * <BR>
     * If {@code function} implements {@code AutoCloseable} then
     * the function is assumed to be stateful and a thread-safe
     * version is returned.
     * @param <T> tuple type
     * @param function Function to return a thread-safe version of.
     * @return A thread-safe function
     * @see #isImmutable(Object)
     */
    public static <T> Consumer<T> synchronizedConsumer(final Consumer<T> function) {
        if (isImmutable(function) && !(function instanceof AutoCloseable))
            return function;

        // Return a function that is synchronized on the passed in function reference.
        return new ThreadSafeConsumer<>(function);
    }

    /**
     * Wrapper that holds the monitor of the wrapped {@code Consumer}
     * for the duration of each {@code accept} call.
     */
    private static class ThreadSafeConsumer<T>
            extends WrappedFunction<Consumer<T>> implements Consumer<T> {
        private static final long serialVersionUID = 1L;

        ThreadSafeConsumer(Consumer<T> function) {
            super(function);
        }

        @Override
        public void accept(T value) {
            final Consumer<T> function = f();
            // Lock on the wrapped function itself, not on this wrapper.
            synchronized (function) {
                function.accept(value);
            }
        }
    }

    /**
     * Return a thread-safe version of a {@code BiFunction} function.
     * If the function is guaranteed to be immutable (stateless)
     * then the function is returned, as it is thread safe,
     * otherwise a wrapper is returned that grabs synchronization
     * on {@code function} when calling {@link BiFunction#apply(Object, Object)}.
     * <BR>
     * If {@code function} implements {@code AutoCloseable} then
     * the function is assumed to be stateful and a thread-safe
     * version is returned.
     * @param <T> Type of function's first argument
     * @param <U> Type of function's second argument
     * @param <R> Type of function's result
     * @param function Function to return a thread-safe version of.
     * @return A thread-safe function
     * @see #isImmutable(Object)
     */
    public static <T,U,R> BiFunction<T,U,R> synchronizedBiFunction(final BiFunction<T,U,R> function) {
        if (isImmutable(function) && !(function instanceof AutoCloseable))
            return function;

        // Return a function that is synchronized on the passed in function reference.
        return new ThreadSafeBiFunction<>(function);
    }

    /**
     * Wrapper that holds the monitor of the wrapped {@code BiFunction}
     * for the duration of each {@code apply} call.
     */
    private static class ThreadSafeBiFunction<T,U,R>
            extends WrappedFunction<BiFunction<T,U,R>> implements BiFunction<T,U,R> {
        private static final long serialVersionUID = 1L;

        ThreadSafeBiFunction(BiFunction<T,U,R> function) {
            super(function);
        }

        @Override
        public R apply(T t, U u) {
            final BiFunction<T,U,R> function = f();
            // Lock on the wrapped function itself, not on this wrapper.
            synchronized (function) {
                return function.apply(t, u);
            }
        }
    }

    /**
     * See if the functional logic is immutable.
     * This is {@link #isImmutableClass(Class)} applied to the
     * runtime class of {@code function}.
     *
     * Logic is stateful if:
     *   Has a non-final instance field.
     *   Has a final instance field that is not a primitive
     *   or a known immutable object.
     *
     * @param function Function to check
     * @return True if the function is immutable.
     */
    public static boolean isImmutable(Object function) {
        return isImmutableClass(function.getClass());
    }

    /**
     * See if a function class is immutable.
     * The declared fields of {@code clazz} and of each of its
     * superclasses are inspected; static and transient fields
     * are ignored.
     *
     * Logic is stateful if:
     *   Has a non-final instance field.
     *   Has a final instance field that is not a primitive,
     *   a known immutable object, or an enum whose own declared
     *   type passes this same check.
     *
     * A class with no superclass ({@code Object} itself or an
     * interface) has no instance fields to inspect and is
     * reported as immutable.
     *
     * @param clazz Class to check
     * @return True if the function is immutable.
     */
    public static boolean isImmutableClass(Class<?> clazz) {
        return isImmutableClass(clazz, new HashSet<>());
    }

    /**
     * Implementation of {@link #isImmutableClass(Class)} that tracks
     * the enum types already being examined.
     *
     * @param clazz Class to check
     * @param visitedEnums enum types whose check has already started;
     *        a field of such a type is not examined again, which keeps
     *        self-referencing or mutually-referencing enums from
     *        recursing without end.
     * @return True if no stateful field was found.
     */
    private static boolean isImmutableClass(Class<?> clazz, Set<Class<?>> visitedEnums) {
        do {
            for (Field field : clazz.getDeclaredFields()) {
                final int modifiers = field.getModifiers();

                if (Modifier.isStatic(modifiers))
                    continue;
                if (Modifier.isTransient(modifiers))
                    continue;
                if (!Modifier.isFinal(modifiers))
                    return false;

                final Class<?> type = field.getType();
                if (type.isPrimitive())
                    continue;
                if (immutableClasses.contains(type))
                    continue;

                if (type.isEnum()) {
                    // Inspect the enum type itself. Passing the Class object to
                    // isImmutable(Object) would inspect java.lang.Class instead.
                    // A type seen before is either still being checked further up
                    // the call chain or was already found immutable (a failure
                    // returns false all the way out), so it can be skipped.
                    // Only the declared enum type is inspected, not the classes
                    // of constant-specific bodies.
                    if (!visitedEnums.add(type) || isImmutableClass(type, visitedEnums))
                        continue;
                }

                return false;
            }
            clazz = clazz.getSuperclass();
            // getSuperclass() returns null for Object and for interfaces,
            // so stop on null as well as on reaching Object.
        } while (clazz != null && !Object.class.equals(clazz));

        return true;
    }

    /**
     * Field types that {@link #isImmutableClass(Class)} accepts
     * as immutable when held in a final instance field.
     */
    private static final Set<Class<?>> immutableClasses = new HashSet<>();
    static {
        immutableClasses.add(String.class);
        immutableClasses.add(Boolean.class);
        immutableClasses.add(Byte.class);
        immutableClasses.add(Short.class);
        immutableClasses.add(Integer.class);
        immutableClasses.add(Long.class);
        immutableClasses.add(BigInteger.class);
        immutableClasses.add(BigDecimal.class);
        immutableClasses.add(Float.class);
        immutableClasses.add(Double.class);
        immutableClasses.add(File.class);
        immutableClasses.add(Character.class);
        immutableClasses.add(Locale.class);
        immutableClasses.add(UUID.class);
    }

    /**
     * Create a {@code Runnable} that calls
     * {@code consumer.accept(value)} when {@code run()} is called.
     * This can be used to delay the execution of the consumer
     * until some time in the future using an executor service.
     *
     * @param <T> tuple type
     * @param consumer Function to be applied to {@code value}.
     * @param value Value to be consumed.
     *
     * @return {@code Runnable} that invokes {@code consumer.accept(value)}.
     */
    public static <T> Runnable delayedConsume(Consumer<T> consumer, T value) {
        return () -> consumer.accept(value);
    }

    /**
     * Wrap a {@code Runnable} with a final action that
     * is always called when {@code action.run()} completes,
     * including when it completes by throwing.
     * @param action Action to be invoked before {@code finalAction}.
     * @param finalAction Action to be invoked after {@code action.run()} is called.
     * @return {@code Runnable} that invokes {@code action.run()} and then {@code finalAction.run()}
     */
    public static Runnable runWithFinal(Runnable action, Runnable finalAction) {
        return () -> {
            try {
                action.run();
            } finally {
                finalAction.run();
            }
        };
    }

    /**
     * A Consumer that discards all items passed to it.
     */
    private static final Consumer<Object> DISCARDER = t -> {};

    /**
     * A Consumer that discards all items passed to it.
     * The same shared instance is returned on every call.
     * @param <T> tuple type
     * @return A Consumer that discards all items passed to it.
     */
    @SuppressWarnings("unchecked")
    public static <T> Consumer<T> discard() {
        // Safe: the shared instance never uses its argument.
        return (Consumer<T>) DISCARDER;
    }

    /**
     * A Predicate that is always true.
     */
    private static final Predicate<Object> TRUE = t -> true;

    /**
     * A Predicate that is always false.
     */
    private static final Predicate<Object> FALSE = t -> false;

    /**
     * A Predicate that is always true.
     * The same shared instance is returned on every call.
     * @param <T> tuple type
     * @return A Predicate that is always true.
     */
    @SuppressWarnings("unchecked")
    public static <T> Predicate<T> alwaysTrue() {
        // Safe: the shared instance never uses its argument.
        return (Predicate<T>) TRUE;
    }

    /**
     * A Predicate that is always false.
     * The same shared instance is returned on every call.
     * @param <T> tuple type
     * @return A Predicate that is always false.
     */
    @SuppressWarnings("unchecked")
    public static <T> Predicate<T> alwaysFalse() {
        // Safe: the shared instance never uses its argument.
        return (Predicate<T>) FALSE;
    }
}