/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.cache.persistence;

import java.util.ArrayList;
import java.util.List;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.affinity.AffinityFunctionContext;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.pagemem.wal.WALIterator;
import org.apache.ignite.internal.pagemem.wal.record.MetastoreDataRecord;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;

import static java.lang.String.valueOf;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISABLE_WAL_DURING_REBALANCING;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.METASTORE_DATA_RECORD;
import static org.apache.ignite.internal.processors.cache.GridCacheUtils.cacheId;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.DFLT_STORE_DIR;

| /** |
| * |
| */ |
| @WithSystemProperty(key = IGNITE_DISABLE_WAL_DURING_REBALANCING, value = "true") |
| public class LocalWalModeNoChangeDuringRebalanceOnNonNodeAssignTest extends GridCommonAbstractTest { |
| /** */ |
| private static final int NODES = 3; |
| |
| /** */ |
| private CacheAtomicityMode atomicityMode; |
| |
| /** {@inheritDoc} */ |
| @Override protected IgniteConfiguration getConfiguration(String name) throws Exception { |
| IgniteConfiguration cfg = super.getConfiguration(name); |
| |
| cfg.setConsistentId(name); |
| |
| cfg.setDataStorageConfiguration( |
| new DataStorageConfiguration() |
| .setWalPath(walPath(name)) |
| .setWalArchivePath(walArchivePath(name)) |
| .setDefaultDataRegionConfiguration( |
| new DataRegionConfiguration() |
| .setMaxSize(DataStorageConfiguration.DFLT_DATA_REGION_INITIAL_SIZE) |
| .setPersistenceEnabled(true) |
| ) |
| ); |
| |
| cfg.setCacheConfiguration( |
| new CacheConfiguration(DEFAULT_CACHE_NAME) |
| .setAtomicityMode(atomicityMode) |
| .setAffinity(new CustomAffinityFunction(getTestIgniteInstanceName(NODES))) |
| ); |
| |
| return cfg; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void beforeTest() throws Exception { |
| super.beforeTest(); |
| |
| cleanPersistenceDir(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void afterTest() throws Exception { |
| super.afterTest(); |
| |
| stopAllGrids(); |
| |
| cleanPersistenceDir(); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testAtomic() throws Exception { |
| atomicityMode = CacheAtomicityMode.ATOMIC; |
| |
| check(); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testTx() throws Exception { |
| atomicityMode = CacheAtomicityMode.TRANSACTIONAL; |
| |
| check(); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| public void check() throws Exception { |
| Ignite ig = startGridsMultiThreaded(NODES); |
| |
| int entries = 100_000; |
| |
| try (IgniteDataStreamer<Integer, Integer> st = ig.dataStreamer(DEFAULT_CACHE_NAME)) { |
| st.allowOverwrite(true); |
| |
| for (int i = 0; i < entries; i++) |
| st.addData(i, -i); |
| } |
| |
| IgniteEx ig4 = startGrid(NODES); |
| |
| ig4.cluster().setBaselineTopology(ig4.context().discovery().topologyVersion()); |
| |
| IgniteWalIteratorFactory iterFactory = new IgniteWalIteratorFactory(log); |
| |
| String name = ig4.name(); |
| |
| try (WALIterator it = iterFactory.iterator(walPath(name), walArchivePath(name))) { |
| while (it.hasNext()) { |
| IgniteBiTuple<WALPointer, WALRecord> tup = it.next(); |
| |
| WALRecord rec = tup.get2(); |
| |
| if (rec.type() == METASTORE_DATA_RECORD) { |
| MetastoreDataRecord metastoreDataRecord = (MetastoreDataRecord)rec; |
| |
| String key = metastoreDataRecord.key(); |
| |
| if (key.startsWith("grp-wal-") && |
| key.contains(valueOf(cacheId(DEFAULT_CACHE_NAME))) && |
| metastoreDataRecord.value() != null) |
| fail("WAL was disabled but should not."); |
| } |
| } |
| } |
| } |
| |
| /** |
| * |
| * @param nodeName Node name. |
| * @return Path to WAL work directory. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private String walPath(String nodeName) throws IgniteCheckedException { |
| String workDir = U.defaultWorkDirectory(); |
| |
| return workDir + "/" + DFLT_STORE_DIR + "/" + nodeName + "/wal"; |
| } |
| |
| /** |
| * |
| * @param nodeName Node name. |
| * @return Path to WAL archive directory. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private String walArchivePath(String nodeName) throws IgniteCheckedException { |
| String workDir = U.defaultWorkDirectory(); |
| |
| return workDir + "/" + DFLT_STORE_DIR + "/" + nodeName + "/walArchive"; |
| } |
| |
| /** |
| * Custom affinity function which does not allow to assign partitions to the node with id equals to {@code NODES}. |
| */ |
| public static class CustomAffinityFunction extends RendezvousAffinityFunction { |
| /** Consistent id of a node that should not be assigned as primary/backup node. */ |
| private final String consistentId; |
| |
| /** |
| * Creates a new instance of CustomAffinityFunction. |
| */ |
| public CustomAffinityFunction(String consistentId) { |
| super(false, NODES); |
| this.consistentId = consistentId; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public List<List<ClusterNode>> assignPartitions(final AffinityFunctionContext affCtx) { |
| AffinityFunctionContext proxy = new AffinityFunctionContext() { |
| /** {@inheritDoc} */ |
| @Override public @Nullable List<ClusterNode> previousAssignment(int part) { |
| return null; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int backups() { |
| return affCtx.backups(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public List<ClusterNode> currentTopologySnapshot() { |
| List<ClusterNode> nodes = affCtx.currentTopologySnapshot(); |
| |
| nodes.removeIf(n -> n.consistentId().toString().equals(consistentId)); |
| |
| return nodes; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public AffinityTopologyVersion currentTopologyVersion() { |
| return affCtx.currentTopologyVersion(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public @Nullable DiscoveryEvent discoveryEvent() { |
| return affCtx.discoveryEvent(); |
| } |
| }; |
| |
| return super.assignPartitions(proxy); |
| } |
| } |
| } |