blob: 19c9eb566bcfdb6cc22a2537f9342e80bc04946f [file] [log] [blame]
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activeio.journal.howl;
import java.io.IOException;
import java.io.InterruptedIOException;
import org.apache.activeio.journal.InvalidRecordLocationException;
import org.apache.activeio.journal.Journal;
import org.apache.activeio.journal.JournalEventListener;
import org.apache.activeio.journal.RecordLocation;
import org.apache.activeio.packet.ByteArrayPacket;
import org.apache.activeio.packet.Packet;
import org.objectweb.howl.log.Configuration;
import org.objectweb.howl.log.InvalidFileSetException;
import org.objectweb.howl.log.InvalidLogBufferException;
import org.objectweb.howl.log.InvalidLogKeyException;
import org.objectweb.howl.log.LogConfigurationException;
import org.objectweb.howl.log.LogEventListener;
import org.objectweb.howl.log.LogRecord;
import org.objectweb.howl.log.Logger;
/**
* An implementation of the Journal interface using a HOWL logger. This is a thin
* wrapper around a HOWL logger.
*
* This implementation can be used to write records but not to retrieve them
* yet. Once the HOWL logger implements the methods needed to retrieve
* previously stored records, this class can be completed.
*
* @version $Revision: 1.2 $
*/
public class HowlJournal implements Journal {

    /** The HOWL logger that every journal operation delegates to. */
    private final Logger logger;

    /**
     * The location last accepted by {@link #setMark(RecordLocation, boolean)};
     * seeded from the logger's active mark when the journal is opened.
     */
    private RecordLocation lastMark;

    /**
     * Creates a HOWL logger from the given configuration, opens it, and
     * records the logger's active mark as the current journal mark.
     *
     * @param configuration the HOWL configuration used to construct the logger
     * @throws InvalidFileSetException if raised by the HOWL logger while opening
     * @throws LogConfigurationException if raised by the HOWL logger
     * @throws InvalidLogBufferException if raised by the HOWL logger while opening
     * @throws ClassNotFoundException if raised by the HOWL logger while opening
     * @throws IOException if raised by the HOWL logger
     * @throws InterruptedException if raised by the HOWL logger while opening
     */
    public HowlJournal(Configuration configuration)
            throws InvalidFileSetException, LogConfigurationException,
            InvalidLogBufferException, ClassNotFoundException, IOException,
            InterruptedException {
        this.logger = new Logger(configuration);
        this.logger.open();
        this.lastMark = new LongRecordLocation(logger.getActiveMark());
    }

    /**
     * Writes the packet's bytes to the HOWL logger.
     *
     * @param packet the packet whose bytes (via {@code sliceAsBytes()}) are stored
     * @param sync passed through to the logger's {@code put} call
     * @return the location wrapping the key returned by the logger
     * @throws InterruptedIOException if the calling thread was interrupted; the
     *         thread's interrupt status is restored before this is thrown
     * @throws IOException if the logger fails for any other reason
     * @see org.apache.activeio.journal.Journal#write(org.apache.activeio.packet.Packet, boolean)
     */
    public RecordLocation write(Packet packet, boolean sync) throws IOException {
        try {
            return new LongRecordLocation(logger.put(packet.sliceAsBytes(), sync));
        } catch (InterruptedException e) {
            throw interrupted(e);
        } catch (IOException e) {
            throw e;
        } catch (Exception e) {
            throw ioFailure("Journal write failed: ", e);
        }
    }

    /**
     * Moves the logger's mark to the given location and remembers it as the
     * current mark. The remembered mark is only updated when the logger
     * accepted the new mark.
     *
     * @param recordLocator a location previously produced by this journal
     * @param force passed through to the logger's {@code mark} call
     * @throws InvalidRecordLocationException if the location is null, was not
     *         produced by this journal implementation, or is rejected by the
     *         logger as an invalid key
     * @throws InterruptedIOException if the calling thread was interrupted; the
     *         thread's interrupt status is restored before this is thrown
     * @throws IOException if the logger fails for any other reason
     * @see org.apache.activeio.journal.Journal#setMark(org.apache.activeio.journal.RecordLocation, boolean)
     */
    public void setMark(RecordLocation recordLocator, boolean force)
            throws InvalidRecordLocationException, IOException {
        // Validate outside the try block so that an invalid location is reported
        // as InvalidRecordLocationException rather than being swallowed by the
        // generic handler below and rewrapped as an IOException.
        long location = toLong(recordLocator);
        try {
            logger.mark(location, force);
            lastMark = recordLocator;
        } catch (InterruptedException e) {
            throw interrupted(e);
        } catch (IOException e) {
            throw e;
        } catch (InvalidLogKeyException e) {
            throw new InvalidRecordLocationException(e.getMessage(), e);
        } catch (Exception e) {
            throw ioFailure("Journal setMark failed: ", e);
        }
    }

    /**
     * Extracts the HOWL log key from a record location.
     *
     * @param recordLocator the location to convert
     * @return the long key held by the location
     * @throws InvalidRecordLocationException if the location is null or is not
     *         exactly a {@code LongRecordLocation}
     */
    private long toLong(RecordLocation recordLocator) throws InvalidRecordLocationException {
        if (recordLocator == null
                || recordLocator.getClass() != LongRecordLocation.class) {
            throw new InvalidRecordLocationException();
        }
        return ((LongRecordLocation) recordLocator).getLongLocation();
    }

    /**
     * Restores the current thread's interrupt status and builds the
     * {@link InterruptedIOException} that reports the interruption to callers.
     *
     * @param cause the interruption raised by the logger
     * @return an exception, with {@code cause} attached, for the caller to throw
     */
    private static InterruptedIOException interrupted(InterruptedException cause) {
        // Catching InterruptedException clears the flag; set it again so code
        // further up the stack can still observe the interruption.
        Thread.currentThread().interrupt();
        return (InterruptedIOException) new InterruptedIOException().initCause(cause);
    }

    /**
     * Wraps an unexpected logger failure in an {@link IOException}.
     *
     * @param prefix message prefix naming the failed operation
     * @param cause the original failure
     * @return an exception, with {@code cause} attached, for the caller to throw
     */
    private static IOException ioFailure(String prefix, Exception cause) {
        return (IOException) new IOException(prefix + cause).initCause(cause);
    }

    /**
     * Returns the location most recently set through
     * {@link #setMark(RecordLocation, boolean)}, or the logger's active mark
     * captured at construction if no mark has been set since.
     *
     * @see org.apache.activeio.journal.Journal#getMark()
     */
    public RecordLocation getMark() {
        return lastMark;
    }

    /**
     * Closes the underlying HOWL logger.
     *
     * @throws InterruptedIOException if the calling thread was interrupted; the
     *         thread's interrupt status is restored before this is thrown
     * @throws IOException if the logger fails for any other reason
     * @see org.apache.activeio.journal.Journal#close()
     */
    public void close() throws IOException {
        try {
            logger.close();
        } catch (IOException e) {
            throw e;
        } catch (InterruptedException e) {
            throw interrupted(e);
        } catch (Exception e) {
            throw ioFailure("Journal close failed: ", e);
        }
    }

    /**
     * Registers a HOWL log event listener that forwards overflow notifications
     * to the given journal listener, wrapping the HOWL key in a record location.
     *
     * @param eventListener the listener that receives overflow notifications
     * @see org.apache.activeio.journal.Journal#setJournalEventListener(org.apache.activeio.journal.JournalEventListener)
     */
    public void setJournalEventListener(final JournalEventListener eventListener) {
        logger.setLogEventListener(new LogEventListener() {
            public void logOverflowNotification(long key) {
                eventListener.overflowNotification(new LongRecordLocation(key));
            }
        });
    }

    /**
     * Finds the location of the first non-control record that follows the
     * given location.
     *
     * @param lastLocation the location to start after; when null the current
     *        mark is used as the starting point
     * @return the next record's location, or null when there is no starting
     *         point or no further record
     * @throws InvalidRecordLocationException if the starting location is not
     *         valid for this journal, or if the logger fails while scanning
     * @see org.apache.activeio.journal.Journal#getNextRecordLocation(org.apache.activeio.journal.RecordLocation)
     */
    public RecordLocation getNextRecordLocation(RecordLocation lastLocation)
            throws InvalidRecordLocationException {
        if (lastLocation == null) {
            if (lastMark == null) {
                return null;
            }
            lastLocation = lastMark;
        }
        // Validate before entering the try block so an invalid starting
        // location is not wrapped a second time by the generic handler.
        long key = toLong(lastLocation);
        try {
            while (true) {
                LogRecord record = logger.get(null, key);
                // Assumes getNext returns null (or an empty record) when there
                // is no next record.
                LogRecord next = logger.getNext(record);
                if (next == null || next.length == 0) {
                    return null;
                }
                if (!next.isCTRL()) {
                    return new LongRecordLocation(next.key);
                }
                // Control records are skipped; continue scanning from this one.
                key = next.key;
            }
        } catch (Exception e) {
            throw new InvalidRecordLocationException(e.getMessage(), e);
        }
    }

    /**
     * Reads the record stored at the given location.
     *
     * @param location a location previously produced by this journal
     * @return a packet wrapping the record's data
     * @throws InvalidRecordLocationException if the location is null, was not
     *         produced by this journal implementation, or is rejected by the
     *         logger as an invalid key
     * @throws IOException if the logger fails for any other reason
     * @see org.apache.activeio.journal.Journal#read(org.apache.activeio.journal.RecordLocation)
     */
    public Packet read(RecordLocation location)
            throws InvalidRecordLocationException, IOException {
        // Validate outside the try block so that an invalid location is reported
        // as InvalidRecordLocationException rather than as an IOException.
        long key = toLong(location);
        try {
            LogRecord record = logger.get(null, key);
            return new ByteArrayPacket(record.data);
        } catch (InvalidLogKeyException e) {
            throw new InvalidRecordLocationException(e.getMessage(), e);
        } catch (Exception e) {
            throw ioFailure("Journal read failed: ", e);
        }
    }
}