| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.activemq.apollo.broker.store.leveldb |
| |
| import java.{lang=>jl} |
| import java.{util=>ju} |
| |
| import java.util.zip.CRC32 |
| import java.util.Map.Entry |
| import collection.immutable.TreeMap |
| import java.util.concurrent.atomic.AtomicLong |
| import java.io._ |
| import org.apache.activemq.apollo.util.FileSupport._ |
| import org.apache.activemq.apollo.util.{Log, LRUCache} |
| import org.fusesource.hawtdispatch.BaseRetained |
| import org.fusesource.hawtbuf.{DataByteArrayInputStream, DataByteArrayOutputStream, Buffer} |
| |
| object RecordLog extends Log { |
| |
| // The log files contain a sequence of variable length log records: |
| // record := header + data |
| // |
| // header := |
| // '*' : int8 // Start of Record Magic |
| // kind : int8 // Helps identify the content type of the data. |
| // checksum : uint32 // crc32 of the data[] |
| // length : uint32 // the length of the data |
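| // |
| // For example, an 8 byte payload is stored as an 18 byte record: the 10 byte |
| // header ('*', kind, crc32, length=8) followed by the 8 payload bytes. |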
| |
| val LOG_HEADER_PREFIX = '*'.toByte |
| val UOW_END_RECORD = -1.toByte |
| |
| val LOG_HEADER_SIZE = 10 |
| |
| val BUFFER_SIZE = 1024*512 |
| val BYPASS_BUFFER_SIZE = 1024*16 |
| |
| case class LogInfo(file:File, position:Long, length:Long) { |
| def limit = position+length |
| } |
| |
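| // encode_long/decode_long convert a log position to/from the 8 byte |
| // big-endian payload carried by UOW_END_RECORD records. |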
| def encode_long(a1:Long) = { |
| val out = new DataByteArrayOutputStream(8) |
| out.writeLong(a1) |
| out.toBuffer |
| } |
| |
| def decode_long(value:Buffer):Long = { |
| val in = new DataByteArrayInputStream(value) |
| in.readLong() |
| } |
| |
| } |
| |
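| /** |
| * An append-only log made up of a sequence of rolling log files stored in |
| * `directory`. Writes go through a single LogAppender; completed log files |
| * are read back through cached LogReader instances. |
| * |
| * A minimal usage sketch (the directory, suffix and kind byte below are |
| * illustrative values, not part of this API): |
| * {{{ |
| * val log = RecordLog(new File("example-data"), ".log") |
| * log.open |
| * val pos = log.appender { a => a.append(1.toByte, new Buffer("hello".getBytes("UTF-8"))) } |
| * val record = log.read(pos) // Option of (kind, data, position of the next record) |
| * log.close |
| * }}} |
| */ |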
| case class RecordLog(directory: File, logSuffix:String) { |
| import RecordLog._ |
| |
| directory.mkdirs() |
| |
| var logSize = 1024 * 1024 * 100 |
| var current_appender:LogAppender = _ |
| var verify_checksums = false |
| var sync = false |
| |
| |
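| // Index of log files keyed by their starting position; guarded by log_mutex. |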
| var log_infos = TreeMap[Long, LogInfo]() |
| object log_mutex |
| |
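| // Removes a log file from the index and deletes it from disk. The file the |
| // current appender is writing to is never deleted. |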
| def delete(id:Long) = { |
| log_mutex.synchronized { |
| // We can't delete the current appender. |
| if( current_appender.position != id ) { |
| log_infos.get(id).foreach { info => |
| onDelete(info.file) |
| log_infos = log_infos.filterNot(_._1 == id) |
| } |
| } |
| } |
| } |
| |
| protected def onDelete(file:File) = { |
| file.delete() |
| } |
| |
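| // Computes the CRC32 checksum of a record payload, truncated to a signed int. |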
| def checksum(data: Buffer): Int = { |
| val checksum = new CRC32 |
| checksum.update(data.data, data.offset, data.length) |
| (checksum.getValue & 0xFFFFFFFF).toInt |
| } |
| |
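| /** |
| * Appends records to a single, pre-allocated log file. Writes are buffered |
| * in memory and written to the channel on flush; force() additionally syncs |
| * the file to disk when the sync flag is enabled. |
| */ |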
| class LogAppender(file:File, position:Long) extends LogReader(file, position) { |
| |
| override def open = new RandomAccessFile(file, "rw") |
| |
| override def dispose() = { |
| force |
| super.dispose() |
| } |
| |
| var append_offset = 0L |
| val flushed_offset = new AtomicLong(0) |
| |
| def append_position = { |
| position+append_offset |
| } |
| |
| // set the file size ahead of time so that we don't have to sync the file |
| // meta-data on every log sync. |
| channel.position(logSize-1) |
| channel.write(new Buffer(1).toByteBuffer) |
| channel.force(true) |
| channel.position(0) |
| |
| val write_buffer = new DataByteArrayOutputStream(BUFFER_SIZE + BUFFER_SIZE) |
| |
| def force = { |
| flush |
| if(sync) { |
| // only need to update the file metadata if the file size changes.. |
| channel.force(append_offset > logSize) |
| } |
| } |
| |
| /** |
| * Appends a record and returns the log position at which it was written. |
| */ |
| def append(id:Byte, data: Buffer): Long = this.synchronized { |
| val record_position = append_position |
| val data_length = data.length |
| val total_length = LOG_HEADER_SIZE + data_length |
| |
| if( write_buffer.position() + total_length > BUFFER_SIZE ) { |
| flush |
| } |
| |
| val cs: Int = checksum(data) |
| // trace("Writing at: "+record_position+" len: "+data_length+" with checksum: "+cs) |
| |
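| // Note: this bypass path for large records is currently disabled by the |
| // 'false &&' guard; large records go through the write buffer like all others. |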
| if( false && total_length > BYPASS_BUFFER_SIZE ) { |
| |
| // Write the header and flush.. |
| write_buffer.writeByte(LOG_HEADER_PREFIX) |
| write_buffer.writeByte(id) |
| write_buffer.writeInt(cs) |
| write_buffer.writeInt(data_length) |
| |
| append_offset += LOG_HEADER_SIZE |
| flush |
| |
| // Directly write the data to the channel since it's large. |
| val buffer = data.toByteBuffer |
| // append_offset already includes the header written above. |
| val pos = append_offset |
| flushed_offset.addAndGet(buffer.remaining) |
| channel.write(buffer, pos) |
| if( buffer.hasRemaining ) { |
| throw new IOException("Short write") |
| } |
| append_offset += data_length |
| |
| } else { |
| write_buffer.writeByte(LOG_HEADER_PREFIX) |
| write_buffer.writeByte(id) |
| write_buffer.writeInt(cs) |
| write_buffer.writeInt(data_length) |
| write_buffer.write(data.data, data.offset, data_length) |
| append_offset += total_length |
| } |
| record_position |
| } |
| |
| def flush = this.synchronized { |
| if( write_buffer.position() > 0 ) { |
| val buffer = write_buffer.toBuffer.toByteBuffer |
| val pos = append_offset-buffer.remaining |
| flushed_offset.addAndGet(buffer.remaining) |
| channel.write(buffer, pos) |
| if( buffer.hasRemaining ) { |
| throw new IOException("Short write") |
| } |
| write_buffer.reset() |
| } |
| } |
| |
| override def check_read_flush(end_offset:Int) = { |
| if( flushed_offset.get() < end_offset ) { |
| this.synchronized { |
| flush |
| } |
| } |
| } |
| |
| } |
| |
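| /** |
| * Reads records from a single log file. Positions passed to the read/check |
| * methods are absolute log positions; they are converted to file offsets by |
| * subtracting the file's base position. |
| */ |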
| case class LogReader(file:File, position:Long) extends BaseRetained { |
| |
| def open = new RandomAccessFile(file, "r") |
| |
| val fd = open |
| val channel = fd.getChannel |
| |
| override def dispose() { |
| fd.close() |
| } |
| |
| def check_read_flush(end_offset:Int) = {} |
| |
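| // Reads `length` bytes of record payload starting at the given record position. |
| // When verify_checksums is enabled, the header is re-read and the checksum is |
| // validated whenever the full record is being read. |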
| def read(record_position:Long, length:Int) = { |
| val offset = (record_position-position).toInt |
| check_read_flush(offset+LOG_HEADER_SIZE+length) |
| |
| if(verify_checksums) { |
| |
| val record = new Buffer(LOG_HEADER_SIZE+length) |
| |
| def record_is_not_changing = { |
| using(open) { fd => |
| val channel = fd.getChannel |
| val new_record = new Buffer(LOG_HEADER_SIZE+length) |
| channel.read(new_record.toByteBuffer, offset) |
| record == new_record |
| } |
| } |
| |
| if( channel.read(record.toByteBuffer, offset) != record.length ) { |
| assert( record_is_not_changing ) |
| throw new IOException("short record at position: "+record_position+" in file: "+file+", offset: "+offset) |
| } |
| |
| val is = new DataByteArrayInputStream(record) |
| val prefix = is.readByte() |
| if( prefix != LOG_HEADER_PREFIX ) { |
| assert(record_is_not_changing) |
| throw new IOException("invalid record at position: "+record_position+" in file: "+file+", offset: "+offset) |
| } |
| |
| val id = is.readByte() |
| val expectedChecksum = is.readInt() |
| val expectedLength = is.readInt() |
| val data = is.readBuffer(length) |
| |
| // If we're reading the whole record, we can verify the data checksum. |
| if( expectedLength == length ) { |
| if( expectedChecksum != checksum(data) ) { |
| assert(record_is_not_changing) |
| throw new IOException("checksum does not match at position: "+record_position+" in file: "+file+", offset: "+offset) |
| } |
| } |
| |
| data |
| } else { |
| val data = new Buffer(length) |
| if( channel.read(data.toByteBuffer, offset+LOG_HEADER_SIZE) != data.length ) { |
| throw new IOException("short record at position: "+record_position+" in file: "+file+", offset: "+offset) |
| } |
| data |
| } |
| } |
| |
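| // Reads a complete record and returns (kind, data, position of the next record). |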
| def read(record_position:Long) = { |
| val offset = (record_position-position).toInt |
| val header = new Buffer(LOG_HEADER_SIZE) |
| channel.read(header.toByteBuffer, offset) |
| val is = header.bigEndianEditor(); |
| val prefix = is.readByte() |
| if( prefix != LOG_HEADER_PREFIX ) { |
| // Does not look like a record. |
| throw new IOException("invalid record position") |
| } |
| val id = is.readByte() |
| val expectedChecksum = is.readInt() |
| val length = is.readInt() |
| val data = new Buffer(length) |
| |
| if( channel.read(data.toByteBuffer, offset+LOG_HEADER_SIZE) != length ) { |
| throw new IOException("short record") |
| } |
| |
| if(verify_checksums) { |
| if( expectedChecksum != checksum(data) ) { |
| throw new IOException("checksum does not match") |
| } |
| } |
| (id, data, record_position+LOG_HEADER_SIZE+length) |
| } |
| |
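| // Validates the record at the given position. Returns None if the record is |
| // corrupt, otherwise Some(position of the next record, and the UOW start |
| // position if this record is a UOW_END_RECORD). |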
| def check(record_position:Long):Option[(Long, Option[Long])] = { |
| var offset = (record_position-position).toInt |
| val header = new Buffer(LOG_HEADER_SIZE) |
| channel.read(header.toByteBuffer, offset) |
| val is = header.bigEndianEditor(); |
| val prefix = is.readByte() |
| if( prefix != LOG_HEADER_PREFIX ) { |
| return None // Does not look like a record. |
| } |
| val kind = is.readByte() |
| val expectedChecksum = is.readInt() |
| val length = is.readInt() |
| |
| val chunk = new Buffer(1024*4) |
| val chunkbb = chunk.toByteBuffer |
| offset += LOG_HEADER_SIZE |
| |
| // Read the data in chunks to avoid an OOME |
| // if we are checking an invalid record |
| // with a bad record length. |
| val checksumer = new CRC32 |
| var remaining = length |
| while( remaining > 0 ) { |
| val chunkSize = remaining.min(1024*4); |
| chunkbb.position(0) |
| chunkbb.limit(chunkSize) |
| channel.read(chunkbb, offset) |
| if( chunkbb.hasRemaining ) { |
| return None |
| } |
| checksumer.update(chunk.data, 0, chunkSize) |
| offset += chunkSize |
| remaining -= chunkSize |
| } |
| |
| val checksum = ( checksumer.getValue & 0xFFFFFFFF).toInt |
| if( expectedChecksum != checksum ) { |
| return None |
| } |
| val uow_start_pos = if(kind == UOW_END_RECORD && length==8) Some(decode_long(chunk)) else None |
| return Some(record_position+LOG_HEADER_SIZE+length, uow_start_pos) |
| } |
| |
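| // Scans the file from its base position, validating records, and returns the |
| // end position of the last fully written unit of work. |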
| def verifyAndGetEndPosition:Long = { |
| var pos = position; |
| var current_uow_start = pos |
| val limit = position+channel.size() |
| while(pos < limit) { |
| check(pos) match { |
| case Some((next, uow_start_pos)) => |
| uow_start_pos.foreach { uow_start_pos => |
| if( uow_start_pos == current_uow_start ) { |
| current_uow_start = next |
| } else { |
| return current_uow_start |
| } |
| } |
| pos = next |
| case None => |
| return current_uow_start |
| } |
| } |
| return current_uow_start |
| } |
| } |
| |
| def create_log_appender(position: Long) = { |
| new LogAppender(next_log(position), position) |
| } |
| |
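| // Rotates to a new appender starting at `position`, recording the final length |
| // of the previous log file in log_infos. |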
| def create_appender(position: Long): Unit = { |
| log_mutex.synchronized { |
| if(current_appender!=null) { |
| // Index the final length of the log file we are rotating away from. |
| log_infos += current_appender.position -> LogInfo(current_appender.file, current_appender.position, current_appender.append_offset) |
| } |
| current_appender = create_log_appender(position) |
| log_infos += position -> LogInfo(current_appender.file, position, 0) |
| } |
| } |
| |
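| // Scans the directory for existing log files, verifies the most recent one and |
| // truncates any partially written unit of work, then creates the appender at |
| // the verified end position. |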
| def open = { |
| log_mutex.synchronized { |
| log_infos = LevelDBClient.find_sequence_files(directory, logSuffix).map { case (position,file) => |
| position -> LogInfo(file, position, file.length()) |
| } |
| |
| val appendPos = if( log_infos.isEmpty ) { |
| 0L |
| } else { |
| val (_, file) = log_infos.last |
| val r = LogReader(file.file, file.position) |
| try { |
| val actualLength = r.verifyAndGetEndPosition |
| val updated = file.copy(length = actualLength - file.position) |
| log_infos = log_infos + (updated.position->updated) |
| if( updated.length != file.length ) { |
| // The file is longer than the verified data, so we need to truncate. |
| using(new RandomAccessFile(file.file, "rw")) ( _.setLength(updated.length)) |
| } |
| actualLength |
| } finally { |
| r.release() |
| } |
| } |
| |
| create_appender(appendPos) |
| } |
| } |
| |
| def close = { |
| log_mutex.synchronized { |
| current_appender.release |
| } |
| } |
| |
| def appender_limit = current_appender.append_position |
| def appender_start = current_appender.position |
| |
| def next_log(position:Long) = LevelDBClient.create_sequence_file(directory, position, logSuffix) |
| |
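| /** |
| * Runs `func` against the current appender. If anything was appended, a |
| * UOW_END_RECORD pointing back at the start of the batch is written so that |
| * recovery only replays complete units of work. The appender is flushed and, |
| * once it has grown past logSize, the log is rotated. |
| */ |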
| def appender[T](func: (LogAppender)=>T):T= { |
| val initial_position = current_appender.append_position |
| try { |
| val rc = func(current_appender) |
| if( current_appender.append_position != initial_position ) { |
| // Record a UOW_END_RECORD so that on recovery we only replay full units of work. |
| current_appender.append(UOW_END_RECORD, encode_long(initial_position)) |
| } |
| rc |
| } finally { |
| current_appender.flush |
| log_mutex.synchronized { |
| if ( current_appender.append_offset >= logSize ) { |
| current_appender.release() |
| on_log_rotate() |
| create_appender(current_appender.append_position) |
| } |
| } |
| } |
| } |
| |
| var on_log_rotate: ()=>Unit = ()=>{} |
| |
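| // LRU cache of readers for completed log files; evicted readers are released. |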
| private val reader_cache = new LRUCache[File, LogReader](100) { |
| protected override def onCacheEviction(entry: Entry[File, LogReader]) = { |
| entry.getValue.release() |
| } |
| } |
| |
| def log_info(pos:Long) = log_mutex.synchronized(log_infos.range(0L, pos+1).lastOption.map(_._2)) |
| |
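| // Resolves the log file containing the record position and runs `func` with a |
| // retained reader: the current appender if the record is in the active log |
| // file, otherwise a cached LogReader. |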
| private def get_reader[T](record_position:Long)(func: (LogReader)=>T) = { |
| |
| val lookup = log_mutex.synchronized { |
| val info = log_info(record_position) |
| info.map { info=> |
| if(info.position == current_appender.position) { |
| current_appender.retain() |
| (info, current_appender) |
| } else { |
| (info, null) |
| } |
| } |
| } |
| |
| lookup.map { case (info, appender) => |
| val reader = if( appender!=null ) { |
| // read from the current appender. |
| appender |
| } else { |
| // Checkout a reader from the cache... |
| reader_cache.synchronized { |
| var reader = reader_cache.get(info.file) |
| if(reader==null) { |
| reader = LogReader(info.file, info.position) |
| reader_cache.put(info.file, reader) |
| } |
| reader.retain() |
| reader |
| } |
| } |
| |
| try { |
| func(reader) |
| } finally { |
| reader.release |
| } |
| } |
| } |
| |
| def read(pos:Long) = { |
| get_reader(pos)(_.read(pos)) |
| } |
| def read(pos:Long, length:Int) = { |
| get_reader(pos)(_.read(pos, length)) |
| } |
| |
| } |