| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.qpid.server.store.berkeleydb.replication; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.net.InetSocketAddress; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| import org.apache.log4j.Logger; |
| import org.apache.qpid.AMQStoreException; |
| import org.apache.qpid.server.replication.ReplicationGroupListener; |
| import org.apache.qpid.server.store.berkeleydb.CoalescingCommiter; |
| import org.apache.qpid.server.store.berkeleydb.Committer; |
| import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade; |
| import org.apache.qpid.server.store.berkeleydb.LoggingAsyncExceptionListener; |
| import org.apache.qpid.server.util.DaemonThreadFactory; |
| |
| import com.sleepycat.je.Database; |
| import com.sleepycat.je.DatabaseConfig; |
| import com.sleepycat.je.DatabaseException; |
| import com.sleepycat.je.Durability; |
| import com.sleepycat.je.Environment; |
| import com.sleepycat.je.EnvironmentConfig; |
| import com.sleepycat.je.EnvironmentFailureException; |
| import com.sleepycat.je.Transaction; |
| import com.sleepycat.je.rep.InsufficientLogException; |
| import com.sleepycat.je.rep.InsufficientReplicasException; |
| import com.sleepycat.je.rep.NetworkRestore; |
| import com.sleepycat.je.rep.NetworkRestoreConfig; |
| import com.sleepycat.je.rep.NodeState; |
| import com.sleepycat.je.rep.RepInternal; |
| import com.sleepycat.je.rep.ReplicatedEnvironment; |
| import com.sleepycat.je.rep.ReplicationConfig; |
| import com.sleepycat.je.rep.ReplicationGroup; |
| import com.sleepycat.je.rep.ReplicationMutableConfig; |
| import com.sleepycat.je.rep.ReplicationNode; |
| import com.sleepycat.je.rep.RestartRequiredException; |
| import com.sleepycat.je.rep.StateChangeEvent; |
| import com.sleepycat.je.rep.StateChangeListener; |
| import com.sleepycat.je.rep.util.DbPing; |
| import com.sleepycat.je.rep.util.ReplicationGroupAdmin; |
| import com.sleepycat.je.rep.utilint.ServiceDispatcher.ServiceConnectFailedException; |
| import com.sleepycat.je.rep.vlsn.VLSNRange; |
| import com.sleepycat.je.utilint.PropUtil; |
| import com.sleepycat.je.utilint.VLSN; |
| |
| public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChangeListener |
| { |
| public static final String GROUP_CHECK_INTERVAL_PROPERTY_NAME = "qpid.bdb.ha.group_check_interval"; |
| public static final String MASTER_TRANSFER_TIMEOUT_PROPERTY_NAME = "qpid.bdb.ha.master_transfer_interval"; |
| public static final String DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME = "qpid.bdb.ha.db_ping_socket_timeout"; |
| |
| private static final Logger LOGGER = Logger.getLogger(ReplicatedEnvironmentFacade.class); |
| |
| private static final long DEFAULT_GROUP_CHECK_INTERVAL = 1000l; |
| private static final int DEFAULT_MASTER_TRANSFER_TIMEOUT = 1000 * 60; |
| private static final int DEFAULT_DB_PING_SOCKET_TIMEOUT = 1000; |
| |
| private static final long GROUP_CHECK_INTERVAL = Long.getLong(GROUP_CHECK_INTERVAL_PROPERTY_NAME, DEFAULT_GROUP_CHECK_INTERVAL); |
| private static final int MASTER_TRANSFER_TIMEOUT = Integer.getInteger(MASTER_TRANSFER_TIMEOUT_PROPERTY_NAME, DEFAULT_MASTER_TRANSFER_TIMEOUT); |
| private static final int DB_PING_SOCKET_TIMEOUT = Integer.getInteger(DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME, DEFAULT_DB_PING_SOCKET_TIMEOUT); |
| |
/**
 * Replication configuration defaults applied to every environment created by this facade.
 * Entries supplied through the facade configuration are layered on top of these values.
 */
private static final Map<String, String> REPCONFIG_DEFAULTS;
static
{
    final Map<String, String> defaults = new HashMap<String, String>();

    // Decreased, as the 24 h default may lead to very large log files for most users.
    defaults.put(ReplicationConfig.REP_STREAM_TIMEOUT, "1 h");

    // Increased, as the 5 s default may lead to spurious timeouts.
    defaults.put(ReplicationConfig.REPLICA_ACK_TIMEOUT, "15 s");

    // Increased, as the 10 s default may lead to spurious timeouts.
    defaults.put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "20 s");

    // Changed, as the 10 h default may cause user confusion.
    defaults.put(ReplicationConfig.ENV_SETUP_TIMEOUT, "15 min");

    // Changed from the default of true so that the new behaviour is adopted early.
    // False is scheduled to become the default after JE 5.0.48.
    defaults.put(ReplicationConfig.PROTOCOL_OLD_STRING_ENCODING, Boolean.FALSE.toString());

    // Decreased, as the default 5 min interval may lead to bigger data losses on a node
    // with NO_SYNC durability if that node crashes.
    defaults.put(ReplicationConfig.LOG_FLUSH_TASK_INTERVAL, "1 min");

    // Timeout to transit into the UNKNOWN state if the majority is not available.
    // By default it is switched off.
    defaults.put(ReplicationConfig.ENV_UNKNOWN_STATE_TIMEOUT, "5 s");

    REPCONFIG_DEFAULTS = Collections.unmodifiableMap(defaults);
}
| |
| public static final String TYPE = "BDB-HA"; |
| |
| private final ReplicatedEnvironmentConfiguration _configuration; |
| private final Durability _durability; |
| private final Boolean _coalescingSync; |
| private final String _prettyGroupNodeName; |
| private final File _environmentDirectory; |
| |
| private final ExecutorService _environmentJobExecutor; |
| private final ScheduledExecutorService _groupChangeExecutor; |
| private final AtomicReference<State> _state = new AtomicReference<State>(State.OPENING); |
| private final ConcurrentMap<String, DatabaseHolder> _databases = new ConcurrentHashMap<String, DatabaseHolder>(); |
| private final ConcurrentMap<String, RemoteReplicationNode> _remoteReplicationNodes = new ConcurrentHashMap<String, RemoteReplicationNode>(); |
| private final RemoteReplicationNodeFactory _remoteReplicationNodeFactory; |
| private final AtomicReference<ReplicationGroupListener> _replicationGroupListener = new AtomicReference<ReplicationGroupListener>(); |
| private final AtomicReference<StateChangeListener> _stateChangeListener = new AtomicReference<StateChangeListener>(); |
| |
| private volatile ReplicatedEnvironment _environment; |
| private volatile long _joinTime; |
| private volatile ReplicatedEnvironment.State _lastKnownEnvironmentState; |
| |
| public ReplicatedEnvironmentFacade(ReplicatedEnvironmentConfiguration configuration, RemoteReplicationNodeFactory remoteReplicationNodeFactory) |
| { |
| _environmentDirectory = new File(configuration.getStorePath()); |
| if (!_environmentDirectory.exists()) |
| { |
| if (!_environmentDirectory.mkdirs()) |
| { |
| throw new IllegalArgumentException("Environment path " + _environmentDirectory + " could not be read or created. " |
| + "Ensure the path is correct and that the permissions are correct."); |
| } |
| } |
| |
| _configuration = configuration; |
| |
| _durability = Durability.parse(_configuration.getDurability()); |
| _coalescingSync = _configuration.isCoalescingSync(); |
| _prettyGroupNodeName = _configuration.getGroupName() + ":" + _configuration.getName(); |
| |
| // we relay on this executor being single-threaded as we need to restart and mutate the environment in one thread |
| _environmentJobExecutor = Executors.newSingleThreadExecutor(new DaemonThreadFactory("Environment-" + _prettyGroupNodeName)); |
| _groupChangeExecutor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() + 1, new DaemonThreadFactory("Group-Change-Learner:" + _prettyGroupNodeName)); |
| |
| _remoteReplicationNodeFactory = remoteReplicationNodeFactory; |
| _groupChangeExecutor.scheduleWithFixedDelay(new GroupChangeLearner(), 0, GROUP_CHECK_INTERVAL, TimeUnit.MILLISECONDS); |
| _groupChangeExecutor.schedule(new RemoteNodeStateLearner(), _remoteReplicationNodeFactory.getRemoteNodeMonitorInterval(), TimeUnit.MILLISECONDS); |
| |
| // create environment in a separate thread to avoid renaming of the current thread by JE |
| _environment = createEnvironment(true); |
| populateExistingRemoteReplicationNodes(); |
| } |
| |
| @Override |
| public void commit(final Transaction tx) throws AMQStoreException |
| { |
| try |
| { |
| // Using commit() instead of commitNoSync() for the HA store to allow |
| // the HA durability configuration to influence resulting behaviour. |
| tx.commit(); |
| } |
| catch (DatabaseException de) |
| { |
| throw handleDatabaseException("Got DatabaseException on commit, closing environment", de); |
| } |
| } |
| |
| @Override |
| public void close() |
| { |
| if (_state.compareAndSet(State.OPENING, State.CLOSING) || |
| _state.compareAndSet(State.OPEN, State.CLOSING) || |
| _state.compareAndSet(State.RESTARTING, State.CLOSING) ) |
| { |
| try |
| { |
| if (LOGGER.isDebugEnabled()) |
| { |
| LOGGER.debug("Closing replicated environment facade for " + _prettyGroupNodeName); |
| } |
| |
| _environmentJobExecutor.shutdown(); |
| _groupChangeExecutor.shutdown(); |
| closeDatabases(); |
| closeEnvironment(); |
| } |
| finally |
| { |
| _state.compareAndSet(State.CLOSING, State.CLOSED); |
| } |
| } |
| } |
| |
| @Override |
| public AMQStoreException handleDatabaseException(String contextMessage, final DatabaseException dbe) |
| { |
| boolean restart = (dbe instanceof InsufficientReplicasException || dbe instanceof InsufficientReplicasException || dbe instanceof RestartRequiredException); |
| if (restart) |
| { |
| tryToRestartEnvironment(dbe); |
| } |
| return new AMQStoreException(contextMessage, dbe); |
| } |
| |
| private void tryToRestartEnvironment(final DatabaseException dbe) |
| { |
| if (_state.compareAndSet(State.OPEN, State.RESTARTING)) |
| { |
| if (dbe != null && LOGGER.isDebugEnabled()) |
| { |
| LOGGER.debug("Environment restarting due to exception " + dbe.getMessage(), dbe); |
| } |
| |
| _environmentJobExecutor.execute(new Runnable() |
| { |
| @Override |
| public void run() |
| { |
| try |
| { |
| restartEnvironment(); |
| } |
| catch (Exception e) |
| { |
| LOGGER.error("Exception on environment restart", e); |
| } |
| } |
| }); |
| |
| } |
| else |
| { |
| LOGGER.info("Cannot restart environment because of facade state: " + _state.get()); |
| } |
| } |
| |
| @Override |
| public void openDatabases(DatabaseConfig dbConfig, String... databaseNames) |
| { |
| if (_state.get() != State.OPEN) |
| { |
| throw new IllegalStateException("Environment facade is not in opened state"); |
| } |
| |
| if (!_environment.isValid()) |
| { |
| throw new IllegalStateException("Environment is not valid"); |
| } |
| |
| if (_environment.getState() != ReplicatedEnvironment.State.MASTER) |
| { |
| throw new IllegalStateException("Databases can only be opened on Master node"); |
| } |
| |
| for (String databaseName : databaseNames) |
| { |
| _databases.put(databaseName, new DatabaseHolder(dbConfig)); |
| } |
| for (String databaseName : databaseNames) |
| { |
| DatabaseHolder holder = _databases.get(databaseName); |
| openDatabaseInternally(databaseName, holder); |
| } |
| } |
| |
| private void openDatabaseInternally(String databaseName, DatabaseHolder holder) |
| { |
| Database database = _environment.openDatabase(null, databaseName, holder.getConfig()); |
| holder.setDatabase(database); |
| } |
| |
| @Override |
| public Database getOpenDatabase(String name) |
| { |
| if (_state.get() != State.OPEN) |
| { |
| throw new IllegalStateException("Environment facade is not in opened state"); |
| } |
| |
| if (!_environment.isValid()) |
| { |
| throw new IllegalStateException("Environment is not valid"); |
| } |
| DatabaseHolder databaseHolder = _databases.get(name); |
| if (databaseHolder == null) |
| { |
| throw new IllegalArgumentException("Database with name '" + name + "' has never been requested to be opened"); |
| } |
| Database database = databaseHolder.getDatabase(); |
| if (database == null) |
| { |
| throw new IllegalArgumentException("Database with name '" + name + "' has not been opened"); |
| } |
| return database; |
| } |
| |
| @Override |
| public String getStoreLocation() |
| { |
| return _environmentDirectory.getAbsolutePath(); |
| } |
| |
| @Override |
| public void stateChange(final StateChangeEvent stateChangeEvent) |
| { |
| if (LOGGER.isInfoEnabled()) |
| { |
| LOGGER.info("The node '" + _prettyGroupNodeName + "' state is " + stateChangeEvent.getState()); |
| } |
| |
| if (_state.get() != State.CLOSING && _state.get() != State.CLOSED) |
| { |
| _groupChangeExecutor.submit(new Runnable() |
| { |
| @Override |
| public void run() |
| { |
| stateChanged(stateChangeEvent); |
| } |
| }); |
| } |
| else |
| { |
| if (LOGGER.isDebugEnabled()) |
| { |
| LOGGER.debug("Ignoring the state environment change event as the environment facade for node '" + _prettyGroupNodeName |
| + "' is in state " + _state.get()); |
| } |
| } |
| } |
| |
| private void stateChanged(StateChangeEvent stateChangeEvent) |
| { |
| ReplicatedEnvironment.State state = stateChangeEvent.getState(); |
| |
| if (state == ReplicatedEnvironment.State.REPLICA || state == ReplicatedEnvironment.State.MASTER) |
| { |
| if (_state.compareAndSet(State.OPENING, State.OPEN) || _state.compareAndSet(State.RESTARTING, State.OPEN)) |
| { |
| LOGGER.info("The environment facade is in open state for node " + _prettyGroupNodeName); |
| _joinTime = System.currentTimeMillis(); |
| } |
| } |
| |
| if (state == ReplicatedEnvironment.State.MASTER) |
| { |
| reopenDatabases(); |
| } |
| |
| StateChangeListener listener = _stateChangeListener.get(); |
| if (listener != null) |
| { |
| listener.stateChange(stateChangeEvent); |
| } |
| |
| if (_lastKnownEnvironmentState == ReplicatedEnvironment.State.MASTER && state == ReplicatedEnvironment.State.DETACHED && _state.get() == State.OPEN) |
| { |
| tryToRestartEnvironment(null); |
| } |
| _lastKnownEnvironmentState = state; |
| } |
| |
| private void reopenDatabases() |
| { |
| DatabaseConfig pingDbConfig = new DatabaseConfig(); |
| pingDbConfig.setTransactional(true); |
| pingDbConfig.setAllowCreate(true); |
| |
| _databases.putIfAbsent(DatabasePinger.PING_DATABASE_NAME, new DatabaseHolder(pingDbConfig)); |
| |
| for (Map.Entry<String, DatabaseHolder> entry : _databases.entrySet()) |
| { |
| openDatabaseInternally(entry.getKey(), entry.getValue()); |
| } |
| } |
| |
| public String getGroupName() |
| { |
| return (String)_configuration.getGroupName(); |
| } |
| |
| public String getNodeName() |
| { |
| return _configuration.getName(); |
| } |
| |
| public String getHostPort() |
| { |
| return (String)_configuration.getHostPort(); |
| } |
| |
| public String getHelperHostPort() |
| { |
| return (String)_configuration.getHelperHostPort(); |
| } |
| |
| public String getDurability() |
| { |
| return _durability.toString(); |
| } |
| |
| public boolean isCoalescingSync() |
| { |
| return _coalescingSync; |
| } |
| |
| public String getNodeState() |
| { |
| if (_state.get() != State.OPEN) |
| { |
| return ReplicatedEnvironment.State.UNKNOWN.name(); |
| } |
| ReplicatedEnvironment.State state = _environment.getState(); |
| return state.toString(); |
| } |
| |
| public void removeNodeFromGroup(final String nodeName) |
| { |
| createReplicationGroupAdmin().removeMember(nodeName); |
| } |
| |
| public boolean isDesignatedPrimary() |
| { |
| if (_state.get() != State.OPEN) |
| { |
| throw new IllegalStateException("Environment facade is not opened"); |
| } |
| return _environment.getRepMutableConfig().getDesignatedPrimary(); |
| } |
| |
| public Future<Void> setDesignatedPrimary(final boolean isPrimary) |
| { |
| if (LOGGER.isInfoEnabled()) |
| { |
| LOGGER.info("Submitting a job to set designated primary on " + _prettyGroupNodeName + " to " + isPrimary); |
| } |
| |
| return _environmentJobExecutor.submit(new Callable<Void>() |
| { |
| @Override |
| public Void call() |
| { |
| setDesignatedPrimaryInternal(isPrimary); |
| return null; |
| } |
| }); |
| } |
| |
| void setDesignatedPrimaryInternal(final boolean isPrimary) |
| { |
| try |
| { |
| final ReplicationMutableConfig oldConfig = _environment.getRepMutableConfig(); |
| final ReplicationMutableConfig newConfig = oldConfig.setDesignatedPrimary(isPrimary); |
| _environment.setRepMutableConfig(newConfig); |
| |
| if (LOGGER.isInfoEnabled()) |
| { |
| LOGGER.info("Node " + _prettyGroupNodeName + " successfully set designated primary : " + isPrimary); |
| } |
| } |
| catch (Exception e) |
| { |
| LOGGER.error("Cannot set designated primary to " + isPrimary + " on node " + _prettyGroupNodeName, e); |
| } |
| } |
| |
| int getPriority() |
| { |
| if (_state.get() != State.OPEN) |
| { |
| throw new IllegalStateException("Environment facade is not opened"); |
| } |
| ReplicationMutableConfig repConfig = _environment.getRepMutableConfig(); |
| return repConfig.getNodePriority(); |
| } |
| |
| public Future<Void> setPriority(final int priority) |
| { |
| if (LOGGER.isInfoEnabled()) |
| { |
| LOGGER.info("Submitting a job to set priority on " + _prettyGroupNodeName + " to " + priority); |
| } |
| |
| return _environmentJobExecutor.submit(new Callable<Void>() |
| { |
| @Override |
| public Void call() |
| { |
| setPriorityInternal(priority); |
| return null; |
| } |
| }); |
| } |
| |
| void setPriorityInternal(int priority) |
| { |
| try |
| { |
| final ReplicationMutableConfig oldConfig = _environment.getRepMutableConfig(); |
| final ReplicationMutableConfig newConfig = oldConfig.setNodePriority(priority); |
| _environment.setRepMutableConfig(newConfig); |
| |
| if (LOGGER.isDebugEnabled()) |
| { |
| LOGGER.debug("Node " + _prettyGroupNodeName + " priority has been changed to " + priority); |
| } |
| } |
| catch (Exception e) |
| { |
| LOGGER.error("Cannot set priority to " + priority + " on node " + _prettyGroupNodeName, e); |
| } |
| } |
| |
| int getElectableGroupSizeOverride() |
| { |
| if (_state.get() != State.OPEN) |
| { |
| throw new IllegalStateException("Environment facade is not opened"); |
| } |
| ReplicationMutableConfig repConfig = _environment.getRepMutableConfig(); |
| return repConfig.getElectableGroupSizeOverride(); |
| } |
| |
| public Future<Void> setElectableGroupSizeOverride(final int electableGroupOverride) |
| { |
| if (LOGGER.isInfoEnabled()) |
| { |
| LOGGER.info("Submitting a job to set electable group override on " + _prettyGroupNodeName + " to " + electableGroupOverride); |
| } |
| |
| return _environmentJobExecutor.submit(new Callable<Void>() |
| { |
| @Override |
| public Void call() |
| { |
| setElectableGroupSizeOverrideInternal(electableGroupOverride); |
| return null; |
| } |
| }); |
| } |
| |
| void setElectableGroupSizeOverrideInternal(int electableGroupOverride) |
| { |
| try |
| { |
| final ReplicationMutableConfig oldConfig = _environment.getRepMutableConfig(); |
| final ReplicationMutableConfig newConfig = oldConfig.setElectableGroupSizeOverride(electableGroupOverride); |
| _environment.setRepMutableConfig(newConfig); |
| |
| if (LOGGER.isDebugEnabled()) |
| { |
| LOGGER.debug("Node " + _prettyGroupNodeName + " electable group size override has been changed to " + electableGroupOverride); |
| } |
| } |
| catch (Exception e) |
| { |
| LOGGER.error("Cannot set electable group size to " + electableGroupOverride + " on node " + _prettyGroupNodeName, e); |
| } |
| } |
| |
| |
| public long getJoinTime() |
| { |
| return _joinTime ; |
| } |
| |
| public long getLastKnownReplicationTransactionId() |
| { |
| if (_state.get() == State.OPEN) |
| { |
| VLSNRange range = RepInternal.getRepImpl(_environment).getVLSNIndex().getRange(); |
| VLSN lastTxnEnd = range.getLastTxnEnd(); |
| return lastTxnEnd.getSequence(); |
| } |
| else |
| { |
| return -1L; |
| } |
| } |
| |
| public void transferMasterToSelfAsynchronously() |
| { |
| final String nodeName = getNodeName(); |
| transferMasterAsynchronously(nodeName); |
| } |
| |
| public void transferMasterAsynchronously(final String nodeName) |
| { |
| _groupChangeExecutor.submit(new Runnable() |
| { |
| @Override |
| public void run() |
| { |
| try |
| { |
| ReplicationGroupAdmin admin = createReplicationGroupAdmin(); |
| String newMaster = admin.transferMaster(Collections.singleton(nodeName), MASTER_TRANSFER_TIMEOUT, TimeUnit.MILLISECONDS, true); |
| if (LOGGER.isDebugEnabled()) |
| { |
| LOGGER.debug("The mastership has been transfered to " + newMaster); |
| } |
| } |
| catch (DatabaseException e) |
| { |
| LOGGER.warn("Exception on transfering the mastership to " + _prettyGroupNodeName |
| + " Master transfer timeout : " + MASTER_TRANSFER_TIMEOUT, e); |
| } |
| } |
| }); |
| } |
| |
| public ReplicatedEnvironment getEnvironment() |
| { |
| return _environment; |
| } |
| |
| public State getFacadeState() |
| { |
| return _state.get(); |
| } |
| |
| public void setReplicationGroupListener(ReplicationGroupListener replicationGroupListener) |
| { |
| if (_replicationGroupListener.compareAndSet(null, replicationGroupListener)) |
| { |
| notifyExistingRemoteReplicationNodes(replicationGroupListener); |
| } |
| else |
| { |
| throw new IllegalStateException("ReplicationGroupListener is already set on " + _prettyGroupNodeName); |
| } |
| } |
| |
| public void setStateChangeListener(StateChangeListener stateChangeListener) |
| { |
| if (_stateChangeListener.compareAndSet(null, stateChangeListener)) |
| { |
| _environment.setStateChangeListener(this); |
| } |
| else |
| { |
| throw new IllegalStateException("StateChangeListener is already set on " + _prettyGroupNodeName); |
| } |
| } |
| |
| private void populateExistingRemoteReplicationNodes() |
| { |
| ReplicationGroup group = _environment.getGroup(); |
| Set<ReplicationNode> nodes = new HashSet<ReplicationNode>(group.getElectableNodes()); |
| String localNodeName = getNodeName(); |
| |
| for (ReplicationNode replicationNode : nodes) |
| { |
| String discoveredNodeName = replicationNode.getName(); |
| if (!discoveredNodeName.equals(localNodeName)) |
| { |
| RemoteReplicationNode remoteNode = _remoteReplicationNodeFactory.create(replicationNode, this); |
| |
| _remoteReplicationNodes.put(replicationNode.getName(), remoteNode); |
| } |
| } |
| } |
| |
| private void notifyExistingRemoteReplicationNodes(ReplicationGroupListener listener) |
| { |
| for (org.apache.qpid.server.model.ReplicationNode value : _remoteReplicationNodes.values()) |
| { |
| listener.onReplicationNodeRecovered(value); |
| } |
| } |
| |
| private ReplicationGroupAdmin createReplicationGroupAdmin() |
| { |
| final Set<InetSocketAddress> helpers = new HashSet<InetSocketAddress>(); |
| for (RemoteReplicationNode node : _remoteReplicationNodes.values()) |
| { |
| helpers.add(node.getReplicationNode().getSocketAddress()); |
| } |
| |
| //TODO: refactor this into a method on LocalReplicationNode |
| String hostPort = _configuration.getHostPort(); |
| String[] tokens = hostPort.split(":"); |
| helpers.add(new InetSocketAddress(tokens[0], Integer.parseInt(tokens[1]))); |
| |
| return new ReplicationGroupAdmin(_configuration.getGroupName(), helpers); |
| } |
| |
| private void closeEnvironment() |
| { |
| // Clean the log before closing. This makes sure it doesn't contain |
| // redundant data. Closing without doing this means the cleaner may not |
| // get a chance to finish. |
| try |
| { |
| if (_environment.isValid()) |
| { |
| _environment.cleanLog(); |
| } |
| } |
| finally |
| { |
| _environment.close(); |
| _environment = null; |
| } |
| } |
| |
| private void restartEnvironment() |
| { |
| LOGGER.info("Restarting environment"); |
| |
| closeEnvironmentSafely(); |
| |
| _environment = createEnvironment(false); |
| |
| if (_stateChangeListener.get() != null) |
| { |
| _environment.setStateChangeListener(this); |
| } |
| |
| LOGGER.info("Environment is restarted"); |
| } |
| |
| private void closeEnvironmentSafely() |
| { |
| Environment environment = _environment; |
| if (environment != null) |
| { |
| try |
| { |
| if (environment.isValid()) |
| { |
| try |
| { |
| closeDatabases(); |
| } |
| catch(Exception e) |
| { |
| LOGGER.warn("Ignoring an exception whilst closing databases", e); |
| } |
| } |
| environment.close(); |
| } |
| catch (EnvironmentFailureException efe) |
| { |
| LOGGER.warn("Ignoring an exception whilst closing environment", efe); |
| } |
| } |
| } |
| |
| private void closeDatabases() |
| { |
| RuntimeException firstThrownException = null; |
| for (Map.Entry<String, DatabaseHolder> entry : _databases.entrySet()) |
| { |
| DatabaseHolder databaseHolder = entry.getValue(); |
| Database database = databaseHolder.getDatabase(); |
| if (database != null) |
| { |
| try |
| { |
| if (LOGGER.isDebugEnabled()) |
| { |
| LOGGER.debug("Closing database " + entry.getKey() + " on " + _prettyGroupNodeName); |
| } |
| |
| database.close(); |
| } |
| catch(RuntimeException e) |
| { |
| LOGGER.error("Failed to close database on " + _prettyGroupNodeName, e); |
| if (firstThrownException == null) |
| { |
| firstThrownException = e; |
| } |
| } |
| finally |
| { |
| databaseHolder.setDatabase(null); |
| } |
| } |
| } |
| if (firstThrownException != null) |
| { |
| throw firstThrownException; |
| } |
| } |
| |
| private ReplicatedEnvironment createEnvironment(boolean createEnvironmentInSeparateThread) |
| { |
| String groupName = _configuration.getGroupName(); |
| String helperHostPort = _configuration.getHelperHostPort(); |
| String hostPort = _configuration.getHostPort(); |
| Map<String, String> environmentParameters = _configuration.getParameters(); |
| Map<String, String> replicationEnvironmentParameters = _configuration.getReplicationParameters(); |
| boolean designatedPrimary = _configuration.isDesignatedPrimary(); |
| int priority = _configuration.getPriority(); |
| int quorumOverride = _configuration.getQuorumOverride(); |
| |
| if (LOGGER.isInfoEnabled()) |
| { |
| LOGGER.info("Creating environment"); |
| LOGGER.info("Environment path " + _environmentDirectory.getAbsolutePath()); |
| LOGGER.info("Group name " + groupName); |
| LOGGER.info("Node name " + _configuration.getName()); |
| LOGGER.info("Node host port " + hostPort); |
| LOGGER.info("Helper host port " + helperHostPort); |
| LOGGER.info("Durability " + _durability); |
| LOGGER.info("Coalescing sync " + _coalescingSync); |
| LOGGER.info("Designated primary (applicable to 2 node case only) " + designatedPrimary); |
| LOGGER.info("Node priority " + priority); |
| LOGGER.info("Quorum override " + quorumOverride); |
| } |
| |
| Map<String, String> replicationEnvironmentSettings = new HashMap<String, String>(REPCONFIG_DEFAULTS); |
| if (replicationEnvironmentParameters != null && !replicationEnvironmentParameters.isEmpty()) |
| { |
| replicationEnvironmentSettings.putAll(replicationEnvironmentParameters); |
| } |
| Map<String, String> environmentSettings = new HashMap<String, String>(EnvironmentFacade.ENVCONFIG_DEFAULTS); |
| if (environmentParameters != null && !environmentParameters.isEmpty()) |
| { |
| environmentSettings.putAll(environmentParameters); |
| } |
| |
| ReplicationConfig replicationConfig = new ReplicationConfig(groupName, _configuration.getName(), hostPort); |
| replicationConfig.setHelperHosts(helperHostPort); |
| replicationConfig.setDesignatedPrimary(designatedPrimary); |
| replicationConfig.setNodePriority(priority); |
| replicationConfig.setElectableGroupSizeOverride(quorumOverride); |
| |
| for (Map.Entry<String, String> configItem : replicationEnvironmentSettings.entrySet()) |
| { |
| if (LOGGER.isInfoEnabled()) |
| { |
| LOGGER.info("Setting ReplicationConfig key " + configItem.getKey() + " to '" + configItem.getValue() + "'"); |
| } |
| replicationConfig.setConfigParam(configItem.getKey(), configItem.getValue()); |
| } |
| |
| EnvironmentConfig envConfig = new EnvironmentConfig(); |
| envConfig.setAllowCreate(true); |
| envConfig.setTransactional(true); |
| envConfig.setExceptionListener(new LoggingAsyncExceptionListener()); |
| envConfig.setDurability(_durability); |
| |
| for (Map.Entry<String, String> configItem : environmentSettings.entrySet()) |
| { |
| if (LOGGER.isInfoEnabled()) |
| { |
| LOGGER.info("Setting EnvironmentConfig key " + configItem.getKey() + " to '" + configItem.getValue() + "'"); |
| } |
| envConfig.setConfigParam(configItem.getKey(), configItem.getValue()); |
| } |
| |
| if (createEnvironmentInSeparateThread) |
| { |
| return createEnvironmentInSeparateThread(_environmentDirectory, envConfig, replicationConfig); |
| } |
| else |
| { |
| return createEnvironment(_environmentDirectory, envConfig, replicationConfig); |
| } |
| } |
| |
| private ReplicatedEnvironment createEnvironmentInSeparateThread(final File environmentPathFile, final EnvironmentConfig envConfig, |
| final ReplicationConfig replicationConfig) |
| { |
| Future<ReplicatedEnvironment> environmentFuture = _environmentJobExecutor.submit(new Callable<ReplicatedEnvironment>(){ |
| @Override |
| public ReplicatedEnvironment call() throws Exception |
| { |
| String originalThreadName = Thread.currentThread().getName(); |
| try |
| { |
| return createEnvironment(environmentPathFile, envConfig, replicationConfig); |
| } |
| finally |
| { |
| Thread.currentThread().setName(originalThreadName); |
| } |
| }}); |
| |
| long setUpTimeOutMillis = PropUtil.parseDuration(replicationConfig.getConfigParam(ReplicationConfig.ENV_SETUP_TIMEOUT)); |
| try |
| { |
| return environmentFuture.get(setUpTimeOutMillis, TimeUnit.MILLISECONDS); |
| } |
| catch (InterruptedException e) |
| { |
| Thread.currentThread().interrupt(); |
| throw new RuntimeException("Environment creation was interrupted", e); |
| } |
| catch (ExecutionException e) |
| { |
| throw new RuntimeException("Unexpected exception on environment creation", e.getCause()); |
| } |
| catch (TimeoutException e) |
| { |
| throw new RuntimeException("JE environment has not been created in due time"); |
| } |
| } |
| |
| private ReplicatedEnvironment createEnvironment(File environmentPathFile, EnvironmentConfig envConfig, |
| final ReplicationConfig replicationConfig) |
| { |
| ReplicatedEnvironment environment = null; |
| try |
| { |
| environment = new ReplicatedEnvironment(environmentPathFile, replicationConfig, envConfig); |
| } |
| catch (final InsufficientLogException ile) |
| { |
| LOGGER.info("InsufficientLogException thrown and so full network restore required", ile); |
| NetworkRestore restore = new NetworkRestore(); |
| NetworkRestoreConfig config = new NetworkRestoreConfig(); |
| config.setRetainLogFiles(false); |
| restore.execute(ile, config); |
| environment = new ReplicatedEnvironment(environmentPathFile, replicationConfig, envConfig); |
| } |
| if (LOGGER.isInfoEnabled()) |
| { |
| LOGGER.info("Environment is created for node " + _prettyGroupNodeName); |
| } |
| return environment; |
| } |
| |
| @Override |
| public Committer createCommitter(String name) |
| { |
| if (_coalescingSync) |
| { |
| return new CoalescingCommiter(name, this); |
| } |
| else |
| { |
| return Committer.IMMEDIATE_FUTURE_COMMITTER; |
| } |
| } |
| |
| public NodeState getRemoteNodeState(ReplicationNode repNode) throws IOException, ServiceConnectFailedException |
| { |
| if (repNode == null) |
| { |
| throw new IllegalArgumentException("Node cannot be null"); |
| } |
| return new DbPing(repNode, (String)_configuration.getGroupName(), DB_PING_SOCKET_TIMEOUT).getNodeState(); |
| } |
| |
| // For testing only |
| int getNumberOfElectableGroupMembers() |
| { |
| if (_state.get() != State.OPEN) |
| { |
| throw new IllegalStateException("Environment facade is not opened"); |
| } |
| return _environment.getGroup().getElectableNodes().size(); |
| } |
| |
| private final class GroupChangeLearner implements Runnable |
| { |
| @Override |
| public void run() |
| { |
| if (_state.get() == State.OPEN) |
| { |
| if (LOGGER.isDebugEnabled()) |
| { |
| LOGGER.debug("Checking for changes in the group " + _configuration.getGroupName() + " on node " + _configuration.getName()); |
| } |
| |
| try |
| { |
| detectGroupChangesAndNotify(); |
| } |
| catch(DatabaseException e) |
| { |
| handleDatabaseException("Exception on replication group check", e); |
| } |
| } |
| } |
| |
| private void detectGroupChangesAndNotify() |
| { |
| String groupName = _configuration.getGroupName(); |
| ReplicatedEnvironment env = _environment; |
| ReplicationGroupListener replicationGroupListener = _replicationGroupListener.get(); |
| if (env != null) |
| { |
| ReplicationGroup group = env.getGroup(); |
| Set<ReplicationNode> nodes = new HashSet<ReplicationNode>(group.getElectableNodes()); |
| String localNodeName = getNodeName(); |
| |
| Map<String, org.apache.qpid.server.model.ReplicationNode> removalMap = new HashMap<String, org.apache.qpid.server.model.ReplicationNode>(_remoteReplicationNodes); |
| for (ReplicationNode replicationNode : nodes) |
| { |
| String discoveredNodeName = replicationNode.getName(); |
| if (!discoveredNodeName.equals(localNodeName)) |
| { |
| if (!_remoteReplicationNodes.containsKey(discoveredNodeName)) |
| { |
| if (LOGGER.isDebugEnabled()) |
| { |
| LOGGER.debug("Remote replication node added '" + replicationNode + "' to '" + groupName + "'"); |
| } |
| |
| RemoteReplicationNode remoteNode = _remoteReplicationNodeFactory.create(replicationNode, ReplicatedEnvironmentFacade.this); |
| |
| _remoteReplicationNodes.put(discoveredNodeName, remoteNode); |
| |
| if (replicationGroupListener != null) |
| { |
| replicationGroupListener.onReplicationNodeAddedToGroup(remoteNode); |
| } |
| } |
| else |
| { |
| removalMap.remove(discoveredNodeName); |
| } |
| } |
| } |
| |
| if (!removalMap.isEmpty()) |
| { |
| for (Map.Entry<String, org.apache.qpid.server.model.ReplicationNode> replicationNodeEntry : removalMap.entrySet()) |
| { |
| String replicationNodeName = replicationNodeEntry.getKey(); |
| if (LOGGER.isDebugEnabled()) |
| { |
| LOGGER.debug("Remote replication node removed '" + replicationNodeName + "' from '" + groupName + "'"); |
| } |
| _remoteReplicationNodes.remove(replicationNodeName); |
| if (replicationGroupListener != null) |
| { |
| replicationGroupListener.onReplicationNodeRemovedFromGroup(replicationNodeEntry.getValue()); |
| } |
| } |
| } |
| } |
| } |
| } |
| |
| private class RemoteNodeStateLearner implements Callable<Void> |
| { |
| private Map<String, String> _previousGroupState = Collections.emptyMap(); |
| @Override |
| public Void call() |
| { |
| long remoteNodeMonitorInterval = _remoteReplicationNodeFactory.getRemoteNodeMonitorInterval(); |
| try |
| { |
| Set<Future<Void>> futures = new HashSet<Future<Void>>(); |
| for (final RemoteReplicationNode node : _remoteReplicationNodes.values()) |
| { |
| Future<Void> future = _groupChangeExecutor.submit(new Callable<Void>() |
| { |
| @Override |
| public Void call() |
| { |
| node.updateNodeState(); |
| return null; |
| } |
| }); |
| futures.add(future); |
| } |
| |
| for (Future<Void> future : futures) |
| { |
| try |
| { |
| future.get(remoteNodeMonitorInterval, TimeUnit.MILLISECONDS); |
| } |
| catch (InterruptedException e) |
| { |
| Thread.currentThread().interrupt(); |
| } |
| catch (ExecutionException e) |
| { |
| LOGGER.warn("Cannot update node state for group " + _configuration.getGroupName(), e.getCause()); |
| } |
| catch (TimeoutException e) |
| { |
| LOGGER.warn("Timeout whilst updating node state for group " + _configuration.getGroupName()); |
| future.cancel(true); |
| } |
| } |
| |
| if (ReplicatedEnvironment.State.MASTER == _environment.getState()) |
| { |
| Map<String, String> currentGroupState = new HashMap<String, String>(); |
| for (final RemoteReplicationNode node : _remoteReplicationNodes.values()) |
| { |
| currentGroupState.put(node.getName(), (String)node.getAttribute(org.apache.qpid.server.model.ReplicationNode.ROLE)); |
| } |
| boolean stateChanged = !_previousGroupState.equals(currentGroupState); |
| _previousGroupState = currentGroupState; |
| if (stateChanged && State.OPEN == _state.get()) |
| { |
| new DatabasePinger().pingDb(ReplicatedEnvironmentFacade.this); |
| } |
| } |
| } |
| finally |
| { |
| _groupChangeExecutor.schedule(this, remoteNodeMonitorInterval, TimeUnit.MILLISECONDS); |
| } |
| return null; |
| } |
| } |
| |
/**
 * States of the environment facade. Operations in this class that need a usable environment
 * check for {@link #OPEN} before proceeding.
 */
public enum State
{
    OPENING, OPEN, RESTARTING, CLOSING, CLOSED
}
| |
| private static class DatabaseHolder |
| { |
| private final DatabaseConfig _config; |
| private Database _database; |
| |
| public DatabaseHolder(DatabaseConfig config) |
| { |
| _config = config; |
| } |
| |
| public Database getDatabase() |
| { |
| return _database; |
| } |
| |
| public void setDatabase(Database database) |
| { |
| _database = database; |
| } |
| |
| public DatabaseConfig getConfig() |
| { |
| return _config; |
| } |
| |
| @Override |
| public String toString() |
| { |
| return "DatabaseHolder [_config=" + _config + ", _database=" + _database + "]"; |
| } |
| |
| } |
| |
| } |