blob: 4cd4b8252112f8c1906832f37840924c6faacdd0 [file] [log] [blame]
/*
* Copyright 2009-2012 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.asterix.event.management;
import java.io.File;
import java.io.FileNotFoundException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Unmarshaller;
import edu.uci.ics.asterix.event.driver.EventDriver;
import edu.uci.ics.asterix.event.schema.cluster.Cluster;
import edu.uci.ics.asterix.event.schema.cluster.Node;
import edu.uci.ics.asterix.event.schema.event.Events;
import edu.uci.ics.asterix.event.schema.pattern.Event;
import edu.uci.ics.asterix.event.schema.pattern.Nodeid;
import edu.uci.ics.asterix.event.schema.pattern.Pattern;
import edu.uci.ics.asterix.event.schema.pattern.Patterns;
import edu.uci.ics.asterix.event.schema.pattern.Value;
public class EventrixClient {
private static final Logger LOGGER = Logger.getLogger(EventrixClient.class
.getName());
private EventTask[] tasks;
private boolean dryRun = false;
private LinkedBlockingQueue<EventTaskReport> msgInbox = new LinkedBlockingQueue<EventTaskReport>();
private AtomicInteger pendingTasks = new AtomicInteger(0);
private final Cluster cluster;
private IPatternListener listener;
private IOutputHandler outputHandler;
private Events events;
private String eventsDir;
public EventrixClient(String eventsDir, Cluster cluster, boolean dryRun,
IOutputHandler outputHandler) throws Exception {
this.eventsDir = eventsDir;
this.events = initializeEvents();
this.cluster = cluster;
this.dryRun = dryRun;
this.outputHandler = outputHandler;
if (!dryRun) {
initializeCluster(eventsDir);
}
}
public void submit(Patterns patterns) throws Exception {
initTasks(patterns);
try {
waitForCompletion();
} catch (InterruptedException ie) {
LOGGER.info("Interrupted exception :" + ie);
} catch (Exception e) {
throw e;
}
}
public void submit(Patterns patterns, IPatternListener listener)
throws Exception {
this.listener = listener;
initTasks(patterns);
}
private void initTasks(Patterns patterns) {
tasks = new EventTask[patterns.getPattern().size()];
pendingTasks.set(tasks.length);
int index = 0;
for (Pattern pattern : patterns.getPattern()) {
tasks[index] = new EventTask(pattern, this);
tasks[index].start();
index++;
}
}
public Cluster getCluster() {
return cluster;
}
public boolean isDryRun() {
return dryRun;
}
public Events getEvents() {
return events;
}
public String getEventsDir() {
return eventsDir;
}
public synchronized void notifyCompletion(EventTaskReport report) {
if (report.isSuccess()) {
if (listener != null) {
pendingTasks.decrementAndGet();
listener.eventCompleted(report);
if (pendingTasks.get() == 0) {
listener.jobCompleted();
}
} else {
try {
msgInbox.put(report);
} catch (InterruptedException e) {
}
}
} else {
for (EventTask t : tasks) {
if (t.getState() == EventTask.State.INITIALIZED
|| t.getState() == EventTask.State.IN_PROGRESS) {
t.cancel();
}
}
if (listener != null) {
listener.jobFailed(report);
} else {
try {
msgInbox.put(report);
} catch (InterruptedException e) {
}
}
}
}
private void waitForCompletion() throws Exception {
while (true) {
EventTaskReport report = msgInbox.take();
if (report.isSuccess()) {
if (pendingTasks.decrementAndGet() == 0) {
break;
}
} else {
throw new RuntimeException(report.getException().getMessage());
}
}
}
private void initializeCluster(String eventsDir) throws Exception {
Patterns patterns = initPattern(eventsDir);
submit(patterns);
}
private Patterns initPattern(String eventsDir) {
Nodeid nodeid = new Nodeid(new Value(null,
EventDriver.CLIENT_NODE.getId()));
List<Pattern> patternList = new ArrayList<Pattern>();
String workingDir = cluster.getWorkingDir().getDir();
String username = cluster.getUsername() == null ? System
.getProperty("user.name") : cluster.getUsername();
patternList.add(getDirectoryTransferPattern(username, eventsDir,
nodeid, cluster.getMasterNode().getIp(), workingDir));
if (!cluster.getWorkingDir().isNFS()) {
for (Node node : cluster.getNode()) {
patternList.add(getDirectoryTransferPattern(username,
eventsDir, nodeid, node.getIp(), workingDir));
}
}
Patterns patterns = new Patterns(patternList);
return patterns;
}
private Pattern getDirectoryTransferPattern(String username, String src,
Nodeid srcNode, String destNodeIp, String destDir) {
String pargs = username + " " + src + " " + destNodeIp + " " + destDir;
Event event = new Event("directory_transfer", srcNode, pargs);
return new Pattern(null, 1, null, event);
}
public IOutputHandler getErrorHandler() {
return outputHandler;
}
private Events initializeEvents() throws JAXBException,
FileNotFoundException {
File file = new File(eventsDir + File.separator + "events"
+ File.separator + "events.xml");
JAXBContext eventCtx = JAXBContext.newInstance(Events.class);
Unmarshaller unmarshaller = eventCtx.createUnmarshaller();
events = (Events) unmarshaller.unmarshal(file);
return events;
}
}