blob: 2b955f165909c71dc57adff1e1ccfe2fe3175180 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.server.manager.state;
import java.io.IOException;
import java.lang.ref.Cleaner.Cleanable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.SortedMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.ScannerBase;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.iterators.user.WholeRowIterator;
import org.apache.accumulo.core.metadata.SuspendingTServer;
import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.TabletLocationState;
import org.apache.accumulo.core.metadata.TabletLocationState.BadLocationStateException;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.cleaner.CleanerUtil;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MetaDataTableScanner implements ClosableIterator<TabletLocationState> {
private static final Logger log = LoggerFactory.getLogger(MetaDataTableScanner.class);
private final Cleanable cleanable;
private final BatchScanner mdScanner;
private final Iterator<Entry<Key,Value>> iter;
private final AtomicBoolean closed = new AtomicBoolean(false);
MetaDataTableScanner(ClientContext context, Range range, CurrentState state, String tableName) {
// scan over metadata table, looking for tablets in the wrong state based on the live servers
// and online tables
try {
mdScanner = context.createBatchScanner(tableName, Authorizations.EMPTY, 8);
} catch (TableNotFoundException e) {
throw new IllegalStateException("Metadata table " + tableName + " should exist", e);
}
cleanable = CleanerUtil.unclosed(this, MetaDataTableScanner.class, closed, log, mdScanner);
configureScanner(mdScanner, state);
mdScanner.setRanges(Collections.singletonList(range));
iter = mdScanner.iterator();
}
public static void configureScanner(ScannerBase scanner, CurrentState state) {
TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
scanner.fetchColumnFamily(CurrentLocationColumnFamily.NAME);
scanner.fetchColumnFamily(FutureLocationColumnFamily.NAME);
scanner.fetchColumnFamily(LastLocationColumnFamily.NAME);
scanner.fetchColumnFamily(SuspendLocationColumn.SUSPEND_COLUMN.getColumnFamily());
scanner.fetchColumnFamily(LogColumnFamily.NAME);
scanner.fetchColumnFamily(ChoppedColumnFamily.NAME);
scanner.addScanIterator(new IteratorSetting(1000, "wholeRows", WholeRowIterator.class));
IteratorSetting tabletChange =
new IteratorSetting(1001, "tabletChange", TabletStateChangeIterator.class);
if (state != null) {
TabletStateChangeIterator.setCurrentServers(tabletChange, state.onlineTabletServers());
TabletStateChangeIterator.setOnlineTables(tabletChange, state.onlineTables());
TabletStateChangeIterator.setMerges(tabletChange, state.merges());
TabletStateChangeIterator.setMigrations(tabletChange, state.migrationsSnapshot());
TabletStateChangeIterator.setManagerState(tabletChange, state.getManagerState());
TabletStateChangeIterator.setShuttingDown(tabletChange, state.shutdownServers());
}
scanner.addScanIterator(tabletChange);
}
public MetaDataTableScanner(ClientContext context, Range range, String tableName) {
this(context, range, null, tableName);
}
@Override
public void close() {
if (closed.compareAndSet(false, true)) {
// deregister cleanable, but it won't run because it checks
// the value of closed first, which is now true
cleanable.clean();
mdScanner.close();
}
}
@Override
public boolean hasNext() {
if (closed.get()) {
return false;
}
boolean result = iter.hasNext();
if (!result) {
close();
}
return result;
}
@Override
public TabletLocationState next() {
if (closed.get()) {
throw new NoSuchElementException(this.getClass().getSimpleName() + " is closed");
}
try {
Entry<Key,Value> e = iter.next();
return createTabletLocationState(e.getKey(), e.getValue());
} catch (IOException | BadLocationStateException ex) {
throw new RuntimeException(ex);
}
}
public static TabletLocationState createTabletLocationState(Key k, Value v)
throws IOException, BadLocationStateException {
final SortedMap<Key,Value> decodedRow = WholeRowIterator.decodeRow(k, v);
KeyExtent extent = null;
TServerInstance future = null;
TServerInstance current = null;
TServerInstance last = null;
SuspendingTServer suspend = null;
long lastTimestamp = 0;
List<Collection<String>> walogs = new ArrayList<>();
boolean chopped = false;
for (Entry<Key,Value> entry : decodedRow.entrySet()) {
Key key = entry.getKey();
Text row = key.getRow();
Text cf = key.getColumnFamily();
Text cq = key.getColumnQualifier();
if (cf.compareTo(FutureLocationColumnFamily.NAME) == 0) {
TServerInstance location = new TServerInstance(entry.getValue(), cq);
if (future != null) {
throw new BadLocationStateException("found two assignments for the same extent " + row
+ ": " + future + " and " + location, row);
}
future = location;
} else if (cf.compareTo(CurrentLocationColumnFamily.NAME) == 0) {
TServerInstance location = new TServerInstance(entry.getValue(), cq);
if (current != null) {
throw new BadLocationStateException("found two locations for the same extent " + row
+ ": " + current + " and " + location, row);
}
current = location;
} else if (cf.compareTo(LogColumnFamily.NAME) == 0) {
String[] split = entry.getValue().toString().split("\\|")[0].split(";");
walogs.add(Arrays.asList(split));
} else if (cf.compareTo(LastLocationColumnFamily.NAME) == 0) {
if (lastTimestamp < entry.getKey().getTimestamp()) {
last = new TServerInstance(entry.getValue(), cq);
}
} else if (cf.compareTo(ChoppedColumnFamily.NAME) == 0) {
chopped = true;
} else if (TabletColumnFamily.PREV_ROW_COLUMN.equals(cf, cq)) {
extent = KeyExtent.fromMetaPrevRow(entry);
} else if (SuspendLocationColumn.SUSPEND_COLUMN.equals(cf, cq)) {
suspend = SuspendingTServer.fromValue(entry.getValue());
}
}
if (extent == null) {
String msg = "No prev-row for key extent " + decodedRow;
log.error(msg);
throw new BadLocationStateException(msg, k.getRow());
}
return new TabletLocationState(extent, future, current, last, suspend, walogs, chopped);
}
}