blob: 3b81c5c0ceeb1ea1b35c3c36c9c24e5f8b12964d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.core.index.status;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.fileoperations.AtomicFileOperationFactory;
import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
import org.apache.carbondata.core.fileoperations.FileWriteOperation;
import org.apache.carbondata.core.index.IndexUtil;
import org.apache.carbondata.core.locks.CarbonLockFactory;
import org.apache.carbondata.core.locks.CarbonLockUtil;
import org.apache.carbondata.core.locks.ICarbonLock;
import org.apache.carbondata.core.locks.LockUsage;
import org.apache.carbondata.core.metadata.schema.table.IndexSchema;
import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.statusmanager.SegmentStatus;
import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import com.google.gson.Gson;
import org.apache.log4j.Logger;
/**
* It saves/serializes the array of {@link IndexStatusDetail} to disk in json format.
* It ensures data consistency during concurrent writes by taking a write lock. It saves the
* status to the index status file under the system folder.
*/
public class DiskBasedIndexStatusProvider implements IndexStatusStorageProvider {

  private static final Logger LOG =
      LogServiceFactory.getLogService(DiskBasedIndexStatusProvider.class.getName());

  /** Name of the status file kept under the system folder. */
  private static final String INDEX_STATUS_FILE = "indexstatus";

  /**
   * Reads all index status details from the index status file.
   *
   * @return the details parsed from the status file; an empty array when the file does not
   *         exist or when parsing yields no content
   * @throws IOException if the status file cannot be read
   */
  @Override
  public IndexStatusDetail[] getIndexStatusDetails() throws IOException {
    String statusPath = getIndexStatusFilePath();
    Gson gsonObjectToRead = new Gson();
    DataInputStream dataInputStream = null;
    BufferedReader buffReader = null;
    InputStreamReader inStream = null;
    IndexStatusDetail[] indexStatusDetails;
    try {
      if (!FileFactory.isFileExist(statusPath)) {
        return new IndexStatusDetail[0];
      }
      dataInputStream = FileFactory.getDataInputStream(statusPath);
      inStream = new InputStreamReader(dataInputStream,
          Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
      buffReader = new BufferedReader(inStream);
      indexStatusDetails = gsonObjectToRead.fromJson(buffReader, IndexStatusDetail[].class);
    } catch (IOException e) {
      LOG.error("Failed to read index status", e);
      throw e;
    } finally {
      CarbonUtil.closeStreams(buffReader, inStream, dataInputStream);
    }
    // if nothing could be parsed, return an empty array instead of null
    if (null == indexStatusDetails) {
      return new IndexStatusDetail[0];
    }
    return indexStatusDetails;
  }

  /**
   * Update or add the status of the passed indexes with the given index status. If the given
   * status is enabled/disabled then the index entry is updated/added; in case of drop the entry
   * is just removed from the file.
   * This method always overwrites the old file. The whole read-modify-write cycle runs under the
   * index status lock.
   *
   * @param indexSchemas schemas whose status needs to be updated; {@code null} or empty is a
   *                     no-op
   * @param indexStatus  status to be set for the index schemas
   * @throws IOException if the lock cannot be acquired or the status file cannot be read/written
   */
  @Override
  public void updateIndexStatus(List<IndexSchema> indexSchemas, IndexStatus indexStatus)
      throws IOException {
    if (indexSchemas == null || indexSchemas.isEmpty()) {
      // There is nothing to update
      return;
    }
    ICarbonLock carbonTableStatusLock = getIndexStatusLock();
    boolean locked = false;
    try {
      locked = carbonTableStatusLock.lockWithRetries();
      if (!locked) {
        String errorMsg = "Upadating index status is failed due to another process taken the lock"
            + " for updating it";
        LOG.error(errorMsg);
        throw new IOException(errorMsg + " Please try after some time.");
      }
      LOG.info("index status lock has been successfully acquired.");
      // Enable only if index tables and main table are in sync. Only the first schema is
      // inspected, and only when it is not flagged as an index.
      if (indexStatus == IndexStatus.ENABLED && !indexSchemas.get(0).isIndex()
          && !canIndexBeEnabled(indexSchemas.get(0))) {
        return;
      }
      List<IndexStatusDetail> indexStatusList =
          applyStatus(getIndexStatusDetails(), indexSchemas, indexStatus);
      writeLoadDetailsIntoFile(getIndexStatusFilePath(),
          indexStatusList.toArray(new IndexStatusDetail[indexStatusList.size()]));
    } finally {
      if (locked) {
        CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.INDEX_STATUS_LOCK);
      }
    }
  }

  /**
   * Builds the new list of status details by applying the given status to the existing entries.
   *
   * @param existingDetails details currently stored in the status file
   * @param indexSchemas    schemas whose status is being changed
   * @param indexStatus     status to apply
   * @return a new mutable list holding the resulting details
   */
  private static List<IndexStatusDetail> applyStatus(IndexStatusDetail[] existingDetails,
      List<IndexSchema> indexSchemas, IndexStatus indexStatus) {
    // Arrays.asList is a fixed-size view, so copy it to allow add/remove below
    List<IndexStatusDetail> indexStatusList = new ArrayList<>(Arrays.asList(existingDetails));
    List<IndexStatusDetail> changedStatusDetails = new ArrayList<>();
    List<IndexStatusDetail> newStatusDetails = new ArrayList<>();
    for (IndexSchema indexSchema : indexSchemas) {
      boolean exists = false;
      for (IndexStatusDetail statusDetail : indexStatusList) {
        if (statusDetail.getIndexName().equals(indexSchema.getIndexName())) {
          statusDetail.setStatus(indexStatus);
          changedStatusDetails.add(statusDetail);
          exists = true;
        }
      }
      if (!exists) {
        newStatusDetails.add(new IndexStatusDetail(indexSchema.getIndexName(), indexStatus));
      }
    }
    if (indexStatus == IndexStatus.DROPPED) {
      // In case of dropped index, just remove the matched entries; unknown ones are ignored.
      indexStatusList.removeAll(changedStatusDetails);
    } else {
      // Add the newly seen indexes to the list.
      indexStatusList.addAll(newStatusDetails);
    }
    return indexStatusList;
  }

  /**
   * This method checks if main table and index table are synchronised or not. The index is
   * considered in sync only when, for every parent table that has valid segments, the segments
   * recorded in the successful loads of the index table contain all of those valid segments.
   *
   * @param indexSchema schema of the index to be enabled
   * @return {@code true} if the index can be enabled, {@code false} otherwise
   * @throws IOException if reading the metadata fails
   */
  private boolean canIndexBeEnabled(IndexSchema indexSchema) throws IOException {
    String metaDataPath =
        CarbonTablePath.getMetadataPath(indexSchema.getRelationIdentifier().getTablePath());
    LoadMetadataDetails[] indexLoadMetadataDetails =
        SegmentStatusManager.readLoadMetadata(metaDataPath);
    Map<String, List<String>> indexSegmentMap = new HashMap<>();
    for (LoadMetadataDetails loadMetadataDetail : indexLoadMetadataDetails) {
      if (loadMetadataDetail.getSegmentStatus() != SegmentStatus.SUCCESS) {
        continue;
      }
      Map<String, List<String>> segmentMap =
          MVSegmentStatusUtil.getSegmentMap(loadMetadataDetail.getExtraInfo());
      if (indexSegmentMap.isEmpty()) {
        indexSegmentMap.putAll(segmentMap);
      } else {
        // NOTE(review): keys that were not present in the first non-empty segment map are
        // skipped here, as in the original implementation - confirm this is intended.
        for (Map.Entry<String, List<String>> entry : segmentMap.entrySet()) {
          List<String> knownSegments = indexSegmentMap.get(entry.getKey());
          if (null != knownSegments) {
            knownSegments.addAll(entry.getValue());
          }
        }
      }
    }
    for (RelationIdentifier parentTable : indexSchema.getParentTables()) {
      List<String> mainTableValidSegmentList =
          IndexUtil.getMainTableValidSegmentList(parentTable);
      if (mainTableValidSegmentList.isEmpty()) {
        // nothing to cover for this parent table
        continue;
      }
      List<String> indexedSegments = indexSegmentMap.get(
          parentTable.getDatabaseName() + CarbonCommonConstants.POINT + parentTable
              .getTableName());
      // Fail fast: a single out-of-sync parent table makes the whole index out of sync, and a
      // parent table without any recorded segments (null) cannot be in sync either.
      if (null == indexedSegments || !indexedSegments.containsAll(mainTableValidSegmentList)) {
        return false;
      }
    }
    return true;
  }

  /**
   * Writes the index status details as json to the given location, overwriting any old content.
   *
   * @param location           path of the status file
   * @param indexStatusDetails details to serialize
   * @throws IOException if writing or closing the file fails
   */
  private static void writeLoadDetailsIntoFile(String location,
      IndexStatusDetail[] indexStatusDetails) throws IOException {
    AtomicFileOperations fileWrite = AtomicFileOperationFactory.getAtomicFileOperations(location);
    BufferedWriter brWriter = null;
    DataOutputStream dataOutputStream = null;
    Gson gsonObjectToWrite = new Gson();
    // write the updated data into the index status file.
    try {
      dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE);
      brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
          Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
      String metadataInstance = gsonObjectToWrite.toJson(indexStatusDetails);
      brWriter.write(metadataInstance);
      // Flush inside the try so that a flush failure marks the operation as failed and can
      // neither skip the cleanup below nor mask an earlier write exception.
      brWriter.flush();
    } catch (IOException ioe) {
      LOG.error("Error message: " + ioe.getLocalizedMessage(), ioe);
      fileWrite.setFailed();
      throw ioe;
    } finally {
      CarbonUtil.closeStreams(brWriter);
      fileWrite.close();
    }
  }

  /**
   * Returns the path of the index status file: system folder location, file separator and the
   * status file name.
   */
  private static String getIndexStatusFilePath() {
    return CarbonProperties.getInstance().getSystemFolderLocation()
        + CarbonCommonConstants.FILE_SEPARATOR + INDEX_STATUS_FILE;
  }

  /**
   * Returns the system level lock object used to guard updates of the index status file.
   */
  private static ICarbonLock getIndexStatusLock() {
    return CarbonLockFactory
        .getSystemLevelCarbonLockObj(CarbonProperties.getInstance().getSystemFolderLocation(),
            LockUsage.INDEX_STATUS_LOCK);
  }
}