blob: 330bef56f9aa5c29ffcfda5b8c07eabec772d14e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.stanbol.commons.stanboltools.datafileprovider.impl.tracking;
import java.io.IOException;
import java.io.InputStream;
import java.math.BigDecimal;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
//DO NOT REMOVE - workaround for FELIX-2906
import java.lang.Integer;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.ReferencePolicy;
import org.apache.felix.scr.annotations.ReferenceStrategy;
import org.apache.felix.scr.annotations.Service;
import org.apache.stanbol.commons.stanboltools.datafileprovider.DataFileListener;
import org.apache.stanbol.commons.stanboltools.datafileprovider.DataFileProvider;
import org.apache.stanbol.commons.stanboltools.datafileprovider.DataFileTracker;
import org.osgi.framework.Constants;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Implementation of the {@link DataFileTracker} interface. This implementation
* can work within and outside of an OSGI Environment.<p>
* When within an OSGI Environment tracking in started by
* {@link #activate(ComponentContext)}and stopped by
* {@link #deactivate(ComponentContext)} the required {@link DataFileProvider} is
* injected.<p>
* Outside an OSGI Environment the {@link DataFileProvider} needs to be
* provided by the constructor (DO NOT use the default constructor).
* Tracking is started and stopped by calls to the {@link #startTracking()} and
* {@link #stopTracking()} methods.
* @author Rupert Westenthaler.
*
*/
@Component(immediate=true, metatype=true)
@Service
@Property(name=Constants.SERVICE_RANKING, intValue=Integer.MAX_VALUE)
public final class DataFileTrackerImpl implements DataFileTracker {
protected final Logger log = LoggerFactory.getLogger(DataFileTrackerImpl.class);
/**
* The default interval set to five seconds.
*/
public static final long DEFAULT_INTERVAL = 5000;
/**
* The minimum interval between two polls is set to 0.5 seconds.
*/
private static final long MIN_INTERVAL = 500;
@Property(longValue=DEFAULT_INTERVAL)
public static final String PROPERTY_TRACKING_INTERVAL = "data.files.tracking.interval";
/**
* The {@link DataFileProvider} used for tracking. Injected by OSGI or
* paresed by the constructor (outside OSGI). <p>
* Do not directly access (use {@link #getDataFileProvider()}) and store
* a local reference in case of multiple requests because within OSGI this
* uses {@link ReferencePolicy#DYNAMIC} and
* {@link ReferenceStrategy#EVENT} to avoid a service restart
* on a change of the {@link DataFileProvider} with the highest
* {@link Constants#SERVICE_RANKING}. Service restarts cause all existing
* registrations to track data files to be lost!.
*/
@Reference(
cardinality=ReferenceCardinality.MANDATORY_UNARY,
policy=ReferencePolicy.DYNAMIC,
strategy=ReferenceStrategy.EVENT,
bind="bindDataFileProvider",
unbind="unbindDataFileProvider")
protected DataFileProvider _dataFileProvider;
/**
* Getter for the current {@link DataFileProvider} instance used to track
* data files. This value may change dynamically without a restart of this
* Service.
* @return The current {@link DataFileProvider} instance
* @see #_dataFileProvider
*/
protected DataFileProvider getDataFileProvider(){
return _dataFileProvider;
}
/**
* Bind and Update method for {@link #_dataFileProvider}
* @param dfp
*/
protected void bindDataFileProvider(DataFileProvider dfp){
_dataFileProvider = dfp;
}
/**
* Unbind method for {@link #_dataFileProvider}
* @param dfp
*/
protected void unbindDataFileProvider(DataFileProvider dfp){
if(dfp != null && dfp.equals(_dataFileProvider)){
_dataFileProvider = null;
} //else ignore
}
/**
* The current interval inbetween the end of the previous and the start of
* the next poll for tracked resources.
*/
protected long interval;
/**
* Internally used to shutdown the {@link TrackingDaemon}
*/
protected boolean active = false;
/**
* Default constructor used by OSGI. This expects that the
* {@link #_dataFileProvider} is injected during activation
*/
public DataFileTrackerImpl() {
}
/**
* Constructor to be used outside an OSGI environment to manually
* instantiate a {@link DataFileTracker} for a given provider
* @param provider the provider used to track data files
*/
public DataFileTrackerImpl(DataFileProvider provider){
if(provider == null){
throw new IllegalArgumentException("The parsed DataFileProvider MUST NOT be NULL!");
}
bindDataFileProvider(provider);
}
@Activate
protected void activate(ComponentContext context){
Object value = context.getProperties().get(PROPERTY_TRACKING_INTERVAL);
if(value instanceof Number){
setInterval(((Number)value).longValue());
} else if(value != null){
try {
setInterval(new BigDecimal(value.toString()).longValue());
} catch (NumberFormatException e){
log.warn("Value of property '"+value+"' can not be converted into a LONG",e);
} catch (ArithmeticException e) {
log.warn("Value of property '"+value+"' can not be converted into a LONG",e);
}
}
//do not start here ... only if the first resource is added
//startTracking();
}
@Deactivate
protected void deactivate(ComponentContext context){
stopTracking();
setInterval(DEFAULT_INTERVAL);
trackedResources.clear();
}
/**
* @return the interval
*/
public final long getInterval() {
return interval;
}
/**
* Setter for the interval between two polls. If the parsed interval is
* smaller equals zero the interval is set to the {@link #DEFAULT_INTERVAL}.
* If the parsed interval bigger than zero but lower than the
* {@link #MIN_INTERVAL} the value is set to MIN_INTERVAL.
* @param interval the interval to set
*/
public final void setInterval(long interval) {
if(interval <= 0){
this.interval = DEFAULT_INTERVAL;
} else if(interval < MIN_INTERVAL){
this.interval = MIN_INTERVAL;
} else {
this.interval = interval;
}
}
private final Map<DataFileReference,TrackingState> trackedResources = Collections.synchronizedMap(
new HashMap<DataFileReference,TrackingState>());
/* (non-Javadoc)
* @see org.apache.stanbol.commons.stanboltools.datafileprovider.impl.ResourceTracker#add(org.apache.stanbol.commons.stanboltools.datafileprovider.impl.ResourceTrackerImpl.ResourceListener, java.lang.String)
*/
@Override
public void add(DataFileListener resourceListener, String name,Map<String,String> properties){
add(resourceListener,null,name,properties);
}
/* (non-Javadoc)
* @see org.apache.stanbol.commons.stanboltools.datafileprovider.impl.ResourceTracker#add(org.apache.stanbol.commons.stanboltools.datafileprovider.impl.ResourceTrackerImpl.ResourceListener, java.lang.String, java.lang.String)
*/
@Override
public void add(DataFileListener resourceListener, String bundleSymbolicName, String name, Map<String,String> properties){
if(resourceListener == null){
throw new IllegalArgumentException("The parsed ResourceListener MUST NOT be NULL!");
}
DataFileReference r = new DataFileReference(bundleSymbolicName, name, properties);
synchronized (trackedResources) {
TrackingState trackingState = trackedResources.get(r);
if(trackingState == null){ //add new
trackingState = new TrackingState();
trackedResources.put(r, trackingState);
}
trackingState.addListener(resourceListener);
if(!trackedResources.isEmpty()){ //maybe this was the first added RDFTerm
startTracking(); //so me might want to start tracking
}
}
}
@Override
public boolean isTracked(String bundleSymbolicName,String resourceName) {
return trackedResources.containsKey(new DataFileReference(bundleSymbolicName, resourceName));
}
@Override
public boolean isTracked(DataFileListener resourceListener, String bundleSymbolicName,String resourceName) {
TrackingState state = trackedResources.get(new DataFileReference(bundleSymbolicName, resourceName));
return state != null ? state.isListener(resourceListener) : false;
}
/* (non-Javadoc)
* @see org.apache.stanbol.commons.stanboltools.datafileprovider.impl.ResourceTracker#remove(org.apache.stanbol.commons.stanboltools.datafileprovider.impl.ResourceTrackerImpl.ResourceListener, java.lang.String)
*/
public void remove(DataFileListener resourceListener, String resource){
remove(resourceListener,null,resource);
}
/* (non-Javadoc)
* @see org.apache.stanbol.commons.stanboltools.datafileprovider.impl.ResourceTracker#remove(org.apache.stanbol.commons.stanboltools.datafileprovider.impl.ResourceTrackerImpl.ResourceListener, java.lang.String, java.lang.String)
*/
public void remove(DataFileListener resourceListener, String bundleSymbolicName, String name){
if(resourceListener != null && name != null && !name.isEmpty()){
DataFileReference r = new DataFileReference(bundleSymbolicName,name);
synchronized (trackedResources) {
TrackingState state = trackedResources.get(r);
if(state != null && state.removeListener(resourceListener) != null && state.isEmpty()){
trackedResources.remove(r);
}
if(trackedResources.isEmpty()){ //no tracked resources left?
stopTracking(); //stop tracking until others are added
}
}
} //else ignore
}
/* (non-Javadoc)
* @see org.apache.stanbol.commons.stanboltools.datafileprovider.impl.ResourceTracker#removeAll(org.apache.stanbol.commons.stanboltools.datafileprovider.impl.ResourceTrackerImpl.ResourceListener)
*/
public void removeAll(DataFileListener resourceListener){
if(resourceListener != null){
synchronized (trackedResources) {
//try to remove the listener for all tracked resources
Iterator<Entry<DataFileReference,TrackingState>> entries = trackedResources.entrySet().iterator();
while(entries.hasNext()){
Entry<DataFileReference,TrackingState> entry = entries.next();
if(entry.getValue().removeListener(resourceListener) != null && entry.getValue().isEmpty()){
//and remove tracked resources if no others are listening
entries.remove();
}
}
if(trackedResources.isEmpty()){ //no tracked resources left?
stopTracking(); //stop tracking until others are added
}
}
}
}
/**
* used to start a new {@link TrackingDaemon} e.g. if the first resource
* to be tracked was added to {@link #trackedResources}.
* If tracking is already active this method has no effect
*/
private void startTracking(){
if(!active){
log.info("start Tracking ...");
active=true;
Thread t = new Thread(new TrackingDaemon(), "DataFileTrackingDaemon");
t.setDaemon(true);
t.start();
} //else already active
}
/**
* Used to stop the {@link TrackingDaemon} e.g. if the last tracked
* resource was removed, or this component is deactivated
* @see #deactivate(ComponentContext)
* @see #close()
*/
private void stopTracking(){
active = false;
}
/**
* Can be used outside an OSGI environment to close this instance and
* stop tracking for all currently registered sources
*/
public void close(){
deactivate(null);
}
@Override
protected void finalize() throws Throwable {
close();
super.finalize();
}
protected class TrackingDaemon implements Runnable {
public void run(){
while(active){
//a clone of the tracked resource
Set<DataFileReference> resources;
synchronized (trackedResources) {
resources = new HashSet<DataFileReference>(trackedResources.keySet());
}
DataFileProvider dataFileProvider = getDataFileProvider();
if(dataFileProvider == null){
log.info("Currently no DataFileProvider available");
} else {
log.debug("Track {} resources",resources.size());
for(DataFileReference resoruce : resources){
TrackingState resourceState;
synchronized (trackedResources) {
resourceState = trackedResources.get(resoruce);
}
if(log.isDebugEnabled()){
log.debug(" > {} (state:{})",resoruce.getName(),
resourceState != null ?
resourceState.getTrackingState() != null ?
resourceState.getTrackingState() : "none" :
null);
}
if(resourceState != null){ //might be null if removed in the meantime
STATE state;
if(dataFileProvider.isAvailable(resoruce.getBundleSymbolicName(),
resoruce.getName(), resoruce.getProperties())){
state = STATE.AVAILABLE;
} else {
state = STATE.UNAVAILABLE;
}
fire(dataFileProvider,resoruce,resourceState,state);
}
}
}
try {
Thread.sleep(interval);
} catch (InterruptedException e) {
log.debug("interrupped",e);
}
}
log.info(" ... tracking stopped!");
}
/**
* Processes the the tracking event and fires events to the {@link DataFileListener}
* registered for the parsed resource
* @param dataFileProvider the {@link DataFileProvider} used to get an
* {@link InputStream} per {@link DataFileListener} that needs to be notified
* @param resoruce the resource
* @param trackinState the {@link DataFileListener} and {@link STATE}s for that
* resource
* @param state the updated state of the parsed resource
* @param is the {@link InputStream} already available from the tracking
* process or <code>null</code> in case the parsed state is {@link STATE#UNAVAILABLE}.
* If parsed the stream will be {@link InputStream#close() closed} even if
* not used.
*/
private void fire(DataFileProvider dataFileProvider, DataFileReference resoruce, TrackingState trackinState, STATE state) {
InputStream is = null;
//iterate over clone to avoid concurrent modification exceptions
//by external and/or this calls to remove(..)
for(Entry<DataFileListener,STATE> listenerState : trackinState){
if(listenerState.getValue() != state){
boolean remove = false;
switch (state) {
case UNAVAILABLE:
try {
remove = listenerState.getKey().unavailable(resoruce.getName());
trackinState.updateListener(listenerState.getKey(), state);
} catch (Exception e) {
log.warn("Exception from Listener '"+listenerState.getKey()+
"' while calling unavailable for resource '"+
resoruce.getName()+"'!",e);
//the state to ERROR ... will try again on the next check
trackinState.updateListener(listenerState.getKey(), STATE.ERROR);
}
break;
case AVAILABLE:
if(is == null){ //get a new InputStream
try {
is = dataFileProvider.getInputStream(resoruce.getBundleSymbolicName(),
resoruce.getName(), resoruce.getProperties());
} catch (IOException e) {
//now again unavailable ?!
//Send anyway with null as stream
is = null;
}
}
try {
remove = listenerState.getKey().available(resoruce.getName(), is);
trackinState.updateListener(listenerState.getKey(), state);
} catch (Exception e) {
log.warn("Exception from Listener '"+listenerState.getKey()+
"' while calling available for resource '"+
resoruce.getName()+"'!",e);
//the state is no changed ... will try again on the next check
trackinState.updateListener(listenerState.getKey(), STATE.ERROR);
}
is = null; //we need a new InputStream getName() each resource
break;
// ignore other values
}
if(remove){ //remove if requested
remove(listenerState.getKey(), resoruce.getBundleSymbolicName(), resoruce.getName());
}
} //else this listener is already in the notified state ... ignore
}
// if(is != null) { //the created InputStream was not sent to a Listener
// try {
// is.close(); //clean up
// } catch (IOException e) {
// //ignore
// }
// }
}
}
}