// blob: 32284e70d3a2b2034bd4d75aaba454855a1f0bf6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hama.bsp.message;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map.Entry;
import java.util.Queue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.BSPPeerImpl;
import org.apache.hama.bsp.TaskAttemptID;
import org.apache.hama.bsp.message.compress.BSPMessageCompressor;
import org.apache.hama.bsp.message.compress.BSPMessageCompressorFactory;
import org.apache.hama.bsp.message.queue.MemoryQueue;
import org.apache.hama.bsp.message.queue.MessageQueue;
import org.apache.hama.bsp.message.queue.SingleLockQueue;
import org.apache.hama.bsp.message.queue.SynchronizedQueue;
import org.apache.hama.util.ReflectionUtils;
/**
 * Abstract base class that contains the information and services needed by
 * the concrete RPC subclasses. For example, it manages the incoming and
 * outgoing message queues and holds the limit for the connection cache that
 * the concrete message managers maintain.
 */
public abstract class AbstractMessageManager<M extends Writable> implements
    MessageManager<M>, Configurable {

  protected static final Log LOG = LogFactory
      .getLog(AbstractMessageManager.class);

  // conf is injected via reflection of the factory
  protected Configuration conf;
  // collects the messages handed to send() until they are fetched as bundles
  protected OutgoingMessageManager<M> outgoingMessageManager;
  // the queue getCurrentMessage() polls from
  protected MessageQueue<M> localQueue;
  // this must be a synchronized implementation: this is accessed per RPC
  protected SynchronizedQueue<M> localQueueForNextIteration;
  // this peer object is just used for counter incrementation
  protected BSPPeer<?, ?, ?, ?, M> peer;
  // the task attempt id
  protected TaskAttemptID attemptId;
  // the maximum number of cached connections in the concrete message manager
  protected int maxCachedConnections = 100;
  // List of listeners for all the sent messages
  protected Queue<MessageEventListener<M>> messageListenerQueue;
  protected BSPMessageCompressor<M> compressor;

  /**
   * Stores the given collaborators and creates the receiver queues, the
   * compressor and the outgoing message manager from the configuration.
   * 
   * @param attemptId the task attempt id, handed to every queue on creation.
   * @param peer the peer whose counters are incremented.
   * @param conf the configuration all components are created from.
   * @param peerAddress not used by this base implementation.
   * @see org.apache.hama.bsp.message.MessageManager#init(org.apache.hama.bsp.
   *      TaskAttemptID, org.apache.hama.bsp.BSPPeer,
   *      org.apache.hama.HamaConfiguration, java.net.InetSocketAddress)
   */
  @Override
  public void init(TaskAttemptID attemptId, BSPPeer<?, ?, ?, ?, M> peer,
      HamaConfiguration conf, InetSocketAddress peerAddress) {
    this.messageListenerQueue = new LinkedList<MessageEventListener<M>>();
    // attemptId and conf must be set before any queue is created, because
    // getReceiverQueue() reads both of them
    this.attemptId = attemptId;
    this.peer = peer;
    this.conf = conf;
    this.localQueue = getReceiverQueue();
    this.localQueueForNextIteration = getSynchronizedReceiverQueue();
    this.maxCachedConnections = conf.getInt(MAX_CACHED_CONNECTIONS_KEY, 100);
    this.compressor = new BSPMessageCompressorFactory<M>().getCompressor(conf);
    this.outgoingMessageManager = getOutgoingMessageManager();
    this.outgoingMessageManager.init(conf);
  }

  /**
   * Clears the outgoing messages, closes the current local queue and notifies
   * the registered listeners. It is safe to call this before
   * {@link #init(TaskAttemptID, BSPPeer, HamaConfiguration, InetSocketAddress)}
   * ran; components that were never created are skipped.
   * 
   * @see org.apache.hama.bsp.message.MessageManager#close()
   */
  @Override
  public void close() {
    try {
      if (outgoingMessageManager != null) {
        outgoingMessageManager.clear();
      }
    } finally {
      // the local queue is closed and the listeners are notified even if
      // clearing the outgoing messages failed
      try {
        if (localQueue != null) {
          localQueue.close();
        }
      } finally {
        notifyClose();
      }
    }
  }

  /**
   * Polls the next message from the current local queue.
   * 
   * @see org.apache.hama.bsp.message.MessageManager#getCurrentMessage()
   */
  @Override
  public final M getCurrentMessage() throws IOException {
    return localQueue.poll();
  }

  /**
   * Returns the size of the current local queue.
   * 
   * @see org.apache.hama.bsp.message.MessageManager#getNumCurrentMessages()
   */
  @Override
  public final int getNumCurrentMessages() {
    return localQueue.size();
  }

  /**
   * Clears the current local queue.
   */
  public void clearIncomingMessages() {
    localQueue.clear();
  }

  /**
   * Clears the outgoing messages and makes the messages that were received
   * for the next iteration the current local queue. If
   * {@link MessageQueue#PERSISTENT_QUEUE} is enabled and the current local
   * queue still holds messages, both queues are merged instead. Afterwards a
   * fresh queue for the next iteration is created and the listeners are
   * notified.
   * 
   * @see org.apache.hama.bsp.message.MessageManager#clearOutgoingMessages()
   */
  @Override
  public final void clearOutgoingMessages() {
    outgoingMessageManager.clear();

    // the null check comes first, so a missing local queue takes the plain
    // replacement path below instead of failing on size()
    final boolean keepUnreadMessages = localQueue != null
        && conf.getBoolean(MessageQueue.PERSISTENT_QUEUE, false)
        && localQueue.size() > 0;

    if (keepUnreadMessages) {
      // To reduce the number of element additions, the smaller queue is
      // always added to the bigger one
      if (localQueue.size() > localQueueForNextIteration.size()) {
        localQueue.addAll(localQueueForNextIteration);
      } else {
        localQueueForNextIteration.addAll(localQueue);
        localQueue = localQueueForNextIteration.getMessageQueue();
      }
    } else {
      if (localQueue != null) {
        localQueue.close();
      }
      localQueue = localQueueForNextIteration.getMessageQueue();
    }

    localQueue.prepareRead();
    localQueueForNextIteration = getSynchronizedReceiverQueue();
    notifyInit();
  }

  /**
   * Hands the message to the outgoing message manager, increments the counter
   * of sent messages by one and notifies the registered listeners.
   * 
   * @see org.apache.hama.bsp.message.MessageManager#send(java.lang.String,
   *      org.apache.hadoop.io.Writable)
   */
  @Override
  public void send(String peerName, M msg) throws IOException {
    outgoingMessageManager.addMessage(peerName, msg);
    peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_SENT, 1L);
    notifySentMessage(peerName, msg);
  }

  /**
   * Returns the bundle iterator of the outgoing message manager.
   * 
   * @see org.apache.hama.bsp.message.MessageManager#getOutgoingBundles()
   */
  @Override
  public final Iterator<Entry<InetSocketAddress, BSPMessageBundle<M>>> getOutgoingBundles() {
    return this.outgoingMessageManager.getBundleIterator();
  }

  /**
   * Creates the outgoing message manager that is configured under
   * {@link MessageManager#OUTGOING_MESSAGE_MANAGER_CLASS}. If nothing has been
   * configured, {@link OutgoingPOJOMessageBundle} is used.
   * 
   * @return a <b>new</b> outgoing message manager that is not initialized yet.
   */
  protected OutgoingMessageManager<M> getOutgoingMessageManager() {
    @SuppressWarnings("unchecked")
    OutgoingMessageManager<M> messageManager = ReflectionUtils.newInstance(conf
        .getClass(MessageManager.OUTGOING_MESSAGE_MANAGER_CLASS,
            OutgoingPOJOMessageBundle.class, OutgoingMessageManager.class));
    return messageManager;
  }

  /**
   * Returns a new queue implementation based on what was configured under
   * {@link MessageManager#RECEIVE_QUEUE_TYPE_CLASS}. If nothing has been
   * configured then the {@link MemoryQueue} is used. If you have scalability
   * issues, then better use a {@code DiskQueue}.
   * 
   * @return a <b>new</b> queue implementation, already initialized with the
   *         configuration and the task attempt id.
   */
  protected MessageQueue<M> getReceiverQueue() {
    @SuppressWarnings("unchecked")
    MessageQueue<M> queue = ReflectionUtils.newInstance(conf.getClass(
        MessageManager.RECEIVE_QUEUE_TYPE_CLASS, MemoryQueue.class,
        MessageQueue.class));
    queue.init(conf, attemptId);
    return queue;
  }

  /**
   * Returns a new receiver queue wrapped by
   * {@link SingleLockQueue#synchronize(MessageQueue)}.
   * 
   * @return a <b>new</b> synchronized queue implementation.
   */
  protected SynchronizedQueue<M> getSynchronizedReceiverQueue() {
    return SingleLockQueue.synchronize(getReceiverQueue());
  }

  @Override
  public final Configuration getConf() {
    return conf;
  }

  @Override
  public final void setConf(Configuration conf) {
    this.conf = conf;
  }

  /** Tells every registered listener that a message has been sent. */
  private void notifySentMessage(String peerName, M message) {
    for (MessageEventListener<M> listener : this.messageListenerQueue) {
      listener.onMessageSent(peerName, message);
    }
  }

  /** Tells every registered listener that a message has been received. */
  private void notifyReceivedMessage(M message) throws IOException {
    for (MessageEventListener<M> listener : this.messageListenerQueue) {
      listener.onMessageReceived(message);
    }
  }

  /** Tells every registered listener that a bundle has been received. */
  private void notifyReceivedBundle(BSPMessageBundle<M> bundle)
      throws IOException {
    for (MessageEventListener<M> listener : this.messageListenerQueue) {
      listener.onBundleReceived(bundle);
    }
  }

  /** Tells every registered listener that the queues have been set up. */
  private void notifyInit() {
    for (MessageEventListener<M> listener : this.messageListenerQueue) {
      listener.onInitialized();
    }
  }

  /** Tells every registered listener that this manager has been closed. */
  private void notifyClose() {
    // the listener queue only exists after init(), but close() may be called
    // without it
    if (this.messageListenerQueue == null) {
      return;
    }
    for (MessageEventListener<M> listener : this.messageListenerQueue) {
      listener.onClose();
    }
  }

  /**
   * Registers a listener for the message events of this manager. A
   * <code>null</code> listener is ignored.
   */
  @Override
  public void registerListener(MessageEventListener<M> listener)
      throws IOException {
    if (listener != null) {
      this.messageListenerQueue.add(listener);
    }
  }

  /**
   * Adds the bundle to the queue of the next iteration, increments the
   * counter of received messages by the size of the bundle and notifies the
   * registered listeners.
   */
  @Override
  public void loopBackBundle(BSPMessageBundle<M> bundle) throws IOException {
    peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED,
        bundle.size());
    this.localQueueForNextIteration.addBundle(bundle);
    notifyReceivedBundle(bundle);
  }

  /**
   * Adds the message to the queue of the next iteration, increments the
   * counter of received messages by one and notifies the registered
   * listeners.
   */
  @SuppressWarnings("unchecked")
  @Override
  public void loopBackMessage(Writable message) throws IOException {
    final M received = (M) message;
    this.localQueueForNextIteration.add(received);
    // counted like the messages of a bundle in loopBackBundle(), otherwise
    // single messages would be missing from the received total
    peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED, 1L);
    notifyReceivedMessage(received);
  }
}