blob: 293b3d92d360505658ea6cb82d52a72f44f3302b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.network.scalecube;
import static io.scalecube.cluster.membership.MembershipEvent.createAdded;
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import io.scalecube.cluster.ClusterConfig;
import io.scalecube.cluster.ClusterImpl;
import io.scalecube.cluster.ClusterMessageHandler;
import io.scalecube.cluster.Member;
import io.scalecube.cluster.membership.MembershipEvent;
import io.scalecube.cluster.metadata.MetadataCodec;
import io.scalecube.net.Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.AbstractClusterService;
import org.apache.ignite.internal.network.ClusterNodeImpl;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.DefaultMessagingService;
import org.apache.ignite.internal.network.NettyBootstrapFactory;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.NodeFinder;
import org.apache.ignite.internal.network.NodeFinderFactory;
import org.apache.ignite.internal.network.configuration.ClusterMembershipView;
import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
import org.apache.ignite.internal.network.configuration.NetworkView;
import org.apache.ignite.internal.network.configuration.ScaleCubeView;
import org.apache.ignite.internal.network.netty.ConnectionManager;
import org.apache.ignite.internal.network.recovery.StaleIds;
import org.apache.ignite.internal.network.serialization.ClassDescriptorFactory;
import org.apache.ignite.internal.network.serialization.ClassDescriptorRegistry;
import org.apache.ignite.internal.network.serialization.MessageSerializationRegistry;
import org.apache.ignite.internal.network.serialization.SerializationService;
import org.apache.ignite.internal.network.serialization.UserObjectSerializationContext;
import org.apache.ignite.internal.network.serialization.marshal.DefaultUserObjectMarshaller;
import org.apache.ignite.internal.worker.CriticalWorkerRegistry;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.NodeMetadata;
import org.apache.ignite.network.TopologyEventHandler;
/**
* Cluster service factory that uses ScaleCube for messaging and topology services.
*/
public class ScaleCubeClusterServiceFactory {
/** Logger. */
private static final IgniteLogger LOG = Loggers.forClass(ScaleCubeClusterServiceFactory.class);
/** Metadata codec. */
private static final MetadataCodec METADATA_CODEC = MetadataCodec.INSTANCE;
/**
* Creates a new {@link ClusterService} using the provided context. The created network will not be in the "started" state.
*
* @param consistentId Consistent ID (aka name) of the local node associated with the service to create.
* @param networkConfiguration Network configuration.
* @param nettyBootstrapFactory Bootstrap factory.
* @param serializationRegistry Registry used for serialization.
* @param staleIds Used to update/detect whether a node has left the physical topology.
* @param criticalWorkerRegistry Used to register critical threads managed by the new service and its components.
* @param failureProcessor Failure processor that is used to handle critical errors.
* @return New cluster service.
*/
public ClusterService createClusterService(
String consistentId,
NetworkConfiguration networkConfiguration,
NettyBootstrapFactory nettyBootstrapFactory,
MessageSerializationRegistry serializationRegistry,
StaleIds staleIds,
CriticalWorkerRegistry criticalWorkerRegistry,
FailureProcessor failureProcessor
) {
var topologyService = new ScaleCubeTopologyService();
// Adding this handler as the first handler to make sure that StaleIds is at least up-to-date as any
// other component that watches topology events.
topologyService.addEventHandler(new TopologyEventHandler() {
@Override
public void onDisappeared(ClusterNode member) {
staleIds.markAsStale(member.id());
}
});
var messageFactory = new NetworkMessagesFactory();
UserObjectSerializationContext userObjectSerialization = createUserObjectSerializationContext();
var messagingService = new DefaultMessagingService(
consistentId,
messageFactory,
topologyService,
staleIds,
userObjectSerialization.descriptorRegistry(),
userObjectSerialization.marshaller(),
criticalWorkerRegistry
);
return new AbstractClusterService(consistentId, topologyService, messagingService, serializationRegistry) {
private volatile ClusterImpl cluster;
private volatile ConnectionManager connectionMgr;
private volatile CompletableFuture<Void> shutdownFuture;
@Override
public CompletableFuture<Void> startAsync() {
var serializationService = new SerializationService(serializationRegistry, userObjectSerialization);
UUID launchId = UUID.randomUUID();
NetworkView configView = networkConfiguration.value();
ConnectionManager connectionMgr = new ConnectionManager(
configView,
serializationService,
consistentId,
nettyBootstrapFactory,
staleIds,
failureProcessor
);
this.connectionMgr = connectionMgr;
connectionMgr.start();
messagingService.start();
Address scalecubeLocalAddress = prepareAddress(connectionMgr.localAddress());
topologyService.addEventHandler(new TopologyEventHandler() {
@Override
public void onDisappeared(ClusterNode member) {
connectionMgr.handleNodeLeft(member.id());
}
});
var transport = new ScaleCubeDirectMarshallerTransport(
scalecubeLocalAddress,
messagingService,
topologyService,
messageFactory
);
ClusterConfig clusterConfig = clusterConfig(configView.membership());
NodeFinder finder = NodeFinderFactory.createNodeFinder(configView.nodeFinder());
ClusterImpl cluster = new ClusterImpl(clusterConfig)
.handler(cl -> new ClusterMessageHandler() {
@Override
public void onMembershipEvent(MembershipEvent event) {
topologyService.onMembershipEvent(event);
}
})
.config(opts -> opts
.memberId(launchId.toString())
.memberAlias(consistentId)
.metadataCodec(METADATA_CODEC)
)
.transport(opts -> opts.transportFactory(transportConfig -> transport))
.membership(opts -> opts.seedMembers(parseAddresses(finder.findNodes())));
Member localMember = createLocalMember(scalecubeLocalAddress, launchId, clusterConfig);
ClusterNode localNode = new ClusterNodeImpl(
localMember.id(),
consistentId,
new NetworkAddress(localMember.address().host(), localMember.address().port())
);
connectionMgr.setLocalNode(localNode);
this.shutdownFuture = cluster.onShutdown().toFuture();
// resolve cyclic dependencies
topologyService.setCluster(cluster);
messagingService.setConnectionManager(connectionMgr);
cluster.startAwait();
assert cluster.member().equals(localMember) : "Expected local member from cluster " + cluster.member()
+ " to be equal to the precomputed one " + localMember;
// emit an artificial event as if the local member has joined the topology (ScaleCube doesn't do that)
var localMembershipEvent = createAdded(cluster.member(), null, System.currentTimeMillis());
topologyService.onMembershipEvent(localMembershipEvent);
this.cluster = cluster;
return nullCompletedFuture();
}
@Override
public CompletableFuture<Void> stopAsync() {
ConnectionManager localConnectionMgr = connectionMgr;
if (localConnectionMgr != null) {
localConnectionMgr.initiateStopping();
}
// Local member will be null, if cluster has not been started.
if (cluster != null && cluster.member() != null) {
cluster.shutdown();
try {
shutdownFuture.get(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IgniteInternalException("Interrupted while waiting for the ClusterService to stop", e);
} catch (ExecutionException e) {
throw new IgniteInternalException("Unable to stop the ClusterService", e.getCause());
} catch (TimeoutException e) {
// Failed to leave gracefully.
LOG.warn("Failed to wait for ScaleCube cluster shutdown [reason={}]", e, e.getMessage());
}
}
if (localConnectionMgr != null) {
localConnectionMgr.stop();
}
// Messaging service checks connection manager's status before sending a message, so connection manager should be
// stopped before messaging service
messagingService.stop();
return nullCompletedFuture();
}
@Override
public void beforeNodeStop() {
this.stopAsync().join();
}
@Override
public boolean isStopped() {
return shutdownFuture.isDone();
}
@Override
public void updateMetadata(NodeMetadata metadata) {
cluster.updateMetadata(metadata).subscribe();
topologyService.updateLocalMetadata(metadata);
}
};
}
/**
* Convert {@link InetSocketAddress} to {@link Address}.
*
* @param addr Address.
* @return ScaleCube address.
*/
private static Address prepareAddress(InetSocketAddress addr) {
InetAddress address = addr.getAddress();
String host = address.isAnyLocalAddress() ? Address.getLocalIpAddress().getHostAddress() : address.getHostAddress();
return Address.create(host, addr.getPort());
}
// This is copied from ClusterImpl#creeateLocalMember() and adjusted to always use launchId as member ID.
private Member createLocalMember(Address address, UUID launchId, ClusterConfig config) {
int port = Optional.ofNullable(config.externalPort()).orElse(address.port());
// calculate local member cluster address
Address memberAddress =
Optional.ofNullable(config.externalHost())
.map(host -> Address.create(host, port))
.orElseGet(() -> Address.create(address.host(), port));
return new Member(
launchId.toString(),
config.memberAlias(),
memberAddress,
config.membershipConfig().namespace());
}
/**
* Returns ScaleCube's cluster configuration. Can be overridden in subclasses for finer control of the created {@link ClusterService}
* instances.
*
* @param cfg Membership configuration.
* @return Cluster configuration.
*/
protected ClusterConfig clusterConfig(ClusterMembershipView cfg) {
ScaleCubeView scaleCube = cfg.scaleCube();
return ClusterConfig.defaultLocalConfig()
.membership(opts ->
opts.syncInterval(cfg.membershipSyncInterval())
.suspicionMult(scaleCube.membershipSuspicionMultiplier())
)
.failureDetector(opts ->
opts.pingInterval(cfg.failurePingInterval())
.pingReqMembers(scaleCube.failurePingRequestMembers())
)
.gossip(opts ->
opts.gossipInterval(scaleCube.gossipInterval())
.gossipRepeatMult(scaleCube.gossipRepeatMult())
)
.metadataTimeout(scaleCube.metadataTimeout());
}
/**
* Creates everything that is needed for the user object serialization.
*
* @return User object serialization context.
*/
private UserObjectSerializationContext createUserObjectSerializationContext() {
var userObjectDescriptorRegistry = new ClassDescriptorRegistry();
var userObjectDescriptorFactory = new ClassDescriptorFactory(userObjectDescriptorRegistry);
var userObjectMarshaller = new DefaultUserObjectMarshaller(userObjectDescriptorRegistry, userObjectDescriptorFactory);
return new UserObjectSerializationContext(userObjectDescriptorRegistry, userObjectDescriptorFactory,
userObjectMarshaller);
}
/**
* Converts the given list of {@link NetworkAddress} into a list of ScaleCube's {@link Address}.
*
* @param addresses Network address.
* @return List of ScaleCube's {@link Address}.
*/
private static List<Address> parseAddresses(List<NetworkAddress> addresses) {
return addresses.stream()
.map(addr -> Address.create(addr.host(), addr.port()))
.collect(Collectors.toList());
}
}