/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.master.tableOps.bulkVer2;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOADED;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.clientImpl.bulk.Bulk;
import org.apache.accumulo.core.clientImpl.bulk.Bulk.Files;
import org.apache.accumulo.core.clientImpl.bulk.BulkSerialize;
import org.apache.accumulo.core.clientImpl.bulk.LoadMappingIterator;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.dataImpl.thrift.MapFileInfo;
import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
import org.apache.accumulo.core.master.state.tables.TableState;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.TabletFile;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.core.util.MapCounter;
import org.apache.accumulo.core.util.PeekingIterator;
import org.apache.accumulo.core.util.TextUtil;
import org.apache.accumulo.fate.FateTxId;
import org.apache.accumulo.fate.Repo;
import org.apache.accumulo.master.Master;
import org.apache.accumulo.master.tableOps.MasterRepo;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
/**
* Make asynchronous load calls to each overlapping Tablet. This Repo does its work in isReady()
* and returns a linear sleep value based on the largest number of Tablets on a TabletServer.
*/
class LoadFiles extends MasterRepo {
private static final long serialVersionUID = 1L;
private static final Logger log = LoggerFactory.getLogger(LoadFiles.class);
private final BulkInfo bulkInfo;
public LoadFiles(BulkInfo bulkInfo) {
this.bulkInfo = bulkInfo;
}
@Override
public long isReady(long tid, Master master) throws Exception {
if (master.onlineTabletServers().isEmpty()) {
log.warn("There are no tablet server to process bulkDir import, waiting (tid = "
+ FateTxId.formatTid(tid) + ")");
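// no tablet servers are available yet; ask the fate framework to retry isReady() in 100 ms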
return 100;
}
VolumeManager fs = master.getVolumeManager();
final Path bulkDir = new Path(bulkInfo.bulkDir);
try (LoadMappingIterator lmi =
BulkSerialize.getUpdatedLoadMapping(bulkDir.toString(), bulkInfo.tableId, fs::open)) {
return loadFiles(bulkInfo.tableId, bulkDir, lmi, master, tid);
}
}
@Override
public Repo<Master> call(final long tid, final Master master) {
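// all of the load work happens in isReady(); once it returns 0, pick the next step based on
// the table's state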
if (bulkInfo.tableState == TableState.ONLINE) {
return new CompleteBulkImport(bulkInfo);
} else {
return new CleanUpBulkImport(bulkInfo);
}
}
private abstract static class Loader {
protected Path bulkDir;
protected Master master;
protected long tid;
protected boolean setTime;
void start(Path bulkDir, Master master, long tid, boolean setTime) throws Exception {
this.bulkDir = bulkDir;
this.master = master;
this.tid = tid;
this.setTime = setTime;
}
abstract void load(List<TabletMetadata> tablets, Files files) throws Exception;
abstract long finish() throws Exception;
}
private static class OnlineLoader extends Loader {
long timeInMillis;
String fmtTid;
int locationLess = 0;
// track how many tablets were sent load messages per tablet server
MapCounter<HostAndPort> loadMsgs;
// Each RPC to a tablet server needs to check in ZooKeeper to see if the transaction is still
// active. The purpose of this map is to group load requests by tablet server in order to do
// fewer RPCs. Fewer RPCs result in fewer calls to ZooKeeper.
Map<HostAndPort,Map<TKeyExtent,Map<String,MapFileInfo>>> loadQueue;
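// rough estimate, in bytes, of the data currently buffered in loadQueue (see addToQueue)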
private int queuedDataSize = 0;
@Override
void start(Path bulkDir, Master master, long tid, boolean setTime) throws Exception {
super.start(bulkDir, master, tid, setTime);
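// RPC timeout used when sending load messages to tablet servers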
timeInMillis = master.getConfiguration().getTimeInMillis(Property.MANAGER_BULK_TIMEOUT);
fmtTid = FateTxId.formatTid(tid);
loadMsgs = new MapCounter<>();
loadQueue = new HashMap<>();
}
private void sendQueued(int threshold) {
if (queuedDataSize > threshold || threshold == 0) {
loadQueue.forEach((server, tabletFiles) -> {
if (log.isTraceEnabled()) {
log.trace("{} asking {} to bulk import {} files for {} tablets", fmtTid, server,
tabletFiles.values().stream().mapToInt(Map::size).sum(), tabletFiles.size());
}
TabletClientService.Client client = null;
try {
client = ThriftUtil.getTServerClient(server, master.getContext(), timeInMillis);
client.loadFiles(TraceUtil.traceInfo(), master.getContext().rpcCreds(), tid,
bulkDir.toString(), tabletFiles, setTime);
} catch (TException ex) {
log.debug("rpc failed server: " + server + ", " + fmtTid + " " + ex.getMessage(), ex);
} finally {
ThriftUtil.returnClient(client);
}
});
loadQueue.clear();
queuedDataSize = 0;
}
}
private void addToQueue(HostAndPort server, KeyExtent extent,
Map<String,MapFileInfo> thriftImports) {
if (!thriftImports.isEmpty()) {
loadMsgs.increment(server, 1);
Map<String,MapFileInfo> prev = loadQueue.computeIfAbsent(server, k -> new HashMap<>())
.putIfAbsent(extent.toThrift(), thriftImports);
Preconditions.checkState(prev == null, "Unexpectedly saw extent %s twice", extent);
// keep a very rough estimate of how much memory is buffered so the queue can be sent once
// more than a few megabytes have accumulated
queuedDataSize += thriftImports.keySet().stream().mapToInt(String::length).sum()
+ server.getHost().length() + 4 + thriftImports.size() * 32;
}
}
@Override
void load(List<TabletMetadata> tablets, Files files) {
for (TabletMetadata tablet : tablets) {
// send files to the tablet server
// ideally there should only be one tablet location to send all the files
TabletMetadata.Location location = tablet.getLocation();
HostAndPort server = null;
if (location == null) {
locationLess++;
continue;
} else {
server = location.getHostAndPort();
}
Set<TabletFile> loadedFiles = tablet.getLoaded().keySet();
Map<String,MapFileInfo> thriftImports = new HashMap<>();
for (final Bulk.FileInfo fileInfo : files) {
Path fullPath = new Path(bulkDir, fileInfo.getFileName());
TabletFile bulkFile = new TabletFile(fullPath);
if (!loadedFiles.contains(bulkFile)) {
thriftImports.put(fileInfo.getFileName(), new MapFileInfo(fileInfo.getEstFileSize()));
}
}
addToQueue(server, tablet.getExtent(), thriftImports);
}
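// flush the queue once roughly 4 MiB of request data has accumulated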
sendQueued(4 * 1024 * 1024);
}
@Override
long finish() {
sendQueued(0);
long sleepTime = 0;
if (loadMsgs.size() > 0) {
// find which tablet server had the most load messages sent to it and sleep 13ms for each
// load message
sleepTime = loadMsgs.max() * 13;
}
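// tablets without a location cannot be sent load messages; sleep so they have a chance to be
// assigned before retrying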
if (locationLess > 0) {
sleepTime = Math.max(Math.max(100L, locationLess), sleepTime);
}
return sleepTime;
}
}
private static class OfflineLoader extends Loader {
BatchWriter bw;
// track how many tablets are still hosted (and therefore still unloading) per tablet server
MapCounter<HostAndPort> unloadingTablets;
@Override
void start(Path bulkDir, Master master, long tid, boolean setTime) throws Exception {
Preconditions.checkArgument(!setTime);
super.start(bulkDir, master, tid, setTime);
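// for an offline table there is no tablet server to message, so file entries are written
// directly to the metadata table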
bw = master.getContext().createBatchWriter(MetadataTable.NAME);
unloadingTablets = new MapCounter<>();
}
@Override
void load(List<TabletMetadata> tablets, Files files) throws MutationsRejectedException {
byte[] fam = TextUtil.getBytes(DataFileColumnFamily.NAME);
for (TabletMetadata tablet : tablets) {
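// a tablet that still has a location has not finished unloading; count it and retry later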
if (tablet.getLocation() != null) {
unloadingTablets.increment(tablet.getLocation().getHostAndPort(), 1L);
continue;
}
Mutation mutation = new Mutation(tablet.getExtent().toMetaRow());
for (final Bulk.FileInfo fileInfo : files) {
String fullPath = new Path(bulkDir, fileInfo.getFileName()).toString();
byte[] val =
new DataFileValue(fileInfo.getEstFileSize(), fileInfo.getEstNumEntries()).encode();
mutation.put(fam, fullPath.getBytes(UTF_8), val);
}
bw.addMutation(mutation);
}
}
@Override
long finish() throws Exception {
bw.close();
long sleepTime = 0;
if (unloadingTablets.size() > 0) {
// find which tablet server had the most tablets to unload and sleep 13ms for each tablet
sleepTime = unloadingTablets.max() * 13;
}
return sleepTime;
}
}
/**
* Make asynchronous load calls to each overlapping Tablet in the bulk mapping. Returns a sleep
* time for isReady based on the TabletServer with the most Tablets to process. This method
* scans the metadata table for Tablet range and location information and returns 0 once all
* files have been loaded.
*/
private long loadFiles(TableId tableId, Path bulkDir, LoadMappingIterator loadMapIter,
Master master, long tid) throws Exception {
PeekingIterator<Map.Entry<KeyExtent,Bulk.Files>> lmi = new PeekingIterator<>(loadMapIter);
Map.Entry<KeyExtent,Bulk.Files> loadMapEntry = lmi.peek();
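// the load mapping is sorted by extent, so the metadata scan can start at the first entry's
// prevEndRow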
Text startRow = loadMapEntry.getKey().prevEndRow();
Iterator<TabletMetadata> tabletIter =
TabletsMetadata.builder().forTable(tableId).overlapping(startRow, null).checkConsistency()
.fetch(PREV_ROW, LOCATION, LOADED).build(master.getContext()).iterator();
Loader loader;
if (bulkInfo.tableState == TableState.ONLINE) {
loader = new OnlineLoader();
} else {
loader = new OfflineLoader();
}
loader.start(bulkDir, master, tid, bulkInfo.setTime);
long t1 = System.currentTimeMillis();
while (lmi.hasNext()) {
loadMapEntry = lmi.next();
List<TabletMetadata> tablets = findOverlappingTablets(loadMapEntry.getKey(), tabletIter);
loader.load(tablets, loadMapEntry.getValue());
}
long sleepTime = loader.finish();
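// when work remains, sleep for at least twice the time the metadata scan took (scan time
// capped at 30 seconds)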
if (sleepTime > 0) {
long scanTime = Math.min(System.currentTimeMillis() - t1, 30000);
sleepTime = Math.max(sleepTime, scanTime * 2);
}
return sleepTime;
}
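// a null prevEndRow marks the first tablet of a table and a null endRow marks the last, so
// order nulls accordingly when comparing rows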
private static final Comparator<Text> PREV_COMP = Comparator.nullsFirst(Text::compareTo);
private static final Comparator<Text> END_COMP = Comparator.nullsLast(Text::compareTo);
/**
* Find all the tablets within the provided bulk load mapping range.
*/
private List<TabletMetadata> findOverlappingTablets(KeyExtent loadRange,
Iterator<TabletMetadata> tabletIter) {
TabletMetadata currTablet = null;
try {
List<TabletMetadata> tablets = new ArrayList<>();
currTablet = tabletIter.next();
int cmp;
// skip tablets until we find the prevEndRow of loadRange
while ((cmp = PREV_COMP.compare(currTablet.getPrevEndRow(), loadRange.prevEndRow())) < 0) {
currTablet = tabletIter.next();
}
if (cmp != 0) {
throw new IllegalStateException("Unexpected prev end row " + currTablet + " " + loadRange);
}
// we have found the first tablet in the range, add it to the list
tablets.add(currTablet);
// find the remaining tablets within the loadRange by
// adding tablets to the list until the endRow matches the loadRange
while ((cmp = END_COMP.compare(currTablet.getEndRow(), loadRange.endRow())) < 0) {
currTablet = tabletIter.next();
tablets.add(currTablet);
}
if (cmp != 0) {
throw new IllegalStateException("Unexpected end row " + currTablet + " " + loadRange);
}
return tablets;
} catch (NoSuchElementException e) {
NoSuchElementException ne2 = new NoSuchElementException(
"Failed to find overlapping tablets " + currTablet + " " + loadRange);
ne2.initCause(e);
throw ne2;
}
}
}