| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.accumulo.core.client.impl; |
| |
| import static com.google.common.base.Preconditions.checkArgument; |
| import static java.nio.charset.StandardCharsets.UTF_8; |
| import static java.util.Objects.requireNonNull; |
| import static java.util.concurrent.TimeUnit.MILLISECONDS; |
| import static java.util.concurrent.TimeUnit.SECONDS; |
| import static java.util.stream.Collectors.toSet; |
| import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly; |
| |
| import java.io.BufferedReader; |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.io.InputStreamReader; |
| import java.nio.ByteBuffer; |
| import java.security.SecureRandom; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.EnumSet; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Objects; |
| import java.util.Random; |
| import java.util.Set; |
| import java.util.SortedSet; |
| import java.util.TreeMap; |
| import java.util.TreeSet; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.function.Predicate; |
| import java.util.regex.Pattern; |
| import java.util.stream.Collectors; |
| import java.util.zip.ZipEntry; |
| import java.util.zip.ZipInputStream; |
| |
| import org.apache.accumulo.core.Constants; |
| import org.apache.accumulo.core.client.AccumuloException; |
| import org.apache.accumulo.core.client.AccumuloSecurityException; |
| import org.apache.accumulo.core.client.IsolatedScanner; |
| import org.apache.accumulo.core.client.IteratorSetting; |
| import org.apache.accumulo.core.client.NamespaceExistsException; |
| import org.apache.accumulo.core.client.NamespaceNotFoundException; |
| import org.apache.accumulo.core.client.RowIterator; |
| import org.apache.accumulo.core.client.Scanner; |
| import org.apache.accumulo.core.client.TableDeletedException; |
| import org.apache.accumulo.core.client.TableExistsException; |
| import org.apache.accumulo.core.client.TableNotFoundException; |
| import org.apache.accumulo.core.client.TableOfflineException; |
| import org.apache.accumulo.core.client.admin.CompactionConfig; |
| import org.apache.accumulo.core.client.admin.DiskUsage; |
| import org.apache.accumulo.core.client.admin.FindMax; |
| import org.apache.accumulo.core.client.admin.Locations; |
| import org.apache.accumulo.core.client.admin.NewTableConfiguration; |
| import org.apache.accumulo.core.client.admin.SummaryRetriever; |
| import org.apache.accumulo.core.client.admin.TableOperations; |
| import org.apache.accumulo.core.client.admin.TimeType; |
| import org.apache.accumulo.core.client.impl.TabletLocator.TabletLocation; |
| import org.apache.accumulo.core.client.impl.thrift.ClientService.Client; |
| import org.apache.accumulo.core.client.impl.thrift.TDiskUsage; |
| import org.apache.accumulo.core.client.impl.thrift.ThriftNotActiveServiceException; |
| import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException; |
| import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException; |
| import org.apache.accumulo.core.client.sample.SamplerConfiguration; |
| import org.apache.accumulo.core.client.summary.SummarizerConfiguration; |
| import org.apache.accumulo.core.client.summary.Summary; |
| import org.apache.accumulo.core.conf.AccumuloConfiguration; |
| import org.apache.accumulo.core.conf.ConfigurationCopy; |
| import org.apache.accumulo.core.conf.Property; |
| import org.apache.accumulo.core.constraints.Constraint; |
| import org.apache.accumulo.core.data.ByteSequence; |
| import org.apache.accumulo.core.data.Key; |
| import org.apache.accumulo.core.data.Range; |
| import org.apache.accumulo.core.data.TabletId; |
| import org.apache.accumulo.core.data.Value; |
| import org.apache.accumulo.core.data.impl.KeyExtent; |
| import org.apache.accumulo.core.data.impl.TabletIdImpl; |
| import org.apache.accumulo.core.data.thrift.TRowRange; |
| import org.apache.accumulo.core.data.thrift.TSummaries; |
| import org.apache.accumulo.core.data.thrift.TSummarizerConfiguration; |
| import org.apache.accumulo.core.data.thrift.TSummaryRequest; |
| import org.apache.accumulo.core.iterators.IteratorUtil; |
| import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; |
| import org.apache.accumulo.core.iterators.SortedKeyValueIterator; |
| import org.apache.accumulo.core.master.state.tables.TableState; |
| import org.apache.accumulo.core.master.thrift.FateOperation; |
| import org.apache.accumulo.core.master.thrift.MasterClientService; |
| import org.apache.accumulo.core.metadata.MetadataServicer; |
| import org.apache.accumulo.core.metadata.MetadataTable; |
| import org.apache.accumulo.core.metadata.RootTable; |
| import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; |
| import org.apache.accumulo.core.rpc.ThriftUtil; |
| import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; |
| import org.apache.accumulo.core.security.Authorizations; |
| import org.apache.accumulo.core.summary.SummarizerConfigurationUtil; |
| import org.apache.accumulo.core.summary.SummaryCollection; |
| import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException; |
| import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; |
| import org.apache.accumulo.core.trace.Tracer; |
| import org.apache.accumulo.core.util.CachedConfiguration; |
| import org.apache.accumulo.core.util.HostAndPort; |
| import org.apache.accumulo.core.util.LocalityGroupUtil; |
| import org.apache.accumulo.core.util.MapCounter; |
| import org.apache.accumulo.core.util.NamingThreadFactory; |
| import org.apache.accumulo.core.util.OpTimer; |
| import org.apache.accumulo.core.util.Pair; |
| import org.apache.accumulo.core.util.TextUtil; |
| import org.apache.accumulo.core.volume.VolumeConfiguration; |
| import org.apache.accumulo.fate.util.Retry; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.io.Text; |
| import org.apache.thrift.TApplicationException; |
| import org.apache.thrift.TException; |
| import org.apache.thrift.transport.TTransportException; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.base.Joiner; |
| import com.google.common.base.Preconditions; |
| |
| public class TableOperationsImpl extends TableOperationsHelper { |
| |
/** Prefix put in front of a property name to mark it as excluded in the clone options. */
public static final String CLONE_EXCLUDE_PREFIX = "!";
private static final Logger log = LoggerFactory.getLogger(TableOperations.class);
private final ClientContext context;

/**
 * Creates table operations bound to the given client context.
 *
 * @param context
 *          client context used by every operation of this object; must not be null
 * @throws IllegalArgumentException
 *           if {@code context} is null
 */
public TableOperationsImpl(ClientContext context) {
  if (context == null) {
    throw new IllegalArgumentException("context is null");
  }
  this.context = context;
}
| |
| @Override |
| public SortedSet<String> list() { |
| |
| OpTimer timer = null; |
| |
| if (log.isTraceEnabled()) { |
| log.trace("tid={} Fetching list of tables...", Thread.currentThread().getId()); |
| timer = new OpTimer().start(); |
| } |
| |
| TreeSet<String> tableNames = new TreeSet<>(Tables.getNameToIdMap(context).keySet()); |
| |
| if (timer != null) { |
| timer.stop(); |
| log.trace("tid={} Fetched {} table names in {}", Thread.currentThread().getId(), |
| tableNames.size(), String.format("%.3f secs", timer.scale(TimeUnit.SECONDS))); |
| } |
| |
| return tableNames; |
| } |
| |
| @Override |
| public boolean exists(String tableName) { |
| checkArgument(tableName != null, "tableName is null"); |
| if (tableName.equals(MetadataTable.NAME) || tableName.equals(RootTable.NAME)) |
| return true; |
| |
| OpTimer timer = null; |
| |
| if (log.isTraceEnabled()) { |
| log.trace("tid={} Checking if table {} exists...", Thread.currentThread().getId(), tableName); |
| timer = new OpTimer().start(); |
| } |
| |
| boolean exists = Tables.getNameToIdMap(context).containsKey(tableName); |
| |
| if (timer != null) { |
| timer.stop(); |
| log.trace("tid={} Checked existance of {} in {}", Thread.currentThread().getId(), exists, |
| String.format("%.3f secs", timer.scale(TimeUnit.SECONDS))); |
| } |
| |
| return exists; |
| } |
| |
| @Override |
| public void create(String tableName) |
| throws AccumuloException, AccumuloSecurityException, TableExistsException { |
| create(tableName, new NewTableConfiguration()); |
| } |
| |
| @Override |
| @Deprecated |
| public void create(String tableName, boolean limitVersion) |
| throws AccumuloException, AccumuloSecurityException, TableExistsException { |
| create(tableName, limitVersion, TimeType.MILLIS); |
| } |
| |
| @Override |
| @Deprecated |
| public void create(String tableName, boolean limitVersion, TimeType timeType) |
| throws AccumuloException, AccumuloSecurityException, TableExistsException { |
| checkArgument(tableName != null, "tableName is null"); |
| checkArgument(timeType != null, "timeType is null"); |
| |
| NewTableConfiguration ntc = new NewTableConfiguration().setTimeType(timeType); |
| |
| if (limitVersion) |
| create(tableName, ntc); |
| else |
| create(tableName, ntc.withoutDefaultIterators()); |
| } |
| |
| @Override |
| public void create(String tableName, NewTableConfiguration ntc) |
| throws AccumuloException, AccumuloSecurityException, TableExistsException { |
| checkArgument(tableName != null, "tableName is null"); |
| checkArgument(ntc != null, "ntc is null"); |
| |
| List<ByteBuffer> args = new ArrayList<>(); |
| args.add(ByteBuffer.wrap(tableName.getBytes(UTF_8))); |
| args.add(ByteBuffer.wrap(ntc.getTimeType().name().getBytes(UTF_8))); |
| // Send info relating to initial table creation i.e, create online or offline |
| args.add(ByteBuffer.wrap(ntc.getInitialTableState().name().getBytes(UTF_8))); |
| // Check for possible initial splits to be added at table creation |
| // Always send number of initial splits to be created, even if zero. If greater than zero, |
| // add the splits to the argument List which will be used by the FATE operations. |
| int numSplits = ntc.getSplits().size(); |
| args.add(ByteBuffer.wrap(String.valueOf(numSplits).getBytes(UTF_8))); |
| if (numSplits > 0) { |
| for (Text t : ntc.getSplits()) { |
| args.add(TextUtil.getByteBuffer(t)); |
| } |
| } |
| |
| Map<String,String> opts = ntc.getProperties(); |
| |
| try { |
| doTableFateOperation(tableName, AccumuloException.class, FateOperation.TABLE_CREATE, args, |
| opts); |
| } catch (TableNotFoundException e) { |
| // should not happen |
| throw new AssertionError(e); |
| } |
| } |
| |
| private long beginFateOperation() throws ThriftSecurityException, TException { |
| while (true) { |
| MasterClientService.Iface client = null; |
| try { |
| client = MasterClient.getConnectionWithRetry(context); |
| return client.beginFateOperation(Tracer.traceInfo(), context.rpcCreds()); |
| } catch (TTransportException tte) { |
| log.debug("Failed to call beginFateOperation(), retrying ... ", tte); |
| sleepUninterruptibly(100, TimeUnit.MILLISECONDS); |
| } catch (ThriftNotActiveServiceException e) { |
| // Let it loop, fetching a new location |
| log.debug("Contacted a Master which is no longer active, retrying"); |
| sleepUninterruptibly(100, TimeUnit.MILLISECONDS); |
| } finally { |
| MasterClient.close(client); |
| } |
| } |
| } |
| |
| // This method is for retrying in the case of network failures; anything else it passes to the |
| // caller to deal with |
| private void executeFateOperation(long opid, FateOperation op, List<ByteBuffer> args, |
| Map<String,String> opts, boolean autoCleanUp) |
| throws ThriftSecurityException, TException, ThriftTableOperationException { |
| while (true) { |
| MasterClientService.Iface client = null; |
| try { |
| client = MasterClient.getConnectionWithRetry(context); |
| client.executeFateOperation(Tracer.traceInfo(), context.rpcCreds(), opid, op, args, opts, |
| autoCleanUp); |
| return; |
| } catch (TTransportException tte) { |
| log.debug("Failed to call executeFateOperation(), retrying ... ", tte); |
| sleepUninterruptibly(100, TimeUnit.MILLISECONDS); |
| } catch (ThriftNotActiveServiceException e) { |
| // Let it loop, fetching a new location |
| log.debug("Contacted a Master which is no longer active, retrying"); |
| sleepUninterruptibly(100, TimeUnit.MILLISECONDS); |
| } finally { |
| MasterClient.close(client); |
| } |
| } |
| } |
| |
| private String waitForFateOperation(long opid) |
| throws ThriftSecurityException, TException, ThriftTableOperationException { |
| while (true) { |
| MasterClientService.Iface client = null; |
| try { |
| client = MasterClient.getConnectionWithRetry(context); |
| return client.waitForFateOperation(Tracer.traceInfo(), context.rpcCreds(), opid); |
| } catch (TTransportException tte) { |
| log.debug("Failed to call waitForFateOperation(), retrying ... ", tte); |
| sleepUninterruptibly(100, TimeUnit.MILLISECONDS); |
| } catch (ThriftNotActiveServiceException e) { |
| // Let it loop, fetching a new location |
| log.debug("Contacted a Master which is no longer active, retrying"); |
| sleepUninterruptibly(100, TimeUnit.MILLISECONDS); |
| } finally { |
| MasterClient.close(client); |
| } |
| } |
| } |
| |
| private void finishFateOperation(long opid) throws ThriftSecurityException, TException { |
| while (true) { |
| MasterClientService.Iface client = null; |
| try { |
| client = MasterClient.getConnectionWithRetry(context); |
| client.finishFateOperation(Tracer.traceInfo(), context.rpcCreds(), opid); |
| break; |
| } catch (TTransportException tte) { |
| log.debug("Failed to call finishFateOperation(), retrying ... ", tte); |
| sleepUninterruptibly(100, TimeUnit.MILLISECONDS); |
| } catch (ThriftNotActiveServiceException e) { |
| // Let it loop, fetching a new location |
| log.debug("Contacted a Master which is no longer active, retrying"); |
| sleepUninterruptibly(100, TimeUnit.MILLISECONDS); |
| } finally { |
| MasterClient.close(client); |
| } |
| } |
| } |
| |
| String doFateOperation(FateOperation op, List<ByteBuffer> args, Map<String,String> opts, |
| String tableOrNamespaceName) |
| throws AccumuloSecurityException, TableExistsException, TableNotFoundException, |
| AccumuloException, NamespaceExistsException, NamespaceNotFoundException { |
| return doFateOperation(op, args, opts, tableOrNamespaceName, true); |
| } |
| |
| String doFateOperation(FateOperation op, List<ByteBuffer> args, Map<String,String> opts, |
| String tableOrNamespaceName, boolean wait) |
| throws AccumuloSecurityException, TableExistsException, TableNotFoundException, |
| AccumuloException, NamespaceExistsException, NamespaceNotFoundException { |
| Long opid = null; |
| |
| try { |
| opid = beginFateOperation(); |
| executeFateOperation(opid, op, args, opts, !wait); |
| if (!wait) { |
| opid = null; |
| return null; |
| } |
| return waitForFateOperation(opid); |
| } catch (ThriftSecurityException e) { |
| switch (e.getCode()) { |
| case TABLE_DOESNT_EXIST: |
| throw new TableNotFoundException(null, tableOrNamespaceName, |
| "Target table does not exist"); |
| case NAMESPACE_DOESNT_EXIST: |
| throw new NamespaceNotFoundException(null, tableOrNamespaceName, |
| "Target namespace does not exist"); |
| default: |
| String tableInfo = Tables.getPrintableTableInfoFromName(context, tableOrNamespaceName); |
| throw new AccumuloSecurityException(e.user, e.code, tableInfo, e); |
| } |
| } catch (ThriftTableOperationException e) { |
| switch (e.getType()) { |
| case EXISTS: |
| throw new TableExistsException(e); |
| case NOTFOUND: |
| throw new TableNotFoundException(e); |
| case NAMESPACE_EXISTS: |
| throw new NamespaceExistsException(e); |
| case NAMESPACE_NOTFOUND: |
| throw new NamespaceNotFoundException(e); |
| case OFFLINE: |
| throw new TableOfflineException( |
| Tables.getTableOfflineMsg(context, Tables.getTableId(context, tableOrNamespaceName))); |
| default: |
| throw new AccumuloException(e.description, e); |
| } |
| } catch (Exception e) { |
| throw new AccumuloException(e.getMessage(), e); |
| } finally { |
| Tables.clearCache(context); |
| // always finish table op, even when exception |
| if (opid != null) |
| try { |
| finishFateOperation(opid); |
| } catch (Exception e) { |
| log.warn("Exception thrown while finishing fate table operation", e); |
| } |
| } |
| } |
| |
| private static class SplitEnv { |
| private String tableName; |
| private Table.ID tableId; |
| private ExecutorService executor; |
| private CountDownLatch latch; |
| private AtomicReference<Throwable> exception; |
| |
| SplitEnv(String tableName, Table.ID tableId, ExecutorService executor, CountDownLatch latch, |
| AtomicReference<Throwable> exception) { |
| this.tableName = tableName; |
| this.tableId = tableId; |
| this.executor = executor; |
| this.latch = latch; |
| this.exception = exception; |
| } |
| } |
| |
| private class SplitTask implements Runnable { |
| |
| private List<Text> splits; |
| private SplitEnv env; |
| |
| SplitTask(SplitEnv env, List<Text> splits) { |
| this.env = env; |
| this.splits = splits; |
| } |
| |
| @Override |
| public void run() { |
| try { |
| if (env.exception.get() != null) |
| return; |
| |
| if (splits.size() <= 2) { |
| addSplits(env.tableName, new TreeSet<>(splits), env.tableId); |
| for (int i = 0; i < splits.size(); i++) |
| env.latch.countDown(); |
| return; |
| } |
| |
| int mid = splits.size() / 2; |
| |
| // split the middle split point to ensure that child task split different tablets and can |
| // therefore |
| // run in parallel |
| addSplits(env.tableName, new TreeSet<>(splits.subList(mid, mid + 1)), env.tableId); |
| env.latch.countDown(); |
| |
| env.executor.execute(new SplitTask(env, splits.subList(0, mid))); |
| env.executor.execute(new SplitTask(env, splits.subList(mid + 1, splits.size()))); |
| |
| } catch (Throwable t) { |
| env.exception.compareAndSet(null, t); |
| } |
| } |
| |
| } |
| |
| @Override |
| public void addSplits(String tableName, SortedSet<Text> partitionKeys) |
| throws TableNotFoundException, AccumuloException, AccumuloSecurityException { |
| Table.ID tableId = Tables.getTableId(context, tableName); |
| |
| List<Text> splits = new ArrayList<>(partitionKeys); |
| // should be sorted because we copied from a sorted set, but that makes assumptions about |
| // how the copy was done so resort to be sure. |
| Collections.sort(splits); |
| |
| CountDownLatch latch = new CountDownLatch(splits.size()); |
| AtomicReference<Throwable> exception = new AtomicReference<>(null); |
| |
| ExecutorService executor = Executors.newFixedThreadPool(16, |
| new NamingThreadFactory("addSplits")); |
| try { |
| executor.execute( |
| new SplitTask(new SplitEnv(tableName, tableId, executor, latch, exception), splits)); |
| |
| while (!latch.await(100, TimeUnit.MILLISECONDS)) { |
| if (exception.get() != null) { |
| executor.shutdownNow(); |
| Throwable excep = exception.get(); |
| // Below all exceptions are wrapped and rethrown. This is done so that the user knows what |
| // code path got them here. If the wrapping was not done, the |
| // user would only have the stack trace for the background thread. |
| if (excep instanceof TableNotFoundException) { |
| TableNotFoundException tnfe = (TableNotFoundException) excep; |
| throw new TableNotFoundException(tableId.canonicalID(), tableName, |
| "Table not found by background thread", tnfe); |
| } else if (excep instanceof TableOfflineException) { |
| log.debug("TableOfflineException occurred in background thread. Throwing new exception", |
| excep); |
| throw new TableOfflineException(Tables.getTableOfflineMsg(context, tableId)); |
| } else if (excep instanceof AccumuloSecurityException) { |
| // base == background accumulo security exception |
| AccumuloSecurityException base = (AccumuloSecurityException) excep; |
| throw new AccumuloSecurityException(base.getUser(), base.asThriftException().getCode(), |
| base.getTableInfo(), excep); |
| } else if (excep instanceof AccumuloServerException) { |
| throw new AccumuloServerException((AccumuloServerException) excep); |
| } else if (excep instanceof Error) { |
| throw new Error(excep); |
| } else { |
| throw new AccumuloException(excep); |
| } |
| } |
| } |
| } catch (InterruptedException e) { |
| throw new RuntimeException(e); |
| } finally { |
| executor.shutdown(); |
| } |
| } |
| |
| private void addSplits(String tableName, SortedSet<Text> partitionKeys, Table.ID tableId) |
| throws AccumuloException, AccumuloSecurityException, TableNotFoundException, |
| AccumuloServerException { |
| TabletLocator tabLocator = TabletLocator.getLocator(context, tableId); |
| |
| for (Text split : partitionKeys) { |
| boolean successful = false; |
| int attempt = 0; |
| long locationFailures = 0; |
| |
| while (!successful) { |
| |
| if (attempt > 0) |
| sleepUninterruptibly(100, TimeUnit.MILLISECONDS); |
| |
| attempt++; |
| |
| TabletLocation tl = tabLocator.locateTablet(context, split, false, false); |
| |
| if (tl == null) { |
| if (!Tables.exists(context, tableId)) |
| throw new TableNotFoundException(tableId.canonicalID(), tableName, null); |
| else if (Tables.getTableState(context, tableId) == TableState.OFFLINE) |
| throw new TableOfflineException(Tables.getTableOfflineMsg(context, tableId)); |
| continue; |
| } |
| |
| HostAndPort address = HostAndPort.fromString(tl.tablet_location); |
| |
| try { |
| TabletClientService.Client client = ThriftUtil.getTServerClient(address, context); |
| try { |
| |
| OpTimer timer = null; |
| |
| if (log.isTraceEnabled()) { |
| log.trace("tid={} Splitting tablet {} on {} at {}", Thread.currentThread().getId(), |
| tl.tablet_extent, address, split); |
| timer = new OpTimer().start(); |
| } |
| |
| client.splitTablet(Tracer.traceInfo(), context.rpcCreds(), tl.tablet_extent.toThrift(), |
| TextUtil.getByteBuffer(split)); |
| |
| // just split it, might as well invalidate it in the cache |
| tabLocator.invalidateCache(tl.tablet_extent); |
| |
| if (timer != null) { |
| timer.stop(); |
| log.trace("Split tablet in {}", |
| String.format("%.3f secs", timer.scale(TimeUnit.SECONDS))); |
| } |
| |
| } finally { |
| ThriftUtil.returnClient(client); |
| } |
| |
| } catch (TApplicationException tae) { |
| throw new AccumuloServerException(address.toString(), tae); |
| } catch (TTransportException e) { |
| tabLocator.invalidateCache(context, tl.tablet_location); |
| continue; |
| } catch (ThriftSecurityException e) { |
| Tables.clearCache(context); |
| if (!Tables.exists(context, tableId)) |
| throw new TableNotFoundException(tableId.canonicalID(), tableName, null); |
| throw new AccumuloSecurityException(e.user, e.code, e); |
| } catch (NotServingTabletException e) { |
| // Do not silently spin when we repeatedly fail to get the location for a tablet |
| locationFailures++; |
| if (5 == locationFailures || 0 == locationFailures % 50) { |
| log.warn("Having difficulty locating hosting tabletserver for split {} on table {}." |
| + " Seen {} failures.", split, tableName, locationFailures); |
| } |
| |
| tabLocator.invalidateCache(tl.tablet_extent); |
| continue; |
| } catch (TException e) { |
| tabLocator.invalidateCache(context, tl.tablet_location); |
| continue; |
| } |
| |
| successful = true; |
| } |
| } |
| } |
| |
| @Override |
| public void merge(String tableName, Text start, Text end) |
| throws AccumuloException, AccumuloSecurityException, TableNotFoundException { |
| |
| checkArgument(tableName != null, "tableName is null"); |
| ByteBuffer EMPTY = ByteBuffer.allocate(0); |
| List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableName.getBytes(UTF_8)), |
| start == null ? EMPTY : TextUtil.getByteBuffer(start), |
| end == null ? EMPTY : TextUtil.getByteBuffer(end)); |
| Map<String,String> opts = new HashMap<>(); |
| try { |
| doTableFateOperation(tableName, TableNotFoundException.class, FateOperation.TABLE_MERGE, args, |
| opts); |
| } catch (TableExistsException e) { |
| // should not happen |
| throw new AssertionError(e); |
| } |
| } |
| |
| @Override |
| public void deleteRows(String tableName, Text start, Text end) |
| throws AccumuloException, AccumuloSecurityException, TableNotFoundException { |
| |
| checkArgument(tableName != null, "tableName is null"); |
| ByteBuffer EMPTY = ByteBuffer.allocate(0); |
| List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableName.getBytes(UTF_8)), |
| start == null ? EMPTY : TextUtil.getByteBuffer(start), |
| end == null ? EMPTY : TextUtil.getByteBuffer(end)); |
| Map<String,String> opts = new HashMap<>(); |
| try { |
| doTableFateOperation(tableName, TableNotFoundException.class, |
| FateOperation.TABLE_DELETE_RANGE, args, opts); |
| } catch (TableExistsException e) { |
| // should not happen |
| throw new AssertionError(e); |
| } |
| } |
| |
| @Override |
| public Collection<Text> listSplits(String tableName) |
| throws TableNotFoundException, AccumuloSecurityException { |
| |
| checkArgument(tableName != null, "tableName is null"); |
| |
| Table.ID tableId = Tables.getTableId(context, tableName); |
| |
| TreeMap<KeyExtent,String> tabletLocations = new TreeMap<>(); |
| |
| while (true) { |
| try { |
| tabletLocations.clear(); |
| // the following method throws AccumuloException for some conditions that should be retried |
| MetadataServicer.forTableId(context, tableId).getTabletLocations(tabletLocations); |
| break; |
| } catch (AccumuloSecurityException ase) { |
| throw ase; |
| } catch (Exception e) { |
| if (!Tables.exists(context, tableId)) { |
| throw new TableNotFoundException(tableId.canonicalID(), tableName, null); |
| } |
| |
| if (e instanceof RuntimeException && e.getCause() instanceof AccumuloSecurityException) { |
| throw (AccumuloSecurityException) e.getCause(); |
| } |
| |
| log.info("{} ... retrying ...", e.getMessage()); |
| sleepUninterruptibly(3, TimeUnit.SECONDS); |
| } |
| } |
| |
| ArrayList<Text> endRows = new ArrayList<>(tabletLocations.size()); |
| |
| for (KeyExtent ke : tabletLocations.keySet()) |
| if (ke.getEndRow() != null) |
| endRows.add(ke.getEndRow()); |
| |
| return endRows; |
| } |
| |
| @Deprecated |
| @Override |
| public Collection<Text> getSplits(String tableName) throws TableNotFoundException { |
| try { |
| return listSplits(tableName); |
| } catch (AccumuloSecurityException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| @Override |
| public Collection<Text> listSplits(String tableName, int maxSplits) |
| throws TableNotFoundException, AccumuloSecurityException { |
| Collection<Text> endRows = listSplits(tableName); |
| |
| if (endRows.size() <= maxSplits) |
| return endRows; |
| |
| double r = (maxSplits + 1) / (double) (endRows.size()); |
| double pos = 0; |
| |
| ArrayList<Text> subset = new ArrayList<>(maxSplits); |
| |
| int j = 0; |
| for (int i = 0; i < endRows.size() && j < maxSplits; i++) { |
| pos += r; |
| while (pos > 1) { |
| subset.add(((ArrayList<Text>) endRows).get(i)); |
| j++; |
| pos -= 1; |
| } |
| } |
| |
| return subset; |
| } |
| |
| @Deprecated |
| @Override |
| public Collection<Text> getSplits(String tableName, int maxSplits) throws TableNotFoundException { |
| try { |
| return listSplits(tableName, maxSplits); |
| } catch (AccumuloSecurityException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| @Override |
| public void delete(String tableName) |
| throws AccumuloException, AccumuloSecurityException, TableNotFoundException { |
| checkArgument(tableName != null, "tableName is null"); |
| |
| List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableName.getBytes(UTF_8))); |
| Map<String,String> opts = new HashMap<>(); |
| |
| try { |
| doTableFateOperation(tableName, TableNotFoundException.class, FateOperation.TABLE_DELETE, |
| args, opts); |
| } catch (TableExistsException e) { |
| // should not happen |
| throw new AssertionError(e); |
| } |
| |
| } |
| |
| @Override |
| public void clone(String srcTableName, String newTableName, boolean flush, |
| Map<String,String> propertiesToSet, Set<String> propertiesToExclude) |
| throws AccumuloSecurityException, TableNotFoundException, AccumuloException, |
| TableExistsException { |
| |
| checkArgument(srcTableName != null, "srcTableName is null"); |
| checkArgument(newTableName != null, "newTableName is null"); |
| |
| Table.ID srcTableId = Tables.getTableId(context, srcTableName); |
| |
| if (flush) |
| _flush(srcTableId, null, null, true); |
| |
| if (propertiesToExclude == null) |
| propertiesToExclude = Collections.emptySet(); |
| |
| if (propertiesToSet == null) |
| propertiesToSet = Collections.emptyMap(); |
| |
| List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(srcTableId.getUtf8()), |
| ByteBuffer.wrap(newTableName.getBytes(UTF_8))); |
| Map<String,String> opts = new HashMap<>(); |
| for (Entry<String,String> entry : propertiesToSet.entrySet()) { |
| if (entry.getKey().startsWith(CLONE_EXCLUDE_PREFIX)) |
| throw new IllegalArgumentException("Property can not start with " + CLONE_EXCLUDE_PREFIX); |
| opts.put(entry.getKey(), entry.getValue()); |
| } |
| |
| for (String prop : propertiesToExclude) { |
| opts.put(CLONE_EXCLUDE_PREFIX + prop, ""); |
| } |
| |
| doTableFateOperation(newTableName, AccumuloException.class, FateOperation.TABLE_CLONE, args, |
| opts); |
| } |
| |
| @Override |
| public void rename(String oldTableName, String newTableName) throws AccumuloSecurityException, |
| TableNotFoundException, AccumuloException, TableExistsException { |
| |
| List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(oldTableName.getBytes(UTF_8)), |
| ByteBuffer.wrap(newTableName.getBytes(UTF_8))); |
| Map<String,String> opts = new HashMap<>(); |
| doTableFateOperation(oldTableName, TableNotFoundException.class, FateOperation.TABLE_RENAME, |
| args, opts); |
| } |
| |
| @Override |
| @Deprecated |
| public void flush(String tableName) throws AccumuloException, AccumuloSecurityException { |
| try { |
| flush(tableName, null, null, false); |
| } catch (TableNotFoundException e) { |
| throw new AccumuloException(e.getMessage(), e); |
| } |
| } |
| |
| @Override |
| public void flush(String tableName, Text start, Text end, boolean wait) |
| throws AccumuloException, AccumuloSecurityException, TableNotFoundException { |
| checkArgument(tableName != null, "tableName is null"); |
| |
| Table.ID tableId = Tables.getTableId(context, tableName); |
| _flush(tableId, start, end, wait); |
| } |
| |
| @Override |
| public void compact(String tableName, Text start, Text end, boolean flush, boolean wait) |
| throws AccumuloSecurityException, TableNotFoundException, AccumuloException { |
| compact(tableName, start, end, new ArrayList<>(), flush, wait); |
| } |
| |
| @Override |
| public void compact(String tableName, Text start, Text end, List<IteratorSetting> iterators, |
| boolean flush, boolean wait) |
| throws AccumuloSecurityException, TableNotFoundException, AccumuloException { |
| compact(tableName, new CompactionConfig().setStartRow(start).setEndRow(end) |
| .setIterators(iterators).setFlush(flush).setWait(wait)); |
| } |
| |
| @Override |
| public void compact(String tableName, CompactionConfig config) |
| throws AccumuloSecurityException, TableNotFoundException, AccumuloException { |
| checkArgument(tableName != null, "tableName is null"); |
| ByteBuffer EMPTY = ByteBuffer.allocate(0); |
| |
| // Ensure compaction iterators exist on a tabletserver |
| final String skviName = SortedKeyValueIterator.class.getName(); |
| for (IteratorSetting setting : config.getIterators()) { |
| String iteratorClass = setting.getIteratorClass(); |
| if (!testClassLoad(tableName, iteratorClass, skviName)) { |
| throw new AccumuloException("TabletServer could not load iterator class " + iteratorClass); |
| } |
| } |
| |
| // Make sure the specified compaction strategy exists on a tabletserver |
| final String compactionStrategyName = config.getCompactionStrategy().getClassName(); |
| if (!CompactionStrategyConfigUtil.DEFAULT_STRATEGY.getClassName() |
| .equals(compactionStrategyName)) { |
| if (!testClassLoad(tableName, compactionStrategyName, |
| "org.apache.accumulo.tserver.compaction.CompactionStrategy")) { |
| throw new AccumuloException( |
| "TabletServer could not load CompactionStrategy class " + compactionStrategyName); |
| } |
| } |
| |
| Table.ID tableId = Tables.getTableId(context, tableName); |
| |
| Text start = config.getStartRow(); |
| Text end = config.getEndRow(); |
| |
| if (config.getFlush()) |
| _flush(tableId, start, end, true); |
| |
| List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableId.getUtf8()), |
| start == null ? EMPTY : TextUtil.getByteBuffer(start), |
| end == null ? EMPTY : TextUtil.getByteBuffer(end), |
| ByteBuffer.wrap(IteratorUtil.encodeIteratorSettings(config.getIterators())), |
| ByteBuffer.wrap(CompactionStrategyConfigUtil.encode(config.getCompactionStrategy()))); |
| |
| Map<String,String> opts = new HashMap<>(); |
| try { |
| doFateOperation(FateOperation.TABLE_COMPACT, args, opts, tableName, config.getWait()); |
| } catch (TableExistsException | NamespaceExistsException e) { |
| // should not happen |
| throw new AssertionError(e); |
| } catch (NamespaceNotFoundException e) { |
| throw new TableNotFoundException(null, tableName, "Namespace not found", e); |
| } |
| } |
| |
| @Override |
| public void cancelCompaction(String tableName) |
| throws AccumuloSecurityException, TableNotFoundException, AccumuloException { |
| Table.ID tableId = Tables.getTableId(context, tableName); |
| |
| List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableId.getUtf8())); |
| |
| Map<String,String> opts = new HashMap<>(); |
| try { |
| doTableFateOperation(tableName, TableNotFoundException.class, |
| FateOperation.TABLE_CANCEL_COMPACT, args, opts); |
| } catch (TableExistsException e) { |
| // should not happen |
| throw new AssertionError(e); |
| } |
| |
| } |
| |
| private void _flush(Table.ID tableId, Text start, Text end, boolean wait) |
| throws AccumuloException, AccumuloSecurityException, TableNotFoundException { |
| |
| try { |
| long flushID; |
| |
| // used to pass the table name. but the tableid associated with a table name could change |
| // between calls. |
| // so pass the tableid to both calls |
| |
| while (true) { |
| MasterClientService.Iface client = null; |
| try { |
| client = MasterClient.getConnectionWithRetry(context); |
| flushID = client.initiateFlush(Tracer.traceInfo(), context.rpcCreds(), |
| tableId.canonicalID()); |
| break; |
| } catch (TTransportException tte) { |
| log.debug("Failed to call initiateFlush, retrying ... ", tte); |
| sleepUninterruptibly(100, TimeUnit.MILLISECONDS); |
| } catch (ThriftNotActiveServiceException e) { |
| // Let it loop, fetching a new location |
| log.debug("Contacted a Master which is no longer active, retrying"); |
| sleepUninterruptibly(100, TimeUnit.MILLISECONDS); |
| } finally { |
| MasterClient.close(client); |
| } |
| } |
| |
| while (true) { |
| MasterClientService.Iface client = null; |
| try { |
| client = MasterClient.getConnectionWithRetry(context); |
| client.waitForFlush(Tracer.traceInfo(), context.rpcCreds(), tableId.canonicalID(), |
| TextUtil.getByteBuffer(start), TextUtil.getByteBuffer(end), flushID, |
| wait ? Long.MAX_VALUE : 1); |
| break; |
| } catch (TTransportException tte) { |
| log.debug("Failed to call initiateFlush, retrying ... ", tte); |
| sleepUninterruptibly(100, TimeUnit.MILLISECONDS); |
| } catch (ThriftNotActiveServiceException e) { |
| // Let it loop, fetching a new location |
| log.debug("Contacted a Master which is no longer active, retrying"); |
| sleepUninterruptibly(100, TimeUnit.MILLISECONDS); |
| } finally { |
| MasterClient.close(client); |
| } |
| } |
| } catch (ThriftSecurityException e) { |
| switch (e.getCode()) { |
| case TABLE_DOESNT_EXIST: |
| throw new TableNotFoundException(tableId.canonicalID(), null, e.getMessage(), e); |
| default: |
| log.debug("flush security exception on table id {}", tableId); |
| throw new AccumuloSecurityException(e.user, e.code, e); |
| } |
| } catch (ThriftTableOperationException e) { |
| switch (e.getType()) { |
| case NOTFOUND: |
| throw new TableNotFoundException(e); |
| default: |
| throw new AccumuloException(e.description, e); |
| } |
| } catch (Exception e) { |
| throw new AccumuloException(e); |
| } |
| } |
| |
| @Override |
| public void setProperty(final String tableName, final String property, final String value) |
| throws AccumuloException, AccumuloSecurityException { |
| checkArgument(tableName != null, "tableName is null"); |
| checkArgument(property != null, "property is null"); |
| checkArgument(value != null, "value is null"); |
| try { |
| MasterClient.executeTable(context, client -> client.setTableProperty(Tracer.traceInfo(), |
| context.rpcCreds(), tableName, property, value)); |
| } catch (TableNotFoundException e) { |
| throw new AccumuloException(e); |
| } |
| } |
| |
| @Override |
| public void removeProperty(final String tableName, final String property) |
| throws AccumuloException, AccumuloSecurityException { |
| checkArgument(tableName != null, "tableName is null"); |
| checkArgument(property != null, "property is null"); |
| try { |
| MasterClient.executeTable(context, client -> client.removeTableProperty(Tracer.traceInfo(), |
| context.rpcCreds(), tableName, property)); |
| } catch (TableNotFoundException e) { |
| throw new AccumuloException(e); |
| } |
| } |
| |
| @Override |
| public Iterable<Entry<String,String>> getProperties(final String tableName) |
| throws AccumuloException, TableNotFoundException { |
| checkArgument(tableName != null, "tableName is null"); |
| try { |
| return ServerClient.executeRaw(context, |
| client -> client.getTableConfiguration(Tracer.traceInfo(), context.rpcCreds(), tableName)) |
| .entrySet(); |
| } catch (ThriftTableOperationException e) { |
| switch (e.getType()) { |
| case NOTFOUND: |
| throw new TableNotFoundException(e); |
| case NAMESPACE_NOTFOUND: |
| throw new TableNotFoundException(tableName, new NamespaceNotFoundException(e)); |
| default: |
| throw new AccumuloException(e.description, e); |
| } |
| } catch (AccumuloException e) { |
| throw e; |
| } catch (Exception e) { |
| throw new AccumuloException(e); |
| } |
| |
| } |
| |
| @Override |
| public void setLocalityGroups(String tableName, Map<String,Set<Text>> groups) |
| throws AccumuloException, AccumuloSecurityException, TableNotFoundException { |
| // ensure locality groups do not overlap |
| LocalityGroupUtil.ensureNonOverlappingGroups(groups); |
| |
| for (Entry<String,Set<Text>> entry : groups.entrySet()) { |
| Set<Text> colFams = entry.getValue(); |
| String value = LocalityGroupUtil.encodeColumnFamilies(colFams); |
| setProperty(tableName, Property.TABLE_LOCALITY_GROUP_PREFIX + entry.getKey(), value); |
| } |
| |
| try { |
| setProperty(tableName, Property.TABLE_LOCALITY_GROUPS.getKey(), |
| Joiner.on(",").join(groups.keySet())); |
| } catch (AccumuloException e) { |
| if (e.getCause() instanceof TableNotFoundException) |
| throw (TableNotFoundException) e.getCause(); |
| throw e; |
| } |
| |
| // remove anything extraneous |
| String prefix = Property.TABLE_LOCALITY_GROUP_PREFIX.getKey(); |
| for (Entry<String,String> entry : getProperties(tableName)) { |
| String property = entry.getKey(); |
| if (property.startsWith(prefix)) { |
| // this property configures a locality group, find out which |
| // one: |
| String[] parts = property.split("\\."); |
| String group = parts[parts.length - 1]; |
| |
| if (!groups.containsKey(group)) { |
| removeProperty(tableName, property); |
| } |
| } |
| } |
| } |
| |
| @Override |
| public Map<String,Set<Text>> getLocalityGroups(String tableName) |
| throws AccumuloException, TableNotFoundException { |
| AccumuloConfiguration conf = new ConfigurationCopy(this.getProperties(tableName)); |
| Map<String,Set<ByteSequence>> groups = LocalityGroupUtil.getLocalityGroups(conf); |
| |
| Map<String,Set<Text>> groups2 = new HashMap<>(); |
| for (Entry<String,Set<ByteSequence>> entry : groups.entrySet()) { |
| |
| HashSet<Text> colFams = new HashSet<>(); |
| |
| for (ByteSequence bs : entry.getValue()) { |
| colFams.add(new Text(bs.toArray())); |
| } |
| |
| groups2.put(entry.getKey(), colFams); |
| } |
| |
| return groups2; |
| } |
| |
| @Override |
| public Set<Range> splitRangeByTablets(String tableName, Range range, int maxSplits) |
| throws AccumuloException, AccumuloSecurityException, TableNotFoundException { |
| checkArgument(tableName != null, "tableName is null"); |
| checkArgument(range != null, "range is null"); |
| if (maxSplits < 1) |
| throw new IllegalArgumentException("maximum splits must be >= 1"); |
| if (maxSplits == 1) |
| return Collections.singleton(range); |
| |
| Random random = new SecureRandom(); |
| Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<>(); |
| Table.ID tableId = Tables.getTableId(context, tableName); |
| TabletLocator tl = TabletLocator.getLocator(context, tableId); |
| // its possible that the cache could contain complete, but old information about a tables |
| // tablets... so clear it |
| tl.invalidateCache(); |
| while (!tl.binRanges(context, Collections.singletonList(range), binnedRanges).isEmpty()) { |
| if (!Tables.exists(context, tableId)) |
| throw new TableDeletedException(tableId.canonicalID()); |
| if (Tables.getTableState(context, tableId) == TableState.OFFLINE) |
| throw new TableOfflineException(Tables.getTableOfflineMsg(context, tableId)); |
| |
| log.warn("Unable to locate bins for specified range. Retrying."); |
| // sleep randomly between 100 and 200ms |
| sleepUninterruptibly(100 + random.nextInt(100), TimeUnit.MILLISECONDS); |
| binnedRanges.clear(); |
| tl.invalidateCache(); |
| } |
| |
| // group key extents to get <= maxSplits |
| LinkedList<KeyExtent> unmergedExtents = new LinkedList<>(); |
| List<KeyExtent> mergedExtents = new ArrayList<>(); |
| |
| for (Map<KeyExtent,List<Range>> map : binnedRanges.values()) |
| unmergedExtents.addAll(map.keySet()); |
| |
| // the sort method is efficient for linked list |
| Collections.sort(unmergedExtents); |
| |
| while (unmergedExtents.size() + mergedExtents.size() > maxSplits) { |
| if (unmergedExtents.size() >= 2) { |
| KeyExtent first = unmergedExtents.removeFirst(); |
| KeyExtent second = unmergedExtents.removeFirst(); |
| first.setEndRow(second.getEndRow()); |
| mergedExtents.add(first); |
| } else { |
| mergedExtents.addAll(unmergedExtents); |
| unmergedExtents.clear(); |
| unmergedExtents.addAll(mergedExtents); |
| mergedExtents.clear(); |
| } |
| |
| } |
| |
| mergedExtents.addAll(unmergedExtents); |
| |
| Set<Range> ranges = new HashSet<>(); |
| for (KeyExtent k : mergedExtents) |
| ranges.add(k.toDataRange().clip(range)); |
| |
| return ranges; |
| } |
| |
| private Path checkPath(String dir, String kind, String type) |
| throws IOException, AccumuloException, AccumuloSecurityException { |
| Path ret; |
| Map<String,String> props = context.getClient().instanceOperations().getSystemConfiguration(); |
| AccumuloConfiguration conf = new ConfigurationCopy(props); |
| |
| FileSystem fs = VolumeConfiguration.getVolume(dir, CachedConfiguration.getInstance(), conf) |
| .getFileSystem(); |
| |
| if (dir.contains(":")) { |
| ret = new Path(dir); |
| } else { |
| ret = fs.makeQualified(new Path(dir)); |
| } |
| |
| try { |
| if (!fs.getFileStatus(ret).isDirectory()) { |
| throw new AccumuloException( |
| kind + " import " + type + " directory " + dir + " is not a directory!"); |
| } |
| } catch (FileNotFoundException fnf) { |
| throw new AccumuloException( |
| kind + " import " + type + " directory " + dir + " does not exist!"); |
| } |
| |
| if (type.equals("failure")) { |
| FileStatus[] listStatus = fs.listStatus(ret); |
| if (listStatus != null && listStatus.length != 0) { |
| throw new AccumuloException("Bulk import failure directory " + ret + " is not empty"); |
| } |
| } |
| |
| return ret; |
| } |
| |
| @Override |
| public void importDirectory(String tableName, String dir, String failureDir, boolean setTime) |
| throws IOException, AccumuloSecurityException, TableNotFoundException, AccumuloException { |
| checkArgument(tableName != null, "tableName is null"); |
| checkArgument(dir != null, "dir is null"); |
| checkArgument(failureDir != null, "failureDir is null"); |
| // check for table existence |
| Tables.getTableId(context, tableName); |
| |
| Path dirPath = checkPath(dir, "Bulk", ""); |
| Path failPath = checkPath(failureDir, "Bulk", "failure"); |
| |
| List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableName.getBytes(UTF_8)), |
| ByteBuffer.wrap(dirPath.toString().getBytes(UTF_8)), |
| ByteBuffer.wrap(failPath.toString().getBytes(UTF_8)), |
| ByteBuffer.wrap((setTime + "").getBytes(UTF_8))); |
| Map<String,String> opts = new HashMap<>(); |
| |
| try { |
| doTableFateOperation(tableName, TableNotFoundException.class, FateOperation.TABLE_BULK_IMPORT, |
| args, opts); |
| } catch (TableExistsException e) { |
| // should not happen |
| throw new AssertionError(e); |
| } |
| } |
| |
| private void waitForTableStateTransition(Table.ID tableId, TableState expectedState) |
| throws AccumuloException, TableNotFoundException, AccumuloSecurityException { |
| |
| Text startRow = null; |
| Text lastRow = null; |
| |
| while (true) { |
| |
| if (Tables.getTableState(context, tableId) != expectedState) { |
| Tables.clearCache(context); |
| TableState currentState = Tables.getTableState(context, tableId); |
| if (currentState != expectedState) { |
| if (!Tables.exists(context, tableId)) |
| throw new TableDeletedException(tableId.canonicalID()); |
| if (currentState == TableState.DELETING) |
| throw new TableNotFoundException(tableId.canonicalID(), "", "Table is being deleted."); |
| throw new AccumuloException("Unexpected table state " + tableId + " " |
| + Tables.getTableState(context, tableId) + " != " + expectedState); |
| } |
| } |
| |
| Range range; |
| if (startRow == null || lastRow == null) |
| range = new KeyExtent(tableId, null, null).toMetadataRange(); |
| else |
| range = new Range(startRow, lastRow); |
| |
| String metaTable = MetadataTable.NAME; |
| if (tableId.equals(MetadataTable.ID)) |
| metaTable = RootTable.NAME; |
| |
| Scanner scanner = createMetadataScanner(metaTable, range); |
| |
| RowIterator rowIter = new RowIterator(scanner); |
| |
| KeyExtent lastExtent = null; |
| |
| int total = 0; |
| int waitFor = 0; |
| int holes = 0; |
| Text continueRow = null; |
| MapCounter<String> serverCounts = new MapCounter<>(); |
| |
| while (rowIter.hasNext()) { |
| Iterator<Entry<Key,Value>> row = rowIter.next(); |
| |
| total++; |
| |
| KeyExtent extent = null; |
| String future = null; |
| String current = null; |
| |
| while (row.hasNext()) { |
| Entry<Key,Value> entry = row.next(); |
| Key key = entry.getKey(); |
| |
| if (key.getColumnFamily().equals(TabletsSection.FutureLocationColumnFamily.NAME)) |
| future = entry.getValue().toString(); |
| |
| if (key.getColumnFamily().equals(TabletsSection.CurrentLocationColumnFamily.NAME)) |
| current = entry.getValue().toString(); |
| |
| if (TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key)) |
| extent = new KeyExtent(key.getRow(), entry.getValue()); |
| } |
| |
| if ((expectedState == TableState.ONLINE && current == null) |
| || (expectedState == TableState.OFFLINE && (future != null || current != null))) { |
| if (continueRow == null) |
| continueRow = extent.getMetadataEntry(); |
| waitFor++; |
| lastRow = extent.getMetadataEntry(); |
| |
| if (current != null) |
| serverCounts.increment(current, 1); |
| if (future != null) |
| serverCounts.increment(future, 1); |
| } |
| |
| if (!extent.getTableId().equals(tableId)) { |
| throw new AccumuloException("Saw unexpected table Id " + tableId + " " + extent); |
| } |
| |
| if (lastExtent != null && !extent.isPreviousExtent(lastExtent)) { |
| holes++; |
| } |
| |
| lastExtent = extent; |
| } |
| |
| if (continueRow != null) { |
| startRow = continueRow; |
| } |
| |
| if (holes > 0 || total == 0) { |
| startRow = null; |
| lastRow = null; |
| } |
| |
| if (waitFor > 0 || holes > 0 || total == 0) { |
| long waitTime; |
| long maxPerServer = 0; |
| if (serverCounts.size() > 0) { |
| maxPerServer = Collections.max(serverCounts.values()); |
| waitTime = maxPerServer * 10; |
| } else |
| waitTime = waitFor * 10; |
| waitTime = Math.max(100, waitTime); |
| waitTime = Math.min(5000, waitTime); |
| log.trace("Waiting for {}({}) tablets, startRow = {} lastRow = {}, holes={} sleeping:{}ms", |
| waitFor, maxPerServer, startRow, lastRow, holes, waitTime); |
| sleepUninterruptibly(waitTime, TimeUnit.MILLISECONDS); |
| } else { |
| break; |
| } |
| |
| } |
| } |
| |
| /** |
| * Create an IsolatedScanner over the given table, fetching the columns necessary to determine |
| * when a table has transitioned to online or offline. |
| */ |
| protected IsolatedScanner createMetadataScanner(String metaTable, Range range) |
| throws TableNotFoundException, AccumuloException, AccumuloSecurityException { |
| Scanner scanner = context.getClient().createScanner(metaTable, Authorizations.EMPTY); |
| TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner); |
| scanner.fetchColumnFamily(TabletsSection.FutureLocationColumnFamily.NAME); |
| scanner.fetchColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME); |
| scanner.setRange(range); |
| return new IsolatedScanner(scanner); |
| } |
| |
| @Override |
| public void offline(String tableName) |
| throws AccumuloSecurityException, AccumuloException, TableNotFoundException { |
| offline(tableName, false); |
| } |
| |
| @Override |
| public void offline(String tableName, boolean wait) |
| throws AccumuloSecurityException, AccumuloException, TableNotFoundException { |
| |
| checkArgument(tableName != null, "tableName is null"); |
| Table.ID tableId = Tables.getTableId(context, tableName); |
| List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableId.getUtf8())); |
| Map<String,String> opts = new HashMap<>(); |
| |
| try { |
| doTableFateOperation(tableName, TableNotFoundException.class, FateOperation.TABLE_OFFLINE, |
| args, opts); |
| } catch (TableExistsException e) { |
| // should not happen |
| throw new AssertionError(e); |
| } |
| |
| if (wait) |
| waitForTableStateTransition(tableId, TableState.OFFLINE); |
| } |
| |
| @Override |
| public void online(String tableName) |
| throws AccumuloSecurityException, AccumuloException, TableNotFoundException { |
| online(tableName, false); |
| } |
| |
| @Override |
| public void online(String tableName, boolean wait) |
| throws AccumuloSecurityException, AccumuloException, TableNotFoundException { |
| checkArgument(tableName != null, "tableName is null"); |
| |
| Table.ID tableId = Tables.getTableId(context, tableName); |
| |
| /** |
| * ACCUMULO-4574 if table is already online return without executing fate operation. |
| */ |
| |
| TableState expectedState = Tables.getTableState(context, tableId, true); |
| if (expectedState == TableState.ONLINE) { |
| if (wait) |
| waitForTableStateTransition(tableId, TableState.ONLINE); |
| return; |
| } |
| |
| List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableId.getUtf8())); |
| Map<String,String> opts = new HashMap<>(); |
| |
| try { |
| doTableFateOperation(tableName, TableNotFoundException.class, FateOperation.TABLE_ONLINE, |
| args, opts); |
| } catch (TableExistsException e) { |
| // should not happen |
| throw new AssertionError(e); |
| } |
| |
| if (wait) |
| waitForTableStateTransition(tableId, TableState.ONLINE); |
| } |
| |
| @Override |
| public void clearLocatorCache(String tableName) throws TableNotFoundException { |
| checkArgument(tableName != null, "tableName is null"); |
| TabletLocator tabLocator = TabletLocator.getLocator(context, |
| Tables.getTableId(context, tableName)); |
| tabLocator.invalidateCache(); |
| } |
| |
| @Override |
| public Map<String,String> tableIdMap() { |
| return Tables.getNameToIdMap(context).entrySet().stream() |
| .collect(Collectors.toMap(Entry::getKey, e -> e.getValue().canonicalID(), (v1, v2) -> { |
| throw new RuntimeException(String.format("Duplicate key for values %s and %s", v1, v2)); |
| }, TreeMap::new)); |
| } |
| |
| @Override |
| public Text getMaxRow(String tableName, Authorizations auths, Text startRow, |
| boolean startInclusive, Text endRow, boolean endInclusive) |
| throws TableNotFoundException, AccumuloException, AccumuloSecurityException { |
| checkArgument(tableName != null, "tableName is null"); |
| checkArgument(auths != null, "auths is null"); |
| Scanner scanner = context.getClient().createScanner(tableName, auths); |
| return FindMax.findMax(scanner, startRow, startInclusive, endRow, endInclusive); |
| } |
| |
| @Override |
| public List<DiskUsage> getDiskUsage(Set<String> tableNames) |
| throws AccumuloException, AccumuloSecurityException, TableNotFoundException { |
| |
| List<TDiskUsage> diskUsages = null; |
| while (diskUsages == null) { |
| Pair<String,Client> pair = null; |
| try { |
| // this operation may us a lot of memory... its likely that connections to tabletservers |
| // hosting metadata tablets will be cached, so do not use cached |
| // connections |
| pair = ServerClient.getConnection(context, false); |
| diskUsages = pair.getSecond().getDiskUsage(tableNames, context.rpcCreds()); |
| } catch (ThriftTableOperationException e) { |
| switch (e.getType()) { |
| case NOTFOUND: |
| throw new TableNotFoundException(e); |
| case NAMESPACE_NOTFOUND: |
| throw new TableNotFoundException(e.getTableName(), new NamespaceNotFoundException(e)); |
| default: |
| throw new AccumuloException(e.description, e); |
| } |
| } catch (ThriftSecurityException e) { |
| throw new AccumuloSecurityException(e.getUser(), e.getCode()); |
| } catch (TTransportException e) { |
| // some sort of communication error occurred, retry |
| if (pair == null) { |
| log.debug("Disk usage request failed. Pair is null. Retrying request...", e); |
| } else { |
| log.debug("Disk usage request failed {}, retrying ... ", pair.getFirst(), e); |
| } |
| sleepUninterruptibly(100, TimeUnit.MILLISECONDS); |
| } catch (TException e) { |
| // may be a TApplicationException which indicates error on the server side |
| throw new AccumuloException(e); |
| } finally { |
| // must always return thrift connection |
| if (pair != null) |
| ServerClient.close(pair.getSecond()); |
| } |
| } |
| |
| List<DiskUsage> finalUsages = new ArrayList<>(); |
| for (TDiskUsage diskUsage : diskUsages) { |
| finalUsages.add(new DiskUsage(new TreeSet<>(diskUsage.getTables()), diskUsage.getUsage())); |
| } |
| |
| return finalUsages; |
| } |
| |
| public static Map<String,String> getExportedProps(FileSystem fs, Path path) throws IOException { |
| HashMap<String,String> props = new HashMap<>(); |
| |
| try (ZipInputStream zis = new ZipInputStream(fs.open(path))) { |
| ZipEntry zipEntry; |
| while ((zipEntry = zis.getNextEntry()) != null) { |
| if (zipEntry.getName().equals(Constants.EXPORT_TABLE_CONFIG_FILE)) { |
| try (BufferedReader in = new BufferedReader(new InputStreamReader(zis, UTF_8))) { |
| String line; |
| while ((line = in.readLine()) != null) { |
| String sa[] = line.split("=", 2); |
| props.put(sa[0], sa[1]); |
| } |
| } |
| |
| break; |
| } |
| } |
| } |
| return props; |
| } |
| |
| @Override |
| public void importTable(String tableName, String importDir) |
| throws TableExistsException, AccumuloException, AccumuloSecurityException { |
| checkArgument(tableName != null, "tableName is null"); |
| checkArgument(importDir != null, "importDir is null"); |
| |
| try { |
| importDir = checkPath(importDir, "Table", "").toString(); |
| } catch (IOException e) { |
| throw new AccumuloException(e); |
| } |
| |
| try { |
| FileSystem fs = new Path(importDir).getFileSystem(CachedConfiguration.getInstance()); |
| Map<String,String> props = getExportedProps(fs, new Path(importDir, Constants.EXPORT_FILE)); |
| |
| for (Entry<String,String> entry : props.entrySet()) { |
| if (Property.isClassProperty(entry.getKey()) |
| && !entry.getValue().contains(Constants.CORE_PACKAGE_NAME)) { |
| LoggerFactory.getLogger(this.getClass()).info( |
| "Imported table sets '{}' to '{}'. Ensure this class is on Accumulo classpath.", |
| sanitize(entry.getKey()), sanitize(entry.getValue())); |
| } |
| } |
| |
| } catch (IOException ioe) { |
| LoggerFactory.getLogger(this.getClass()).warn( |
| "Failed to check if imported table references external java classes : {}", |
| ioe.getMessage()); |
| } |
| |
| List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableName.getBytes(UTF_8)), |
| ByteBuffer.wrap(importDir.getBytes(UTF_8))); |
| |
| Map<String,String> opts = Collections.emptyMap(); |
| |
| try { |
| doTableFateOperation(tableName, AccumuloException.class, FateOperation.TABLE_IMPORT, args, |
| opts); |
| } catch (TableNotFoundException e) { |
| // should not happen |
| throw new AssertionError(e); |
| } |
| |
| } |
| |
| /** |
| * Prevent potential CRLF injection into logs from read in user data See |
| * https://find-sec-bugs.github.io/bugs.htm#CRLF_INJECTION_LOGS |
| */ |
| private String sanitize(String msg) { |
| return msg.replaceAll("[\r\n]", ""); |
| } |
| |
| @Override |
| public void exportTable(String tableName, String exportDir) |
| throws TableNotFoundException, AccumuloException, AccumuloSecurityException { |
| checkArgument(tableName != null, "tableName is null"); |
| checkArgument(exportDir != null, "exportDir is null"); |
| |
| List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableName.getBytes(UTF_8)), |
| ByteBuffer.wrap(exportDir.getBytes(UTF_8))); |
| |
| Map<String,String> opts = Collections.emptyMap(); |
| |
| try { |
| doTableFateOperation(tableName, TableNotFoundException.class, FateOperation.TABLE_EXPORT, |
| args, opts); |
| } catch (TableExistsException e) { |
| // should not happen |
| throw new AssertionError(e); |
| } |
| } |
| |
| @Override |
| public boolean testClassLoad(final String tableName, final String className, |
| final String asTypeName) |
| throws TableNotFoundException, AccumuloException, AccumuloSecurityException { |
| checkArgument(tableName != null, "tableName is null"); |
| checkArgument(className != null, "className is null"); |
| checkArgument(asTypeName != null, "asTypeName is null"); |
| |
| try { |
| return ServerClient.executeRaw(context, client -> client.checkTableClass(Tracer.traceInfo(), |
| context.rpcCreds(), tableName, className, asTypeName)); |
| } catch (ThriftTableOperationException e) { |
| switch (e.getType()) { |
| case NOTFOUND: |
| throw new TableNotFoundException(e); |
| case NAMESPACE_NOTFOUND: |
| throw new TableNotFoundException(tableName, new NamespaceNotFoundException(e)); |
| default: |
| throw new AccumuloException(e.description, e); |
| } |
| } catch (ThriftSecurityException e) { |
| throw new AccumuloSecurityException(e.user, e.code, e); |
| } catch (AccumuloException e) { |
| throw e; |
| } catch (Exception e) { |
| throw new AccumuloException(e); |
| } |
| } |
| |
| @Override |
| public void attachIterator(String tableName, IteratorSetting setting, |
| EnumSet<IteratorScope> scopes) |
| throws AccumuloSecurityException, AccumuloException, TableNotFoundException { |
| testClassLoad(tableName, setting.getIteratorClass(), SortedKeyValueIterator.class.getName()); |
| super.attachIterator(tableName, setting, scopes); |
| } |
| |
| @Override |
| public int addConstraint(String tableName, String constraintClassName) |
| throws AccumuloException, AccumuloSecurityException, TableNotFoundException { |
| testClassLoad(tableName, constraintClassName, Constraint.class.getName()); |
| return super.addConstraint(tableName, constraintClassName); |
| } |
| |
| private void doTableFateOperation(String tableOrNamespaceName, |
| Class<? extends Exception> namespaceNotFoundExceptionClass, FateOperation op, |
| List<ByteBuffer> args, Map<String,String> opts) throws AccumuloSecurityException, |
| AccumuloException, TableExistsException, TableNotFoundException { |
| try { |
| doFateOperation(op, args, opts, tableOrNamespaceName); |
| } catch (NamespaceExistsException e) { |
| // should not happen |
| throw new AssertionError(e); |
| } catch (NamespaceNotFoundException e) { |
| if (namespaceNotFoundExceptionClass == null) { |
| // should not happen |
| throw new AssertionError(e); |
| } else if (AccumuloException.class.isAssignableFrom(namespaceNotFoundExceptionClass)) { |
| throw new AccumuloException("Cannot create table in non-existent namespace", e); |
| } else if (TableNotFoundException.class.isAssignableFrom(namespaceNotFoundExceptionClass)) { |
| throw new TableNotFoundException(null, tableOrNamespaceName, "Namespace not found", e); |
| } else { |
| // should not happen |
| throw new AssertionError(e); |
| } |
| } |
| } |
| |
| private void clearSamplerOptions(String tableName) |
| throws AccumuloException, TableNotFoundException, AccumuloSecurityException { |
| String prefix = Property.TABLE_SAMPLER_OPTS.getKey(); |
| for (Entry<String,String> entry : getProperties(tableName)) { |
| String property = entry.getKey(); |
| if (property.startsWith(prefix)) { |
| removeProperty(tableName, property); |
| } |
| } |
| } |
| |
| @Override |
| public void setSamplerConfiguration(String tableName, SamplerConfiguration samplerConfiguration) |
| throws AccumuloException, TableNotFoundException, AccumuloSecurityException { |
| clearSamplerOptions(tableName); |
| |
| List<Pair<String,String>> props = new SamplerConfigurationImpl(samplerConfiguration) |
| .toTableProperties(); |
| for (Pair<String,String> pair : props) { |
| setProperty(tableName, pair.getFirst(), pair.getSecond()); |
| } |
| } |
| |
| @Override |
| public void clearSamplerConfiguration(String tableName) |
| throws AccumuloException, TableNotFoundException, AccumuloSecurityException { |
| removeProperty(tableName, Property.TABLE_SAMPLER.getKey()); |
| clearSamplerOptions(tableName); |
| } |
| |
| @Override |
| public SamplerConfiguration getSamplerConfiguration(String tableName) |
| throws TableNotFoundException, AccumuloException { |
| AccumuloConfiguration conf = new ConfigurationCopy(this.getProperties(tableName)); |
| SamplerConfigurationImpl sci = SamplerConfigurationImpl.newSamplerConfig(conf); |
| if (sci == null) { |
| return null; |
| } |
| return sci.toSamplerConfiguration(); |
| } |
| |
| private static class LoctionsImpl implements Locations { |
| |
| private Map<Range,List<TabletId>> groupedByRanges; |
| private Map<TabletId,List<Range>> groupedByTablets; |
| private Map<TabletId,String> tabletLocations; |
| |
| public LoctionsImpl(Map<String,Map<KeyExtent,List<Range>>> binnedRanges) { |
| groupedByTablets = new HashMap<>(); |
| groupedByRanges = null; |
| tabletLocations = new HashMap<>(); |
| |
| for (Entry<String,Map<KeyExtent,List<Range>>> entry : binnedRanges.entrySet()) { |
| String location = entry.getKey(); |
| |
| for (Entry<KeyExtent,List<Range>> entry2 : entry.getValue().entrySet()) { |
| TabletIdImpl tabletId = new TabletIdImpl(entry2.getKey()); |
| tabletLocations.put(tabletId, location); |
| List<Range> prev = groupedByTablets.put(tabletId, |
| Collections.unmodifiableList(entry2.getValue())); |
| if (prev != null) { |
| throw new RuntimeException( |
| "Unexpected : tablet at multiple locations : " + location + " " + tabletId); |
| } |
| } |
| } |
| |
| groupedByTablets = Collections.unmodifiableMap(groupedByTablets); |
| } |
| |
| @Override |
| public String getTabletLocation(TabletId tabletId) { |
| return tabletLocations.get(tabletId); |
| } |
| |
| @Override |
| public Map<Range,List<TabletId>> groupByRange() { |
| if (groupedByRanges == null) { |
| Map<Range,List<TabletId>> tmp = new HashMap<>(); |
| |
| for (Entry<TabletId,List<Range>> entry : groupedByTablets.entrySet()) { |
| for (Range range : entry.getValue()) { |
| List<TabletId> tablets = tmp.get(range); |
| if (tablets == null) { |
| tablets = new ArrayList<>(); |
| tmp.put(range, tablets); |
| } |
| |
| tablets.add(entry.getKey()); |
| } |
| } |
| |
| Map<Range,List<TabletId>> tmp2 = new HashMap<>(); |
| for (Entry<Range,List<TabletId>> entry : tmp.entrySet()) { |
| tmp2.put(entry.getKey(), Collections.unmodifiableList(entry.getValue())); |
| } |
| |
| groupedByRanges = Collections.unmodifiableMap(tmp2); |
| } |
| |
| return groupedByRanges; |
| } |
| |
| @Override |
| public Map<TabletId,List<Range>> groupByTablet() { |
| return groupedByTablets; |
| } |
| } |
| |
| @Override |
| public Locations locate(String tableName, Collection<Range> ranges) |
| throws AccumuloException, AccumuloSecurityException, TableNotFoundException { |
| requireNonNull(tableName, "tableName must be non null"); |
| requireNonNull(ranges, "ranges must be non null"); |
| |
| Table.ID tableId = Tables.getTableId(context, tableName); |
| TabletLocator locator = TabletLocator.getLocator(context, tableId); |
| |
| List<Range> rangeList = null; |
| if (ranges instanceof List) { |
| rangeList = (List<Range>) ranges; |
| } else { |
| rangeList = new ArrayList<>(ranges); |
| } |
| |
| Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<>(); |
| |
| locator.invalidateCache(); |
| |
| Retry retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS) |
| .incrementBy(100, MILLISECONDS).maxWait(2, SECONDS).logInterval(3, TimeUnit.MINUTES) |
| .createRetry(); |
| |
| while (!locator.binRanges(context, rangeList, binnedRanges).isEmpty()) { |
| |
| if (!Tables.exists(context, tableId)) |
| throw new TableNotFoundException(tableId.canonicalID(), tableName, null); |
| if (Tables.getTableState(context, tableId) == TableState.OFFLINE) |
| throw new TableOfflineException(Tables.getTableOfflineMsg(context, tableId)); |
| |
| binnedRanges.clear(); |
| |
| try { |
| retry.waitForNextAttempt(); |
| } catch (InterruptedException e) { |
| throw new RuntimeException(e); |
| } |
| |
| locator.invalidateCache(); |
| } |
| |
| return new LoctionsImpl(binnedRanges); |
| } |
| |
/**
 * Creates a {@link SummaryRetriever} for the given table. The returned object collects an
 * optional row range, the summarizer configurations or class regex to fetch, and a flush flag;
 * nothing is requested from the servers until {@code retrieve()} is called.
 */
@Override
public SummaryRetriever summaries(String tableName) {

  return new SummaryRetriever() {

    private Text startRow = null;
    private Text endRow = null;
    private List<TSummarizerConfiguration> summariesToFetch = Collections.emptyList();
    private String summarizerClassRegex;
    private boolean flush = false;

    // Shared ordering check for both row setters; a no-op while either bound is unset.
    private void checkRowOrder(Text start, Text end) {
      if (start != null && end != null) {
        Preconditions.checkArgument(start.compareTo(end) < 0,
            "Start row must be less than end row : %s >= %s", start, end);
      }
    }

    @Override
    public SummaryRetriever startRow(Text startRow) {
      Objects.requireNonNull(startRow);
      checkRowOrder(startRow, endRow);
      this.startRow = startRow;
      return this;
    }

    @Override
    public SummaryRetriever startRow(CharSequence startRow) {
      return startRow(new Text(startRow.toString()));
    }

    @Override
    public SummaryRetriever endRow(Text endRow) {
      Objects.requireNonNull(endRow);
      checkRowOrder(startRow, endRow);
      this.endRow = endRow;
      return this;
    }

    @Override
    public SummaryRetriever endRow(CharSequence endRow) {
      return endRow(new Text(endRow.toString()));
    }

    @Override
    public SummaryRetriever withConfiguration(Collection<SummarizerConfiguration> configs) {
      Objects.requireNonNull(configs);
      summariesToFetch = configs.stream().map(SummarizerConfigurationUtil::toThrift)
          .collect(Collectors.toList());
      return this;
    }

    @Override
    public SummaryRetriever withConfiguration(SummarizerConfiguration... config) {
      Objects.requireNonNull(config);
      return withConfiguration(Arrays.asList(config));
    }

    @Override
    public SummaryRetriever withMatchingConfiguration(String regex) {
      Objects.requireNonNull(regex);
      // Compile once here purely as validation, so a bad regex fails on the client rather than
      // on a tserver.
      Pattern.compile(regex);
      this.summarizerClassRegex = regex;
      return this;
    }

    @Override
    public SummaryRetriever flush(boolean b) {
      this.flush = b;
      return this;
    }

    @Override
    public List<Summary> retrieve()
        throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
      Table.ID tableId = Tables.getTableId(context, tableName);
      if (Tables.getTableState(context, tableId) == TableState.OFFLINE) {
        throw new TableOfflineException(Tables.getTableOfflineMsg(context, tableId));
      }

      TRowRange rowRange = new TRowRange(TextUtil.getByteBuffer(startRow),
          TextUtil.getByteBuffer(endRow));
      TSummaryRequest request = new TSummaryRequest(tableId.canonicalID(), rowRange,
          summariesToFetch, summarizerClassRegex);

      if (flush) {
        _flush(tableId, startRow, endRow, true);
      }

      // Start a summary session and keep polling it until the server marks it finished.
      TSummaries result = ServerClient.execute(context,
          new TabletClientService.Client.Factory(), client -> {
            TSummaries batch = client.startGetSummaries(Tracer.traceInfo(), context.rpcCreds(),
                request);
            while (!batch.finished) {
              batch = client.contiuneGetSummaries(Tracer.traceInfo(), batch.sessionId);
            }
            return batch;
          });
      return new SummaryCollection(result).getSummaries();
    }
  };
}
| |
| @Override |
| public void addSummarizers(String tableName, SummarizerConfiguration... newConfigs) |
| throws AccumuloException, AccumuloSecurityException, TableNotFoundException { |
| HashSet<SummarizerConfiguration> currentConfigs = new HashSet<>( |
| SummarizerConfiguration.fromTableProperties(getProperties(tableName))); |
| HashSet<SummarizerConfiguration> newConfigSet = new HashSet<>(Arrays.asList(newConfigs)); |
| |
| newConfigSet.removeIf(currentConfigs::contains); |
| |
| Set<String> newIds = newConfigSet.stream().map(SummarizerConfiguration::getPropertyId) |
| .collect(toSet()); |
| |
| for (SummarizerConfiguration csc : currentConfigs) { |
| if (newIds.contains(csc.getPropertyId())) { |
| throw new IllegalArgumentException("Summarizer property id is in use by " + csc); |
| } |
| } |
| |
| Set<Entry<String,String>> es = SummarizerConfiguration.toTableProperties(newConfigSet) |
| .entrySet(); |
| for (Entry<String,String> entry : es) { |
| setProperty(tableName, entry.getKey(), entry.getValue()); |
| } |
| } |
| |
| @Override |
| public void removeSummarizers(String tableName, Predicate<SummarizerConfiguration> predicate) |
| throws AccumuloException, TableNotFoundException, AccumuloSecurityException { |
| Collection<SummarizerConfiguration> summarizerConfigs = SummarizerConfiguration |
| .fromTableProperties(getProperties(tableName)); |
| for (SummarizerConfiguration sc : summarizerConfigs) { |
| if (predicate.test(sc)) { |
| Set<String> ks = sc.toTableProperties().keySet(); |
| for (String key : ks) { |
| removeProperty(tableName, key); |
| } |
| } |
| } |
| } |
| |
| @Override |
| public List<SummarizerConfiguration> listSummarizers(String tableName) |
| throws AccumuloException, TableNotFoundException, AccumuloSecurityException { |
| return new ArrayList<>(SummarizerConfiguration.fromTableProperties(getProperties(tableName))); |
| } |
| |
| @Override |
| public ImportDestinationArguments importDirectory(String directory) { |
| return new BulkImport(directory, context); |
| } |
| } |