blob: 8870988108623ec76c64fd61dc9f898d526f2c33 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
using System;
using System.IO;
namespace Thrift.Transport
{
public class TBufferedTransport : TTransport, IDisposable
{
private readonly int bufSize;
private readonly MemoryStream inputBuffer = new MemoryStream(0);
private readonly MemoryStream outputBuffer = new MemoryStream(0);
private readonly TTransport transport;
public TBufferedTransport(TTransport transport, int bufSize = 1024)
{
if (transport == null)
throw new ArgumentNullException("transport");
if (bufSize <= 0)
throw new ArgumentException("bufSize", "Buffer size must be a positive number.");
this.transport = transport;
this.bufSize = bufSize;
}
public TTransport UnderlyingTransport
{
get
{
CheckNotDisposed();
return transport;
}
}
public override bool IsOpen
{
get
{
// We can legitimately throw here but be nice a bit.
// CheckNotDisposed();
return !_IsDisposed && transport.IsOpen;
}
}
public override void Open()
{
CheckNotDisposed();
transport.Open();
}
public override void Close()
{
CheckNotDisposed();
transport.Close();
}
public override int Read(byte[] buf, int off, int len)
{
CheckNotDisposed();
ValidateBufferArgs(buf, off, len);
if (!IsOpen)
throw new TTransportException(TTransportException.ExceptionType.NotOpen);
if (inputBuffer.Capacity < bufSize)
inputBuffer.Capacity = bufSize;
while (true)
{
int got = inputBuffer.Read(buf, off, len);
if (got > 0)
return got;
inputBuffer.Seek(0, SeekOrigin.Begin);
inputBuffer.SetLength(inputBuffer.Capacity);
int filled = transport.Read(inputBuffer.GetBuffer(), 0, (int)inputBuffer.Length);
inputBuffer.SetLength(filled);
if (filled == 0)
return 0;
}
}
public override void Write(byte[] buf, int off, int len)
{
CheckNotDisposed();
ValidateBufferArgs(buf, off, len);
if (!IsOpen)
throw new TTransportException(TTransportException.ExceptionType.NotOpen);
// Relative offset from "off" argument
int offset = 0;
if (outputBuffer.Length > 0)
{
int capa = (int)(outputBuffer.Capacity - outputBuffer.Length);
int writeSize = capa <= len ? capa : len;
outputBuffer.Write(buf, off, writeSize);
offset += writeSize;
if (writeSize == capa)
{
transport.Write(outputBuffer.GetBuffer(), 0, (int)outputBuffer.Length);
outputBuffer.SetLength(0);
}
}
while (len - offset >= bufSize)
{
transport.Write(buf, off + offset, bufSize);
offset += bufSize;
}
int remain = len - offset;
if (remain > 0)
{
if (outputBuffer.Capacity < bufSize)
outputBuffer.Capacity = bufSize;
outputBuffer.Write(buf, off + offset, remain);
}
}
private void InternalFlush()
{
if (!IsOpen)
throw new TTransportException(TTransportException.ExceptionType.NotOpen);
if (outputBuffer.Length > 0)
{
transport.Write(outputBuffer.GetBuffer(), 0, (int)outputBuffer.Length);
outputBuffer.SetLength(0);
}
}
public override void Flush()
{
CheckNotDisposed();
InternalFlush();
transport.Flush();
}
public override IAsyncResult BeginFlush(AsyncCallback callback, object state)
{
CheckNotDisposed();
InternalFlush();
return transport.BeginFlush( callback, state);
}
public override void EndFlush(IAsyncResult asyncResult)
{
transport.EndFlush( asyncResult);
}
protected void CheckNotDisposed()
{
if (_IsDisposed)
throw new ObjectDisposedException("TBufferedTransport");
}
#region " IDisposable Support "
protected bool _IsDisposed { get; private set; }
// IDisposable
protected override void Dispose(bool disposing)
{
if (!_IsDisposed)
{
if (disposing)
{
if (inputBuffer != null)
inputBuffer.Dispose();
if (outputBuffer != null)
outputBuffer.Dispose();
if (transport != null)
transport.Dispose();
}
}
_IsDisposed = true;
}
#endregion
}
}