blob: cd0ac279c1f5cd09e89643e3dd13d2a5a8ab23a0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.distributed.internal.membership.gms.messenger;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.SocketException;
import org.jgroups.Address;
import org.jgroups.Message;
import org.jgroups.protocols.UDP;
import org.jgroups.util.AsciiString;
import org.jgroups.util.DefaultThreadFactory;
import org.jgroups.util.LazyThreadFactory;
public class Transport extends UDP {
/**
* This is the initial part of the name of all JGroups threads that deliver messages
*/
public static final String THREAD_POOL_NAME_PREFIX = "Geode UDP";
private JGroupsMessenger messenger;
public void setMessenger(JGroupsMessenger m) {
messenger = m;
}
/*
* (non-Javadoc) copied from JGroups to perform Geode-specific error handling when there is a
* network partition
*
* @see org.jgroups.protocols.TP#_send(org.jgroups.Message, org.jgroups.Address)
*/
@Override
protected void _send(Message msg, Address dest) {
try {
send(msg, dest);
} catch (InterruptedIOException iex) {
} catch (InterruptedException interruptedEx) {
Thread.currentThread().interrupt(); // let someone else handle the interrupt
} catch (SocketException e) {
if (!this.sock.isClosed() && !stack.getChannel().isClosed()) {
log.error("Exception caught while sending message", e);
}
} catch (IOException e) {
if (messenger != null
/* && e.getMessage().contains("Operation not permitted") */) { // this is the english Oracle
// JDK exception condition we
// really want to catch
messenger.handleJGroupsIOException(e, dest);
}
} catch (Throwable e) {
log.error("Exception caught while sending message", e);
}
}
/*
* (non-Javadoc) copied from JGroups to perform Geode-specific error handling when there is a
* network partition
*/
@Override
protected void doSend(AsciiString cluster_name, byte[] buf, int offset, int length, Address dest)
throws Exception {
try {
super.doSend(cluster_name, buf, offset, length, dest);
} catch (SocketException sock_ex) {
if (!this.sock.isClosed() && !stack.getChannel().isClosed()) {
log.error("Exception caught while sending message", sock_ex);
}
} catch (IOException e) {
if (messenger != null
/* && e.getMessage().contains("Operation not permitted") */) { // this is the english Oracle
// JDK exception condition we
// really want to catch
messenger.handleJGroupsIOException(e, dest);
}
} catch (Throwable e) {
log.error("Exception caught while sending message", e);
}
}
/*
* (non-Javadoc) JGroups does not currently (3.6.6) allow you to specify that threads should be
* daemon, so we override the init() method here and create the factories before initializing UDP
*
* @see org.jgroups.protocols.UDP#init()
*/
@Override
public void init() throws Exception {
global_thread_factory = new DefaultThreadFactory("Geode ", true);
timer_thread_factory = new LazyThreadFactory(THREAD_POOL_NAME_PREFIX + " Timer", true, true);
default_thread_factory =
new DefaultThreadFactory(THREAD_POOL_NAME_PREFIX + " Incoming", true, true);
oob_thread_factory = new DefaultThreadFactory(THREAD_POOL_NAME_PREFIX + " OOB", true, true);
internal_thread_factory =
new DefaultThreadFactory(THREAD_POOL_NAME_PREFIX + " INT", true, true);
super.init();
}
/*
* (non-Javadoc)
*
* @see org.jgroups.protocols.UDP#stop() JGroups is not terminating its timer. I contacted the
* jgroups-users email list about this.
*/
@Override
public void stop() {
if (!getTimer().isShutdown()) {
getTimer().stop();
}
super.stop();
}
// overridden to implement AvailablePort response
@Override
public void receive(Address sender, byte[] data, int offset, int length) {
if (data == null || length <= 0) { // GEODE-1596 - check for empty messages
return;
}
// drop message from self; it has already been looped back up
// (https://issues.jboss.org/browse/JGRP-1765)
if (local_physical_addr != null && local_physical_addr.equals(sender))
return;
if (length - offset == 4 && data[offset] == 'p' && data[offset + 1] == 'i'
&& data[offset + 2] == 'n' && data[offset + 3] == 'g') {
// AvailablePort check
data[offset + 1] = 'o';
try {
sendToSingleMember(sender, data, offset, length);
} catch (Exception e) {
log.fatal("Unable to respond to available-port check", e);
}
return;
}
super.receive(sender, data, offset, length);
}
}