blob: 5b8da53bf9dbd7ab4389c6099d0a8d861e0c9e72 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hyracks.storage.am.lsm.common.impls;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.common.api.IndexException;
import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
public class PrefixMergePolicy implements ILSMMergePolicy {
private long maxMergableComponentSize;
private int maxToleranceComponentCount;
@Override
public void diskComponentAdded(final ILSMIndex index, boolean fullMergeIsRequested) throws HyracksDataException,
IndexException {
// 1. Look at the candidate components for merging in oldest-first order. If one exists, identify the prefix of the sequence of
// all such components for which the sum of their sizes exceeds MaxMrgCompSz. Schedule a merge of those components into a new component.
// 2. If a merge from 1 doesn't happen, see if the set of candidate components for merging exceeds MaxTolCompCnt. If so, schedule
// a merge all of the current candidates into a new single component.
List<ILSMComponent> immutableComponents = new ArrayList<ILSMComponent>(index.getImmutableComponents());
// Reverse the components order so that we look at components from oldest to newest.
Collections.reverse(immutableComponents);
for (ILSMComponent c : immutableComponents) {
if (c.getState() != ComponentState.READABLE_UNWRITABLE) {
return;
}
}
if (fullMergeIsRequested) {
ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
accessor.scheduleFullMerge(index.getIOOperationCallback());
return;
}
long totalSize = 0;
int startIndex = -1;
for (int i = 0; i < immutableComponents.size(); i++) {
ILSMComponent c = immutableComponents.get(i);
long componentSize = ((AbstractDiskLSMComponent) c).getComponentSize();
if (componentSize > maxMergableComponentSize) {
startIndex = i;
totalSize = 0;
continue;
}
totalSize += componentSize;
boolean isLastComponent = i + 1 == immutableComponents.size() ? true : false;
if (totalSize > maxMergableComponentSize
|| (isLastComponent && i - startIndex >= maxToleranceComponentCount)) {
List<ILSMComponent> mergableComponents = new ArrayList<ILSMComponent>();
for (int j = startIndex + 1; j <= i; j++) {
mergableComponents.add(immutableComponents.get(j));
}
// Reverse the components order back to its original order
Collections.reverse(mergableComponents);
ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
accessor.scheduleMerge(index.getIOOperationCallback(), mergableComponents);
break;
}
}
}
@Override
public void configure(Map<String, String> properties) {
maxMergableComponentSize = Long.parseLong(properties.get("max-mergable-component-size"));
maxToleranceComponentCount = Integer.parseInt(properties.get("max-tolerance-component-count"));
}
}