// blob: 3ed9b5711949fb6e7725b520e045dd4edd46265b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.igfs;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteFileSystem;
import org.apache.ignite.cache.affinity.AffinityKeyMapper;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.FileSystemConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.igfs.IgfsGroupDataBlocksKeyMapper;
import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.igfs.mapreduce.IgfsJob;
import org.apache.ignite.igfs.mapreduce.IgfsRecordResolver;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteNodeAttributes;
import org.apache.ignite.internal.util.ipc.IpcServerEndpoint;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.lang.IgniteClosure;
import org.jetbrains.annotations.Nullable;
import java.util.concurrent.ConcurrentHashMap;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK;
import static org.apache.ignite.IgniteSystemProperties.getBoolean;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGFS;
/**
* Fully operational Ignite file system processor.
*/
public class IgfsProcessor extends IgfsProcessorAdapter {
/** Closure that maps an IGFS context to the file system returned by {@code IgfsContext.igfs()}. */
private static final IgniteClosure<IgfsContext, IgniteFileSystem> CTX_TO_IGFS =
    new C1<IgfsContext, IgniteFileSystem>() {
        /** {@inheritDoc} */
        @Override public IgniteFileSystem apply(IgfsContext fsCtx) {
            return fsCtx.igfs();
        }
    };
/** Contexts of started IGFS instances, keyed by IGFS name. */
private final ConcurrentMap<String, IgfsContext> igfsCache =
new ConcurrentHashMap<>();
/**
 * Creates the processor. IGFS instances themselves are created in {@link #start()}.
 *
 * @param ctx Kernal context.
 */
public IgfsProcessor(GridKernalContext ctx) {
super(ctx);
}
/**
 * {@inheritDoc}
 * <p>
 * Creates an {@link IgfsContext} for every configured file system, starts its managers and registers
 * the context under the IGFS name. Afterwards publishes the {@code ATTR_IGFS} node attribute describing
 * the locally configured IGFS instances, provided that at least one cache is configured.
 * Does nothing on daemon nodes.
 */
@Override public void start() throws IgniteCheckedException {
    IgniteConfiguration igniteCfg = ctx.config();

    if (igniteCfg.isDaemon())
        return;

    FileSystemConfiguration[] cfgs = igniteCfg.getFileSystemConfiguration();

    assert cfgs != null && cfgs.length > 0;

    // Both values are the same for every IGFS instance, so they are read once instead of per iteration.
    CacheConfiguration[] cacheCfgs = igniteCfg.getCacheConfiguration();

    boolean clientNode = igniteCfg.isClientMode() != null && igniteCfg.isClientMode();

    // Start IGFS instances.
    for (FileSystemConfiguration cfg : cfgs) {
        assert cfg.getName() != null;

        FileSystemConfiguration cfg0 = new FileSystemConfiguration(cfg);

        String metaCacheName = cfg.getMetaCacheConfiguration().getName();

        // The flag is set when the meta cache is not among the configured caches or the node is a client.
        boolean metaClient = !isCacheConfigured(cacheCfgs, metaCacheName) || clientNode;

        IgfsContext igfsCtx = new IgfsContext(
            ctx,
            cfg0,
            new IgfsMetaManager(cfg0.isRelaxedConsistency(), metaClient),
            new IgfsDataManager(),
            new IgfsServerManager(),
            new IgfsFragmentizerManager());

        // Start managers first.
        for (IgfsManager mgr : igfsCtx.managers())
            mgr.start(igfsCtx);

        igfsCache.put(cfg0.getName(), igfsCtx);
    }

    if (log.isDebugEnabled())
        log.debug("IGFS processor started.");

    // Node doesn't publish IGFS attributes if it has no configured IGFS or no configured caches.
    // The daemon case is not re-checked: daemon nodes have already returned at the top of the method.
    if (F.isEmpty(cfgs) || F.isEmpty(cacheCfgs))
        return;

    ctx.addNodeAttribute(ATTR_IGFS, collectIgfsAttributes(cfgs, cacheCfgs));
}

/**
 * Checks whether a cache with the given name is present among the given cache configurations.
 *
 * @param cacheCfgs Cache configurations, possibly {@code null}.
 * @param cacheName Cache name to look for.
 * @return {@code True} if a configuration with the given name exists.
 */
private static boolean isCacheConfigured(@Nullable CacheConfiguration[] cacheCfgs, String cacheName) {
    if (cacheCfgs == null)
        return false;

    for (CacheConfiguration cacheCfg : cacheCfgs) {
        if (F.eq(cacheCfg.getName(), cacheName))
            return true;
    }

    return false;
}

/**
 * Builds IGFS attributes for the given file system configurations. A configuration is skipped when
 * its data cache is not among the given cache configurations or when that cache does not use
 * an {@link IgfsGroupDataBlocksKeyMapper} as its affinity mapper.
 *
 * @param igfsCfgs IGFS configurations.
 * @param cacheCfgs Cache configurations.
 * @return IGFS attributes, possibly an empty array.
 */
private static IgfsAttributes[] collectIgfsAttributes(FileSystemConfiguration[] igfsCfgs,
    CacheConfiguration[] cacheCfgs) {
    Map<String, CacheConfiguration> cacheCfgByName = new HashMap<>();

    for (CacheConfiguration ccfg : cacheCfgs)
        cacheCfgByName.put(ccfg.getName(), ccfg);

    Collection<IgfsAttributes> attrVals = new ArrayList<>();

    for (FileSystemConfiguration igfsCfg : igfsCfgs) {
        String dataCacheName = igfsCfg.getDataCacheConfiguration().getName();

        CacheConfiguration cacheCfg = cacheCfgByName.get(dataCacheName);

        if (cacheCfg == null)
            continue; // No cache for the given IGFS configuration.

        AffinityKeyMapper affMapper = cacheCfg.getAffinityMapper();

        // Neither create IGFS attributes for such a configuration nor report it as invalid here.
        if (!(affMapper instanceof IgfsGroupDataBlocksKeyMapper))
            continue;

        attrVals.add(new IgfsAttributes(
            igfsCfg.getName(),
            igfsCfg.getBlockSize(),
            ((IgfsGroupDataBlocksKeyMapper)affMapper).getGroupSize(),
            igfsCfg.getMetaCacheConfiguration().getName(),
            dataCacheName,
            igfsCfg.getDefaultMode(),
            igfsCfg.getPathModes(),
            igfsCfg.isFragmentizerEnabled()));
    }

    return attrVals.toArray(new IgfsAttributes[attrVals.size()]);
}
/** {@inheritDoc} */
@Override public void onKernalStart(boolean active) throws IgniteCheckedException {
    // Nothing to do on inactive or daemon nodes.
    if (!active)
        return;

    if (ctx.config().isDaemon())
        return;

    boolean skipCheck = getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK);

    // Validate IGFS configuration against every remote node unless the check is switched off.
    if (!skipCheck) {
        for (ClusterNode rmtNode : ctx.discovery().remoteNodes())
            checkIgfsOnRemoteNode(rmtNode);
    }

    // Pass the callback on to all managers of all started IGFS instances.
    for (IgfsContext igfsCtx : igfsCache.values()) {
        for (IgfsManager igfsMgr : igfsCtx.managers())
            igfsMgr.onKernalStart();
    }
}
/** {@inheritDoc} */
@Override public void onActivate(GridKernalContext kctx) throws IgniteCheckedException {
// Activation replays the kernal-start callback with the 'active' flag set.
onKernalStart(true);
}
/** {@inheritDoc} */
@Override public void onDeActivate(GridKernalContext kctx) {
// Deactivation replays the kernal-stop callback with the 'cancel' flag set.
onKernalStop(true);
}
/** {@inheritDoc} */
@Override public void stop(boolean cancel) {
    // Stop IGFS instances.
    for (IgfsContext fsCtx : igfsCache.values()) {
        if (log.isDebugEnabled())
            log.debug("Stopping igfs: " + fsCtx.configuration().getName());

        List<IgfsManager> fsMgrs = fsCtx.managers();

        // Walk the manager list from its last element to its first one.
        for (int idx = fsMgrs.size() - 1; idx >= 0; idx--)
            fsMgrs.get(idx).stop(cancel);

        fsCtx.igfs().stop(cancel);
    }

    igfsCache.clear();

    if (log.isDebugEnabled())
        log.debug("IGFS processor stopped.");
}
/** {@inheritDoc} */
@Override public void onKernalStop(boolean cancel) {
    for (IgfsContext fsCtx : igfsCache.values()) {
        if (log.isDebugEnabled())
            log.debug("Stopping igfs: " + fsCtx.configuration().getName());

        List<IgfsManager> fsMgrs = fsCtx.managers();

        // Notify managers starting from the end of the list.
        ListIterator<IgfsManager> revIt = fsMgrs.listIterator(fsMgrs.size());

        while (revIt.hasPrevious())
            revIt.previous().onKernalStop(cancel);
    }

    if (log.isDebugEnabled())
        log.debug("Finished executing IGFS processor onKernalStop() callback.");
}
/** {@inheritDoc} */
@Override public void printMemoryStats() {
    X.println(">>>");

    String instName = ctx.igniteInstanceName();

    X.println(">>> IGFS processor memory stats [igniteInstanceName=" + instName + ']');

    // Number of IGFS contexts currently registered in this processor.
    int igfsCnt = igfsCache.size();

    X.println(">>> igfsCacheSize: " + igfsCnt);
}
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public Collection<IgniteFileSystem> igfss() {
    Collection<IgfsContext> igfsCtxs = igfsCache.values();

    // Read-only view that converts each context through CTX_TO_IGFS.
    return F.viewReadOnly(igfsCtxs, CTX_TO_IGFS);
}
/** {@inheritDoc} */
@Override @Nullable public IgniteFileSystem igfs(String name) {
    if (name == null)
        throw new IllegalArgumentException("IGFS name cannot be null");

    IgfsContext fsCtx = igfsCache.get(name);

    // No IGFS registered under the given name.
    if (fsCtx == null)
        return null;

    return fsCtx.igfs();
}
/** {@inheritDoc} */
@Override @Nullable public Collection<IpcServerEndpoint> endpoints(String name) {
    if (name == null)
        throw new IllegalArgumentException("IGFS name cannot be null");

    IgfsContext fsCtx = igfsCache.get(name);

    // No IGFS registered under the given name: report no endpoints.
    if (fsCtx == null)
        return Collections.<IpcServerEndpoint>emptyList();

    return fsCtx.server().endpoints();
}
/** {@inheritDoc} */
@Nullable @Override public ComputeJob createJob(IgfsJob job, @Nullable String igfsName, IgfsPath path,
    long start, long len, IgfsRecordResolver recRslv) {
    // Wrap the user job together with the IGFS name, path, range and record resolver.
    ComputeJob wrapped = new IgfsJobImpl(job, igfsName, path, start, len, recRslv);

    return wrapped;
}
/**
 * Validates IGFS attributes of the local node against IGFS attributes of a remote node.
 * <p>
 * IGFS instances with different names must not share a meta cache name or a data cache name.
 * IGFS instances with the same name must have equal values for every attribute compared
 * through {@code checkSame()}.
 *
 * @param rmtNode Remote node.
 * @throws IgniteCheckedException If check failed.
 */
private void checkIgfsOnRemoteNode(ClusterNode rmtNode) throws IgniteCheckedException {
    IgfsAttributes[] locAttrs = ctx.discovery().localNode().attribute(ATTR_IGFS);
    IgfsAttributes[] rmtAttrs = rmtNode.attribute(ATTR_IGFS);

    // Nothing to compare if either side has no IGFS attributes.
    if (F.isEmpty(locAttrs) || F.isEmpty(rmtAttrs))
        return;

    assert rmtAttrs != null && locAttrs != null;

    UUID rmtNodeId = rmtNode.id();

    for (IgfsAttributes rmtAttr : rmtAttrs) {
        String rmtIgfsName = rmtAttr.igfsName();

        for (IgfsAttributes locAttr : locAttrs) {
            if (F.eq(rmtIgfsName, locAttr.igfsName())) {
                // Same IGFS name on both nodes: every attribute has to match.
                checkSame("Data block size", "BlockSize", rmtNodeId, rmtAttr.blockSize(),
                    locAttr.blockSize(), rmtIgfsName);

                checkSame("Affinity mapper group size", "GrpSize", rmtNodeId, rmtAttr.groupSize(),
                    locAttr.groupSize(), rmtIgfsName);

                checkSame("Meta cache name", "MetaCacheName", rmtNodeId, rmtAttr.metaCacheName(),
                    locAttr.metaCacheName(), rmtIgfsName);

                checkSame("Data cache name", "DataCacheName", rmtNodeId, rmtAttr.dataCacheName(),
                    locAttr.dataCacheName(), rmtIgfsName);

                checkSame("Default mode", "DefaultMode", rmtNodeId, rmtAttr.defaultMode(),
                    locAttr.defaultMode(), rmtIgfsName);

                checkSame("Path modes", "PathModes", rmtNodeId, rmtAttr.pathModes(),
                    locAttr.pathModes(), rmtIgfsName);

                checkSame("Fragmentizer enabled", "FragmentizerEnabled", rmtNodeId,
                    rmtAttr.fragmentizerEnabled(), locAttr.fragmentizerEnabled(), rmtIgfsName);

                continue;
            }

            // Different IGFS names: the instances must not use the same caches.
            if (F.eq(rmtAttr.metaCacheName(), locAttr.metaCacheName())) {
                throw new IgniteCheckedException(
                    "Meta cache names should be different for different IGFS instances configuration " +
                    "(fix configuration or set -D" + IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK +
                    "=true system property) [metaCacheName=" + rmtAttr.metaCacheName() +
                    ", locNodeId=" + ctx.localNodeId() +
                    ", rmtNodeId=" + rmtNodeId +
                    ", locIgfsName=" + locAttr.igfsName() +
                    ", rmtIgfsName=" + rmtIgfsName + ']');
            }

            if (F.eq(rmtAttr.dataCacheName(), locAttr.dataCacheName())) {
                throw new IgniteCheckedException(
                    "Data cache names should be different for different IGFS instances configuration " +
                    "(fix configuration or set -D" + IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK +
                    "=true system property)[dataCacheName=" + rmtAttr.dataCacheName() +
                    ", locNodeId=" + ctx.localNodeId() +
                    ", rmtNodeId=" + rmtNodeId +
                    ", locIgfsName=" + locAttr.igfsName() +
                    ", rmtIgfsName=" + rmtIgfsName + ']');
            }
        }
    }
}
/**
 * Checks that an IGFS property has equal values on the local and the remote node.
 *
 * @param name Human-readable property name, used as the beginning of the error message.
 * @param propName Property name, appended to the {@code rmt} and {@code loc} prefixes in the error message.
 * @param rmtNodeId Remote node ID.
 * @param rmtVal Remote value.
 * @param locVal Local value.
 * @param igfsName IGFS name.
 *
 * @throws IgniteCheckedException If the values are not equal.
 */
private void checkSame(String name, String propName, UUID rmtNodeId, Object rmtVal, Object locVal, String igfsName)
    throws IgniteCheckedException {
    if (F.eq(rmtVal, locVal))
        return;

    // The IGFS name is reported under the "igfsName" key, in line with the other consistency check messages.
    throw new IgniteCheckedException(name + " should be the same on all nodes in grid for IGFS configuration " +
        "(fix configuration or set " +
        "-D" + IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK + "=true system " +
        "property) [rmtNodeId=" + rmtNodeId +
        ", rmt" + propName + "=" + rmtVal +
        ", loc" + propName + "=" + locVal +
        ", igfsName=" + igfsName + ']');
}
}