blob: 173c8a23c5ff92f84db1cebcb128af6bf402423f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Threading;
namespace Avro.ipc
{
public class SocketServer
{
public static ManualResetEvent allDone = new ManualResetEvent(false);
private readonly string hostName;
private readonly int port;
private Responder responder;
private bool cancellationRequested;
private Socket channel;
private List<Socket> sockets = new List<Socket>();
private Thread serverThread;
public SocketServer(string hostName, int port, Responder responder = null)
{
if (hostName == null) throw new ArgumentNullException("hostName");
if (port < 0) throw new ArgumentOutOfRangeException("port");
this.responder = responder;
this.hostName = hostName;
this.port = port;
}
public bool IsBound
{
get { return channel.IsBound; }
}
public int Port
{
get { return ((IPEndPoint) channel.LocalEndPoint).Port; }
}
public void SetResponder(Responder responder)
{
this.responder = responder;
}
public void Start()
{
channel = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
serverThread = new Thread(Run);
serverThread.Start();
while (!IsBound)
{
Thread.Sleep(10);
}
}
public void Stop()
{
cancellationRequested = true;
while (serverThread.IsAlive)
{
Thread.Sleep(10);
}
}
private void Run()
{
IPHostEntry host = Dns.GetHostEntry(hostName);
IPAddress ipAddress =
host.AddressList.FirstOrDefault(x => x.AddressFamily == AddressFamily.InterNetwork);
if (ipAddress == null)
throw new InvalidDataException(
string.Format("There is not IP Address with the hostname {0} and AddressFamily InterNetwork",
hostName));
var localEndPoint = new IPEndPoint(ipAddress, port);
channel.Bind(localEndPoint);
channel.Listen(100);
var results = new List<IAsyncResult>();
while (true)
{
// Set the event to nonsignaled state.
allDone.Reset();
// Start an asynchronous socket to listen for connections.
IAsyncResult t = channel.BeginAccept(AcceptCallback, channel);
results.Add(t);
// Wait until a connection is made before continuing.
while (!allDone.WaitOne(1000))
{
if (cancellationRequested)
{
try
{
channel.Close();
}
catch
{
}
try
{
CloseSockets();
}
catch
{
}
return;
}
}
}
}
private void CloseSockets()
{
lock (this)
{
try
{
foreach (var socket in sockets)
{
var myOpts = new LingerOption(true, 1);
socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.DontLinger, true);
socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.Linger, myOpts);
socket.SendTimeout = 1;
socket.ReceiveTimeout = 1;
socket.Shutdown(SocketShutdown.Both);
socket.Disconnect(false);
}
sockets = new List<Socket>();
}
catch (Exception)
{
}
}
}
public void AddSocket(Socket socket)
{
lock (this)
{
sockets.Add(socket);
}
}
public void RemoveSocket(Socket socket)
{
lock (this)
{
sockets.Remove(socket);
}
}
private void AcceptCallback(IAsyncResult ar)
{
// Signal the main thread to continue.
allDone.Set();
// Get the socket that handles the client request.
var listener = (Socket) ar.AsyncState;
if (cancellationRequested)
{
return;
}
Socket socket = listener.EndAccept(ar);
AddSocket(socket);
// Create the state object.
var xc = new SocketTransceiver(socket);
while (true)
{
try
{
IList<MemoryStream> request = xc.ReadBuffers();
IList<MemoryStream> response = responder.Respond(request, xc);
xc.WriteBuffers(response);
}
catch (ObjectDisposedException)
{
break;
}
catch (SocketException)
{
break;
}
catch (AvroRuntimeException)
{
break;
}
catch (Exception)
{
break;
}
}
try
{
xc.Disconnect();
}
catch (Exception) { }
RemoveSocket(socket);
}
}
}