// blob: 2704e6429b21273774ef0ad4ef171c18935f6aca [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.asterix.common.context;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.ioopcallbacks.LSMIOOperationCallback;
import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
import org.apache.asterix.common.transactions.AbstractOperationCallback;
import org.apache.asterix.common.transactions.ILogManager;
import org.apache.asterix.common.transactions.LogRecord;
import org.apache.asterix.common.utils.TransactionUtil;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import org.apache.hyracks.storage.am.lsm.common.api.IoOperationCompleteListener;
import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
import org.apache.hyracks.storage.common.IModificationOperationCallback;
import org.apache.hyracks.storage.common.ISearchOperationCallback;
import org.apache.hyracks.util.ExitUtil;
import org.apache.hyracks.util.annotations.NotThreadSafe;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
 * Operation tracker for the indexes of one dataset partition.
 *
 * <p>It counts the modification operations that are currently in flight and, once that count
 * drops to zero, schedules a flush of the partition's open indexes if one was requested
 * (either by an index itself or through {@link #setFlushOnExit(boolean)}).
 * For durable datasets the flush is preceded by a FLUSH log record and is scheduled later via
 * {@link #triggerScheduleFlush(LogRecord)}; for non-durable datasets it is scheduled immediately.
 *
 * <p>The tracker also registers itself as a completion listener on every flush it schedules so
 * that {@link #getScheduledFlushes()} only reports flushes that have not completed yet.
 */
@NotThreadSafe
public class PrimaryIndexOperationTracker extends BaseOperationTracker implements IoOperationCompleteListener {
    private static final Logger LOGGER = LogManager.getLogger();
    private final int partition;
    // Number of active operations on an ILSMIndex instance.
    private final AtomicInteger numActiveOperations;
    private final ILogManager logManager;
    private final ILSMComponentIdGenerator idGenerator;
    // When true, the next flushIfRequested() attempts a flush even if no index asked for one.
    private boolean flushOnExit = false;
    // True from the moment a FLUSH log record has been logged until triggerScheduleFlush() finishes.
    private boolean flushLogCreated = false;
    // Flushes scheduled by this tracker that have not completed yet, keyed by the relative path of
    // the flush target. All accesses are guarded by synchronizing on the map itself.
    private final Map<String, FlushOperation> scheduledFlushes = new HashMap<>();
    // System.nanoTime() of the most recently scheduled flush (initially the construction time).
    private long lastFlushTime = System.nanoTime();

    /**
     * Creates a tracker for the given dataset partition.
     *
     * @param datasetID   id of the dataset, passed on to the base tracker
     * @param partition   partition whose open indexes this tracker manages
     * @param logManager  log manager used to write FLUSH log records for durable datasets
     * @param dsInfo      dataset info used to look up the partition's open indexes
     * @param idGenerator generator refreshed to obtain the component id that follows a flush
     */
    public PrimaryIndexOperationTracker(int datasetID, int partition, ILogManager logManager, DatasetInfo dsInfo,
            ILSMComponentIdGenerator idGenerator) {
        super(datasetID, dsInfo);
        this.partition = partition;
        this.logManager = logManager;
        this.numActiveOperations = new AtomicInteger();
        this.idGenerator = idGenerator;
    }

    /**
     * Delegates to the base tracker and, for (force) modification operations, counts the operation
     * as active.
     */
    @Override
    public void beforeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
            IModificationOperationCallback modificationCallback) throws HyracksDataException {
        super.beforeOperation(index, opType, searchCallback, modificationCallback);
        if (countsAsActiveOperation(opType)) {
            incrementNumActiveOperations(modificationCallback);
        }
    }

    /**
     * Delegates to the base tracker and, for (force) modification operations, stops counting the
     * operation as active and then schedules a flush if one is requested and no other operation
     * is active.
     */
    @Override
    public synchronized void completeOperation(ILSMIndex index, LSMOperationType opType,
            ISearchOperationCallback searchCallback, IModificationOperationCallback modificationCallback)
            throws HyracksDataException {
        super.completeOperation(index, opType, searchCallback, modificationCallback);
        if (countsAsActiveOperation(opType)) {
            decrementNumActiveOperations(modificationCallback);
            flushIfNeeded();
        }
    }

    /**
     * Calls {@link #flushIfRequested()} only when there are no active operations.
     *
     * @throws HyracksDataException if scheduling the flush fails
     */
    public synchronized void flushIfNeeded() throws HyracksDataException {
        if (canSafelyFlush()) {
            flushIfRequested();
        }
    }

    /**
     * Starts a flush of the partition's open indexes if any of them has a flush request for its
     * current mutable component, or if {@code flushOnExit} is set.
     *
     * <p>The method returns without flushing when the partition has no open indexes or when the
     * primary index's current mutable component is empty. Otherwise it marks modified, writable
     * memory components as unwritable and then either logs a FLUSH record (durable datasets) or
     * schedules the flush directly (non-durable datasets).
     *
     * @throws IllegalStateException if there are active operations, if a memory component still
     *                               has writers, if the primary component id is null, or if the
     *                               FLUSH log record cannot be written
     * @throws HyracksDataException  if scheduling the flush fails
     */
    public void flushIfRequested() throws HyracksDataException {
        // If we need a flush, and this is the last completing operation, then schedule the flush,
        // or if there is a flush scheduled by the checkpoint (flushOnExit), then schedule it
        boolean needsFlush = false;
        Set<ILSMIndex> indexes = dsInfo.getDatasetPartitionOpenIndexes(partition);
        if (!flushOnExit) {
            for (ILSMIndex lsmIndex : indexes) {
                if (lsmIndex.hasFlushRequestForCurrentMutableComponent()) {
                    needsFlush = true;
                    break;
                }
            }
        }
        if (needsFlush || flushOnExit) {
            // The request is consumed here, even if one of the checks below ends up skipping the flush.
            flushOnExit = false;
            // make the current mutable components READABLE_UNWRITABLE to stop coming modify operations from entering
            // them until the current flush is scheduled.
            ILSMIndex primaryLsmIndex = null;
            LSMComponentId primaryId = null;
            // Double check that the primary index has been modified
            synchronized (this) {
                // Read the counter once so the reported value is the one that failed the check.
                int activeOperations = numActiveOperations.get();
                if (activeOperations > 0) {
                    throw new IllegalStateException(
                            "Can't request a flush on an index with active operations: " + activeOperations);
                }
                if (indexes.isEmpty()) {
                    LOGGER.debug("no open indexes on dataset {} and partition {}... skipping flush",
                            dsInfo.getDatasetID(), partition);
                    return;
                }
                for (ILSMIndex lsmIndex : indexes) {
                    if (lsmIndex.isPrimaryIndex()) {
                        if (lsmIndex.isCurrentMutableComponentEmpty()) {
                            LOGGER.debug("Primary index on dataset {} and partition {} is empty... skipping flush",
                                    dsInfo.getDatasetID(), partition);
                            return;
                        }
                        primaryLsmIndex = lsmIndex;
                        break;
                    }
                }
            }
            if (primaryLsmIndex == null) {
                LOGGER.fatal(
                        "Primary index not found in dataset {} and partition {} open indexes {}; halting to clear memory state",
                        dsInfo.getDatasetID(), partition, indexes);
                ExitUtil.halt(ExitUtil.EC_INCONSISTENT_STORAGE_REFERENCES);
            }
            for (ILSMIndex lsmIndex : indexes) {
                ILSMOperationTracker opTracker = lsmIndex.getOperationTracker();
                synchronized (opTracker) {
                    ILSMMemoryComponent memComponent = lsmIndex.getCurrentMemoryComponent();
                    if (memComponent.getWriterCount() > 0) {
                        throw new IllegalStateException(
                                "Can't request a flush on a component with writers inside: Index:" + lsmIndex
                                        + " Component:" + memComponent);
                    }
                    // Only components that actually hold modifications are closed for writing.
                    if (memComponent.getState() == ComponentState.READABLE_WRITABLE && memComponent.isModified()) {
                        memComponent.setUnwritable();
                    }
                    // The primary component's id supplies the min/max ids of the FLUSH log record.
                    if (lsmIndex.isPrimaryIndex()) {
                        primaryId = (LSMComponentId) memComponent.getId();
                    }
                }
            }
            if (primaryId == null) {
                throw new IllegalStateException("Primary index found in dataset " + dsInfo.getDatasetID()
                        + " and partition " + partition + " and is modified but its component id is null");
            }
            LogRecord logRecord = new LogRecord();
            if (dsInfo.isDurable()) {
                /*
                 * Generate a FLUSH log.
                 * Flush will be triggered when the log is written to disk by LogFlusher.
                 */
                TransactionUtil.formFlushLogRecord(logRecord, datasetID, partition, primaryId.getMinId(),
                        primaryId.getMaxId(), this);
                try {
                    logManager.log(logRecord);
                } catch (ACIDException e) {
                    throw new IllegalStateException("could not write flush log", e);
                }
                flushLogCreated = true;
            } else {
                //trigger flush for temporary indexes without generating a FLUSH log.
                triggerScheduleFlush(logRecord);
            }
        }
    }

    /**
     * Schedules a flush on every open index of the partition, passing the flush log LSN and the
     * next component id as operation parameters, and records each scheduled flush until it
     * completes. {@code flushLogCreated} is reset when the method exits, whether or not a flush
     * was scheduled.
     *
     * <p>This method is called sequentially by LogPage.notifyFlushTerminator in the sequence
     * flushes were scheduled.
     *
     * @param logRecord the FLUSH log record whose LSN is attached to the scheduled flushes
     * @throws IllegalStateException if an operation is active and the dataset is not a metadata
     *                               dataset
     * @throws HyracksDataException  if creating an accessor or scheduling a flush fails
     */
    public synchronized void triggerScheduleFlush(LogRecord logRecord) throws HyracksDataException {
        try {
            if (!canSafelyFlush()) {
                // if a force modification operation started before the flush is scheduled, this flush will fail
                // and a next attempt will be made when that operation completes. This is only expected for metadata
                // datasets since they always use force modification
                if (MetadataIndexImmutableProperties.isMetadataDataset(datasetID)) {
                    return;
                }
                throw new IllegalStateException("Operation started while index was pending scheduling a flush");
            }
            idGenerator.refresh();
            long flushLsn = logRecord.getLSN();
            if (flushLsn == 0) {
                LOGGER.warn("flushing an index with LSN 0. Flush log record: {}", logRecord::getLogRecordForDisplay);
            }
            ILSMComponentId nextComponentId = idGenerator.getId();
            // The same parameter map is handed to the flush of every index in the partition.
            Map<String, Object> flushMap = new HashMap<>();
            flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, flushLsn);
            flushMap.put(LSMIOOperationCallback.KEY_NEXT_COMPONENT_ID, nextComponentId);
            synchronized (scheduledFlushes) {
                for (ILSMIndex lsmIndex : dsInfo.getDatasetPartitionOpenIndexes(partition)) {
                    ILSMIndexAccessor accessor = lsmIndex.createAccessor(NoOpIndexAccessParameters.INSTANCE);
                    accessor.getOpContext().setParameters(flushMap);
                    ILSMIOOperation flush = accessor.scheduleFlush();
                    lastFlushTime = System.nanoTime();
                    scheduledFlushes.put(flush.getTarget().getRelativePath(), (FlushOperation) flush);
                    // completed() removes the entry again once the flush is done.
                    flush.addCompleteListener(this);
                }
            }
        } finally {
            flushLogCreated = false;
        }
    }

    /**
     * Removes the completed operation from the set of scheduled flushes.
     *
     * @param operation the I/O operation that completed
     */
    @Override
    public void completed(ILSMIOOperation operation) {
        synchronized (scheduledFlushes) {
            scheduledFlushes.remove(operation.getTarget().getRelativePath());
        }
    }

    /**
     * Returns a snapshot of the flushes scheduled by this tracker that have not completed yet.
     *
     * @return a new list; changes to it do not affect the tracker
     */
    public List<FlushOperation> getScheduledFlushes() {
        synchronized (scheduledFlushes) {
            return new ArrayList<>(scheduledFlushes.values());
        }
    }

    /**
     * @return the current number of active modification operations
     */
    public int getNumActiveOperations() {
        return numActiveOperations.get();
    }

    /**
     * Tells whether operations of the given type are counted as active operations.
     */
    private static boolean countsAsActiveOperation(LSMOperationType opType) {
        return opType == LSMOperationType.MODIFICATION || opType == LSMOperationType.FORCE_MODIFICATION;
    }

    /**
     * Counts one more active operation and notifies the callback, unless the callback is the
     * no-op callback.
     */
    private void incrementNumActiveOperations(IModificationOperationCallback modificationCallback) {
        //modificationCallback can be NoOpOperationCallback when redo/undo operations are executed.
        if (modificationCallback != NoOpOperationCallback.INSTANCE) {
            numActiveOperations.incrementAndGet();
            ((AbstractOperationCallback) modificationCallback).beforeOperation();
        }
    }

    /**
     * Counts one less active operation and notifies the callback, unless the callback is the
     * no-op callback.
     *
     * @throws IllegalStateException if the count becomes negative
     */
    private void decrementNumActiveOperations(IModificationOperationCallback modificationCallback) {
        //modificationCallback can be NoOpOperationCallback when redo/undo operations are executed.
        if (modificationCallback != NoOpOperationCallback.INSTANCE) {
            if (numActiveOperations.decrementAndGet() < 0) {
                throw new IllegalStateException("The number of active operations cannot be negative!");
            }
            ((AbstractOperationCallback) modificationCallback).afterOperation();
        }
    }

    /**
     * @return whether a flush has been requested regardless of the indexes' own flush requests
     */
    public boolean isFlushOnExit() {
        return flushOnExit;
    }

    /**
     * @param flushOnExit {@code true} to make the next {@link #flushIfRequested()} attempt a flush
     *                    even if no index requested one
     */
    public void setFlushOnExit(boolean flushOnExit) {
        this.flushOnExit = flushOnExit;
    }

    /**
     * @return {@code true} if a FLUSH log record has been logged and the corresponding call to
     *         {@link #triggerScheduleFlush(LogRecord)} has not finished yet
     */
    public boolean isFlushLogCreated() {
        return flushLogCreated;
    }

    /**
     * @return the partition this tracker is responsible for
     */
    public int getPartition() {
        return partition;
    }

    /**
     * @return the {@link System#nanoTime()} value recorded when a flush was last scheduled, or at
     *         construction if none has been scheduled
     */
    public long getLastFlushTime() {
        return lastFlushTime;
    }

    @Override
    public String toString() {
        return "Dataset (" + datasetID + "), Partition (" + partition + ")";
    }

    /**
     * Asks the partition's primary index to delete its components of type
     * {@code LSMComponentType.MEMORY}. Nothing is done if the primary index's current mutable
     * component is empty.
     *
     * @throws NullPointerException if the partition's open indexes contain no primary index
     * @throws HyracksDataException if creating the accessor or deleting the components fails
     */
    public void deleteMemoryComponent() throws HyracksDataException {
        Set<ILSMIndex> indexes = dsInfo.getDatasetPartitionOpenIndexes(partition);
        ILSMIndex primaryLsmIndex = null;
        for (ILSMIndex lsmIndex : indexes) {
            if (lsmIndex.isPrimaryIndex()) {
                if (lsmIndex.isCurrentMutableComponentEmpty()) {
                    LOGGER.info("Primary index on dataset {} and partition {} is empty... skipping delete",
                            dsInfo.getDatasetID(), partition);
                    return;
                }
                primaryLsmIndex = lsmIndex;
                break;
            }
        }
        Objects.requireNonNull(primaryLsmIndex, "no primary index found in " + indexes);
        idGenerator.refresh();
        ILSMComponentId nextComponentId = idGenerator.getId();
        // Same parameter keys as a regular flush, but with a flush LSN of 0.
        Map<String, Object> flushMap = new HashMap<>();
        flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, 0L);
        flushMap.put(LSMIOOperationCallback.KEY_NEXT_COMPONENT_ID, nextComponentId);
        ILSMIndexAccessor accessor = primaryLsmIndex.createAccessor(NoOpIndexAccessParameters.INSTANCE);
        accessor.getOpContext().setParameters(flushMap);
        accessor.deleteComponents(c -> c.getType() == ILSMComponent.LSMComponentType.MEMORY);
    }

    /**
     * @return {@code true} when no modification operation is currently active
     */
    private boolean canSafelyFlush() {
        return numActiveOperations.get() == 0;
    }
}