blob: 0720d55792be0cc64da36a28f848e2323621887f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.igfs;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.FileSystemConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.igfs.IgfsGroupDataBlocksKeyMapper;
import org.apache.ignite.igfs.IgfsInputStream;
import org.apache.ignite.igfs.IgfsIpcEndpointConfiguration;
import org.apache.ignite.igfs.IgfsIpcEndpointType;
import org.apache.ignite.igfs.IgfsMode;
import org.apache.ignite.igfs.IgfsOutputStream;
import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.igfs.IgfsMode.DUAL_ASYNC;
import static org.apache.ignite.igfs.IgfsMode.DUAL_SYNC;
import static org.apache.ignite.igfs.IgfsMode.PRIMARY;
import static org.apache.ignite.igfs.IgfsMode.PROXY;
/**
* IGFS modes self test.
*/
public class IgfsModesSelfTest extends IgfsCommonAbstractTest {
/** Grid instance hosting primary IGFS. */
private IgniteEx grid;
/** Primary IGFS. */
private IgfsImpl igfs;
/** Secondary IGFS. */
private IgfsImpl igfsSecondary;
/** Default IGFS mode. */
private IgfsMode mode;
/** Modes map. */
private Map<String, IgfsMode> pathModes;
/** Whether to set "null" mode. */
private boolean setNullMode;
/** Whether to set secondary file system URI. */
private boolean setSecondaryFs;
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
mode = null;
pathModes = null;
setNullMode = false;
setSecondaryFs = false;
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
G.stopAll(true);
}
/**
* Perform initial startup.
*
* @throws Exception If failed.
*/
@SuppressWarnings("NullableProblems")
private void startUp() throws Exception {
startUpSecondary();
FileSystemConfiguration igfsCfg = new FileSystemConfiguration();
igfsCfg.setName("igfs");
igfsCfg.setBlockSize(512 * 1024);
if (setNullMode)
igfsCfg.setDefaultMode(null);
else if (mode != null)
igfsCfg.setDefaultMode(mode);
igfsCfg.setPathModes(pathModes);
if (setSecondaryFs)
igfsCfg.setSecondaryFileSystem(igfsSecondary.asSecondary());
CacheConfiguration dataCacheCfg = defaultCacheConfiguration();
dataCacheCfg.setCacheMode(PARTITIONED);
dataCacheCfg.setNearConfiguration(null);
dataCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
dataCacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(128));
dataCacheCfg.setBackups(0);
dataCacheCfg.setAtomicityMode(TRANSACTIONAL);
CacheConfiguration metaCacheCfg = defaultCacheConfiguration();
metaCacheCfg.setCacheMode(REPLICATED);
metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
metaCacheCfg.setAtomicityMode(TRANSACTIONAL);
igfsCfg.setMetaCacheConfiguration(metaCacheCfg);
igfsCfg.setDataCacheConfiguration(dataCacheCfg);
IgniteConfiguration cfg = new IgniteConfiguration();
cfg.setIgniteInstanceName("igfs-grid");
TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true));
cfg.setDiscoverySpi(discoSpi);
cfg.setFileSystemConfiguration(igfsCfg);
cfg.setLocalHost("127.0.0.1");
cfg.setConnectorConfiguration(null);
grid = (IgniteEx)G.start(cfg);
igfs = (IgfsImpl)grid.fileSystem("igfs");
}
/**
* Startup secondary file system.
*
* @throws Exception If failed.
*/
private void startUpSecondary() throws Exception {
FileSystemConfiguration igfsCfg = new FileSystemConfiguration();
igfsCfg.setName("igfs-secondary");
igfsCfg.setBlockSize(512 * 1024);
igfsCfg.setDefaultMode(PRIMARY);
IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration();
endpointCfg.setType(IgfsIpcEndpointType.TCP);
endpointCfg.setPort(11500);
CacheConfiguration dataCacheCfg = defaultCacheConfiguration();
dataCacheCfg.setCacheMode(PARTITIONED);
dataCacheCfg.setNearConfiguration(null);
dataCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
dataCacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(128));
dataCacheCfg.setBackups(0);
dataCacheCfg.setAtomicityMode(TRANSACTIONAL);
CacheConfiguration metaCacheCfg = defaultCacheConfiguration();
metaCacheCfg.setCacheMode(REPLICATED);
metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
metaCacheCfg.setAtomicityMode(TRANSACTIONAL);
igfsCfg.setMetaCacheConfiguration(metaCacheCfg);
igfsCfg.setDataCacheConfiguration(dataCacheCfg);
IgniteConfiguration cfg = new IgniteConfiguration();
cfg.setIgniteInstanceName("igfs-grid-secondary");
TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true));
cfg.setDiscoverySpi(discoSpi);
cfg.setFileSystemConfiguration(igfsCfg);
cfg.setLocalHost("127.0.0.1");
cfg.setConnectorConfiguration(null);
igfsSecondary = (IgfsImpl)G.start(cfg).fileSystem("igfs-secondary");
}
/**
* Set IGFS modes for particular paths.
*
* @param modes Modes.
*/
@SafeVarargs
final void pathModes(IgniteBiTuple<String, IgfsMode>... modes) {
assert modes != null;
pathModes = new LinkedHashMap<>(modes.length, 1.0f);
for (IgniteBiTuple<String, IgfsMode> mode : modes)
pathModes.put(mode.getKey(), mode.getValue());
}
/**
* Ensure that DUAL_ASYNC mode is set by default.
*
* @throws Exception If failed.
*/
public void testModeDefaultIsNotSet() throws Exception {
setSecondaryFs = true;
startUp();
checkMode("/dir", DUAL_ASYNC);
}
/**
* Ensure that when mode is set, it is correctly resolved.
*
* @throws Exception If failed.
*/
public void testModeDefaultIsSet() throws Exception {
mode = DUAL_SYNC;
setSecondaryFs = true;
startUp();
checkMode("/dir", DUAL_SYNC);
}
/**
* Ensure that Grid doesn't start in case default mode is SECONDARY and secondary FS URI is not provided.
*
* @throws Exception If failed.
*/
public void testModeSecondaryNoUri() throws Exception {
mode = PROXY;
String errMsg = null;
try {
startUp();
}
catch (IgniteException e) {
errMsg = e.getCause().getMessage();
}
assertTrue(errMsg.startsWith(
"Grid configuration parameter invalid: secondaryFileSystem cannot be null when mode is not PRIMARY"));
}
/**
* Ensure that modes are resolved correctly when path modes are set.
*
* @throws Exception If failed.
*/
public void testPathMode() throws Exception {
pathModes(F.t("/dir1", PROXY), F.t("/dir2", DUAL_SYNC),
F.t("/dir3", PRIMARY), F.t("/dir4", PRIMARY));
mode = DUAL_ASYNC;
setSecondaryFs = true;
startUp();
checkMode("/dir", DUAL_ASYNC);
checkMode("/dir1", PROXY);
checkMode("/dir2", DUAL_SYNC);
checkMode("/dir3", PRIMARY);
checkMode("/somedir/dir3", DUAL_ASYNC);
checkMode("/dir4", PRIMARY);
checkMode("/dir4/subdir", PRIMARY);
checkMode("/somedir/dir4", DUAL_ASYNC);
checkMode("/somedir/dir4/subdir", DUAL_ASYNC);
}
/**
* Ensure that path modes switch to PRIMARY in case secondary FS config is not provided.
*
* @throws Exception If failed.
*/
public void testPathModeSwitchToPrimary() throws Exception {
mode = DUAL_SYNC;
pathModes(F.t("/dir1", PRIMARY), F.t("/dir2", DUAL_SYNC));
startUp();
checkMode("/dir", PRIMARY);
checkMode("/dir1", PRIMARY);
checkMode("/dir2", PRIMARY);
}
/**
* Ensure that Grid doesn't start in case path mode is SECONDARY and secondary FS config path is not provided.
*
* @throws Exception If failed.
*/
public void testPathModeSecondaryNoCfg() throws Exception {
pathModes(F.t("dir", PROXY));
String errMsg = null;
try {
startUp();
}
catch (IgniteException e) {
errMsg = e.getCause().getMessage();
}
assertTrue(errMsg.startsWith(
"Grid configuration parameter invalid: secondaryFileSystem cannot be null when mode is not PRIMARY"));
}
/**
* Ensure that data is not propagated to the secondary IGFS in PRIMARY mode.
*
* @throws Exception If failed.
*/
public void testPropagationPrimary() throws Exception {
mode = PRIMARY;
checkPropagation();
}
/**
* Ensure that data is propagated to the secondary IGFS in DUAL_SYNC mode.
*
* @throws Exception If failed.
*/
public void testPropagationDualSync() throws Exception {
mode = DUAL_SYNC;
checkPropagation();
}
/**
* Ensure that data is propagated to the secondary IGFS in DUAL_SYNC mode.
*
* @throws Exception If failed.
*/
public void testPropagationDualAsync() throws Exception {
mode = DUAL_ASYNC;
checkPropagation();
}
/**
* Resolve IGFS mode for the given path and compare it with expected one.
*
* @param pathStr Path ot resolve.
* @param expMode Expected mode.
* @throws Exception If failed.
*/
private void checkMode(String pathStr, IgfsMode expMode) throws Exception {
assert igfs != null;
IgfsPath path = new IgfsPath(pathStr);
IgfsModeResolver rslvr = igfs.modeResolver();
IgfsMode mode = rslvr.resolveMode(path);
assertEquals(expMode, mode);
}
/**
* Check propagation of various operations to secondary file system.
*
* @throws Exception If failed.
*/
private void checkPropagation() throws Exception {
byte[] testData1 = new byte[] {0, 1, 2, 3, 4, 5, 6, 7};
byte[] testData2 = new byte[] {8, 9, 10, 11};
byte[] testData = Arrays.copyOf(testData1, testData1.length + testData2.length);
U.arrayCopy(testData2, 0, testData, testData1.length, testData2.length);
setSecondaryFs = true;
startUp();
boolean primaryNotUsed = mode == PROXY;
boolean secondaryUsed = mode != PRIMARY;
IgfsPath dir = new IgfsPath("/dir");
IgfsPath file = new IgfsPath("/dir/file");
// Create new directory.
igfs.mkdirs(dir);
// Create new file.
IgfsOutputStream os = igfs.create(file, 1024, true, null, 0, 2048, null);
os.write(testData1);
os.close();
// Re-open it and append.
os = igfs.append(file, 1024, false, null);
os.write(testData2);
os.close();
// Check file content.
IgfsInputStream is = igfs.open(file);
assertEquals(testData.length, is.length());
byte[] data = new byte[testData.length];
is.read(data, 0, testData.length);
is.close();
assert Arrays.equals(testData, data);
if (secondaryUsed) {
assert igfsSecondary.exists(dir);
assert igfsSecondary.exists(file);
// In ASYNC mode we wait at most 2 seconds for background writer to finish.
for (int i = 0; i < 20; i++) {
IgfsInputStream isSecondary = null;
try {
isSecondary = igfsSecondary.open(file);
if (isSecondary.length() == testData.length)
break;
else
U.sleep(100);
}
finally {
U.closeQuiet(isSecondary);
}
}
IgfsInputStream isSecondary = igfsSecondary.open(file);
assertEquals(testData.length, isSecondary.length());
isSecondary.read(data, 0, testData.length);
assert Arrays.equals(testData, data);
}
else {
assert !igfsSecondary.exists(dir);
assert !igfsSecondary.exists(file);
}
int cacheSize = grid.cachex(grid.igfsx("igfs").configuration().getDataCacheConfiguration()
.getName()).size();
if (primaryNotUsed)
assert cacheSize == 0;
else
assert cacheSize != 0;
// Now delete all.
igfs.delete(dir, true);
assert !igfs.exists(dir);
assert !igfs.exists(file);
assert !igfsSecondary.exists(dir);
assert !igfsSecondary.exists(file);
}
}