[maven-release-plugin] copy for tag apollo-project-1.0
git-svn-id: https://svn.apache.org/repos/asf/activemq/activemq-apollo/tags/apollo-project-1.0@1235626 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala b/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
index b13fe41..4936d68 100755
--- a/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
+++ b/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
@@ -216,16 +216,7 @@
}
def log_size = {
- Option(config.log_size).map(MemoryPropertyEditor.parse(_)).map{size=>
- if(size == MemoryPropertyEditor.parse("2G")) {
- Int.MaxValue // which is 2G - 1 (close enough!)
- } else if(size > Int.MaxValue) {
- warn("leveldb log_size was configured to be '"+config.log_size+"' but the maximum supported log size is 2G")
- Int.MaxValue
- } else {
- size.toInt
- }
- }.getOrElse(1024 * 1024 * 100)
+ Option(config.log_size).map(MemoryPropertyEditor.parse(_)).getOrElse(1024 * 1024 * 100L)
}
def start() = {
diff --git a/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala b/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala
index db3225b..f8a227e 100644
--- a/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala
+++ b/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala
@@ -70,7 +70,7 @@
directory.mkdirs()
- var logSize = 1024 * 1024 * 100
+ var logSize = 1024 * 1024 * 100L
var current_appender:LogAppender = _
var verify_checksums = false
var sync = false
@@ -122,9 +122,11 @@
channel.position(logSize-1)
channel.write(new Buffer(1).toByteBuffer)
channel.force(true)
- channel.position(0)
+ if( sync ) {
+ channel.position(0)
+ }
- val write_buffer = new DataByteArrayOutputStream((BUFFER_SIZE)+BUFFER_SIZE)
+ val write_buffer = new DataByteArrayOutputStream(BUFFER_SIZE+LOG_HEADER_SIZE)
def force = {
flush
@@ -194,7 +196,7 @@
}
}
- override def check_read_flush(end_offset:Int) = {
+ override def check_read_flush(end_offset:Long) = {
if( flushed_offset.get() < end_offset ) {
this.synchronized {
println("read flush")
@@ -216,10 +218,12 @@
fd.close()
}
- def check_read_flush(end_offset:Int) = {}
+ def check_read_flush(end_offset:Long) = {}
def read(record_position:Long, length:Int) = {
- val offset = (record_position-position).toInt
+ val offset = record_position-position
+ assert(offset >=0 )
+
check_read_flush(offset+LOG_HEADER_SIZE+length)
if(verify_checksums) {
@@ -273,7 +277,7 @@
}
def read(record_position:Long) = {
- val offset = (record_position-position).toInt
+ val offset = record_position-position
val header = new Buffer(LOG_HEADER_SIZE)
channel.read(header.toByteBuffer, offset)
val is = header.bigEndianEditor();
@@ -300,7 +304,7 @@
}
def check(record_position:Long):Option[(Long, Option[Long])] = {
- var offset = (record_position-position).toInt
+ var offset = record_position-position
val header = new Buffer(LOG_HEADER_SIZE)
channel.read(header.toByteBuffer, offset)
val is = header.bigEndianEditor();