Expose a JMX attribute to show when the last leveldb replication log entry was recorded.
This can be used to get a time estimate of far behind a master a slave is.
diff --git a/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/ReplicatedLevelDBStoreViewMBean.java b/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/ReplicatedLevelDBStoreViewMBean.java
index 9015242..19b2192 100644
--- a/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/ReplicatedLevelDBStoreViewMBean.java
+++ b/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/ReplicatedLevelDBStoreViewMBean.java
@@ -52,6 +52,9 @@
@MBeanInfo("The current position of the replication log.")
Long getPosition();
+ @MBeanInfo("When the last entry was added to the replication log.")
+ Long getPositionDate();
+
@MBeanInfo("The directory holding the data.")
String getDirectory();
diff --git a/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/LogWrite.java b/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/LogWrite.java
index 52e126f..b3749bb 100644
--- a/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/LogWrite.java
+++ b/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/dto/LogWrite.java
@@ -43,4 +43,6 @@
@XmlAttribute(name="sync")
public boolean sync=false;
+ @XmlAttribute(name="date")
+ public long date;
}
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala
index 53e9971..0399bd9 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala
@@ -463,6 +463,21 @@
null
}
+ def getPositionDate:java.lang.Long = {
+ val rc = if( slave!=null ) {
+ slave.wal_date
+ } else if( master!=null ) {
+ master.wal_date
+ } else {
+ 0
+ }
+ if( rc != 0 ) {
+ return new java.lang.Long(rc)
+ } else {
+ return null
+ }
+ }
+
def getDirectory = directory.getCanonicalPath
def getSync = sync
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala
index 6572aa9..b8289fc 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala
@@ -386,6 +386,7 @@
false
}
+ def date = System.currentTimeMillis()
def replicate_wal(file:File, position:Long, offset:Long, length:Long):Unit = {
if( length > 0 ) {
@@ -393,6 +394,8 @@
value.file = position;
value.offset = offset;
value.length = length
+ value.date = date
+ wal_date = value.date;
value.sync = (syncToMask & SYNC_TO_REMOTE_DISK)!=0
val frame1 = ReplicationFrame(WAL_ACTION, JsonCodec.encode(value))
val frame2 = FileTransferFrame(file, offset, length)
@@ -412,5 +415,6 @@
}
def wal_append_position = client.wal_append_position
-
+ @volatile
+ var wal_date = 0L
}
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
index 8c832a7..ce76230 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
@@ -134,6 +134,8 @@
var wal_append_position = 0L
var wal_append_offset = 0L
+ @volatile
+ var wal_date = 0L
def send_wal_ack = {
queue.assertExecuting()
@@ -173,6 +175,7 @@
// info("Slave WAL update: %s, (offset: %d, length: %d), sending ack:%s", file, value.offset, value.length, caughtUp)
wal_append_offset = value.offset+value.length
wal_append_position = value.file + wal_append_offset
+ wal_date = value.date
if( !stopped ) {
if( caughtUp ) {
client.log.current_appender.skip(value.length)