| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.accumulo.master; |
| |
| import static java.lang.Math.min; |
| import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| import java.util.SortedMap; |
| import java.util.SortedSet; |
| import java.util.TreeMap; |
| import java.util.TreeSet; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.apache.accumulo.core.client.AccumuloClient; |
| import org.apache.accumulo.core.client.AccumuloException; |
| import org.apache.accumulo.core.client.BatchWriter; |
| import org.apache.accumulo.core.client.BatchWriterConfig; |
| import org.apache.accumulo.core.client.MutationsRejectedException; |
| import org.apache.accumulo.core.client.RowIterator; |
| import org.apache.accumulo.core.client.Scanner; |
| import org.apache.accumulo.core.client.TableNotFoundException; |
| import org.apache.accumulo.core.conf.Property; |
| import org.apache.accumulo.core.data.Key; |
| import org.apache.accumulo.core.data.Mutation; |
| import org.apache.accumulo.core.data.PartialKey; |
| import org.apache.accumulo.core.data.Range; |
| import org.apache.accumulo.core.data.TableId; |
| import org.apache.accumulo.core.data.Value; |
| import org.apache.accumulo.core.dataImpl.KeyExtent; |
| import org.apache.accumulo.core.master.state.tables.TableState; |
| import org.apache.accumulo.core.master.thrift.MasterState; |
| import org.apache.accumulo.core.master.thrift.TabletServerStatus; |
| import org.apache.accumulo.core.metadata.MetadataTable; |
| import org.apache.accumulo.core.metadata.RootTable; |
| import org.apache.accumulo.core.metadata.TabletFileUtil; |
| import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; |
| import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily; |
| import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily; |
| import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; |
| import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily; |
| import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily; |
| import org.apache.accumulo.core.metadata.schema.MetadataTime; |
| import org.apache.accumulo.core.security.Authorizations; |
| import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException; |
| import org.apache.accumulo.core.util.Daemon; |
| import org.apache.accumulo.master.Master.TabletGoalState; |
| import org.apache.accumulo.master.state.MergeStats; |
| import org.apache.accumulo.master.state.TableCounts; |
| import org.apache.accumulo.master.state.TableStats; |
| import org.apache.accumulo.server.conf.TableConfiguration; |
| import org.apache.accumulo.server.fs.FileRef; |
| import org.apache.accumulo.server.gc.GcVolumeUtil; |
| import org.apache.accumulo.server.log.WalStateManager; |
| import org.apache.accumulo.server.log.WalStateManager.WalMarkerException; |
| import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection; |
| import org.apache.accumulo.server.master.state.Assignment; |
| import org.apache.accumulo.server.master.state.ClosableIterator; |
| import org.apache.accumulo.server.master.state.DistributedStoreException; |
| import org.apache.accumulo.server.master.state.MergeInfo; |
| import org.apache.accumulo.server.master.state.MergeState; |
| import org.apache.accumulo.server.master.state.TServerInstance; |
| import org.apache.accumulo.server.master.state.TabletLocationState; |
| import org.apache.accumulo.server.master.state.TabletLocationState.BadLocationStateException; |
| import org.apache.accumulo.server.master.state.TabletState; |
| import org.apache.accumulo.server.master.state.TabletStateStore; |
| import org.apache.accumulo.server.metadata.ServerAmpleImpl; |
| import org.apache.accumulo.server.tablets.TabletTime; |
| import org.apache.accumulo.server.util.MetadataTableUtil; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.io.Text; |
| import org.apache.thrift.TException; |
| |
| import com.google.common.collect.ImmutableSortedSet; |
| import com.google.common.collect.Iterators; |
| |
/**
 * Background daemon that watches one {@link TabletStateStore} (a tier of tablets) and drives each
 * tablet toward its goal state: assigning, unloading, recovering logs, and servicing merges.
 */
abstract class TabletGroupWatcher extends Daemon {
  // Constants used to make sure assignment logging isn't excessive in quantity or size
  private static final String ASSIGNMENT_BUFFER_SEPARATOR = ", ";
  private static final int ASSIGNMENT_BUFFER_MAX_LENGTH = 4096;

  private final Master master;
  // the store of tablet location states this watcher scans and acts upon
  private final TabletStateStore store;
  // watcher whose tablets must be fully unloaded before this one suspends its own during a
  // whole-cluster shutdown (see the SUSPENDED/HOSTED check in run()); may be null
  private final TabletGroupWatcher dependentWatcher;
  final TableStats stats = new TableStats();
  // tserver set observed at the start of the most recent scan; guarded by synchronized(this)
  private SortedSet<TServerInstance> lastScanServers = ImmutableSortedSet.of();

  TabletGroupWatcher(Master master, TabletStateStore store, TabletGroupWatcher dependentWatcher) {
    this.master = master;
    this.store = store;
    this.dependentWatcher = dependentWatcher;
  }
| |
  /**
   * Should this {@code TabletGroupWatcher} suspend tablets (rather than immediately reassigning
   * them) when their server dies? Decided by each concrete watcher.
   */
  abstract boolean canSuspendTablets();
| |
  /** Per-table tablet-state counts from the last completed scan. */
  Map<TableId,TableCounts> getStats() {
    return stats.getLast();
  }
| |
  /** Tablet-state counts for a single table from the last completed scan. */
  TableCounts getStats(TableId tableId) {
    return stats.getLast(tableId);
  }
| |
  /**
   * True if the collection of live tservers specified in 'candidates' hasn't changed since the last
   * time an assignment scan was started.
   */
  synchronized boolean isSameTserversAsLastScan(Set<TServerInstance> candidates) {
    return candidates.equals(lastScanServers);
  }
| |
  /**
   * Main watcher loop: while this process is still master, repeatedly scan every tablet in the
   * store, compare each tablet's current state to its goal state, and queue up the work
   * (assignments, unloads, log recovery, chop/split requests) needed to converge, flushing the
   * queued work in bulk via flushChanges().
   */
  @Override
  public void run() {
    Thread.currentThread().setName("Watching " + store.name());
    // per-TabletState tallies from the previous scan; used to report only counts that changed
    int[] oldCounts = new int[TabletState.values().length];
    EventCoordinator.Listener eventListener = this.master.nextEvent.getListener();

    WalStateManager wals = new WalStateManager(master.getContext());

    while (this.master.stillMaster()) {
      // slow things down a little, otherwise we spam the logs when there are many wake-up events
      sleepUninterruptibly(100, TimeUnit.MILLISECONDS);

      int totalUnloaded = 0;
      int unloaded = 0;
      ClosableIterator<TabletLocationState> iter = null;
      try {
        // merge stats per table; lazily seeded from the active merges collected below
        Map<TableId,MergeStats> mergeStatsCache = new HashMap<>();
        Map<TableId,MergeStats> currentMerges = new HashMap<>();
        for (MergeInfo merge : master.merges()) {
          if (merge.getExtent() != null) {
            currentMerges.put(merge.getExtent().getTableId(), new MergeStats(merge));
          }
        }

        // Get the current status for the current list of tservers
        SortedMap<TServerInstance,TabletServerStatus> currentTServers = new TreeMap<>();
        for (TServerInstance entry : this.master.tserverSet.getCurrentServers()) {
          currentTServers.put(entry, this.master.tserverStatus.get(entry));
        }

        if (currentTServers.size() == 0) {
          // no live tservers: nothing can be assigned, wait for the server set to change
          eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS);
          synchronized (this) {
            lastScanServers = ImmutableSortedSet.of();
          }
          continue;
        }

        // Don't move tablets to servers that are shutting down
        SortedMap<TServerInstance,TabletServerStatus> destinations = new TreeMap<>(currentTServers);
        destinations.keySet().removeAll(this.master.serversToShutdown);

        // work accumulated during the scan, flushed in bulk by flushChanges()
        List<Assignment> assignments = new ArrayList<>();
        List<Assignment> assigned = new ArrayList<>();
        List<TabletLocationState> assignedToDeadServers = new ArrayList<>();
        List<TabletLocationState> suspendedToGoneServers = new ArrayList<>();
        Map<KeyExtent,TServerInstance> unassigned = new HashMap<>();
        Map<TServerInstance,List<Path>> logsForDeadServers = new TreeMap<>();

        MasterState masterState = master.getMasterState();
        int[] counts = new int[TabletState.values().length];
        stats.begin();
        // Walk through the tablets in our store, and work tablets
        // towards their goal
        iter = store.iterator();
        while (iter.hasNext()) {
          TabletLocationState tls = iter.next();
          if (tls == null) {
            continue;
          }

          // this can get spammy during merges
          if (currentMerges.isEmpty()) {
            Master.log.debug("{} location State: {}", store.name(), tls);
          }

          // ignore entries for tables that do not exist in zookeeper
          if (master.getTableManager().getTableState(tls.extent.getTableId()) == null)
            continue;

          if (Master.log.isTraceEnabled())
            Master.log.trace("{} walogs {}", tls, tls.walogs.size());

          // Don't overwhelm the tablet servers with work
          if (unassigned.size() + unloaded
              > Master.MAX_TSERVER_WORK_CHUNK * currentTServers.size()) {
            // flush what has accumulated so far, then reset the buffers and keep scanning
            flushChanges(destinations, assignments, assigned, assignedToDeadServers,
                logsForDeadServers, suspendedToGoneServers, unassigned);
            assignments.clear();
            assigned.clear();
            assignedToDeadServers.clear();
            suspendedToGoneServers.clear();
            unassigned.clear();
            unloaded = 0;
            eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS);
          }
          TableId tableId = tls.extent.getTableId();
          TableConfiguration tableConf =
              this.master.getConfigurationFactory().getTableConfiguration(tableId);

          MergeStats mergeStats = mergeStatsCache.get(tableId);
          if (mergeStats == null) {
            mergeStats = currentMerges.get(tableId);
            if (mergeStats == null) {
              // no merge in progress for this table; use an empty MergeInfo placeholder
              mergeStats = new MergeStats(new MergeInfo());
            }
            mergeStatsCache.put(tableId, mergeStats);
          }
          TabletGoalState goal = this.master.getGoalState(tls, mergeStats.getMergeInfo());
          TServerInstance server = tls.getServer();
          TabletState state = tls.getState(currentTServers.keySet());
          if (Master.log.isTraceEnabled()) {
            Master.log.trace("Goal state {} current {} for {}", goal, state, tls.extent);
          }
          stats.update(tableId, state);
          mergeStats.update(tls.extent, state, tls.chopped, !tls.walogs.isEmpty());
          sendChopRequest(mergeStats.getMergeInfo(), state, tls);
          sendSplitRequest(mergeStats.getMergeInfo(), state, tls);

          // Always follow through with assignments
          if (state == TabletState.ASSIGNED) {
            goal = TabletGoalState.HOSTED;
          }

          // if we are shutting down all the tabletservers, we have to do it in order
          if (goal == TabletGoalState.SUSPENDED && state == TabletState.HOSTED) {
            if (this.master.serversToShutdown.equals(currentTServers.keySet())) {
              // keep hosting until the dependent watcher's tablets are all offline
              if (dependentWatcher != null && dependentWatcher.assignedOrHosted() > 0) {
                goal = TabletGoalState.HOSTED;
              }
            }
          }

          if (goal == TabletGoalState.HOSTED) {
            // write-ahead logs must be recovered before the tablet can be hosted
            if (state != TabletState.HOSTED && !tls.walogs.isEmpty()) {
              if (this.master.recoveryManager.recoverLogs(tls.extent, tls.walogs))
                continue;
            }
            switch (state) {
              case HOSTED:
                // migration complete once the tablet is hosted on the migration target
                if (server.equals(this.master.migrations.get(tls.extent)))
                  this.master.migrations.remove(tls.extent);
                break;
              case ASSIGNED_TO_DEAD_SERVER:
                assignedToDeadServers.add(tls);
                if (server.equals(this.master.migrations.get(tls.extent)))
                  this.master.migrations.remove(tls.extent);
                TServerInstance tserver = tls.futureOrCurrent();
                if (!logsForDeadServers.containsKey(tserver)) {
                  logsForDeadServers.put(tserver, wals.getWalsInUse(tserver));
                }
                break;
              case SUSPENDED:
                if (master.getSteadyTime() - tls.suspend.suspensionTime
                    < tableConf.getTimeInMillis(Property.TABLE_SUSPEND_DURATION)) {
                  // Tablet is suspended. See if its tablet server is back.
                  TServerInstance returnInstance = null;
                  Iterator<TServerInstance> find = destinations
                      .tailMap(new TServerInstance(tls.suspend.server, " ")).keySet().iterator();
                  if (find.hasNext()) {
                    TServerInstance found = find.next();
                    if (found.getLocation().equals(tls.suspend.server)) {
                      returnInstance = found;
                    }
                  }

                  // Old tablet server is back. Return this tablet to its previous owner.
                  if (returnInstance != null) {
                    assignments.add(new Assignment(tls.extent, returnInstance));
                  }
                  // else - tablet server not back. Don't ask for a new assignment right now.

                } else {
                  // Treat as unassigned, ask for a new assignment.
                  unassigned.put(tls.extent, server);
                }
                break;
              case UNASSIGNED:
                // maybe it's a finishing migration
                TServerInstance dest = this.master.migrations.get(tls.extent);
                if (dest != null) {
                  // if destination is still good, assign it
                  if (destinations.containsKey(dest)) {
                    assignments.add(new Assignment(tls.extent, dest));
                  } else {
                    // get rid of this migration
                    this.master.migrations.remove(tls.extent);
                    unassigned.put(tls.extent, server);
                  }
                } else {
                  unassigned.put(tls.extent, server);
                }
                break;
              case ASSIGNED:
                // Send another reminder
                assigned.add(new Assignment(tls.extent, tls.future));
                break;
            }
          } else {
            // goal is not HOSTED: move the tablet offline (or leave it in transition)
            switch (state) {
              case SUSPENDED:
                // Request a move to UNASSIGNED, so as to allow balancing to continue.
                suspendedToGoneServers.add(tls);
                cancelOfflineTableMigrations(tls);
                break;
              case UNASSIGNED:
                cancelOfflineTableMigrations(tls);
                break;
              case ASSIGNED_TO_DEAD_SERVER:
                assignedToDeadServers.add(tls);
                if (!logsForDeadServers.containsKey(tls.futureOrCurrent())) {
                  logsForDeadServers.put(tls.futureOrCurrent(),
                      wals.getWalsInUse(tls.futureOrCurrent()));
                }
                break;
              case HOSTED:
                TServerConnection client = this.master.tserverSet.getConnection(server);
                if (client != null) {
                  client.unloadTablet(this.master.masterLock, tls.extent, goal.howUnload(),
                      master.getSteadyTime());
                  unloaded++;
                  totalUnloaded++;
                } else {
                  Master.log.warn("Could not connect to server {}", server);
                }
                break;
              case ASSIGNED:
                break;
            }
          }
          counts[state.ordinal()]++;
        }

        flushChanges(destinations, assignments, assigned, assignedToDeadServers, logsForDeadServers,
            suspendedToGoneServers, unassigned);

        // provide stats after flushing changes to avoid race conditions w/ delete table
        stats.end(masterState);

        // Report changes
        for (TabletState state : TabletState.values()) {
          int i = state.ordinal();
          if (counts[i] > 0 && counts[i] != oldCounts[i]) {
            this.master.nextEvent.event("[%s]: %d tablets are %s", store.name(), counts[i],
                state.name());
          }
        }
        Master.log.debug(String.format("[%s]: scan time %.2f seconds", store.name(),
            stats.getScanTime() / 1000.));
        oldCounts = counts;
        if (totalUnloaded > 0) {
          this.master.nextEvent.event("[%s]: %d tablets unloaded", store.name(), totalUnloaded);
        }

        updateMergeState(mergeStatsCache);

        synchronized (this) {
          lastScanServers = ImmutableSortedSet.copyOf(currentTServers.keySet());
        }
        if (this.master.tserverSet.getCurrentServers().equals(currentTServers.keySet())) {
          // tserver set is stable; wait for an event before rescanning
          Master.log.debug(String.format("[%s] sleeping for %.2f seconds", store.name(),
              Master.TIME_TO_WAIT_BETWEEN_SCANS / 1000.));
          eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS);
        } else {
          Master.log.info("Detected change in current tserver set, re-running state machine.");
        }
      } catch (Exception ex) {
        Master.log.error("Error processing table state for store " + store.name(), ex);
        if (ex.getCause() != null && ex.getCause() instanceof BadLocationStateException) {
          // a corrupt location entry was detected; try to repair the metadata row
          repairMetadata(((BadLocationStateException) ex.getCause()).getEncodedEndRow());
        } else {
          sleepUninterruptibly(Master.WAIT_BETWEEN_ERRORS, TimeUnit.MILLISECONDS);
        }
      } finally {
        if (iter != null) {
          try {
            iter.close();
          } catch (IOException ex) {
            Master.log.warn("Error closing TabletLocationState iterator: " + ex, ex);
          }
        }
      }
    }
  }
| |
| private void cancelOfflineTableMigrations(TabletLocationState tls) { |
| TServerInstance dest = this.master.migrations.get(tls.extent); |
| TableState tableState = master.getTableManager().getTableState(tls.extent.getTableId()); |
| if (dest != null && tableState == TableState.OFFLINE) { |
| this.master.migrations.remove(tls.extent); |
| } |
| } |
| |
| private void repairMetadata(Text row) { |
| Master.log.debug("Attempting repair on {}", row); |
| // ACCUMULO-2261 if a dying tserver writes a location before its lock information propagates, it |
| // may cause duplicate assignment. |
| // Attempt to find the dead server entry and remove it. |
| try { |
| Map<Key,Value> future = new HashMap<>(); |
| Map<Key,Value> assigned = new HashMap<>(); |
| KeyExtent extent = new KeyExtent(row, new Value(new byte[] {0})); |
| String table = MetadataTable.NAME; |
| if (extent.isMeta()) |
| table = RootTable.NAME; |
| Scanner scanner = this.master.getContext().createScanner(table, Authorizations.EMPTY); |
| scanner.fetchColumnFamily(CurrentLocationColumnFamily.NAME); |
| scanner.fetchColumnFamily(FutureLocationColumnFamily.NAME); |
| scanner.setRange(new Range(row)); |
| for (Entry<Key,Value> entry : scanner) { |
| if (entry.getKey().getColumnFamily().equals(CurrentLocationColumnFamily.NAME)) { |
| assigned.put(entry.getKey(), entry.getValue()); |
| } else if (entry.getKey().getColumnFamily().equals(FutureLocationColumnFamily.NAME)) { |
| future.put(entry.getKey(), entry.getValue()); |
| } |
| } |
| if (future.size() > 0 && assigned.size() > 0) { |
| Master.log.warn("Found a tablet assigned and hosted, attempting to repair"); |
| } else if (future.size() > 1 && assigned.size() == 0) { |
| Master.log.warn("Found a tablet assigned to multiple servers, attempting to repair"); |
| } else if (future.size() == 0 && assigned.size() > 1) { |
| Master.log.warn("Found a tablet hosted on multiple servers, attempting to repair"); |
| } else { |
| Master.log.info("Attempted a repair, but nothing seems to be obviously wrong. {} {}", |
| assigned, future); |
| return; |
| } |
| Iterator<Entry<Key,Value>> iter = |
| Iterators.concat(future.entrySet().iterator(), assigned.entrySet().iterator()); |
| while (iter.hasNext()) { |
| Entry<Key,Value> entry = iter.next(); |
| TServerInstance alive = master.tserverSet.find(entry.getValue().toString()); |
| if (alive == null) { |
| Master.log.info("Removing entry {}", entry); |
| BatchWriter bw = |
| this.master.getContext().createBatchWriter(table, new BatchWriterConfig()); |
| Mutation m = new Mutation(entry.getKey().getRow()); |
| m.putDelete(entry.getKey().getColumnFamily(), entry.getKey().getColumnQualifier()); |
| bw.addMutation(m); |
| bw.close(); |
| return; |
| } |
| } |
| Master.log.error( |
| "Metadata table is inconsistent at {} and all assigned/future tservers are still online.", |
| row); |
| } catch (Throwable e) { |
| Master.log.error("Error attempting repair of metadata " + row + ": " + e, e); |
| } |
| } |
| |
| private int assignedOrHosted() { |
| int result = 0; |
| for (TableCounts counts : stats.getLast().values()) { |
| result += counts.assigned() + counts.hosted(); |
| } |
| return result; |
| } |
| |
  /**
   * During a range delete (merge with isDelete), ask the hosting tablet server to split tablets
   * at the delete range's end points so that the boundary tablets align exactly with the range.
   * No-op unless the merge is in SPLITTING state, is a delete, and the tablet is hosted.
   */
  private void sendSplitRequest(MergeInfo info, TabletState state, TabletLocationState tls) {
    // Already split?
    if (!info.getState().equals(MergeState.SPLITTING))
      return;
    // Merges don't split
    if (!info.isDelete())
      return;
    // Online and ready to split?
    if (!state.equals(TabletState.HOSTED))
      return;
    // Does this extent cover the end points of the delete?
    KeyExtent range = info.getExtent();
    if (tls.extent.overlaps(range)) {
      for (Text splitPoint : new Text[] {range.getPrevEndRow(), range.getEndRow()}) {
        if (splitPoint == null)
          continue;
        if (!tls.extent.contains(splitPoint))
          continue;
        // skip points that already coincide with this tablet's boundaries
        if (splitPoint.equals(tls.extent.getEndRow()))
          continue;
        if (splitPoint.equals(tls.extent.getPrevEndRow()))
          continue;
        try {
          TServerConnection conn;
          conn = this.master.tserverSet.getConnection(tls.current);
          if (conn != null) {
            Master.log.info("Asking {} to split {} at {}", tls.current, tls.extent, splitPoint);
            conn.splitTablet(tls.extent, splitPoint);
          } else {
            Master.log.warn("Not connected to server {}", tls.current);
          }
        } catch (NotServingTabletException e) {
          // expected if the tablet moved since the scan; the next scan will retry
          Master.log.debug("Error asking tablet server to split a tablet: ", e);
        } catch (Exception e) {
          Master.log.warn("Error asking tablet server to split a tablet: ", e);
        }
      }
    }
  }
| |
| private void sendChopRequest(MergeInfo info, TabletState state, TabletLocationState tls) { |
| // Don't bother if we're in the wrong state |
| if (!info.getState().equals(MergeState.WAITING_FOR_CHOPPED)) |
| return; |
| // Tablet must be online |
| if (!state.equals(TabletState.HOSTED)) |
| return; |
| // Tablet isn't already chopped |
| if (tls.chopped) |
| return; |
| // Tablet ranges intersect |
| if (info.needsToBeChopped(tls.extent)) { |
| TServerConnection conn; |
| try { |
| conn = this.master.tserverSet.getConnection(tls.current); |
| if (conn != null) { |
| Master.log.info("Asking {} to chop {}", tls.current, tls.extent); |
| conn.chop(this.master.masterLock, tls.extent); |
| } else { |
| Master.log.warn("Could not connect to server {}", tls.current); |
| } |
| } catch (TException e) { |
| Master.log.warn("Communications error asking tablet server to chop a tablet"); |
| } |
| } |
| } |
| |
| private void updateMergeState(Map<TableId,MergeStats> mergeStatsCache) { |
| for (MergeStats stats : mergeStatsCache.values()) { |
| try { |
| MergeState update = stats.nextMergeState(this.master.getContext(), this.master); |
| // when next state is MERGING, its important to persist this before |
| // starting the merge... the verification check that is done before |
| // moving into the merging state could fail if merge starts but does |
| // not finish |
| if (update == MergeState.COMPLETE) |
| update = MergeState.NONE; |
| if (update != stats.getMergeInfo().getState()) { |
| this.master.setMergeState(stats.getMergeInfo(), update); |
| } |
| |
| if (update == MergeState.MERGING) { |
| try { |
| if (stats.getMergeInfo().isDelete()) { |
| deleteTablets(stats.getMergeInfo()); |
| } else { |
| mergeMetadataRecords(stats.getMergeInfo()); |
| } |
| update = MergeState.COMPLETE; |
| this.master.setMergeState(stats.getMergeInfo(), update); |
| } catch (Exception ex) { |
| Master.log.error("Unable merge metadata table records", ex); |
| } |
| } |
| } catch (Exception ex) { |
| Master.log.error( |
| "Unable to update merge state for merge " + stats.getMergeInfo().getExtent(), ex); |
| } |
| } |
| } |
| |
  /**
   * Delete all tablets covered by a range-delete operation: queue the tablets' files and
   * directories for garbage collection, delete the tablet metadata rows, and repair the table's
   * tablet boundaries -- either by extending the following tablet backwards or, if the deleted
   * range reached the end of the table, by recreating the default (last) tablet.
   */
  private void deleteTablets(MergeInfo info) throws AccumuloException {
    KeyExtent extent = info.getExtent();
    String targetSystemTable = extent.isMeta() ? RootTable.NAME : MetadataTable.NAME;
    Master.log.debug("Deleting tablets for {}", extent);
    MetadataTime metadataTime = null;
    KeyExtent followingTablet = null;
    if (extent.getEndRow() != null) {
      // the tablet just after the deleted range will need its prevEndRow rewritten below
      Key nextExtent = new Key(extent.getEndRow()).followingKey(PartialKey.ROW);
      followingTablet = getHighTablet(
          new KeyExtent(extent.getTableId(), nextExtent.getRow(), extent.getEndRow()));
      Master.log.debug("Found following tablet {}", followingTablet);
    }
    try {
      AccumuloClient client = this.master.getContext();
      Text start = extent.getPrevEndRow();
      if (start == null) {
        start = new Text();
      }
      Master.log.debug("Making file deletion entries for {}", extent);
      Range deleteRange = new Range(TabletsSection.getRow(extent.getTableId(), start), false,
          TabletsSection.getRow(extent.getTableId(), extent.getEndRow()), true);
      Scanner scanner = client.createScanner(targetSystemTable, Authorizations.EMPTY);
      scanner.setRange(deleteRange);
      TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(scanner);
      TabletsSection.ServerColumnFamily.TIME_COLUMN.fetch(scanner);
      scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
      scanner.fetchColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME);
      Set<FileRef> datafiles = new TreeSet<>();
      for (Entry<Key,Value> entry : scanner) {
        Key key = entry.getKey();
        if (key.compareColumnFamily(DataFileColumnFamily.NAME) == 0) {
          datafiles
              .add(new FileRef(TabletFileUtil.validate(key.getColumnQualifierData().toString())));
          // flush GC delete entries in batches so the in-memory set stays bounded
          if (datafiles.size() > 1000) {
            MetadataTableUtil.addDeleteEntries(extent, datafiles, master.getContext());
            datafiles.clear();
          }
        } else if (TabletsSection.ServerColumnFamily.TIME_COLUMN.hasColumns(key)) {
          // remember a logical time for recreating the default tablet, if needed
          metadataTime = MetadataTime.parse(entry.getValue().toString());
        } else if (key.compareColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME) == 0) {
          // no tablet in the range may be assigned while its metadata is being deleted
          throw new IllegalStateException(
              "Tablet " + key.getRow() + " is assigned during a merge!");
        } else if (TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.hasColumns(key)) {
          // queue the tablet's directory for deletion on all volumes
          String path = GcVolumeUtil.getDeleteTabletOnAllVolumesUri(extent.getTableId(),
              entry.getValue().toString());
          datafiles.add(new FileRef(path));
          if (datafiles.size() > 1000) {
            MetadataTableUtil.addDeleteEntries(extent, datafiles, master.getContext());
            datafiles.clear();
          }
        }
      }
      // flush any remaining delete entries before removing the tablet rows
      MetadataTableUtil.addDeleteEntries(extent, datafiles, master.getContext());
      BatchWriter bw = client.createBatchWriter(targetSystemTable, new BatchWriterConfig());
      try {
        deleteTablets(info, deleteRange, bw, client);
      } finally {
        bw.close();
      }

      if (followingTablet != null) {
        // extend the following tablet backwards over the deleted range
        Master.log.debug("Updating prevRow of {} to {}", followingTablet, extent.getPrevEndRow());
        bw = client.createBatchWriter(targetSystemTable, new BatchWriterConfig());
        try {
          Mutation m = new Mutation(followingTablet.getMetadataEntry());
          TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.put(m,
              KeyExtent.encodePrevEndRow(extent.getPrevEndRow()));
          ChoppedColumnFamily.CHOPPED_COLUMN.putDelete(m);
          bw.addMutation(m);
          bw.flush();
        } finally {
          bw.close();
        }
      } else {
        // Recreate the default tablet to hold the end of the table
        // NOTE(review): metadataTime may still be null here if no TIME column was seen in the
        // scan above, which would NPE at metadataTime.getType(); confirm every tablet row in the
        // range is guaranteed to carry a time entry
        MetadataTableUtil.addTablet(
            new KeyExtent(extent.getTableId(), null, extent.getPrevEndRow()),
            ServerColumnFamily.DEFAULT_TABLET_DIR_NAME, master.getContext(), metadataTime.getType(),
            this.master.masterLock);
      }
    } catch (RuntimeException | TableNotFoundException ex) {
      throw new AccumuloException(ex);
    }
  }
| |
  /**
   * Merge the metadata of all tablets in {@code info}'s range into the highest tablet of the
   * range: copy every file entry into the high tablet's row, queue the lower tablets'
   * directories for GC, set the high tablet's time to the max logical time seen, rewrite its
   * prevEndRow to span the merged range, delete the lower tablets' rows, and finally clear the
   * chopped marker.
   */
  private void mergeMetadataRecords(MergeInfo info) throws AccumuloException {
    KeyExtent range = info.getExtent();
    Master.log.debug("Merging metadata for {}", range);
    KeyExtent stop = getHighTablet(range);
    Master.log.debug("Highest tablet is {}", stop);
    Value firstPrevRowValue = null;
    Text stopRow = stop.getMetadataEntry();
    Text start = range.getPrevEndRow();
    if (start == null) {
      start = new Text();
    }
    // all tablets in the range except the high tablet itself (stopRow is exclusive)
    Range scanRange =
        new Range(TabletsSection.getRow(range.getTableId(), start), false, stopRow, false);
    String targetSystemTable = MetadataTable.NAME;
    if (range.isMeta()) {
      targetSystemTable = RootTable.NAME;
    }

    AccumuloClient client = this.master.getContext();

    try (BatchWriter bw = client.createBatchWriter(targetSystemTable, new BatchWriterConfig())) {
      long fileCount = 0;
      // Make file entries in highest tablet
      Scanner scanner = client.createScanner(targetSystemTable, Authorizations.EMPTY);
      scanner.setRange(scanRange);
      TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
      TabletsSection.ServerColumnFamily.TIME_COLUMN.fetch(scanner);
      TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(scanner);
      scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
      Mutation m = new Mutation(stopRow);
      MetadataTime maxLogicalTime = null;
      for (Entry<Key,Value> entry : scanner) {
        Key key = entry.getKey();
        Value value = entry.getValue();
        if (key.getColumnFamily().equals(DataFileColumnFamily.NAME)) {
          // move each file reference into the high tablet's row
          m.put(key.getColumnFamily(), key.getColumnQualifier(), value);
          fileCount++;
        } else if (TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key)
            && firstPrevRowValue == null) {
          // the first prevRow seen belongs to the lowest tablet; it becomes the merged prevRow
          Master.log.debug("prevRow entry for lowest tablet is {}", value);
          firstPrevRowValue = new Value(value);
        } else if (TabletsSection.ServerColumnFamily.TIME_COLUMN.hasColumns(key)) {
          maxLogicalTime =
              TabletTime.maxMetadataTime(maxLogicalTime, MetadataTime.parse(value.toString()));
        } else if (TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.hasColumns(key)) {
          // the lower tablet's directory is no longer referenced; queue it for GC
          String uri =
              GcVolumeUtil.getDeleteTabletOnAllVolumesUri(range.getTableId(), value.toString());
          bw.addMutation(ServerAmpleImpl.createDeleteMutation(uri));
        }
      }

      // read the logical time from the last tablet in the merge range, it is not included in
      // the loop above
      scanner = client.createScanner(targetSystemTable, Authorizations.EMPTY);
      scanner.setRange(new Range(stopRow));
      TabletsSection.ServerColumnFamily.TIME_COLUMN.fetch(scanner);
      for (Entry<Key,Value> entry : scanner) {
        if (TabletsSection.ServerColumnFamily.TIME_COLUMN.hasColumns(entry.getKey())) {
          maxLogicalTime = TabletTime.maxMetadataTime(maxLogicalTime,
              MetadataTime.parse(entry.getValue().toString()));
        }
      }

      if (maxLogicalTime != null)
        TabletsSection.ServerColumnFamily.TIME_COLUMN.put(m, new Value(maxLogicalTime.encode()));

      if (!m.getUpdates().isEmpty()) {
        bw.addMutation(m);
      }

      bw.flush();

      Master.log.debug("Moved {} files to {}", fileCount, stop);

      if (firstPrevRowValue == null) {
        // no lower tablets found -- a previous attempt already completed the merge
        Master.log.debug("tablet already merged");
        return;
      }

      stop.setPrevEndRow(KeyExtent.decodePrevEndRow(firstPrevRowValue));
      Mutation updatePrevRow = stop.getPrevRowUpdateMutation();
      Master.log.debug("Setting the prevRow for last tablet: {}", stop);
      bw.addMutation(updatePrevRow);
      bw.flush();

      // remove the now-absorbed lower tablets' metadata rows
      deleteTablets(info, scanRange, bw, client);

      // Clean-up the last chopped marker
      m = new Mutation(stopRow);
      ChoppedColumnFamily.CHOPPED_COLUMN.putDelete(m);
      bw.addMutation(m);
      bw.flush();

    } catch (Exception ex) {
      throw new AccumuloException(ex);
    }
  }
| |
| private void deleteTablets(MergeInfo info, Range scanRange, BatchWriter bw, AccumuloClient client) |
| throws TableNotFoundException, MutationsRejectedException { |
| Scanner scanner; |
| Mutation m; |
| // Delete everything in the other tablets |
| // group all deletes into tablet into one mutation, this makes tablets |
| // either disappear entirely or not all.. this is important for the case |
| // where the process terminates in the loop below... |
| scanner = client.createScanner(info.getExtent().isMeta() ? RootTable.NAME : MetadataTable.NAME, |
| Authorizations.EMPTY); |
| Master.log.debug("Deleting range {}", scanRange); |
| scanner.setRange(scanRange); |
| RowIterator rowIter = new RowIterator(scanner); |
| while (rowIter.hasNext()) { |
| Iterator<Entry<Key,Value>> row = rowIter.next(); |
| m = null; |
| while (row.hasNext()) { |
| Entry<Key,Value> entry = row.next(); |
| Key key = entry.getKey(); |
| |
| if (m == null) |
| m = new Mutation(key.getRow()); |
| |
| m.putDelete(key.getColumnFamily(), key.getColumnQualifier()); |
| Master.log.debug("deleting entry {}", key); |
| } |
| bw.addMutation(m); |
| } |
| |
| bw.flush(); |
| } |
| |
| private KeyExtent getHighTablet(KeyExtent range) throws AccumuloException { |
| try { |
| AccumuloClient client = this.master.getContext(); |
| Scanner scanner = client.createScanner(range.isMeta() ? RootTable.NAME : MetadataTable.NAME, |
| Authorizations.EMPTY); |
| TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner); |
| KeyExtent start = new KeyExtent(range.getTableId(), range.getEndRow(), null); |
| scanner.setRange(new Range(start.getMetadataEntry(), null)); |
| Iterator<Entry<Key,Value>> iterator = scanner.iterator(); |
| if (!iterator.hasNext()) { |
| throw new AccumuloException("No last tablet for a merge " + range); |
| } |
| Entry<Key,Value> entry = iterator.next(); |
| KeyExtent highTablet = |
| new KeyExtent(entry.getKey().getRow(), KeyExtent.decodePrevEndRow(entry.getValue())); |
| if (!highTablet.getTableId().equals(range.getTableId())) { |
| throw new AccumuloException("No last tablet for merge " + range + " " + highTablet); |
| } |
| return highTablet; |
| } catch (Exception ex) { |
| throw new AccumuloException("Unexpected failure finding the last tablet for a merge " + range, |
| ex); |
| } |
| } |
| |
| private void flushChanges(SortedMap<TServerInstance,TabletServerStatus> currentTServers, |
| List<Assignment> assignments, List<Assignment> assigned, |
| List<TabletLocationState> assignedToDeadServers, |
| Map<TServerInstance,List<Path>> logsForDeadServers, |
| List<TabletLocationState> suspendedToGoneServers, Map<KeyExtent,TServerInstance> unassigned) |
| throws DistributedStoreException, TException, WalMarkerException { |
| boolean tabletsSuspendable = canSuspendTablets(); |
| if (!assignedToDeadServers.isEmpty()) { |
| int maxServersToShow = min(assignedToDeadServers.size(), 100); |
| Master.log.debug("{} assigned to dead servers: {}...", assignedToDeadServers.size(), |
| assignedToDeadServers.subList(0, maxServersToShow)); |
| Master.log.debug("logs for dead servers: {}", logsForDeadServers); |
| if (tabletsSuspendable) { |
| store.suspend(assignedToDeadServers, logsForDeadServers, master.getSteadyTime()); |
| } else { |
| store.unassign(assignedToDeadServers, logsForDeadServers); |
| } |
| this.master.markDeadServerLogsAsClosed(logsForDeadServers); |
| this.master.nextEvent.event( |
| "Marked %d tablets as suspended because they don't have current servers", |
| assignedToDeadServers.size()); |
| } |
| if (!suspendedToGoneServers.isEmpty()) { |
| int maxServersToShow = min(assignedToDeadServers.size(), 100); |
| Master.log.debug(assignedToDeadServers.size() + " suspended to gone servers: " |
| + assignedToDeadServers.subList(0, maxServersToShow) + "..."); |
| store.unsuspend(suspendedToGoneServers); |
| } |
| |
| if (!currentTServers.isEmpty()) { |
| Map<KeyExtent,TServerInstance> assignedOut = new HashMap<>(); |
| final StringBuilder builder = new StringBuilder(64); |
| this.master.tabletBalancer.getAssignments(Collections.unmodifiableSortedMap(currentTServers), |
| Collections.unmodifiableMap(unassigned), assignedOut); |
| for (Entry<KeyExtent,TServerInstance> assignment : assignedOut.entrySet()) { |
| if (unassigned.containsKey(assignment.getKey())) { |
| if (assignment.getValue() != null) { |
| if (!currentTServers.containsKey(assignment.getValue())) { |
| Master.log.warn( |
| "balancer assigned {} to a tablet server that is not current {} ignoring", |
| assignment.getKey(), assignment.getValue()); |
| continue; |
| } |
| |
| if (builder.length() > 0) { |
| builder.append(ASSIGNMENT_BUFFER_SEPARATOR); |
| } |
| |
| builder.append(assignment); |
| |
| // Don't let the log message get too gigantic |
| if (builder.length() > ASSIGNMENT_BUFFER_MAX_LENGTH) { |
| builder.append("]"); |
| Master.log.debug("{} assigning tablets: [{}", store.name(), builder); |
| builder.setLength(0); |
| } |
| |
| assignments.add(new Assignment(assignment.getKey(), assignment.getValue())); |
| } |
| } else { |
| Master.log.warn( |
| "{} load balancer assigning tablet that was not nominated for assignment {}", |
| store.name(), assignment.getKey()); |
| } |
| } |
| |
| if (builder.length() > 0) { |
| // Make sure to log any leftover assignments |
| builder.append("]"); |
| Master.log.debug("{} assigning tablets: [{}", store.name(), builder); |
| } |
| |
| if (!unassigned.isEmpty() && assignedOut.isEmpty()) |
| Master.log.warn("Load balancer failed to assign any tablets"); |
| } |
| |
| if (assignments.size() > 0) { |
| Master.log.info(String.format("Assigning %d tablets", assignments.size())); |
| store.setFutureLocations(assignments); |
| } |
| assignments.addAll(assigned); |
| for (Assignment a : assignments) { |
| TServerConnection client = this.master.tserverSet.getConnection(a.server); |
| if (client != null) { |
| client.assignTablet(this.master.masterLock, a.tablet); |
| } else { |
| Master.log.warn("Could not connect to server {}", a.server); |
| } |
| master.assignedTablet(a.tablet); |
| } |
| } |
| |
| } |