blob: a108393ae319e25d27b671d651edafda704719e3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections.Generic;
using System.IO;
using Avro.Generic;
using Avro.IO;
using Avro.Specific;
using log4net;
using org.apache.avro.ipc;
namespace Avro.ipc
{
/// <summary>
/// Base class for the server side of an RPC exchange. It performs the
/// handshake with the client, decodes the request, dispatches it to the
/// abstract <see cref="Respond(Message, Object)"/> method and encodes the
/// response or error into a list of buffers.
/// </summary>
public abstract class Responder
{
    private static readonly ILog log = LogManager.GetLogger(typeof (Responder));

    // Schema of the per-call metadata: a map with "bytes" values.
    private static readonly Schema META = MapSchema.CreateMap(PrimitiveSchema.NewInstance("bytes"));

    private static readonly GenericReader<Dictionary<String, object>>
        META_READER = new GenericReader<Dictionary<String, object>>(META, META);

    private static readonly GenericWriter<IDictionary<String, object>>
        META_WRITER = new GenericWriter<IDictionary<String, object>>(META);

    // Schema passed to WriteError when reporting a system error
    // (stands in for the commented-out Protocol.SYSTEM_ERRORS below).
    private static readonly Schema errorSchema = Schema.Parse("[\"string\"]");

    private readonly SpecificReader<HandshakeRequest> handshakeReader =
        new SpecificReader<HandshakeRequest>(new HandshakeRequest().Schema, new HandshakeRequest().Schema);

    private readonly SpecificWriter<HandshakeResponse> handshakeWriter =
        new SpecificWriter<HandshakeResponse>(new HandshakeResponse().Schema);

    private readonly Protocol local;
    private readonly MD5 localHash;

    // Cache of known protocols; every access goes through protocolsLock.
    // NOTE(review): entries are keyed by the Schema of the hash object rather
    // than by the hash value itself - confirm that this key actually
    // distinguishes different client protocols.
    private readonly IDictionary<Schema, Protocol> protocols = new Dictionary<Schema, Protocol>();
    private readonly object protocolsLock = new object();

    /// <summary>
    /// Creates a responder for the given local protocol and registers that
    /// protocol in the protocol cache.
    /// </summary>
    /// <param name="local">The protocol implemented by this responder.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="local"/> is null.
    /// </exception>
    protected Responder(Protocol local)
    {
        if (local == null)
        {
            throw new ArgumentNullException("local");
        }

        this.local = local;
        localHash = new MD5 {Value = local.MD5};
        lock (protocolsLock)
        {
            protocols[localHash.Schema] = local;
        }
    }

    /// <summary>
    /// The protocol implemented by this responder.
    /// </summary>
    public Protocol Local
    {
        get { return local; }
    }

    /// <summary>
    /// Computes the response for a decoded request.
    /// </summary>
    /// <param name="message">The local definition of the message being called.</param>
    /// <param name="request">The request object produced by <see cref="ReadRequest"/>.</param>
    /// <returns>The response object, later passed to <see cref="WriteResponse"/>.</returns>
    public abstract Object Respond(Message message, Object request);

    /// <summary>
    /// Reads a request from <paramref name="input"/>. It is called with the
    /// remote message's request schema as <paramref name="actual"/> and the
    /// local message's request schema as <paramref name="expected"/>.
    /// </summary>
    public abstract Object ReadRequest(Schema actual, Schema expected, Decoder input);

    /// <summary>
    /// Writes a successful response to <paramref name="output"/>.
    /// </summary>
    public abstract void WriteResponse(Schema schema, Object response, Encoder output);

    /// <summary>
    /// Writes an error to <paramref name="output"/>. It is called both for
    /// exceptions raised while responding and, with a string, for system errors.
    /// </summary>
    public abstract void WriteError(Schema schema, Object error, Encoder output);

    /// <summary>
    /// Processes a request without an associated connection.
    /// </summary>
    /// <param name="buffers">The buffers holding the encoded call.</param>
    /// <returns>The result of <c>Respond(buffers, null)</c>.</returns>
    public IList<MemoryStream> Respond(IList<MemoryStream> buffers)
    {
        return Respond(buffers, null);
    }

    /// <summary>
    /// Performs the handshake, unless <paramref name="connection"/> is already
    /// connected, in which case its stored remote protocol is returned and
    /// nothing is read or written.
    /// </summary>
    /// <param name="input">Decoder positioned at the handshake request.</param>
    /// <param name="output">Encoder that receives the handshake response.</param>
    /// <param name="connection">The connection in use; may be null.</param>
    /// <returns>
    /// The client's protocol, or null when it is neither cached nor supplied
    /// in the handshake request.
    /// </returns>
    private Protocol Handshake(Decoder input, Encoder output, Transceiver connection)
    {
        if (connection != null && connection.IsConnected)
        {
            return connection.Remote;
        }

        HandshakeRequest request = handshakeReader.Read(null, input);

        Protocol remote;
        lock (protocolsLock)
        {
            // TryGetValue is required here: the dictionary indexer throws
            // KeyNotFoundException for an unknown key instead of returning
            // null, which would make the branch below unreachable.
            Schema clientKey = request.clientHash.Schema;
            if (!protocols.TryGetValue(clientKey, out remote))
            {
                remote = null;
            }

            if (remote == null && request.clientProtocol != null)
            {
                remote = Protocol.Parse(request.clientProtocol);
                protocols[clientKey] = remote;
            }
        }

        var response = new HandshakeResponse();
        if (remote == null)
        {
            response.match = HandshakeMatch.NONE;
        }
        else if (localHash.Schema.Equals(request.serverHash.Schema))
        {
            response.match = HandshakeMatch.BOTH;
        }
        else
        {
            response.match = HandshakeMatch.CLIENT;
        }

        // Unless both sides already agree, send the server's protocol and hash.
        if (response.match != HandshakeMatch.BOTH)
        {
            response.serverProtocol = local.ToString();
            response.serverHash = localHash;
        }

        handshakeWriter.Write(response, output);

        if (connection != null && response.match != HandshakeMatch.NONE)
        {
            connection.Remote = remote;
        }

        return remote;
    }

    /// <summary>
    /// Processes one encoded call: handshake, request decoding, dispatch to
    /// <see cref="Respond(Message, Object)"/> and response encoding.
    /// </summary>
    /// <param name="buffers">The buffers holding the encoded call.</param>
    /// <param name="connection">The connection in use; may be null.</param>
    /// <returns>
    /// The buffers to send back, or null for a one-way message received on a
    /// connection that was already connected when the call started.
    /// </returns>
    public IList<MemoryStream> Respond(IList<MemoryStream> buffers,
                                       Transceiver connection)
    {
        Decoder input = new BinaryDecoder(new ByteBufferInputStream(buffers));
        var bbo = new ByteBufferOutputStream();
        var output = new BinaryEncoder(bbo);
        Exception error = null;
        var context = new RpcContext();
        List<MemoryStream> handshake = null;
        bool wasConnected = connection != null && connection.IsConnected;

        try
        {
            Protocol remote = Handshake(input, output, connection);
            output.Flush();
            if (remote == null) // handshake failed
            {
                return bbo.GetBufferList();
            }

            // NOTE(review): the code below relies on GetBufferList() handing
            // over the buffered data so that bbo can be reused - confirm.
            handshake = bbo.GetBufferList();

            // read request using remote protocol specification
            context.RequestCallMeta = META_READER.Read(null, input);
            String messageName = input.ReadString();
            if (messageName.Equals("")) // a handshake ping
            {
                return handshake;
            }

            // NOTE(review): if Messages is a dictionary, a missing name throws
            // from the indexer before the null checks below can run - confirm.
            Message rm = remote.Messages[messageName];
            if (rm == null)
            {
                throw new AvroRuntimeException("No such remote message: " + messageName);
            }

            Message m = Local.Messages[messageName];
            if (m == null)
            {
                throw new AvroRuntimeException("No message named " + messageName
                                               + " in " + Local);
            }

            Object request = ReadRequest(rm.Request, m.Request, input);
            context.Message = rm;

            // create response using local protocol specification
            if ((m.Oneway.GetValueOrDefault() != rm.Oneway.GetValueOrDefault()) && wasConnected)
            {
                throw new AvroRuntimeException("Not both one-way: " + messageName);
            }

            Object response = null;
            try
            {
                response = Respond(m, request);
                context.Response = response;
            }
            catch (Exception e)
            {
                // Failure in the user's implementation: reported to the caller
                // below through WriteError instead of aborting the call.
                error = e;
                context.Error = error;
                log.Warn("user error", e);
            }

            if (m.Oneway.GetValueOrDefault() && wasConnected) // no response data
            {
                return null;
            }

            // The boolean flag tells the caller whether an error follows.
            output.WriteBoolean(error != null);
            if (error == null)
            {
                WriteResponse(m.Response, response, output);
            }
            else
            {
                try
                {
                    WriteError(m.SupportedErrors, error, output);
                }
                catch (Exception)
                {
                    // Presumably no match on the exception, throw the original
                    // so that the outer handler reports it as a system error.
                    // NOTE(review): "throw error" resets the stack trace that
                    // ends up in the system error text.
                    throw error;
                }
            }
        }
        catch (Exception e)
        {
            // system error: discard whatever was encoded so far and send the
            // exception text instead
            log.Warn("system error", e);
            context.Error = e;
            bbo = new ByteBufferOutputStream();
            output = new BinaryEncoder(bbo);
            output.WriteBoolean(true);
            WriteError(errorSchema /*Protocol.SYSTEM_ERRORS*/, e.ToString(), output);
            if (null == handshake)
            {
                // Failed before the handshake buffers were captured.
                handshake = new ByteBufferOutputStream().GetBufferList();
            }
        }

        output.Flush();
        List<MemoryStream> payload = bbo.GetBufferList();

        // Grab meta-data from plugins
        context.ResponsePayload = payload;
        META_WRITER.Write(context.ResponseCallMeta, output);
        output.Flush();

        // Prepend handshake and append payload
        bbo.Prepend(handshake);
        bbo.Append(payload);

        return bbo.GetBufferList();
    }
}
}