blob: 68097071fad4cf531286cca26e6468c6ecd8f31c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.network.scalecube;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import io.scalecube.cluster.Member;
import io.scalecube.cluster.membership.MembershipEvent;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.network.AbstractTopologyService;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.TopologyEventHandler;
import org.apache.ignite.network.TopologyService;
/**
* Implementation of {@link TopologyService} based on ScaleCube.
*/
final class ScaleCubeTopologyService extends AbstractTopologyService {
/** Logger. */
private static final IgniteLogger LOG = IgniteLogger.forClass(ScaleCubeTopologyService.class);
/** Local member node. */
private ClusterNode localMember;
/** Topology members. */
private final ConcurrentMap<NetworkAddress, ClusterNode> members = new ConcurrentHashMap<>();
/**
* Sets the ScaleCube's local {@link Member}.
*
* @param member Local member.
*/
void setLocalMember(Member member) {
localMember = fromMember(member);
// emit an artificial event as if the local member has joined the topology (ScaleCube doesn't do that)
onMembershipEvent(MembershipEvent.createAdded(member, null, System.currentTimeMillis()));
}
/**
* Delegates the received topology event to the registered event handlers.
*
* @param event Membership event.
*/
void onMembershipEvent(MembershipEvent event) {
ClusterNode member = fromMember(event.member());
if (event.isAdded()) {
members.put(member.address(), member);
LOG.info("Node joined: " + member);
fireAppearedEvent(member);
}
else if (event.isRemoved()) {
members.compute(member.address(), // Ignore stale remove event.
(k, v) -> v.id().equals(member.id()) ? null : v);
LOG.info("Node left: " + member);
fireDisappearedEvent(member);
}
StringBuilder snapshotMsg = new StringBuilder("Topology snapshot [nodes=").append(members.size()).append("]\n");
for (ClusterNode node : members.values()) {
snapshotMsg.append(" ^-- ").append(node).append('\n');
}
LOG.info(snapshotMsg.toString().trim());
}
/**
* Fire a cluster member appearance event.
*
* @param member Appeared cluster member.
*/
private void fireAppearedEvent(ClusterNode member) {
for (TopologyEventHandler handler : getEventHandlers())
handler.onAppeared(member);
}
/**
* Fire a cluster member disappearance event.
*
* @param member Disappeared cluster member.
*/
private void fireDisappearedEvent(ClusterNode member) {
for (TopologyEventHandler handler : getEventHandlers())
handler.onDisappeared(member);
}
/** {@inheritDoc} */
@Override public ClusterNode localMember() {
assert localMember != null : "Cluster has not been started";
return localMember;
}
/** {@inheritDoc} */
@Override public Collection<ClusterNode> allMembers() {
return Collections.unmodifiableCollection(members.values());
}
/** {@inheritDoc} */
@Override public ClusterNode getByAddress(NetworkAddress addr) {
return members.get(addr);
}
/**
* Converts the given {@link Member} to a {@link ClusterNode}.
*
* @param member ScaleCube's cluster member.
* @return Cluster node.
*/
private static ClusterNode fromMember(Member member) {
var addr = new NetworkAddress(member.address().host(), member.address().port());
return new ClusterNode(member.id(), member.alias(), addr);
}
}