blob: e1c8f9d5b3201ceb4dfbb36013938c9d692bfbdb [file]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef _IGNITE_IMPL_THIN_NOTIFICATION_HANDLER
#define _IGNITE_IMPL_THIN_NOTIFICATION_HANDLER
#include <stdint.h>
#include <vector>
#include <ignite/ignite_error.h>
#include <ignite/common/thread_pool.h>
#include <ignite/network/data_buffer.h>
#include <ignite/impl/interop/interop_memory.h>
namespace ignite
{
namespace impl
{
namespace thin
{
/**
* Notification handler.
*
* Abstract interface with two callbacks: one for an incoming notification
* message and one for the disconnect of the channel.
*/
class NotificationHandler
{
public:
/**
* Destructor.
*
* Virtual, so that implementations can be destroyed through a pointer to
* this interface.
*/
virtual ~NotificationHandler()
{
// No-op.
}
/**
* Handle notification.
*
* Does not return a value; implementations report failures by other means
* (NOTE(review): presumably by throwing IgniteError - confirm against
* implementations).
*
* @param msg Message.
*/
virtual void OnNotification(const network::DataBuffer& msg) = 0;
/**
* Disconnected callback.
*
* Called if channel was disconnected.
*/
virtual void OnDisconnected() = 0;
};
/** Shared pointer to notification handler. */
typedef common::concurrent::SharedPointer<NotificationHandler> SP_NotificationHandler;
/**
* Task that handles notification
*/
class HandleNotificationTask : public common::ThreadPoolTask
{
public:
/**
* Constructor.
*
* @param msg Message.
* @param handler Notification handler.
* @param channelId Channel ID.
* @param channelStateHandler Channel state handler.
*/
HandleNotificationTask(
const network::DataBuffer& msg,
const SP_NotificationHandler& handler,
uint64_t channelId,
ChannelStateHandler& channelStateHandler
) :
msg(msg),
handler(handler),
channelId(channelId),
channelStateHandler(channelStateHandler)
{
// No-op.
}
/**
* Destructor.
*/
virtual ~HandleNotificationTask()
{
// No-op.
}
/**
* Execute task.
*/
virtual void Execute()
{
handler.Get()->OnNotification(msg);
}
/**
* Called if error occurred during task processing.
*
* @param err Error.
*/
virtual void OnError(const IgniteError& err)
{
channelStateHandler.OnNotificationHandlingError(channelId, err);
}
private:
/** Message. */
network::DataBuffer msg;
/** Handler. */
SP_NotificationHandler handler;
/** Channel ID. */
uint64_t channelId;
/** Channel state handler. */
ChannelStateHandler& channelStateHandler;
};
/**
* Task that handles connection closing
*/
class DisconnectedTask : public common::ThreadPoolTask
{
public:
/**
* Constructor.
*
* @param handler Notification handler.
*/
explicit DisconnectedTask(const SP_NotificationHandler& handler) :
handler(handler)
{
// No-op.
}
/**
* Destructor.
*/
virtual ~DisconnectedTask()
{
// No-op.
}
/**
* Execute task.
*/
virtual void Execute()
{
handler.Get()->OnDisconnected();
}
/**
* Called if error occurred during task processing.
*
* @param err Error.
*/
virtual void OnError(const IgniteError&)
{
// No-op. Connection already closed so there is not much we can do.
// TODO: Add logging here once it's implemented.
}
private:
/** Handler. */
SP_NotificationHandler handler;
};
/**
 * Notification handler holder.
 *
 * Keeps the handler of a notification stream. Notifications which arrive
 * before the handler is set are queued and replayed to the handler once it is
 * set. Also remembers whether the channel was disconnected, so that a handler
 * set after the disconnect is still informed about it.
 *
 * NOTE(review): the class contains no synchronization of its own; callers
 * presumably serialize access to it - confirm against the owning channel.
 */
class NotificationHandlerHolder
{
    /** Message queue. */
    typedef std::vector<network::DataBuffer> MessageQueue;

public:
    /**
     * Default constructor.
     *
     * Creates a holder which is not disconnected, has an empty queue and no
     * handler.
     */
    NotificationHandlerHolder() :
        disconnected(false),
        queue(),
        handler()
    {
        // No-op.
    }

    /**
     * Destructor.
     */
    ~NotificationHandlerHolder()
    {
        // No-op.
    }

    /**
     * Process notification.
     *
     * The message is cloned first, so neither the returned task nor the queue
     * refers to the buffer object passed by the caller. If a handler is set, a
     * task delivering the clone is returned. Otherwise the clone is queued
     * until SetHandler() is called.
     *
     * @param msg Notification message to process.
     * @param channelId Channel ID.
     * @param channelStateHandler Channel state handler.
     * @return Task for dispatching if handler is present and null otherwise.
     */
    common::SP_ThreadPoolTask ProcessNotification(const network::DataBuffer& msg,
        uint64_t channelId, ChannelStateHandler& channelStateHandler)
    {
        network::DataBuffer notification(msg.Clone());

        if (handler.IsValid())
            return common::SP_ThreadPoolTask(
                new HandleNotificationTask(notification, handler, channelId, channelStateHandler));

        queue.push_back(notification);

        return common::SP_ThreadPoolTask();
    }

    /**
     * Process disconnect.
     *
     * Marks the holder as disconnected and drops all queued notifications.
     * The handler is not invoked here: the disconnect callback is wrapped into
     * a task, the same way ProcessNotification() does it, and the caller is
     * expected to dispatch that task.
     *
     * @return Task for dispatching if handler is present and null otherwise.
     */
    common::SP_ThreadPoolTask ProcessClosed()
    {
        disconnected = true;

        // Clear the queue as we won't be able to process these notifications after disconnect.
        queue.clear();

        // Do not call the handler inline: that would run user code on the calling thread and
        // bypass DisconnectedTask::OnError(), letting an exception escape into the caller.
        if (handler.IsValid())
            return common::SP_ThreadPoolTask(new DisconnectedTask(handler));

        return common::SP_ThreadPoolTask();
    }

    /**
     * Set handler.
     *
     * If the channel is already disconnected, queued notifications are dropped
     * and the handler only gets the disconnect callback. Otherwise all queued
     * notifications are passed to the handler in the order they were queued.
     * In both cases the handler is invoked synchronously, on the calling
     * thread.
     *
     * @param handler0 Notification handler.
     * @throw IgniteError if a handler is already set.
     */
    void SetHandler(const SP_NotificationHandler& handler0)
    {
        if (handler.IsValid())
            throw IgniteError(IgniteError::IGNITE_ERR_GENERIC,
                "Internal error: handler is already set for the notification");

        handler = handler0;

        // If we are already disconnected, then there is no point in processing notifications.
        if (disconnected)
        {
            queue.clear();

            handler.Get()->OnDisconnected();

            return;
        }

        for (MessageQueue::iterator it = queue.begin(); it != queue.end(); ++it)
            handler.Get()->OnNotification(*it);

        queue.clear();
    }

private:
    /** Disconnected flag. */
    bool disconnected;

    /** Notification queue. */
    MessageQueue queue;

    /** Notification handler. */
    SP_NotificationHandler handler;
};
}
}
}
#endif //_IGNITE_IMPL_THIN_NOTIFICATION_HANDLER