// blob: d19057d431ea35cc8a7e7b7d9a8cd1ed4c93bc6e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.storage;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.configuration.storage.ConfigurationStorage;
import org.apache.ignite.configuration.storage.ConfigurationStorageListener;
import org.apache.ignite.configuration.storage.ConfigurationType;
import org.apache.ignite.configuration.storage.Data;
import org.apache.ignite.configuration.storage.StorageException;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.internal.metastorage.client.Conditions;
import org.apache.ignite.internal.metastorage.client.Entry;
import org.apache.ignite.internal.metastorage.client.EntryEvent;
import org.apache.ignite.internal.metastorage.client.Operation;
import org.apache.ignite.internal.metastorage.client.Operations;
import org.apache.ignite.internal.metastorage.client.WatchEvent;
import org.apache.ignite.internal.metastorage.client.WatchListener;
import org.jetbrains.annotations.NotNull;
/**
 * Distributed configuration storage.
 * <p>
 * Configuration values are kept in the meta storage under keys that start with {@link #DISTRIBUTED_PREFIX}. Every
 * write additionally updates {@link #MASTER_KEY}, and the revision of that key is used as the condition of the
 * conditional (CAS-like) {@code invoke} issued by {@link #write(Map, long)}.
 * <p>
 * All state-changing public methods are {@code synchronized} on this instance.
 */
public class DistributedConfigurationStorage implements ConfigurationStorage {
    /** Prefix that we add to configuration keys to distinguish them in meta storage. Must end with dot. */
    private static final String DISTRIBUTED_PREFIX = "dst-cfg.";

    /** Logger. */
    private static final IgniteLogger LOG = IgniteLogger.forClass(DistributedConfigurationStorage.class);

    /**
     * Key for CAS-ing configuration keys to meta storage. This key is expected to be the first key in lexicographical
     * order of distributed configuration keys.
     */
    private static final ByteArray MASTER_KEY = new ByteArray(DISTRIBUTED_PREFIX);

    /**
     * Upper bound of the range of distributed configuration keys. It is built by replacing the trailing dot of
     * {@link DistributedConfigurationStorage#DISTRIBUTED_PREFIX} with the next character {@code (char)('.' + 1)}.
     * Since keys are in lexicographical order in meta storage, every key that starts with the prefix is ordered
     * before this key.
     */
    private static final ByteArray DST_KEYS_END_RANGE =
        new ByteArray(DISTRIBUTED_PREFIX.substring(0, DISTRIBUTED_PREFIX.length() - 1) + (char)('.' + 1));

    /** MetaStorage manager. */
    private final MetaStorageManager metaStorageMgr;

    /** Configuration changes listener. */
    private ConfigurationStorageListener lsnr;

    /** Storage version. It stores actual meta storage revision, that is applied to configuration manager. */
    private final AtomicLong ver = new AtomicLong(0L);

    /**
     * Constructor.
     *
     * @param metaStorageMgr MetaStorage Manager.
     */
    public DistributedConfigurationStorage(MetaStorageManager metaStorageMgr) {
        this.metaStorageMgr = metaStorageMgr;
    }

    /**
     * {@inheritDoc}
     * <p>
     * Reads every distributed configuration entry from the meta storage. The first entry of the range is expected
     * to be {@link #MASTER_KEY} and is not included into the result. Entries without a value are not included either,
     * but their revisions still take part in the computation of the resulting version. If no configuration values
     * were read, the currently applied version is reported.
     */
    @Override public synchronized Data readAll() throws StorageException {
        HashMap<String, Serializable> data = new HashMap<>();

        Cursor<Entry> cursor = allDistributedConfigKeys();

        try {
            Iterator<Entry> entries = cursor.iterator();

            if (!entries.hasNext())
                return new Data(data, ver.get());

            Entry entryForMasterKey = entries.next();

            // First key must be the masterKey because it's supposed to be the first in lexicographical order
            assert entryForMasterKey.key().equals(MASTER_KEY);

            long maxRevision = 0L;

            while (entries.hasNext()) {
                Entry entry = entries.next();

                byte[] valBytes = entry.value();

                // An entry without a value is presumably a removed key (tombstone): there is nothing to
                // deserialize, so it is not reported as a part of the current configuration.
                if (valBytes != null)
                    data.put(configKey(entry), deserialize(valBytes));

                if (maxRevision < entry.revision())
                    maxRevision = entry.revision();
            }

            if (!data.isEmpty()) {
                assert maxRevision == entryForMasterKey.revision();

                assert maxRevision >= ver.get();

                return new Data(data, maxRevision);
            }

            return new Data(data, ver.get());
        }
        finally {
            // The cursor must be released regardless of the outcome. A failure to close it does not affect
            // the data that has been read already, so it is only logged.
            try {
                cursor.close();
            }
            catch (Exception e) {
                LOG.error("Failed to close meta storage cursor", e);
            }
        }
    }

    /**
     * {@inheritDoc}
     * <p>
     * The write is rejected (the future is completed with {@code false}) when {@code sentVersion} differs from the
     * currently applied version. Otherwise all changes together with the update of {@link #MASTER_KEY} are sent
     * to the meta storage as a single conditional {@code invoke}. A {@code null} value in {@code newValues} means
     * that the corresponding key must be removed.
     */
    @Override public synchronized CompletableFuture<Boolean> write(Map<String, Serializable> newValues, long sentVersion) {
        assert sentVersion <= ver.get();

        assert lsnr != null : "Configuration listener must be initialized before write.";

        if (sentVersion != ver.get())
            // This means that sentVersion is less than version and other node has already updated configuration and
            // write should be retried. Actual version will be set when watch and corresponding configuration listener
            // updates configuration and notifyApplied is triggered afterwards.
            return CompletableFuture.completedFuture(false);

        HashSet<Operation> operations = new HashSet<>();

        for (Map.Entry<String, Serializable> entry : newValues.entrySet()) {
            ByteArray key = new ByteArray(DISTRIBUTED_PREFIX + entry.getKey());

            if (entry.getValue() != null)
                // TODO: investigate overhead when serialize int, long, double, boolean, string, arrays of above
                // TODO: https://issues.apache.org/jira/browse/IGNITE-14698
                operations.add(Operations.put(key, ByteUtils.toBytes(entry.getValue())));
            else
                operations.add(Operations.remove(key));
        }

        // Master key is updated together with every change so that its revision can be used as a CAS condition.
        operations.add(Operations.put(MASTER_KEY, ByteUtils.longToBytes(sentVersion)));

        if (sentVersion == 0) {
            // Nothing has been applied yet: the write succeeds only if the master key has never been written.
            return metaStorageMgr.invoke(
                Conditions.notExists(MASTER_KEY),
                operations,
                Collections.singleton(Operations.noop()));
        }
        else {
            return metaStorageMgr.invoke(
                Conditions.revision(MASTER_KEY).eq(ver.get()),
                operations,
                Collections.singleton(Operations.noop()));
        }
    }

    /**
     * {@inheritDoc}
     * <p>
     * Only the first listener is accepted. For it a meta storage watch by prefix {@link #MASTER_KEY} is
     * registered; subsequent calls only log a warning.
     */
    @Override public synchronized void registerConfigurationListener(@NotNull ConfigurationStorageListener lsnr) {
        if (this.lsnr == null) {
            this.lsnr = lsnr;

            // TODO: registerWatchByPrefix could throw OperationTimeoutException and CompactedException and we should
            // TODO: properly handle such cases https://issues.apache.org/jira/browse/IGNITE-14604
            metaStorageMgr.registerWatchByPrefix(MASTER_KEY, new WatchListener() {
                @Override public boolean onUpdate(@NotNull WatchEvent events) {
                    HashMap<String, Serializable> data = new HashMap<>();

                    long maxRevision = 0L;

                    Entry entryForMasterKey = null;

                    for (EntryEvent event : events.entryEvents()) {
                        Entry e = event.newEntry();

                        if (e.key().equals(MASTER_KEY))
                            entryForMasterKey = e;
                        else {
                            // A new entry without a value is presumably a removed key. It is passed as null,
                            // the same way removals are expressed in write(), instead of failing on
                            // deserialization.
                            data.put(configKey(e), deserialize(e.value()));

                            if (maxRevision < e.revision())
                                maxRevision = e.revision();
                        }
                    }

                    // Contract of meta storage ensures that all updates of one revision will come in one batch.
                    // Also masterKey should be updated every time when we update cfg.
                    // That means that masterKey update must be included in the batch.
                    assert entryForMasterKey != null;

                    assert maxRevision == entryForMasterKey.revision();

                    assert maxRevision >= ver.get();

                    DistributedConfigurationStorage.this.lsnr.onEntriesChanged(new Data(data, maxRevision));

                    return true;
                }

                @Override public void onError(@NotNull Throwable e) {
                    // TODO: need to handle this case and there should some mechanism for registering new watch as far as
                    // TODO: onError unregisters failed watch https://issues.apache.org/jira/browse/IGNITE-14604
                    LOG.error("Meta storage listener issue", e);
                }
            });
        }
        else
            LOG.warn("Configuration listener has already been set.");
    }

    /** {@inheritDoc} */
    @Override public synchronized void notifyApplied(long storageRevision) {
        assert ver.get() <= storageRevision;

        ver.set(storageRevision);

        // TODO: Also we should persist version,
        // TODO: this should be done when nodes restart is introduced.
        // TODO: https://issues.apache.org/jira/browse/IGNITE-14697
    }

    /** {@inheritDoc} */
    @Override public ConfigurationType type() {
        return ConfigurationType.DISTRIBUTED;
    }

    /**
     * Method that returns all distributed configuration keys from meta storage filtered out by the current applied
     * revision as an upper bound. Applied revision is a revision of the last successful vault update.
     * <p>
     * This is possible to distinguish cfg keys from meta storage because we add special prefix {@link
     * DistributedConfigurationStorage#DISTRIBUTED_PREFIX} to all configuration keys that we put to meta storage.
     * <p>
     * The caller is responsible for closing the returned cursor.
     *
     * @return Cursor built upon all distributed configuration entries.
     */
    private Cursor<Entry> allDistributedConfigKeys() {
        // TODO: rangeWithAppliedRevision could throw OperationTimeoutException and CompactedException and we should
        // TODO: properly handle such cases https://issues.apache.org/jira/browse/IGNITE-14604
        return metaStorageMgr.rangeWithAppliedRevision(MASTER_KEY, DST_KEYS_END_RANGE);
    }

    /**
     * Converts a meta storage key of the entry to a configuration key by cutting off {@link #DISTRIBUTED_PREFIX}.
     *
     * @param entry Meta storage entry which key starts with the distributed prefix.
     * @return Configuration key without the prefix.
     */
    private static String configKey(Entry entry) {
        return entry.key().toString().substring(DISTRIBUTED_PREFIX.length());
    }

    /**
     * Deserializes a configuration value read from meta storage.
     *
     * @param bytes Serialized value, may be {@code null}.
     * @return Deserialized value or {@code null} if {@code bytes} is {@code null}.
     */
    private static Serializable deserialize(byte[] bytes) {
        return bytes == null ? null : (Serializable)ByteUtils.fromBytes(bytes);
    }
}