// blob: fbcd107df56f9f32460f991ef0a6cd0c597b7170 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package accord.impl.mock;
import accord.api.MessageSink;
import accord.api.TestableConfigurationService;
import accord.local.Node;
import accord.primitives.Ranges;
import accord.topology.Topology;
import accord.utils.EpochFunction;
import accord.utils.async.AsyncChains;
import accord.utils.async.AsyncResult;
import accord.utils.async.AsyncResults;
import org.junit.jupiter.api.Assertions;
import java.util.*;
/**
 * In-memory {@link TestableConfigurationService} for tests.
 *
 * <p>Topologies are kept in a list whose index is the epoch number: slot 0 is
 * seeded with {@link Topology#EMPTY} by the constructor, and every topology
 * accepted by {@link #reportTopology(Topology)} is appended at the index equal
 * to its epoch. Methods that read or modify this object's collections are
 * {@code synchronized} on the instance.
 */
public class MockConfigurationService implements TestableConfigurationService
{
    // Stored by the constructor; nothing in this class reads it.
    private final MessageSink messageSink;
    // Index == epoch; index 0 holds Topology.EMPTY.
    private final List<Topology> epochs = new ArrayList<>();
    // Acknowledgements recorded by acknowledgeEpoch, keyed by EpochReady.epoch.
    private final Map<Long, EpochReady> acks = new HashMap<>();
    // One combined listener result per accepted topology; only appended to in this class.
    private final List<AsyncResult<Void>> syncs = new ArrayList<>();
    private final List<Listener> listeners = new ArrayList<>();
    // Invoked by fetchTopologyForEpoch for epochs that are not stored yet.
    private final EpochFunction<MockConfigurationService> fetchTopologyHandler;

    /**
     * Creates a service that knows only the empty topology at index 0.
     *
     * @param messageSink          sink retained by this instance
     * @param fetchTopologyHandler callback applied when an unknown epoch is requested
     *                             through {@link #fetchTopologyForEpoch(long)}
     */
    public MockConfigurationService(MessageSink messageSink, EpochFunction<MockConfigurationService> fetchTopologyHandler)
    {
        this.messageSink = messageSink;
        this.fetchTopologyHandler = fetchTopologyHandler;
        epochs.add(Topology.EMPTY);
    }

    /**
     * Creates a service and immediately feeds {@code initialTopology} through
     * {@link #reportTopology(Topology)}.
     *
     * @param messageSink          sink retained by this instance
     * @param fetchTopologyHandler callback applied when an unknown epoch is requested
     * @param initialTopology      first topology to report
     */
    public MockConfigurationService(MessageSink messageSink, EpochFunction<MockConfigurationService> fetchTopologyHandler, Topology initialTopology)
    {
        this(messageSink, fetchTopologyHandler);
        reportTopology(initialTopology);
    }

    /** Adds {@code listener} to the listeners notified by the report methods of this class. */
    @Override
    public synchronized void registerListener(Listener listener)
    {
        listeners.add(listener);
    }

    /** @return the most recently stored topology (the last element of the epoch list) */
    @Override
    public synchronized Topology currentTopology()
    {
        int newest = epochs.size() - 1;
        return epochs.get(newest);
    }

    /**
     * Looks up the topology stored at index {@code epoch}.
     *
     * @return the stored topology, or {@code null} when {@code epoch} is at or beyond
     *         the number of stored topologies
     */
    @Override
    public synchronized Topology getTopologyForEpoch(long epoch)
    {
        if (epoch >= epochs.size())
            return null;

        return epochs.get((int) epoch);
    }

    /**
     * Hands {@code epoch} to the fetch handler, but only when that epoch is not
     * already stored; otherwise does nothing.
     */
    @Override
    public synchronized void fetchTopologyForEpoch(long epoch)
    {
        boolean unknown = epoch >= epochs.size();
        if (unknown)
            fetchTopologyHandler.apply(epoch, this);
    }

    /**
     * Records {@code epoch} under its epoch number, asserting that no
     * acknowledgement was recorded for that number before.
     * {@code startSync} is not consulted by this mock.
     */
    @Override
    public synchronized void acknowledgeEpoch(EpochReady epoch, boolean startSync)
    {
        boolean alreadyAcked = acks.containsKey(epoch.epoch);
        Assertions.assertFalse(alreadyAcked);
        acks.put(epoch.epoch, epoch);
    }

    /** @return the acknowledgement recorded for {@code epoch}, or {@code null} if none */
    public synchronized EpochReady ackFor(long epoch)
    {
        return acks.get(epoch);
    }

    /** No-op in this mock. */
    @Override
    public void reportEpochClosed(Ranges ranges, long epoch)
    {
    }

    /** No-op in this mock. */
    @Override
    public void reportEpochRedundant(Ranges ranges, long epoch)
    {
    }

    /**
     * Stores {@code topology} as the next epoch and notifies every registered listener.
     *
     * <p>A topology whose epoch is greater than the number of stored topologies is
     * ignored. Any other topology must have an epoch equal to that number, which is
     * enforced with a JUnit assertion. The results returned by the listeners are
     * combined into a single result that is appended to {@code syncs}.
     */
    @Override
    public synchronized void reportTopology(Topology topology)
    {
        // Too far ahead of what is stored: drop it without recording anything.
        if (topology.epoch() > epochs.size())
            return;

        // Past this point the topology must be exactly the next slot in the list.
        Assertions.assertEquals(topology.epoch(), epochs.size());
        epochs.add(topology);

        List<AsyncResult<Void>> pending = new ArrayList<>();
        for (Listener each : listeners)
        {
            AsyncResult<Void> update = each.onTopologyUpdate(topology, true);
            pending.add(update);
        }

        AsyncResult<Void> combined;
        if (pending.isEmpty())
        {
            // No listeners: record an already-successful result.
            combined = AsyncResults.success(null);
        }
        else
        {
            combined = AsyncChains.reduce(pending, (a, b) -> null).beginAsResult();
        }
        syncs.add(combined);
    }

    /** Forwards a remote sync completion for {@code node} at {@code epoch} to every registered listener. */
    public synchronized void reportSyncComplete(Node.Id node, long epoch)
    {
        for (Listener each : listeners)
        {
            each.onRemoteSyncComplete(node, epoch);
        }
    }
}