blob: 663d3154dddb0cc4f6f06cf3e329ebcfa613c67d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.spi.balancer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.TabletId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.dataImpl.TabletIdImpl;
import org.apache.accumulo.core.manager.balancer.BalanceParamsImpl;
import org.apache.accumulo.core.manager.balancer.TServerStatusImpl;
import org.apache.accumulo.core.manager.balancer.TabletServerIdImpl;
import org.apache.accumulo.core.manager.balancer.TabletStatisticsImpl;
import org.apache.accumulo.core.master.thrift.TableInfo;
import org.apache.accumulo.core.spi.balancer.data.TServerStatus;
import org.apache.accumulo.core.spi.balancer.data.TabletMigration;
import org.apache.accumulo.core.spi.balancer.data.TabletServerId;
import org.apache.accumulo.core.spi.balancer.data.TabletStatistics;
import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
import org.apache.hadoop.io.Text;
import org.junit.Before;
import org.junit.Test;
public class SimpleLoadBalancerTest {
static class FakeTServer {
List<TabletId> tablets = new ArrayList<>();
TServerStatus getStatus() {
org.apache.accumulo.core.master.thrift.TabletServerStatus result =
new org.apache.accumulo.core.master.thrift.TabletServerStatus();
result.tableMap = new HashMap<>();
for (TabletId tabletId : tablets) {
TableInfo info = result.tableMap.get(tabletId.getTable().canonical());
if (info == null)
result.tableMap.put(tabletId.getTable().canonical(), info = new TableInfo());
info.onlineTablets++;
info.recs = info.onlineTablets;
info.ingestRate = 123.;
info.queryRate = 456.;
}
return new TServerStatusImpl(result);
}
}
Map<TabletServerId,FakeTServer> servers = new HashMap<>();
Map<TabletId,TabletServerId> last = new HashMap<>();
class TestSimpleLoadBalancer extends SimpleLoadBalancer {
@Override
protected List<TabletStatistics> getOnlineTabletsForTable(TabletServerId tserver,
TableId tableId) {
List<TabletStatistics> result = new ArrayList<>();
for (TabletId tabletId : servers.get(tserver).tablets) {
if (tabletId.getTable().equals(tableId)) {
KeyExtent extent = new KeyExtent(tableId, tabletId.getEndRow(), tabletId.getPrevEndRow());
TabletStats stats =
new TabletStats(new TabletStats(extent.toThrift(), null, null, null, 0L, 0., 0., 0));
result.add(new TabletStatisticsImpl(stats));
}
}
return result;
}
}
@Before
public void setUp() {
last.clear();
servers.clear();
}
@Test
public void testAssignMigrations() {
servers.put(new TabletServerIdImpl("127.0.0.1", 1234, "a"), new FakeTServer());
servers.put(new TabletServerIdImpl("127.0.0.2", 1234, "b"), new FakeTServer());
servers.put(new TabletServerIdImpl("127.0.0.3", 1234, "c"), new FakeTServer());
List<TabletId> metadataTable = new ArrayList<>();
String table = "t1";
metadataTable.add(makeTablet(table, null, null));
table = "t2";
metadataTable.add(makeTablet(table, "a", null));
metadataTable.add(makeTablet(table, null, "a"));
table = "t3";
metadataTable.add(makeTablet(table, "a", null));
metadataTable.add(makeTablet(table, "b", "a"));
metadataTable.add(makeTablet(table, "c", "b"));
metadataTable.add(makeTablet(table, "d", "c"));
metadataTable.add(makeTablet(table, "e", "d"));
metadataTable.add(makeTablet(table, null, "e"));
Collections.sort(metadataTable);
TestSimpleLoadBalancer balancer = new TestSimpleLoadBalancer();
SortedMap<TabletServerId,TServerStatus> current = new TreeMap<>();
for (Entry<TabletServerId,FakeTServer> entry : servers.entrySet()) {
current.put(entry.getKey(), entry.getValue().getStatus());
}
assignTablets(metadataTable, servers, current, balancer);
// Verify that the counts on the tables are correct
Map<String,Integer> expectedCounts = new HashMap<>();
expectedCounts.put("t1", 1);
expectedCounts.put("t2", 1);
expectedCounts.put("t3", 2);
checkBalance(metadataTable, servers, expectedCounts);
// Rebalance once
for (Entry<TabletServerId,FakeTServer> entry : servers.entrySet()) {
current.put(entry.getKey(), entry.getValue().getStatus());
}
// Nothing should happen, we are balanced
ArrayList<TabletMigration> out = new ArrayList<>();
balancer.getMigrations(current, out);
assertEquals(out.size(), 0);
// Take down a tabletServer
TabletServerId first = current.keySet().iterator().next();
current.remove(first);
FakeTServer remove = servers.remove(first);
// reassign offline extents
assignTablets(remove.tablets, servers, current, balancer);
checkBalance(metadataTable, servers, null);
}
private void assignTablets(List<TabletId> metadataTable, Map<TabletServerId,FakeTServer> servers,
SortedMap<TabletServerId,TServerStatus> status, TestSimpleLoadBalancer balancer) {
// Assign tablets
for (TabletId tabletId : metadataTable) {
TabletServerId assignment = balancer.getAssignment(status, last.get(tabletId));
assertNotNull(assignment);
assertFalse(servers.get(assignment).tablets.contains(tabletId));
servers.get(assignment).tablets.add(tabletId);
last.put(tabletId, assignment);
}
}
SortedMap<TabletServerId,TServerStatus> getAssignments(Map<TabletServerId,FakeTServer> servers) {
SortedMap<TabletServerId,TServerStatus> result = new TreeMap<>();
for (Entry<TabletServerId,FakeTServer> entry : servers.entrySet()) {
result.put(entry.getKey(), entry.getValue().getStatus());
}
return result;
}
@Test
public void testUnevenAssignment() {
for (char c : "abcdefghijklmnopqrstuvwxyz".toCharArray()) {
String cString = Character.toString(c);
TabletServerId tsid = new TabletServerIdImpl("127.0.0.1", c, cString);
FakeTServer fakeTServer = new FakeTServer();
servers.put(tsid, fakeTServer);
fakeTServer.tablets.add(makeTablet(cString, null, null));
}
// Put more tablets on one server, but not more than the number of servers
Entry<TabletServerId,FakeTServer> first = servers.entrySet().iterator().next();
first.getValue().tablets.add(makeTablet("newTable", "a", null));
first.getValue().tablets.add(makeTablet("newTable", "b", "a"));
first.getValue().tablets.add(makeTablet("newTable", "c", "b"));
first.getValue().tablets.add(makeTablet("newTable", "d", "c"));
first.getValue().tablets.add(makeTablet("newTable", "e", "d"));
first.getValue().tablets.add(makeTablet("newTable", "f", "e"));
first.getValue().tablets.add(makeTablet("newTable", "g", "f"));
first.getValue().tablets.add(makeTablet("newTable", "h", "g"));
first.getValue().tablets.add(makeTablet("newTable", "i", null));
TestSimpleLoadBalancer balancer = new TestSimpleLoadBalancer();
Set<TabletId> migrations = Collections.emptySet();
int moved = 0;
// balance until we can't balance no more!
while (true) {
List<TabletMigration> migrationsOut = new ArrayList<>();
balancer.balance(new BalanceParamsImpl(getAssignments(servers), migrations, migrationsOut));
if (migrationsOut.isEmpty())
break;
for (TabletMigration migration : migrationsOut) {
if (servers.get(migration.getOldTabletServer()).tablets.remove(migration.getTablet()))
moved++;
servers.get(migration.getNewTabletServer()).tablets.add(migration.getTablet());
}
}
assertEquals(8, moved);
}
@Test
public void testUnevenAssignment2() {
// make 26 servers
for (char c : "abcdefghijklmnopqrstuvwxyz".toCharArray()) {
TabletServerId tsid = new TabletServerIdImpl("127.0.0.1", c, Character.toString(c));
FakeTServer fakeTServer = new FakeTServer();
servers.put(tsid, fakeTServer);
}
// put 60 tablets on 25 of them
List<Entry<TabletServerId,FakeTServer>> shortList = new ArrayList<>(servers.entrySet());
Entry<TabletServerId,FakeTServer> shortServer = shortList.remove(0);
int c = 0;
for (int i = 0; i < 60; i++) {
for (Entry<TabletServerId,FakeTServer> entry : shortList) {
entry.getValue().tablets.add(makeTablet("t" + c, null, null));
}
}
// put 10 on the that short server:
for (int i = 0; i < 10; i++) {
shortServer.getValue().tablets.add(makeTablet("s" + i, null, null));
}
TestSimpleLoadBalancer balancer = new TestSimpleLoadBalancer();
Set<TabletId> migrations = Collections.emptySet();
int moved = 0;
// balance until we can't balance no more!
while (true) {
List<TabletMigration> migrationsOut = new ArrayList<>();
balancer.balance(new BalanceParamsImpl(getAssignments(servers), migrations, migrationsOut));
if (migrationsOut.isEmpty())
break;
for (TabletMigration migration : migrationsOut) {
if (servers.get(migration.getOldTabletServer()).tablets.remove(migration.getTablet()))
moved++;
last.remove(migration.getTablet());
servers.get(migration.getNewTabletServer()).tablets.add(migration.getTablet());
last.put(migration.getTablet(), migration.getNewTabletServer());
}
}
// average is 58, with 2 at 59: we need 48 more moved to the short server
assertEquals(48, moved);
}
private void checkBalance(List<TabletId> metadataTable, Map<TabletServerId,FakeTServer> servers,
Map<String,Integer> expectedCounts) {
// Verify they are spread evenly over the cluster
int average = metadataTable.size() / servers.size();
for (FakeTServer server : servers.values()) {
int diff = server.tablets.size() - average;
if (diff < 0)
fail("average number of tablets is " + average + " but a server has "
+ server.tablets.size());
if (diff > 1)
fail("average number of tablets is " + average + " but a server has "
+ server.tablets.size());
}
if (expectedCounts != null) {
for (FakeTServer server : servers.values()) {
Map<String,Integer> counts = new HashMap<>();
server.tablets.forEach(tabletId -> {
String t = tabletId.getTable().canonical();
counts.putIfAbsent(t, 0);
counts.put(t, counts.get(t) + 1);
});
counts.forEach((k, v) -> assertEquals(expectedCounts.get(k), v));
}
}
}
private static TabletId makeTablet(String table, String end, String prev) {
return new TabletIdImpl(new KeyExtent(TableId.of(table), toText(end), toText(prev)));
}
private static Text toText(String value) {
if (value != null)
return new Text(value);
return null;
}
}