blob: 0aac138d4ae1555f704f02e5a6ea3d0118ef573f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.aries.osgi.functional.internal;
import org.apache.aries.osgi.functional.CachingServiceReference;
import org.apache.aries.osgi.functional.OSGi;
import org.apache.aries.osgi.functional.OSGiResult;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceReference;
import org.osgi.util.tracker.ServiceTracker;
import org.osgi.util.tracker.ServiceTrackerCustomizer;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
/**
 * {@link OSGiImpl} program that emits a {@link CachingServiceReference} for
 * every service reference reported by a {@link ServiceTracker}.
 *
 * <p>The tracker is created with the filter returned by
 * {@code buildFilter(bundleContext, filterString, clazz)}. It is opened when
 * the resulting {@link OSGiResultImpl} is started and closed when that result
 * is closed.</p>
 *
 * @param <T> the type of the tracked services
 * @author Carlos Sierra Andrés
 */
public class ServiceReferenceOSGi<T>
    extends OSGiImpl<CachingServiceReference<T>> {

    // Both values are read later by the lambda created in flatMap; they are
    // assigned exactly once, so they are declared final.
    private final String _filterString;
    private final Class<T> _clazz;

    /**
     * Creates a program that tracks service references.
     *
     * @param filterString filter expression handed to {@code buildFilter}
     * @param clazz service class handed to {@code buildFilter}
     */
    public ServiceReferenceOSGi(String filterString, Class<T> clazz) {
        // The lambda deliberately captures the constructor parameters rather
        // than the fields: instance fields are not accessible before super()
        // has completed.
        super((bundleContext, op) -> {
            ServiceTracker<T, AtomicReference<Runnable>>
                tracker = new ServiceTracker<>(
                    bundleContext,
                    buildFilter(bundleContext, filterString, clazz),
                    new DefaultServiceTrackerCustomizer<>(op));

            return new OSGiResultImpl(tracker::open, tracker::close);
        });

        _filterString = filterString;
        _clazz = clazz;
    }

    /**
     * Builds a program that, for every tracked service reference, runs the
     * program returned by {@code fun} against the same bundle context and
     * forwards its values to the downstream operation.
     *
     * <p>The nested program is closed when the service is removed, and closed
     * and re-created from scratch when the service is modified.</p>
     *
     * @param fun maps each tracked reference to the program to run for it
     * @param <S> the type produced by the nested programs
     * @return the composed program
     */
    @Override
    public <S> OSGiImpl<S> flatMap(
        Function<? super CachingServiceReference<T>, OSGi<? extends S>> fun) {

        return new OSGiImpl<>((bundleContext, op) -> {
            ServiceTracker<T, ?> tracker =
                new ServiceTracker<>(
                    bundleContext,
                    buildFilter(
                        bundleContext, _filterString, _clazz),
                    new FlatMapServiceTrackerCustomizer<>(
                        fun, bundleContext, op));

            return new OSGiResultImpl(tracker::open, tracker::close);
        });
    }

    /**
     * Customizer that applies a function to each tracked reference and keeps
     * the {@link Runnable} it returns as the tracked object, so that it can be
     * run when the service is modified or removed.
     *
     * @param <T> the type of the tracked services
     */
    private static class DefaultServiceTrackerCustomizer<T>
        implements ServiceTrackerCustomizer<T, AtomicReference<Runnable>> {

        private final Function<CachingServiceReference<T>, Runnable>
            _addedSource;

        /**
         * @param addedSource function applied to every added (or re-added)
         *        reference; the returned {@link Runnable} is run when that
         *        reference is modified or removed
         */
        public DefaultServiceTrackerCustomizer(
            Function<CachingServiceReference<T>, Runnable> addedSource) {

            _addedSource = addedSource;
        }

        /**
         * Applies the function to a fresh {@link CachingServiceReference} and
         * stores the returned {@link Runnable} in the tracked holder.
         */
        @Override
        public AtomicReference<Runnable> addingService(
            ServiceReference<T> reference) {

            return new AtomicReference<>(
                _addedSource.apply(new CachingServiceReference<>(reference)));
        }

        /**
         * Runs the stored {@link Runnable} and then re-applies the function to
         * a new {@link CachingServiceReference}, replacing the stored value.
         */
        @Override
        public void modifiedService(
            ServiceReference<T> reference,
            AtomicReference<Runnable> tupleReference) {

            // Order matters: the previous emission is terminated before the
            // reference is emitted again.
            // NOTE(review): if apply(...) throws, the holder still contains
            // the Runnable that was just run and removedService would run it
            // a second time -- confirm those Runnables tolerate that.
            tupleReference.get().run();

            tupleReference.set(
                _addedSource.apply(new CachingServiceReference<>(reference)));
        }

        /**
         * Runs the {@link Runnable} stored for the removed service.
         */
        @Override
        public void removedService(
            ServiceReference<T> reference,
            AtomicReference<Runnable> runnable) {

            runnable.get().run();
        }

    }

    /**
     * Customizer used by {@link ServiceReferenceOSGi#flatMap(Function)}: for
     * each tracked reference it runs and starts the program returned by the
     * mapping function, keeping the resulting {@link OSGiResult} as the
     * tracked object so that it can be closed later.
     *
     * @param <T> the type of the tracked services
     * @param <S> the type produced by the nested programs
     */
    private static class FlatMapServiceTrackerCustomizer<T, S>
        implements ServiceTrackerCustomizer<T, AtomicReference<OSGiResult>> {

        private final Function<? super CachingServiceReference<T>, OSGi<? extends S>>
            _fun;
        private final BundleContext _bundleContext;
        private final Function<S, Runnable> _op;

        /**
         * @param fun maps a tracked reference to the program to run for it
         * @param bundleContext bundle context the nested programs are run with
         * @param op downstream operation handed to the nested programs
         */
        FlatMapServiceTrackerCustomizer(
            Function<? super CachingServiceReference<T>, OSGi<? extends S>> fun,
            BundleContext bundleContext, Function<S, Runnable> op) {

            _fun = fun;
            _bundleContext = bundleContext;
            _op = op;
        }

        /**
         * Runs and starts the nested program for the added reference and
         * stores its result in the tracked holder.
         */
        @Override
        public AtomicReference<OSGiResult> addingService(
            ServiceReference<T> reference) {

            OSGiResultImpl osgiResult = doFlatMap(
                new CachingServiceReference<>(reference));

            return new AtomicReference<>(osgiResult);
        }

        /**
         * Obtains the program for {@code reference} from the mapping function,
         * runs it with the stored bundle context and downstream operation, and
         * starts the result.
         *
         * @param reference the reference handed to the mapping function
         * @return the already started result of the nested program
         */
        private OSGiResultImpl doFlatMap(CachingServiceReference<T> reference) {
            // The mapping function is declared to return OSGi<? extends S>,
            // while this code needs the concrete OSGiImpl to reach _operation.
            // The cast assumes every OSGi instance is an OSGiImpl; the generic
            // argument cannot be checked at runtime, hence the suppression.
            @SuppressWarnings("unchecked")
            OSGiImpl<S> program = (OSGiImpl<S>) _fun.apply(reference);

            OSGiResultImpl result = program._operation.run(_bundleContext, _op);

            result.start();

            return result;
        }

        /**
         * Closes the stored result and replaces it with the result of running
         * the nested program again for a new {@link CachingServiceReference}.
         */
        @Override
        public void modifiedService(
            ServiceReference<T> reference,
            AtomicReference<OSGiResult> tracked) {

            // Order matters: the previous nested program is closed before the
            // new one is started.
            // NOTE(review): if doFlatMap throws, the holder still contains the
            // already closed result and removedService would close it again --
            // confirm close() tolerates being called twice.
            tracked.get().close();

            tracked.set(doFlatMap(new CachingServiceReference<>(reference)));
        }

        /**
         * Closes the result stored for the removed service.
         */
        @Override
        public void removedService(
            ServiceReference<T> reference,
            AtomicReference<OSGiResult> tracked) {

            tracked.get().close();
        }

    }

}