| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.metastorage.client; |
| |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.ConcurrentHashMap; |
| import org.apache.ignite.internal.metastorage.common.OperationType; |
| import org.apache.ignite.internal.metastorage.common.command.ConditionInfo; |
| import org.apache.ignite.internal.metastorage.common.command.GetAllCommand; |
| import org.apache.ignite.internal.metastorage.common.command.GetAndPutAllCommand; |
| import org.apache.ignite.internal.metastorage.common.command.GetAndPutCommand; |
| import org.apache.ignite.internal.metastorage.common.command.GetAndRemoveAllCommand; |
| import org.apache.ignite.internal.metastorage.common.command.GetAndRemoveCommand; |
| import org.apache.ignite.internal.metastorage.common.command.GetCommand; |
| import org.apache.ignite.internal.metastorage.common.command.InvokeCommand; |
| import org.apache.ignite.internal.metastorage.common.command.MultipleEntryResponse; |
| import org.apache.ignite.internal.metastorage.common.command.OperationInfo; |
| import org.apache.ignite.internal.metastorage.common.command.PutAllCommand; |
| import org.apache.ignite.internal.metastorage.common.command.PutCommand; |
| import org.apache.ignite.internal.metastorage.common.command.RangeCommand; |
| import org.apache.ignite.internal.metastorage.common.command.RemoveAllCommand; |
| import org.apache.ignite.internal.metastorage.common.command.RemoveCommand; |
| import org.apache.ignite.internal.metastorage.common.command.SingleEntryResponse; |
| import org.apache.ignite.internal.metastorage.common.command.WatchExactKeysCommand; |
| import org.apache.ignite.internal.metastorage.common.command.WatchRangeKeysCommand; |
| import org.apache.ignite.internal.util.Cursor; |
| import org.apache.ignite.lang.ByteArray; |
| import org.apache.ignite.lang.IgniteInternalException; |
| import org.apache.ignite.lang.IgniteLogger; |
| import org.apache.ignite.lang.IgniteUuid; |
| import org.apache.ignite.raft.client.service.RaftGroupService; |
| import org.jetbrains.annotations.NotNull; |
| import org.jetbrains.annotations.Nullable; |
| |
| /** |
| * {@link MetaStorageService} implementation. |
| */ |
| public class MetaStorageServiceImpl implements MetaStorageService { |
| /** The logger. */ |
| private static final IgniteLogger LOG = IgniteLogger.forClass(MetaStorageServiceImpl.class); |
| |
| /** Meta storage raft group service. */ |
| private final RaftGroupService metaStorageRaftGrpSvc; |
| |
| // TODO: IGNITE-14691 Temporally solution that should be removed after implementing reactive watches. |
| /** Watch processor, that uses pulling logic in order to retrieve watch notifications from server. */ |
| private final WatchProcessor watchProcessor; |
| |
| /** |
| * @param metaStorageRaftGrpSvc Meta storage raft group service. |
| */ |
| public MetaStorageServiceImpl(RaftGroupService metaStorageRaftGrpSvc) { |
| this.metaStorageRaftGrpSvc = metaStorageRaftGrpSvc; |
| this.watchProcessor = new WatchProcessor(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public @NotNull CompletableFuture<Entry> get(@NotNull ByteArray key) { |
| return metaStorageRaftGrpSvc.run(new GetCommand(key)).thenApply(MetaStorageServiceImpl::singleEntryResult); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public @NotNull CompletableFuture<Entry> get(@NotNull ByteArray key, long revUpperBound) { |
| return metaStorageRaftGrpSvc.run(new GetCommand(key, revUpperBound)) |
| .thenApply(MetaStorageServiceImpl::singleEntryResult); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public @NotNull CompletableFuture<Map<ByteArray, Entry>> getAll(Set<ByteArray> keys) { |
| return metaStorageRaftGrpSvc.run(new GetAllCommand(keys)) |
| .thenApply(MetaStorageServiceImpl::multipleEntryResult); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public @NotNull CompletableFuture<Map<ByteArray, Entry>> getAll(Set<ByteArray> keys, long revUpperBound) { |
| return metaStorageRaftGrpSvc.run(new GetAllCommand(keys, revUpperBound)). |
| thenApply(MetaStorageServiceImpl::multipleEntryResult); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public @NotNull CompletableFuture<Void> put(@NotNull ByteArray key, @NotNull byte[] value) { |
| return metaStorageRaftGrpSvc.run(new PutCommand(key, value)); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public @NotNull CompletableFuture<Entry> getAndPut(@NotNull ByteArray key, @NotNull byte[] value) { |
| return metaStorageRaftGrpSvc.run(new GetAndPutCommand(key, value)) |
| .thenApply(MetaStorageServiceImpl::singleEntryResult); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public @NotNull CompletableFuture<Void> putAll(@NotNull Map<ByteArray, byte[]> vals) { |
| return metaStorageRaftGrpSvc.run(new PutAllCommand(vals)); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public @NotNull CompletableFuture<Map<ByteArray, Entry>> getAndPutAll(@NotNull Map<ByteArray, byte[]> vals) { |
| return metaStorageRaftGrpSvc.run(new GetAndPutAllCommand(vals)). |
| thenApply(MetaStorageServiceImpl::multipleEntryResult); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public @NotNull CompletableFuture<Void> remove(@NotNull ByteArray key) { |
| return metaStorageRaftGrpSvc.run(new RemoveCommand(key)); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public @NotNull CompletableFuture<Entry> getAndRemove(@NotNull ByteArray key) { |
| return metaStorageRaftGrpSvc.run(new GetAndRemoveCommand(key)) |
| .thenApply(MetaStorageServiceImpl::singleEntryResult); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public @NotNull CompletableFuture<Void> removeAll(@NotNull Set<ByteArray> keys) { |
| return metaStorageRaftGrpSvc.run(new RemoveAllCommand(keys)); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public @NotNull CompletableFuture<Map<ByteArray, Entry>> getAndRemoveAll(@NotNull Set<ByteArray> keys) { |
| return metaStorageRaftGrpSvc.run(new GetAndRemoveAllCommand(keys)). |
| thenApply(MetaStorageServiceImpl::multipleEntryResult); |
| } |
| |
| @Override public @NotNull CompletableFuture<Boolean> invoke( |
| @NotNull Condition condition, |
| @NotNull Operation success, |
| @NotNull Operation failure |
| ) { |
| return invoke(condition, List.of(success), List.of(failure)); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public @NotNull CompletableFuture<Boolean> invoke( |
| @NotNull Condition condition, |
| @NotNull Collection<Operation> success, |
| @NotNull Collection<Operation> failure |
| ) { |
| ConditionInfo cond = toConditionInfo(condition); |
| |
| List<OperationInfo> successOps = toOperationInfos(success); |
| |
| List<OperationInfo> failureOps = toOperationInfos(failure); |
| |
| return metaStorageRaftGrpSvc.run(new InvokeCommand(cond, successOps, failureOps)); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public @NotNull Cursor<Entry> range(@NotNull ByteArray keyFrom, @Nullable ByteArray keyTo, long revUpperBound) { |
| return new CursorImpl<>( |
| metaStorageRaftGrpSvc, |
| metaStorageRaftGrpSvc.run(new RangeCommand(keyFrom, keyTo, revUpperBound)), |
| MetaStorageServiceImpl::singleEntryResult |
| ); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public @NotNull Cursor<Entry> range(@NotNull ByteArray keyFrom, @Nullable ByteArray keyTo) { |
| return new CursorImpl<>( |
| metaStorageRaftGrpSvc, |
| metaStorageRaftGrpSvc.run(new RangeCommand(keyFrom, keyTo)), |
| MetaStorageServiceImpl::singleEntryResult |
| ); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public @NotNull CompletableFuture<IgniteUuid> watch( |
| @Nullable ByteArray keyFrom, |
| @Nullable ByteArray keyTo, |
| long revision, |
| @NotNull WatchListener lsnr |
| ) { |
| CompletableFuture<IgniteUuid> watchRes = |
| metaStorageRaftGrpSvc.run(new WatchRangeKeysCommand(keyFrom, keyTo, revision)); |
| |
| watchRes.thenAccept( |
| watchId -> watchProcessor.addWatch( |
| watchId, |
| new CursorImpl<>(metaStorageRaftGrpSvc, watchRes, MetaStorageServiceImpl::watchResponse), |
| lsnr |
| ) |
| ); |
| |
| return watchRes; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public @NotNull CompletableFuture<IgniteUuid> watch( |
| @NotNull ByteArray key, |
| long revision, |
| @NotNull WatchListener lsnr |
| ) { |
| return watch(key, null, revision, lsnr); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public @NotNull CompletableFuture<IgniteUuid> watch( |
| @NotNull Set<ByteArray> keys, |
| long revision, |
| @NotNull WatchListener lsnr |
| ) { |
| CompletableFuture<IgniteUuid> watchRes = |
| metaStorageRaftGrpSvc.run(new WatchExactKeysCommand(keys, revision)); |
| |
| watchRes.thenAccept( |
| watchId -> watchProcessor.addWatch( |
| watchId, |
| new CursorImpl<>(metaStorageRaftGrpSvc, watchRes, MetaStorageServiceImpl::watchResponse), |
| lsnr |
| ) |
| ); |
| |
| return watchRes; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public @NotNull CompletableFuture<Void> stopWatch(@NotNull IgniteUuid id) { |
| return CompletableFuture.runAsync(() -> watchProcessor.stopWatch(id)); |
| } |
| |
| // TODO: IGNITE-14734 Implement. |
| /** {@inheritDoc} */ |
| @Override public @NotNull CompletableFuture<Void> compact() { |
| return null; |
| } |
| |
| /** */ |
| private static List<OperationInfo> toOperationInfos(Collection<Operation> ops) { |
| List<OperationInfo> res = new ArrayList<>(ops.size()); |
| |
| for (Operation op : ops) { |
| OperationInfo info = null; |
| |
| if (op.type() == OperationType.NO_OP) |
| info = new OperationInfo(null, null, OperationType.NO_OP); |
| else if (op.type() == OperationType.REMOVE) |
| info = new OperationInfo(((Operation.RemoveOp)op.inner()).key(), null, OperationType.REMOVE); |
| else if (op.type() == OperationType.PUT) { |
| Operation.PutOp inner = (Operation.PutOp)op.inner(); |
| |
| info = new OperationInfo(inner.key(), inner.value(), OperationType.PUT); |
| } |
| else |
| assert false : "Unknown operation type " + op.type(); |
| |
| res.add(info); |
| } |
| |
| return res; |
| } |
| |
| /** */ |
| private static ConditionInfo toConditionInfo(@NotNull Condition condition) { |
| ConditionInfo cnd = null; |
| |
| Object obj = condition.inner(); |
| |
| if (obj instanceof Condition.ExistenceCondition) { |
| Condition.ExistenceCondition inner = (Condition.ExistenceCondition)obj; |
| |
| cnd = new ConditionInfo(inner.key(), inner.type(), null, 0); |
| } |
| else if (obj instanceof Condition.RevisionCondition) { |
| Condition.RevisionCondition inner = (Condition.RevisionCondition)obj; |
| |
| cnd = new ConditionInfo(inner.key(), inner.type(), null, inner.revision()); |
| } |
| else if (obj instanceof Condition.ValueCondition) { |
| Condition.ValueCondition inner = (Condition.ValueCondition)obj; |
| |
| cnd = new ConditionInfo(inner.key(), inner.type(), inner.value(), 0); |
| } |
| else |
| assert false : "Unknown condition type: " + obj.getClass().getSimpleName(); |
| |
| return cnd; |
| } |
| |
| /** */ |
| private static Map<ByteArray, Entry> multipleEntryResult(Object obj) { |
| MultipleEntryResponse resp = (MultipleEntryResponse) obj; |
| |
| Map<ByteArray, Entry> res = new HashMap<>(); |
| |
| for (SingleEntryResponse e : resp.entries()) { |
| ByteArray key = new ByteArray(e.key()); |
| |
| res.put(key, new EntryImpl(key, e.value(), e.revision(), e.updateCounter())); |
| } |
| |
| return res; |
| } |
| |
| /** */ |
| private static Entry singleEntryResult(Object obj) { |
| SingleEntryResponse resp = (SingleEntryResponse) obj; |
| |
| return new EntryImpl(new ByteArray(resp.key()), resp.value(), resp.revision(), resp.updateCounter()); |
| } |
| |
| /** */ |
| private static WatchEvent watchResponse(Object obj) { |
| MultipleEntryResponse resp = (MultipleEntryResponse) obj; |
| |
| List<EntryEvent> evts = new ArrayList<>(resp.entries().size() / 2); |
| |
| Entry o = null; |
| Entry n; |
| |
| for (int i = 0; i < resp.entries().size(); i++) { |
| SingleEntryResponse s = resp.entries().get(i); |
| |
| EntryImpl e = new EntryImpl(new ByteArray(s.key()), s.value(), s.revision(), s.updateCounter()); |
| |
| if (i % 2 == 0) |
| o = e; |
| else { |
| n = e; |
| |
| evts.add(new EntryEvent(o, n)); |
| } |
| } |
| |
| return new WatchEvent(evts); |
| } |
| |
| // TODO: IGNITE-14691 Temporally solution that should be removed after implementing reactive watches. |
| /** Watch processor, that manages {@link Watcher} threads. */ |
| private final class WatchProcessor { |
| /** Active Watcher threads that process notification pulling logic. */ |
| private final Map<IgniteUuid, Watcher> watchers = new ConcurrentHashMap<>(); |
| |
| /** |
| * Starts exclusive thread per watch that implement watch pulling logic and |
| * calls {@link WatchListener#onUpdate(WatchEvent)}} or {@link WatchListener#onError(Throwable)}. |
| * |
| * @param watchId Watch id. |
| * @param cursor Watch Cursor. |
| * @param lsnr The listener which receives and handles watch updates. |
| */ |
| private void addWatch(IgniteUuid watchId, CursorImpl<WatchEvent> cursor, WatchListener lsnr) { |
| Watcher watcher = new Watcher(cursor, lsnr); |
| |
| watchers.put(watchId, watcher); |
| |
| watcher.start(); |
| } |
| |
| /** |
| * Closes server cursor and interrupts watch pulling thread. |
| * |
| * @param watchId Watch id. |
| */ |
| private void stopWatch(IgniteUuid watchId) { |
| watchers.computeIfPresent( |
| watchId, |
| (k, v) -> { |
| CompletableFuture.runAsync(v::interrupt).thenRun(() -> { |
| try { |
| v.stop = true; |
| |
| Thread.sleep(100); |
| |
| v.cursor.close(); |
| } |
| catch (InterruptedException e) { |
| throw new IgniteInternalException(e); |
| } |
| catch (Exception e) { |
| // TODO: IGNITE-14693 Implement Meta storage exception handling logic. |
| LOG.error("Unexpected exception", e); |
| } |
| }); |
| return null; |
| } |
| ); |
| } |
| |
| /** Watcher thread, uses pulling logic in order to retrieve watch notifications from server */ |
| private final class Watcher extends Thread { |
| /** */ |
| private volatile boolean stop = false; |
| |
| /** Watch event cursor. */ |
| private Cursor<WatchEvent> cursor; |
| |
| /** The listener which receives and handles watch updates. */ |
| private WatchListener lsnr; |
| |
| /** |
| * @param cursor Watch event cursor. |
| * @param lsnr The listener which receives and handles watch updates. |
| */ |
| Watcher(Cursor<WatchEvent> cursor, WatchListener lsnr) { |
| this.cursor = cursor; |
| this.lsnr = lsnr; |
| } |
| |
| /** |
| * Pulls watch events from server side with the help of cursor.iterator.hasNext()/next() |
| * in the while(true) loop. Collects watch events with same revision and fires either onUpdate or onError(). |
| */ |
| @Override public void run() { |
| Iterator<WatchEvent> watchEvtsIter = cursor.iterator(); |
| |
| while (!stop) { |
| try { |
| if (watchEvtsIter.hasNext()) { |
| WatchEvent watchEvt = null; |
| |
| try { |
| watchEvt = watchEvtsIter.next(); |
| } |
| catch (Throwable e) { |
| lsnr.onError(e); |
| } |
| |
| assert watchEvt != null; |
| |
| lsnr.onUpdate(watchEvt); |
| } |
| else |
| Thread.sleep(10); |
| } |
| catch (Throwable e) { |
| if (e instanceof InterruptedException || e.getCause() instanceof InterruptedException) |
| break; |
| else { |
| // TODO: IGNITE-14693 Implement Meta storage exception handling logic. |
| LOG.error("Unexpected exception", e); |
| } |
| } |
| } |
| } |
| } |
| } |
| } |