blob: e69e3f04129fd43ee776a851a6fd5842a41fe10c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.hadoop.impl;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import org.apache.ignite.cluster.ClusterMetrics;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.hadoop.HadoopInputSplit;
import org.apache.ignite.hadoop.HadoopMapReducePlan;
import org.apache.ignite.hadoop.mapreduce.IgniteHadoopWeightedMapReducePlanner;
import org.apache.ignite.hadoop.planner.HadoopAbstractMapReducePlanner;
import org.apache.ignite.igfs.IgfsBlockLocation;
import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.internal.IgniteNodeAttributes;
import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
import org.apache.ignite.internal.processors.igfs.IgfsIgniteMock;
import org.apache.ignite.internal.processors.igfs.IgfsMock;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
/**
* Tests for weighted map-reduce planned.
*/
public class HadoopWeightedMapReducePlannerTest extends GridCommonAbstractTest {
/** ID 1. */
private static final UUID ID_1 = new UUID(0, 1);
/** ID 2. */
private static final UUID ID_2 = new UUID(0, 2);
/** ID 3. */
private static final UUID ID_3 = new UUID(0, 3);
/** MAC 1. */
private static final String MAC_1 = "mac1";
/** MAC 2. */
private static final String MAC_2 = "mac2";
/** MAC 3. */
private static final String MAC_3 = "mac3";
/** Host 1. */
private static final String HOST_1 = "host1";
/** Host 2. */
private static final String HOST_2 = "host2";
/** Host 3. */
private static final String HOST_3 = "host3";
/** Host 4. */
private static final String HOST_4 = "host4";
/** Host 5. */
private static final String HOST_5 = "host5";
/** Standard node 1. */
private static final MockNode NODE_1 = new MockNode(ID_1, MAC_1, HOST_1);
/** Standard node 2. */
private static final MockNode NODE_2 = new MockNode(ID_2, MAC_2, HOST_2);
/** Standard node 3. */
private static final MockNode NODE_3 = new MockNode(ID_3, MAC_3, HOST_3);
/** Standard nodes. */
private static final Collection<ClusterNode> NODES;
/**
* Static initializer.
*/
static {
NODES = new ArrayList<>();
NODES.add(NODE_1);
NODES.add(NODE_2);
NODES.add(NODE_3);
}
/**
* Test one IGFS split being assigned to affinity node.
*
* @throws Exception If failed.
*/
@Test
public void testOneIgfsSplitAffinity() throws Exception {
IgfsMock igfs = LocationsBuilder.create().add(0, NODE_1).add(50, NODE_2).add(100, NODE_3).buildIgfs();
List<HadoopInputSplit> splits = new ArrayList<>();
splits.add(new HadoopFileBlock(new String[] { HOST_1 }, URI.create("igfs://igfs@/file"), 0, 50));
final int expReducers = 4;
HadoopPlannerMockJob job = new HadoopPlannerMockJob(splits, expReducers);
IgniteHadoopWeightedMapReducePlanner planner = createPlanner(igfs);
HadoopMapReducePlan plan = planner.preparePlan(job, NODES, null);
assert plan.mappers() == 1;
assert plan.mapperNodeIds().size() == 1;
assert plan.mapperNodeIds().contains(ID_1);
checkPlanMappers(plan, splits, NODES, false/*only 1 split*/);
checkPlanReducers(plan, NODES, expReducers, false/* because of threshold behavior.*/);
}
/**
* Test one HDFS splits.
*
* @throws Exception If failed.
*/
@Test
public void testHdfsSplitsAffinity() throws Exception {
IgfsMock igfs = LocationsBuilder.create().add(0, NODE_1).add(50, NODE_2).add(100, NODE_3).buildIgfs();
final List<HadoopInputSplit> splits = new ArrayList<>();
splits.add(new HadoopFileBlock(new String[] { HOST_1 }, URI.create("hfds://" + HOST_1 + "/x"), 0, 50));
splits.add(new HadoopFileBlock(new String[] { HOST_2 }, URI.create("hfds://" + HOST_2 + "/x"), 50, 100));
splits.add(new HadoopFileBlock(new String[] { HOST_3 }, URI.create("hfds://" + HOST_3 + "/x"), 100, 37));
// The following splits belong to hosts that are out of Ignite topology at all.
// This means that these splits should be assigned to any least loaded modes:
splits.add(new HadoopFileBlock(new String[] { HOST_4 }, URI.create("hfds://" + HOST_4 + "/x"), 138, 2));
splits.add(new HadoopFileBlock(new String[] { HOST_5 }, URI.create("hfds://" + HOST_5 + "/x"), 140, 3));
final int expReducers = 7;
HadoopPlannerMockJob job = new HadoopPlannerMockJob(splits, expReducers);
IgniteHadoopWeightedMapReducePlanner planner = createPlanner(igfs);
final HadoopMapReducePlan plan = planner.preparePlan(job, NODES, null);
checkPlanMappers(plan, splits, NODES, true);
checkPlanReducers(plan, NODES, expReducers, true);
}
/**
* Test HDFS splits with Replication == 3.
*
* @throws Exception If failed.
*/
@Test
public void testHdfsSplitsReplication() throws Exception {
IgfsMock igfs = LocationsBuilder.create().add(0, NODE_1).add(50, NODE_2).add(100, NODE_3).buildIgfs();
final List<HadoopInputSplit> splits = new ArrayList<>();
splits.add(new HadoopFileBlock(new String[] { HOST_1, HOST_2, HOST_3 }, URI.create("hfds://" + HOST_1 + "/x"), 0, 50));
splits.add(new HadoopFileBlock(new String[] { HOST_2, HOST_3, HOST_4 }, URI.create("hfds://" + HOST_2 + "/x"), 50, 100));
splits.add(new HadoopFileBlock(new String[] { HOST_3, HOST_4, HOST_5 }, URI.create("hfds://" + HOST_3 + "/x"), 100, 37));
// The following splits belong to hosts that are out of Ignite topology at all.
// This means that these splits should be assigned to any least loaded modes:
splits.add(new HadoopFileBlock(new String[] { HOST_4, HOST_5, HOST_1 }, URI.create("hfds://" + HOST_4 + "/x"), 138, 2));
splits.add(new HadoopFileBlock(new String[] { HOST_5, HOST_1, HOST_2 }, URI.create("hfds://" + HOST_5 + "/x"), 140, 3));
final int expReducers = 8;
HadoopPlannerMockJob job = new HadoopPlannerMockJob(splits, expReducers);
IgniteHadoopWeightedMapReducePlanner planner = createPlanner(igfs);
final HadoopMapReducePlan plan = planner.preparePlan(job, NODES, null);
checkPlanMappers(plan, splits, NODES, true);
checkPlanReducers(plan, NODES, expReducers, true);
}
/**
* Get all IDs.
*
* @param nodes Nodes.
* @return IDs.
*/
private static Set<UUID> allIds(Collection<ClusterNode> nodes) {
Set<UUID> allIds = new HashSet<>();
for (ClusterNode n : nodes)
allIds.add(n.id());
return allIds;
}
/**
* Check mappers for the plan.
*
* @param plan Plan.
* @param splits Splits.
* @param nodes Nodes.
* @param expectUniformity WHether uniformity is expected.
*/
private static void checkPlanMappers(HadoopMapReducePlan plan, List<HadoopInputSplit> splits,
Collection<ClusterNode> nodes, boolean expectUniformity) {
// Number of mappers should correspomd to the number of input splits:
assertEquals(splits.size(), plan.mappers());
if (expectUniformity) {
// mappers are assigned to all available nodes:
assertEquals(nodes.size(), plan.mapperNodeIds().size());
assertEquals(allIds(nodes), plan.mapperNodeIds());
}
// Check all splits are covered by mappers:
Set<HadoopInputSplit> set = new HashSet<>();
for (UUID id: plan.mapperNodeIds()) {
Collection<HadoopInputSplit> sp = plan.mappers(id);
assert sp != null;
for (HadoopInputSplit s: sp)
assertTrue(set.add(s));
}
// must be of the same size & contain same elements:
assertEquals(set, new HashSet<>(splits));
}
/**
* Check plan reducers.
*
* @param plan Plan.
* @param nodes Nodes.
* @param expReducers Expected reducers.
* @param expectUniformity Expected uniformity.
*/
private static void checkPlanReducers(HadoopMapReducePlan plan,
Collection<ClusterNode> nodes, int expReducers, boolean expectUniformity) {
assertEquals(expReducers, plan.reducers());
if (expectUniformity)
assertEquals(allIds(nodes), plan.reducerNodeIds());
int sum = 0;
int lenSum = 0;
for (UUID uuid: plan.reducerNodeIds()) {
int[] rr = plan.reducers(uuid);
assert rr != null;
lenSum += rr.length;
for (int i: rr)
sum += i;
}
assertEquals(expReducers, lenSum);
// Numbers in the arrays must be consequtive integers stating from 0,
// check that simply calculating their total sum:
assertEquals((lenSum * (lenSum - 1) / 2), sum);
}
/**
* Create planner for IGFS.
*
* @param igfs IGFS.
* @return Planner.
*/
private static IgniteHadoopWeightedMapReducePlanner createPlanner(IgfsMock igfs) {
IgniteHadoopWeightedMapReducePlanner planner = new IgniteHadoopWeightedMapReducePlanner();
IgfsIgniteMock ignite = new IgfsIgniteMock(null, igfs);
GridTestUtils.setFieldValue(planner, HadoopAbstractMapReducePlanner.class, "ignite", ignite);
return planner;
}
/**
* Throw {@link UnsupportedOperationException}.
*/
private static void throwUnsupported() {
throw new UnsupportedOperationException("Should not be called!");
}
/**
* Mocked node.
*/
private static class MockNode implements ClusterNode {
/** ID. */
private final UUID id;
/** MAC addresses. */
private final String macs;
/** Addresses. */
private final List<String> addrs;
/**
* Constructor.
*
* @param id Node ID.
* @param macs MAC addresses.
* @param addrs Addresses.
*/
public MockNode(UUID id, String macs, String... addrs) {
assert addrs != null;
this.id = id;
this.macs = macs;
this.addrs = Arrays.asList(addrs);
}
/** {@inheritDoc} */
@Override public UUID id() {
return id;
}
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Nullable @Override public <T> T attribute(String name) {
if (F.eq(name, IgniteNodeAttributes.ATTR_MACS))
return (T)macs;
throwUnsupported();
return null;
}
/** {@inheritDoc} */
@Override public Collection<String> addresses() {
return addrs;
}
/** {@inheritDoc} */
@Override public Object consistentId() {
throwUnsupported();
return null;
}
/** {@inheritDoc} */
@Override public ClusterMetrics metrics() {
throwUnsupported();
return null;
}
/** {@inheritDoc} */
@Override public Map<String, Object> attributes() {
throwUnsupported();
return null;
}
/** {@inheritDoc} */
@Override public Collection<String> hostNames() {
throwUnsupported();
return null;
}
/** {@inheritDoc} */
@Override public long order() {
throwUnsupported();
return 0;
}
/** {@inheritDoc} */
@Override public IgniteProductVersion version() {
throwUnsupported();
return null;
}
/** {@inheritDoc} */
@Override public boolean isLocal() {
throwUnsupported();
return false;
}
/** {@inheritDoc} */
@Override public boolean isDaemon() {
throwUnsupported();
return false;
}
/** {@inheritDoc} */
@Override public boolean isClient() {
throwUnsupported();
return false;
}
}
/**
* Locations builder.
*/
private static class LocationsBuilder {
/** Locations. */
private final TreeMap<Long, Collection<MockNode>> locs = new TreeMap<>();
/**
* Create new locations builder.
*
* @return Locations builder.
*/
public static LocationsBuilder create() {
return new LocationsBuilder();
}
/**
* Add locations.
*
* @param start Start.
* @param nodes Nodes.
* @return This builder for chaining.
*/
public LocationsBuilder add(long start, MockNode... nodes) {
locs.put(start, Arrays.asList(nodes));
return this;
}
/**
* Build locations.
*
* @return Locations.
*/
public TreeMap<Long, Collection<MockNode>> build() {
return locs;
}
/**
* Build IGFS.
*
* @return IGFS.
*/
public MockIgfs buildIgfs() {
return new MockIgfs(build());
}
}
/**
* Mocked IGFS.
*/
private static class MockIgfs extends IgfsMock {
/** Block locations. */
private final TreeMap<Long, Collection<MockNode>> locs;
/**
* Constructor.
*
* @param locs Block locations.
*/
public MockIgfs(TreeMap<Long, Collection<MockNode>> locs) {
super("igfs");
this.locs = locs;
}
/** {@inheritDoc} */
@Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len) {
Collection<IgfsBlockLocation> res = new ArrayList<>();
long cur = start;
long remaining = len;
long prevLocStart = -1;
Collection<MockNode> prevLocNodes = null;
for (Map.Entry<Long, Collection<MockNode>> locEntry : locs.entrySet()) {
long locStart = locEntry.getKey();
Collection<MockNode> locNodes = locEntry.getValue();
if (prevLocNodes != null) {
if (cur < locStart) {
// Add part from previous block.
long prevLen = locStart - prevLocStart;
res.add(new IgfsBlockLocationMock(cur, prevLen, prevLocNodes));
cur = locStart;
remaining -= prevLen;
}
}
prevLocStart = locStart;
prevLocNodes = locNodes;
if (remaining == 0)
break;
}
// Add remainder.
if (remaining != 0)
res.add(new IgfsBlockLocationMock(cur, remaining, prevLocNodes));
return res;
}
/** {@inheritDoc} */
@Override public boolean exists(IgfsPath path) {
return true;
}
/** {@inheritDoc} */
@Override public boolean isProxy(URI path) {
return false;
}
}
/**
* Mocked block location.
*/
private static class IgfsBlockLocationMock implements IgfsBlockLocation {
/** Start. */
private final long start;
/** Length. */
private final long len;
/** Node IDs. */
private final List<UUID> nodeIds;
/**
* Constructor.
*
* @param start Start.
* @param len Length.
* @param nodes Nodes.
*/
public IgfsBlockLocationMock(long start, long len, Collection<MockNode> nodes) {
this.start = start;
this.len = len;
this.nodeIds = new ArrayList<>(nodes.size());
for (MockNode node : nodes)
nodeIds.add(node.id);
}
/** {@inheritDoc} */
@Override public long start() {
return start;
}
/** {@inheritDoc} */
@Override public long length() {
return len;
}
/** {@inheritDoc} */
@Override public Collection<UUID> nodeIds() {
return nodeIds;
}
/** {@inheritDoc} */
@Override public Collection<String> names() {
throwUnsupported();
return null;
}
/** {@inheritDoc} */
@Override public Collection<String> hosts() {
throwUnsupported();
return null;
}
}
}