blob: 70f5e8f626d0b21c6b66528b035e6b80ded236bc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.aries.jax.rs.whiteboard.internal;
import org.apache.aries.jax.rs.whiteboard.internal.cxf.CxfJaxrsServiceRegistrator;
import org.apache.aries.jax.rs.whiteboard.internal.utils.Utils;
import org.apache.aries.jax.rs.whiteboard.internal.utils.PropertyHolder;
import org.apache.aries.jax.rs.whiteboard.internal.utils.ServiceTuple;
import org.apache.aries.component.dsl.CachingServiceReference;
import org.apache.aries.component.dsl.OSGi;
import org.apache.aries.component.dsl.OSGiResult;
import org.apache.cxf.Bus;
import org.apache.cxf.bus.extension.ExtensionManagerBus;
import org.apache.cxf.transport.servlet.CXFNonSpringServlet;
import org.osgi.framework.Bundle;
import org.osgi.framework.BundleContext;
import org.osgi.framework.Constants;
import org.osgi.framework.Filter;
import org.osgi.framework.FrameworkUtil;
import org.osgi.framework.InvalidSyntaxException;
import org.osgi.framework.PrototypeServiceFactory;
import org.osgi.framework.ServiceReference;
import org.osgi.framework.ServiceRegistration;
import org.osgi.framework.wiring.BundleWiring;
import org.osgi.service.http.context.ServletContextHelper;
import org.osgi.service.jaxrs.runtime.JaxrsServiceRuntime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.servlet.Servlet;
import javax.ws.rs.container.ContainerRequestFilter;
import javax.ws.rs.container.ContainerResponseFilter;
import javax.ws.rs.container.DynamicFeature;
import javax.ws.rs.core.Application;
import javax.ws.rs.core.Feature;
import javax.ws.rs.ext.ContextResolver;
import javax.ws.rs.ext.ExceptionMapper;
import javax.ws.rs.ext.MessageBodyReader;
import javax.ws.rs.ext.MessageBodyWriter;
import javax.ws.rs.ext.ParamConverterProvider;
import javax.ws.rs.ext.ReaderInterceptor;
import javax.ws.rs.ext.WriterInterceptor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Dictionary;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;
import static java.lang.String.format;
import static java.util.stream.Collectors.toMap;
import static org.apache.aries.jax.rs.whiteboard.internal.AriesJaxrsServiceRuntime.getServiceName;
import static org.apache.aries.jax.rs.whiteboard.internal.utils.LogUtils.ifDebugEnabled;
import static org.apache.aries.jax.rs.whiteboard.internal.utils.LogUtils.ifErrorEnabled;
import static org.apache.aries.jax.rs.whiteboard.internal.utils.Utils.canonicalize;
import static org.apache.aries.jax.rs.whiteboard.internal.utils.Utils.canonicalizeAddress;
import static org.apache.aries.jax.rs.whiteboard.internal.utils.Utils.generateApplicationName;
import static org.apache.aries.jax.rs.whiteboard.internal.utils.Utils.getProperties;
import static org.apache.aries.jax.rs.whiteboard.internal.utils.Utils.getString;
import static org.apache.aries.jax.rs.whiteboard.internal.utils.Utils.highestPer;
import static org.apache.aries.jax.rs.whiteboard.internal.utils.Utils.mergePropertyMaps;
import static org.apache.aries.jax.rs.whiteboard.internal.utils.Utils.onlyGettables;
import static org.apache.aries.jax.rs.whiteboard.internal.utils.Utils.service;
import static org.apache.aries.jax.rs.whiteboard.internal.utils.Utils.updateProperty;
import static org.apache.aries.component.dsl.OSGi.NOOP;
import static org.apache.aries.component.dsl.OSGi.all;
import static org.apache.aries.component.dsl.OSGi.changeContext;
import static org.apache.aries.component.dsl.OSGi.effects;
import static org.apache.aries.component.dsl.OSGi.ignore;
import static org.apache.aries.component.dsl.OSGi.just;
import static org.apache.aries.component.dsl.OSGi.nothing;
import static org.apache.aries.component.dsl.OSGi.once;
import static org.apache.aries.component.dsl.OSGi.register;
import static org.apache.aries.component.dsl.OSGi.serviceReferences;
import static org.apache.aries.component.dsl.Utils.accumulateInMap;
import static org.apache.aries.component.dsl.Utils.highest;
import static org.osgi.framework.Constants.SERVICE_PID;
import static org.osgi.service.http.whiteboard.HttpWhiteboardConstants.HTTP_WHITEBOARD_CONTEXT_NAME;
import static org.osgi.service.http.whiteboard.HttpWhiteboardConstants.HTTP_WHITEBOARD_CONTEXT_PATH;
import static org.osgi.service.http.whiteboard.HttpWhiteboardConstants.HTTP_WHITEBOARD_CONTEXT_SELECT;
import static org.osgi.service.http.whiteboard.HttpWhiteboardConstants.HTTP_WHITEBOARD_DEFAULT_CONTEXT_NAME;
import static org.osgi.service.http.whiteboard.HttpWhiteboardConstants.HTTP_WHITEBOARD_SERVLET_ASYNC_SUPPORTED;
import static org.osgi.service.http.whiteboard.HttpWhiteboardConstants.HTTP_WHITEBOARD_SERVLET_NAME;
import static org.osgi.service.http.whiteboard.HttpWhiteboardConstants.HTTP_WHITEBOARD_SERVLET_PATTERN;
import static org.osgi.service.http.whiteboard.HttpWhiteboardConstants.HTTP_WHITEBOARD_TARGET;
import static org.osgi.service.jaxrs.runtime.JaxrsServiceRuntimeConstants.JAX_RS_SERVICE_ENDPOINT;
import static org.osgi.service.jaxrs.whiteboard.JaxrsWhiteboardConstants.JAX_RS_APPLICATION_BASE;
import static org.osgi.service.jaxrs.whiteboard.JaxrsWhiteboardConstants.JAX_RS_APPLICATION_SELECT;
import static org.osgi.service.jaxrs.whiteboard.JaxrsWhiteboardConstants.JAX_RS_DEFAULT_APPLICATION;
import static org.osgi.service.jaxrs.whiteboard.JaxrsWhiteboardConstants.JAX_RS_EXTENSION;
import static org.osgi.service.jaxrs.whiteboard.JaxrsWhiteboardConstants.JAX_RS_EXTENSION_SELECT;
import static org.osgi.service.jaxrs.whiteboard.JaxrsWhiteboardConstants.JAX_RS_NAME;
import static org.osgi.service.jaxrs.whiteboard.JaxrsWhiteboardConstants.JAX_RS_RESOURCE;
import static org.osgi.service.jaxrs.whiteboard.JaxrsWhiteboardConstants.JAX_RS_WHITEBOARD_TARGET;
/**
* @author Carlos Sierra Andrés
*/
public class Whiteboard {
public static final Map<String, Class<?>> SUPPORTED_EXTENSION_INTERFACES =
Collections.unmodifiableMap(
Stream.of(ContainerRequestFilter.class,
ContainerResponseFilter.class,
ReaderInterceptor.class,
WriterInterceptor.class,
MessageBodyReader.class,
MessageBodyWriter.class,
ContextResolver.class,
ExceptionMapper.class,
ParamConverterProvider.class,
Feature.class,
DynamicFeature.class,
org.apache.cxf.feature.Feature.class,
org.apache.cxf.jaxrs.ext.ContextProvider.class)
.collect(toMap(Class::getName, Function.identity())));
static final String DEFAULT_NAME = ".default";
private static final Logger _log = LoggerFactory.getLogger(
Whiteboard.class);
private final String _applicationBasePrefix;
private final ApplicationExtensionRegistry _applicationExtensionRegistry;
private final ExtensionRegistry _extensionRegistry;
private final AriesJaxrsServiceRuntime _runtime;
private final Map<String, ?> _configurationMap;
private volatile BundleContext _bundleContext;
private volatile ServiceRegistrationChangeCounter _counter;
private volatile ServiceReference<?> _runtimeReference;
private final OSGi<Void> _program;
private final List<Object> _endpoints;
private volatile ServiceRegistration<?> _runtimeRegistration;
private OSGiResult _osgiResult;
private Whiteboard(Dictionary<String, ?> configuration) {
_runtime = new AriesJaxrsServiceRuntime(this);
_configurationMap = Maps.from(configuration);
_endpoints = new ArrayList<>();
_applicationExtensionRegistry = new ApplicationExtensionRegistry();
_extensionRegistry = new ExtensionRegistry();
_applicationBasePrefix = canonicalizeAddress(
getString(_configurationMap.get("application.base.prefix")));
_program =
all(
ignore(registerDefaultApplication()),
ignore(getAllServices())
);
}
public static Whiteboard createWhiteboard(
Dictionary<String, ?> configuration) {
return new Whiteboard(configuration);
}
public void start(BundleContext bundleContext) {
_bundleContext = bundleContext;
_runtimeRegistration = registerJaxRSServiceRuntime(
new HashMap<>(_configurationMap));
_runtimeReference = _runtimeRegistration.getReference();
_counter = new ServiceRegistrationChangeCounter(_runtimeRegistration);
_osgiResult = _program.run(bundleContext);
}
public void stop() {
_osgiResult.close();
_runtimeRegistration.unregister();
_applicationExtensionRegistry.close();
_extensionRegistry.close();
}
public void addHttpEndpoints(List<String> endpoints) {
synchronized (_runtimeRegistration) {
_endpoints.addAll(endpoints);
updateProperty(
_runtimeRegistration, JAX_RS_SERVICE_ENDPOINT, _endpoints);
}
}
private OSGi<?> applicationExtensions(
OSGi<CachingServiceReference<Object>> extensions) {
return
onlyValid(
onlySupportedInterfaces(
extensions,
_runtime::addInvalidExtension,
_runtime::removeInvalidExtension),
_runtime::addInvalidExtension,
_runtime::removeInvalidExtension).
effects(
_extensionRegistry::registerExtension,
_extensionRegistry::unregisterExtension).
flatMap(extensionReference ->
chooseApplication(
extensionReference, allApplicationReferences(),
_runtime::addApplicationDependentExtension,
_runtime::removeApplicationDependentExtension).
flatMap(registratorReference ->
service(registratorReference).flatMap(registrator ->
waitForExtensionDependencies(
extensionReference, registratorReference,
_runtime::addDependentExtension,
_runtime::removeDependentExtension).
then(
safeRegisterExtension(
extensionReference, registratorReference::getProperty,
registrator)
))));
}
private OSGi<?> applicationResources(
OSGi<CachingServiceReference<Object>> resources) {
return
onlyValid(
resources, _runtime::addInvalidResource,
_runtime::removeInvalidResource).
flatMap(resourceReference ->
chooseApplication(
resourceReference, defaultApplication(),
_runtime::addApplicationDependentResource,
_runtime::removeApplicationDependentResource).
flatMap(registratorReference ->
service(registratorReference).flatMap(registrator ->
waitForExtensionDependencies(
resourceReference, registratorReference,
_runtime::addDependentService,
_runtime::removeDependentService).
then(
safeRegisterEndpoint(
resourceReference, registratorReference::getProperty,
registrator)
))));
}
@SuppressWarnings("unchecked")
private OSGi<?> getAllServices() {
OSGi<CachingServiceReference<Object>> applicationsForWhiteboard =
(OSGi)getApplicationsForWhiteboard();
return
highestPer(
sr -> just(getServiceName(sr::getProperty)),
countChanges(
all(
getResourcesForWhiteboard(),
getApplicationExtensionsForWhiteboard(),
applicationsForWhiteboard
)
),
this::registerShadowedService,
this::unregisterShadowedService
).
effects(
_runtime::addServiceForName,
_runtime::removedServiceForName
).
distribute(
p -> ignore(applications((OSGi)p.filter(this::isApplication))),
p -> ignore(applicationResources(p.filter(this::isResource))),
p -> ignore(applicationExtensions(p.filter(this::isExtension)))
);
}
private boolean isApplication(CachingServiceReference<?> sr) {
return _applicationsFilter.match(sr.getServiceReference());
}
private boolean isExtension(CachingServiceReference<?> sr) {
return _extensionsFilter.match(sr.getServiceReference());
}
private boolean isResource(CachingServiceReference<?> sr) {
return _resourcesFilter.match(sr.getServiceReference());
}
private <T> boolean matchesWhiteboard(CachingServiceReference<T> ref) {
String target = (String)ref.getProperty(JAX_RS_WHITEBOARD_TARGET);
if (target == null) {
return true;
}
Filter filter;
try {
filter = FrameworkUtil.createFilter(target);
}
catch (InvalidSyntaxException ise) {
if (_log.isErrorEnabled()) {
_log.error(
"Invalid '{}' filter syntax in {}",
JAX_RS_WHITEBOARD_TARGET, ref);
}
return false;
}
return filter.match(_runtimeReference);
}
private void registerShadowedService(CachingServiceReference<?> sr) {
if (isApplication(sr)) {
_runtime.addClashingApplication(sr);
}
if (isExtension(sr)) {
_runtime.addClashingExtension(sr);
}
if (isResource(sr)) {
_runtime.addClashingResource(sr);
}
}
private void unregisterShadowedService(CachingServiceReference<?> sr) {
if (isApplication(sr)) {
_runtime.removeClashingApplication(sr);
}
if (isExtension(sr)) {
_runtime.removeClashingExtension(sr);
}
if (isResource(sr)) {
_runtime.removeClashingResource(sr);
}
}
private static <T> OSGi<CachingServiceReference<T>> onlyValid(
OSGi<CachingServiceReference<T>> serviceReferences,
Consumer<CachingServiceReference<T>> onAddingInvalid,
Consumer<CachingServiceReference<T>> onRemovingInvalid) {
return serviceReferences.flatMap(serviceReference -> {
OSGi<CachingServiceReference<T>> error = effects(
() -> onAddingInvalid.accept(serviceReference),
() -> onRemovingInvalid.accept(serviceReference)
).
effects(
ifDebugEnabled(_log, () -> "Invalid service {}"),
ifDebugEnabled(_log, () -> "Invalid service {} is gone")
).
then(
nothing()
);
Object propertyObject = serviceReference.getProperty(JAX_RS_NAME);
if (propertyObject != null &&
!propertyObject.toString().equals(JAX_RS_DEFAULT_APPLICATION) &&
propertyObject.toString().startsWith(".")) {
if (_log.isWarnEnabled()) {
_log.warn(
"Invalid property {} in service {}",
JAX_RS_DEFAULT_APPLICATION, serviceReference);
}
return error;
}
if (!testFilters(
serviceReference.getProperty(JAX_RS_APPLICATION_SELECT))) {
if (_log.isWarnEnabled()) {
_log.warn(
"Invalid value for property {} in service {}",
JAX_RS_APPLICATION_SELECT, serviceReference);
}
return error;
}
if (!testFilters(
serviceReference.getProperty(JAX_RS_EXTENSION_SELECT))) {
if (_log.isWarnEnabled()) {
_log.warn(
"Invalid value for property {} in service {}",
JAX_RS_EXTENSION_SELECT, serviceReference);
}
return error;
}
return just(serviceReference);
});
}
private static boolean testFilters(Object propertyObject) {
if (propertyObject != null) {
try {
String[] properties = canonicalize(propertyObject);
for (String property : properties) {
FrameworkUtil.createFilter(property);
}
}
catch (InvalidSyntaxException e) {
return false;
}
}
return true;
}
private OSGi<ApplicationReferenceWithContext> waitForApplicationContext(
OSGi<CachingServiceReference<Application>> application,
Consumer<CachingServiceReference<Application>> onWaitingForContext,
Consumer<CachingServiceReference<Application>> onResolvedContext) {
return application.flatMap(serviceReference -> {
Object propertyObject = serviceReference.getProperty(
HTTP_WHITEBOARD_CONTEXT_SELECT);
if (propertyObject == null) {
propertyObject = _configurationMap.get(
HTTP_WHITEBOARD_CONTEXT_SELECT);
if (propertyObject == null) {
return just(
new ApplicationReferenceWithContext(
null, serviceReference));
}
}
String contextSelect = propertyObject.toString();
try {
FrameworkUtil.createFilter(contextSelect);
}
catch (InvalidSyntaxException e) {
return effects(
() -> _runtime.addInvalidApplication(serviceReference),
() -> _runtime.removeInvalidApplication(serviceReference)
).then(
nothing()
);
}
return
effects(
() -> onWaitingForContext.accept(serviceReference),
() -> onResolvedContext.accept(serviceReference)
).then(
highest(
serviceReferences(ServletContextHelper.class, contextSelect)
).flatMap(
sr -> just(
new ApplicationReferenceWithContext(sr, serviceReference))
).effects(
__ -> onResolvedContext.accept(serviceReference),
__ -> onWaitingForContext.accept(serviceReference)
));
});
}
private OSGi<?> applications(
OSGi<CachingServiceReference<Application>> applications) {
OSGi<CachingServiceReference<Application>> applicationsForWhiteboard =
waitForApplicationDependencies(
onlyValid(
applications,
_runtime::addInvalidApplication,
_runtime::removeInvalidApplication)
);
OSGi<ApplicationReferenceWithContext> applicationsWithContext =
waitForApplicationContext(
applicationsForWhiteboard,
_runtime::addContextDependentApplication,
_runtime::removeContextDependentApplication);
OSGi<ApplicationReferenceWithContext> highestRankedPerPath =
highestPer(
arwc -> just(arwc.getActualBasePath()),
applicationsWithContext,
t -> _runtime.addShadowedApplication(
t.getApplicationReference(), t.getActualBasePath()),
t -> _runtime.removeShadowedApplication(
t.getApplicationReference())
);
return highestRankedPerPath.flatMap(application ->
onlyGettables(
just(application.getApplicationReference()),
_runtime::addNotGettableApplication,
_runtime::removeNotGettableApplication, _log).
recoverWith(
(t, e) ->
just(t).map(
ServiceTuple::getCachingServiceReference
).effects(
_runtime::addErroredApplication,
_runtime::removeErroredApplication
).then(
nothing()
)
).
flatMap(at ->
deployApplication(at, application.getContextReference()).foreach(
registrator ->
_runtime.setApplicationForPath(
getApplicationBase(
at.getCachingServiceReference()::getProperty),
at.getCachingServiceReference(),
registrator),
registrator ->
_runtime.unsetApplicationForPath(
getApplicationBase(
at.getCachingServiceReference()::getProperty))
)
));
}
private ExtensionManagerBus createBus(
Map<String, ServiceTuple<Object>> extensions) {
BundleWiring wiring = _bundleContext.getBundle().adapt(
BundleWiring.class);
@SuppressWarnings("unchecked")
Map<String, Object> properties = new HashMap<>(_configurationMap);
HashMap<Class<?>, Object> cxfExtensions = new HashMap<>();
if (extensions.isEmpty()) {
cxfExtensions = null;
}
else {
for (Map.Entry<String, ServiceTuple<Object>> entry :
extensions.entrySet()) {
String className = entry.getKey();
ServiceTuple<Object> serviceTuple = entry.getValue();
ClassLoader classLoader = getClassLoader(serviceTuple);
try {
Class<?> clazz = classLoader.loadClass(className);
cxfExtensions.put(clazz, serviceTuple.getService());
}
catch (Exception e) {
if (_log.isErrorEnabled()) {
_log.error("Could not load extension for CXF bus", e);
}
}
}
}
if (_log.isDebugEnabled()) {
_log.debug(
"Creating CXF Bus with extensions {} and properties {}",
extensions, properties);
}
ExtensionManagerBus bus = new ExtensionManagerBus(
cxfExtensions, properties, wiring.getClassLoader());
bus.initialize();
if (_log.isDebugEnabled()) {
_log.debug(
"Created CXF Bus with extensions {} and properties {}",
extensions, properties);
}
return bus;
}
private OSGi<CachingServiceReference<CxfJaxrsServiceRegistrator>>
defaultApplication() {
return
highest(
serviceReferences(
CxfJaxrsServiceRegistrator.class,
String.format("(%s=%s)", JAX_RS_NAME, DEFAULT_NAME)
).filter(this::matchesWhiteboard)
);
}
private OSGi<CxfJaxrsServiceRegistrator> deployApplication(
ServiceTuple<Application> tuple,
CachingServiceReference<ServletContextHelper> contextReference) {
Supplier<Map<String, ?>> properties = () -> {
CachingServiceReference<Application> serviceReference =
tuple.getCachingServiceReference();
Map<String, Object> props = getProperties(
serviceReference);
props.computeIfAbsent(
JAX_RS_NAME, (__) -> generateApplicationName(
serviceReference::getProperty));
props.put(
"original.service.id",
serviceReference.getProperty("service.id"));
props.put(
"original.service.bundleid",
serviceReference.getProperty("service.bundleid"));
return props;
};
return
getCxfExtensions(tuple.getCachingServiceReference()).
flatMap(extensions ->
createRegistrator(extensions, tuple, properties).
flatMap(registrator ->
registerCXFServletService(
registrator.getBus(), properties, contextReference).
then(
register(
CxfJaxrsServiceRegistrator.class,
() -> registrator, properties).
then(
just(registrator)
))));
}
public OSGi<Map<String, ServiceTuple<Object>>> getCxfExtensions(
CachingServiceReference<Application> applicationReference) {
OSGi<ServiceTuple<Object>> cxfExtensionsForApplication =
onlyGettables(
serviceReferences("(cxf.extension=true)").filter(
sr -> {
Object appFilter = sr.getProperty(
JAX_RS_APPLICATION_SELECT);
if (appFilter == null) {
return true;
}
else {
try {
Filter filter = FrameworkUtil.createFilter(
appFilter.toString());
return filter.match(
applicationReference.getServiceReference());
}
catch (InvalidSyntaxException e) {
return false;
}
}
}
).filter(this::matchesWhiteboard),
__ -> {}, __ -> {}, _log);
return accumulateInMap(
cxfExtensionsForApplication,
st -> just(
Arrays.asList(
canonicalize(
st.getCachingServiceReference().
getProperty("objectClass")))),
OSGi::just);
}
private ClassLoader getClassLoader(ServiceTuple<?> serviceTuple) {
return serviceTuple.
getCachingServiceReference().
getServiceReference().
getBundle().
adapt(BundleWiring.class).
getClassLoader();
}
private OSGi<CxfJaxrsServiceRegistrator> createRegistrator(
Map<String, ServiceTuple<Object>> extensions,
ServiceTuple<Application> tuple, Supplier<Map<String, ?>> props) {
return
just(() -> new CxfJaxrsServiceRegistrator(
createBus(extensions), tuple, props.get())).
effects(
__ -> {}, __ -> {}, CxfJaxrsServiceRegistrator::close,
__ -> {});
}
private OSGi<CachingServiceReference<Object>>
getApplicationExtensionsForWhiteboard() {
return serviceReferences(_extensionsFilter.toString()).
filter(this::matchesWhiteboard);
}
private OSGi<CachingServiceReference<Application>>
getApplicationsForWhiteboard() {
return
serviceReferences(
Application.class, _applicationsFilter.toString()).
filter(this::matchesWhiteboard);
}
private OSGi<CachingServiceReference<Object>> getResourcesForWhiteboard() {
return serviceReferences(_resourcesFilter.toString()).
filter(this::matchesWhiteboard);
}
private OSGi<ServiceRegistration<Application>>
registerDefaultApplication() {
return register(
Application.class,
() -> new DefaultApplication() {
@Override
public Set<Object> getSingletons() {
Object defaultApplication = _configurationMap.get(
"default.web");
if (defaultApplication == null ||
Boolean.parseBoolean(defaultApplication.toString())) {
return Collections.singleton(new DefaultWeb());
}
else {
return Collections.emptySet();
}
}
},
() -> {
Object defaultApplicationBase = _configurationMap.get(
"default.application.base");
if (defaultApplicationBase == null ||
!(defaultApplicationBase instanceof String)) {
defaultApplicationBase = "/";
}
Map<String, Object> properties = new HashMap<>();
mergePropertyMaps(properties, _configurationMap);
properties.put(JAX_RS_NAME, DEFAULT_NAME);
properties.put(JAX_RS_APPLICATION_BASE, defaultApplicationBase);
properties.put("service.ranking", Integer.MIN_VALUE);
properties.put(
JAX_RS_WHITEBOARD_TARGET,
"(" + SERVICE_PID + "=" + _configurationMap.get(SERVICE_PID)
+ ")");
return properties;
});
}
private ServiceRegistration<?> registerJaxRSServiceRuntime(
Map<String, Object> properties) {
properties.putIfAbsent(Constants.SERVICE_RANKING, Integer.MIN_VALUE);
return _bundleContext.registerService(
JaxrsServiceRuntime.class, _runtime, new Hashtable<>(properties));
}
public void removeHttpEndpoints(List<String> endpoints) {
synchronized (_runtimeRegistration) {
_endpoints.removeAll(endpoints);
updateProperty(
_runtimeRegistration, JAX_RS_SERVICE_ENDPOINT, _endpoints);
}
}
private <T> OSGi<?> safeRegisterEndpoint(
CachingServiceReference<T> serviceReference,
PropertyHolder registratorProperties,
CxfJaxrsServiceRegistrator registrator) {
Bundle originalBundle = _bundleContext.getBundle(
(long)registratorProperties.get("original.service.bundleid"));
return
changeContext(
originalBundle.getBundleContext(),
onlyGettables(
just(serviceReference),
_runtime::addNotGettableEndpoint,
_runtime::removeNotGettableEndpoint,
_log
)
).recoverWith((t, e) ->
just(serviceReference).
effects(
_runtime::addErroredEndpoint,
_runtime::removeErroredEndpoint).
effects(
ifErrorEnabled(
_log,
() -> "ServiceReference {} for endpoint produced " +
"error: {}",
e),
ifErrorEnabled(
_log,
() -> "Errored ServiceReference {} for endpoint left")
).
then(nothing())
).flatMap(st ->
just(st.getServiceObjects()).
map(
Utils::getResourceProvider
).effects(
rp -> _runtime.addApplicationEndpoint(
registratorProperties, st.getCachingServiceReference(),
registrator.getBus(), st.getService().getClass()),
rp -> _runtime.removeApplicationEndpoint(
registratorProperties, st.getCachingServiceReference())
).effects(
registrator::add,
registrator::remove
).effects(
ifDebugEnabled(
_log,
() -> "Registered endpoint " +
st.getCachingServiceReference().
getServiceReference() + " into application " +
getServiceName(registratorProperties)
),
ifDebugEnabled(
_log,
() -> "Unregistered endpoint " +
st.getCachingServiceReference().
getServiceReference() + " from application " +
getServiceName(registratorProperties)
)
)
);
}
private OSGi<?> safeRegisterExtension(
CachingServiceReference<?> serviceReference,
PropertyHolder registratorProperties,
CxfJaxrsServiceRegistrator registrator) {
Bundle originalBundle = _bundleContext.getBundle(
(long)registratorProperties.get("original.service.bundleid"));
return
just(() -> getServiceName(registratorProperties)).
flatMap(applicationName ->
changeContext(
originalBundle.getBundleContext(),
onlyGettables(
just(serviceReference),
_runtime::addNotGettableExtension,
_runtime::removeNotGettableExtension,
_log
)
).recoverWith(
(t, e) ->
just(t.getCachingServiceReference()).
effects(
_runtime::addErroredExtension,
_runtime::removeErroredExtension
).
effects(
ifErrorEnabled(
_log,
() -> "ServiceReference {} for extension " +
"produced error: {}",
e),
ifErrorEnabled(
_log,
() -> "Errored ServiceReference {} for extension " +
"left")
).
then(nothing())
).effects(
registrator::addProvider,
registrator::removeProvider
).effects(
t -> _runtime.addApplicationExtension(
registratorProperties, serviceReference,
t.getService().getClass()),
__ -> _runtime.removeApplicationExtension(
registratorProperties, serviceReference)
).
effects(
ifDebugEnabled(
_log,
() ->
"Registered extension " +
serviceReference.getServiceReference() +
" into application " +
getServiceName(registratorProperties)
),
ifDebugEnabled(
_log,
() ->
"Unregistered extension " +
serviceReference.getServiceReference() +
" from application " +
getServiceName(registratorProperties)
)
).
effects(
__ ->
_applicationExtensionRegistry.
registerExtensionInApplication(
applicationName, serviceReference),
__ ->
_applicationExtensionRegistry.
unregisterExtensionInApplication(
applicationName, serviceReference)
));
}
private OSGi<CachingServiceReference<Application>>
waitForApplicationDependencies(
OSGi<CachingServiceReference<Application>> references) {
return references.flatMap(reference -> {
String[] extensionDependencies = canonicalize(
reference.getProperty(JAX_RS_EXTENSION_SELECT));
OSGi<CachingServiceReference<Application>> program = just(
reference);
if (extensionDependencies.length == 0) {
return program;
}
for (String extensionDependency : extensionDependencies) {
if (_log.isDebugEnabled()) {
_log.debug(
"Application {} has a dependency on {}",
reference, extensionDependency);
}
program =
once(
_extensionRegistry.waitForExtension(
extensionDependency).
flatMap(
sr -> {
Object applicationSelectProperty =
sr.getProperty(JAX_RS_APPLICATION_SELECT);
if (applicationSelectProperty == null) {
return just(reference);
}
Filter filter;
try {
filter = _bundleContext.createFilter(
applicationSelectProperty.toString());
}
catch (InvalidSyntaxException e) {
return nothing();
}
if (filter.match(
reference.getServiceReference())) {
return just(reference);
}
return nothing();
}
)).effects(
__ -> {},
__ -> _runtime.addDependentApplication(
reference)
).
effects(
ifDebugEnabled(
_log,
() -> "Application "+ reference +
" dependency " + extensionDependency +
" has been fullfiled"),
ifDebugEnabled(
_log,
() -> "Application "+ reference +
" dependency " + extensionDependency +
" has gone")
).
then(program);
}
program = effects(
() -> _runtime.addDependentApplication(reference),
() -> _runtime.removeDependentApplication(reference)
).then(program);
program = program.effects(
__ -> _runtime.removeDependentApplication(reference),
__ -> {}
);
return program;
});
}
private OSGi<?> waitForExtensionDependencies(
CachingServiceReference<?> reference,
CachingServiceReference<CxfJaxrsServiceRegistrator>
applicationRegistratorReference,
Consumer<CachingServiceReference<?>> onAddingDependent,
Consumer<CachingServiceReference<?>> onRemovingDependent) {
String applicationName = getServiceName(
applicationRegistratorReference::getProperty);
String[] extensionDependencies = canonicalize(
reference.getProperty(JAX_RS_EXTENSION_SELECT));
OSGi<CachingServiceReference<?>> program = just(reference);
if (extensionDependencies.length == 0) {
return program;
}
for (String extensionDependency : extensionDependencies) {
if (_log.isDebugEnabled()) {
_log.debug(
"Extension {} has a dependency on {}",
reference, extensionDependency);
}
try {
String finalExtensionDependency = extensionDependency.replace(
"(objectClass=", "(original.objectClass=");
Filter extensionFilter = _bundleContext.createFilter(
finalExtensionDependency);
if (
extensionFilter.match(_runtimeReference) ||
extensionFilter.match(
applicationRegistratorReference.getServiceReference()))
{
continue;
}
program =
once(_applicationExtensionRegistry.waitForApplicationExtension(
applicationName, extensionDependency).effects(
__ -> {},
__ -> onAddingDependent.accept(reference)
)).
effects(
ifDebugEnabled(
_log,
() -> "Extension " + reference +
" dependency " + extensionDependency +
" has been fullfiled"),
ifDebugEnabled(
_log,
() -> "Extension " + reference +
" dependency " + extensionDependency +
" has gone")
).
then(program);
}
catch (InvalidSyntaxException e) {
}
}
program = effects(
() -> onAddingDependent.accept(reference),
() -> onRemovingDependent.accept(reference)).
then(program);
program = program.effects(
__ -> onRemovingDependent.accept(reference),
__ -> {}
);
return program;
}
String getApplicationBase(PropertyHolder properties) {
return _applicationBasePrefix + getString(
properties.get(JAX_RS_APPLICATION_BASE));
}
private static OSGi<CachingServiceReference<CxfJaxrsServiceRegistrator>>
allApplicationReferences() {
return serviceReferences(CxfJaxrsServiceRegistrator.class);
}
private static OSGi<CachingServiceReference<CxfJaxrsServiceRegistrator>>
chooseApplication(
CachingServiceReference<?> serviceReference,
OSGi<CachingServiceReference<CxfJaxrsServiceRegistrator>>
theDefault,
Consumer<CachingServiceReference<?>> onWaiting,
Consumer<CachingServiceReference<?>> onResolved) {
Object applicationSelectProperty = serviceReference.getProperty(
JAX_RS_APPLICATION_SELECT);
if (applicationSelectProperty == null) {
return theDefault;
}
return
just(AtomicInteger::new).flatMap(counter ->
effects(
() -> onWaiting.accept(serviceReference),
() -> {
onResolved.accept(serviceReference);
counter.set(0);
}).then(
serviceReferences(
CxfJaxrsServiceRegistrator.class,
applicationSelectProperty.toString()).
effects(
__ -> {
if (counter.getAndIncrement() == 0) {
onResolved.accept(serviceReference);
}
},
__ -> {
if (counter.decrementAndGet() == 0) {
onWaiting.accept(serviceReference);
}
})
));
}
private <T> OSGi<T> countChanges(OSGi<T> program) {
return program.effects(
__ -> {},
__ -> _counter.inc(),
__ -> {},
__ -> _counter.inc()
);
}
private static CXFNonSpringServlet createCXFServlet(Bus bus) {
CXFNonSpringServlet cxfNonSpringServlet = new CXFNonSpringServlet() {
@Override
public void destroyBus() {
}
};
cxfNonSpringServlet.setBus(bus);
return cxfNonSpringServlet;
}
private static OSGi<CachingServiceReference<Object>>
onlySupportedInterfaces(
OSGi<CachingServiceReference<Object>> program,
Consumer<CachingServiceReference<?>> onInvalidAdded,
Consumer<CachingServiceReference<?>> onInvalidRemoved) {
return program.flatMap(sr -> {
if (signalsValidInterface(sr)) {
return just(sr);
}
else {
return effects(
() -> onInvalidAdded.accept(sr),
() -> onInvalidRemoved.accept(sr)).
then(nothing());
}
});
}
private OSGi<ServiceRegistration<Servlet>> registerCXFServletService(
Bus bus, Supplier<Map<String, ?>> servicePropertiesSup,
CachingServiceReference<ServletContextHelper> contextReference) {
Map<String, ?> serviceProperties = servicePropertiesSup.get();
String address = canonicalizeAddress(
getApplicationBase(serviceProperties::get));
String applicationName = getServiceName(serviceProperties::get);
Supplier<Map<String, ?>> contextPropertiesSup;
OSGi<?> program = effects(NOOP, NOOP);
if (contextReference == null) {
contextPropertiesSup = () -> {
HashMap<String, Object> contextProperties = new HashMap<>();
Utils.mergePropertyMaps(contextProperties, serviceProperties);
Utils.mergePropertyMaps(contextProperties, _configurationMap);
contextProperties.putIfAbsent(
HTTP_WHITEBOARD_TARGET, "(osgi.http.endpoint=*)");
String contextName;
if ("".equals(address)) {
contextName = HTTP_WHITEBOARD_DEFAULT_CONTEXT_NAME;
} else {
contextName = "context.for" + applicationName;
}
contextProperties.put(
HTTP_WHITEBOARD_CONTEXT_NAME, contextName);
contextProperties.put(
HTTP_WHITEBOARD_CONTEXT_PATH, address);
return contextProperties;
};
program = register(
ServletContextHelper.class,
() -> new ServletContextHelper() {}, contextPropertiesSup);
}
else {
contextPropertiesSup = () -> {
HashMap<String, Object> properties = new HashMap<>();
properties.put(
HTTP_WHITEBOARD_CONTEXT_NAME,
contextReference.getProperty(HTTP_WHITEBOARD_CONTEXT_NAME));
return properties;
};
}
Supplier<Map<String, ?>> servletPropertiesSup = () -> {
HashMap<String, Object> servletProperties = new HashMap<>();
Utils.mergePropertyMaps(servletProperties, serviceProperties);
Utils.mergePropertyMaps(servletProperties, _configurationMap);
servletProperties.putIfAbsent(
HTTP_WHITEBOARD_TARGET, "(osgi.http.endpoint=*)");
Map<String, ?> contextProperties = contextPropertiesSup.get();
servletProperties.put(
HTTP_WHITEBOARD_CONTEXT_SELECT,
format("(%s=%s)", HTTP_WHITEBOARD_CONTEXT_NAME,
contextProperties.get(HTTP_WHITEBOARD_CONTEXT_NAME)));
if (contextReference == null) {
servletProperties.put(HTTP_WHITEBOARD_SERVLET_PATTERN, "/*");
}
else {
servletProperties.put(
HTTP_WHITEBOARD_SERVLET_PATTERN, address + "/*");
}
servletProperties.put(
HTTP_WHITEBOARD_SERVLET_ASYNC_SUPPORTED, true);
servletProperties.put(
HTTP_WHITEBOARD_SERVLET_NAME, "cxf-servlet");
return servletProperties;
};
return program.then(
register(
Servlet.class,
new PrototypeServiceFactory<Servlet>() {
@Override
public Servlet getService(
Bundle bundle,
ServiceRegistration<Servlet> registration) {
return createCXFServlet(bus);
}
@Override
public void ungetService(
Bundle bundle,
ServiceRegistration<Servlet> registration,
Servlet service) {
}
},
servletPropertiesSup));
}
private static boolean signalsValidInterface(
CachingServiceReference<Object> serviceReference) {
String[] objectClasses = canonicalize(serviceReference.getProperty(
"objectClass"));
return Arrays.stream(objectClasses).
anyMatch(SUPPORTED_EXTENSION_INTERFACES::containsKey);
}
private interface ChangeCounter {
void inc();
}
private static class ServiceRegistrationChangeCounter
implements ChangeCounter{
private static final String changecount = "service.changecount";
private final AtomicLong _atomicLong = new AtomicLong();
private final ServiceRegistration<?> _serviceRegistration;
ServiceRegistrationChangeCounter(
ServiceRegistration<?> serviceRegistration) {
_serviceRegistration = serviceRegistration;
}
@Override
public void inc() {
long l = _atomicLong.incrementAndGet();
synchronized (_serviceRegistration) {
updateProperty(_serviceRegistration, changecount, l);
}
}
}
private static final Filter _extensionsFilter;
private static final Filter _resourcesFilter;
private static Filter _applicationsFilter;
static {
try {
_applicationsFilter = FrameworkUtil.createFilter(
format(
"(&(objectClass=%s)(%s=*))", Application.class.getName(),
JAX_RS_APPLICATION_BASE));
_extensionsFilter = FrameworkUtil.createFilter(
format("(%s=true)", JAX_RS_EXTENSION));
_resourcesFilter = FrameworkUtil.createFilter(
format("(%s=true)", JAX_RS_RESOURCE));
}
catch (InvalidSyntaxException e) {
throw new RuntimeException(e);
}
}
private class ApplicationReferenceWithContext
implements Comparable<ApplicationReferenceWithContext> {
@Override
public int compareTo(ApplicationReferenceWithContext o) {
return _applicationReference.compareTo(o._applicationReference);
}
private CachingServiceReference<ServletContextHelper> _contextReference;
private CachingServiceReference<Application> _applicationReference;
public ApplicationReferenceWithContext(
CachingServiceReference<ServletContextHelper> contextReference,
CachingServiceReference<Application> applicationReference) {
_contextReference = contextReference;
_applicationReference = applicationReference;
}
public String getActualBasePath() {
String applicationBase = getApplicationBase(
_applicationReference::getProperty);
if (_contextReference == null) {
return applicationBase;
}
Object property = _contextReference.getProperty(
HTTP_WHITEBOARD_CONTEXT_PATH);
if (property == null) {
return applicationBase;
}
String contextPath = property.toString();
return contextPath + applicationBase;
}
public CachingServiceReference<ServletContextHelper>
getContextReference() {
return _contextReference;
}
public CachingServiceReference<Application> getApplicationReference() {
return _applicationReference;
}
}
}