blob: 83e141d61320919c6c9b7e0809bfe73489abbca9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.core.datamap.status;
import java.io.*;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
import org.apache.carbondata.core.fileoperations.FileWriteOperation;
import org.apache.carbondata.core.locks.CarbonLockFactory;
import org.apache.carbondata.core.locks.CarbonLockUtil;
import org.apache.carbondata.core.locks.ICarbonLock;
import org.apache.carbondata.core.locks.LockUsage;
import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
import com.google.gson.Gson;
/**
* It saves/serializes the array of {@link DataMapStatusDetail} to disk in json format.
* It ensures data consistency during concurrent writes by acquiring a write lock. It saves the
* status to the datamapstatus file under the system folder.
*/
public class DiskBasedDataMapStatusProvider implements DataMapStatusStorageProvider {
private static final LogService LOG =
LogServiceFactory.getLogService(DiskBasedDataMapStatusProvider.class.getName());
private static final String DATAMAP_STATUS_FILE = "datamapstatus";
@Override
public DataMapStatusDetail[] getDataMapStatusDetails() throws IOException {
String statusPath = CarbonProperties.getInstance().getSystemFolderLocation()
+ CarbonCommonConstants.FILE_SEPARATOR + DATAMAP_STATUS_FILE;
Gson gsonObjectToRead = new Gson();
DataInputStream dataInputStream = null;
BufferedReader buffReader = null;
InputStreamReader inStream = null;
DataMapStatusDetail[] dataMapStatusDetails;
try {
if (!FileFactory.isFileExist(statusPath)) {
return new DataMapStatusDetail[0];
}
dataInputStream =
FileFactory.getDataInputStream(statusPath, FileFactory.getFileType(statusPath));
inStream = new InputStreamReader(dataInputStream,
Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
buffReader = new BufferedReader(inStream);
dataMapStatusDetails = gsonObjectToRead.fromJson(buffReader, DataMapStatusDetail[].class);
} catch (IOException e) {
LOG.error(e, "Failed to read datamap status");
throw e;
} finally {
CarbonUtil.closeStreams(buffReader, inStream, dataInputStream);
}
// if dataMapStatusDetails is null, return empty array
if (null == dataMapStatusDetails) {
return new DataMapStatusDetail[0];
}
return dataMapStatusDetails;
}
/**
* Update or add the status of passed datamaps with the given datamapstatus. If the datamapstatus
* given is enabled/disabled then updates/adds the datamap, in case of drop it just removes it
* from the file.
* This method always overwrites the old file.
* @param dataMapSchemas schemas of which are need to be updated in datamap status
* @param dataMapStatus status to be updated for the datamap schemas
* @throws IOException
*/
@Override
public void updateDataMapStatus(List<DataMapSchema> dataMapSchemas, DataMapStatus dataMapStatus)
throws IOException {
if (dataMapSchemas == null || dataMapSchemas.size() == 0) {
// There is nothing to update
return;
}
ICarbonLock carbonTableStatusLock = getDataMapStatusLock();
boolean locked = false;
try {
locked = carbonTableStatusLock.lockWithRetries();
if (locked) {
LOG.info("Datamap status lock has been successfully acquired.");
DataMapStatusDetail[] dataMapStatusDetails = getDataMapStatusDetails();
List<DataMapStatusDetail> dataMapStatusList = Arrays.asList(dataMapStatusDetails);
dataMapStatusList = new ArrayList<>(dataMapStatusList);
List<DataMapStatusDetail> changedStatusDetails = new ArrayList<>();
List<DataMapStatusDetail> newStatusDetails = new ArrayList<>();
for (DataMapSchema dataMapSchema : dataMapSchemas) {
boolean exists = false;
for (DataMapStatusDetail statusDetail : dataMapStatusList) {
if (statusDetail.getDataMapName().equals(dataMapSchema.getDataMapName())) {
statusDetail.setStatus(dataMapStatus);
changedStatusDetails.add(statusDetail);
exists = true;
}
}
if (!exists) {
newStatusDetails
.add(new DataMapStatusDetail(dataMapSchema.getDataMapName(), dataMapStatus));
}
}
// Add the newly added datamaps to the list.
if (newStatusDetails.size() > 0 && dataMapStatus != DataMapStatus.DROPPED) {
dataMapStatusList.addAll(newStatusDetails);
}
// In case of dropped datamap, just remove from the list.
if (dataMapStatus == DataMapStatus.DROPPED) {
dataMapStatusList.removeAll(changedStatusDetails);
}
writeLoadDetailsIntoFile(CarbonProperties.getInstance().getSystemFolderLocation()
+ CarbonCommonConstants.FILE_SEPARATOR + DATAMAP_STATUS_FILE,
dataMapStatusList.toArray(new DataMapStatusDetail[dataMapStatusList.size()]));
} else {
String errorMsg = "Upadating datamapstatus is failed due to another process taken the lock"
+ " for updating it";
LOG.audit(errorMsg);
LOG.error(errorMsg);
throw new IOException(errorMsg + " Please try after some time.");
}
} finally {
if (locked) {
CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.DATAMAP_STATUS_LOCK);
}
}
}
/**
* writes datamap status details
*
* @param dataMapStatusDetails
* @throws IOException
*/
private static void writeLoadDetailsIntoFile(String location,
DataMapStatusDetail[] dataMapStatusDetails) throws IOException {
AtomicFileOperations fileWrite =
new AtomicFileOperationsImpl(location, FileFactory.getFileType(location));
BufferedWriter brWriter = null;
DataOutputStream dataOutputStream = null;
Gson gsonObjectToWrite = new Gson();
// write the updated data into the datamap status file.
try {
dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE);
brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
String metadataInstance = gsonObjectToWrite.toJson(dataMapStatusDetails);
brWriter.write(metadataInstance);
} catch (IOException ioe) {
LOG.error("Error message: " + ioe.getLocalizedMessage());
throw ioe;
} finally {
if (null != brWriter) {
brWriter.flush();
}
CarbonUtil.closeStreams(brWriter);
fileWrite.close();
}
}
private static ICarbonLock getDataMapStatusLock() {
return CarbonLockFactory
.getCarbonLockObj(CarbonProperties.getInstance().getSystemFolderLocation(),
LockUsage.DATAMAP_STATUS_LOCK);
}
}