/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.apollo.broker.store.leveldb
import java.{lang=>jl}
import java.{util=>ju}
import org.fusesource.hawtbuf.proto.PBMessageFactory
import org.apache.activemq.apollo.broker.store._
import java.io._
import java.util.concurrent.TimeUnit
import org.apache.activemq.apollo.util._
import java.util.concurrent.locks.ReentrantReadWriteLock
import org.fusesource.hawtdispatch._
import org.apache.activemq.apollo.util.{TreeMap=>ApolloTreeMap}
import collection.immutable.TreeMap
import org.fusesource.leveldbjni.internal.Util
import org.apache.activemq.apollo.broker.Broker
import org.apache.activemq.apollo.util.ProcessSupport._
import collection.mutable.{HashMap, ListBuffer}
import org.apache.activemq.apollo.dto.JsonCodec
import org.iq80.leveldb._
import org.apache.activemq.apollo.broker.store.leveldb.RecordLog.LogInfo
import org.apache.activemq.apollo.broker.store.PBSupport
import java.util.concurrent.atomic.AtomicReference
import org.fusesource.hawtbuf.{AsciiBuffer, Buffer, AbstractVarIntSupport}
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
object LevelDBClient extends Log {
final val STORE_SCHEMA_PREFIX = "leveldb_store:"
final val STORE_SCHEMA_VERSION = 1
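// Single byte prefixes partition the index key space: 'q' queue records,
// 'e' queue entry records, 'p' map entries and 't' temporary keys used
// while exporting/importing data.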
final val queue_prefix = 'q'.toByte
final val queue_entry_prefix = 'e'.toByte
final val map_prefix = 'p'.toByte
final val tmp_prefix = 't'.toByte
final val queue_prefix_array = Array(queue_prefix)
final val map_prefix_array = Array(map_prefix)
final val queue_entry_prefix_array = Array(queue_entry_prefix)
final val dirty_index_key = bytes(":dirty")
final val log_refs_index_key = bytes(":log-refs")
final val TRUE = bytes("true")
final val FALSE = bytes("false")
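// Record type codes written to the record log. Log replay in
// LevelDBClient.start() uses them to re-apply each logged operation
// to the index.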
final val LOG_ADD_QUEUE = 1.toByte
final val LOG_REMOVE_QUEUE = 2.toByte
final val LOG_ADD_MESSAGE = 3.toByte
final val LOG_REMOVE_MESSAGE = 4.toByte
final val LOG_ADD_QUEUE_ENTRY = 5.toByte
final val LOG_REMOVE_QUEUE_ENTRY = 6.toByte
final val LOG_MAP_ENTRY = 7.toByte
final val LOG_SUFFIX = ".log"
final val INDEX_SUFFIX = ".index"
def bytes(value:String) = value.getBytes("UTF-8")
import FileSupport._
def create_sequence_file(directory:File, id:Long, suffix:String) = directory / ("%016x%s".format(id, suffix))
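// Scans the directory for files ending in the given suffix whose base name
// is a hex encoded sequence id, and returns them sorted by id. Files with
// unparsable names are skipped.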
def find_sequence_files(directory:File, suffix:String):TreeMap[Long, File] = {
TreeMap((directory.list_files.flatMap { f=>
if( f.getName.endsWith(suffix) ) {
try {
val base = f.getName.stripSuffix(suffix)
val position = java.lang.Long.parseLong(base, 16);
Some(position -> f)
} catch {
case e:NumberFormatException => None
}
} else {
None
}
}): _* )
}
val on_windows = System.getProperty("os.name").toLowerCase().startsWith("windows")
var link_strategy = 0
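// Hard links the source file to the target, escalating through progressively
// slower strategies: a JNI system call, then JNA, then an OS shell command
// (fsutil/ln), and finally a plain file copy. The first strategy that works
// is remembered in link_strategy for subsequent calls.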
def link(source:File, target:File):Unit = {
link_strategy match {
case 0 =>
// We first try to link via a native system call. Fails if
// we cannot load the JNI module.
try {
Util.link(source, target)
} catch {
case e:IOException => throw e
case e:Throwable =>
// Fallback.. to a slower impl..
debug("Native link system call not available")
link_strategy = 2
link(source, target)
}
// TODO: consider implementing a case which does the native system call using JNA
case 2 =>
// Next try JNA (might not be in classpath)
try {
IOHelper.hardlink(source, target)
} catch {
case e:IOException => throw e
case e:Throwable =>
// Fallback.. to a slower impl..
debug("JNA based hard link system call not available")
link_strategy = 5
link(source, target)
}
case 5 =>
// Next we try to do the link by executing an
// operating system shell command
try {
if( on_windows ) {
system("fsutil", "hardlink", "create", target.getCanonicalPath, source.getCanonicalPath) match {
case(0, _, _) => // Success
case (_, out, err) =>
// TODO: we might want to look at the out/err to see why it failed
// to avoid falling back to the slower strategy.
debug("fsutil OS command not available either")
link_strategy = 10
link(source, target)
}
} else {
system("ln", source.getCanonicalPath, target.getCanonicalPath) match {
case(0, _, _) => // Success
case (_, out, err) =>
// TODO: we might want to look at the out/err to see why it failed
// to avoid falling back to the slower strategy.
debug("ln OS command not available either")
link_strategy = 10
link(source, target)
}
}
} catch {
case e:Throwable =>
}
case _ =>
// this final strategy is slow but sure to work.
source.copy_to(target)
}
}
}
/**
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
class LevelDBClient(store: LevelDBStore) {
import HelperTrait._
import LevelDBClient._
import FileSupport._
def dispatchQueue = store.dispatch_queue
implicit def toByteArray(buf:Buffer):Array[Byte] = buf.toByteArray
implicit def toBuffer(buf:Array[Byte]):Buffer = new Buffer(buf)
/////////////////////////////////////////////////////////////////////
//
// Helpers
//
/////////////////////////////////////////////////////////////////////
def config = store.config
def directory = config.directory
/////////////////////////////////////////////////////////////////////
//
// Public interface used by the LevelDBStore
//
/////////////////////////////////////////////////////////////////////
var sync = false;
var verify_checksums = false;
var log:RecordLog = _
var index:RichDB = _
var index_options:Options = _
var last_index_snapshot_ts = System.currentTimeMillis()
var last_index_snapshot_pos:Long = _
val snapshot_rw_lock = new ReentrantReadWriteLock(true)
var factory:DBFactory = _
val log_refs = HashMap[Long, LongCounter]()
def dirty_index_file = directory / ("dirty"+INDEX_SUFFIX)
def temp_index_file = directory / ("temp"+INDEX_SUFFIX)
def snapshot_index_file(id:Long) = create_sequence_file(directory,id, INDEX_SUFFIX)
def create_log: RecordLog = {
new RecordLog(directory, LOG_SUFFIX)
}
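// Maximum size of an individual log file. Parsed from config.log_size; a
// value of 2G maps to Int.MaxValue, larger values are capped with a warning,
// and the default is 100 MB.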
def log_size = {
Option(config.log_size).map(MemoryPropertyEditor.parse(_)).map{size=>
if(size == MemoryPropertyEditor.parse("2G")) {
Int.MaxValue // which is 2G - 1 (close enough!)
} else if(size > Int.MaxValue) {
warn("leveldb log_size was configured to be '"+config.log_size+"' but the maximum supported log size is 2G")
Int.MaxValue
} else {
size.toInt
}
}.getOrElse(1024 * 1024 * 100)
}
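// Starts the client: loads the first available DBFactory (JNI, then the pure
// java implementation), configures the index options, opens the record log,
// locks the store directory, verifies the schema version, restores the most
// recent index snapshot into the dirty index directory via hard links, and
// replays any log records written after the snapshot position.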
def start() = {
import OptionSupport._
val factory_names = Option(config.index_factory).getOrElse("org.fusesource.leveldbjni.JniDBFactory, org.iq80.leveldb.impl.Iq80DBFactory")
factory = factory_names.split("""(,|\s)+""").map(_.trim()).flatMap { name=>
try {
Some(Broker.class_loader.loadClass(name).newInstance().asInstanceOf[DBFactory])
} catch {
case x:Throwable =>
None
}
}.headOption.getOrElse(throw new Exception("Could not load any of the index factory classes: "+factory_names))
if( factory.getClass.getName == "org.iq80.leveldb.impl.Iq80DBFactory") {
warn("Using the pure java LevelDB implementation which is still experimental. If the JNI version is not available for your platform, please switch to the BDB store instead. http://activemq.apache.org/apollo/documentation/user-manual.html#BDB_Store")
}
sync = config.sync.getOrElse(true);
verify_checksums = config.verify_checksums.getOrElse(false);
index_options = new Options();
index_options.createIfMissing(true);
val paranoid_checks = config.paranoid_checks.getOrElse(false)
config.index_max_open_files.foreach( index_options.maxOpenFiles(_) )
config.index_block_restart_interval.foreach( index_options.blockRestartInterval(_) )
index_options.paranoidChecks(paranoid_checks)
Option(config.index_write_buffer_size).map(MemoryPropertyEditor.parse(_).toInt).foreach( index_options.writeBufferSize(_) )
Option(config.index_block_size).map(MemoryPropertyEditor.parse(_).toInt).foreach( index_options.blockSize(_) )
Option(config.index_compression).foreach(x => index_options.compressionType( x match {
case "snappy" => CompressionType.SNAPPY
case "none" => CompressionType.NONE
case _ => CompressionType.SNAPPY
}) )
index_options.cacheSize(Option(config.index_cache_size).map(MemoryPropertyEditor.parse(_).toLong).getOrElse(1024*1024*256L))
index_options.logger(new Logger() {
def log(msg: String) = trace(msg.stripSuffix("\n"))
})
log = create_log
log.sync = sync
log.logSize = log_size
log.verify_checksums = verify_checksums
log.on_log_rotate = ()=> {
// Let's queue a request to checkpoint when the logs rotate.
// Queue it on the write executor since GCs lock
// the index for a long time.
store.write_executor {
snapshot_index
}
}
lock_file = new LockFile(directory / "lock", true)
def time[T](func: =>T):Long = {
val start = System.nanoTime()
func
System.nanoTime() - start
}
// Lock before we open anything..
lock_store
// Let's check store compatibility...
val version_file = directory / "store-version.txt"
if( version_file.exists( ) ) {
val ver = try {
var tmp: String = version_file.read_text().trim()
if( tmp.startsWith(STORE_SCHEMA_PREFIX) ) {
tmp.stripPrefix(STORE_SCHEMA_PREFIX).toInt
} else {
-1
}
} catch {
case e => throw new Exception("Unexpected version file format: "+version_file)
}
ver match {
case STORE_SCHEMA_VERSION => // All is good.
case _ => throw new Exception("Cannot open the store. It's schema version is not supported.")
}
}
version_file.write_text(STORE_SCHEMA_PREFIX+STORE_SCHEMA_VERSION)
val log_open_duration = time {
retry {
log.open
}
}
info("Opening the log file took: %.2f ms", (log_open_duration/TimeUnit.MILLISECONDS.toNanos(1).toFloat))
// Find out what the last snapshot was.
val snapshots = find_sequence_files(directory, INDEX_SUFFIX)
var last_snapshot_index = snapshots.lastOption
last_index_snapshot_pos = last_snapshot_index.map(_._1).getOrElse(0)
// Only keep the last snapshot..
snapshots.filterNot(_._1 == last_index_snapshot_pos).foreach( _._2.recursive_delete )
temp_index_file.recursive_delete // usually does not exist.
retry {
// Delete the dirty indexes
dirty_index_file.recursive_delete
dirty_index_file.mkdirs()
last_snapshot_index.foreach { case (id, file) =>
// Resume log replay from a snapshot of the index..
try {
file.list_files.foreach { file =>
link(file, dirty_index_file / file.getName)
}
} catch {
case e:Exception =>
warn(e, "Could not recover snapshot of the index: "+e)
last_snapshot_index = None
}
}
index = new RichDB(factory.open(dirty_index_file, index_options));
try {
load_log_refs
index.put(dirty_index_key, TRUE)
if( paranoid_checks ) {
check_index_integrity(index)
}
// Update the index with what was stored in the logs.
var pos = last_index_snapshot_pos;
var replay_operations = 0
val log_replay_duration = time {
while (pos < log.appender_limit) {
log.read(pos).map {
case (kind, data, next_pos) =>
kind match {
case LOG_ADD_MESSAGE =>
case LOG_ADD_QUEUE_ENTRY =>
replay_operations+=1
val record = QueueEntryPB.FACTORY.parseFramed(data)
val pos = decode_vlong(record.getMessageLocator)
index.put(encode_key(queue_entry_prefix, record.getQueueKey, record.getQueueSeq), data)
pos.foreach(log_ref_increment(_))
case LOG_REMOVE_QUEUE_ENTRY =>
replay_operations+=1
index.get(data, new ReadOptions).foreach { value=>
val record = QueueEntryPB.FACTORY.parseFramed(value)
val pos = decode_vlong(record.getMessageLocator)
pos.foreach(log_ref_decrement(_))
index.delete(data)
}
case LOG_ADD_QUEUE =>
replay_operations+=1
val record = QueuePB.FACTORY.parseFramed(data)
index.put(encode_key(queue_prefix, record.getKey), data)
case LOG_REMOVE_QUEUE =>
replay_operations+=1
val ro = new ReadOptions
ro.fillCache(false)
ro.verifyChecksums(verify_checksums)
val queue_key = decode_vlong(data)
index.delete(encode_key(queue_prefix, queue_key))
index.cursor_prefixed(encode_key(queue_entry_prefix, queue_key), ro) { (key, value)=>
index.delete(key)
// Figure out what log file that message entry was in so we can
// decrement the log file reference.
val record = QueueEntryPB.FACTORY.parseFramed(value)
val pos = decode_vlong(record.getMessageLocator)
log_ref_decrement(pos)
true
}
case LOG_MAP_ENTRY =>
replay_operations+=1
val entry = MapEntryPB.FACTORY.parseFramed(data)
if (entry.getValue == null) {
index.delete(encode_key(map_prefix, entry.getKey))
} else {
index.put(encode_key(map_prefix, entry.getKey), entry.getValue.toByteArray)
}
case _ =>
// Skip unknown records
}
pos = next_pos
}
}
if(replay_operations > 0) {
snapshot_index
}
}
info("Took %.2f second(s) to recover %d operations in the log file.", (log_replay_duration/TimeUnit.SECONDS.toNanos(1).toFloat), replay_operations)
} catch {
case e:Throwable =>
// replay failed.. good thing we are in a retry block...
index.close
throw e;
}
}
}
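// Verifies the index against itself: deletes queue and queue entry records
// that fail to parse or whose embedded keys disagree with their index key,
// drops entries that belong to queues which no longer exist, and rebuilds
// the log reference counts from what is actually referenced.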
def check_index_integrity(index:RichDB) = {
val actual_log_refs = HashMap[Long, LongCounter]()
var referenced_queues = Set[Long]()
// Let's find out what the queue entries are.
var fixed_records = 0
index.cursor_prefixed(queue_entry_prefix_array) { (key, value)=>
try {
val (_, queue_key, seq_key) = decode_long_long_key(key)
val record = QueueEntryPB.FACTORY.parseFramed(value)
val (pos, len) = decode_locator(record.getMessageLocator)
if (record.getQueueKey != queue_key) {
throw new IOException("key missmatch")
}
if (record.getQueueSeq != seq_key) {
throw new IOException("key missmatch")
}
log.log_info(pos).foreach {
log_info =>
actual_log_refs.getOrElseUpdate(log_info.position, new LongCounter()).incrementAndGet()
}
referenced_queues += queue_key
} catch {
case e =>
trace("invalid queue entry record: %s, error: %s", new Buffer(key), e)
fixed_records += 1
// Invalid record.
index.delete(key)
}
true
}
// Let's cross-check the queues.
index.cursor_prefixed(queue_prefix_array) { (key, value)=>
try {
val (_, queue_key) = decode_long_key(key)
val record = QueuePB.FACTORY.parseFramed(value)
if (record.getKey != queue_key) {
throw new IOException("key missmatch")
}
referenced_queues -= queue_key
} catch {
case e =>
trace("invalid queue record: %s, error: %s", new Buffer(key), e)
fixed_records += 1
// Invalid record.
index.delete(key)
}
true
}
referenced_queues.foreach { queue_key=>
// We have queue entries for a queue that does not exist..
index.cursor_prefixed(encode_key(queue_entry_prefix, queue_key)) { (key, value)=>
trace("invalid queue entry record: %s, error: queue key does not exits %s", new Buffer(key), queue_key)
fixed_records += 1
index.delete(key)
val record = QueueEntryPB.FACTORY.parseFramed(value)
val pos = decode_vlong(record.getMessageLocator)
log.log_info(pos).foreach { log_info =>
actual_log_refs.get(log_info.position).foreach { counter =>
if (counter.decrementAndGet() == 0) {
actual_log_refs.remove(log_info.position)
}
}
}
true
}
}
if( actual_log_refs != log_refs ) {
debug("expected != actual log references. expected: %s, actual %s", log_refs, actual_log_refs)
log_refs.clear()
log_refs ++= actual_log_refs
}
if( fixed_records > 0 ) {
warn("Fixed %d invalid index enties in the leveldb store", fixed_records)
}
}
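// An exclusive lock file in the store directory keeps two broker instances
// from opening the same store. Unless fail_if_locked is set, lock_store
// keeps retrying until the lock is acquired or the store is stopped.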
var lock_file:LockFile = _
def lock_store = {
import OptionSupport._
if (config.fail_if_locked.getOrElse(false)) {
lock_file.lock()
} else {
retry {
lock_file.lock()
}
}
}
def unlock_store = {
lock_file.unlock()
}
private def store_log_refs = {
index.put(log_refs_index_key, JsonCodec.encode(collection.JavaConversions.mapAsJavaMap(log_refs.mapValues(_.get()))).toByteArray)
}
private def load_log_refs = {
log_refs.clear()
index.get(log_refs_index_key, new ReadOptions).foreach { value=>
val javamap = JsonCodec.decode(new Buffer(value), classOf[java.util.Map[String, Object]])
collection.JavaConversions.mapAsScalaMap(javamap).foreach { case (k,v)=>
log_refs.put(k.toLong, new LongCounter(v.asInstanceOf[Number].longValue()))
}
}
}
def stop() = {
// This blocks until all io completes.
// Suspend also closes the index.
suspend()
if (log != null) {
log.close
}
copy_dirty_index_to_snapshot
log = null
unlock_store
}
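// Runs func while holding the snapshot read lock so that a concurrent
// suspend() cannot close the index in the middle of the operation.
// retry_using_index additionally retries the operation on failure.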
def using_index[T](func: =>T):T = {
val lock = snapshot_rw_lock.readLock();
lock.lock()
try {
func
} finally {
lock.unlock()
}
}
def retry_using_index[T](func: =>T):T = retry(using_index( func ))
/**
* TODO: expose this via management APIs, handy if you want to
* do a file system level snapshot and want the data to be consistent.
*/
def suspend() = {
// Make sure we are the only ones accessing the index, since
// we will be closing it to create a consistent snapshot.
snapshot_rw_lock.writeLock().lock()
// Close the index so that its files are not changed out from under us.
store_log_refs
index.put(dirty_index_key, FALSE, new WriteOptions().sync(true))
index.close
}
/**
* TODO: expose this via management APIs, handy if you want to
* do a file system level snapshot and want the data to be consistent.
*/
def resume() = {
// Re-open it.
retry {
index = new RichDB(factory.open(dirty_index_file, index_options));
index.put(dirty_index_key, TRUE)
}
snapshot_rw_lock.writeLock().unlock()
}
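// If the log has advanced since the last snapshot, hard links the dirty
// index files into a temp directory and renames it to a snapshot named
// after the current log append position. The previous snapshot is then
// deleted; on failure the temp directory is removed so no partial
// snapshot is left behind.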
def copy_dirty_index_to_snapshot {
if( log.appender_limit == last_index_snapshot_pos ) {
// no need to snapshot again...
return
}
// The temp directory we copy the index files into. It gets
// deleted on restart.
val tmp_dir = temp_index_file
tmp_dir.mkdirs()
try {
// Hard link all the index files.
dirty_index_file.list_files.foreach { file =>
link(file, tmp_dir / file.getName)
}
// Rename to signal that the snapshot is complete.
val new_snapshot_index_pos = log.appender_limit
tmp_dir.renameTo(snapshot_index_file(new_snapshot_index_pos))
snapshot_index_file(last_index_snapshot_pos).recursive_delete
last_index_snapshot_pos = new_snapshot_index_pos
last_index_snapshot_ts = System.currentTimeMillis()
} catch {
case e: Exception =>
// If we could not snapshot for any reason, delete it as we don't
// want a partial checkpoint.
warn(e, "Could not snapshot the index: " + e)
tmp_dir.recursive_delete
}
}
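// Takes a consistent index snapshot by suspending the index (which closes
// it under the write lock), copying it, and then resuming.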
def snapshot_index:Unit = {
if( log.appender_limit == last_index_snapshot_pos ) {
// no need to snapshot again...
return
}
suspend()
try {
copy_dirty_index_to_snapshot
} finally {
resume()
}
}
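// Keeps re-running func until it succeeds, sleeping a second between
// attempts. The first failure is logged as entering recovery mode, and the
// loop gives up (rethrowing the last error) once the store is stopping.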
def retry[T](func: => T): T = {
var error:Throwable = null
var rc:Option[T] = None
// We will loop until the tx succeeds. Perhaps it's
// failing due to a temporary condition like low disk space.
while(!rc.isDefined) {
try {
rc = Some(func)
} catch {
case e:Throwable =>
if( error==null ) {
warn(e, "DB operation failed. (entering recovery mode): "+e)
}
error = e
}
if (!rc.isDefined) {
// We may need to give up if the store is being stopped.
if ( !store.service_state.is_starting_or_started ) {
throw error
}
Thread.sleep(1000)
}
}
if( error!=null ) {
info("DB recovered from failure.")
}
rc.get
}
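// Wipes the store: closes the log, deletes every file in the store
// directory, clears the log references, then re-opens the log and index.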
def purge() = {
suspend()
try{
log.close
directory.list_files.foreach(_.recursive_delete)
log_refs.clear()
} finally {
retry {
log.open
}
resume()
}
}
def add_queue(record: QueueRecord, callback:Runnable) = {
retry_using_index {
log.appender { appender =>
val value:Buffer = PBSupport.encode_queue_record(record)
appender.append(LOG_ADD_QUEUE, value)
index.put(encode_key(queue_prefix, record.key), value)
}
}
callback.run
}
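// log_refs counts, per log segment (keyed by the segment's starting
// position), how many queue entries still reference messages stored in it.
// gc only deletes segments that no longer have any references.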
def log_ref_decrement(pos: Long) = this.synchronized {
log.log_info(pos) match {
case Some(log_info)=>
log_refs.get(log_info.position).foreach { counter =>
val count = counter.decrementAndGet()
if (count == 0) {
log_refs.remove(log_info.position)
}
}
case None =>
warn("Invalid log position: "+pos)
}
}
def log_ref_increment(pos: Long) = this.synchronized {
log.log_info(pos) match {
case Some(log_info)=>
val count = log_refs.getOrElseUpdate(log_info.position, new LongCounter()).incrementAndGet()
case None =>
warn("Invalid log position: "+pos)
}
}
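// Logs the queue removal, deletes the queue record and all of its entries
// from the index, and decrements the log reference of each entry's message.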
def remove_queue(queue_key: Long, callback:Runnable) = {
retry_using_index {
log.appender { appender =>
val ro = new ReadOptions
ro.fillCache(false)
ro.verifyChecksums(verify_checksums)
appender.append(LOG_REMOVE_QUEUE, encode_vlong(queue_key))
index.delete(encode_key(queue_prefix, queue_key))
index.cursor_prefixed(encode_key(queue_entry_prefix, queue_key), ro) { (key, value)=>
index.delete(key)
// Figure out what log file that message entry was in so we can
// decrement the log file reference.
val record = QueueEntryPB.FACTORY.parseFramed(value)
val pos = decode_vlong(record.getMessageLocator)
log_ref_decrement(pos)
true
}
}
}
callback.run
}
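// Persists a batch of units of work. Map entry changes and enqueue/dequeue
// actions are appended to the record log (for recovery) and applied to an
// index write batch; message records are appended to the log and their
// (position, length) locator is propagated to the queue entries that
// reference them. Log reference counts are kept in step, and the log is
// flushed and forced to disk when a UOW requested a synchronous flush.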
def store(uows: Seq[LevelDBStore#DelayableUOW], callback:Runnable) {
retry_using_index {
log.appender { appender =>
var sync_needed = false
index.write() { batch =>
uows.foreach { uow =>
for((key,value) <- uow.map_actions) {
val entry = new MapEntryPB.Bean()
entry.setKey(key)
if( value==null ) {
batch.delete(encode_key(map_prefix, key))
} else {
entry.setValue(value)
batch.put(encode_key(map_prefix, key), value.toByteArray)
}
appender.append(LOG_MAP_ENTRY, entry.freeze().toFramedByteArray)
}
uow.actions.foreach { case (msg, action) =>
val message_record = action.message_record
var locator:(Long, Int) = null
if (message_record != null) {
val message_data = PBSupport.encode_message_record(message_record)
val len = message_data.length
val pos = appender.append(LOG_ADD_MESSAGE, message_data)
locator = (pos, len)
message_record.locator.set(locator);
}
action.dequeues.foreach { entry =>
if( locator==null ) {
locator = entry.message_locator.get().asInstanceOf[(Long, Int)]
}
assert(locator!=null)
val (pos, len) = locator
val key = encode_key(queue_entry_prefix, entry.queue_key, entry.entry_seq)
appender.append(LOG_REMOVE_QUEUE_ENTRY, key)
batch.delete(key)
log_ref_decrement(pos)
}
var locator_buffer:Buffer = null
action.enqueues.foreach { entry =>
assert(locator!=null)
val (pos, len) = locator
entry.message_locator.set(locator)
if ( locator_buffer==null ) {
locator_buffer = encode_locator(pos, len)
}
val record = PBSupport.to_pb(entry)
record.setMessageLocator(locator_buffer)
val encoded = record.freeze().toFramedBuffer
appender.append(LOG_ADD_QUEUE_ENTRY, encoded)
batch.put(encode_key(queue_entry_prefix, entry.queue_key, entry.entry_seq), encoded)
// Increment the log reference for the message's log segment.
log_ref_increment(pos)
}
}
if( uow.flush_sync ) {
sync_needed = true
}
}
}
if( sync_needed && sync ) {
appender.flush
appender.force
}
}
}
callback.run
}
val metric_load_from_index_counter = new TimeCounter
var metric_load_from_index = metric_load_from_index_counter(false)
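// Resolves each (position, length) locator against the record log under an
// index snapshot and passes the decoded message record to its callback.
// Messages that cannot be found on the first pass are retried once, since
// the read may have raced a concurrent write.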
def loadMessages(requests: ListBuffer[(Long, AtomicReference[Object], (Option[MessageRecord])=>Unit)]):Unit = {
val ro = new ReadOptions
ro.verifyChecksums(verify_checksums)
ro.fillCache(true)
val missing = retry_using_index {
index.snapshot { snapshot =>
ro.snapshot(snapshot)
requests.flatMap { x =>
val (_, locator, callback) = x
val record = metric_load_from_index_counter.time {
val (pos, len ) = locator.get().asInstanceOf[(Long, Int)]
log.read(pos, len).map { data =>
val rc = PBSupport.decode_message_record(data)
rc.locator = locator
assert( rc.protocol!=null )
rc
}
}
if( record.isDefined ) {
callback(record)
None
} else {
Some(x)
}
}
}
}
if (missing.isEmpty)
return
// There's a small chance that a message was missing because we started the read tx before the
// write tx completed. Let's try again.
retry_using_index {
index.snapshot { snapshot =>
ro.snapshot(snapshot)
missing.foreach { x =>
val (_, locator, callback) = x
val record:Option[MessageRecord] = metric_load_from_index_counter.time {
val (pos, len ) = locator.get().asInstanceOf[(Long, Int)]
log.read(pos, len).map { x =>
val rc:MessageRecord = PBSupport.decode_message_record(x)
rc.locator = locator
rc
}
}
callback(record)
}
}
}
}
def list_queues: Seq[Long] = {
val rc = ListBuffer[Long]()
retry_using_index {
val ro = new ReadOptions
ro.verifyChecksums(verify_checksums)
ro.fillCache(false)
index.cursor_keys_prefixed(queue_prefix_array, ro) { key =>
rc += decode_long_key(key)._2
true // to continue cursoring.
}
}
rc
}
def get_queue(queue_key: Long): Option[QueueRecord] = {
retry_using_index {
val ro = new ReadOptions
ro.fillCache(false)
ro.verifyChecksums(verify_checksums)
index.get(encode_key(queue_prefix, queue_key), ro).map{ x=>
PBSupport.decode_queue_record(x)
}
}
}
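// Cursors over the queue's entries and groups them into ranges of at most
// `limit` entries, tracking the first/last sequence, entry count, total size
// and earliest expiration of each range.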
def listQueueEntryGroups(queue_key: Long, limit: Int) : Seq[QueueEntryRange] = {
var rc = ListBuffer[QueueEntryRange]()
val ro = new ReadOptions
ro.verifyChecksums(verify_checksums)
ro.fillCache(false)
retry_using_index {
index.snapshot { snapshot =>
ro.snapshot(snapshot)
var group:QueueEntryRange = null
index.cursor_prefixed( encode_key(queue_entry_prefix, queue_key), ro) { (key, value) =>
val (_,_,current_key) = decode_long_long_key(key)
if( group == null ) {
group = new QueueEntryRange
group.first_entry_seq = current_key
}
val entry = QueueEntryPB.FACTORY.parseFramed(value)
val pos = decode_vlong(entry.getMessageLocator)
group.last_entry_seq = current_key
group.count += 1
group.size += entry.getSize
if(group.expiration == 0){
group.expiration = entry.getExpiration
} else {
if( entry.getExpiration != 0 ) {
group.expiration = entry.getExpiration.min(group.expiration)
}
}
if( group.count == limit) {
rc += group
group = null
}
true // to continue cursoring.
}
if( group!=null ) {
rc += group
}
}
}
rc
}
def getQueueEntries(queue_key: Long, firstSeq:Long, lastSeq:Long): Seq[QueueEntryRecord] = {
var rc = ListBuffer[QueueEntryRecord]()
val ro = new ReadOptions
ro.verifyChecksums(verify_checksums)
ro.fillCache(true)
retry_using_index {
index.snapshot { snapshot =>
ro.snapshot(snapshot)
val start = encode_key(queue_entry_prefix, queue_key, firstSeq)
val end = encode_key(queue_entry_prefix, queue_key, lastSeq+1)
index.cursor_range( start, end, ro ) { (key, value) =>
val record = QueueEntryPB.FACTORY.parseFramed(value)
val entry = PBSupport.from_pb(record)
entry.message_locator = new AtomicReference[Object](decode_locator(record.getMessageLocator))
rc += entry
true
}
}
}
rc
}
def getLastMessageKey:Long = 0
def get(key: Buffer):Option[Buffer] = {
retry_using_index {
index.get(encode_key(map_prefix, key)).map(new Buffer(_))
}
}
def get_last_queue_key:Long = {
retry_using_index {
index.last_key(queue_prefix_array).map(decode_long_key(_)._2).getOrElse(0)
}
}
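// Deletes log segments that no longer carry any index references, but never
// a segment at or past the last index snapshot position or the segment the
// appender is currently writing to.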
def gc:Unit = {
// TODO:
// Perhaps we should snapshot_index if the current snapshot is old.
//
val empty_journals = log.log_infos.keySet.toSet -- log_refs.keySet
// We don't want to delete any journals that the index has not snapshotted yet, or
// the journal the appender is currently writing to.
val delete_limit = log.log_info(last_index_snapshot_pos).map(_.position).
getOrElse(last_index_snapshot_pos).min(log.appender_start)
empty_journals.foreach { id =>
if ( id < delete_limit ) {
log.delete(id)
}
}
}
case class UsageCounter(info:LogInfo) {
var count = 0L
var size = 0L
var first_reference_queue:QueueRecord = _
def increment(value:Int) = {
count += 1
size += value
}
}
//
// Collects detailed usage information about the journal like who's referencing it.
//
// def get_log_usage_details = {
//
// val usage_map = new ApolloTreeMap[Long,UsageCounter]()
// log.log_mutex.synchronized {
// log.log_infos.foreach(entry=> usage_map.put(entry._1, UsageCounter(entry._2)) )
// }
//
// def lookup_usage(pos: Long) = {
// var entry = usage_map.floorEntry(pos)
// if (entry != null) {
// val usage = entry.getValue()
// if (pos < usage.info.limit) {
// Some(usage)
// } else {
// None
// }
// } else {
// None
// }
// }
//
// val ro = new ReadOptions()
// ro.fillCache(false)
// ro.verifyChecksums(verify_checksums)
//
// retry_using_index {
// index.snapshot { snapshot =>
// ro.snapshot(snapshot)
//
// // Figure out which journal files are still in use by which queues.
// index.cursor_prefixed(queue_entry_prefix_array, ro) { (_,value) =>
//
// val entry_record:QueueEntryRecord = value
// val pos = if(entry_record.message_locator!=null) {
// Some(decode_locator(entry_record.message_locator)._1)
// } else {
// index.get(encode_key(message_prefix, entry_record.message_key)).map(decode_locator(_)._1)
// }
//
// pos.flatMap(lookup_usage(_)).foreach { usage =>
// if( usage.first_reference_queue == null ) {
// usage.first_reference_queue = index.get(encode_key(queue_prefix, entry_record.queue_key), ro).map( x=> decode_queue_record(x) ).getOrElse(null)
// }
// usage.increment(entry_record.size)
// }
//
// true
// }
// }
// }
//
// import collection.JavaConversions._
// usage_map.values.toSeq.toArray
// }
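// Streams every message, queue record, queue entry and map entry to the
// given output stream. A temporary 't' prefixed table built under an index
// snapshot de-duplicates messages referenced by multiple queues, and the
// log position of each message is used as its exported message key.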
def export_data(os:OutputStream):Option[String] = {
try {
val manager = ExportStreamManager(os, 1)
retry_using_index {
// Delete all the tmp keys..
index.cursor_keys_prefixed(Array(tmp_prefix)) { key =>
index.delete(key)
true
}
index.snapshot { snapshot=>
val nocache = new ReadOptions
nocache.snapshot(snapshot)
nocache.verifyChecksums(verify_checksums)
nocache.fillCache(false)
val cache = new ReadOptions
cache.snapshot(snapshot)
cache.verifyChecksums(false)
cache.fillCache(true)
// Build a temp table of all messages referenced by the queues.
// Remember 2 queues could reference the same message.
index.cursor_prefixed(queue_entry_prefix_array, cache) { (_, value) =>
val record = QueueEntryPB.FACTORY.parseFramed(value)
val (pos, len) = decode_locator(record.getMessageLocator)
index.put(encode_key(tmp_prefix, pos), encode_vlong(len))
true
}
// Use the temp table to export all the referenced messages. Use
// the log position as the message key.
index.cursor_prefixed(Array(tmp_prefix)) { (key, value) =>
val (_, pos) = decode_long_key(key)
val len = decode_vlong(value).toInt
log.read(pos, len).foreach { value =>
// Set the message key to be the position in the log.
val record = MessagePB.FACTORY.parseFramed(value).copy
record.setMessageKey(pos)
manager.store_message(record)
}
true
}
// Now export the queue entries
index.cursor_prefixed(queue_entry_prefix_array, nocache) { (_, value) =>
val record = QueueEntryPB.FACTORY.parseFramed(value).copy()
val (pos, len) = decode_locator(record.getMessageLocator)
record.setMessageKey(pos)
manager.store_queue_entry(record)
true
}
index.cursor_prefixed(queue_prefix_array) { (_, value) =>
val record = QueuePB.FACTORY.parseFramed(value)
manager.store_queue(record)
true
}
index.cursor_prefixed(map_prefix_array, nocache) { (key, value) =>
val key_buffer = new Buffer(key)
key_buffer.moveHead(1)
val record = new MapEntryPB.Bean
record.setKey(key_buffer)
record.setValue(new Buffer(value))
manager.store_map_entry(record)
true
}
}
// Delete all the tmp keys..
index.cursor_keys_prefixed(Array(tmp_prefix)) { key =>
index.delete(key)
true
}
}
manager.finish
None
} catch {
case x:Exception=>
debug(x, "Export failed")
x.printStackTrace()
Some(x.getMessage)
}
}
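// Restores a store from an export stream: the store is purged, imported
// messages are appended to the log and their new locators recorded under
// the 't' prefix, queue entries are rewired to those locators (updating the
// log references), and finally the temporary keys are dropped and the index
// is snapshotted.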
def import_data(is:InputStream):Option[String] = {
try {
val manager = ImportStreamManager(is)
if(manager.version!=1) {
return Some("Cannot import from an export file of version: "+manager.version)
}
purge
retry_using_index {
log.appender { appender =>
while(manager.getNext match {
case record:MessagePB.Buffer =>
val message_data = record.toFramedBuffer
val pos = appender.append(LOG_ADD_MESSAGE, message_data)
index.put(encode_key(tmp_prefix, record.getMessageKey), encode_locator(pos, message_data.length))
true
case record:QueueEntryPB.Buffer =>
val copy = record.copy();
var original_msg_key: Long = record.getMessageKey
index.get(encode_key(tmp_prefix, original_msg_key)) match {
case Some(locator)=>
val (pos, len) = decode_locator(locator)
copy.setMessageLocator(locator)
index.put(encode_key(queue_entry_prefix, record.getQueueKey, record.getQueueSeq), copy.freeze().toFramedBuffer)
log.log_info(pos).foreach { log_info =>
log_refs.getOrElseUpdate(log_info.position, new LongCounter()).incrementAndGet()
}
case None =>
println("Invalid queue entry, references message that was not in the export: "+original_msg_key)
}
true
case record:QueuePB.Buffer =>
index.put(encode_key(queue_prefix, record.getKey), record.toFramedBuffer)
true
case record:MapEntryPB.Buffer =>
index.put(encode_key(map_prefix, record.getKey), record.getValue)
true
case null =>
false
}) { // keep looping
}
}
}
store_log_refs
// Delete all the tmp keys..
index.cursor_keys_prefixed(Array(tmp_prefix)) { key =>
index.delete(key)
true
}
snapshot_index
None
} catch {
case x:Exception=>
debug(x, "Import failed")
Some(x.getMessage)
}
}
}