| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.ignite.internal.processors.cache.binary; |
| |
| import java.util.Iterator; |
| import java.util.LinkedHashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.Queue; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentLinkedQueue; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.CopyOnWriteArrayList; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteLogger; |
| import org.apache.ignite.binary.BinaryObjectException; |
| import org.apache.ignite.cluster.ClusterNode; |
| import org.apache.ignite.events.DiscoveryEvent; |
| import org.apache.ignite.events.Event; |
| import org.apache.ignite.internal.GridKernalContext; |
| import org.apache.ignite.internal.GridTopic; |
| import org.apache.ignite.internal.IgniteInternalFuture; |
| import org.apache.ignite.internal.binary.BinaryMetadata; |
| import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; |
| import org.apache.ignite.internal.managers.communication.GridIoManager; |
| import org.apache.ignite.internal.managers.communication.GridMessageListener; |
| import org.apache.ignite.internal.managers.discovery.CustomEventListener; |
| import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; |
| import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; |
| import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; |
| import org.apache.ignite.internal.util.future.GridFutureAdapter; |
| import org.apache.ignite.internal.util.typedef.internal.S; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.lang.IgniteInClosure; |
| import org.jetbrains.annotations.Nullable; |
| |
| import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; |
| import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; |
| import static org.apache.ignite.internal.binary.BinaryUtils.mergeMetadata; |
| import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL; |
| |
| /** |
| * Provides API for discovery-based metadata exchange protocol and communication SPI-based metadata request protocol. |
| * |
| * It is responsible for sending update and metadata requests and manages message listeners for them. |
| * |
| * It also manages synchronization logic (blocking/unblocking threads requesting updates or up-to-date metadata etc) |
| * around protocols. |
| */ |
| final class BinaryMetadataTransport { |
| /** */ |
| private final GridDiscoveryManager discoMgr; |
| |
| /** */ |
| private final GridKernalContext ctx; |
| |
| /** */ |
| private final IgniteLogger log; |
| |
| /** */ |
| private final boolean clientNode; |
| |
| /** */ |
| private final ConcurrentMap<Integer, BinaryMetadataHolder> metaLocCache; |
| |
| /** */ |
| private final BinaryMetadataFileStore metadataFileStore; |
| |
| /** */ |
| private final Queue<MetadataUpdateResultFuture> unlabeledFutures = new ConcurrentLinkedQueue<>(); |
| |
| /** */ |
| private final ConcurrentMap<SyncKey, MetadataUpdateResultFuture> syncMap = new ConcurrentHashMap<>(); |
| |
| /** It store pending update future for typeId. It allow to do only one update in one moment. */ |
| private final ConcurrentMap<Integer, MetadataUpdateResultFuture> pendingTypeIdMap = new ConcurrentHashMap<>(); |
| |
| /** */ |
| private final ConcurrentMap<Integer, ClientMetadataRequestFuture> clientReqSyncMap = new ConcurrentHashMap<>(); |
| |
| /** */ |
| private final ConcurrentMap<SyncKey, GridFutureAdapter<?>> schemaWaitFuts = new ConcurrentHashMap<>(); |
| |
| /** */ |
| private volatile boolean stopping; |
| |
| /** */ |
| private final List<BinaryMetadataUpdatedListener> binaryUpdatedLsnrs = new CopyOnWriteArrayList<>(); |
| |
| /** |
| * @param metaLocCache Metadata locale cache. |
| * @param metadataFileStore File store for binary metadata. |
| * @param ctx Context. |
| * @param log Logger. |
| */ |
| BinaryMetadataTransport( |
| ConcurrentMap<Integer, BinaryMetadataHolder> metaLocCache, |
| BinaryMetadataFileStore metadataFileStore, |
| final GridKernalContext ctx, |
| IgniteLogger log |
| ) { |
| this.metaLocCache = metaLocCache; |
| |
| this.metadataFileStore = metadataFileStore; |
| |
| this.ctx = ctx; |
| |
| this.log = log; |
| |
| discoMgr = ctx.discovery(); |
| |
| clientNode = ctx.clientNode(); |
| |
| discoMgr.setCustomEventListener(MetadataUpdateProposedMessage.class, new MetadataUpdateProposedListener()); |
| |
| discoMgr.setCustomEventListener(MetadataUpdateAcceptedMessage.class, new MetadataUpdateAcceptedListener()); |
| |
| GridIoManager ioMgr = ctx.io(); |
| |
| if (clientNode) |
| ioMgr.addMessageListener(GridTopic.TOPIC_METADATA_REQ, new MetadataResponseListener()); |
| else |
| ioMgr.addMessageListener(GridTopic.TOPIC_METADATA_REQ, new MetadataRequestListener(ioMgr)); |
| |
| if (clientNode) |
| ctx.event().addLocalEventListener(new GridLocalEventListener() { |
| @Override public void onEvent(Event evt) { |
| DiscoveryEvent evt0 = (DiscoveryEvent)evt; |
| |
| if (!ctx.isStopping()) { |
| for (ClientMetadataRequestFuture fut : clientReqSyncMap.values()) |
| fut.onNodeLeft(evt0.eventNode().id()); |
| } |
| } |
| }, EVT_NODE_LEFT, EVT_NODE_FAILED); |
| } |
| |
| /** |
| * Adds BinaryMetadata updates {@link BinaryMetadataUpdatedListener listener} to transport. |
| * |
| * @param lsnr Listener. |
| */ |
| void addBinaryMetadataUpdateListener(BinaryMetadataUpdatedListener lsnr) { |
| binaryUpdatedLsnrs.add(lsnr); |
| } |
| |
| /** |
| * Sends request to cluster proposing update for given metadata. |
| * |
| * @param newMeta Metadata proposed for update. |
| * @return Future to wait for update result on. |
| */ |
| GridFutureAdapter<MetadataUpdateResult> requestMetadataUpdate(BinaryMetadata newMeta) { |
| int typeId = newMeta.typeId(); |
| |
| MetadataUpdateResultFuture resFut; |
| |
| do { |
| BinaryMetadataHolder metaHolder = metaLocCache.get(typeId); |
| |
| BinaryMetadata oldMeta = Optional.ofNullable(metaHolder) |
| .map(BinaryMetadataHolder::metadata) |
| .orElse(null); |
| |
| BinaryMetadata mergedMeta = mergeMetadata(oldMeta, newMeta, null); |
| |
| if (mergedMeta == oldMeta) { |
| if (metaHolder.pendingVersion() == metaHolder.acceptedVersion()) |
| return null; |
| |
| return awaitMetadataUpdate(typeId, metaHolder.pendingVersion()); |
| } |
| |
| resFut = new MetadataUpdateResultFuture(typeId); |
| } |
| while (!putAndWaitPendingUpdate(typeId, resFut)); |
| |
| BinaryMetadataHolder metadataHolder = metaLocCache.get(typeId); |
| |
| BinaryMetadata oldMeta = Optional.ofNullable(metadataHolder) |
| .map(BinaryMetadataHolder::metadata) |
| .orElse(null); |
| |
| Set<Integer> changedSchemas = new LinkedHashSet<>(); |
| |
| //Ensure after putting pending future, metadata still has difference. |
| BinaryMetadata mergedMeta = mergeMetadata(oldMeta, newMeta, changedSchemas); |
| |
| if (mergedMeta == oldMeta) { |
| resFut.onDone(MetadataUpdateResult.createSuccessfulResult(-1)); |
| |
| return null; |
| } |
| |
| if (log.isDebugEnabled()) { |
| log.debug("Requesting metadata update [typeId=" + typeId + |
| ", typeName=" + mergedMeta.typeName() + |
| ", changedSchemas=" + changedSchemas + |
| ", holder=" + metadataHolder + |
| ", fut=" + resFut + |
| ']'); |
| } |
| |
| try { |
| synchronized (this) { |
| unlabeledFutures.add(resFut); |
| |
| if (!stopping) |
| discoMgr.sendCustomEvent(new MetadataUpdateProposedMessage(mergedMeta, ctx.localNodeId())); |
| else |
| resFut.onDone(MetadataUpdateResult.createUpdateDisabledResult()); |
| } |
| } |
| catch (Exception e) { |
| resFut.onDone(MetadataUpdateResult.createUpdateDisabledResult(), e); |
| } |
| |
| if (ctx.clientDisconnected()) |
| onDisconnected(); |
| |
| return resFut; |
| } |
| |
| /** |
| * Put new update future and it are waiting pending future if it exists. |
| * |
| * @param typeId Type id. |
| * @param metaUpdateFut New metadata update future. |
| * @return {@code true} If given future put successfully. |
| */ |
| private boolean putAndWaitPendingUpdate(int typeId, MetadataUpdateResultFuture metaUpdateFut) { |
| MetadataUpdateResultFuture oldFut = pendingTypeIdMap.putIfAbsent(typeId, metaUpdateFut); |
| |
| if (oldFut != null) { |
| try { |
| oldFut.get(); |
| } |
| catch (IgniteCheckedException ignore) { |
| //Stacktrace will be logged in thread which created this future. |
| log.warning("Pending update metadata process was failed. Trying to update to new metadata."); |
| } |
| |
| return false; |
| } |
| |
| return true; |
| } |
| |
| /** |
| * Allows thread to wait for a metadata of given typeId and version to be accepted by the cluster. |
| * |
| * @param typeId ID of binary type. |
| * @param ver version of given binary type (see {@link MetadataUpdateProposedMessage} javadoc for more details). |
| * @return future to wait for update result on. |
| */ |
| GridFutureAdapter<MetadataUpdateResult> awaitMetadataUpdate(int typeId, int ver) { |
| SyncKey key = new SyncKey(typeId, ver); |
| MetadataUpdateResultFuture resFut = new MetadataUpdateResultFuture(key); |
| |
| MetadataUpdateResultFuture oldFut = syncMap.putIfAbsent(key, resFut); |
| |
| if (oldFut != null) |
| resFut = oldFut; |
| |
| BinaryMetadataHolder holder = metaLocCache.get(typeId); |
| |
| if (holder.acceptedVersion() >= ver) |
| resFut.onDone(MetadataUpdateResult.createSuccessfulResult(-1)); |
| |
| return resFut; |
| } |
| |
| /** |
| * Await specific schema update. |
| * |
| * @param typeId Type id. |
| * @param schemaId Schema id. |
| * @return Future which will be completed when schema is received. |
| */ |
| GridFutureAdapter<?> awaitSchemaUpdate(int typeId, int schemaId) { |
| GridFutureAdapter<Object> fut = new GridFutureAdapter<>(); |
| |
| // Use version for schemaId. |
| GridFutureAdapter<?> oldFut = schemaWaitFuts.putIfAbsent(new SyncKey(typeId, schemaId), fut); |
| |
| return oldFut == null ? fut : oldFut; |
| } |
| |
| /** |
| * Allows client node to request latest version of binary metadata for a given typeId from the cluster in case |
| * client is able to detect that it has obsolete metadata in its local cache. |
| * |
| * @param typeId ID of binary type. |
| * @return future to wait for request arrival on. |
| */ |
| GridFutureAdapter<MetadataUpdateResult> requestUpToDateMetadata(int typeId) { |
| ClientMetadataRequestFuture newFut = new ClientMetadataRequestFuture(ctx, typeId, clientReqSyncMap); |
| |
| ClientMetadataRequestFuture oldFut = clientReqSyncMap.putIfAbsent(typeId, newFut); |
| |
| if (oldFut != null) |
| return oldFut; |
| |
| newFut.requestMetadata(); |
| |
| return newFut; |
| } |
| |
| /** */ |
| void stop() { |
| stopping = true; |
| |
| cancelFutures(MetadataUpdateResult.createUpdateDisabledResult()); |
| } |
| |
| /** */ |
| void onDisconnected() { |
| cancelFutures(MetadataUpdateResult.createFailureResult(new BinaryObjectException("Failed to update or wait for metadata, client node disconnected"))); |
| } |
| |
| /** |
| * @param res result to cancel futures with. |
| */ |
| private void cancelFutures(MetadataUpdateResult res) { |
| for (MetadataUpdateResultFuture fut : unlabeledFutures) |
| fut.onDone(res); |
| |
| unlabeledFutures.clear(); |
| |
| for (MetadataUpdateResultFuture fut : syncMap.values()) |
| fut.onDone(res); |
| |
| for (ClientMetadataRequestFuture fut : clientReqSyncMap.values()) |
| fut.onDone(res); |
| } |
| |
| /** */ |
| private final class MetadataUpdateProposedListener implements CustomEventListener<MetadataUpdateProposedMessage> { |
| |
| /** {@inheritDoc} */ |
| @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, |
| MetadataUpdateProposedMessage msg) { |
| if (log.isDebugEnabled()) |
| log.debug("Received MetadataUpdateProposedListener [typeId=" + msg.typeId() + |
| ", typeName=" + msg.metadata().typeName() + |
| ", pendingVer=" + msg.pendingVersion() + |
| ", acceptedVer=" + msg.acceptedVersion() + |
| ", schemasCnt=" + msg.metadata().schemas().size() + ']'); |
| |
| int typeId = msg.typeId(); |
| |
| BinaryMetadataHolder holder = metaLocCache.get(typeId); |
| |
| int pendingVer; |
| int acceptedVer; |
| |
| if (msg.pendingVersion() == 0) { |
| //coordinator receives update request |
| if (holder != null) { |
| pendingVer = holder.pendingVersion() + 1; |
| acceptedVer = holder.acceptedVersion(); |
| } |
| else { |
| pendingVer = 1; |
| acceptedVer = 0; |
| } |
| |
| msg.pendingVersion(pendingVer); |
| msg.acceptedVersion(acceptedVer); |
| |
| BinaryMetadata locMeta = holder != null ? holder.metadata() : null; |
| |
| try { |
| Set<Integer> changedSchemas = new LinkedHashSet<>(); |
| |
| BinaryMetadata mergedMeta = mergeMetadata(locMeta, msg.metadata(), changedSchemas); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Versions are stamped on coordinator" + |
| " [typeId=" + typeId + |
| ", changedSchemas=" + changedSchemas + |
| ", pendingVer=" + pendingVer + |
| ", acceptedVer=" + acceptedVer + "]" |
| ); |
| |
| msg.metadata(mergedMeta); |
| } |
| catch (BinaryObjectException err) { |
| log.warning("Exception with merging metadata for typeId: " + typeId, err); |
| |
| msg.markRejected(err); |
| } |
| } |
| else { |
| pendingVer = msg.pendingVersion(); |
| acceptedVer = msg.acceptedVersion(); |
| } |
| |
| if (ctx.localNodeId().equals(msg.origNodeId())) { |
| MetadataUpdateResultFuture fut = unlabeledFutures.poll(); |
| |
| if (msg.rejected()) |
| fut.onDone(MetadataUpdateResult.createFailureResult(msg.rejectionError())); |
| else { |
| if (clientNode) { |
| BinaryMetadataHolder newHolder = new BinaryMetadataHolder(msg.metadata(), pendingVer, acceptedVer); |
| |
| holder = metaLocCache.putIfAbsent(typeId, newHolder); |
| |
| if (holder != null) { |
| boolean obsoleteUpd = false; |
| |
| do { |
| holder = metaLocCache.get(typeId); |
| |
| if (obsoleteUpdate( |
| holder.pendingVersion(), |
| holder.acceptedVersion(), |
| pendingVer, |
| acceptedVer)) { |
| obsoleteUpd = true; |
| |
| fut.onDone(MetadataUpdateResult.createSuccessfulResult(-1)); |
| |
| break; |
| } |
| } |
| while (!metaLocCache.replace(typeId, holder, newHolder)); |
| |
| if (!obsoleteUpd) |
| initSyncFor(typeId, pendingVer, fut); |
| } |
| else |
| initSyncFor(typeId, pendingVer, fut); |
| } |
| else { |
| initSyncFor(typeId, pendingVer, fut); |
| |
| BinaryMetadataHolder newHolder = new BinaryMetadataHolder(msg.metadata(), pendingVer, acceptedVer); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Updated metadata on originating node: " + newHolder); |
| |
| metaLocCache.put(typeId, newHolder); |
| |
| metadataFileStore.prepareMetadataWriting(msg.metadata(), pendingVer); |
| } |
| } |
| } |
| else { |
| if (!msg.rejected()) { |
| BinaryMetadata locMeta = holder != null ? holder.metadata() : null; |
| |
| Set<Integer> changedSchemas = new LinkedHashSet<>(); |
| |
| try { |
| BinaryMetadata mergedMeta = mergeMetadata(locMeta, msg.metadata(), changedSchemas); |
| |
| BinaryMetadataHolder newHolder = new BinaryMetadataHolder(mergedMeta, pendingVer, acceptedVer); |
| |
| if (clientNode) { |
| holder = metaLocCache.putIfAbsent(typeId, newHolder); |
| |
| if (holder != null) { |
| do { |
| holder = metaLocCache.get(typeId); |
| |
| if (obsoleteUpdate( |
| holder.pendingVersion(), |
| holder.acceptedVersion(), |
| pendingVer, |
| acceptedVer)) |
| break; |
| |
| } |
| while (!metaLocCache.replace(typeId, holder, newHolder)); |
| } |
| } |
| else { |
| if (log.isDebugEnabled()) |
| log.debug("Updated metadata on server node [holder=" + newHolder + |
| ", changedSchemas=" + changedSchemas + ']'); |
| |
| metaLocCache.put(typeId, newHolder); |
| |
| metadataFileStore.prepareMetadataWriting(mergedMeta, pendingVer); |
| } |
| } |
| catch (BinaryObjectException ignored) { |
| assert false : msg; |
| } |
| } |
| } |
| } |
| } |
| |
| /** |
| * @param typeId Type ID. |
| * @param pendingVer Pending version. |
| * @param fut Future. |
| */ |
| private void initSyncFor(int typeId, int pendingVer, final MetadataUpdateResultFuture fut) { |
| if (stopping) { |
| fut.onDone(MetadataUpdateResult.createUpdateDisabledResult()); |
| |
| return; |
| } |
| |
| SyncKey key = new SyncKey(typeId, pendingVer); |
| |
| MetadataUpdateResultFuture oldFut = syncMap.putIfAbsent(key, fut); |
| |
| if (oldFut != null) { |
| oldFut.listen(new IgniteInClosure<IgniteInternalFuture<MetadataUpdateResult>>() { |
| @Override public void apply(IgniteInternalFuture<MetadataUpdateResult> doneFut) { |
| fut.onDone(doneFut.result(), doneFut.error()); |
| } |
| }); |
| } |
| |
| fut.key(key); |
| } |
| |
| /** |
| * |
| */ |
| private final class MetadataUpdateAcceptedListener implements CustomEventListener<MetadataUpdateAcceptedMessage> { |
| |
| /** {@inheritDoc} */ |
| @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, |
| MetadataUpdateAcceptedMessage msg) { |
| if (log.isDebugEnabled()) |
| log.debug("Received MetadataUpdateAcceptedMessage " + msg); |
| |
| if (msg.duplicated()) |
| return; |
| |
| int typeId = msg.typeId(); |
| |
| BinaryMetadataHolder holder = metaLocCache.get(typeId); |
| |
| assert holder != null : "No metadata found for typeId " + typeId; |
| |
| int newAcceptedVer = msg.acceptedVersion(); |
| |
| if (clientNode) { |
| BinaryMetadataHolder newHolder = new BinaryMetadataHolder(holder.metadata(), |
| holder.pendingVersion(), newAcceptedVer); |
| |
| do { |
| holder = metaLocCache.get(typeId); |
| |
| int oldAcceptedVer = holder.acceptedVersion(); |
| |
| if (oldAcceptedVer > newAcceptedVer) |
| break; |
| } |
| while (!metaLocCache.replace(typeId, holder, newHolder)); |
| } |
| else { |
| int oldAcceptedVer = holder.acceptedVersion(); |
| |
| if (oldAcceptedVer >= newAcceptedVer) { |
| if (log.isDebugEnabled()) |
| log.debug("Marking ack as duplicate [holder=" + holder + |
| ", newAcceptedVer=" + newAcceptedVer + ']'); |
| |
| //this is duplicate ack |
| msg.duplicated(true); |
| |
| metadataFileStore.finishWrite(typeId, newAcceptedVer); |
| |
| return; |
| } |
| |
| metadataFileStore.writeMetadataAsync(typeId, newAcceptedVer); |
| |
| metaLocCache.put(typeId, new BinaryMetadataHolder(holder.metadata(), holder.pendingVersion(), newAcceptedVer)); |
| } |
| |
| for (BinaryMetadataUpdatedListener lsnr : binaryUpdatedLsnrs) |
| lsnr.binaryMetadataUpdated(holder.metadata()); |
| |
| GridFutureAdapter<MetadataUpdateResult> fut = syncMap.get(new SyncKey(typeId, newAcceptedVer)); |
| |
| holder = metaLocCache.get(typeId); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Completing future " + fut + " for " + holder); |
| |
| if (!schemaWaitFuts.isEmpty()) { |
| Iterator<Map.Entry<SyncKey, GridFutureAdapter<?>>> iter = schemaWaitFuts.entrySet().iterator(); |
| |
| while (iter.hasNext()) { |
| Map.Entry<SyncKey, GridFutureAdapter<?>> entry = iter.next(); |
| |
| SyncKey key = entry.getKey(); |
| |
| if (key.typeId() == typeId && holder.metadata().hasSchema(key.version())) { |
| entry.getValue().onDone(); |
| |
| iter.remove(); |
| } |
| } |
| } |
| |
| if (fut != null) |
| fut.onDone(MetadataUpdateResult.createSuccessfulResult(newAcceptedVer)); |
| } |
| } |
| |
| /** |
| * Future class responsible for blocking threads until particular events with metadata updates happen, e.g. arriving |
| * {@link MetadataUpdateAcceptedMessage} acknowledgment or {@link MetadataResponseMessage} response. |
| */ |
| public final class MetadataUpdateResultFuture extends GridFutureAdapter<MetadataUpdateResult> { |
| /** */ |
| MetadataUpdateResultFuture(int typeId) { |
| this.key = new SyncKey(typeId, 0); |
| } |
| |
| /** |
| * @param key key in syncMap this future was added under. |
| */ |
| MetadataUpdateResultFuture(SyncKey key) { |
| this.key = key; |
| } |
| |
| /** */ |
| private SyncKey key; |
| |
| /** {@inheritDoc} */ |
| @Override public boolean onDone(@Nullable MetadataUpdateResult res, @Nullable Throwable err) { |
| assert res != null; |
| |
| boolean done = super.onDone(res, err); |
| |
| if (done && key != null) { |
| syncMap.remove(key, this); |
| pendingTypeIdMap.remove(key.typeId, this); |
| } |
| |
| return done; |
| } |
| |
| /** |
| * @param key Key. |
| */ |
| void key(SyncKey key) { |
| this.key = key; |
| } |
| |
| /** */ |
| public int typeVersion() { |
| return key.ver; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(MetadataUpdateResultFuture.class, this); |
| } |
| } |
| |
| /** |
| * Key for mapping arriving {@link MetadataUpdateAcceptedMessage} messages to {@link MetadataUpdateResultFuture}s |
| * other threads may be waiting on. |
| */ |
| private static final class SyncKey { |
| /** */ |
| private final int typeId; |
| |
| /** */ |
| private final int ver; |
| |
| /** |
| * @param typeId Type id. |
| * @param ver Version. |
| */ |
| private SyncKey(int typeId, int ver) { |
| this.typeId = typeId; |
| this.ver = ver; |
| } |
| |
| /** |
| * @return Type ID. |
| */ |
| int typeId() { |
| return typeId; |
| } |
| |
| /** |
| * @return Version. |
| */ |
| int version() { |
| return ver; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int hashCode() { |
| return typeId + ver; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean equals(Object o) { |
| if (o == this) |
| return true; |
| |
| if (!(o instanceof SyncKey)) |
| return false; |
| |
| SyncKey that = (SyncKey)o; |
| |
| return (typeId == that.typeId) && (ver == that.ver); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(SyncKey.class, this); |
| } |
| } |
| |
| /** |
| * Listener is registered on each server node in cluster waiting for metadata requests from clients. |
| */ |
| private final class MetadataRequestListener implements GridMessageListener { |
| /** */ |
| private final GridIoManager ioMgr; |
| |
| /** |
| * @param ioMgr IO manager. |
| */ |
| MetadataRequestListener(GridIoManager ioMgr) { |
| this.ioMgr = ioMgr; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onMessage(UUID nodeId, Object msg, byte plc) { |
| assert msg instanceof MetadataRequestMessage : msg; |
| |
| MetadataRequestMessage msg0 = (MetadataRequestMessage)msg; |
| |
| int typeId = msg0.typeId(); |
| |
| BinaryMetadataHolder metaHolder = metaLocCache.get(typeId); |
| |
| MetadataResponseMessage resp = new MetadataResponseMessage(typeId); |
| |
| byte[] binMetaBytes = null; |
| |
| if (metaHolder != null) { |
| try { |
| binMetaBytes = U.marshal(ctx, metaHolder); |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed to marshal binary metadata for [typeId=" + typeId + ']', e); |
| |
| resp.markErrorOnRequest(); |
| } |
| } |
| |
| resp.binaryMetadataBytes(binMetaBytes); |
| |
| try { |
| ioMgr.sendToGridTopic(nodeId, GridTopic.TOPIC_METADATA_REQ, resp, SYSTEM_POOL); |
| } |
| catch (ClusterTopologyCheckedException e) { |
| if (log.isDebugEnabled()) |
| log.debug("Failed to send metadata response, node failed: " + nodeId); |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed to send up-to-date metadata response.", e); |
| } |
| } |
| } |
| |
| /** |
| * Listener is registered on each client node and listens for metadata responses from cluster. |
| */ |
| private final class MetadataResponseListener implements GridMessageListener { |
| /** {@inheritDoc} */ |
| @Override public void onMessage(UUID nodeId, Object msg, byte plc) { |
| assert msg instanceof MetadataResponseMessage : msg; |
| |
| MetadataResponseMessage msg0 = (MetadataResponseMessage)msg; |
| |
| int typeId = msg0.typeId(); |
| |
| byte[] binMetaBytes = msg0.binaryMetadataBytes(); |
| |
| ClientMetadataRequestFuture fut = clientReqSyncMap.get(typeId); |
| |
| if (fut == null) |
| return; |
| |
| if (msg0.metadataNotFound()) { |
| fut.onDone(MetadataUpdateResult.createSuccessfulResult(-1)); |
| |
| return; |
| } |
| |
| try { |
| BinaryMetadataHolder newHolder = U.unmarshal(ctx, binMetaBytes, U.resolveClassLoader(ctx.config())); |
| |
| BinaryMetadataHolder oldHolder = metaLocCache.putIfAbsent(typeId, newHolder); |
| |
| if (oldHolder != null) { |
| do { |
| oldHolder = metaLocCache.get(typeId); |
| |
| // typeId metadata cannot be removed after initialization. |
| if (obsoleteUpdate( |
| oldHolder.pendingVersion(), |
| oldHolder.acceptedVersion(), |
| newHolder.pendingVersion(), |
| newHolder.acceptedVersion())) |
| break; |
| } |
| while (!metaLocCache.replace(typeId, oldHolder, newHolder)); |
| } |
| |
| fut.onDone(MetadataUpdateResult.createSuccessfulResult(-1)); |
| } |
| catch (IgniteCheckedException e) { |
| fut.onDone(MetadataUpdateResult.createFailureResult(new BinaryObjectException(e))); |
| } |
| } |
| } |
| |
| /** |
| * Method checks if arrived metadata is obsolete comparing to the one from local cache. |
| * |
| * @param locP pendingVersion of metadata from local cache. |
| * @param locA acceptedVersion of metadata from local cache. |
| * @param remP pendingVersion of metadata from arrived message (client response/proposed/accepted). |
| * @param remA acceptedVersion of metadata from arrived message (client response/proposed/accepted). |
| * @return {@code true} is |
| */ |
| private static boolean obsoleteUpdate(int locP, int locA, int remP, int remA) { |
| return (remP < locP) || (remP == locP && remA < locA); |
| } |
| } |