/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.secondaryindex.Jobs;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;

/**
 * Provides the map of table blocks that have already been submitted for the DataMap load,
 * so that the same block is not submitted more than once.
 */
public class CarbonBlockLoaderHelper {
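  // eagerly initialized singleton instance, shared by all loader threads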
private static final CarbonBlockLoaderHelper carbonBlockLoaderHelper =
new CarbonBlockLoaderHelper();

  /**
   * maintains the map of table blocks already considered for the btree load
   */
  private ConcurrentMap<AbsoluteTableIdentifier, CopyOnWriteArraySet<String>> tableBlockMap;

private CarbonBlockLoaderHelper() {
tableBlockMap = new ConcurrentHashMap<>();
}

  /**
   * Returns the singleton instance of CarbonBlockLoaderHelper.
   *
   * @return the singleton instance
   */
public static CarbonBlockLoaderHelper getInstance() {
return carbonBlockLoaderHelper;
}

  private Set<String> getTableBlocks(AbsoluteTableIdentifier absoluteTableIdentifier) {
CopyOnWriteArraySet<String> blockSet = tableBlockMap.get(absoluteTableIdentifier);
if (null == blockSet) {
CopyOnWriteArraySet<String> newBlockSet = new CopyOnWriteArraySet<String>();
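      // putIfAbsent returns the previously mapped set if another thread won the race,
      // or null if newBlockSet was stored; in the null case, use the set created here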
blockSet = tableBlockMap.putIfAbsent(absoluteTableIdentifier, newBlockSet);
if (null == blockSet) {
blockSet = newBlockSet;
}
}
return blockSet;
}

  /**
   * Checks the tableBlockMap to know whether the block has already been submitted/considered
   * for the DataMap loading.
   *
   * @param absoluteTableIdentifier identifies the table uniquely
   * @param uniqueBlockId           uniquely identifies the block
   * @return false if uniqueBlockId is already present in the block set mapped to the given
   * table identifier in tableBlockMap, else true
   */
public Boolean checkAlreadySubmittedBlock(final AbsoluteTableIdentifier absoluteTableIdentifier,
final String uniqueBlockId) {
    Set<String> tableBlocks = getTableBlocks(absoluteTableIdentifier);
    // tableBlocks is a CopyOnWriteArraySet, so the add is thread-safe without explicit
    // locking; add returns false if the block id was already present
return tableBlocks.add(uniqueBlockId);
}
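
  // Usage sketch (illustrative, not part of the original class): a loader thread calls
  // checkAlreadySubmittedBlock once per block and schedules the DataMap load only when it
  // returns true. The AbsoluteTableIdentifier.from(...) factory and the block id value
  // below are assumptions for the example.
  //
  //   CarbonBlockLoaderHelper helper = CarbonBlockLoaderHelper.getInstance();
  //   AbsoluteTableIdentifier id = AbsoluteTableIdentifier.from("/store/db1/t1", "db1", "t1");
  //   if (helper.checkAlreadySubmittedBlock(id, "part-0-0_batchno0-0-0.carbondata")) {
  //     // first submission: schedule the DataMap load for this block
  //   } else {
  //     // already submitted by another thread: skip
  //   }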

  /**
   * Clears the given block ids from the tableBlockMap so that, on a cache miss, the
   * table blocks are treated as not yet submitted for the DataMap load.
   *
   * @param absoluteTableIdentifier identifies the table uniquely
   * @param uniqueBlockId           set of block ids to be cleared
   */
public void clear(final AbsoluteTableIdentifier absoluteTableIdentifier,
final Set<String> uniqueBlockId) {
CopyOnWriteArraySet<String> blockSet = tableBlockMap.get(absoluteTableIdentifier);
if (null != blockSet) {
for (String block : uniqueBlockId) {
blockSet.remove(block);
}
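      // drop the table entry entirely once no blocks remain, so the map does not
      // retain identifiers for tables with nothing in flight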
if (blockSet.isEmpty()) {
tableBlockMap.remove(absoluteTableIdentifier);
}
}
}
}