blob: 4cdc3dd19e96479385113d89b551a16ed476c260 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oodt.commons.activity;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InvalidClassException;
import java.io.ObjectInputStream;
import java.io.OptionalDataException;
import java.io.StreamCorruptedException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.util.HashMap;
import java.util.Map;
/**
* The datagram logger accepts incidents in UDP datagrams and saves them in a {@link
* Storage}. The datagrams contain serialized {@link Incident}s, and are probably sent by
* a {@link DatagramLoggingActivityFactory}.
*
* @author Kelly
* @version $Revision: 1.1 $
*/
class DatagramLogger {
public static void main(String[] argv) throws Throwable {
if (argv.length > 0) {
System.err.println("This program takes NO command line arguments.");
System.err.println("Set the activity.port property to adjust the port number.");
System.err.println("Set the activity.storage property to set the Storage class to use.");
System.exit(1);
}
int port = Integer.getInteger("activity.port", 4556).intValue();
String className = System.getProperty("activity.storage");
if (className == null) {
System.err.println("No Storage class defined via the `activity.storage' property; exiting...");
System.exit(1);
}
Class storageClass = Class.forName(className);
storage = (Storage) storageClass.newInstance();
DatagramSocket socket = new DatagramSocket(port);
byte[] buf = new byte[2048];
DatagramPacket packet = new DatagramPacket(buf, buf.length);
for (;;) {
socket.receive(packet);
byte[] received = new byte[packet.getLength()];
System.arraycopy(packet.getData(), packet.getOffset(), received, 0, packet.getLength());
new ReceiverThread(received).start();
}
}
/** Long term storage for incidents. */
private static Storage storage;
/** History awaiting long-term storage. Keys are {@link String} activity IDs, and values are {@link History} objects. */
private static Map histories = new HashMap();
/**
* Thread that saves off an incident into a history so the main thread can go back
* to receiving more datagrams.
*/
private static class ReceiverThread extends Thread {
/**
* Creates a new {@link ReceiverThread} instance.
*
* @param data Copy of bytes received in a datagram.
*/
private ReceiverThread(byte[] data) {
this.data = data;
}
/**
* Reconstitute the incident in the received byte array and store it into
* a {@link History}.
*/
public void run() {
try {
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data));
Incident incident = (Incident) in.readObject();
String id = incident.getActivityID();
in.close();
History history;
synchronized (histories) {
history = (History) histories.get(id);
if (history == null) {
histories.put(id, new History(incident, storage));
return;
}
}
history.addIncident(incident);
} catch (ClassNotFoundException ex) {
System.err.println("Dropping Incident of unknown class: " + ex.getMessage());
} catch (InvalidClassException ex) {
System.err.println("Dropping Incident of invalid class: " + ex.getMessage());
} catch (StreamCorruptedException ex) {
System.err.println("Unable to read Incident from packet: " + ex.getMessage());
} catch (OptionalDataException ex) {
System.err.println("Primitive data instead of Incident in packet: " + ex.getMessage());
} catch (IOException ex) {
throw new IllegalStateException("Unexpected IOException: " + ex.getMessage());
}
}
/** Bytes received in a datagram. */
private byte[] data;
}
}