// blob: 56b0ed8352427db19fac856b2c41b61f3750091f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.aries.osgi.functional;
import org.apache.aries.functional.Function2;
import org.apache.aries.functional.Function3;
import org.apache.aries.functional.Function4;
import org.apache.aries.functional.Function5;
import org.apache.aries.functional.Function6;
import org.apache.aries.osgi.functional.internal.BundleContextOSGiImpl;
import org.apache.aries.osgi.functional.internal.BundleOSGi;
import org.apache.aries.osgi.functional.internal.ChangeContextOSGiImpl;
import org.apache.aries.osgi.functional.internal.ConfigurationOSGiImpl;
import org.apache.aries.osgi.functional.internal.ConfigurationsOSGiImpl;
import org.apache.aries.osgi.functional.internal.DistributeOSGi;
import org.apache.aries.osgi.functional.internal.IgnoreImpl;
import org.apache.aries.osgi.functional.internal.JustOSGiImpl;
import org.apache.aries.osgi.functional.internal.NothingOSGiImpl;
import org.apache.aries.osgi.functional.internal.OnCloseOSGiImpl;
import org.apache.aries.osgi.functional.internal.ServiceReferenceOSGi;
import org.apache.aries.osgi.functional.internal.ServiceRegistrationOSGiImpl;
import org.osgi.framework.Bundle;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceFactory;
import org.osgi.framework.ServiceObjects;
import org.osgi.framework.ServiceRegistration;
import java.util.Collection;
import java.util.Collections;
import java.util.Dictionary;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
/**
 * A composable program that produces values of type {@code T}.
 *
 * <p>Instance methods derive new programs from this one ({@code map},
 * {@code flatMap}, {@code filter}, ...). Static methods build programs, most
 * of them by instantiating classes from the
 * {@code org.apache.aries.osgi.functional.internal} package.
 *
 * @param <T> the type of the values this program produces
 * @author Carlos Sierra Andrés
 */
public interface OSGi<T> extends OSGiRunnable<T> {

    /**
     * A {@link Runnable} that does nothing. {@link #once(OSGi)} uses it as
     * the replacement value once its stored terminator has been consumed.
     */
    Runnable NOOP = () -> {};

    /**
     * Derives a program that uses {@code onError} to recover from errors.
     *
     * <p>NOTE(review): presumably {@code onError} receives the value being
     * processed and the exception raised, and returns a replacement value;
     * confirm against the implementation.
     *
     * @param onError the recovery function
     * @return the derived program
     */
    OSGi<T> recover(BiFunction<T, Exception, T> onError);

    /**
     * Derives a program that uses {@code onError} to recover from errors by
     * switching to another program.
     *
     * <p>NOTE(review): presumably {@code onError} receives the value being
     * processed and the exception raised; confirm against the
     * implementation.
     *
     * @param onError the recovery function returning the fallback program
     * @return the derived program
     */
    OSGi<T> recoverWith(BiFunction<T, Exception, OSGi<T>> onError);

    /**
     * Derives a program of the same type with two side-effecting callbacks
     * attached.
     *
     * <p>NOTE(review): judging by the parameter names, {@code onAdded} runs
     * when a value appears and {@code onRemoved} when it goes away; confirm
     * against the implementation.
     *
     * @param onAdded the callback for added values
     * @param onRemoved the callback for removed values
     * @return the derived program
     */
    OSGi<T> effects(
        Consumer<? super T> onAdded, Consumer<? super T> onRemoved);

    /**
     * Wraps {@code program} in an {@code IgnoreImpl}, yielding a program of
     * {@link Void}.
     *
     * <p>NOTE(review): presumably the values of {@code program} are
     * discarded; confirm against {@code IgnoreImpl}.
     *
     * @param program the program to wrap
     * @return the wrapping program
     */
    static OSGi<Void> ignore(OSGi<?> program) {
        return new IgnoreImpl(program);
    }

    /**
     * Derives a program whose values are obtained by applying
     * {@code function} to the values of this program.
     *
     * @param function the mapping function
     * @param <S> the type of the mapped values
     * @return the derived program
     */
    <S> OSGi<S> map(Function<? super T, ? extends S> function);

    /**
     * Derives a program by applying {@code fun} to the values of this
     * program, where {@code fun} itself returns a program.
     *
     * @param fun the function producing a program for each value
     * @param <S> the type of the values of the resulting program
     * @return the derived program
     */
    <S> OSGi<S> flatMap(Function<? super T, OSGi<? extends S>> fun);

    /**
     * Combines this program with {@code next}, producing values of the type
     * of {@code next}.
     *
     * <p>NOTE(review): presumably the values of this program are discarded
     * and only used to trigger {@code next}; confirm against the
     * implementation.
     *
     * @param next the program to continue with
     * @param <S> the type of the values produced by {@code next}
     * @return the combined program
     */
    <S> OSGi<S> then(OSGi<S> next);

    /**
     * Derives a program of {@link Void} that hands values to
     * {@code onAdded}.
     *
     * @param onAdded the callback for added values
     * @return the derived program
     */
    OSGi<Void> foreach(Consumer<? super T> onAdded);

    /**
     * Derives a program of {@link Void} with two callbacks attached.
     *
     * <p>NOTE(review): judging by the parameter names, {@code onAdded} runs
     * when a value appears and {@code onRemoved} when it goes away; confirm
     * against the implementation.
     *
     * @param onAdded the callback for added values
     * @param onRemoved the callback for removed values
     * @return the derived program
     */
    OSGi<Void> foreach(
        Consumer<? super T> onAdded, Consumer<? super T> onRemoved);

    /**
     * Derives a program of {@code S} from a function over operations.
     *
     * <p>{@code fun} receives an operation taking an {@code S} and returning
     * a {@link Runnable}, and returns the operation to be applied to each
     * {@code T} of this program. See {@link #once(OSGi)} for a usage example.
     *
     * <p>NOTE(review): the {@link Runnable} returned by an operation is
     * presumably the action that undoes it; confirm against the
     * implementation.
     *
     * @param fun the function turning an operation on {@code S} into an
     *        operation on {@code T}
     * @param <S> the type of the values of the resulting program
     * @return the derived program
     */
    <S> OSGi<S> transformer(
        Function<Function<S, Runnable>, Function<T, Runnable>> fun);

    /**
     * Creates a program of {@link BundleContext} backed by
     * {@code BundleContextOSGiImpl}.
     *
     * @return the new program
     */
    static OSGi<BundleContext> bundleContext() {
        return new BundleContextOSGiImpl();
    }

    /**
     * Creates a program of {@link Bundle} backed by {@code BundleOSGi}.
     *
     * <p>NOTE(review): {@code stateMask} is presumably an OR of
     * {@link Bundle} state constants selecting the bundles of interest;
     * confirm against {@code BundleOSGi}.
     *
     * @param stateMask the state mask handed to {@code BundleOSGi}
     * @return the new program
     */
    static OSGi<Bundle> bundles(int stateMask) {
        return new BundleOSGi(stateMask);
    }

    /**
     * Wraps {@code program} in a {@code ChangeContextOSGiImpl} together with
     * {@code bundleContext}.
     *
     * <p>NOTE(review): presumably {@code program} then runs against
     * {@code bundleContext}; confirm against {@code ChangeContextOSGiImpl}.
     *
     * @param bundleContext the bundle context to associate with the program
     * @param program the program to wrap
     * @param <T> the type of the values of the program
     * @return the wrapping program
     */
    static <T> OSGi<T> changeContext(
        BundleContext bundleContext, OSGi<T> program) {

        return new ChangeContextOSGiImpl<>(program, bundleContext);
    }

    /**
     * Creates a program of configuration dictionaries for {@code pid},
     * backed by {@code ConfigurationOSGiImpl}.
     *
     * @param pid the persistent identifier handed to the implementation
     * @return the new program
     */
    static OSGi<Dictionary<String, ?>> configuration(String pid) {
        return new ConfigurationOSGiImpl(pid);
    }

    /**
     * Creates a program of configuration dictionaries for
     * {@code factoryPid}, backed by {@code ConfigurationsOSGiImpl}.
     *
     * @param factoryPid the factory persistent identifier handed to the
     *        implementation
     * @return the new program
     */
    static OSGi<Dictionary<String, ?>> configurations(String factoryPid) {
        return new ConfigurationsOSGiImpl(factoryPid);
    }

    /**
     * Creates a program from the single value {@code s}, backed by
     * {@code JustOSGiImpl}.
     *
     * <p>Arguments whose static type is a {@link Collection} or a
     * {@link Supplier} resolve to the more specific overloads instead.
     *
     * @param s the value
     * @param <S> the type of the value
     * @return the new program
     */
    static <S> OSGi<S> just(S s) {
        return new JustOSGiImpl<>(s);
    }

    /**
     * Creates a program from the collection {@code s}, backed by
     * {@code JustOSGiImpl}.
     *
     * <p>NOTE(review): presumably every element of the collection is
     * produced as a value; confirm against {@code JustOSGiImpl}.
     *
     * @param s the collection of values
     * @param <S> the type of the values
     * @return the new program
     */
    static <S> OSGi<S> just(Collection<S> s) {
        return new JustOSGiImpl<>(s);
    }

    /**
     * Creates a program from a supplier, backed by {@code JustOSGiImpl}.
     *
     * <p>The supplier is not invoked here: it is wrapped in another supplier
     * that returns a singleton list holding the supplied value.
     *
     * @param s the supplier of the value
     * @param <S> the type of the value
     * @return the new program
     */
    static <S> OSGi<S> just(Supplier<S> s) {
        return new JustOSGiImpl<>(() -> Collections.singletonList(s.get()));
    }

    /**
     * Flattens a program of programs by flat-mapping it with the identity
     * function.
     *
     * @param program the program producing programs
     * @param <S> the type of the values of the inner programs
     * @return the flattened program
     */
    static <S> OSGi<S> join(OSGi<OSGi<S>> program) {
        return program.flatMap(x -> x);
    }

    /**
     * Creates a program backed by {@code NothingOSGiImpl}.
     *
     * <p>NOTE(review): presumably this program never produces a value;
     * confirm against {@code NothingOSGiImpl}.
     *
     * @param <S> the nominal type of the values
     * @return the new program
     */
    static <S> OSGi<S> nothing() {
        return new NothingOSGiImpl<>();
    }

    /**
     * Creates a program of {@link Void} carrying {@code action}, backed by
     * {@code OnCloseOSGiImpl}.
     *
     * <p>NOTE(review): presumably {@code action} runs when the program is
     * closed; confirm against {@code OnCloseOSGiImpl}.
     *
     * @param action the action handed to the implementation
     * @return the new program
     */
    static OSGi<Void> onClose(Runnable action) {
        return new OnCloseOSGiImpl(action);
    }

    /**
     * Creates a program of {@link ServiceObjects} selected only by a filter.
     * Delegates to {@link #prototypes(Class, String)} with a {@code null}
     * class.
     *
     * @param filterString the filter expression
     * @return the new program
     */
    static OSGi<ServiceObjects<Object>> prototypes(String filterString) {
        return prototypes(null, filterString);
    }

    /**
     * Creates a program of {@link ServiceObjects} selected only by class.
     * Delegates to {@link #prototypes(Class, String)} with a {@code null}
     * filter.
     *
     * @param clazz the service class
     * @param <T> the service type
     * @return the new program
     */
    static <T> OSGi<ServiceObjects<T>> prototypes(Class<T> clazz) {
        return prototypes(clazz, null);
    }

    /**
     * Creates a program of {@link ServiceObjects}.
     *
     * <p>For each bundle context produced by {@link #bundleContext()}, the
     * service references produced by
     * {@link #serviceReferences(Class, String)} are unwrapped and passed to
     * {@link BundleContext#getServiceObjects}.
     *
     * @param clazz the service class, may be {@code null}
     * @param filterString the filter expression, may be {@code null}
     * @param <T> the service type
     * @return the new program
     */
    static <T> OSGi<ServiceObjects<T>> prototypes(
        Class<T> clazz, String filterString) {

        return
            bundleContext().flatMap(
                bundleContext ->
                    serviceReferences(clazz, filterString).map(
                        CachingServiceReference::getServiceReference
                    ).map(
                        bundleContext::getServiceObjects)
            );
    }

    /**
     * Creates a program of {@link ServiceRegistration} for a service
     * instance, backed by {@code ServiceRegistrationOSGiImpl}.
     *
     * @param clazz the class to register the service under
     * @param service the service instance
     * @param properties the service properties
     * @param <T> the service type
     * @return the new program
     */
    static <T> OSGi<ServiceRegistration<T>> register(
        Class<T> clazz, T service, Map<String, Object> properties) {

        return new ServiceRegistrationOSGiImpl<>(clazz, service, properties);
    }

    /**
     * Creates a program of {@link ServiceRegistration} for a
     * {@link ServiceFactory}, backed by {@code ServiceRegistrationOSGiImpl}.
     *
     * @param clazz the class to register the service under
     * @param service the factory creating the service instances
     * @param properties the service properties
     * @param <T> the service type
     * @return the new program
     */
    static <T> OSGi<ServiceRegistration<T>> register(
        Class<T> clazz, ServiceFactory<T> service,
        Map<String, Object> properties) {

        return new ServiceRegistrationOSGiImpl<>(clazz, service, properties);
    }

    /**
     * Creates a program of {@link ServiceRegistration} for a service
     * identified by several class names, backed by
     * {@code ServiceRegistrationOSGiImpl}.
     *
     * @param classes the class names to register the service under
     * @param service the service object
     * @param properties the service properties
     * @return the new program
     */
    static OSGi<ServiceRegistration<?>> register(
        String[] classes, Object service, Map<String, ?> properties) {

        return new ServiceRegistrationOSGiImpl(classes, service, properties);
    }

    /**
     * Creates a program of services selected only by class. Delegates to
     * {@link #services(Class, String)} with a {@code null} filter.
     *
     * @param clazz the service class
     * @param <T> the service type
     * @return the new program
     */
    static <T> OSGi<T> services(Class<T> clazz) {
        return services(clazz, null);
    }

    /**
     * Creates a program of services selected only by a filter. Delegates to
     * {@link #services(Class, String)} with a {@code null} class.
     *
     * @param filterString the filter expression
     * @return the new program
     */
    static OSGi<Object> services(String filterString) {
        return services(null, filterString);
    }

    /**
     * Creates a program of service instances.
     *
     * <p>For each bundle context produced by {@link #bundleContext()} and
     * each service reference produced by
     * {@link #serviceReferences(Class, String)}, the service is obtained with
     * {@link BundleContext#getService} and produced as a value. An
     * {@link #onClose(Runnable)} step calling
     * {@link BundleContext#ungetService} is sequenced before it.
     *
     * <p>NOTE(review): {@link BundleContext#getService} may return
     * {@code null}; such a value is passed to {@link #just(Object)}
     * unchanged.
     *
     * @param clazz the service class, may be {@code null}
     * @param filterString the filter expression, may be {@code null}
     * @param <T> the service type
     * @return the new program
     */
    static <T> OSGi<T> services(Class<T> clazz, String filterString) {
        return
            bundleContext().flatMap(
                bundleContext ->
                    serviceReferences(clazz, filterString).map(
                        CachingServiceReference::getServiceReference
                    ).flatMap(
                        sr -> {
                            T service = bundleContext.getService(sr);

                            return
                                onClose(
                                    () -> bundleContext.ungetService(sr)
                                ).then(
                                    just(service)
                                );
                        }
                    ));
    }

    /**
     * Derives a program that forwards at most one value of {@code program}
     * downstream at a time.
     *
     * <p>A counter tracks how many values are currently outstanding. Only a
     * value arriving while the counter is zero is passed to the downstream
     * operation, and the {@link Runnable} that operation returns is stored.
     * Every value, forwarded or not, gets a {@link Runnable} that decrements
     * the counter. When the counter drops back to zero, the stored
     * {@link Runnable} is swapped for {@link #NOOP} and run.
     *
     * @param program the source program
     * @param <T> the type of the values
     * @return the derived program
     */
    static <T> OSGi<T> once(OSGi<T> program) {
        return program.transformer(op -> {
            AtomicInteger count = new AtomicInteger();
            AtomicReference<Runnable> terminator = new AtomicReference<>();

            return t -> {
                // Only the value that takes the counter from 0 to 1 is
                // forwarded downstream.
                if (count.getAndIncrement() == 0) {
                    terminator.set(op.apply(t));
                }

                return () -> {
                    // The last outstanding value going away runs the stored
                    // terminator; NOOP is left in its place.
                    if (count.decrementAndGet() == 0) {
                        terminator.getAndSet(NOOP).run();
                    }
                };
            };
        });
    }

    /**
     * Creates a program of service references selected by class, backed by
     * {@code ServiceReferenceOSGi} with a {@code null} filter.
     *
     * @param clazz the service class
     * @param <T> the service type
     * @return the new program
     */
    static <T> OSGi<CachingServiceReference<T>> serviceReferences(
        Class<T> clazz) {

        return new ServiceReferenceOSGi<>(null, clazz);
    }

    /**
     * Creates a program of service references selected by a filter, backed
     * by {@code ServiceReferenceOSGi} with a {@code null} class.
     *
     * @param filterString the filter expression
     * @return the new program
     */
    static OSGi<CachingServiceReference<Object>> serviceReferences(
        String filterString) {

        return new ServiceReferenceOSGi<>(filterString, null);
    }

    /**
     * Creates a program of service references selected by class and filter,
     * backed by {@code ServiceReferenceOSGi}.
     *
     * @param clazz the service class
     * @param filterString the filter expression
     * @param <T> the service type
     * @return the new program
     */
    static <T> OSGi<CachingServiceReference<T>> serviceReferences(
        Class<T> clazz, String filterString) {

        return new ServiceReferenceOSGi<>(filterString, clazz);
    }

    /**
     * Creates a program of service references selected by class and filter,
     * handing {@code onModified} to {@code ServiceReferenceOSGi}.
     *
     * <p>NOTE(review): {@code onModified} presumably decides how a modified
     * service reference is handled; confirm against {@code Refresher} and
     * {@code ServiceReferenceOSGi}.
     *
     * @param clazz the service class
     * @param filterString the filter expression
     * @param onModified the refresher handed to the implementation
     * @param <T> the service type
     * @return the new program
     */
    static <T> OSGi<CachingServiceReference<T>> serviceReferences(
        Class<T> clazz, String filterString,
        Refresher<? super CachingServiceReference<T>> onModified) {

        return new ServiceReferenceOSGi<>(filterString, clazz, onModified);
    }

    /**
     * Creates a program of service references selected by class, handing
     * {@code onModified} to {@code ServiceReferenceOSGi} with a {@code null}
     * filter.
     *
     * @param clazz the service class
     * @param onModified the refresher handed to the implementation
     * @param <T> the service type
     * @return the new program
     */
    static <T> OSGi<CachingServiceReference<T>> serviceReferences(
        Class<T> clazz,
        Refresher<? super CachingServiceReference<T>> onModified) {

        return new ServiceReferenceOSGi<>(null, clazz, onModified);
    }

    /**
     * Creates a program of service references selected by a filter, handing
     * {@code onModified} to {@code ServiceReferenceOSGi} with a {@code null}
     * class.
     *
     * @param filterString the filter expression
     * @param onModified the refresher handed to the implementation
     * @return the new program
     */
    static OSGi<CachingServiceReference<Object>> serviceReferences(
        String filterString,
        Refresher<? super CachingServiceReference<Object>> onModified) {

        return new ServiceReferenceOSGi<>(filterString, null, onModified);
    }

    /**
     * Creates a program from several programs of the same type, backed by
     * {@code DistributeOSGi}.
     *
     * <p>NOTE(review): presumably the result produces the values of every
     * given program; confirm against {@code DistributeOSGi}.
     *
     * @param programs the programs to combine
     * @param <T> the type of the values
     * @return the new program
     */
    @SafeVarargs
    static <T> OSGi<T> all(OSGi<T> ... programs) {
        return new DistributeOSGi<>(programs);
    }

    /**
     * Derives a program restricted by {@code predicate}.
     *
     * <p>NOTE(review): presumably only values for which {@code predicate}
     * returns {@code true} are kept; confirm against the implementation.
     *
     * @param predicate the predicate applied to the values
     * @return the derived program
     */
    OSGi<T> filter(Predicate<T> predicate);

    /**
     * Applies the functions produced by {@code fun} to the values of this
     * program: {@code fun} is flat-mapped with {@link #map(Function)} of
     * this program.
     *
     * @param fun the program producing functions
     * @param <S> the result type of the functions
     * @return the derived program
     */
    default <S> OSGi<S> applyTo(OSGi<Function<T, S>> fun) {
        return fun.flatMap(this::map);
    }

    /**
     * Combines two programs with a two-argument function by lifting the
     * curried function with {@link #just(Object)} and applying it to
     * {@code a} and then {@code b} through {@link #applyTo(OSGi)}.
     *
     * @param fun the combining function
     * @param a the program providing the first argument
     * @param b the program providing the second argument
     * @param <A> the type of the first argument
     * @param <B> the type of the second argument
     * @param <C> the result type
     * @return the combined program
     */
    static <A, B, C> OSGi<C> combine(
        Function2<A, B, C> fun, OSGi<A> a, OSGi<B> b) {

        return b.applyTo(a.applyTo(just(fun.curried())));
    }

    /**
     * Combines three programs with a three-argument function. The first two
     * arguments are combined with the two-argument overload; the resulting
     * program of functions is then applied to {@code c}.
     *
     * @param fun the combining function
     * @param a the program providing the first argument
     * @param b the program providing the second argument
     * @param c the program providing the third argument
     * @param <A> the type of the first argument
     * @param <B> the type of the second argument
     * @param <C> the type of the third argument
     * @param <D> the result type
     * @return the combined program
     */
    static <A, B, C, D> OSGi<D> combine(
        Function3<A, B, C, D> fun, OSGi<A> a, OSGi<B> b, OSGi<C> c) {

        return c.applyTo(
            OSGi.combine(
                (A aa, B bb) -> fun.curried().apply(aa).apply(bb), a, b));
    }

    /**
     * Combines four programs with a four-argument function. The first three
     * arguments are combined with the three-argument overload; the resulting
     * program of functions is then applied to {@code d}.
     *
     * @param fun the combining function
     * @param a the program providing the first argument
     * @param b the program providing the second argument
     * @param c the program providing the third argument
     * @param d the program providing the fourth argument
     * @param <A> the type of the first argument
     * @param <B> the type of the second argument
     * @param <C> the type of the third argument
     * @param <D> the type of the fourth argument
     * @param <E> the result type
     * @return the combined program
     */
    static <A, B, C, D, E> OSGi<E> combine(
        Function4<A, B, C, D, E> fun, OSGi<A> a, OSGi<B> b, OSGi<C> c,
        OSGi<D> d) {

        return d.applyTo(
            OSGi.combine(
                (A aa, B bb, C cc) ->
                    fun.curried().apply(aa).apply(bb).apply(cc),
                a, b, c));
    }

    /**
     * Combines five programs with a five-argument function. The first four
     * arguments are combined with the four-argument overload; the resulting
     * program of functions is then applied to {@code e}.
     *
     * @param fun the combining function
     * @param a the program providing the first argument
     * @param b the program providing the second argument
     * @param c the program providing the third argument
     * @param d the program providing the fourth argument
     * @param e the program providing the fifth argument
     * @param <A> the type of the first argument
     * @param <B> the type of the second argument
     * @param <C> the type of the third argument
     * @param <D> the type of the fourth argument
     * @param <E> the type of the fifth argument
     * @param <F> the result type
     * @return the combined program
     */
    static <A, B, C, D, E, F> OSGi<F> combine(
        Function5<A, B, C, D, E, F> fun, OSGi<A> a, OSGi<B> b, OSGi<C> c,
        OSGi<D> d, OSGi<E> e) {

        return e.applyTo(
            OSGi.combine(
                (A aa, B bb, C cc, D dd) ->
                    fun.curried().apply(aa).apply(bb).apply(cc).apply(dd),
                a, b, c, d));
    }

    /**
     * Combines six programs with a six-argument function. The first five
     * arguments are combined with the five-argument overload; the resulting
     * program of functions is then applied to {@code f}.
     *
     * @param fun the combining function
     * @param a the program providing the first argument
     * @param b the program providing the second argument
     * @param c the program providing the third argument
     * @param d the program providing the fourth argument
     * @param e the program providing the fifth argument
     * @param f the program providing the sixth argument
     * @param <A> the type of the first argument
     * @param <B> the type of the second argument
     * @param <C> the type of the third argument
     * @param <D> the type of the fourth argument
     * @param <E> the type of the fifth argument
     * @param <F> the type of the sixth argument
     * @param <G> the result type
     * @return the combined program
     */
    static <A, B, C, D, E, F, G> OSGi<G> combine(
        Function6<A, B, C, D, E, F, G> fun, OSGi<A> a, OSGi<B> b, OSGi<C> c,
        OSGi<D> d, OSGi<E> e, OSGi<F> f) {

        return f.applyTo(
            OSGi.combine(
                (A aa, B bb, C cc, D dd, E ee) ->
                    fun.curried().apply(aa).apply(bb).apply(cc).apply(dd)
                        .apply(ee),
                a, b, c, d, e));
    }

}