| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| |
| /* |
| * This program parses strings of the form "node/subject;{options}" as |
| * used in the Qpid messaging API. It provides basic wiring |
| * capabilities to create/delete temporary queues (to topic |
| * subsciptions) and unbound "point and shoot" queues. |
| */ |
| |
| |
| #include <windows.h> |
| #include <msclr\lock.h> |
| #include <oletx2xa.h> |
| |
| #include "qpid/client/AsyncSession.h" |
| #include "qpid/client/SubscriptionManager.h" |
| #include "qpid/client/Connection.h" |
| #include "qpid/client/SessionImpl.h" |
| #include "qpid/client/SessionBase_0_10Access.h" |
| #include "qpid/client/Message.h" |
| #include "qpid/framing/MessageTransferBody.h" |
| #include "qpid/client/Future.h" |
| |
| #include "AmqpConnection.h" |
| #include "AmqpSession.h" |
| #include "AmqpMessage.h" |
| #include "MessageBodyStream.h" |
| #include "InputLink.h" |
| #include "OutputLink.h" |
| #include "QpidMarshal.h" |
| #include "QpidException.h" |
| #include "QpidAddress.h" |
| |
| namespace Apache { |
| namespace Qpid { |
| namespace Interop { |
| |
| using namespace System; |
| using namespace System::Runtime::InteropServices; |
| using namespace msclr; |
| |
| using namespace qpid::client; |
| using namespace std; |
| |
| QpidAddress::QpidAddress(String^ s, bool isInput) { |
| address = s; |
| nodeName = s; |
| isInputChannel = isInput; |
| isQueue = true; |
| |
| if (address->StartsWith("//")) { |
| // special case old style address to default exchange, |
| // no options, output only |
| if ((s->IndexOf(';') != -1) || isInputChannel) |
| throw gcnew ArgumentException("Invalid 0-10 address: " + address); |
| nodeName = nodeName->Substring(2); |
| return; |
| } |
| |
| String^ options = nullptr; |
| int pos = s->IndexOf(';'); |
| if (pos != -1) { |
| options = s->Substring(pos + 1); |
| nodeName = s->Substring(0, pos); |
| |
| if (options->Length > 0) { |
| if (!options->StartsWith("{") || !options->EndsWith("}")) |
| throw gcnew ArgumentException("Invalid address: " + address); |
| options = options->Substring(1, options->Length - 2); |
| array<String^>^ subOpts = options->Split(String(",: ").ToCharArray(), StringSplitOptions::RemoveEmptyEntries); |
| |
| if ((subOpts->Length % 2) != 0) |
| throw gcnew ArgumentException("Bad address (options): " + address); |
| |
| for (int i=0; i < subOpts->Length; i += 2) { |
| String^ opt = subOpts[i]; |
| String^ optArg = subOpts[i+1]; |
| if (opt->Equals("create")) { |
| creating = PolicyApplies(optArg); |
| } |
| else if (opt->Equals("delete")) { |
| deleting = PolicyApplies(optArg); |
| } |
| else if (opt->Equals("mode")) { |
| if (optArg->Equals("browse")) { |
| browsing = isInputChannel; |
| } |
| else if (!optArg->Equals("consume")) { |
| throw gcnew ArgumentException("Invalid browsing option: " + optArg); |
| } |
| } |
| else if (opt->Equals("assert") || opt->Equals("node")) { |
| throw gcnew ArgumentException("Unsupported address option: " + opt); |
| } |
| else { |
| throw gcnew ArgumentException("Bad address option: " + opt); |
| } |
| } |
| } |
| else |
| options = nullptr; |
| } |
| |
| pos = nodeName->IndexOf('/'); |
| if (pos != -1) { |
| subject = nodeName->Substring(pos + 1); |
| if (String::IsNullOrEmpty(subject)) |
| subject = nullptr; |
| nodeName = nodeName->Substring(0, pos); |
| } |
| } |
| |
| |
| QpidAddress^ QpidAddress::CreateAddress(String^ s, bool isInput) { |
| QpidAddress^ addr = gcnew QpidAddress(s, isInput); |
| return addr; |
| } |
| |
| |
| void QpidAddress::ResolveLink(AmqpSession^ amqpSession) { |
| |
| AsyncSession* asyncSessionp = (AsyncSession *) amqpSession->BorrowNativeSession().ToPointer(); |
| if (asyncSessionp == NULL) |
| throw gcnew ObjectDisposedException("session"); |
| |
| deleteName = nullptr; |
| isQueue = true; |
| |
| try { |
| Session session = sync(*asyncSessionp); |
| std::string n_name = QpidMarshal::ToNative(nodeName); |
| ExchangeBoundResult result = session.exchangeBound(arg::exchange=n_name, arg::queue=n_name); |
| |
| bool queueFound = !result.getQueueNotFound(); |
| bool exchangeFound = !result.getExchangeNotFound(); |
| |
| if (isInputChannel) { |
| |
| if (queueFound) { |
| linkName = nodeName; |
| if (deleting) |
| deleteName = nodeName; |
| } |
| else if (exchangeFound) { |
| isQueue = false; |
| String^ tmpkey = nullptr; |
| String^ tmpname = nodeName + "_" + Guid::NewGuid().ToString(); |
| bool haveSubject = !String::IsNullOrEmpty(subject); |
| FieldTable bindArgs; |
| |
| std::string exchangeType = session.exchangeQuery(n_name).getType(); |
| if (exchangeType == "topic") { |
| tmpkey = haveSubject ? subject : "#"; |
| } |
| else if (exchangeType == "fanout") { |
| tmpkey = tmpname; |
| } |
| else if (exchangeType == "headers") { |
| tmpkey = haveSubject ? subject : "match-all"; |
| if (haveSubject) |
| bindArgs.setString("qpid.subject", QpidMarshal::ToNative(subject)); |
| bindArgs.setString("x-match", "all"); |
| } |
| else if (exchangeType == "xml") { |
| tmpkey = haveSubject ? subject : ""; |
| if (haveSubject) { |
| String^ v = "declare variable $qpid.subject external; $qpid.subject = '" + |
| subject + "'"; |
| bindArgs.setString("xquery", QpidMarshal::ToNative(v)); |
| } |
| else |
| bindArgs.setString("xquery", "true()"); |
| } |
| else { |
| tmpkey = haveSubject ? subject : ""; |
| } |
| |
| std::string qn = QpidMarshal::ToNative(tmpname); |
| session.queueDeclare(arg::queue=qn, arg::autoDelete=true, arg::exclusive=true); |
| bool success = false; |
| try { |
| session.exchangeBind(arg::exchange=n_name, arg::queue=qn, |
| arg::bindingKey=QpidMarshal::ToNative(tmpkey), |
| arg::arguments=bindArgs); |
| bindKey = tmpkey; // remember for later cleanup |
| success = true; |
| } |
| finally { |
| if (!success) |
| session.queueDelete(arg::queue=qn); |
| } |
| linkName = tmpname; |
| deleteName = tmpname; |
| deleting = true; |
| } |
| else if (creating) { |
| // only create "point and shoot" queues for now |
| session.queueDeclare(arg::queue=QpidMarshal::ToNative(nodeName)); |
| // leave unbound |
| |
| linkName = nodeName; |
| |
| if (deleting) |
| deleteName = nodeName; |
| } |
| else { |
| throw gcnew ArgumentException("AMQP broker node not found: " + nodeName); |
| } |
| } |
| else { |
| // Output channel |
| |
| bool oldStyleUri = address->StartsWith("//"); |
| |
| if (queueFound) { |
| linkName = ""; // default exchange for point and shoot |
| routingKey = nodeName; |
| if (deleting) |
| deleteName = nodeName; |
| } |
| else if (exchangeFound && !oldStyleUri) { |
| isQueue = false; |
| linkName = nodeName; |
| routingKey = subject; |
| } |
| else if (creating) { |
| // only create "point and shoot" queues for now |
| session.queueDeclare(arg::queue=QpidMarshal::ToNative(nodeName)); |
| // leave unbound |
| linkName = ""; |
| routingKey = nodeName; |
| if (deleting) |
| deleteName = nodeName; |
| } |
| else { |
| throw gcnew ArgumentException("AMQP broker node not found: " + nodeName); |
| } |
| } |
| } |
| finally { |
| amqpSession->ReturnNativeSession(); |
| } |
| } |
| |
| void QpidAddress::CleanupLink(AmqpSession^ amqpSession) { |
| if (deleteName == nullptr) |
| return; |
| |
| AsyncSession* asyncSessionp = (AsyncSession *) amqpSession->BorrowNativeSession().ToPointer(); |
| if (asyncSessionp == NULL) { |
| // TODO: log it: can't undo tear down actions |
| return; |
| } |
| |
| try { |
| Session session = sync(*asyncSessionp); |
| std::string q = QpidMarshal::ToNative(deleteName); |
| if (isInputChannel && !isQueue) { |
| // undo the temp wiring to the topic |
| session.exchangeUnbind(arg::exchange=QpidMarshal::ToNative(nodeName), arg::queue=q, |
| arg::bindingKey=QpidMarshal::ToNative(bindKey)); |
| } |
| session.queueDelete(q); |
| } |
| catch (Exception^ e) { |
| // TODO: log it |
| } |
| finally { |
| amqpSession->ReturnNativeSession(); |
| } |
| } |
| |
| bool QpidAddress::PolicyApplies(String^ mode) { |
| if (mode->Equals("always")) |
| return true; |
| if (mode->Equals("sender")) |
| return !isInputChannel; |
| if (mode->Equals("receiver")) |
| return isInputChannel; |
| if (mode->Equals("never")) |
| return false; |
| |
| throw gcnew ArgumentException(String::Format("Bad address option {0} for {1}", mode, address)); |
| } |
| |
| }}} // namespace Apache::Qpid::Interop |