blob: 4ebaab124a753be3e8f5acbff53329bdd782ef51 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.manager.tableOps.compact;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.COMPACT_ID;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException;
import org.apache.accumulo.core.clientImpl.Tables;
import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
import org.apache.accumulo.core.data.NamespaceId;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.manager.state.tables.TableState;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
import org.apache.accumulo.core.util.MapCounter;
import org.apache.accumulo.fate.Repo;
import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.tableOps.ManagerRepo;
import org.apache.accumulo.manager.tableOps.Utils;
import org.apache.accumulo.server.manager.LiveTServerSet.TServerConnection;
import org.apache.thrift.TException;
import org.slf4j.LoggerFactory;
class CompactionDriver extends ManagerRepo {
private static final long serialVersionUID = 1L;
private long compactId;
private final TableId tableId;
private final NamespaceId namespaceId;
private byte[] startRow;
private byte[] endRow;
public CompactionDriver(long compactId, NamespaceId namespaceId, TableId tableId, byte[] startRow,
byte[] endRow) {
this.compactId = compactId;
this.tableId = tableId;
this.namespaceId = namespaceId;
this.startRow = startRow;
this.endRow = endRow;
}
@Override
public long isReady(long tid, Manager manager) throws Exception {
if (tableId.equals(RootTable.ID)) {
// this codes not properly handle the root table. See #798
return 0;
}
String zCancelID = Constants.ZROOT + "/" + manager.getInstanceID() + Constants.ZTABLES + "/"
+ tableId + Constants.ZTABLE_COMPACT_CANCEL_ID;
ZooReaderWriter zoo = manager.getContext().getZooReaderWriter();
if (Long.parseLong(new String(zoo.getData(zCancelID))) >= compactId) {
// compaction was canceled
throw new AcceptableThriftTableOperationException(tableId.canonical(), null,
TableOperation.COMPACT, TableOperationExceptionType.OTHER, "Compaction canceled");
}
MapCounter<TServerInstance> serversToFlush = new MapCounter<>();
long t1 = System.currentTimeMillis();
int tabletsToWaitFor = 0;
int tabletCount = 0;
TabletsMetadata tablets =
TabletsMetadata.builder().forTable(tableId).overlapping(startRow, endRow)
.fetch(LOCATION, PREV_ROW, COMPACT_ID).build(manager.getContext());
for (TabletMetadata tablet : tablets) {
if (tablet.getCompactId().orElse(-1) < compactId) {
tabletsToWaitFor++;
if (tablet.hasCurrent()) {
serversToFlush.increment(tablet.getLocation(), 1);
}
}
tabletCount++;
}
long scanTime = System.currentTimeMillis() - t1;
Tables.clearCache(manager.getContext());
if (tabletCount == 0 && !Tables.exists(manager.getContext(), tableId))
throw new AcceptableThriftTableOperationException(tableId.canonical(), null,
TableOperation.COMPACT, TableOperationExceptionType.NOTFOUND, null);
if (serversToFlush.size() == 0
&& Tables.getTableState(manager.getContext(), tableId) == TableState.OFFLINE)
throw new AcceptableThriftTableOperationException(tableId.canonical(), null,
TableOperation.COMPACT, TableOperationExceptionType.OFFLINE, null);
if (tabletsToWaitFor == 0)
return 0;
for (TServerInstance tsi : serversToFlush.keySet()) {
try {
final TServerConnection server = manager.getConnection(tsi);
if (server != null)
server.compact(manager.getManagerLock(), tableId.canonical(), startRow, endRow);
} catch (TException ex) {
LoggerFactory.getLogger(CompactionDriver.class).error(ex.toString());
}
}
long sleepTime = 500;
// make wait time depend on the server with the most to compact
if (serversToFlush.size() > 0)
sleepTime = serversToFlush.max() * sleepTime;
sleepTime = Math.max(2 * scanTime, sleepTime);
sleepTime = Math.min(sleepTime, 30000);
return sleepTime;
}
@Override
public Repo<Manager> call(long tid, Manager env) throws Exception {
CompactRange.removeIterators(env, tid, tableId);
Utils.getReadLock(env, tableId, tid).unlock();
Utils.getReadLock(env, namespaceId, tid).unlock();
return null;
}
@Override
public void undo(long tid, Manager environment) {
}
}