https://issues.apache.org/jira/browse/AMQNET-290
Adds support for a pluggable class to log transaction recovery information.
diff --git a/src/main/csharp/NetTxConnection.cs b/src/main/csharp/NetTxConnection.cs
index f6fd083..071679f 100644
--- a/src/main/csharp/NetTxConnection.cs
+++ b/src/main/csharp/NetTxConnection.cs
@@ -22,8 +22,17 @@
namespace Apache.NMS.ActiveMQ
{
+ /// <summary>
+ /// Extends the basic Connection class to provide a transacted Connection
+ /// instance that operates within the bounds of a .NET Scoped Transaction.
+ ///
+ /// The default Session creation methods of Connection are overriden here
+ /// to always return a TX capable session instance.
+ /// </summary>
public class NetTxConnection : Connection, INetTxConnection
{
+ private NetTxRecoveryPolicy recoveryPolicy = new NetTxRecoveryPolicy();
+
public NetTxConnection(Uri connectionUri, ITransport transport, IdGenerator clientIdGenerator)
: base(connectionUri, transport, clientIdGenerator)
{
@@ -40,6 +49,33 @@
return new NetTxSession(this, NextSessionId);
}
+ public NetTxRecoveryPolicy RecoveryPolicy
+ {
+ get { return this.recoveryPolicy; }
+ set { this.recoveryPolicy = value; }
+ }
+
+ internal Guid ResourceManagerGuid
+ {
+ get { return GuidFromId(this.ResourceManagerId); }
+ }
+
+ private static Guid GuidFromId(string id)
+ {
+ // Remove the ID: prefix, that's non-unique to be sure
+ string resId = id.TrimStart("ID:".ToCharArray());
+
+ // Remaing parts should be host-port-timestamp-instance:sequence
+ string[] parts = resId.Split(":-".ToCharArray());
+
+ // We don't use the hostname here, just the remaining bits.
+ int a = Int32.Parse(parts[1]);
+ short b = Int16.Parse(parts[3]);
+ short c = Int16.Parse(parts[4]);
+ byte[] d = System.BitConverter.GetBytes(Int64.Parse(parts[2]));
+
+ return new Guid(a, b, c, d);
+ }
}
}
diff --git a/src/main/csharp/NetTxConnectionFactory.cs b/src/main/csharp/NetTxConnectionFactory.cs
index 56e2628..169d3b9 100644
--- a/src/main/csharp/NetTxConnectionFactory.cs
+++ b/src/main/csharp/NetTxConnectionFactory.cs
@@ -16,6 +16,7 @@
*/
using System;
+using System.Collections.Specialized;
using Apache.NMS;
using Apache.NMS.Util;
using Apache.NMS.ActiveMQ.Transport;
@@ -24,6 +25,8 @@
{
public class NetTxConnectionFactory : ConnectionFactory, INetTxConnectionFactory
{
+ private NetTxRecoveryPolicy txRecoveryPolicy;
+
public NetTxConnectionFactory() : base(GetDefaultBrokerUrl())
{
}
@@ -59,9 +62,27 @@
protected override Connection CreateActiveMQConnection(ITransport transport)
{
- return new NetTxConnection(this.BrokerUri, transport, this.ClientIdGenerator);
+ NetTxConnection connection = new NetTxConnection(this.BrokerUri, transport, this.ClientIdGenerator);
+
+ Uri brokerUri = this.BrokerUri;
+
+ // Set properties on the Receovery Policy using parameters prefixed with "nms.RecoveryPolicy."
+ if(!String.IsNullOrEmpty(brokerUri.Query) && !brokerUri.OriginalString.EndsWith(")"))
+ {
+ string query = brokerUri.Query.Substring(brokerUri.Query.LastIndexOf(")") + 1);
+ StringDictionary options = URISupport.ParseQuery(query);
+ options = URISupport.GetProperties(options, "nms.RecoveryPolicy.");
+ URISupport.SetProperties(this.txRecoveryPolicy, options);
+ }
+
+ return connection;
}
+ public NetTxRecoveryPolicy TxRecoveryPolicy
+ {
+ get { return this.txRecoveryPolicy; }
+ set { this.txRecoveryPolicy = value; }
+ }
}
}
diff --git a/src/main/csharp/NetTxRecoveryPolicy.cs b/src/main/csharp/NetTxRecoveryPolicy.cs
new file mode 100644
index 0000000..f4a6a11
--- /dev/null
+++ b/src/main/csharp/NetTxRecoveryPolicy.cs
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+
+using Apache.NMS.Util;
+using Apache.NMS.ActiveMQ.Util;
+using Apache.NMS.ActiveMQ.Transactions;
+
+namespace Apache.NMS.ActiveMQ
+{
+ /// <summary>
+ /// Policy class used to configure the options associated with TX
+ /// recovery.
+ /// </summary>
+ public class NetTxRecoveryPolicy
+ {
+ private static readonly FactoryFinder<RecoveryLoggerFactoryAttribute, IRecoveryLoggerFactory> FACTORY_FINDER =
+ new FactoryFinder<RecoveryLoggerFactoryAttribute, IRecoveryLoggerFactory>();
+
+ private static IDictionary<String, Type> LOGGER_FACTORY_TYPES = new Dictionary<String, Type>();
+
+ private IRecoveryLogger recoveryLogger;
+
+ public NetTxRecoveryPolicy()
+ {
+ }
+
+ public void RegisterRecoveryLoggerFactory(string scheme, Type factoryType)
+ {
+ LOGGER_FACTORY_TYPES[scheme] = factoryType;
+ }
+
+ public string RecoveryLoggerType
+ {
+ get { return this.recoveryLogger != null ? this.recoveryLogger.LoggerType : ""; }
+ set
+ {
+ if(string.IsNullOrEmpty(value))
+ {
+ throw new NMSException(String.Format("Recovery Logger name invalid: [{0}]", value));
+ }
+
+ IRecoveryLoggerFactory factory = null;
+
+ try
+ {
+ factory = NewInstance(value.ToLower());
+ }
+ catch(NMSException)
+ {
+ throw;
+ }
+ catch(Exception e)
+ {
+ throw NMSExceptionSupport.Create("Error creating Recovery Logger", e);
+ }
+
+ this.recoveryLogger = factory.Create();
+ }
+ }
+
+ public IRecoveryLogger RecoveryLogger
+ {
+ get { return this.recoveryLogger; }
+ set { this.recoveryLogger = value; }
+ }
+
+ private static IRecoveryLoggerFactory NewInstance(string scheme)
+ {
+ try
+ {
+ Type factoryType = FindLoggerFactory(scheme);
+
+ if(factoryType == null)
+ {
+ throw new Exception("NewInstance failed to find a match for id = " + scheme);
+ }
+
+ return (IRecoveryLoggerFactory) Activator.CreateInstance(factoryType);
+ }
+ catch(Exception ex)
+ {
+ Tracer.WarnFormat("NewInstance failed to create an IRecoveryLoggerFactory with error: {1}", ex.Message);
+ throw;
+ }
+ }
+
+ private static Type FindLoggerFactory(string scheme)
+ {
+ if(LOGGER_FACTORY_TYPES.ContainsKey(scheme))
+ {
+ return LOGGER_FACTORY_TYPES[scheme];
+ }
+
+ try
+ {
+ Type factoryType = FACTORY_FINDER.FindFactoryType(scheme);
+ LOGGER_FACTORY_TYPES[scheme] = factoryType;
+ return factoryType;
+ }
+ catch
+ {
+ throw new NMSException("Failed to find Factory for Recovery Logger type: " + scheme);
+ }
+ }
+ }
+}
+
diff --git a/src/main/csharp/TransactionContext.cs b/src/main/csharp/TransactionContext.cs
index a3c5f0a..48553e2 100644
--- a/src/main/csharp/TransactionContext.cs
+++ b/src/main/csharp/TransactionContext.cs
@@ -16,8 +16,6 @@
*/
using System;
-using System.Runtime.Serialization;
-using System.Runtime.Serialization.Formatters.Binary;
using System.IO;
using System.Text;
using System.Net;
@@ -26,6 +24,7 @@
using System.Collections.Generic;
using Apache.NMS.Util;
using Apache.NMS.ActiveMQ.Commands;
+using Apache.NMS.ActiveMQ.Transactions;
namespace Apache.NMS.ActiveMQ
{
@@ -261,6 +260,11 @@
BeforeEnd();
+ // Before sending the request to the broker, log the recovery bits, if
+ // this fails we can't prepare and the TX should be rolled back.
+ RecoveryLogger.LogRecoveryInfo(this.transactionId as XATransactionId,
+ preparingEnlistment.RecoveryInformation());
+
// Now notify the broker that a new XA'ish transaction has started.
TransactionInfo info = new TransactionInfo();
info.ConnectionId = this.session.Connection.ConnectionId;
@@ -280,6 +284,10 @@
this.transactionId = null;
this.currentEnlistment = null;
+ // Read Only means there's nothing to recover because there was no
+ // change on the broker.
+ RecoveryLogger.LogRecovered(this.transactionId as XATransactionId);
+
// if server responds that nothing needs to be done, then reply prepared
// but clear the current state data so we appear done to the commit method.
preparingEnlistment.Prepared();
@@ -293,8 +301,6 @@
// If work finished correctly, reply prepared
preparingEnlistment.Prepared();
-
- StoreRecoveryInformation(preparingEnlistment.RecoveryInformation());
}
}
catch(Exception ex)
@@ -302,7 +308,7 @@
Tracer.Debug("Transaction Prepare failed with error: " + ex.Message);
AfterRollback();
preparingEnlistment.ForceRollback();
- ClearStoredRecoveryInformation();
+ RecoveryLogger.LogRecovered(this.transactionId as XATransactionId);
}
}
@@ -325,7 +331,7 @@
Tracer.Debug("Transaction Commit Reports Done: ");
- ClearStoredRecoveryInformation();
+ RecoveryLogger.LogRecovered(this.transactionId as XATransactionId);
// if server responds that nothing needs to be done, then reply done.
enlistment.Done();
@@ -423,7 +429,7 @@
Tracer.Debug("Transaction Rollback Reports Done: ");
- ClearStoredRecoveryInformation();
+ RecoveryLogger.LogRecovered(this.transactionId as XATransactionId);
// if server responds that nothing needs to be done, then reply done.
enlistment.Done();
@@ -472,7 +478,7 @@
Tracer.Debug("InDoubt Transaction Rollback Reports Done: ");
- ClearStoredRecoveryInformation();
+ RecoveryLogger.LogRecovered(this.transactionId as XATransactionId);
// if server responds that nothing needs to be done, then reply done.
enlistment.Done();
@@ -496,7 +502,6 @@
#region Distributed Transaction Recovery Bits
- private readonly object logFileLock = new object();
private volatile CountDownLatch recoveryComplete = null;
/// <summary>
@@ -508,8 +513,11 @@
/// </summary>
public void CheckForAndRecoverFailedTransactions()
{
- RecoveryInformation info = TryOpenRecoveryInfoFile();
- if (info == null)
+ // initialize the logger with the current Resource Manager Id
+ RecoveryLogger.ResourceManagerId = ResourceManagerId;
+
+ KeyValuePair<XATransactionId, byte[]>[] localRecoverables = RecoveryLogger.GetRecoverables();
+ if (localRecoverables.Length == 0)
{
Tracer.Debug("Did not detect any open DTC transaction records on disk.");
// No local data so anything stored on the broker can't be recovered here.
@@ -522,109 +530,43 @@
Tracer.Debug("Did not detect any recoverable transactions at Broker.");
// Broker has no recoverable data so nothing to do here, delete the
// old recovery log as its stale.
- ClearStoredRecoveryInformation();
+ RecoveryLogger.Purge();
return;
}
- XATransactionId xid = info.Xid;
+ //XATransactionId xid = info.Xid;
+
+ int matched = 0;
foreach(XATransactionId recoverable in recoverables)
{
- if(xid.Equals(recoverable))
+ foreach(KeyValuePair<XATransactionId, byte[]> entry in localRecoverables)
{
- Tracer.DebugFormat("Found a matching TX on Broker to stored Id: {0} reenlisting.", xid);
+ if(entry.Key.Equals(recoverable))
+ {
+ Tracer.DebugFormat("Found a matching TX on Broker to stored Id: {0} reenlisting.", entry.Key);
- this.recoveryComplete = new CountDownLatch(1);
+ matched++;
- // Reenlist the recovered transaction with the TX Manager.
- this.transactionId = xid;
- this.currentEnlistment = TransactionManager.Reenlist(ResourceManagerGuid, info.TxRecoveryInfo, this);
- TransactionManager.RecoveryComplete(ResourceManagerGuid);
-
- this.recoveryComplete.await();
-
- return;
+ // Reenlist the recovered transaction with the TX Manager.
+ // TODO - Hack for now, we really only support one recoverable with this.
+ this.transactionId = entry.Key;
+ this.currentEnlistment = TransactionManager.Reenlist(ResourceManagerGuid, entry.Value, this);
+ }
}
}
+ if(matched > 0)
+ {
+ this.recoveryComplete = new CountDownLatch(matched);
+ TransactionManager.RecoveryComplete(ResourceManagerGuid);
+ this.recoveryComplete.await();
+ return;
+ }
+
// The old recovery information doesn't match what's on the broker so we
// should discard it as its stale now.
- ClearStoredRecoveryInformation();
- }
-
- [Serializable]
- private sealed class RecoveryInformation
- {
- private byte[] txRecoveryInfo;
- private byte[] globalTxId;
- private byte[] branchId;
- private int formatId;
-
- public RecoveryInformation(XATransactionId xaId, byte[] recoveryInfo)
- {
- this.Xid = xaId;
- this.txRecoveryInfo = recoveryInfo;
- }
-
- public byte[] TxRecoveryInfo
- {
- get { return this.txRecoveryInfo; }
- set { this.txRecoveryInfo = value; }
- }
-
- public XATransactionId Xid
- {
- get
- {
- XATransactionId xid = new XATransactionId();
- xid.BranchQualifier = this.branchId;
- xid.GlobalTransactionId = this.globalTxId;
- xid.FormatId = this.formatId;
-
- return xid;
- }
-
- set
- {
- this.branchId = value.BranchQualifier;
- this.globalTxId = value.GlobalTransactionId;
- this.formatId = value.FormatId;
- }
- }
- }
-
- private RecoveryInformation TryOpenRecoveryInfoFile()
- {
- string filename = ResourceManagerId + ".bin";
- RecoveryInformation result = null;
-
- Tracer.Debug("Checking for Recoverable Transactions filename: " + filename);
-
- lock (logFileLock)
- {
- try
- {
- if (!File.Exists(filename))
- {
- return null;
- }
-
- using(FileStream recoveryLog = new FileStream(filename, FileMode.Open, FileAccess.Read))
- {
- Tracer.Debug("Found Recovery Log File: " + filename);
- IFormatter formatter = new BinaryFormatter();
- result = formatter.Deserialize(recoveryLog) as RecoveryInformation;
- }
- }
- catch(Exception ex)
- {
- Tracer.InfoFormat("Error while opening Recovery file {0} error message: {1}", filename, ex.Message);
- // Nothing to restore.
- return null;
- }
- }
-
- return result;
+ RecoveryLogger.Purge();
}
private XATransactionId[] TryRecoverBrokerTXIds()
@@ -659,118 +601,22 @@
return new XATransactionId[0];
}
- private void StoreRecoveryInformation(byte[] recoveryInfo)
- {
- if (recoveryInfo == null || recoveryInfo.Length == 0)
- {
- return;
- }
-
- try
- {
- lock (logFileLock)
- {
- string filename = ResourceManagerId + ".bin";
- XATransactionId xid = this.transactionId as XATransactionId;
-
- RecoveryInformation info = new RecoveryInformation(xid, recoveryInfo);
-
- Tracer.Debug("Serializing Recovery Info to file: " + filename);
-
- IFormatter formatter = new BinaryFormatter();
- using (FileStream recoveryLog = new FileStream(filename, FileMode.OpenOrCreate, FileAccess.Write))
- {
- formatter.Serialize(recoveryLog, info);
- }
- }
- }
- catch (Exception ex)
- {
- Tracer.Error("Error while storing TX Recovery Info, message: " + ex.Message);
- throw;
- }
- }
-
- private void ClearStoredRecoveryInformation()
- {
- lock (logFileLock)
- {
- string filename = ResourceManagerId + ".bin";
-
- try
- {
- Tracer.Debug("Attempting to remove stale Recovery Info file: " + filename);
- File.Delete(filename);
- }
- catch(Exception ex)
- {
- Tracer.Debug("Caught Exception while removing stale RecoveryInfo file: " + ex.Message);
- return;
- }
- }
- }
-
#endregion
- public string ResourceManagerId
+ internal IRecoveryLogger RecoveryLogger
{
- get { return GuidFromId(this.connection.ResourceManagerId).ToString(); }
+ get { return (this.connection as NetTxConnection).RecoveryPolicy.RecoveryLogger; }
+ }
+
+ internal string ResourceManagerId
+ {
+ get { return (this.connection as NetTxConnection).ResourceManagerGuid.ToString(); }
}
internal Guid ResourceManagerGuid
{
- get { return GuidFromId(this.connection.ResourceManagerId); }
+ get { return (this.connection as NetTxConnection).ResourceManagerGuid; }
}
- private static Guid GuidFromId(string id)
- {
- // Remove the ID: prefix, that's non-unique to be sure
- string resId = id.TrimStart("ID:".ToCharArray());
-
- // Remaing parts should be host-port-timestamp-instance:sequence
- string[] parts = resId.Split(":-".ToCharArray());
-
- // We don't use the hostname here, just the remaining bits.
- int a = Int32.Parse(parts[1]);
- short b = Int16.Parse(parts[3]);
- short c = Int16.Parse(parts[4]);
- byte[] d = System.BitConverter.GetBytes(Int64.Parse(parts[2]));
-
- return new Guid(a, b, c, d);
- }
-
- private static string IdFromGuid(Guid guid)
- {
- byte[] bytes = guid.ToByteArray();
-
- int port = System.BitConverter.ToInt32(bytes, 0);
- int instance = System.BitConverter.ToInt16(bytes, 4);
- int sequence = System.BitConverter.ToInt16(bytes, 6);
- long timestamp = System.BitConverter.ToInt64(bytes, 8);
-
- StringBuilder builder = new StringBuilder("ID:");
-
- string hostname = "localhost";
-
- try
- {
- hostname = Dns.GetHostName();
- }
- catch
- {
- }
-
- builder.Append(hostname);
- builder.Append("-");
- builder.Append(port);
- builder.Append("-");
- builder.Append(timestamp);
- builder.Append("-");
- builder.Append(instance);
- builder.Append(":");
- builder.Append(sequence);
-
- return builder.ToString();
- }
}
}
diff --git a/src/main/csharp/Transactions/IRecoveryLogger.cs b/src/main/csharp/Transactions/IRecoveryLogger.cs
new file mode 100644
index 0000000..ed0866f
--- /dev/null
+++ b/src/main/csharp/Transactions/IRecoveryLogger.cs
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections;
+using System.Collections.Generic;
+
+using Apache.NMS.ActiveMQ.Commands;
+
+namespace Apache.NMS.ActiveMQ.Transactions
+{
+ /// <summary>
+ /// Interface for a Logger object used to store and retrieve Recovery
+ /// Information needed to recover distributed transactions that operate
+ /// in the Microsoft Distributed Transaction Context.
+ /// </summary>
+ public interface IRecoveryLogger
+ {
+ void LogRecoveryInfo(XATransactionId xid, byte[] recoveryInformation);
+
+ KeyValuePair<XATransactionId, byte[]>[] GetRecoverables();
+
+ void LogRecovered(XATransactionId xid);
+
+ string LoggerType{ get; }
+
+ string ResourceManagerId{ get; set; }
+
+ void Purge();
+ }
+}
+
diff --git a/src/main/csharp/Transactions/IRecoveryLoggerFactory.cs b/src/main/csharp/Transactions/IRecoveryLoggerFactory.cs
new file mode 100644
index 0000000..943f6bd
--- /dev/null
+++ b/src/main/csharp/Transactions/IRecoveryLoggerFactory.cs
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Apache.NMS.ActiveMQ.Transactions
+{
+ public interface IRecoveryLoggerFactory
+ {
+ IRecoveryLogger Create();
+ }
+}
+
diff --git a/src/main/csharp/Transactions/RecoveryFileLogger.cs b/src/main/csharp/Transactions/RecoveryFileLogger.cs
new file mode 100644
index 0000000..9f39e32
--- /dev/null
+++ b/src/main/csharp/Transactions/RecoveryFileLogger.cs
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.IO;
+using System.Reflection;
+using System.Collections.Generic;
+using System.Runtime.Serialization;
+using System.Runtime.Serialization.Formatters.Binary;
+
+using Apache.NMS.ActiveMQ.Commands;
+
+namespace Apache.NMS.ActiveMQ.Transactions
+{
+ public class RecoveryFileLogger : IRecoveryLogger
+ {
+ private string location;
+ private string resourceManagerId;
+ private object syncRoot = new object();
+
+ public RecoveryFileLogger()
+ {
+ // Set the path by default to the location of the executing assembly.
+ // May need to change this to current working directory, not sure.
+ this.location = Assembly.GetExecutingAssembly().Location;
+ }
+
+ /// <summary>
+ /// The Unique Id of the Resource Manager that this logger is currently
+ /// logging recovery information for.
+ /// </summary>
+ public string ResourceManagerId
+ {
+ get { return this.resourceManagerId; }
+ set { this.resourceManagerId = value; }
+ }
+
+ /// <summary>
+ /// The Path to the location on disk where the recovery log is written
+ /// to and read from.
+ /// </summary>
+ public string Location
+ {
+ get { return this.location; }
+ set { this.location = value; }
+ }
+
+ public void LogRecoveryInfo(XATransactionId xid, byte[] recoveryInformation)
+ {
+ if (recoveryInformation == null || recoveryInformation.Length == 0)
+ {
+ return;
+ }
+
+ try
+ {
+ lock (syncRoot)
+ {
+ string filename = Location + ResourceManagerId + ".bin";
+ RecoveryInformation info = new RecoveryInformation(xid, recoveryInformation);
+ Tracer.Debug("Serializing Recovery Info to file: " + filename);
+
+ IFormatter formatter = new BinaryFormatter();
+ using (FileStream recoveryLog = new FileStream(filename, FileMode.OpenOrCreate, FileAccess.Write))
+ {
+ formatter.Serialize(recoveryLog, info);
+ }
+ }
+ }
+ catch (Exception ex)
+ {
+ Tracer.Error("Error while storing TX Recovery Info, message: " + ex.Message);
+ throw;
+ }
+ }
+
+ public KeyValuePair<XATransactionId, byte[]>[] GetRecoverables()
+ {
+ KeyValuePair<XATransactionId, byte[]>[] result = new KeyValuePair<XATransactionId, byte[]>[0];
+ RecoveryInformation info = TryOpenRecoveryInfoFile();
+
+ if(result != null)
+ {
+ result = new KeyValuePair<XATransactionId, byte[]>[1];
+ result[0] = new KeyValuePair<XATransactionId, byte[]>(info.Xid, info.TxRecoveryInfo);
+ }
+
+ return result;
+ }
+
+ public void LogRecovered(XATransactionId xid)
+ {
+ lock (syncRoot)
+ {
+ string filename = Location + ResourceManagerId + ".bin";
+
+ try
+ {
+ Tracer.Debug("Attempting to remove stale Recovery Info file: " + filename);
+ File.Delete(filename);
+ }
+ catch(Exception ex)
+ {
+ Tracer.Debug("Caught Exception while removing stale RecoveryInfo file: " + ex.Message);
+ return;
+ }
+ }
+ }
+
+ public void Purge()
+ {
+ lock (syncRoot)
+ {
+ string filename = Location + ResourceManagerId + ".bin";
+
+ try
+ {
+ Tracer.Debug("Attempting to remove stale Recovery Info file: " + filename);
+ File.Delete(filename);
+ }
+ catch(Exception ex)
+ {
+ Tracer.Debug("Caught Exception while removing stale RecoveryInfo file: " + ex.Message);
+ return;
+ }
+ }
+ }
+
+ public string LoggerType
+ {
+ get { return "file"; }
+ }
+
+ #region Recovery File Opeations
+
+ [Serializable]
+ private sealed class RecoveryInformation
+ {
+ private byte[] txRecoveryInfo;
+ private byte[] globalTxId;
+ private byte[] branchId;
+ private int formatId;
+
+ public RecoveryInformation(XATransactionId xaId, byte[] recoveryInfo)
+ {
+ this.Xid = xaId;
+ this.txRecoveryInfo = recoveryInfo;
+ }
+
+ public byte[] TxRecoveryInfo
+ {
+ get { return this.txRecoveryInfo; }
+ set { this.txRecoveryInfo = value; }
+ }
+
+ public XATransactionId Xid
+ {
+ get
+ {
+ XATransactionId xid = new XATransactionId();
+ xid.BranchQualifier = this.branchId;
+ xid.GlobalTransactionId = this.globalTxId;
+ xid.FormatId = this.formatId;
+
+ return xid;
+ }
+
+ set
+ {
+ this.branchId = value.BranchQualifier;
+ this.globalTxId = value.GlobalTransactionId;
+ this.formatId = value.FormatId;
+ }
+ }
+ }
+
+ private RecoveryInformation TryOpenRecoveryInfoFile()
+ {
+ string filename = Location + ResourceManagerId + ".bin";
+ RecoveryInformation result = null;
+
+ Tracer.Debug("Checking for Recoverable Transactions filename: " + filename);
+
+ lock (syncRoot)
+ {
+ try
+ {
+ if (!File.Exists(filename))
+ {
+ return null;
+ }
+
+ using(FileStream recoveryLog = new FileStream(filename, FileMode.Open, FileAccess.Read))
+ {
+ Tracer.Debug("Found Recovery Log File: " + filename);
+ IFormatter formatter = new BinaryFormatter();
+ result = formatter.Deserialize(recoveryLog) as RecoveryInformation;
+ }
+ }
+ catch(Exception ex)
+ {
+ Tracer.InfoFormat("Error while opening Recovery file {0} error message: {1}", filename, ex.Message);
+ // Nothing to restore.
+ return null;
+ }
+ }
+
+ return result;
+ }
+
+ #endregion
+ }
+}
+
diff --git a/src/main/csharp/Transactions/RecoveryFileLoggerFactory.cs b/src/main/csharp/Transactions/RecoveryFileLoggerFactory.cs
new file mode 100644
index 0000000..e878b56
--- /dev/null
+++ b/src/main/csharp/Transactions/RecoveryFileLoggerFactory.cs
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Apache.NMS.ActiveMQ.Transactions
+{
+ [RecoveryLoggerFactory("file")]
+ public class RecoveryFileLoggerFactory : IRecoveryLoggerFactory
+ {
+ public IRecoveryLogger Create()
+ {
+ return new RecoveryFileLogger();
+ }
+ }
+}
+
diff --git a/src/main/csharp/Transactions/RecoveryLoggerFactoryAttribute.cs b/src/main/csharp/Transactions/RecoveryLoggerFactoryAttribute.cs
new file mode 100644
index 0000000..aa2fb0a
--- /dev/null
+++ b/src/main/csharp/Transactions/RecoveryLoggerFactoryAttribute.cs
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+using Apache.NMS.ActiveMQ.Util;
+
+namespace Apache.NMS.ActiveMQ.Transactions
+{
+ /// <summary>
+ /// Attribute that decorates IRecoveryLoggerFactory implementations to allow
+ /// the Recovery Policy to find all the available factories dynamically.
+ /// </summary>
+
+ public class RecoveryLoggerFactoryAttribute : FactoryAttribute
+ {
+ public RecoveryLoggerFactoryAttribute(string scheme) : base(scheme)
+ {
+ }
+ }
+}
+