// blob: 2dde3c44042587d26bf64505e4bf7ac4c192078a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.IO;
using Apache.Qpid.Proton.Buffer;
using Apache.Qpid.Proton.Client.Transport;
using Apache.Qpid.Proton.Engine;
using Apache.Qpid.Proton.Engine.Exceptions;
using Apache.Qpid.Proton.Logging;
namespace Apache.Qpid.Proton.Client.Implementation
{
/// <summary>
/// Creates a proxy layer between a transport instance and an engine instance which
/// isolates concerns of mapping the events from a transport to a given engine instance
/// and prevents possible misdirected event routing during reconnects etc where the
/// active transport and engine will switch.
/// </summary>
public sealed class ClientTransportProxy
{
    private static readonly IProtonLogger LOG = ProtonLoggerFactory.GetLogger<ClientTransportProxy>();

    // Key under which a proxy instance stores itself in the attachments of the
    // connection owned by the engine it was created for.
    private static readonly string EngineTransportProxyBindingKey = "Transport-Proxy-Key";

    // Both references are fixed for the lifetime of the proxy; a new proxy is
    // created when a different engine / transport pairing is needed.
    private readonly IEngine engine;
    private readonly ITransport transport;

    /// <summary>
    /// Creates a proxy that binds the given engine to the given transport. The proxy
    /// stores itself in the attachments of the engine's connection and registers its
    /// connected, connect failed, disconnected and read handlers on the transport.
    /// </summary>
    /// <param name="engine">The engine that transport events are routed to.</param>
    /// <param name="transport">The transport whose events are observed.</param>
    /// <exception cref="ArgumentNullException">
    /// If either the engine or the transport is null.
    /// </exception>
    public ClientTransportProxy(IEngine engine, ITransport transport)
    {
        // Validate both arguments before touching either of them so that a null
        // transport cannot leave a partially initialized proxy registered in the
        // attachments of the engine's connection.
        if (engine == null)
        {
            throw new ArgumentNullException(nameof(engine));
        }

        if (transport == null)
        {
            throw new ArgumentNullException(nameof(transport));
        }

        this.engine = engine;
        this.engine.Connection.Attachments.Set(EngineTransportProxyBindingKey, this);

        this.transport = transport;
        this.transport.TransportConnectedHandler(HandleTransportConnected);
        this.transport.TransportConnectFailedHandler(HandleTransportConnectFailed);
        this.transport.TransportDisconnectedHandler(HandleTransportDisconnected);
        this.transport.TransportReadHandler(HandleTransportRead);
    }

    /// <summary>
    /// The transport instance this proxy was created with.
    /// </summary>
    public ITransport Transport => transport;

    /// <summary>
    /// The engine instance this proxy was created with.
    /// </summary>
    public IEngine Engine => engine;

    /// <summary>
    /// Handles the transport connected event by starting the bound engine and
    /// opening the connection that the start call returns.
    /// </summary>
    /// <param name="transport">The transport that reported the event.</param>
    private void HandleTransportConnected(ITransport transport)
    {
        // Trigger the AMQP header and Open performative exchange on connect
        engine.Start().Open();
    }

    /// <summary>
    /// Handles the transport connect failed event by failing the bound engine with
    /// an <see cref="IOException"/> naming the transport end point. Nothing is done
    /// if the engine is already shutdown.
    /// </summary>
    /// <param name="transport">The transport that reported the event.</param>
    /// <param name="error">
    /// The error reported by the transport; it is not included in the exception
    /// handed to the engine.
    /// </param>
    private void HandleTransportConnectFailed(ITransport transport, Exception error)
    {
        if (!engine.IsShutdown)
        {
            LOG.Debug("Transport reports connect attempt failed: {0}", transport);
            engine.EngineFailed(
                new IOException(string.Format("Connection to remote {0} failed.", transport.EndPoint)));
        }
    }

    /// <summary>
    /// Handles the transport disconnected event by failing the bound engine with
    /// an <see cref="IOException"/> naming the transport end point. Nothing is done
    /// if the engine is already shutdown.
    /// </summary>
    /// <param name="transport">The transport that reported the event.</param>
    private void HandleTransportDisconnected(ITransport transport)
    {
        if (!engine.IsShutdown)
        {
            LOG.Debug("Transport reports connection dropped: {0}", transport);
            engine.EngineFailed(
                new IOException(string.Format("Connection to remote {0} dropped.", transport.EndPoint)));
        }
    }

    /// <summary>
    /// Handles the transport read event by passing the received buffer to the bound
    /// engine. An <see cref="EngineStateException"/> thrown while ingesting is logged
    /// and then reported to the engine as a failure; other exception types are not
    /// caught here.
    /// </summary>
    /// <param name="transport">The transport that reported the event.</param>
    /// <param name="buffer">The buffer holding the bytes that were read.</param>
    private void HandleTransportRead(ITransport transport, IProtonBuffer buffer)
    {
        try
        {
            // Ingest at least once, then keep going for as long as the buffer still
            // has readable bytes and the engine reports that it is writable.
            do
            {
                engine.Ingest(buffer);
            }
            while (buffer.IsReadable && engine.IsWritable);

            // TODO - How do we handle case of not all data read ? The loop above exits
            //        when the engine is no longer writable even if the buffer still
            //        has readable bytes, and those bytes are not processed here.
        }
        catch (EngineStateException e)
        {
            LOG.Warn("Caught problem during incoming data processing: {0}", e.Message, e);
            engine.EngineFailed(ClientExceptionSupport.CreateOrPassthroughFatal(e));
        }
    }
}
}