Fixed the loading of 32-bit vs. 64-bit implementations.
Refactoring the publisher/subscriber objects.
Fixes [AMQNET-333]. (See https://issues.apache.org/jira/browse/AMQNET-333)
diff --git a/src/main/csharp/CommonConnectionFactory.cs b/src/main/csharp/CommonConnectionFactory.cs
index c19537d..d579d5a 100644
--- a/src/main/csharp/CommonConnectionFactory.cs
+++ b/src/main/csharp/CommonConnectionFactory.cs
@@ -90,7 +90,7 @@
try
{
- factoryAssembly = Assembly.Load(fullFileName);
+ factoryAssembly = Assembly.LoadFile(fullFileName);
if(null != factoryAssembly)
{
Tracer.DebugFormat("Succesfully loaded provider: {0}", fullFileName);
@@ -126,18 +126,21 @@
ArrayList pathList = new ArrayList();
// Check the current folder first.
- pathList.Add("");
+ pathList.Add(Environment.CurrentDirectory);
- // Check the folder the assembly is located in.
AppDomain currentDomain = AppDomain.CurrentDomain;
+ // Check the folder the assembly is located in.
pathList.Add(Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location));
- if(null != currentDomain.BaseDirectory)
+
+ // Check the domain's base directory
+ if(!string.IsNullOrEmpty(currentDomain.BaseDirectory))
{
pathList.Add(currentDomain.BaseDirectory);
}
- if(null != currentDomain.RelativeSearchPath)
+ // Search the domain's relative paths.
+ if(!string.IsNullOrEmpty(currentDomain.RelativeSearchPath))
{
pathList.Add(currentDomain.RelativeSearchPath);
}
diff --git a/src/main/csharp/Connection.cs b/src/main/csharp/Connection.cs
index 2b2a1a1..13f3d78 100644
--- a/src/main/csharp/Connection.cs
+++ b/src/main/csharp/Connection.cs
@@ -16,7 +16,7 @@
*/
using System;
-using CLRZMQ = ZMQ;
+using ZContext = ZMQ.Context;
namespace Apache.NMS.ZMQ
{
@@ -38,7 +38,7 @@
/// <summary>
/// ZMQ context
/// </summary>
- static private CLRZMQ.Context _context = new CLRZMQ.Context(1);
+ static private ZContext _context = new ZContext(1);
/// <summary>
/// Starts message delivery for this connection.
@@ -155,7 +155,7 @@
/// <summary>
/// Gets ZMQ connection context
/// </summary>
- static internal CLRZMQ.Context Context
+ static internal ZContext Context
{
get
{
diff --git a/src/main/csharp/MessageConsumer.cs b/src/main/csharp/MessageConsumer.cs
index db4dae8..dcc695b 100644
--- a/src/main/csharp/MessageConsumer.cs
+++ b/src/main/csharp/MessageConsumer.cs
@@ -16,9 +16,12 @@
*/
using System;
+using System.Text;
using System.Threading;
using Apache.NMS.Util;
-using CLRZMQ = ZMQ;
+using ZSocket = ZMQ.Socket;
+using ZSocketType = ZMQ.SocketType;
+using ZSendRecvOpt = ZMQ.SendRecvOpt;
namespace Apache.NMS.ZMQ
{
@@ -31,7 +34,14 @@
private readonly Session session;
private readonly AcknowledgementMode acknowledgementMode;
- private ZmqSubscriber messageSubscriber;
+ /// <summary>
+ /// Socket object
+ /// </summary>
+ private ZSocket messageSubscriber = null;
+ /// <summary>
+ /// Context binding string
+ /// </summary>
+ private string contextBinding;
private event MessageListener listener;
private int listenerCount = 0;
private Thread asyncDeliveryThread = null;
@@ -45,11 +55,31 @@
set { this.consumerTransformer = value; }
}
- public MessageConsumer(Session session, AcknowledgementMode acknowledgementMode, ZmqSubscriber messageSubscriber)
+ public MessageConsumer(Session session, AcknowledgementMode acknowledgementMode, IDestination destination, string selector)
{
+ if(null == Connection.Context)
+ {
+ throw new NMSConnectionException();
+ }
+
this.session = session;
this.acknowledgementMode = acknowledgementMode;
- this.messageSubscriber = messageSubscriber;
+ this.messageSubscriber = Connection.Context.Socket(ZSocketType.SUB);
+ if(null == this.messageSubscriber)
+ {
+ throw new ResourceAllocationException();
+ }
+
+ string clientId = session.Connection.ClientId;
+
+ this.contextBinding = session.Connection.BrokerUri.LocalPath;
+ if(!string.IsNullOrEmpty(clientId))
+ {
+ this.messageSubscriber.StringToIdentity(clientId, Encoding.Unicode);
+ }
+
+ this.messageSubscriber.Connect(contextBinding);
+ this.messageSubscriber.Subscribe(selector ?? string.Empty, Encoding.ASCII);
}
public event MessageListener Listener
@@ -87,7 +117,7 @@
IMessage nmsMessage = null;
if(null != messageSubscriber)
{
- string messageText = messageSubscriber.Subscriber.Recv(System.Text.Encoding.ASCII, CLRZMQ.SendRecvOpt.NOBLOCK);
+ string messageText = messageSubscriber.Recv(Encoding.ASCII, ZSendRecvOpt.NOBLOCK);
if(!string.IsNullOrEmpty(messageText))
{
nmsMessage = ToNmsMessage(messageText);
@@ -107,7 +137,7 @@
IMessage nmsMessage = null;
if(null != messageSubscriber)
{
- string messageText = messageSubscriber.Subscriber.Recv(System.Text.Encoding.ASCII, timeout.Milliseconds);
+ string messageText = messageSubscriber.Recv(Encoding.ASCII, timeout.Milliseconds);
if(!string.IsNullOrEmpty(messageText))
{
nmsMessage = ToNmsMessage(messageText);
@@ -173,7 +203,7 @@
if(asyncDelivery.CompareAndSet(false, true))
{
asyncDeliveryThread = new Thread(new ThreadStart(DispatchLoop));
- asyncDeliveryThread.Name = "Message Consumer Dispatch: " + messageSubscriber.Binding;
+ asyncDeliveryThread.Name = "Message Consumer Dispatch: " + contextBinding;
asyncDeliveryThread.IsBackground = true;
asyncDeliveryThread.Start();
}
@@ -277,7 +307,7 @@
private ZmqMessage ToZmqMessage(string messageText)
{
ZmqMessage message = new ZmqMessage();
- message.Destination = new Queue(session.Connection.BrokerUri.LocalPath);
+ message.Destination = new Queue(this.contextBinding);
message.ClientId = session.Connection.ClientId;
message.Text = messageText;
return message;
diff --git a/src/main/csharp/MessageProducer.cs b/src/main/csharp/MessageProducer.cs
index 17e7c21..e3369d7 100644
--- a/src/main/csharp/MessageProducer.cs
+++ b/src/main/csharp/MessageProducer.cs
@@ -16,6 +16,9 @@
*/
using System;
+using ZSocket = ZMQ.Socket;
+using ZSocketType = ZMQ.SocketType;
+using System.Text;
namespace Apache.NMS.ZMQ
{
@@ -24,11 +27,13 @@
/// </summary>
public class MessageProducer : IMessageProducer
{
-
private readonly Session session;
- private Destination destination;
+ private IDestination destination;
- //private long messageCounter;
+ /// <summary>
+ /// Socket object
+ /// </summary>
+ private ZSocket messageProducer = null;
private MsgDeliveryMode deliveryMode;
private TimeSpan timeToLive;
private MsgPriority priority;
@@ -42,13 +47,24 @@
set { this.producerTransformer = value; }
}
- public MessageProducer(Session session, Destination destination)
+ public MessageProducer(Connection connection, Session session, IDestination destination)
{
+ if(null == Connection.Context)
+ {
+ throw new NMSConnectionException();
+ }
+
this.session = session;
this.destination = destination;
- if(destination != null)
+ this.messageProducer = Connection.Context.Socket(ZSocketType.SUB);
+
+ string clientId = connection.ClientId;
+ if(!string.IsNullOrEmpty(clientId))
{
+ this.messageProducer.StringToIdentity(clientId, Encoding.Unicode);
}
+
+ this.messageProducer.Connect(connection.BrokerUri.LocalPath);
}
public void Send(IMessage message)
@@ -68,11 +84,7 @@
public void Send(IDestination destination, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
{
- // TODO: Implement sending a message.
- }
-
- public void Close()
- {
+ messageProducer.Send();
}
public void Dispose()
@@ -80,6 +92,15 @@
Close();
}
+ public void Close()
+ {
+ if(null != messageProducer)
+ {
+ messageProducer.Dispose();
+ messageProducer = null;
+ }
+ }
+
public IMessage CreateMessage()
{
return session.CreateMessage();
@@ -144,7 +165,7 @@
public IDestination Destination
{
get { return destination; }
- set { destination = (Destination) value; }
+ set { destination = value; }
}
public MsgPriority Priority
diff --git a/src/main/csharp/Session.cs b/src/main/csharp/Session.cs
index 66a4323..7652050 100644
--- a/src/main/csharp/Session.cs
+++ b/src/main/csharp/Session.cs
@@ -57,7 +57,7 @@
public IMessageProducer CreateProducer(IDestination destination)
{
- throw new NotSupportedException("Producer is not supported/implemented");
+ return new MessageProducer(connection, this, destination);
}
#endregion
@@ -76,7 +76,7 @@
{
// Subscriber client reads messages from a publisher and forwards messages
// through the message consumer
- return new MessageConsumer(this, acknowledgementMode, new ZmqSubscriber(connection, destination, selector));
+ return new MessageConsumer(this, acknowledgementMode, destination, selector);
}
public IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector, bool noLocal)
diff --git a/src/main/csharp/ZmqSubscriber.cs b/src/main/csharp/ZmqSubscriber.cs
deleted file mode 100644
index 0493ac6..0000000
--- a/src/main/csharp/ZmqSubscriber.cs
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-using System;
-using System.Text;
-using CLRZMQ = ZMQ;
-
-namespace Apache.NMS.ZMQ
-{
- public class ZmqSubscriber : IDisposable
- {
- /// <summary>
- /// Socket object
- /// </summary>
- private CLRZMQ.Socket m_Subscriber = null;
-
- /// <summary>
- /// Context binding string
- /// </summary>
- private string m_Binding;
-
- public ZmqSubscriber(Connection connection, IDestination destination, string selector)
- {
- if(null != Connection.Context)
- {
- m_Subscriber = Connection.Context.Socket(CLRZMQ.SocketType.SUB);
- }
- Connect(connection.ClientId, connection.BrokerUri.LocalPath, selector);
- }
-
- private void Connect(string clientId, string binding, string selector)
- {
- m_Binding = binding;
- if(null != m_Subscriber)
- {
- if(!string.IsNullOrEmpty(clientId))
- {
- m_Subscriber.StringToIdentity(clientId, Encoding.Unicode);
- }
- m_Subscriber.Connect(binding);
- m_Subscriber.Subscribe(!string.IsNullOrEmpty(selector) ? selector : "", System.Text.Encoding.ASCII);
- }
- }
-
- public void Dispose()
- {
- if(null != m_Subscriber)
- {
- m_Subscriber.Dispose();
- m_Subscriber = null;
- }
- }
-
- #region Properties
- internal CLRZMQ.Socket Subscriber
- {
- get { return m_Subscriber; }
- }
-
- public string Binding
- {
- get { return m_Binding; }
- }
- #endregion
- }
-}
diff --git a/vs2010-zmq-net-4.0x64.csproj b/vs2010-zmq-net-4.0x64.csproj
index 70869a1..cea4f86 100644
--- a/vs2010-zmq-net-4.0x64.csproj
+++ b/vs2010-zmq-net-4.0x64.csproj
@@ -70,7 +70,6 @@
<SubType>Code</SubType>
</Compile>
<Compile Include="src\main\csharp\TextMessage.cs" />
- <Compile Include="src\main\csharp\ZmqSubscriber.cs" />
<Compile Include="src\main\csharp\ZmqMessage.cs" />
</ItemGroup>
<ItemGroup>
diff --git a/vs2010-zmq-net-4.0x86.csproj b/vs2010-zmq-net-4.0x86.csproj
index fc0758f..8a69c48 100644
--- a/vs2010-zmq-net-4.0x86.csproj
+++ b/vs2010-zmq-net-4.0x86.csproj
@@ -70,7 +70,6 @@
<SubType>Code</SubType>
</Compile>
<Compile Include="src\main\csharp\TextMessage.cs" />
- <Compile Include="src\main\csharp\ZmqSubscriber.cs" />
<Compile Include="src\main\csharp\ZmqMessage.cs" />
</ItemGroup>
<ItemGroup>