blob: dc62a4afcc885ab993a2944137f93395ad43dd00 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.IO;
using System.Net;
namespace Avro.ipc
{
public class HttpTransceiver : Transceiver
{
private byte[] _intBuffer = new byte[4]; //this buffer is used by read/write behind the latch controlled by base class so we are sure there is no race condition
private HttpWebRequest _httpRequest;
private HttpWebRequest _modelRequest;
public override string RemoteName
{
get
{
return _modelRequest.RequestUri.AbsoluteUri;
}
}
public HttpTransceiver(HttpWebRequest modelRequest)
{
_modelRequest = modelRequest;
}
public HttpTransceiver(Uri serviceUri, int timeoutMs)
{
_modelRequest = (HttpWebRequest)WebRequest.Create(serviceUri);
_modelRequest.Method = "POST";
_modelRequest.ContentType = "avro/binary";
_modelRequest.Timeout = timeoutMs;
}
private static int ReadInt(Stream stream, byte[] buffer)
{
stream.Read(buffer, 0, 4);
return IPAddress.NetworkToHostOrder(BitConverter.ToInt32(buffer, 0));
}
public static byte[] ConvertIntToBytes(int value)
{
return BitConverter.GetBytes(IPAddress.HostToNetworkOrder(value));
}
public static int CalculateLength(IList<MemoryStream> buffers)
{
int num = 0;
foreach (MemoryStream memoryStream in (IEnumerable<MemoryStream>)buffers)
{
num += 4;
num += (int)memoryStream.Length;
}
return num + 4;
}
public static IList<MemoryStream> ReadBuffers(Stream inStream, byte[] intBuffer)
{
List<MemoryStream> list = new List<MemoryStream>();
while (true)
{
int length = ReadInt(inStream, intBuffer);
if (length == 0) //end of transmission
break;
byte[] buffer = new byte[length];
int offset = 0;
int count = length;
while (offset < length)
{
int num = inStream.Read(buffer, offset, count);
if (num == 0)
throw new Exception(string.Format("Unexpected end of response binary stream - expected {0} more bytes in current chunk", (object)count));
offset += num;
count -= num;
}
list.Add(new MemoryStream(buffer));
}
return (IList<MemoryStream>)list;
}
public override IList<MemoryStream> ReadBuffers()
{
using (Stream responseStream = this._httpRequest.GetResponse().GetResponseStream())
{
return ReadBuffers(responseStream, _intBuffer);
}
}
protected HttpWebRequest CreateAvroHttpRequest(long contentLength)
{
HttpWebRequest wr = (HttpWebRequest)WebRequest.Create(_modelRequest.RequestUri);
//TODO: what else to copy from model request?
wr.AllowAutoRedirect = _modelRequest.AllowAutoRedirect;
wr.AllowWriteStreamBuffering = _modelRequest.AllowWriteStreamBuffering;
wr.AuthenticationLevel = _modelRequest.AuthenticationLevel;
wr.AutomaticDecompression = _modelRequest.AutomaticDecompression;
wr.CachePolicy = _modelRequest.CachePolicy;
wr.ClientCertificates.AddRange(_modelRequest.ClientCertificates);
wr.ConnectionGroupName = _modelRequest.ConnectionGroupName;
wr.ContinueDelegate = _modelRequest.ContinueDelegate;
wr.CookieContainer = _modelRequest.CookieContainer;
wr.Credentials = _modelRequest.Credentials;
wr.UnsafeAuthenticatedConnectionSharing = _modelRequest.UnsafeAuthenticatedConnectionSharing;
wr.UseDefaultCredentials = _modelRequest.UseDefaultCredentials;
wr.KeepAlive = _modelRequest.KeepAlive;
wr.Expect = _modelRequest.Expect;
//wr.Date = _modelRequest.Date;
//wr.Host = _modelRequest.Host;
wr.UserAgent = _modelRequest.UserAgent;
//wr.Headers = _modelRequest.Headers;
wr.Referer = _modelRequest.Referer;
wr.Pipelined = _modelRequest.Pipelined;
wr.PreAuthenticate = _modelRequest.PreAuthenticate;
wr.ProtocolVersion = _modelRequest.ProtocolVersion;
wr.Proxy = _modelRequest.Proxy;
wr.ReadWriteTimeout = _modelRequest.ReadWriteTimeout;
wr.Timeout = _modelRequest.Timeout;
//the properties which are defined by Avro specification
wr.Method = "POST";
wr.ContentType = "avro/binary";
wr.ContentLength = contentLength;
return wr;
}
public static void WriteBuffers(IList<MemoryStream> buffers, Stream outStream)
{
foreach (MemoryStream memoryStream in buffers)
{
int num = (int)memoryStream.Length;
outStream.Write(ConvertIntToBytes(num), 0, 4);
memoryStream.WriteTo(outStream);
}
outStream.Write(ConvertIntToBytes(0), 0, 4);
outStream.Flush();
}
public override void WriteBuffers(IList<MemoryStream> buffers)
{
_httpRequest = CreateAvroHttpRequest(CalculateLength(buffers));
using (Stream requestStream = _httpRequest.GetRequestStream())
{
WriteBuffers(buffers, requestStream);
}
}
}
}