| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.activemq.apollo.broker.store.bdb |
| |
| import dto.BDBStoreDTO |
| |
| import collection.mutable.ListBuffer |
| import org.apache.activemq.apollo.broker.store._ |
| import org.apache.activemq.apollo.util._ |
| import java.io.{InputStream, OutputStream} |
| import com.sleepycat.je._ |
| import org.fusesource.hawtbuf.Buffer |
| import FileSupport._ |
| |
| object BDBClient extends Log { |
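  // The schema version is stamped into a `store-version.txt` file in the store
  // directory; `start()` refuses to open a store written with a different
  // version.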
| final val STORE_SCHEMA_PREFIX = "bdb_store:" |
| final val STORE_SCHEMA_VERSION = 2 |
| } |
| /** |
| * |
| * @author <a href="http://hiramchirino.com">Hiram Chirino</a> |
| */ |
| class BDBClient(store: BDBStore) { |
| |
| import HelperTrait._ |
| |
| import BDBClient._ |
| |
| def dispatchQueue = store.dispatch_queue |
| |
| ///////////////////////////////////////////////////////////////////// |
| // |
| // Helpers |
| // |
| ///////////////////////////////////////////////////////////////////// |
| |
| private def directory = config.directory |
| |
| ///////////////////////////////////////////////////////////////////// |
| // |
| // Public interface used by the BDBStore |
| // |
| ///////////////////////////////////////////////////////////////////// |
| |
| var config: BDBStoreDTO = null |
| |
| var environment:Environment = _ |
| |
| var direct_buffer_allocator: FileDirectBufferAllocator = _ |
| |
  def direct_buffer_file = config.directory / "dbuffer.dat"
| |
  def start() = {
    val env_config = new EnvironmentConfig()
    env_config.setAllowCreate(true)
    env_config.setTransactional(true)
    env_config.setTxnSerializableIsolation(false)
| |
| directory.mkdirs |
| |
| val version_file = directory / "store-version.txt" |
    if( version_file.exists() ) {
      val ver = try {
        val tmp = version_file.read_text().trim()
        if( tmp.startsWith(STORE_SCHEMA_PREFIX) ) {
          tmp.stripPrefix(STORE_SCHEMA_PREFIX).toInt
        } else {
          -1
        }
      } catch {
        case e: Throwable => throw new Exception("Unexpected version file format: "+version_file)
      }
| ver match { |
| case STORE_SCHEMA_VERSION => // All is good. |
        case _ => throw new Exception("Cannot open the store. Its schema version is not supported.")
| } |
| } |
| version_file.write_text(STORE_SCHEMA_PREFIX+STORE_SCHEMA_VERSION) |
| |
| direct_buffer_allocator = new FileDirectBufferAllocator(direct_buffer_file) |
    environment = new Environment(directory, env_config)
| |
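    // Touch the main databases so they get created on a fresh store, then
    // rebuild the direct buffer allocation map by replaying the lob index.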
| with_ctx() { ctx=> |
| import ctx._ |
| messages_db |
| message_refs_db |
| queues_db |
| |
| lobs_db.cursor(tx) { (_,value)=> |
| val v = decode_lob_value(value) |
| direct_buffer_allocator.alloc_at(v._1, v._2) |
| true |
| } |
| } |
| } |
| |
| def stop() = { |
| environment.close |
| direct_buffer_allocator.close |
| direct_buffer_allocator = null |
| } |
| |
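  /**
   * Wraps a single BDB transaction.  Database handles are opened lazily so a
   * transaction only opens the databases it actually touches; `close` closes
   * them all before committing or aborting the transaction.
   */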
| case class TxContext(tx:Transaction) { |
| |
| def with_entries_db[T](queue_key:Long)(func: (Database) => T): T = { |
| val db = environment.openDatabase(tx, entries_db_name(queue_key), long_key_conf) |
| try { |
| func(db) |
| } finally { |
| db.close |
| } |
| } |
| |
| private var _entries_db:Database = _ |
| def entries_db:Database = { |
| if( _entries_db==null ) { |
| _entries_db = environment.openDatabase(tx, "entries", long_long_key_conf) |
| } |
| _entries_db |
| } |
| |
| private var _messages_db:Database = _ |
| def messages_db:Database = { |
| if( _messages_db==null ) { |
| _messages_db = environment.openDatabase(tx, "messages", long_key_conf) |
| } |
| _messages_db |
| } |
| |
| private var _lobs_db:Database = _ |
| def lobs_db:Database = { |
| if( _lobs_db==null ) { |
| _lobs_db = environment.openDatabase(tx, "lobs", long_key_conf) |
| } |
| _lobs_db |
| } |
| |
| private var _message_refs_db:Database = _ |
| def message_refs_db:Database = { |
| if( _message_refs_db==null ) { |
| _message_refs_db = environment.openDatabase(tx, "message_refs", long_key_conf) |
| } |
| _message_refs_db |
| } |
| |
| private var _queues_db:Database = _ |
| def queues_db:Database = { |
| if( _queues_db==null ) { |
| _queues_db = environment.openDatabase(tx, "queues", long_key_conf) |
| } |
| _queues_db |
| } |
| |
| private var _map_db:Database = _ |
| def map_db:Database = { |
| if( _map_db==null ) { |
| _map_db = environment.openDatabase(tx, "map", buffer_key_conf) |
| } |
| _map_db |
| } |
| |
| def close(ok:Boolean) = { |
| if( _messages_db!=null ) { |
| _messages_db.close |
| } |
| if( _message_refs_db!=null ) { |
| _message_refs_db.close |
| } |
| if( _queues_db!=null ) { |
| _queues_db.close |
| } |
| if( _entries_db!=null ) { |
| _entries_db.close |
| } |
| if( _map_db!=null ) { |
| _map_db.close |
| } |
| if( _lobs_db!=null ) { |
| _lobs_db.close |
| } |
| |
| if(ok){ |
| tx.commit |
| } else { |
| tx.abort |
| } |
| } |
| |
| } |
| |
| |
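  /**
   * Runs `func` against a fresh transaction, retrying every second until it
   * succeeds or the store is stopped.  The transaction is committed when
   * `func` returns normally and aborted if it throws.  When `sync` is false,
   * commits use COMMIT_NO_SYNC durability and are not flushed to disk.
   * Typical usage:
   *
   * {{{
   * with_ctx() { ctx =>
   *   import ctx._
   *   queues_db.put(tx, record.key, record)
   * }
   * }}}
   */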
| def with_ctx[T](sync:Boolean=true)(func: (TxContext) => T): T = { |
| var error:Throwable = null |
| var rc:Option[T] = None |
| |
    // We will loop until the tx succeeds.  Perhaps it's
    // failing due to a temporary condition like low disk space.
    while(!rc.isDefined) {

| val ctx = if(sync) { |
        TxContext(environment.beginTransaction(null, null))
| } else { |
| TxContext(environment.beginTransaction(null, new TransactionConfig().setDurability(Durability.COMMIT_NO_SYNC))) |
| } |
| |
| try { |
| rc = Some(func(ctx)) |
| } catch { |
| case e:Throwable => |
| if( error==null ) { |
            warn(e, "Message store transaction failed. Will keep retrying every second.")
| } |
| error = e |
| } finally { |
| ctx.close(rc.isDefined) |
| } |
| |
| if (!rc.isDefined) { |
| // We may need to give up if the store is being stopped. |
| if ( !store.service_state.is_started ) { |
| throw error |
| } |
| Thread.sleep(1000) |
| } |
| } |
| |
| if( error!=null ) { |
| info("Store recovered from inital failure.") |
| } |
| rc.get |
| } |
| |
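  // Drops every database so the store is left empty, then re-opens the main
  // databases so they get re-created.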
| def purge() = { |
| |
| with_ctx() { ctx=> |
| import ctx._ |
| |
| |
| def remove_db(name: String): Unit = { |
| try { |
| environment.removeDatabase(tx, name) |
| } catch { |
          case x: DatabaseNotFoundException => // already gone.. nothing to remove
| } |
| } |
| |
      listQueues.foreach{ queue_key=>
        remove_db(entries_db_name(queue_key))
      }
      remove_db("messages")
      remove_db("message_refs")
      remove_db("queues")
      remove_db("entries")
      remove_db("map")
      remove_db("lobs")

      // Touching these re-creates the main databases empty.
      messages_db
      message_refs_db
      queues_db
| } |
| |
| } |
| |
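  // Adjusts the counter stored at `key` by `amount`.  The record is created on
  // first use, deleted once the counter returns to zero, and the updated value
  // is returned.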
  def add_and_get(db:Database, key:DatabaseEntry, amount:Int, tx:Transaction):Int = {
| db.get(tx, key) match { |
| case None => |
| if( amount!=0 ) { |
| db.put(tx, key, amount) |
| } |
| amount |
| |
| case Some(value) => |
| val counter:Int = value |
| val update = counter + amount |
| if( update == 0 ) { |
| db.delete(tx, key) |
| } else { |
| db.put(tx, key, update) |
| } |
| update |
| } |
| } |
| |
| def addQueue(record: QueueRecord, callback:Runnable) = { |
| with_ctx() { ctx=> |
| import ctx._ |
| queues_db.put(tx, record.key, record) |
| } |
| callback.run |
| } |
| |
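  // Drops one reference to the message.  Once the count reaches zero, the
  // message record and any direct buffer (lob) space it occupied are released.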
| def decrement_message_reference(ctx:TxContext, msg_key:Long) = { |
| import ctx._ |
| if( add_and_get(message_refs_db, msg_key, -1, tx)==0 ) { |
| messages_db.delete(tx, msg_key) |
| lobs_db.get(tx, to_database_entry(msg_key)).foreach { v=> |
| val location = decode_lob_value(v) |
| direct_buffer_allocator.free(location._1, location._2) |
| } |
| lobs_db.delete(tx, msg_key) |
| } |
| } |
| |
| def removeQueue(queue_key: Long, callback:Runnable) = { |
| with_ctx() { ctx=> |
| import ctx._ |
| |
| queues_db.delete(tx, queue_key) |
| |
| entries_db.cursor_from(tx, (queue_key, 0L)) { (key,value)=> |
| val current_key:(Long,Long)=key |
| if( current_key._1 == queue_key ) { |
| val queueEntry:QueueEntryRecord = value |
| entries_db.delete(tx, key) |
| decrement_message_reference(ctx, queueEntry.message_key) |
| true // keep cursoring.. |
| } else { |
| false |
| } |
| } |
| } |
| callback.run |
| } |
| |
  def store(uows: Seq[BDBStore#DelayableUOW], callback:Runnable):Unit = {
    val sync = uows.exists( _.flush_sync )
| with_ctx(sync) { ctx=> |
| import ctx._ |
| var sync_lobs = false |
| uows.foreach { uow => |
| |
| for((key,value) <- uow.map_actions) { |
| if( value==null ) { |
| map_db.delete(tx, key) |
| } else { |
| map_db.put(tx, key, value) |
| } |
| } |
| |
| uow.actions.foreach { |
| case (msg, action) => |
| |
| val message_record = action.message_record |
| if (message_record != null) { |
| import PBSupport._ |
| |
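            // Messages holding a direct buffer get their payload copied into
            // the shared direct buffer file; only the (offset, size) location
            // is recorded in the lob index.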
            val pb = if( message_record.direct_buffer != null ) {
              val r = to_pb(message_record).copy
              val buffer = direct_buffer_allocator.copy(message_record.direct_buffer)
              r.setDirectOffset(buffer.offset)
              r.setDirectSize(buffer.size)
              lobs_db.put(tx, message_record.key, (buffer.offset, buffer.size))
              sync_lobs = true
              r.freeze
            } else {
              to_pb(message_record)
            }

            messages_db.put(tx, message_record.key, pb.freeze)
| } |
| |
| action.enqueues.foreach { queueEntry => |
| entries_db.put(tx, (queueEntry.queue_key, queueEntry.entry_seq), queueEntry) |
| add_and_get(message_refs_db, queueEntry.message_key, 1, tx) |
| } |
| |
| action.dequeues.foreach { queueEntry => |
| entries_db.delete(tx, (queueEntry.queue_key, queueEntry.entry_seq)) |
| decrement_message_reference(ctx, queueEntry.message_key) |
| } |
| } |
| } |
| if( sync_lobs ) { |
| direct_buffer_allocator.sync |
| } |
| } |
| callback.run |
| } |
| |
| def listQueues: Seq[Long] = { |
| val rc = ListBuffer[Long]() |
| with_ctx() { ctx=> |
| import ctx._ |
| |
| queues_db.cursor(tx) { (key, _) => |
| rc += key |
| true // to continue cursoring. |
| } |
| } |
| |
| rc |
| } |
| |
| def getQueue(queue_key: Long): Option[QueueRecord] = { |
| with_ctx() { ctx=> |
| import ctx._ |
| queues_db.get(tx, to_database_entry(queue_key)).map( x=> to_queue_record(x) ) |
| } |
| } |
| |
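  /**
   * Summarizes a queue's entries into ranges of at most `limit` entries each,
   * tracking the count, total size, and earliest expiration of every range,
   * so entries can be loaded incrementally rather than all at once.
   */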
| def listQueueEntryGroups(queue_key: Long, limit: Int) : Seq[QueueEntryRange] = { |
    val rc = ListBuffer[QueueEntryRange]()
| with_ctx() { ctx=> |
| import ctx._ |
| var group:QueueEntryRange = null |
| |
| entries_db.cursor_from(tx, (queue_key, 0L)) { (key, value) => |
| val current_key:(Long,Long)= key |
| if( current_key._1 == queue_key ) { |
| if( group == null ) { |
| group = new QueueEntryRange |
| group.first_entry_seq = current_key._2 |
| } |
| |
| val entry:QueueEntryRecord = value |
| |
| group.last_entry_seq = current_key._2 |
| group.count += 1 |
| group.size += entry.size |
| |
| if(group.expiration == 0){ |
| group.expiration = entry.expiration |
| } else { |
| if( entry.expiration != 0 ) { |
| group.expiration = entry.expiration.min(group.expiration) |
| } |
| } |
| |
| if( group.count == limit) { |
| rc += group |
| group = null |
| } |
| |
| true // to continue cursoring. |
| |
| } else { |
| false |
| } |
| } |
| |
| if( group!=null ) { |
| rc += group |
| } |
| |
| } |
| rc |
| } |
| |
| def getQueueEntries(queue_key: Long, firstSeq:Long, lastSeq:Long): Seq[QueueEntryRecord] = { |
    val rc = ListBuffer[QueueEntryRecord]()
| with_ctx() { ctx=> |
| import ctx._ |
| entries_db.cursor_from(tx, (queue_key, firstSeq)) { (key, value) => |
| val current_key:(Long,Long) = key |
| if( current_key._1 == queue_key ) { |
| |
| val entry_seq = current_key._2 |
| val entry:QueueEntryRecord = value |
| rc += entry |
| entry_seq < lastSeq |
| |
| } else { |
| false |
| } |
| } |
| } |
| rc |
| } |
| |
| val metric_load_from_index_counter = new TimeCounter |
| var metric_load_from_index = metric_load_from_index_counter(false) |
| |
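  // Loads each requested message, attaching a slice of the direct buffer file
  // when the message was stored with a direct payload.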
  def loadMessages(requests: ListBuffer[(Long, (Option[MessageRecord])=>Unit)]):Unit = {

    def load_message(ctx:TxContext, message_key:Long):Option[MessageRecord] = {
      import ctx._
      metric_load_from_index_counter.time {
        messages_db.get(tx, to_database_entry(message_key)).map{ data=>
          import PBSupport._
          val pb:MessagePB.Buffer = data
          val rc = from_pb(pb)
          if( pb.hasDirectSize ) {
            rc.direct_buffer = direct_buffer_allocator.slice(pb.getDirectOffset, pb.getDirectSize)
          }
          rc
        }
      }
    }

    val missing = with_ctx() { ctx=>
      requests.flatMap { x =>
        val (message_key, callback) = x
        val record = load_message(ctx, message_key)
        if( record.isDefined ) {
          callback(record)
          None
        } else {
          Some(x)
        }
      }
    }

    if (missing.isEmpty)
      return

    // There's a small chance a message went missing because the read tx started
    // before the write tx that stored it completed.  Try again..
    with_ctx() { ctx=>
      missing.foreach { x =>
        val (message_key, callback) = x
        callback(load_message(ctx, message_key))
      }
    }
  }
| |
| |
| def getLastMessageKey:Long = { |
| with_ctx() { ctx=> |
| import ctx._ |
| |
| messages_db.last_key(tx).map(to_long _).getOrElse(0) |
| } |
| } |
| |
| |
| def get(key: Buffer):Option[Buffer] = { |
| with_ctx() { ctx=> |
| import ctx._ |
| map_db.get(tx, to_database_entry(key)).map(x=> to_buffer(x)) |
| } |
| } |
| |
| def getLastQueueKey:Long = { |
| with_ctx() { ctx=> |
| import ctx._ |
| |
| queues_db.last_key(tx).map(to_long _).getOrElse(0) |
| } |
| } |
| |
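  // Streams every message, queue entry, queue, and map record to `os` as a
  // version 1 export.  Returns None on success, or an error message.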
| def export_data(os:OutputStream):Option[String] = { |
| try { |
| val manager = ExportStreamManager(os, 1) |
| |
| with_ctx() { ctx=> |
| import ctx._ |
| |
| messages_db.cursor(tx) { (_, value) => |
| val record = MessagePB.FACTORY.parseFramed(value.getData) |
| manager.store_message(record) |
| true |
| } |
| |
      entries_db.cursor(tx) { (_, value) =>
| val record = QueueEntryPB.FACTORY.parseFramed(value.getData) |
| manager.store_queue_entry(record) |
| true |
| } |
| |
| queues_db.cursor(tx) { (_, value) => |
        val record = QueuePB.FACTORY.parseFramed(value.getData)
| manager.store_queue(record) |
| true |
| } |
| |
| map_db.cursor(tx) { (key,value) => |
| val record = new MapEntryPB.Bean |
| record.setKey(key) |
| record.setValue(value) |
| manager.store_map_entry(record) |
| true |
| } |
| } |
| |
| manager.finish |
| None |
| } catch { |
| case x:Exception=> |
| Some(x.getMessage) |
| } |
| } |
| |
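  // Rebuilds the store from a version 1 export stream.  The store is purged
  // first, and message reference counts are re-derived from the imported
  // queue entries.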
| def import_data(is:InputStream):Option[String] = { |
| try { |
| val manager = ImportStreamManager(is) |
| if(manager.version!=1) { |
| return Some("Cannot import from an export file of version: "+manager.version) |
| } |
| |
      purge()
| |
| with_ctx() { ctx=> |
| import ctx._ |
| |
| while(manager.getNext match { |
| |
| case record:MessagePB.Buffer => |
| messages_db.put(tx, record.getMessageKey, record.toFramedBuffer) |
| true |
| |
| case record:QueueEntryPB.Buffer => |
| entries_db.put(tx, (record.getQueueKey, record.getQueueSeq), record.toFramedBuffer) |
| add_and_get(message_refs_db, record.getMessageKey, 1, tx) |
| true |
| |
          case record:QueuePB.Buffer =>
            queues_db.put(tx, record.getKey, record.toFramedBuffer)
            true
| |
| case record:MapEntryPB.Buffer => |
| map_db.put(tx, record.getKey, record.getValue) |
| true |
| |
| case null => |
| false |
| }) { // keep looping |
| } |
| |
| } |
| None |
| |
| } catch { |
| case x:Exception=> |
| Some(x.getMessage) |
| } |
| } |
| } |