blob: 0ae7ea069f03ce016f6db69e4fdd3a2e25baf07e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.ode.axis2.deploy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.ode.axis2.ODEServer;
import org.apache.ode.bpel.clapi.ClusterLock;
import org.apache.ode.bpel.engine.cron.CronScheduler;
import org.apache.ode.bpel.engine.cron.SystemSchedulesConfig;
import org.apache.ode.utils.WatchDog;
import javax.xml.namespace.QName;
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
/**
* Polls a directory for the deployment of a new deployment unit.
*/
public class DeploymentPoller {
private static Logger __log = LoggerFactory.getLogger(DeploymentPoller.class);
/** The polling interval. */
private static final long POLL_TIME = 3000;
private File _deployDir;
private PollingThread _poller;
protected ODEServer _odeServer;
private boolean _onHold = false;
private SystemSchedulesConfig _systemSchedulesConf;
private boolean clusterEnabled;
@SuppressWarnings("unchecked")
private Map<String, WatchDog> dDWatchDogsByPath = new HashMap<String, WatchDog>();
@SuppressWarnings("unchecked")
private WatchDog _systemCronConfigWatchDog;
/** Filter accepting directories containing a ode dd file. */
private static final FileFilter _fileFilter = new FileFilter() {
public boolean accept(File path) {
return new File(path, "deploy.xml").exists();
}
};
/** Filter accepting *.deployed files. */
private static final FileFilter _deployedFilter = new FileFilter() {
public boolean accept(File path) {
return path.isFile() && path.getName().endsWith(".deployed");
}
};
public DeploymentPoller(File deployDir, final ODEServer odeServer) {
_odeServer = odeServer;
_deployDir = deployDir;
clusterEnabled = _odeServer.isClusteringEnabled();
if (!_deployDir.exists()) {
boolean isDeployDirCreated = _deployDir.mkdir();
if (!isDeployDirCreated) {
__log.error("Error while creating deploy directory "
+ deployDir.getName());
}
}
_systemSchedulesConf = createSystemSchedulesConfig(odeServer.getConfigRoot());
_systemCronConfigWatchDog = createSystemCronConfigWatchDog(odeServer.getBpelServer().getContexts().cronScheduler);
}
public void start() {
_poller = new PollingThread();
_poller.start();
__log.info("Poller started.");
}
public void stop() {
_poller.kill();
_poller = null;
__log.info("Poller stopped.");
}
protected boolean isDeploymentFromODEFileSystemAllowed() {
return true;
}
/**
* Scan the directory for new (or removed) files (called mainly from {@link PollingThread}) and calls whoever is in charge of
* the actual deployment (or undeployment).
*/
@SuppressWarnings("unchecked")
private void check() {
File[] files = _deployDir.listFiles(_fileFilter);
boolean duLocked;
// Checking for new deployment directories
if (isDeploymentFromODEFileSystemAllowed() && files != null) {
for (File file : files) {
String duName = file.getName();
if (__log.isDebugEnabled()) {
__log.debug("Trying to acquire the lock for " + duName);
}
duLocked = pollerTryLock(duName);
if (duLocked) {
try {
File deployXml = new File(file, "deploy.xml");
File deployedMarker = new File(_deployDir, file.getName() + ".deployed");
if (!deployXml.exists()) {
// Skip if deploy.xml is abset
if (__log.isDebugEnabled()) {
__log.debug("Not deploying " + file + " (missing deploy.xml)");
}
}
WatchDog ddWatchDog = ensureDeployXmlWatchDog(file, deployXml);
if (deployedMarker.exists()) {
checkDeployXmlWatchDog(ddWatchDog);
continue;
}
try {
boolean isCreated = deployedMarker.createNewFile();
if (!isCreated) {
__log.error("Error while creating file "
+ file.getName()
+ ".deployed ,deployment could be inconsistent");
}
} catch (IOException e1) {
__log.error("Error creating deployed marker file, " + file + " will not be deployed");
continue;
}
try {
_odeServer.getProcessStore().undeploy(file);
} catch (Exception ex) {
__log.error("Error undeploying " + file.getName());
}
try {
Collection<QName> deployed = _odeServer.getProcessStore().deploy(file);
__log.info("Deployment of artifact " + file.getName() + " successful: " + deployed);
} catch (Exception e) {
__log.error("Deployment of " + file.getName() + " failed, aborting for now.", e);
}
} finally {
if (__log.isDebugEnabled()) {
__log.debug("Trying to release the lock for " + file.getName());
}
unlock(file.getName());
}
}
}
}
// Removing deployments that disappeared
File[] deployed = _deployDir.listFiles(_deployedFilter);
for (File file : deployed) {
String pkg = file.getName().substring(0, file.getName().length() - ".deployed".length());
File deployDir = new File(_deployDir, pkg);
if (!deployDir.exists()) {
String duName = deployDir.getName();
if (__log.isDebugEnabled()) {
__log.debug("Trying to acquire the lock for " + duName);
}
duLocked = pollerTryLock(duName);
if (duLocked) {
try {
Collection<QName> undeployed = _odeServer.getProcessStore().undeploy(deployDir);
boolean isDeleted = file.delete();
if (!isDeleted) {
__log.error("Error while deleting file "
+ file.getName()
+ ".deployed , please check if file is locked or if it really exist");
}
disposeDeployXmlWatchDog(deployDir);
if (undeployed.size() > 0)
__log.info("Successfully undeployed " + pkg);
} finally {
if (__log.isDebugEnabled()) {
__log.debug("Trying to release the lock for " + duName);
}
unlock(duName);
}
}
}
}
checkSystemCronConfigWatchDog(_systemCronConfigWatchDog);
}
@SuppressWarnings("unchecked")
protected WatchDog ensureDeployXmlWatchDog(File deployFolder, File deployXml) {
WatchDog ddWatchDog = dDWatchDogsByPath.get(deployXml.getAbsolutePath());
if( ddWatchDog == null ) {
ddWatchDog = WatchDog.watchFile(deployXml, new DDWatchDogObserver(deployFolder.getName()));
dDWatchDogsByPath.put(deployXml.getAbsolutePath(), ddWatchDog);
}
return ddWatchDog;
}
@SuppressWarnings("unchecked")
protected void checkDeployXmlWatchDog(WatchDog ddWatchDog) {
ddWatchDog.check();
}
protected void disposeDeployXmlWatchDog(File deployDir) {
dDWatchDogsByPath.remove(new File(deployDir, "deploy.xml").getAbsolutePath());
}
protected SystemSchedulesConfig createSystemSchedulesConfig(File configRoot) {
return new SystemSchedulesConfig(configRoot);
}
@SuppressWarnings("unchecked")
protected WatchDog createSystemCronConfigWatchDog(final CronScheduler cronScheduler) {
return WatchDog.watchFile(_systemSchedulesConf.getSchedulesFile(),
new WatchDog.DefaultObserver() {
public void init() {
cronScheduler.refreshSystemCronJobs(_systemSchedulesConf);
}
});
}
@SuppressWarnings("unchecked")
protected void checkSystemCronConfigWatchDog(WatchDog ddWatchDog) {
ddWatchDog.check();
}
/**
* Thread that does the actual polling for new files.
*/
private class PollingThread extends Thread {
private boolean _active = true;
public PollingThread() {
setName("DeploymentPoller");
}
/** Stop this poller, and block until it terminates. */
void kill() {
synchronized (this) {
_active = false;
this.notifyAll();
}
try {
join();
} catch (InterruptedException ie) {
__log.error("Thread unexpectedly interrupted.", ie);
}
}
public void run() {
try {
while (_active) {
if (!_onHold)
check();
synchronized (this) {
try {
this.wait(POLL_TIME);
} catch (InterruptedException e) {
}
}
}
} catch (Throwable t) {
__log.error("Encountered an unexpected error. Exiting poller...", t);
}
}
}
public void hold() {
_onHold = true;
}
public void release() {
_onHold = false;
}
public void markAsDeployed(File file) {
File deployedMarker = new File(_deployDir, file.getName() + ".deployed");
try {
boolean isCreated = deployedMarker.createNewFile();
if (!isCreated) {
__log.error("Error while creating file " + file.getName()
+ ".deployed ,deployment could be inconsistent");
}
} catch (IOException e) {
__log.error("Couldn't create marker file for " + file.getName());
}
}
public void markAsUndeployed(File file) {
File deployedMarker = new File(_deployDir, file.getName() + ".deployed");
boolean isDeleted = deployedMarker.delete();
if (!isDeleted) {
__log
.error("Error while deleting file "
+ file.getName()
+ ".deployed , please check if file is locked or if it really exist");
}
}
@SuppressWarnings("unchecked")
protected class DDWatchDogObserver extends WatchDog.DefaultObserver {
private String deploymentPakage;
public DDWatchDogObserver(String deploymentPakage) {
this.deploymentPakage = deploymentPakage;
}
public void init() {
_odeServer.getProcessStore().refreshSchedules(deploymentPakage);
}
}
/**
* Use to acquire the lock by poller
*/
private boolean pollerTryLock(String key) {
if(clusterEnabled) {
ClusterLock clusterLock = _odeServer.getBpelServer().getContexts().clusterManager.getDeploymentLock();
clusterLock.putIfAbsent(key,key);
return clusterLock.tryLock(key);
}
else return true;
}
private boolean unlock(String key) {
if (clusterEnabled) {
_odeServer.getBpelServer().getContexts().clusterManager.getDeploymentLock().unlock(key);
}
return true;
}
}