blob: b3f91dce908b473bc76ec658396f9a96e80f5002 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sling.resourceresolver.impl.observation;
import java.util.Collections;
import java.util.Dictionary;
import java.util.Hashtable;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.sling.api.SlingConstants;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.api.resource.observation.ExternalResourceChangeListener;
import org.apache.sling.api.resource.observation.ResourceChange;
import org.apache.sling.api.resource.observation.ResourceChange.ChangeType;
import org.apache.sling.api.resource.observation.ResourceChangeListener;
import org.osgi.framework.Constants;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.ConfigurationPolicy;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.osgi.service.component.annotations.ReferencePolicy;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.osgi.service.event.EventHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Component(service = ResourceChangeListener.class,
configurationPolicy = ConfigurationPolicy.IGNORE,
property = {
Constants.SERVICE_VENDOR + "=The Apache Software Foundation",
ResourceChangeListener.PATHS + "=/",
ResourceChangeListener.CHANGES + "=ADDED",
ResourceChangeListener.CHANGES + "=CHANGED",
ResourceChangeListener.CHANGES + "=REMOVED"
})
public class OsgiObservationBridge implements ResourceChangeListener, ExternalResourceChangeListener {
private final Logger logger = LoggerFactory.getLogger(OsgiObservationBridge.class);
@Reference
private EventAdmin eventAdmin;
@Reference
private ResourceResolverFactory resolverFactory;
private ResourceResolver resolver;
private BlockingQueue<ResourceChange> changesQueue;
private EventSendingJob job;
protected void activate() throws LoginException {
resolver = resolverFactory.getServiceResourceResolver(Collections.<String, Object>singletonMap(ResourceResolverFactory.SUBSERVICE, "observation"));
changesQueue = new LinkedBlockingQueue<ResourceChange>();
job = new EventSendingJob(changesQueue);
Executors.newSingleThreadExecutor().submit(job);
}
@Reference(name = "handlers",
cardinality=ReferenceCardinality.AT_LEAST_ONE,
policy=ReferencePolicy.DYNAMIC,
service=EventHandler.class,
target="(|(event.topics=org/apache/sling/api/resource/Resource/*)(event.topics=org/apache/sling/api/resource/ResourceProvider/*))")
private void bindEventHandler(final EventHandler handler) {
logger.warn("Found OSGi Event Handler for deprecated resource bridge: {}", handler);
}
@SuppressWarnings("unused")
private void unbindEventHandler(final EventHandler handler) {
// nothing to do here
}
protected void deactivate() {
changesQueue.clear();
job.stop();
resolver.close();
}
@Override
public void onChange(List<ResourceChange> changes) {
changesQueue.addAll(changes);
}
@SuppressWarnings("deprecation")
private void sendOsgiEvent(ResourceChange change) {
Dictionary<String, Object> props = new Hashtable<String, Object>();
String topic;
switch (change.getType()) {
case ADDED:
topic = SlingConstants.TOPIC_RESOURCE_ADDED;
break;
case CHANGED:
topic = SlingConstants.TOPIC_RESOURCE_CHANGED;
break;
case REMOVED:
topic = SlingConstants.TOPIC_RESOURCE_REMOVED;
break;
default:
return;
}
props.put(SlingConstants.PROPERTY_PATH, change.getPath());
if (change.getUserId() != null) {
props.put(SlingConstants.PROPERTY_USERID, change.getUserId());
}
if (change.getAddedPropertyNames() != null ) {
props.put(SlingConstants.PROPERTY_ADDED_ATTRIBUTES, change.getAddedPropertyNames().toArray(new String[change.getAddedPropertyNames().size()]));
}
if (change.getChangedPropertyNames() != null) {
props.put(SlingConstants.PROPERTY_CHANGED_ATTRIBUTES, change.getChangedPropertyNames().toArray(new String[change.getChangedPropertyNames().size()]));
}
if ( change.getRemovedPropertyNames() != null ) {
props.put(SlingConstants.PROPERTY_REMOVED_ATTRIBUTES, change.getRemovedPropertyNames().toArray(new String[change.getRemovedPropertyNames().size()]));
}
if (change.getType() != ChangeType.REMOVED) {
Resource resource = resolver.getResource(change.getPath());
if (resource == null) {
resolver.refresh();
resource = resolver.getResource(change.getPath());
}
if (resource != null) {
if (resource.getResourceType() != null) {
props.put(SlingConstants.PROPERTY_RESOURCE_TYPE, resource.getResourceType());
}
if (resource.getResourceSuperType() != null) {
props.put(SlingConstants.PROPERTY_RESOURCE_SUPER_TYPE, resource.getResourceSuperType());
}
}
}
if (change.isExternal()) {
props.put("event.application", "unknown");
}
final Event event = new Event(topic, props);
eventAdmin.sendEvent(event);
}
private class EventSendingJob implements Runnable {
private final BlockingQueue<ResourceChange> changes;
private volatile boolean stop;
public EventSendingJob(BlockingQueue<ResourceChange> changes) {
this.changes = changes;
}
@Override
public void run() {
while (!stop) {
ResourceChange change = null;
try {
change = changes.poll(100, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
logger.warn("Interrupted the OSGi runnable", e);
}
if (change == null) {
continue;
}
try {
sendOsgiEvent(change);
} catch (Exception e) {
logger.error("processOsgiEventQueue: Unexpected problem processing resource change {}", change, e);
}
}
}
public void stop() {
stop = true;
}
}
}