| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.carbondata.core.view; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| |
| import org.apache.carbondata.common.annotations.InterfaceAudience; |
| import org.apache.carbondata.common.logging.LogServiceFactory; |
| import org.apache.carbondata.core.constants.CarbonCommonConstants; |
| import org.apache.carbondata.core.locks.ICarbonLock; |
| import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; |
| import org.apache.carbondata.core.metadata.CarbonMetadata; |
| import org.apache.carbondata.core.metadata.schema.table.CarbonTable; |
| import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier; |
| import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; |
| import org.apache.carbondata.core.statusmanager.SegmentStatus; |
| import org.apache.carbondata.core.statusmanager.SegmentStatusManager; |
| import org.apache.carbondata.core.util.path.CarbonTablePath; |
| |
| import com.google.gson.Gson; |
| import org.apache.log4j.Logger; |
| |
| /** |
| * It maintains all the mv schemas in it. |
| */ |
| @InterfaceAudience.Internal |
| public abstract class MVManager { |
| |
| private static final Logger LOGGER = |
| LogServiceFactory.getLogService(MVManager.class.getName()); |
| |
| private final MVProvider schemaProvider = new MVProvider(); |
| |
| private volatile MVCatalog<?> catalog; |
| |
| private final Object lock = new Object(); |
| |
| public MVManager() { |
| |
| } |
| |
| public abstract List<String> getDatabases(); |
| |
| public abstract String getDatabaseLocation(String databaseName); |
| |
| public boolean hasSchemaOnTable(CarbonTable table) { |
| return !table.getMVTablesMap().isEmpty(); |
| } |
| |
| public boolean isMVInSyncWithParentTables(MVSchema mvSchema) throws IOException { |
| return schemaProvider.isViewCanBeEnabled(mvSchema, true); |
| } |
| |
| /** |
| * It gives all mv schemas of a given table. |
| * For show mv command. |
| */ |
| public List<MVSchema> getSchemasOnTable(CarbonTable table) |
| throws IOException { |
| return getSchemas(table.getMVTablesMap()); |
| } |
| |
| /** |
| * It gives all mv schemas of a given table. |
| * For show mv command. |
| */ |
| public List<MVSchema> getSchemasOnTable(String databaseName, |
| CarbonTable carbonTable) throws IOException { |
| return schemaProvider.getSchemas(this, databaseName, carbonTable); |
| } |
| |
| /** |
| * It gives all mv schemas from store. |
| */ |
| public List<MVSchema> getSchemas() throws IOException { |
| List<MVSchema> schemas = new ArrayList<>(); |
| for (String database : this.getDatabases()) { |
| try { |
| schemas.addAll(this.getSchemas(database)); |
| } catch (IOException ex) { |
| throw ex; |
| } catch (Exception ex) { |
| LOGGER.error("Exception Occurred: Skipping MV schemas from database: " + database); |
| if (LOGGER.isDebugEnabled()) { |
| LOGGER.debug(ex.getMessage()); |
| } |
| } |
| } |
| return schemas; |
| } |
| |
| /** |
| * It gives all mv schemas from given databases in the store |
| */ |
| public List<MVSchema> getSchemas(Map<String, List<String>> mvTablesMap) throws IOException { |
| List<MVSchema> schemas = new ArrayList<>(); |
| for (Map.Entry<String, List<String>> databaseEntry : mvTablesMap.entrySet()) { |
| String database = databaseEntry.getKey(); |
| List<String> mvTables = databaseEntry.getValue(); |
| for (String mvTable : mvTables) { |
| try { |
| schemas.add(this.getSchema(database, mvTable)); |
| } catch (IOException ex) { |
| LOGGER.error("Error while fetching MV schema " + mvTable + " from database: " + database); |
| throw ex; |
| } catch (Exception ex) { |
| LOGGER.error( |
| "Exception Occurred: Skipping MV schema " + mvTable + " from database: " + database); |
| if (LOGGER.isDebugEnabled()) { |
| LOGGER.debug(ex.getMessage()); |
| } |
| } |
| } |
| } |
| return schemas; |
| } |
| |
| /** |
| * It gives all mv schemas from store. |
| */ |
| public List<MVSchema> getSchemas(String databaseName) throws IOException { |
| return schemaProvider.getSchemas(this, databaseName); |
| } |
| |
| public MVSchema getSchema(String databaseName, String viewName) throws IOException { |
| return schemaProvider.getSchema(this, databaseName, viewName, false); |
| } |
| |
| public MVSchema getSchema(String databaseName, String viewName, boolean isRegisterMV) |
| throws IOException { |
| return schemaProvider.getSchema(this, databaseName, viewName, isRegisterMV); |
| } |
| |
| /** |
| * Saves the mv schema to storage |
| * |
| * @param viewSchema mv schema |
| */ |
| public void createSchema(String databaseName, MVSchema viewSchema) |
| throws IOException { |
| schemaProvider.saveSchema(this, databaseName, viewSchema); |
| } |
| |
| /** |
| * Drops the mv schema from storage |
| * |
| * @param viewName mv name |
| */ |
| public void deleteSchema(String databaseName, String viewName) throws IOException { |
| schemaProvider.dropSchema(this, databaseName, viewName); |
| } |
| |
| /** |
| * Get the mv catalog. |
| */ |
| public MVCatalog<?> getCatalog() { |
| return catalog; |
| } |
| |
| /** |
| * Get the mv catalog. |
| */ |
| public MVCatalog<?> getCatalog( |
| MVCatalogFactory<?> catalogFactory, |
| List<MVSchema> currSchemas) throws IOException { |
| MVCatalog<?> catalog = this.catalog; |
| synchronized (lock) { |
| catalog = this.catalog; |
| if (catalog == null) { |
| catalog = catalogFactory.newCatalog(); |
| } |
| List<MVSchema> schemas = getSchemas(); |
| if (schemas.size() == currSchemas.size() && currSchemas.containsAll(schemas)) { |
| return catalog; |
| } |
| for (MVSchema schema : schemas) { |
| try { |
| // register the schemas that are not already present in catalog. |
| if (!currSchemas.contains(schema)) { |
| catalog.registerSchema(schema); |
| } |
| } catch (Exception e) { |
| // Ignore the schema |
| LOGGER.error( |
| "Error while registering schema for mv: " + schema.getIdentifier().getTableName()); |
| if (LOGGER.isDebugEnabled()) { |
| LOGGER.debug(e.getMessage()); |
| } |
| } |
| } |
| for (MVSchema currSchema : currSchemas) { |
| try { |
| // deregister the schemas from catalog if not present in the path. |
| if (!schemas.contains(currSchema)) { |
| catalog.deregisterSchema(currSchema.getIdentifier()); |
| } |
| } catch (Exception e) { |
| // Ignore the schema |
| LOGGER.error("Error while deregistering schema for mv: " + currSchema.getIdentifier() |
| .getTableName()); |
| if (LOGGER.isDebugEnabled()) { |
| LOGGER.debug(e.getMessage()); |
| } |
| } |
| } |
| this.catalog = catalog; |
| } |
| return catalog; |
| } |
| |
| /** |
| * In case of compaction on mv table,this method will merge the segment list of main table |
| * and return updated segment mapping |
| * |
| * @param mergedLoadName to find which all segments are merged to new compacted segment |
| * @param viewSchema of mv table |
| * @param viewLoadMetadataDetails of mv table |
| * @return updated segment map after merging segment list |
| */ |
| @SuppressWarnings("unchecked") |
| public static String getUpdatedSegmentMap(String mergedLoadName, |
| MVSchema viewSchema, |
| LoadMetadataDetails[] viewLoadMetadataDetails) { |
| Map<String, List<String>> segmentMapping = new HashMap<>(); |
| List<RelationIdentifier> relationIdentifiers = viewSchema.getRelatedTables(); |
| for (RelationIdentifier relationIdentifier : relationIdentifiers) { |
| for (LoadMetadataDetails loadMetadataDetail : viewLoadMetadataDetails) { |
| if (loadMetadataDetail.getSegmentStatus() == SegmentStatus.COMPACTED) { |
| if (mergedLoadName.equalsIgnoreCase(loadMetadataDetail.getMergedLoadName())) { |
| Map segmentMap = new Gson().fromJson(loadMetadataDetail.getExtraInfo(), Map.class); |
| if (segmentMapping.isEmpty()) { |
| segmentMapping.putAll(segmentMap); |
| } else { |
| segmentMapping.get(relationIdentifier.getDatabaseName() + CarbonCommonConstants.POINT |
| + relationIdentifier.getTableName()).addAll( |
| (List<String>) segmentMap.get( |
| relationIdentifier.getDatabaseName() + CarbonCommonConstants.POINT |
| + relationIdentifier.getTableName())); |
| } |
| } |
| } |
| } |
| } |
| Gson gson = new Gson(); |
| return gson.toJson(segmentMapping); |
| } |
| |
| /** |
| * Get enabled mv status details |
| */ |
| public List<MVStatusDetail> getEnabledStatusDetails() throws IOException { |
| List<MVStatusDetail> statusDetails = new ArrayList<>(); |
| for (String database : this.getDatabases()) { |
| try { |
| statusDetails.addAll(this.getEnabledStatusDetails(database)); |
| } catch (IOException ex) { |
| throw ex; |
| } catch (Exception ex) { |
| if (LOGGER.isDebugEnabled()) { |
| LOGGER.debug(ex.getMessage()); |
| } |
| } |
| } |
| return statusDetails; |
| } |
| |
| /** |
| * Get enabled mv status details |
| */ |
| List<MVStatusDetail> getEnabledStatusDetails(String databaseName) |
| throws IOException { |
| List<MVStatusDetail> statusDetails = schemaProvider.getStatusDetails(this, databaseName); |
| List<MVStatusDetail> enabledStatusDetails = new ArrayList<>(statusDetails.size()); |
| for (MVStatusDetail statusDetail : statusDetails) { |
| if (statusDetail.getStatus() == MVStatus.ENABLED) { |
| enabledStatusDetails.add(statusDetail); |
| } |
| } |
| return enabledStatusDetails; |
| } |
| |
| public void setStatus(RelationIdentifier viewIdentifier, MVStatus viewStatus) |
| throws IOException { |
| MVSchema schema = getSchema( |
| viewIdentifier.getDatabaseName(), viewIdentifier.getTableName()); |
| if (schema != null) { |
| schemaProvider.updateStatus(this, Collections.singletonList(schema), viewStatus); |
| } |
| } |
| |
| public void setStatus(List<MVSchema> viewSchemas, MVStatus viewStatus) |
| throws IOException { |
| if (viewSchemas != null && !viewSchemas.isEmpty()) { |
| schemaProvider.updateStatus(this, viewSchemas, viewStatus); |
| } |
| } |
| |
| public void onDrop(String databaseName, String viewName) |
| throws IOException { |
| MVSchema viewSchema = getSchema(databaseName, viewName); |
| if (viewSchema != null) { |
| schemaProvider.updateStatus( |
| this, Collections.singletonList(viewSchema), MVStatus.DROPPED); |
| } |
| } |
| |
| /** |
| * This method will remove all segments of MV table in case of Insert-Overwrite/Update/Delete |
| * operations on main table |
| * |
| * @param schemas mv schemas |
| */ |
| public void onTruncate(List<MVSchema> schemas) |
| throws IOException { |
| for (MVSchema schema : schemas) { |
| if (!schema.isRefreshOnManual()) { |
| setStatus(schema.identifier, MVStatus.DISABLED); |
| } |
| RelationIdentifier relationIdentifier = schema.getIdentifier(); |
| CarbonTable carbonTable = CarbonMetadata.getInstance() |
| .getCarbonTable(relationIdentifier.getDatabaseName(), relationIdentifier.getTableName()); |
| String tblStatusVersion = ""; |
| if (null != carbonTable) { |
| tblStatusVersion = carbonTable.getTableStatusVersion(); |
| } |
| SegmentStatusManager segmentStatusManager = new SegmentStatusManager(AbsoluteTableIdentifier |
| .from(relationIdentifier.getTablePath(), |
| relationIdentifier.getDatabaseName(), |
| relationIdentifier.getTableName()), tblStatusVersion); |
| ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock(); |
| try { |
| if (carbonLock.lockWithRetries()) { |
| LOGGER.info("Acquired lock for table" + relationIdentifier.getDatabaseName() + "." |
| + relationIdentifier.getTableName() + " for table status update"); |
| String metaDataPath = |
| CarbonTablePath.getMetadataPath(relationIdentifier.getTablePath()); |
| LoadMetadataDetails[] loadMetadataDetails = |
| SegmentStatusManager.readLoadMetadata(metaDataPath, tblStatusVersion); |
| for (LoadMetadataDetails entry : loadMetadataDetails) { |
| entry.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE); |
| } |
| SegmentStatusManager.writeLoadDetailsIntoFile( |
| CarbonTablePath.getTableStatusFilePath(relationIdentifier.getTablePath(), |
| tblStatusVersion), loadMetadataDetails); |
| } else { |
| LOGGER.error("Not able to acquire the lock for Table status update for table " |
| + relationIdentifier.getDatabaseName() + "." + relationIdentifier |
| .getTableName()); |
| } |
| } finally { |
| if (carbonLock.unlock()) { |
| LOGGER.info( |
| "Table unlocked successfully after table status update" + relationIdentifier |
| .getDatabaseName() + "." + relationIdentifier.getTableName()); |
| } else { |
| LOGGER.error( |
| "Unable to unlock Table lock for table" + relationIdentifier.getDatabaseName() |
| + "." + relationIdentifier.getTableName() |
| + " during table status update"); |
| } |
| } |
| } |
| } |
| |
| } |