| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.accumulo.server.master.tableOps; |
| |
| import java.io.BufferedReader; |
| import java.io.BufferedWriter; |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.io.InputStreamReader; |
| import java.io.OutputStreamWriter; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.ThreadFactory; |
| import java.util.concurrent.ThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.apache.accumulo.cloudtrace.instrument.TraceExecutorService; |
| import org.apache.accumulo.core.Constants; |
| import org.apache.accumulo.core.client.Connector; |
| import org.apache.accumulo.core.client.Instance; |
| import org.apache.accumulo.core.client.IsolatedScanner; |
| import org.apache.accumulo.core.client.Scanner; |
| import org.apache.accumulo.core.client.impl.ServerClient; |
| import org.apache.accumulo.core.client.impl.Tables; |
| import org.apache.accumulo.core.client.impl.thrift.ClientService; |
| import org.apache.accumulo.core.client.impl.thrift.ClientService.Iface; |
| import org.apache.accumulo.core.client.impl.thrift.TableOperation; |
| import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType; |
| import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException; |
| import org.apache.accumulo.core.conf.Property; |
| import org.apache.accumulo.core.conf.SiteConfiguration; |
| import org.apache.accumulo.core.data.Key; |
| import org.apache.accumulo.core.data.KeyExtent; |
| import org.apache.accumulo.core.data.Value; |
| import org.apache.accumulo.core.file.FileOperations; |
| import org.apache.accumulo.core.master.state.tables.TableState; |
| import org.apache.accumulo.core.security.thrift.AuthInfo; |
| import org.apache.accumulo.core.util.CachedConfiguration; |
| import org.apache.accumulo.core.util.Daemon; |
| import org.apache.accumulo.core.util.Pair; |
| import org.apache.accumulo.core.util.UtilWaitThread; |
| import org.apache.accumulo.server.ServerConstants; |
| import org.apache.accumulo.server.client.HdfsZooInstance; |
| import org.apache.accumulo.server.conf.ServerConfiguration; |
| import org.apache.accumulo.server.fate.Repo; |
| import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection; |
| import org.apache.accumulo.server.master.Master; |
| import org.apache.accumulo.server.master.state.TServerInstance; |
| import org.apache.accumulo.server.security.SecurityConstants; |
| import org.apache.accumulo.server.tabletserver.UniqueNameAllocator; |
| import org.apache.accumulo.server.trace.TraceFileSystem; |
| import org.apache.accumulo.server.util.MetadataTable; |
| import org.apache.accumulo.server.zookeeper.DistributedWorkQueue; |
| import org.apache.accumulo.server.zookeeper.TransactionWatcher.ZooArbitrator; |
| import org.apache.hadoop.fs.FSDataInputStream; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.io.MapFile; |
| import org.apache.hadoop.io.Text; |
| import org.apache.log4j.Logger; |
| import org.apache.thrift.TException; |
| |
| /* |
| * Bulk import makes requests of tablet servers, and those requests can take a long time. Our |
| * communication with a tablet server may fail, leaving the master unsure of the status of a |
| * request, so the master repeats failed requests and a tablet server may receive the same request |
| * more than once. The tablet server will not execute a request multiple times as long as the |
| * marker it wrote in the metadata table is still there. The master needs to know when all requests |
| * have finished so it can remove those markers. Did a request start? Did it finish? We can see |
| * that *a* request completed by observing the flag written into the metadata table, but we cannot |
| * tell whether some other thread is still waiting to start and repeat the operation. |
| * |
| * The master can ask a tablet server whether it has any requests still running. However, the |
| * tablet server might have a thread that is about to start a request but has not yet recorded any |
| * bookkeeping for it. To prevent problems like this, an Arbitrator is used: before starting any |
| * new request, the tablet server checks the Arbitrator to see whether the request is still valid. |
| */ |
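| |
| /* |
| * The operation runs as a chain of FATE repos: BulkImport reserves the directories and stages the |
| * files into a new bulk directory, LoadFiles asks tablet servers to load them, CompleteBulkImport |
| * stops the arbitrator, CopyFailed moves any files that could not be loaded into the error |
| * directory, and CleanUpBulkImport removes the metadata markers and releases the reservations. |
| */ |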
| |
| public class BulkImport extends MasterRepo { |
| |
| private static final long serialVersionUID = 1L; |
| |
| private static final Logger log = Logger.getLogger(BulkImport.class); |
| |
| private String tableId; |
| private String sourceDir; |
| private String errorDir; |
| private boolean setTime; |
| |
| public BulkImport(String tableId, String sourceDir, String errorDir, boolean setTime) { |
| this.tableId = tableId; |
| this.sourceDir = sourceDir; |
| this.errorDir = errorDir; |
| this.setTime = setTime; |
| } |
| |
| @Override |
| public long isReady(long tid, Master master) throws Exception { |
| if (!Utils.getReadLock(tableId, tid).tryLock()) |
| return 100; |
| |
| Instance instance = HdfsZooInstance.getInstance(); |
| Tables.clearCache(instance); |
| if (Tables.getTableState(instance, tableId) == TableState.ONLINE) { |
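| // reserve the source directory; if that succeeds, also reserve the error directory |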
| long reserve = Utils.reserveHdfsDirectory(sourceDir, tid); |
| if (reserve == 0) |
| reserve = Utils.reserveHdfsDirectory(errorDir, tid); |
| return reserve; |
| } else { |
| throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.OFFLINE, null); |
| } |
| } |
| |
| @Override |
| public Repo<Master> call(long tid, Master master) throws Exception { |
| log.debug(" tid " + tid + " sourceDir " + sourceDir); |
| |
| Utils.getReadLock(tableId, tid).lock(); |
| |
| // check that the error directory exists and is empty |
| FileSystem fs = TraceFileSystem.wrap(org.apache.accumulo.core.file.FileUtil.getFileSystem(CachedConfiguration.getInstance(), |
| ServerConfiguration.getSiteConfiguration())); |
| Path errorPath = new Path(errorDir); |
| FileStatus errorStatus = null; |
| try { |
| errorStatus = fs.getFileStatus(errorPath); |
| } catch (FileNotFoundException ex) { |
| // ignored |
| } |
| if (errorStatus == null) |
| throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_ERROR_DIRECTORY, errorDir |
| + " does not exist"); |
| if (!errorStatus.isDir()) |
| throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_ERROR_DIRECTORY, errorDir |
| + " is not a directory"); |
| if (fs.listStatus(errorPath).length != 0) |
| throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_ERROR_DIRECTORY, errorDir |
| + " is not empty"); |
| |
| ZooArbitrator.start(Constants.BULK_ARBITRATOR_TYPE, tid); |
| |
| // move the files into the directory |
| try { |
| String bulkDir = prepareBulkImport(fs, sourceDir, tableId); |
| log.debug(" tid " + tid + " bulkDir " + bulkDir); |
| return new LoadFiles(tableId, sourceDir, bulkDir, errorDir, setTime); |
| } catch (IOException ex) { |
| log.error("error preparing the bulk import directory", ex); |
| throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_INPUT_DIRECTORY, sourceDir + ": " |
| + ex); |
| } |
| } |
| |
| private Path createNewBulkDir(FileSystem fs, String tableId) throws IOException { |
| Path directory = new Path(ServerConstants.getTablesDir() + "/" + tableId); |
| fs.mkdirs(directory); |
| |
| // only one should be able to create the lock file |
| // the purpose of the lock file is to avoid a race |
| // condition between the call to fs.exists() and |
| // fs.mkdirs()... if only hadoop had a mkdir() function |
| // that failed when the dir existed |
| |
| UniqueNameAllocator namer = UniqueNameAllocator.getInstance(); |
| |
| while (true) { |
| Path newBulkDir = new Path(directory, Constants.BULK_PREFIX + namer.getNextName()); |
| if (fs.exists(newBulkDir)) // sanity check |
| throw new IllegalStateException("Dir exists when it should not " + newBulkDir); |
| if (fs.mkdirs(newBulkDir)) |
| return newBulkDir; |
| log.warn("Failed to create " + newBulkDir + " for unknown reason"); |
| |
| UtilWaitThread.sleep(3000); |
| } |
| } |
| |
| private String prepareBulkImport(FileSystem fs, String dir, String tableId) throws IOException { |
| Path bulkDir = createNewBulkDir(fs, tableId); |
| |
| MetadataTable.addBulkLoadInProgressFlag("/" + bulkDir.getParent().getName() + "/" + bulkDir.getName()); |
| |
| Path dirPath = new Path(dir); |
| FileStatus[] mapFiles = fs.listStatus(dirPath); |
| |
| UniqueNameAllocator namer = UniqueNameAllocator.getInstance(); |
| |
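| // rename each candidate file into the new bulk dir under a unique name, skipping anything that |
| // does not look like a file the tablet servers can load |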
| for (FileStatus fileStatus : mapFiles) { |
| String[] sa = fileStatus.getPath().getName().split("\\."); |
| String extension = ""; |
| if (sa.length > 1) { |
| extension = sa[sa.length - 1]; |
| |
| if (!FileOperations.getValidExtensions().contains(extension)) { |
| log.warn(fileStatus.getPath() + " does not have a valid extension, ignoring"); |
| continue; |
| } |
| } else { |
| // assume it is a map file |
| extension = Constants.MAPFILE_EXTENSION; |
| } |
| |
| if (extension.equals(Constants.MAPFILE_EXTENSION)) { |
| if (!fileStatus.isDir()) { |
| log.warn(fileStatus.getPath() + " is not a map file, ignoring"); |
| continue; |
| } |
| |
| if (fileStatus.getPath().getName().equals("_logs")) { |
| log.info(fileStatus.getPath() + " is probably a log directory from a map/reduce task, skipping"); |
| continue; |
| } |
| try { |
| FileStatus dataStatus = fs.getFileStatus(new Path(fileStatus.getPath(), MapFile.DATA_FILE_NAME)); |
| if (dataStatus.isDir()) { |
| log.warn(fileStatus.getPath() + " is not a map file, ignoring"); |
| continue; |
| } |
| } catch (FileNotFoundException fnfe) { |
| log.warn(fileStatus.getPath() + " is not a map file, ignoring"); |
| continue; |
| } |
| } |
| |
| String newName = "I" + namer.getNextName() + "." + extension; |
| Path newPath = new Path(bulkDir, newName); |
| try { |
| fs.rename(fileStatus.getPath(), newPath); |
| log.debug("Moved " + fileStatus.getPath() + " to " + newPath); |
| } catch (IOException ex) { |
| log.error("Could not move: " + fileStatus.getPath() + " to " + newPath, ex); |
| } |
| } |
| return bulkDir.toString(); |
| } |
| |
| @Override |
| public void undo(long tid, Master environment) throws Exception { |
| // unreserve source/error directories |
| Utils.unreserveHdfsDirectory(sourceDir, tid); |
| Utils.unreserveHdfsDirectory(errorDir, tid); |
| Utils.getReadLock(tableId, tid).unlock(); |
| } |
| } |
| |
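| /* |
| * Final step: remove the bulk-load in-progress flag and the per-file load markers from the |
| * metadata table, schedule the bulk directory for deletion, release the HDFS reservations and the |
| * table read lock, and clean up the arbitrator state for this transaction. |
| */ |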
| class CleanUpBulkImport extends MasterRepo { |
| |
| private static final long serialVersionUID = 1L; |
| |
| private static final Logger log = Logger.getLogger(CleanUpBulkImport.class); |
| |
| private String tableId; |
| private String source; |
| private String bulk; |
| private String error; |
| |
| public CleanUpBulkImport(String tableId, String source, String bulk, String error) { |
| this.tableId = tableId; |
| this.source = source; |
| this.bulk = bulk; |
| this.error = error; |
| } |
| |
| @Override |
| public Repo<Master> call(long tid, Master master) throws Exception { |
| log.debug("removing the bulk processing flag file in " + bulk); |
| Path bulkDir = new Path(bulk); |
| MetadataTable.removeBulkLoadInProgressFlag("/" + bulkDir.getParent().getName() + "/" + bulkDir.getName()); |
| MetadataTable.addDeleteEntry(tableId, "/" + bulkDir.getName()); |
| log.debug("removing the metadata table markers for loaded files"); |
| AuthInfo creds = SecurityConstants.getSystemCredentials(); |
| Connector conn = HdfsZooInstance.getInstance().getConnector(creds.user, creds.password); |
| MetadataTable.removeBulkLoadEntries(conn, tableId, tid); |
| log.debug("releasing HDFS reservations for " + source + " and " + error); |
| Utils.unreserveHdfsDirectory(source, tid); |
| Utils.unreserveHdfsDirectory(error, tid); |
| Utils.getReadLock(tableId, tid).unlock(); |
| log.debug("completing bulk import transaction " + tid); |
| ZooArbitrator.cleanup(Constants.BULK_ARBITRATOR_TYPE, tid); |
| return null; |
| } |
| } |
| |
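| /* |
| * Stops the arbitrator for this transaction so tablet servers will not start any new load requests |
| * for it, then hands off to CopyFailed. |
| */ |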
| class CompleteBulkImport extends MasterRepo { |
| |
| private static final long serialVersionUID = 1L; |
| |
| private String tableId; |
| private String source; |
| private String bulk; |
| private String error; |
| |
| public CompleteBulkImport(String tableId, String source, String bulk, String error) { |
| this.tableId = tableId; |
| this.source = source; |
| this.bulk = bulk; |
| this.error = error; |
| } |
| |
| @Override |
| public Repo<Master> call(long tid, Master master) throws Exception { |
| ZooArbitrator.stop(Constants.BULK_ARBITRATOR_TYPE, tid); |
| return new CopyFailed(tableId, source, bulk, error); |
| } |
| } |
| |
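| /* |
| * Waits until no tablet server is still working on this transaction, then deals with the files |
| * listed in failures.txt: files with no loaded markers are renamed into the error directory, while |
| * files that some tablet did load are copied there via the distributed work queue. |
| */ |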
| class CopyFailed extends MasterRepo { |
| |
| private static final long serialVersionUID = 1L; |
| |
| private String tableId; |
| private String source; |
| private String bulk; |
| private String error; |
| |
| public CopyFailed(String tableId, String source, String bulk, String error) { |
| this.tableId = tableId; |
| this.source = source; |
| this.bulk = bulk; |
| this.error = error; |
| } |
| |
| @Override |
| public long isReady(long tid, Master master) throws Exception { |
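| // poll the online tablet servers; we are ready once none of them report active work for this tid |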
| Set<TServerInstance> finished = new HashSet<TServerInstance>(); |
| Set<TServerInstance> running = master.onlineTabletServers(); |
| for (TServerInstance server : running) { |
| try { |
| TServerConnection client = master.getConnection(server); |
| if (client != null && !client.isActive(tid)) |
| finished.add(server); |
| } catch (TException ex) { |
| log.info("Ignoring error trying to check on tid " + tid + " from server " + server + ": " + ex); |
| } |
| } |
| if (finished.containsAll(running)) |
| return 0; |
| return 500; |
| } |
| |
| @Override |
| public Repo<Master> call(long tid, Master environment) throws Exception { |
| // This needs to execute after the arbitrator is stopped |
| |
| FileSystem fs = TraceFileSystem.wrap(org.apache.accumulo.core.file.FileUtil.getFileSystem(CachedConfiguration.getInstance(), |
| ServerConfiguration.getSiteConfiguration())); |
| |
| if (!fs.exists(new Path(error, "failures.txt"))) |
| return new CleanUpBulkImport(tableId, source, bulk, error); |
| |
| HashMap<String,String> failures = new HashMap<String,String>(); |
| HashMap<String,String> loadedFailures = new HashMap<String,String>(); |
| |
| FSDataInputStream failFile = fs.open(new Path(error, "failures.txt")); |
| BufferedReader in = new BufferedReader(new InputStreamReader(failFile)); |
| try { |
| String line = null; |
| while ((line = in.readLine()) != null) { |
| Path path = new Path(line); |
| if (!fs.exists(new Path(error, path.getName()))) |
| failures.put("/" + path.getParent().getName() + "/" + path.getName(), line); |
| } |
| } finally { |
| failFile.close(); |
| } |
| |
| /* |
| * I thought I could move files that have no file references in the table. However, it's possible |
| * that a clone references a file. Therefore, only move files that have no loaded markers. |
| */ |
| |
| // determine which failed files were loaded |
| AuthInfo creds = SecurityConstants.getSystemCredentials(); |
| Connector conn = HdfsZooInstance.getInstance().getConnector(creds.user, creds.password); |
| Scanner mscanner = new IsolatedScanner(conn.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS)); |
| mscanner.setRange(new KeyExtent(new Text(tableId), null, null).toMetadataRange()); |
| mscanner.fetchColumnFamily(Constants.METADATA_BULKFILE_COLUMN_FAMILY); |
| |
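| // a bulk-file entry whose value matches this tid means some tablet loaded the file |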
| for (Entry<Key,Value> entry : mscanner) { |
| if (Long.parseLong(entry.getValue().toString()) == tid) { |
| String loadedFile = entry.getKey().getColumnQualifier().toString(); |
| String absPath = failures.remove(loadedFile); |
| if (absPath != null) { |
| loadedFailures.put(loadedFile, absPath); |
| } |
| } |
| } |
| |
| // move failed files that were not loaded |
| for (String failure : failures.values()) { |
| Path orig = new Path(failure); |
| Path dest = new Path(error, orig.getName()); |
| fs.rename(orig, dest); |
| log.debug("tid " + tid + " renamed " + orig + " to " + dest + ": failed"); |
| } |
| |
| if (loadedFailures.size() > 0) { |
| DistributedWorkQueue bifCopyQueue = new DistributedWorkQueue(Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID() |
| + Constants.ZBULK_FAILED_COPYQ); |
| |
| HashSet<String> workIds = new HashSet<String>(); |
| |
| for (String failure : loadedFailures.values()) { |
| Path orig = new Path(failure); |
| Path dest = new Path(error, orig.getName()); |
| |
| if (fs.exists(dest)) |
| continue; |
| |
| bifCopyQueue.addWork(orig.getName(), (failure + "," + dest).getBytes()); |
| workIds.add(orig.getName()); |
| log.debug("tid " + tid + " added to copyq: " + orig + " to " + dest + ": failed"); |
| } |
| |
| bifCopyQueue.waitUntilDone(workIds); |
| } |
| |
| fs.delete(new Path(error, "failures.txt"), true); |
| return new CleanUpBulkImport(tableId, source, bulk, error); |
| } |
| |
| } |
| |
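| /* |
| * Asks random tablet servers to bulk load the staged files, one file per task, retrying failed |
| * files up to MASTER_BULK_RETRIES times. Any files that still cannot be loaded are written to |
| * failures.txt in the error directory for CopyFailed to handle. |
| */ |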
| class LoadFiles extends MasterRepo { |
| |
| private static final long serialVersionUID = 1L; |
| final static int THREAD_POOL_SIZE = ServerConfiguration.getSystemConfiguration().getCount(Property.MASTER_BULK_THREADPOOL_SIZE); |
| |
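| // shared pool for issuing bulk load requests to tablet servers; core threads time out when idle |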
| private static final ExecutorService threadPool; |
| static { |
| ThreadFactory threadFactory = new ThreadFactory() { |
| int count = 0; |
| |
| @Override |
| public Thread newThread(Runnable r) { |
| return new Daemon(r, "bulk loader " + count++); |
| } |
| }; |
| ThreadPoolExecutor pool = new ThreadPoolExecutor(THREAD_POOL_SIZE, THREAD_POOL_SIZE, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), |
| threadFactory); |
| pool.allowCoreThreadTimeOut(true); |
| threadPool = new TraceExecutorService(pool); |
| } |
| private static final Logger log = Logger.getLogger(BulkImport.class); |
| |
| private String tableId; |
| private String source; |
| private String bulk; |
| private String errorDir; |
| private boolean setTime; |
| |
| public LoadFiles(String tableId, String source, String bulk, String errorDir, boolean setTime) { |
| this.tableId = tableId; |
| this.source = source; |
| this.bulk = bulk; |
| this.errorDir = errorDir; |
| this.setTime = setTime; |
| } |
| |
| @Override |
| public long isReady(long tid, Master master) throws Exception { |
| if (master.onlineTabletServers().size() == 0) |
| return 500; |
| return 0; |
| } |
| |
| @Override |
| public Repo<Master> call(final long tid, final Master master) throws Exception { |
| final SiteConfiguration conf = ServerConfiguration.getSiteConfiguration(); |
| FileSystem fs = TraceFileSystem.wrap(org.apache.accumulo.core.file.FileUtil.getFileSystem(CachedConfiguration.getInstance(), conf)); |
| List<FileStatus> files = new ArrayList<FileStatus>(); |
| for (FileStatus entry : fs.listStatus(new Path(bulk))) { |
| files.add(entry); |
| } |
| log.debug("tid " + tid + " importing " + files.size() + " files"); |
| |
| Path writable = new Path(this.errorDir, ".iswritable"); |
| if (!fs.createNewFile(writable)) { |
| // Maybe this is a re-try... clear the flag and try again |
| fs.delete(writable, false); |
| if (!fs.createNewFile(writable)) |
| throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_ERROR_DIRECTORY, |
| "Unable to write to " + this.errorDir); |
| } |
| fs.delete(writable, false); |
| |
| final Set<String> filesToLoad = Collections.synchronizedSet(new HashSet<String>()); |
| for (FileStatus f : files) |
| filesToLoad.add(f.getPath().toString()); |
| |
| final int RETRIES = Math.max(1, conf.getCount(Property.MASTER_BULK_RETRIES)); |
| for (int attempt = 0; attempt < RETRIES && filesToLoad.size() > 0; attempt++) { |
| List<Future<List<String>>> results = new ArrayList<Future<List<String>>>(); |
| |
| if (master.onlineTabletServers().size() == 0) |
| log.warn("There are no tablet server to process bulk import, waiting (tid = " + tid + ")"); |
| |
| while (master.onlineTabletServers().size() == 0) { |
| UtilWaitThread.sleep(500); |
| } |
| |
| // Use the thread pool to assign each file, one at a time, to a random tablet server |
| final List<String> loaded = Collections.synchronizedList(new ArrayList<String>()); |
| for (final String file : filesToLoad) { |
| results.add(threadPool.submit(new Callable<List<String>>() { |
| @Override |
| public List<String> call() { |
| List<String> failures = new ArrayList<String>(); |
| ClientService.Iface client = null; |
| String server = null; |
| try { |
| // get a connection to a random tablet server, do not prefer cached connections because |
| // this is running on the master and there are lots of connections to tablet servers |
| // serving the !METADATA tablets |
| long timeInMillis = ServerConfiguration.getSystemConfiguration().getTimeInMillis(Property.MASTER_BULK_TIMEOUT); |
| Pair<String,Iface> pair = ServerClient.getConnection(master.getInstance(), false, timeInMillis); |
| client = pair.getSecond(); |
| server = pair.getFirst(); |
| List<String> attempt = Collections.singletonList(file); |
| log.debug("Asking " + pair.getFirst() + " to bulk import " + file); |
| List<String> fail = client.bulkImportFiles(null, SecurityConstants.getSystemCredentials(), tid, tableId, attempt, errorDir, setTime); |
| if (fail.isEmpty()) { |
| loaded.add(file); |
| } else { |
| failures.addAll(fail); |
| } |
| } catch (Exception ex) { |
| log.error("rpc failed server:" + server + ", tid:" + tid + " " + ex, ex); |
| } finally { |
| ServerClient.close(client); |
| } |
| return failures; |
| } |
| })); |
| } |
| Set<String> failures = new HashSet<String>(); |
| for (Future<List<String>> f : results) |
| failures.addAll(f.get()); |
| filesToLoad.removeAll(loaded); |
| if (filesToLoad.size() > 0) { |
| log.debug("tid " + tid + " attempt " + (attempt + 1) + " " + sampleList(filesToLoad, 10) + " failed"); |
| UtilWaitThread.sleep(100); |
| } |
| } |
| |
| FSDataOutputStream failFile = fs.create(new Path(errorDir, "failures.txt"), true); |
| BufferedWriter out = new BufferedWriter(new OutputStreamWriter(failFile)); |
| try { |
| for (String f : filesToLoad) { |
| out.write(f); |
| out.write("\n"); |
| } |
| } finally { |
| out.close(); |
| } |
| |
| // return the next step, which will perform cleanup |
| return new CompleteBulkImport(tableId, source, bulk, errorDir); |
| } |
| |
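| // render at most the first max elements of a potentially long collection for logging |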
| static String sampleList(Collection<?> potentiallyLongList, int max) { |
| StringBuilder result = new StringBuilder("["); |
| int i = 0; |
| for (Object obj : potentiallyLongList) { |
| if (i >= max) { |
| result.append("..."); |
| break; |
| } |
| if (i > 0) |
| result.append(", "); |
| result.append(obj); |
| i++; |
| } |
| result.append("]"); |
| return result.toString(); |
| } |
| |
| } |