// blob: c0dca502848a5b3b89ebf6d09acfd1a0bea92302 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.accumulo.server.master.state;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.data.impl.KeyExtent;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SkippingIterator;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.master.thrift.MasterState;
import org.apache.accumulo.core.util.AddressUtil;
import org.apache.accumulo.server.master.state.TabletLocationState.BadLocationStateException;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Joiner;
/**
 * A {@link SkippingIterator} that decodes each source entry into a {@code TabletLocationState} and skips the entries whose tablet is already in the state it
 * is expected to be in, so only tablets that may need attention are handed back to the reader.
 * <p>
 * All configuration is passed as iterator options. The static {@code set*} methods of this class write those options onto an {@link IteratorSetting}, and
 * {@code init} reads them back.
 */
public class TabletStateChangeIterator extends SkippingIterator {
// Keys of the iterator options understood by this iterator.
private static final String SERVERS_OPTION = "servers";
private static final String TABLES_OPTION = "tables";
private static final String MERGES_OPTION = "merges";
private static final String DEBUG_OPTION = "debug";
private static final String MIGRATIONS_OPTION = "migrations";
private static final String MASTER_STATE_OPTION = "masterState";
private static final String SHUTTING_DOWN_OPTION = "shuttingDown";
private static final Logger log = LoggerFactory.getLogger(TabletStateChangeIterator.class);
// Servers parsed from the "servers" option, minus any listed under "shuttingDown"; null when the "servers" option is absent.
private Set<TServerInstance> current;
// Table ids parsed from the "tables" option; null when that option is absent.
private Set<String> onlineTables;
// Merge information parsed from the "merges" option, keyed by the table id of each merge's extent.
private Map<String,MergeInfo> merges;
// True when the "debug" option key is present (its value is not inspected).
private boolean debug = false;
// Extents parsed from the "migrations" option.
private Set<KeyExtent> migrations;
// Parsed from the "masterState" option; consume() only skips entries while this is NORMAL.
private MasterState masterState = MasterState.NORMAL;
/**
 * Initializes the underlying iterator and decodes this iterator's configuration from {@code options}.
 * <p>
 * The servers, tables, merges, debug and migrations options are parsed first. The master state is only replaced when its option is present and names a valid
 * {@link MasterState}; otherwise the current value is kept. Finally, servers listed as shutting down are removed from the set of current servers.
 *
 * @param source
 *          the iterator to read entries from
 * @param options
 *          the iterator options written by the static {@code set*} methods of this class
 * @param env
 *          the iterator environment, passed through to the superclass
 * @throws IOException
 *           if the superclass initialization fails
 */
@Override
public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
  super.init(source, options, env);
  current = parseServers(options.get(SERVERS_OPTION));
  onlineTables = parseTables(options.get(TABLES_OPTION));
  merges = parseMerges(options.get(MERGES_OPTION));
  debug = options.containsKey(DEBUG_OPTION);
  migrations = parseMigrations(options.get(MIGRATIONS_OPTION));

  // A missing option is normal and silently keeps the current state; only a value that is present but unrecognized is reported.
  String encodedState = options.get(MASTER_STATE_OPTION);
  if (encodedState != null) {
    try {
      masterState = MasterState.valueOf(encodedState);
    } catch (IllegalArgumentException ex) {
      log.error("Unable to decode masterState {}", encodedState, ex);
    }
  }

  // Servers that are shutting down must not count as current servers.
  Set<TServerInstance> shuttingDown = parseServers(options.get(SHUTTING_DOWN_OPTION));
  if (current != null && shuttingDown != null) {
    current.removeAll(shuttingDown);
  }
}
/**
 * Decodes the migrations option: a Base64 string wrapping {@link KeyExtent}s serialized back to back.
 *
 * @param migrations
 *          the encoded option value, or {@code null} when the option was not supplied
 * @return the decoded extents, or an empty set when {@code migrations} is {@code null}
 * @throws RuntimeException
 *           wrapping any failure raised while decoding or deserializing
 */
private Set<KeyExtent> parseMigrations(String migrations) {
  if (migrations == null) {
    return Collections.emptySet();
  }
  Set<KeyExtent> extents = new HashSet<>();
  try {
    byte[] raw = Base64.getDecoder().decode(migrations);
    DataInputBuffer in = new DataInputBuffer();
    in.reset(raw, raw.length);
    // Extents are written one after another, so keep reading until the buffer is drained.
    while (in.available() > 0) {
      KeyExtent next = new KeyExtent();
      next.readFields(in);
      extents.add(next);
    }
  } catch (Exception ex) {
    throw new RuntimeException(ex);
  }
  return extents;
}
/**
 * Splits the comma-separated tables option into a set of table ids.
 *
 * @param tables
 *          the comma-separated option value, or {@code null} when the option was not supplied
 * @return {@code null} when {@code tables} is {@code null}, otherwise a new set holding every token produced by splitting on {@code ","}
 */
private Set<String> parseTables(String tables) {
  if (tables == null) {
    return null;
  }
  String[] tokens = tables.split(",");
  return new HashSet<>(Arrays.asList(tokens));
}
/**
 * Parses a comma-separated list of tablet servers, each written as {@code host:port[INSTANCE]}.
 *
 * @param servers
 *          the encoded server list, or {@code null} when the option was not supplied
 * @return {@code null} when {@code servers} is {@code null}; otherwise a new mutable set with one {@code TServerInstance} per entry (empty for an empty
 *         string)
 * @throws IllegalArgumentException
 *           if an entry does not contain the {@code [INSTANCE]} part
 */
private Set<TServerInstance> parseServers(String servers) {
  if (servers == null) {
    return null;
  }
  // The result must stay mutable: init() removes the shutting-down servers from it.
  Set<TServerInstance> result = new HashSet<>();
  if (servers.isEmpty()) {
    return result;
  }
  for (String part : servers.split(",")) {
    // Split once on the first '[' into the address and the instance suffix.
    String[] parts = part.split("\\[", 2);
    if (parts.length < 2) {
      throw new IllegalArgumentException("Malformed tablet server entry '" + part + "', expected host:port[INSTANCE]");
    }
    String hostport = parts[0];
    String instance = parts[1];
    if (instance.endsWith("]")) {
      instance = instance.substring(0, instance.length() - 1);
    }
    result.add(new TServerInstance(AddressUtil.parseAddress(hostport, false), instance));
  }
  return result;
}
/**
 * Decodes the merges option: a Base64 string wrapping {@code MergeInfo} records serialized back to back.
 *
 * @param merges
 *          the encoded option value, or {@code null} when the option was not supplied
 * @return the decoded merges keyed by the table id of each merge's extent; an empty map when {@code merges} is {@code null}, so callers can look up a table
 *         without a null check
 * @throws RuntimeException
 *           wrapping any failure raised while decoding or deserializing
 */
private Map<String,MergeInfo> parseMerges(String merges) {
  if (merges == null) {
    // consume() looks tables up in this map unconditionally, so "no merges" must be an empty map rather than null.
    return Collections.emptyMap();
  }
  try {
    Map<String,MergeInfo> result = new HashMap<>();
    DataInputBuffer buffer = new DataInputBuffer();
    byte[] data = Base64.getDecoder().decode(merges);
    buffer.reset(data, data.length);
    // Records are written one after another, so keep reading until the buffer is drained.
    while (buffer.available() > 0) {
      MergeInfo mergeInfo = new MergeInfo();
      mergeInfo.readFields(buffer);
      result.put(mergeInfo.extent.getTableId(), mergeInfo);
    }
    return result;
  } catch (Exception ex) {
    throw new RuntimeException(ex);
  }
}
/**
 * Advances the source past every entry whose tablet is already in its expected state.
 * <p>
 * Returning from this method leaves the current source entry in place so that it is reported; calling {@code next()} on the source skips it. An entry is
 * reported when filtering is not possible (no table or server information, or the master state is not {@code NORMAL}), when it cannot be decoded, when its
 * table has a merge, when its extent is migrating, or when the tablet's state disagrees with whether its table should be online.
 *
 * @throws IOException
 *           if reading from or advancing the source fails
 */
@Override
protected void consume() throws IOException {
  while (getSource().hasTop()) {
    Key topKey = getSource().getTopKey();
    Value topValue = getSource().getTopValue();

    boolean canFilter = onlineTables != null && current != null && masterState == MasterState.NORMAL;
    if (!canFilter) {
      return;
    }

    TabletLocationState tls;
    try {
      tls = MetaDataTableScanner.createTabletLocationState(topKey, topValue);
    } catch (BadLocationStateException e) {
      // maybe the master can do something with a tablet with bad/inconsistent state
      return;
    }
    if (tls == null) {
      return;
    }

    // we always want data about merges (could be smarter by only returning tablets involved in the merge)
    // and we always return the information for migrating tablets
    if (merges.get(tls.extent.getTableId()) != null || migrations.contains(tls.extent)) {
      return;
    }

    // is the table supposed to be online or offline?
    boolean shouldBeOnline = onlineTables.contains(tls.extent.getTableId());

    if (debug) {
      log.debug(tls.extent + " is " + tls.getState(current) + " and should be " + (shouldBeOnline ? "on" : "off") + "line");
    }

    switch (tls.getState(current)) {
      case ASSIGNED:
      case ASSIGNED_TO_DEAD_SERVER:
        // assigned tablets are always reported, whether the server is alive or dead
        return;
      case HOSTED:
        if (!shouldBeOnline) {
          return;
        }
        break;
      case SUSPENDED:
      case UNASSIGNED:
        if (shouldBeOnline) {
          return;
        }
        break;
      default:
        throw new AssertionError("Inconceivable! The tablet is an unrecognized state: " + tls.getState(current));
    }

    // table is in the expected state so don't bother returning any information about it
    getSource().next();
  }
}
/**
 * Not supported by this iterator.
 *
 * @throws UnsupportedOperationException
 *           always
 */
@Override
public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
throw new UnsupportedOperationException();
}
/**
 * Stores the given servers in {@code cfg} under the servers option, as a comma-separated list of each server's {@code toString()} form.
 *
 * @param cfg
 *          the iterator setting to add the option to
 * @param goodServers
 *          the servers to record; when {@code null} no option is added
 */
public static void setCurrentServers(IteratorSetting cfg, Set<TServerInstance> goodServers) {
  if (goodServers == null) {
    return;
  }
  List<String> encoded = new ArrayList<>();
  for (TServerInstance instance : goodServers) {
    encoded.add(instance.toString());
  }
  String joined = Joiner.on(",").join(encoded);
  cfg.addOption(SERVERS_OPTION, joined);
}
/**
 * Stores the given table ids in {@code cfg} under the tables option, joined with commas.
 *
 * @param cfg
 *          the iterator setting to add the option to
 * @param onlineTables
 *          the table ids to record; when {@code null} no option is added
 */
public static void setOnlineTables(IteratorSetting cfg, Set<String> onlineTables) {
  if (onlineTables == null) {
    return;
  }
  String joined = Joiner.on(",").join(onlineTables);
  cfg.addOption(TABLES_OPTION, joined);
}
/**
 * Serializes the given merges and stores them, Base64 encoded, in {@code cfg} under the merges option.
 * <p>
 * Only entries that have an extent and whose state is not {@code MergeState.NONE} are written. The option is always added, even when nothing was written.
 *
 * @param cfg
 *          the iterator setting to add the option to
 * @param merges
 *          the merges to consider
 * @throws RuntimeException
 *           wrapping any failure raised while iterating or serializing
 */
public static void setMerges(IteratorSetting cfg, Collection<MergeInfo> merges) {
  DataOutputBuffer out = new DataOutputBuffer();
  try {
    for (MergeInfo candidate : merges) {
      if (candidate.getExtent() == null) {
        continue;
      }
      if (candidate.getState().equals(MergeState.NONE)) {
        continue;
      }
      candidate.write(out);
    }
  } catch (Exception ex) {
    throw new RuntimeException(ex);
  }
  // Only the first getLength() bytes of the backing array hold data.
  byte[] serialized = Arrays.copyOf(out.getData(), out.getLength());
  cfg.addOption(MERGES_OPTION, Base64.getEncoder().encodeToString(serialized));
}
/**
 * Serializes the given extents and stores them, Base64 encoded, in {@code cfg} under the migrations option.
 *
 * @param cfg
 *          the iterator setting to add the option to
 * @param migrations
 *          the extents to record
 * @throws RuntimeException
 *           wrapping any failure raised while iterating or serializing
 */
public static void setMigrations(IteratorSetting cfg, Collection<KeyExtent> migrations) {
  DataOutputBuffer out = new DataOutputBuffer();
  try {
    for (KeyExtent migrating : migrations) {
      migrating.write(out);
    }
  } catch (Exception ex) {
    throw new RuntimeException(ex);
  }
  // Only the first getLength() bytes of the backing array hold data.
  byte[] serialized = Arrays.copyOf(out.getData(), out.getLength());
  cfg.addOption(MIGRATIONS_OPTION, Base64.getEncoder().encodeToString(serialized));
}
/**
 * Stores the {@code toString()} form of the given master state in {@code cfg} under the masterState option.
 *
 * @param cfg
 *          the iterator setting to add the option to
 * @param state
 *          the master state to record
 */
public static void setMasterState(IteratorSetting cfg, MasterState state) {
  String encodedState = state.toString();
  cfg.addOption(MASTER_STATE_OPTION, encodedState);
}
/**
 * Stores the given servers in {@code cfg} under the shuttingDown option, as a comma-separated list of each server's {@code toString()} form.
 *
 * @param cfg
 *          the iterator setting to add the option to
 * @param servers
 *          the servers that are shutting down; when {@code null} no option is added
 */
public static void setShuttingDown(IteratorSetting cfg, Set<TServerInstance> servers) {
  if (servers != null) {
    List<String> serverList = new ArrayList<>(servers.size());
    for (TServerInstance server : servers) {
      serverList.add(server.toString());
    }
    // Join the list built above (the same shape as setCurrentServers) rather than the raw set.
    cfg.addOption(SHUTTING_DOWN_OPTION, Joiner.on(",").join(serverList));
  }
}
}