/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.accumulo.server.util;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.impl.ClientContext;
import org.apache.accumulo.core.client.impl.ScannerImpl;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.data.impl.KeyExtent;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.ColumnFQ;
import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
import org.apache.accumulo.server.fs.FileRef;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.fs.VolumeManagerImpl;
import org.apache.accumulo.server.master.state.TServerInstance;
import org.apache.accumulo.server.zookeeper.ZooLock;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
import org.apache.hadoop.io.Text;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Utility methods for maintaining tablet entries in the metadata table: adding new tablets,
* repairing incomplete splits, and updating data file, scan file, and write-ahead log references.
*/
public class MasterMetadataUtil {
private static final Logger log = LoggerFactory.getLogger(MasterMetadataUtil.class);
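/**
* Write the metadata entry for a newly created tablet: its directory, time, optional flush and
* compact ids, assigned location, data file sizes, and bulk-loaded file markers.
*/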
public static void addNewTablet(ClientContext context, KeyExtent extent, String path,
TServerInstance location, Map<FileRef,DataFileValue> datafileSizes,
Map<Long,? extends Collection<FileRef>> bulkLoadedFiles, String time, long lastFlushID,
long lastCompactID, ZooLock zooLock) {
Mutation m = extent.getPrevRowUpdateMutation();
TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.put(m, new Value(path.getBytes(UTF_8)));
TabletsSection.ServerColumnFamily.TIME_COLUMN.put(m, new Value(time.getBytes(UTF_8)));
if (lastFlushID > 0)
TabletsSection.ServerColumnFamily.FLUSH_COLUMN.put(m,
new Value(("" + lastFlushID).getBytes()));
if (lastCompactID > 0)
TabletsSection.ServerColumnFamily.COMPACT_COLUMN.put(m,
new Value(("" + lastCompactID).getBytes()));
if (location != null) {
location.putLocation(m);
location.clearFutureLocation(m);
}
for (Entry<FileRef,DataFileValue> entry : datafileSizes.entrySet()) {
m.put(DataFileColumnFamily.NAME, entry.getKey().meta(), new Value(entry.getValue().encode()));
}
for (Entry<Long,? extends Collection<FileRef>> entry : bulkLoadedFiles.entrySet()) {
Value tidBytes = new Value(Long.toString(entry.getKey()).getBytes());
for (FileRef ref : entry.getValue()) {
m.put(TabletsSection.BulkFileColumnFamily.NAME, ref.meta(), new Value(tidBytes));
}
}
MetadataTableUtil.update(context, zooLock, m, extent);
}
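/**
* Attempt to repair a tablet whose split did not complete, using the split ratio, prev end row,
* time, flush id, and compact id read from the given metadata columns.
*
* @return the extent of the tablet after the split has been rolled back or finished
*/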
public static KeyExtent fixSplit(ClientContext context, Text metadataEntry,
SortedMap<ColumnFQ,Value> columns, TServerInstance tserver, ZooLock lock)
throws AccumuloException, IOException {
log.info("Incomplete split " + metadataEntry + " attempting to fix");
Value oper = columns.get(TabletsSection.TabletColumnFamily.OLD_PREV_ROW_COLUMN);
if (columns.get(TabletsSection.TabletColumnFamily.SPLIT_RATIO_COLUMN) == null) {
throw new IllegalArgumentException(
"Metadata entry does not have split ratio (" + metadataEntry + ")");
}
double splitRatio = Double.parseDouble(
new String(columns.get(TabletsSection.TabletColumnFamily.SPLIT_RATIO_COLUMN).get(), UTF_8));
Value prevEndRowIBW = columns.get(TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN);
if (prevEndRowIBW == null) {
throw new IllegalArgumentException(
"Metadata entry does not have prev row (" + metadataEntry + ")");
}
Value time = columns.get(TabletsSection.ServerColumnFamily.TIME_COLUMN);
if (time == null) {
throw new IllegalArgumentException(
"Metadata entry does not have time (" + metadataEntry + ")");
}
Value flushID = columns.get(TabletsSection.ServerColumnFamily.FLUSH_COLUMN);
long initFlushID = -1;
if (flushID != null)
initFlushID = Long.parseLong(flushID.toString());
Value compactID = columns.get(TabletsSection.ServerColumnFamily.COMPACT_COLUMN);
long initCompactID = -1;
if (compactID != null)
initCompactID = Long.parseLong(compactID.toString());
Text metadataPrevEndRow = KeyExtent.decodePrevEndRow(prevEndRowIBW);
String table = (new KeyExtent(metadataEntry, (Text) null)).getTableId();
return fixSplit(context, table, metadataEntry, metadataPrevEndRow, oper, splitRatio, tserver,
time.toString(), initFlushID, initCompactID, lock);
}
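/**
* Complete or roll back an incomplete split. If the metadata table has no entry for the prev
* (low side) tablet, the split is rolled back; otherwise the original data files are divided
* between the low and high tablets and the split is finished.
*/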
private static KeyExtent fixSplit(ClientContext context, String table, Text metadataEntry,
Text metadataPrevEndRow, Value oper, double splitRatio, TServerInstance tserver, String time,
long initFlushID, long initCompactID, ZooLock lock) throws AccumuloException, IOException {
if (metadataPrevEndRow == null)
// this should not happen: a tablet created by a split always has a prev end row
throw new AccumuloException(
"Split tablet does not have prev end row, something is amiss, extent = " + metadataEntry);
// check to see if the prev tablet exists in the metadata table
Key prevRowKey = new Key(new Text(KeyExtent.getMetadataEntry(table, metadataPrevEndRow)));
try (ScannerImpl scanner2 = new ScannerImpl(context, MetadataTable.ID, Authorizations.EMPTY)) {
scanner2.setRange(new Range(prevRowKey, prevRowKey.followingKey(PartialKey.ROW)));
VolumeManager fs = VolumeManagerImpl.get();
if (!scanner2.iterator().hasNext()) {
log.info("Rolling back incomplete split " + metadataEntry + " " + metadataPrevEndRow);
MetadataTableUtil.rollBackSplit(metadataEntry, KeyExtent.decodePrevEndRow(oper), context,
lock);
return new KeyExtent(metadataEntry, KeyExtent.decodePrevEndRow(oper));
} else {
log.info("Finishing incomplete split " + metadataEntry + " " + metadataPrevEndRow);
List<FileRef> highDatafilesToRemove = new ArrayList<>();
SortedMap<FileRef,DataFileValue> origDatafileSizes = new TreeMap<>();
SortedMap<FileRef,DataFileValue> highDatafileSizes = new TreeMap<>();
SortedMap<FileRef,DataFileValue> lowDatafileSizes = new TreeMap<>();
try (Scanner scanner3 = new ScannerImpl(context, MetadataTable.ID, Authorizations.EMPTY)) {
Key rowKey = new Key(metadataEntry);
scanner3.fetchColumnFamily(DataFileColumnFamily.NAME);
scanner3.setRange(new Range(rowKey, rowKey.followingKey(PartialKey.ROW)));
for (Entry<Key,Value> entry : scanner3) {
if (entry.getKey().compareColumnFamily(DataFileColumnFamily.NAME) == 0) {
origDatafileSizes.put(new FileRef(fs, entry.getKey()),
new DataFileValue(entry.getValue().get()));
}
}
}
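// divide the original data files between the low and high tablets using the split ratio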
MetadataTableUtil.splitDatafiles(table, metadataPrevEndRow, splitRatio,
new HashMap<FileRef,FileUtil.FileInfo>(), origDatafileSizes, lowDatafileSizes,
highDatafileSizes, highDatafilesToRemove);
MetadataTableUtil.finishSplit(metadataEntry, highDatafileSizes, highDatafilesToRemove,
context, lock);
return new KeyExtent(metadataEntry, KeyExtent.encodePrevEndRow(metadataPrevEndRow));
}
}
}
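/**
* Construct a TServerInstance for the given address and lock, retrying every second until the
* lock's ZooKeeper session id can be read.
*/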
private static TServerInstance getTServerInstance(String address, ZooLock zooLock) {
while (true) {
try {
return new TServerInstance(address, zooLock.getSessionId());
} catch (KeeperException e) {
log.error("{}", e.getMessage(), e);
} catch (InterruptedException e) {
log.error("{}", e.getMessage(), e);
}
sleepUninterruptibly(1, TimeUnit.SECONDS);
}
}
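/**
* Replace a tablet's old data file references with a newly compacted file, always inserting
* delete entries for the replaced files.
*/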
public static void replaceDatafiles(ClientContext context, KeyExtent extent,
Set<FileRef> datafilesToDelete, Set<FileRef> scanFiles, FileRef path, Long compactionId,
DataFileValue size, String address, TServerInstance lastLocation, ZooLock zooLock)
throws IOException {
replaceDatafiles(context, extent, datafilesToDelete, scanFiles, path, compactionId, size,
address, lastLocation, zooLock, true);
}
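/**
* Replace a tablet's old data file references with a newly compacted file. Optionally inserts
* delete entries for the replaced files, records scan file references and the compaction id, and
* updates the tablet's last location.
*/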
public static void replaceDatafiles(ClientContext context, KeyExtent extent,
Set<FileRef> datafilesToDelete, Set<FileRef> scanFiles, FileRef path, Long compactionId,
DataFileValue size, String address, TServerInstance lastLocation, ZooLock zooLock,
boolean insertDeleteFlags) throws IOException {
if (insertDeleteFlags) {
// add delete flags for those paths before the data file reference is removed
MetadataTableUtil.addDeleteEntries(extent, datafilesToDelete, context);
}
// replace data file references to old mapfiles with the new mapfiles
Mutation m = new Mutation(extent.getMetadataEntry());
for (FileRef pathToRemove : datafilesToDelete)
m.putDelete(DataFileColumnFamily.NAME, pathToRemove.meta());
for (FileRef scanFile : scanFiles)
m.put(ScanFileColumnFamily.NAME, scanFile.meta(), new Value(new byte[0]));
if (size.getNumEntries() > 0)
m.put(DataFileColumnFamily.NAME, path.meta(), new Value(size.encode()));
if (compactionId != null)
TabletsSection.ServerColumnFamily.COMPACT_COLUMN.put(m,
new Value(("" + compactionId).getBytes()));
TServerInstance self = getTServerInstance(address, zooLock);
self.putLastLocation(m);
// remove the old location
if (lastLocation != null && !lastLocation.equals(self))
lastLocation.clearLastLocation(m);
MetadataTableUtil.update(context, zooLock, m, extent);
}
/**
* Add a single new data file to a tablet's metadata entry, recording the tablet time, flush id,
* last location, and scan file references, and removing unused write-ahead logs and any merged
* file. For the root tablet, only the unused write-ahead log references are removed, via
* ZooKeeper.
*
* @param path
* the new data file; should be relative to the table directory
*/
public static void updateTabletDataFile(ClientContext context, KeyExtent extent, FileRef path,
FileRef mergeFile, DataFileValue dfv, String time, Set<FileRef> filesInUseByScans,
String address, ZooLock zooLock, Set<String> unusedWalLogs, TServerInstance lastLocation,
long flushId) {
if (extent.isRootTablet()) {
if (unusedWalLogs != null) {
updateRootTabletDataFile(extent, path, mergeFile, dfv, time, filesInUseByScans, address,
zooLock, unusedWalLogs, lastLocation, flushId);
}
return;
}
Mutation m = getUpdateForTabletDataFile(extent, path, mergeFile, dfv, time, filesInUseByScans,
address, zooLock, unusedWalLogs, lastLocation, flushId);
MetadataTableUtil.update(context, zooLock, m, extent);
}
/**
* Handle a data file update for the root tablet: remove the given unused write-ahead log
* references from ZooKeeper, retrying until ZooKeeper responds. No metadata table mutation is
* written here.
*/
private static void updateRootTabletDataFile(KeyExtent extent, FileRef path, FileRef mergeFile,
DataFileValue dfv, String time, Set<FileRef> filesInUseByScans, String address,
ZooLock zooLock, Set<String> unusedWalLogs, TServerInstance lastLocation, long flushId) {
IZooReaderWriter zk = ZooReaderWriter.getInstance();
String root = MetadataTableUtil.getZookeeperLogLocation();
for (String entry : unusedWalLogs) {
String[] parts = entry.split("/");
String zpath = root + "/" + parts[parts.length - 1];
while (true) {
try {
if (zk.exists(zpath)) {
log.debug("Removing WAL reference for root table " + zpath);
zk.recursiveDelete(zpath, NodeMissingPolicy.SKIP);
}
break;
} catch (KeeperException e) {
log.error("{}", e.getMessage(), e);
} catch (InterruptedException e) {
log.error("{}", e.getMessage(), e);
}
sleepUninterruptibly(1, TimeUnit.SECONDS);
}
}
}
/**
* Create the mutation that records a new data file for a tablet, along with its time, last
* location, flush id, and scan file references, and that removes unused write-ahead logs and any
* merged file.
*
* @return a Mutation that applies the given updates to the tablet's metadata entry
*/
private static Mutation getUpdateForTabletDataFile(KeyExtent extent, FileRef path,
FileRef mergeFile, DataFileValue dfv, String time, Set<FileRef> filesInUseByScans,
String address, ZooLock zooLock, Set<String> unusedWalLogs, TServerInstance lastLocation,
long flushId) {
Mutation m = new Mutation(extent.getMetadataEntry());
if (dfv.getNumEntries() > 0) {
m.put(DataFileColumnFamily.NAME, path.meta(), new Value(dfv.encode()));
TabletsSection.ServerColumnFamily.TIME_COLUMN.put(m, new Value(time.getBytes(UTF_8)));
// record the current server as the tablet's last location
TServerInstance self = getTServerInstance(address, zooLock);
self.putLastLocation(m);
// erase the old location
if (lastLocation != null && !lastLocation.equals(self))
lastLocation.clearLastLocation(m);
}
if (unusedWalLogs != null) {
for (String entry : unusedWalLogs) {
m.putDelete(LogColumnFamily.NAME, new Text(entry));
}
}
for (FileRef scanFile : filesInUseByScans)
m.put(ScanFileColumnFamily.NAME, scanFile.meta(), new Value(new byte[0]));
if (mergeFile != null)
m.putDelete(DataFileColumnFamily.NAME, mergeFile.meta());
TabletsSection.ServerColumnFamily.FLUSH_COLUMN.put(m,
new Value(Long.toString(flushId).getBytes(UTF_8)));
return m;
}
}