blob: f2cb5740d3cac2343dc24e851f568752766a5bd4 [file]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <windows.h>
#include <msclr\lock.h>
#include "qpid/client/AsyncSession.h"
#include "qpid/framing/FrameSet.h"
#include "qpid/framing/AMQFrame.h"
#include "MessageBodyStream.h"
namespace Apache {
namespace Qpid {
namespace Interop {
using namespace System;
using namespace System::Runtime::InteropServices;
using namespace System::Threading;
using namespace msclr;
using namespace qpid::client;
using namespace qpid::framing;
// Thefolowing def must match "Frames" private typedef.
// TODO: make "Frames" publicly visible.
typedef qpid::InlineVector<AMQFrame, 4> FrameSetFrames;
using namespace std;
static void ThrowIfBadArgs (array<unsigned char>^ buffer, int offset, int count)
{
if (buffer == nullptr)
throw gcnew ArgumentNullException("buffer");
if (offset < 0)
throw gcnew ArgumentOutOfRangeException("offset");
if (count < 0)
throw gcnew ArgumentOutOfRangeException("count");
if ((offset + count) > buffer->Length)
throw gcnew ArgumentException("offset + count");
}
// Input stream constructor
MessageBodyStream::MessageBodyStream(FrameSet::shared_ptr *fspp)
{
isInputStream = true;
frameSetpp = fspp;
fragmentCount = 0;
length = 0;
position = 0;
currentFramep = NULL;
const std::string *datap; // pointer to the fragment's string variable that holds the content
for(FrameSetFrames::const_iterator i = (*frameSetpp)->begin(); i != (*frameSetpp)->end(); i++) {
if (i->getBody()->type() == CONTENT_BODY) {
fragmentCount++;
datap = &(i->castBody<AMQContentBody>()->getData());
length += datap->size();
}
}
// fragmentCount can be zero for an empty message
fragmentIndex = 0;
fragmentPosition = 0;
if (fragmentCount == 0) {
currentFragment = NULL;
fragmentLength = 0;
}
else if (fragmentCount == 1) {
currentFragment = datap->data();
fragmentLength = (int) length;
}
else {
fragments = gcnew array<IntPtr>(fragmentCount);
fragmentIndex = 0;
for(FrameSetFrames::const_iterator i = (*frameSetpp)->begin(); i != (*frameSetpp)->end(); i++) {
if (i->getBody()->type() == CONTENT_BODY) {
datap = &(i->castBody<AMQContentBody>()->getData());
fragments[fragmentIndex++] = (IntPtr) (void *) datap;
}
}
fragmentIndex = 0;
datap = (const std::string *) fragments[0].ToPointer();
currentFragment = datap->data();
fragmentLength = datap->size();
}
}
int MessageBodyStream::Read(array<unsigned char>^ buffer, int offset, int count)
{
if (!isInputStream)
throw gcnew NotSupportedException();
if (disposed)
throw gcnew ObjectDisposedException("Stream");
if (count == 0)
return 0;
ThrowIfBadArgs(buffer, offset, count);
int nRead = 0;
int remaining = count;
while (nRead < count) {
int fragAvail = fragmentLength - fragmentPosition;
int copyCount = min (fragAvail, remaining);
if (copyCount == 0) {
// no more to read
return nRead;
}
// copy from native space
IntPtr nativep = (IntPtr) (void *) (currentFragment + fragmentPosition);
Marshal::Copy (nativep, buffer, offset, copyCount);
nRead += copyCount;
remaining -= copyCount;
fragmentPosition += copyCount;
offset += copyCount;
// advance to next fragment?
if (fragmentPosition == fragmentLength) {
if (++fragmentIndex < fragmentCount) {
const std::string *datap = (const std::string *) fragments[fragmentIndex].ToPointer();
currentFragment = datap->data();
fragmentLength = datap->size();
fragmentPosition = 0;
}
}
}
return nRead;
}
void MessageBodyStream::pushCurrentFrame(bool lastFrame)
{
// set flags as in SessionImpl::sendContent.
if (currentFramep->getBody()->type() == CONTENT_BODY) {
if ((fragmentCount == 1) && lastFrame) {
// only one content frame
currentFramep->setFirstSegment(false);
}
else {
currentFramep->setFirstSegment(false);
currentFramep->setLastSegment(true);
if (fragmentCount != 1) {
currentFramep->setFirstFrame(false);
}
if (!lastFrame) {
currentFramep->setLastFrame(false);
}
}
}
else {
// the header frame
currentFramep->setFirstSegment(false);
if (!lastFrame) {
// there will be at least one content frame
currentFramep->setLastSegment(false);
}
}
// add to frame set. This makes a copy and ref counts the body
(*frameSetpp)->append(*currentFramep);
delete currentFramep;
currentFramep = NULL;
}
IntPtr MessageBodyStream::GetFrameSet()
{
if (currentFramep != NULL) {
// No more content. Tidy up the pending (possibly single header) frame.
pushCurrentFrame(true);
}
if (frameSetpp == NULL) {
return (IntPtr) NULL;
}
// shared_ptr.get()
return (IntPtr) (void *) (*frameSetpp).get();
}
IntPtr MessageBodyStream::GetHeader()
{
return (IntPtr) headerBodyp;
}
// Ouput stream constructor
MessageBodyStream::MessageBodyStream(int maxFrameSize)
{
isInputStream = false;
maxFrameContentSize = maxFrameSize - AMQFrame::frameOverhead();
SequenceNumber unused; // only meaningful on incoming frames
frameSetpp = new FrameSet::shared_ptr(new FrameSet(unused));
fragmentCount = 0;
length = 0;
position = 0;
// header goes first in the outgoing frameset
boost::intrusive_ptr<AMQBody> headerBody(new AMQHeaderBody);
currentFramep = new AMQFrame(headerBody);
headerBodyp = static_cast<AMQHeaderBody*>(headerBody.get());
// mark this header frame as "full" to force the first write to create a new content frame
fragmentPosition = maxFrameContentSize;
}
void MessageBodyStream::Write(array<unsigned char>^ buffer, int offset, int count)
{
if (isInputStream)
throw gcnew NotSupportedException();
if (disposed)
throw gcnew ObjectDisposedException("Stream");
if (count == 0)
return;
ThrowIfBadArgs(buffer, offset, count);
if (currentFramep == NULL) {
// GetFrameSet() has been called and we no longer exclusively own the underlying frames.
throw gcnew InvalidOperationException ("Mesage Body output already completed");
}
if (count <= 0)
return;
// keep GC memory movement at bay while copying to native space
pin_ptr<unsigned char> pinnedBuf = &buffer[0];
string *datap;
int remaining = count;
while (remaining > 0) {
if (fragmentPosition == maxFrameContentSize) {
// move to a new frame, but not until ready to add new content.
// zero content is valid, or the final write may exactly fill to maxFrameContentSize
pushCurrentFrame(false);
currentFramep = new AMQFrame(AMQContentBody());
fragmentPosition = 0;
fragmentCount++;
}
int copyCount = min (remaining, (maxFrameContentSize - fragmentPosition));
datap = &(currentFramep->castBody<AMQContentBody>()->getData());
char *outp = (char *) pinnedBuf + offset;
if (fragmentPosition == 0) {
datap->assign(outp, copyCount);
}
else {
datap->append(outp, copyCount);
}
position += copyCount;
fragmentPosition += copyCount;
remaining -= copyCount;
offset += copyCount;
}
}
void MessageBodyStream::Cleanup()
{
{
lock l(this);
if (disposed)
return;
disposed = true;
}
try {}
finally
{
if (frameSetpp != NULL) {
delete frameSetpp;
frameSetpp = NULL;
}
if (currentFramep != NULL) {
delete currentFramep;
currentFramep = NULL;
}
}
}
MessageBodyStream::~MessageBodyStream()
{
Cleanup();
}
MessageBodyStream::!MessageBodyStream()
{
Cleanup();
}
void MessageBodyStream::Close()
{
// Simulate Dispose()...
Cleanup();
GC::SuppressFinalize(this);
}
}}} // namespace Apache::Qpid::Interop