| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.authentication; |
| |
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.GridTopic;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.IgniteNodeAttributes;
import org.apache.ignite.internal.managers.communication.GridIoManager;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.discovery.CustomEventListener;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.GridCacheUtils;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageTree;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadOnlyMetastorage;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadWriteMetastorage;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteFutureCancelledException;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.spi.IgniteNodeValidationResult;
import org.apache.ignite.spi.discovery.DiscoveryDataBag;
import org.apache.ignite.spi.discovery.DiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.thread.IgniteThreadPoolExecutor;
import org.jetbrains.annotations.Nullable;
| |
| import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; |
| import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; |
| import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; |
| import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.AUTH_PROC; |
| |
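| /** |
| * Processor that manages users and password-based authentication on the node: it keeps the user map, |
| * stores users in the metastorage and handles user management operations (add, remove, update) |
| * that are propagated through discovery custom messages. |
| */ |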
| public class IgniteAuthenticationProcessor extends GridProcessorAdapter implements MetastorageLifecycleListener { |
| /** Prefix of the metastorage keys under which users are stored. */ |
| private static final String STORE_USER_PREFIX = "user."; |
| |
| /** Discovery event types this processor subscribes to. */ |
| private static final int[] DISCO_EVT_TYPES = new int[] {EVT_NODE_LEFT, EVT_NODE_FAILED, EVT_NODE_JOINED}; |
| |
| /** User operation finish futures (Operation ID -> future). */ |
| private final ConcurrentHashMap<IgniteUuid, UserOperationFinishFuture> opFinishFuts = new ConcurrentHashMap<>(); |
| |
| /** Pending authentication requests sent from this node (Authentication message ID -> future). */ |
| private final ConcurrentMap<IgniteUuid, AuthenticateFuture> authFuts = new ConcurrentHashMap<>(); |
| |
| /** When this future is done the node is ready for authentication. */ |
| private final GridFutureAdapter<Void> readyForAuthFut = new GridFutureAdapter<>(); |
| |
| /** Operation mutex. */ |
| private final Object mux = new Object(); |
| |
| /** Active operations. Collected to be sent to a joining node. */ |
| private final Map<IgniteUuid, UserManagementOperation> activeOps = Collections.synchronizedMap(new LinkedHashMap<>()); |
| |
| /** User map (user name -> user). Set to {@code null} on client nodes. */ |
| private ConcurrentMap<String, User> users; |
| |
| /** Shared context. */ |
| @GridToStringExclude |
| private GridCacheSharedContext<?, ?> sharedCtx; |
| |
| /** Read-write metastorage. Set to {@code null} on client nodes. */ |
| private ReadWriteMetastorage metastorage; |
| |
| /** Single-threaded executor that runs user operation workers and users storage refresh workers. */ |
| private IgniteThreadPoolExecutor exec; |
| |
| /** Cached coordinator node. Reset to {@code null} when the coordinator leaves the topology. */ |
| private ClusterNode crdNode; |
| |
| /** Is authentication enabled. */ |
| private boolean isEnabled; |
| |
| /** Disconnected flag. */ |
| private volatile boolean disconnected; |
| |
| /** Finish message of the current operation. May be resent when the coordinator node leaves. */ |
| private UserManagementOperationFinishedMessage curOpFinishMsg; |
| |
| /** Initial users map and operations received from the coordinator when this node joined the cluster. */ |
| private InitialUsersData initUsrs; |
| |
| /** I/O message listener. */ |
| private GridMessageListener ioLsnr; |
| |
| /** System discovery message listener. */ |
| private DiscoveryEventListener discoLsnr; |
| |
| /** Node activate future. */ |
| private final GridFutureAdapter<Void> activateFut = new GridFutureAdapter<>(); |
| |
| /** Validation error detected on local join; when set, it is thrown from {@code onKernalStart}. */ |
| private String validateErr; |
| |
/**
 * Creates the authentication processor bound to the given kernal context.
 *
 * @param ctx Kernal context.
 */
public IgniteAuthenticationProcessor(GridKernalContext ctx) {
    super(ctx);
}
| |
| /** {@inheritDoc} */ |
| @Override public void start() throws IgniteCheckedException { |
| super.start(); |
| |
| isEnabled = ctx.config().isAuthenticationEnabled(); |
| |
| if (isEnabled && !GridCacheUtils.isPersistenceEnabled(ctx.config())) { |
| isEnabled = false; |
| |
| throw new IgniteCheckedException("Authentication can be enabled only for cluster with enabled persistence." |
| + " Check the DataRegionConfiguration"); |
| } |
| |
| ctx.internalSubscriptionProcessor().registerMetastorageListener(this); |
| |
| ctx.addNodeAttribute(IgniteNodeAttributes.ATTR_AUTHENTICATION_ENABLED, isEnabled); |
| |
| GridDiscoveryManager discoMgr = ctx.discovery(); |
| |
| GridIoManager ioMgr = ctx.io(); |
| |
| discoMgr.setCustomEventListener(UserProposedMessage.class, new UserProposedListener()); |
| |
| discoMgr.setCustomEventListener(UserAcceptedMessage.class, new UserAcceptedListener()); |
| |
| discoLsnr = (evt, discoCache) -> { |
| if (!isEnabled || ctx.isStopping()) |
| return; |
| |
| switch (evt.type()) { |
| case EVT_NODE_LEFT: |
| case EVT_NODE_FAILED: |
| onNodeLeft(evt.eventNode().id()); |
| break; |
| |
| case EVT_NODE_JOINED: |
| onNodeJoin(evt.eventNode()); |
| break; |
| } |
| }; |
| |
| ctx.event().addDiscoveryEventListener(discoLsnr, DISCO_EVT_TYPES); |
| |
| ioLsnr = (nodeId, msg, plc) -> { |
| if (!isEnabled || ctx.isStopping()) |
| return; |
| |
| if (msg instanceof UserManagementOperationFinishedMessage) |
| onFinishMessage(nodeId, (UserManagementOperationFinishedMessage)msg); |
| else if (msg instanceof UserAuthenticateRequestMessage) |
| onAuthenticateRequestMessage(nodeId, (UserAuthenticateRequestMessage)msg); |
| else if (msg instanceof UserAuthenticateResponseMessage) |
| onAuthenticateResponseMessage((UserAuthenticateResponseMessage)msg); |
| }; |
| |
| ioMgr.addMessageListener(GridTopic.TOPIC_AUTH, ioLsnr); |
| |
| exec = new IgniteThreadPoolExecutor( |
| "auth", |
| ctx.config().getIgniteInstanceName(), |
| 1, |
| 1, |
| 0, |
| new LinkedBlockingQueue<>()); |
| } |
| |
| /** |
| * On cache processor started. |
| */ |
| public void cacheProcessorStarted() { |
| sharedCtx = ctx.cache().context(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void stop(boolean cancel) throws IgniteCheckedException { |
| if (!isEnabled) |
| return; |
| |
| ctx.io().removeMessageListener(GridTopic.TOPIC_AUTH, ioLsnr); |
| |
| ctx.event().removeDiscoveryEventListener(discoLsnr, DISCO_EVT_TYPES); |
| |
| cancelFutures("Node stopped"); |
| |
| if (!cancel) |
| exec.shutdown(); |
| else |
| exec.shutdownNow(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onKernalStop(boolean cancel) { |
| if (!isEnabled) |
| return; |
| |
| synchronized (mux) { |
| cancelFutures("Kernal stopped."); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onKernalStart(boolean active) throws IgniteCheckedException { |
| super.onKernalStart(active); |
| |
| if (validateErr != null) |
| throw new IgniteCheckedException(validateErr); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onDisconnected(IgniteFuture reconnectFut) { |
| if (!isEnabled) |
| return; |
| |
| synchronized (mux) { |
| assert !disconnected; |
| |
| disconnected = true; |
| |
| cancelFutures("Client node was disconnected from topology (operation result is unknown)."); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteInternalFuture<?> onReconnected(boolean active) { |
| if (!isEnabled) |
| return null; |
| |
| synchronized (mux) { |
| assert disconnected; |
| |
| disconnected = false; |
| |
| return null; |
| } |
| } |
| |
| /** |
| * Authenticate user. |
| * |
| * @param login User's login. |
| * @param passwd Plain text password. |
| * @return User object on successful authenticate. Otherwise returns {@code null}. |
| * @throws IgniteCheckedException On error. |
| * @throws IgniteAccessControlException On authentication error. |
| */ |
| public AuthorizationContext authenticate(String login, String passwd) throws IgniteCheckedException { |
| checkEnabled(); |
| |
| if (F.isEmpty(login)) |
| throw new IgniteAccessControlException("The user name or password is incorrect [userName=" + login + ']'); |
| |
| if (ctx.clientNode()) { |
| while (true) { |
| AuthenticateFuture fut; |
| |
| synchronized (mux) { |
| ClusterNode rndNode = U.randomServerNode(ctx); |
| |
| fut = new AuthenticateFuture(rndNode.id()); |
| |
| UserAuthenticateRequestMessage msg = new UserAuthenticateRequestMessage(login, passwd); |
| |
| authFuts.put(msg.id(), fut); |
| |
| ctx.io().sendToGridTopic(rndNode, GridTopic.TOPIC_AUTH, msg, GridIoPolicy.SYSTEM_POOL); |
| } |
| |
| fut.get(); |
| |
| if (fut.retry()) |
| continue; |
| |
| return new AuthorizationContext(User.create(login)); |
| } |
| } |
| else |
| return new AuthorizationContext(authenticateOnServer(login, passwd)); |
| } |
| |
| /** |
| * @param login User's login. |
| * @param passwd Password. |
| * @throws UserManagementException On error. |
| */ |
| public static void validate(String login, String passwd) throws UserManagementException { |
| if (F.isEmpty(login)) |
| throw new UserManagementException("User name is empty"); |
| |
| if (F.isEmpty(passwd)) |
| throw new UserManagementException("Password is empty"); |
| |
| if ((STORE_USER_PREFIX + login).getBytes().length > MetastorageTree.MAX_KEY_LEN) |
| throw new UserManagementException("User name is too long. " + |
| "The user name length must be less then 60 bytes in UTF8"); |
| } |
| |
| /** |
| * Adds new user. |
| * |
| * @param login User's login. |
| * @param passwd Plain text password. |
| * @throws IgniteCheckedException On error. |
| */ |
| public void addUser(String login, String passwd) throws IgniteCheckedException { |
| validate(login, passwd); |
| |
| UserManagementOperation op = new UserManagementOperation(User.create(login, passwd), |
| UserManagementOperation.OperationType.ADD); |
| |
| execUserOperation(op).get(); |
| } |
| |
| /** |
| * @param login User name. |
| * @throws IgniteCheckedException On error. |
| */ |
| public void removeUser(String login) throws IgniteCheckedException { |
| UserManagementOperation op = new UserManagementOperation(User.create(login), |
| UserManagementOperation.OperationType.REMOVE); |
| |
| execUserOperation(op).get(); |
| } |
| |
| /** |
| * @param login User name. |
| * @param passwd User password. |
| * @throws IgniteCheckedException On error. |
| */ |
| public void updateUser(String login, String passwd) throws IgniteCheckedException { |
| UserManagementOperation op = new UserManagementOperation(User.create(login, passwd), |
| UserManagementOperation.OperationType.UPDATE); |
| |
| execUserOperation(op).get(); |
| } |
| |
| /** {@inheritDoc} */ |
| @SuppressWarnings("unchecked") |
| @Override public void onReadyForRead(ReadOnlyMetastorage metastorage) throws IgniteCheckedException { |
| if (!ctx.clientNode()) { |
| users = new ConcurrentHashMap<>(); |
| |
| Map<String, User> readUsers = (Map<String, User>)metastorage.readForPredicate( |
| (IgnitePredicate<String>)key -> key != null && key.startsWith(STORE_USER_PREFIX)); |
| |
| for (User u : readUsers.values()) |
| users.put(u.name(), u); |
| } |
| else |
| users = null; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onReadyForReadWrite(ReadWriteMetastorage metastorage) { |
| if (!ctx.clientNode()) |
| this.metastorage = metastorage; |
| else |
| this.metastorage = null; |
| } |
| |
| /** {@inheritDoc} */ |
| @Nullable @Override public DiscoveryDataExchangeType discoveryDataType() { |
| return DiscoveryDataExchangeType.AUTH_PROC; |
| } |
| |
| /** {@inheritDoc} */ |
| @Nullable @Override public IgniteNodeValidationResult validateNode(ClusterNode node) { |
| Boolean rmtEnabled = node.attribute(IgniteNodeAttributes.ATTR_AUTHENTICATION_ENABLED); |
| |
| if (isEnabled && rmtEnabled == null) { |
| String errMsg = "Failed to add node to topology because user authentication is enabled on cluster and " + |
| "the node doesn't support user authentication [nodeId=" + node.id() + ']'; |
| |
| return new IgniteNodeValidationResult(node.id(), errMsg, errMsg); |
| } |
| |
| return null; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void collectGridNodeData(DiscoveryDataBag dataBag) { |
| // 1. Collect users info only on coordinator |
| // 2. Doesn't collect users info to send on client node due to security reason. |
| if (!isEnabled || !isLocalNodeCoordinator() || dataBag.isJoiningNodeClient()) |
| return; |
| |
| synchronized (mux) { |
| if (!dataBag.commonDataCollectedFor(AUTH_PROC.ordinal())) { |
| InitialUsersData d = new InitialUsersData(users.values(), activeOps.values()); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Collected initial users data: " + d); |
| |
| dataBag.addGridCommonData(AUTH_PROC.ordinal(), d); |
| } |
| } |
| } |
| |
| /** |
| * Checks whether local node is coordinator. Nodes that are leaving or failed |
| * (but are still in topology) are removed from search. |
| * |
| * @return {@code true} if local node is coordinator. |
| */ |
| private boolean isLocalNodeCoordinator() { |
| DiscoverySpi spi = ctx.discovery().getInjectedDiscoverySpi(); |
| |
| if (spi instanceof TcpDiscoverySpi) |
| return ((TcpDiscoverySpi)spi).isLocalNodeCoordinator(); |
| else |
| return F.eq(ctx.localNodeId(), coordinator().id()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) { |
| initUsrs = (InitialUsersData)data.commonData(); |
| } |
| |
| /** |
| * @return {@code true} if authentication is enabled, {@code false} if not. |
| */ |
| public boolean enabled() { |
| return isEnabled; |
| } |
| |
| /** |
| * Check cluster state. |
| */ |
| private void checkActivate() { |
| if (!ctx.state().publicApiActiveState(true)) { |
| throw new IgniteException("Can not perform the operation because the cluster is inactive. Note, that " + |
| "the cluster is considered inactive by default if Ignite Persistent Store is used to let all the nodes " + |
| "join the cluster. To activate the cluster call Ignite.active(true)."); |
| } |
| } |
| |
| /** |
| * |
| */ |
| private void checkEnabled() { |
| if (!isEnabled) { |
| throw new IgniteException("Can not perform the operation because the authentication" + |
| " is not enabled for the cluster."); |
| } |
| } |
| |
| /** |
| */ |
| private void addDefaultUser() { |
| assert users != null && users.isEmpty(); |
| |
| User dfltUser = User.defaultUser(); |
| |
| // Put to local map to be ready for authentication. |
| users.put(dfltUser.name(), dfltUser); |
| |
| // Put to MetaStore when it will be ready. |
| exec.execute(new RefreshUsersStorageWorker(new ArrayList<>(Collections.singleton(dfltUser)))); |
| } |
| |
| /** |
| * Authenticate user. |
| * |
| * @param login User's login. |
| * @param passwd Plain text password. |
| * @return User object on successful authenticate. Otherwise returns {@code null}. |
| * @throws IgniteCheckedException On authentication error. |
| */ |
| private User authenticateOnServer(String login, String passwd) throws IgniteCheckedException { |
| assert !ctx.clientNode() : "Must be used on server node"; |
| |
| readyForAuthFut.get(); |
| |
| User usr; |
| |
| usr = users.get(login); |
| |
| if (usr == null) |
| throw new IgniteAccessControlException("The user name or password is incorrect [userName=" + login + ']'); |
| |
| if (usr.authorize(passwd)) |
| return usr; |
| else |
| throw new IgniteAccessControlException("The user name or password is incorrect [userName=" + login + ']'); |
| } |
| |
| /** |
| * @param op User operation. |
| * @return Operation future. |
| * @throws IgniteCheckedException On error. |
| */ |
| private UserOperationFinishFuture execUserOperation(UserManagementOperation op) throws IgniteCheckedException { |
| checkActivate(); |
| checkEnabled(); |
| |
| synchronized (mux) { |
| if (disconnected) { |
| throw new UserManagementException("Failed to initiate user management operation because " |
| + "client node is disconnected."); |
| } |
| |
| AuthorizationContext actx = AuthorizationContext.context(); |
| |
| if (actx == null) |
| throw new IgniteAccessControlException("Operation not allowed: authorized context is empty."); |
| |
| actx.checkUserOperation(op); |
| |
| UserOperationFinishFuture fut = new UserOperationFinishFuture(op.id()); |
| |
| opFinishFuts.put(op.id(), fut); |
| |
| UserProposedMessage msg = new UserProposedMessage(op); |
| |
| ctx.discovery().sendCustomEvent(msg); |
| |
| return fut; |
| } |
| } |
| |
| /** |
| * @param op The operation with users. |
| * @throws IgniteCheckedException On error. |
| */ |
| private void processOperationLocal(UserManagementOperation op) throws IgniteCheckedException { |
| assert op != null && op.user() != null : "Invalid operation: " + op; |
| |
| switch (op.type()) { |
| case ADD: |
| addUserLocal(op); |
| |
| break; |
| |
| case REMOVE: |
| removeUserLocal(op); |
| |
| break; |
| |
| case UPDATE: |
| updateUserLocal(op); |
| |
| break; |
| } |
| } |
| |
| /** |
| * Adds new user locally. |
| * |
| * @param op User operation. |
| * @throws IgniteCheckedException On error. |
| */ |
| private void addUserLocal(final UserManagementOperation op) throws IgniteCheckedException { |
| User usr = op.user(); |
| |
| String userName = usr.name(); |
| |
| if (users.containsKey(userName)) |
| throw new UserManagementException("User already exists [login=" + userName + ']'); |
| |
| metastorage.write(STORE_USER_PREFIX + userName, usr); |
| |
| synchronized (mux) { |
| activeOps.remove(op.id()); |
| |
| users.put(userName, usr); |
| } |
| } |
| |
| /** |
| * Remove user from MetaStorage. |
| * |
| * @param op Operation. |
| * @throws IgniteCheckedException On error. |
| */ |
| private void removeUserLocal(UserManagementOperation op) throws IgniteCheckedException { |
| User usr = op.user(); |
| |
| if (!users.containsKey(usr.name())) |
| throw new UserManagementException("User doesn't exist [userName=" + usr.name() + ']'); |
| |
| metastorage.remove(STORE_USER_PREFIX + usr.name()); |
| |
| synchronized (mux) { |
| activeOps.remove(op.id()); |
| |
| users.remove(usr.name()); |
| } |
| } |
| |
| /** |
| * Remove user from MetaStorage. |
| * |
| * @param op Operation. |
| * @throws IgniteCheckedException On error. |
| */ |
| private void updateUserLocal(UserManagementOperation op) throws IgniteCheckedException { |
| User usr = op.user(); |
| |
| if (!users.containsKey(usr.name())) |
| throw new UserManagementException("User doesn't exist [userName=" + usr.name() + ']'); |
| |
| metastorage.write(STORE_USER_PREFIX + usr.name(), usr); |
| |
| synchronized (mux) { |
| activeOps.remove(op.id()); |
| |
| users.put(usr.name(), usr); |
| } |
| } |
| |
| /** |
| * Get current coordinator node. |
| * |
| * @return Coordinator node. |
| */ |
| private ClusterNode coordinator() { |
| synchronized (mux) { |
| if (crdNode != null) |
| return crdNode; |
| else { |
| ClusterNode res = null; |
| |
| for (ClusterNode node : ctx.discovery().aliveServerNodes()) { |
| if (res == null || res.order() > node.order()) |
| res = node; |
| } |
| |
| if (res == null |
| && !ctx.discovery().allNodes().isEmpty() |
| && ctx.discovery().aliveServerNodes().isEmpty()) { |
| U.warn(log, "Cannot find the server coordinator node. " |
| + "Possible a client is started with forceServerMode=true. " + |
| "Security warning: user authentication will be disabled on the client."); |
| |
| isEnabled = false; |
| } |
| else |
| assert res != null; |
| |
| crdNode = res; |
| |
| return res; |
| } |
| } |
| } |
| |
| /** |
| * @param msg Error message. |
| */ |
| private void cancelFutures(String msg) { |
| synchronized (mux) { |
| for (UserOperationFinishFuture fut : opFinishFuts.values()) |
| fut.onDone(null, new IgniteFutureCancelledException(msg)); |
| } |
| |
| for (GridFutureAdapter<Void> fut : authFuts.values()) |
| fut.onDone(null, new IgniteFutureCancelledException(msg)); |
| } |
| |
| /** |
| * @param node Joined node ID. |
| */ |
| private void onNodeJoin(ClusterNode node) { |
| if (isNodeHoldsUsers(ctx.discovery().localNode()) && isNodeHoldsUsers(node)) { |
| synchronized (mux) { |
| for (UserOperationFinishFuture f : opFinishFuts.values()) |
| f.onNodeJoin(node.id()); |
| } |
| } |
| } |
| |
| /** |
| * @param nodeId Left node ID. |
| */ |
| private void onNodeLeft(UUID nodeId) { |
| synchronized (mux) { |
| if (!ctx.clientNode()) { |
| for (UserOperationFinishFuture f : opFinishFuts.values()) |
| f.onNodeLeft(nodeId); |
| } |
| |
| // Found all auth requests that were be sent to left node. |
| // Complete these futures with special exception to retry authentication. |
| for (Iterator<Map.Entry<IgniteUuid, AuthenticateFuture>> it = authFuts.entrySet().iterator(); |
| it.hasNext(); ) { |
| AuthenticateFuture f = it.next().getValue(); |
| |
| if (F.eq(nodeId, f.nodeId())) { |
| f.retry(true); |
| |
| f.onDone(); |
| |
| it.remove(); |
| } |
| } |
| |
| // Coordinator left |
| if (F.eq(coordinator().id(), nodeId)) { |
| // Refresh info about coordinator node. |
| crdNode = null; |
| |
| if (curOpFinishMsg != null) |
| sendFinish(curOpFinishMsg); |
| } |
| } |
| } |
| |
| /** |
| * Handle finish operation message from a node. |
| * |
| * @param nodeId Node ID. |
| * @param msg Message. |
| */ |
| private void onFinishMessage(UUID nodeId, UserManagementOperationFinishedMessage msg) { |
| if (log.isDebugEnabled()) |
| log.debug(msg.toString()); |
| |
| synchronized (mux) { |
| UserOperationFinishFuture fut = opFinishFuts.get(msg.operationId()); |
| |
| if (fut == null) { |
| fut = new UserOperationFinishFuture(msg.operationId()); |
| |
| opFinishFuts.put(msg.operationId(), fut); |
| } |
| |
| if (msg.success()) |
| fut.onSuccessOnNode(nodeId); |
| else |
| fut.onOperationFailOnNode(nodeId, msg.errorMessage()); |
| } |
| } |
| |
| /** |
| * Called when all required finish messages are received, |
| * send ACK message to complete operation futures on all nodes. |
| * |
| * @param opId Operation ID. |
| * @param err Error. |
| */ |
| private void onFinishOperation(IgniteUuid opId, IgniteCheckedException err) { |
| try { |
| UserAcceptedMessage msg = new UserAcceptedMessage(opId, err); |
| |
| ctx.discovery().sendCustomEvent(msg); |
| } |
| catch (IgniteCheckedException e) { |
| if (!e.hasCause(IgniteFutureCancelledException.class)) |
| U.error(log, "Unexpected exception on send UserAcceptedMessage.", e); |
| } |
| } |
| |
| /** |
| * @param nodeId Node ID. |
| * @param msg Message. |
| */ |
| private void onAuthenticateRequestMessage(UUID nodeId, UserAuthenticateRequestMessage msg) { |
| UserAuthenticateResponseMessage respMsg; |
| try { |
| User u = authenticateOnServer(msg.name(), msg.password()); |
| |
| respMsg = new UserAuthenticateResponseMessage(msg.id(), null); |
| } |
| catch (IgniteCheckedException e) { |
| respMsg = new UserAuthenticateResponseMessage(msg.id(), e.toString()); |
| |
| e.printStackTrace(); |
| } |
| |
| try { |
| ctx.io().sendToGridTopic(nodeId, GridTopic.TOPIC_AUTH, respMsg, GridIoPolicy.SYSTEM_POOL); |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Unexpected exception on send UserAuthenticateResponseMessage.", e); |
| } |
| } |
| |
| /** |
| * @param msg Message. |
| */ |
| private void onAuthenticateResponseMessage(UserAuthenticateResponseMessage msg) { |
| GridFutureAdapter<Void> fut = authFuts.get(msg.id()); |
| |
| fut.onDone(null, !msg.success() ? new IgniteAccessControlException(msg.errorMessage()) : null); |
| |
| authFuts.remove(msg.id()); |
| } |
| |
| /** |
| * Local node joined to topology. Discovery cache is available but no discovery custom messages are received yet. |
| * Initial user set and initial user operations (received on join) are processed here. |
| * <p> |
| * On the coordinator the default user is created when the user map is empty. On any other node the local |
| * "authentication enabled" flag is reconciled with the coordinator's node attribute; on server nodes |
| * the users and the active operations received from the coordinator are then applied. |
| * <p> |
| * On normal completion the "ready for authentication" future is completed. |
| */ |
| public void onLocalJoin() { |
| // No coordinator could be resolved: nothing to process. |
| if (coordinator() == null) |
| return; |
| |
| if (F.eq(coordinator().id(), ctx.localNodeId())) { |
| if (!isEnabled) |
| return; |
| |
| // The coordinator does not expect initial users data from anybody. |
| assert initUsrs == null; |
| |
| // Creates default user on coordinator if it is the first start of PDS cluster |
| // or start of in-memory cluster. |
| if (users.isEmpty()) |
| addDefaultUser(); |
| } |
| else { |
| Boolean rmtEnabled = coordinator().attribute(IgniteNodeAttributes.ATTR_AUTHENTICATION_ENABLED); |
| |
| // The cluster doesn't support authentication (ver < 2.5) |
| if (rmtEnabled == null) |
| rmtEnabled = false; |
| |
| if (isEnabled != rmtEnabled) { |
| if (rmtEnabled) |
| U.warn(log, "User authentication is enabled on cluster. Enables on local node"); |
| else { |
| // Enabled locally but disabled on the cluster: remember the error, it is thrown from onKernalStart(). |
| validateErr = "User authentication is disabled on cluster"; |
| |
| return; |
| } |
| } |
| |
| // The coordinator's setting wins. |
| isEnabled = rmtEnabled; |
| |
| if (!isEnabled) { |
| try { |
| stop(false); |
| } |
| catch (IgniteCheckedException e) { |
| U.warn(log, "Unexpected exception on stopped authentication processor", e); |
| } |
| |
| return; |
| } |
| |
| // Client nodes hold no users: nothing more to apply. |
| if (ctx.clientNode()) |
| return; |
| |
| assert initUsrs != null; |
| |
| // Can be empty on initial start of PDS cluster (default user will be created and stored after activate) |
| if (!F.isEmpty(initUsrs.usrs)) { |
| if (users == null) |
| users = new ConcurrentHashMap<>(); |
| else |
| users.clear(); |
| |
| // Replace the local users with the set received from the coordinator. |
| for (User u : initUsrs.usrs) |
| users.put(u.name(), u); |
| |
| // Schedule the storage refresh for the received users on the executor. |
| exec.execute(new RefreshUsersStorageWorker(initUsrs.usrs)); |
| } |
| |
| // Operations that were active on the cluster at join time are submitted locally as well. |
| for (UserManagementOperation op : initUsrs.activeOps) |
| submitOperation(op); |
| } |
| |
| // Unblocks authenticateOnServer(), which waits on this future. |
| readyForAuthFut.onDone(); |
| } |
| |
| /** |
| * Called on node activate. |
| */ |
| public void onActivate() { |
| activateFut.onDone(); |
| } |
| |
| /** |
| * |
| */ |
| private void waitActivate() { |
| try { |
| activateFut.get(); |
| } |
| catch (IgniteCheckedException e) { |
| // No-op. |
| } |
| } |
| |
| /** |
| * @param msg Finish message. |
| */ |
| private void sendFinish(UserManagementOperationFinishedMessage msg) { |
| try { |
| ctx.io().sendToGridTopic(coordinator(), GridTopic.TOPIC_AUTH, msg, GridIoPolicy.SYSTEM_POOL); |
| } |
| catch (Exception e) { |
| U.error(log, "Failed to send UserManagementOperationFinishedMessage [op=" + msg.operationId() + |
| ", node=" + coordinator() + ", err=" + msg.errorMessage() + ']', e); |
| } |
| } |
| |
| /** |
| * Register operation, future and add operation worker to execute queue. |
| * |
| * @param op User operation. |
| */ |
| private void submitOperation(UserManagementOperation op) { |
| synchronized (mux) { |
| UserOperationFinishFuture fut = opFinishFuts.get(op.id()); |
| |
| if (fut == null) { |
| fut = new UserOperationFinishFuture(op.id()); |
| |
| opFinishFuts.put(op.id(), fut); |
| } |
| |
| if (!fut.workerSubmitted()) { |
| fut.workerSubmitted(true); |
| |
| activeOps.put(op.id(), op); |
| |
| exec.execute(new UserOperationWorker(op, fut)); |
| } |
| } |
| } |
| |
| /** |
| * @param n Node. |
| * @return {@code true} if node holds user information. Otherwise returns {@code false}. |
| */ |
| private static boolean isNodeHoldsUsers(ClusterNode n) { |
| return !n.isClient() && !n.isDaemon(); |
| } |
| |
| /** |
| * Initial data is collected on coordinator to send to join node. |
| */ |
| private static final class InitialUsersData implements Serializable { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** Users. */ |
| @GridToStringInclude |
| private final ArrayList<User> usrs; |
| |
| /** Active user operations. */ |
| @GridToStringInclude |
| private final ArrayList<UserManagementOperation> activeOps; |
| |
| /** |
| * @param usrs Users. |
| * @param ops Active operations on cluster. |
| */ |
| InitialUsersData(Collection<User> usrs, Collection<UserManagementOperation> ops) { |
| this.usrs = new ArrayList<>(usrs); |
| activeOps = new ArrayList<>(ops); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(InitialUsersData.class, this); |
| } |
| } |
| |
| /**i |
| * |
| */ |
| private final class UserProposedListener implements CustomEventListener<UserProposedMessage> { |
| /** {@inheritDoc} */ |
| @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, |
| final UserProposedMessage msg) { |
| if (!isEnabled || ctx.isStopping() || ctx.clientNode()) |
| return; |
| |
| if (log.isDebugEnabled()) |
| log.debug(msg.toString()); |
| |
| submitOperation(msg.operation()); |
| } |
| } |
| |
| /** |
| * |
| */ |
| private final class UserAcceptedListener implements CustomEventListener<UserAcceptedMessage> { |
| /** {@inheritDoc} */ |
| @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, UserAcceptedMessage msg) { |
| if (!isEnabled || ctx.isStopping()) |
| return; |
| |
| if (log.isDebugEnabled()) |
| log.debug(msg.toString()); |
| |
| synchronized (mux) { |
| UserOperationFinishFuture f = opFinishFuts.get(msg.operationId()); |
| |
| if (f != null) { |
| if (msg.error() != null) |
| f.onDone(null, msg.error()); |
| else |
| f.onDone(); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Future to wait for end of user operation. Removes itself from map when completed. |
| */ |
| private class UserOperationFinishFuture extends GridFutureAdapter<Void> { |
| /** */ |
| private final Set<UUID> requiredFinish; |
| |
| /** */ |
| private final Set<UUID> receivedFinish; |
| |
| /** User management operation ID. */ |
| private final IgniteUuid opId; |
| |
| /** Worker has been already submitted flag. */ |
| private boolean workerSubmitted; |
| |
| /** Error. */ |
| private IgniteCheckedException err; |
| |
| |
| /** |
| * @param opId User management operation ID. |
| */ |
| UserOperationFinishFuture(IgniteUuid opId) { |
| this.opId = opId; |
| |
| if (!ctx.clientNode()) { |
| requiredFinish = new HashSet<>(); |
| receivedFinish = new HashSet<>(); |
| |
| for (ClusterNode node : ctx.discovery().nodes(ctx.discovery().topologyVersionEx())) { |
| if (isNodeHoldsUsers(node)) |
| requiredFinish.add(node.id()); |
| } |
| } |
| else { |
| requiredFinish = null; |
| receivedFinish = null; |
| } |
| } |
| |
| /** |
| * @return Worker submitted flag. |
| */ |
| boolean workerSubmitted() { |
| return workerSubmitted; |
| } |
| |
| /** |
| * @param val Worker submitted flag. |
| */ |
| void workerSubmitted(boolean val) { |
| workerSubmitted = val; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean onDone(@Nullable Void res, @Nullable Throwable err) { |
| boolean done = super.onDone(res, err); |
| |
| synchronized (mux) { |
| if (done) |
| opFinishFuts.remove(opId, this); |
| } |
| |
| return done; |
| } |
| |
| /** |
| * @param nodeId ID of left or failed node. |
| */ |
| synchronized void onNodeLeft(UUID nodeId) { |
| assert requiredFinish != null : "Process node left on client"; |
| |
| requiredFinish.remove(nodeId); |
| |
| checkOperationFinished(); |
| } |
| |
| /** |
| * @param id Joined node ID. |
| */ |
| synchronized void onNodeJoin(UUID id) { |
| assert requiredFinish != null : "Process node join on client"; |
| |
| requiredFinish.add(id); |
| } |
| |
| /** |
| * @param nodeId Node ID. |
| */ |
| synchronized void onSuccessOnNode(UUID nodeId) { |
| assert receivedFinish != null : "Process operation state on client"; |
| |
| receivedFinish.add(nodeId); |
| |
| checkOperationFinished(); |
| } |
| |
| /** |
| * @param nodeId Node ID. |
| * @param errMsg Error message. |
| */ |
| synchronized void onOperationFailOnNode(UUID nodeId, String errMsg) { |
| assert receivedFinish != null : "Process operation state on client"; |
| |
| if (log.isDebugEnabled()) |
| log.debug("User operation is failed [nodeId=" + nodeId + ", err=" + errMsg + ']'); |
| |
| receivedFinish.add(nodeId); |
| |
| UserManagementException e = new UserManagementException("Operation failed [nodeId=" + nodeId |
| + ", opId=" + opId + ", err=" + errMsg + ']'); |
| |
| if (err == null) |
| err = e; |
| else |
| err.addSuppressed(e); |
| |
| checkOperationFinished(); |
| } |
| |
| /** |
| * |
| */ |
| private void checkOperationFinished() { |
| if (receivedFinish.containsAll(requiredFinish)) |
| onFinishOperation(opId, err); |
| } |
| } |
| |
| /** |
| * |
| */ |
| private static class AuthenticateFuture extends GridFutureAdapter<Void> { |
| /** Node id. */ |
| private final UUID nodeId; |
| |
| /** Node id. */ |
| private boolean retry; |
| |
| /** |
| * @param nodeId ID of the node that processes authentication request. |
| */ |
| AuthenticateFuture(UUID nodeId) { |
| this.nodeId = nodeId; |
| } |
| |
| /** |
| * @return ID of the node that processes authentication request. |
| */ |
| UUID nodeId() { |
| return nodeId; |
| } |
| |
| /** |
| * @return {@code true} if need retry (aftyer node left). |
| */ |
| boolean retry() { |
| return retry; |
| } |
| |
| /** |
| * @param retry Set retry flag. |
| */ |
| void retry(boolean retry) { |
| this.retry = retry; |
| } |
| } |
| |
| /** |
| * User operation worker. |
| */ |
| private class UserOperationWorker extends GridWorker { |
| /** User operation. */ |
| private final UserManagementOperation op; |
| |
| /** Operation future. */ |
| private final UserOperationFinishFuture fut; |
| |
| /** |
| * Constructor. |
| * |
| * @param op Operation. |
| * @param fut Operation finish future. |
| */ |
| private UserOperationWorker(UserManagementOperation op, UserOperationFinishFuture fut) { |
| super(ctx.igniteInstanceName(), "auth-op-" + op.type(), IgniteAuthenticationProcessor.this.log); |
| |
| this.op = op; |
| this.fut = fut; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { |
| if (ctx.isStopping()) |
| return; |
| |
| waitActivate(); |
| |
| UserManagementOperationFinishedMessage msg0; |
| |
| if (sharedCtx != null) |
| sharedCtx.database().checkpointReadLock(); |
| |
| try { |
| processOperationLocal(op); |
| |
| msg0 = new UserManagementOperationFinishedMessage(op.id(), null); |
| } |
| catch (UserManagementException e) { |
| msg0 = new UserManagementOperationFinishedMessage(op.id(), e.toString()); |
| |
| // Remove failed operation from active operations. |
| activeOps.remove(op.id()); |
| } |
| catch (Throwable e) { |
| log.warning("Unexpected exception on perform user management operation", e); |
| |
| msg0 = new UserManagementOperationFinishedMessage(op.id(), e.toString()); |
| |
| // Remove failed operation from active operations. |
| activeOps.remove(op.id()); |
| } |
| finally { |
| if (sharedCtx != null) |
| sharedCtx.database().checkpointReadUnlock(); |
| } |
| |
| curOpFinishMsg = msg0; |
| |
| sendFinish(curOpFinishMsg); |
| |
| try { |
| fut.get(); |
| } |
| catch (IgniteCheckedException e) { |
| if (!e.hasCause(IgniteFutureCancelledException.class)) |
| U.error(log, "Unexpected exception on wait for end of user operation.", e); |
| } |
| finally { |
| curOpFinishMsg = null; |
| } |
| } |
| } |
| |
| /** |
| * Initial users set worker. |
| */ |
| private class RefreshUsersStorageWorker extends GridWorker { |
| /** */ |
| private final ArrayList<User> newUsrs; |
| |
| /** |
| * @param usrs New users to store. |
| */ |
| private RefreshUsersStorageWorker(ArrayList<User> usrs) { |
| super(ctx.igniteInstanceName(), "refresh-store", IgniteAuthenticationProcessor.this.log); |
| |
| assert !F.isEmpty(usrs); |
| |
| newUsrs = usrs; |
| } |
| |
| /** {@inheritDoc} */ |
| @SuppressWarnings("unchecked") |
| @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { |
| if (ctx.clientNode()) |
| return; |
| |
| waitActivate(); |
| |
| if (sharedCtx != null) |
| sharedCtx.database().checkpointReadLock(); |
| |
| try { |
| Map<String, User> existUsrs = (Map<String, User>)metastorage.readForPredicate( |
| (IgnitePredicate<String>)key -> key != null && key.startsWith(STORE_USER_PREFIX)); |
| |
| for (String key : existUsrs.keySet()) |
| metastorage.remove(key); |
| |
| for (User u : newUsrs) |
| metastorage.write(STORE_USER_PREFIX + u.name(), u); |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Cannot cleanup old users information at metastorage", e); |
| } |
| finally { |
| if (sharedCtx != null) |
| sharedCtx.database().checkpointReadUnlock(); |
| } |
| } |
| } |
| } |