| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.carbondata.core.view; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| |
| import org.apache.carbondata.common.annotations.InterfaceAudience; |
| import org.apache.carbondata.common.logging.LogServiceFactory; |
| import org.apache.carbondata.core.constants.CarbonCommonConstants; |
| import org.apache.carbondata.core.locks.ICarbonLock; |
| import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; |
| import org.apache.carbondata.core.metadata.schema.table.CarbonTable; |
| import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier; |
| import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; |
| import org.apache.carbondata.core.statusmanager.SegmentStatus; |
| import org.apache.carbondata.core.statusmanager.SegmentStatusManager; |
| import org.apache.carbondata.core.util.path.CarbonTablePath; |
| |
| import com.google.gson.Gson; |
| import org.apache.log4j.Logger; |
| |
| /** |
| * It maintains all the mv schemas in it. |
| */ |
| @InterfaceAudience.Internal |
| public abstract class MVManager { |
| |
| private static final Logger LOGGER = |
| LogServiceFactory.getLogService(MVManager.class.getName()); |
| |
| private final MVProvider schemaProvider = new MVProvider(); |
| |
| private volatile MVCatalog<?> catalog; |
| |
| private final Object lock = new Object(); |
| |
| public MVManager() { |
| |
| } |
| |
| public abstract List<String> getDatabases(); |
| |
| public abstract String getDatabaseLocation(String databaseName); |
| |
| public boolean hasSchemaOnTable(CarbonTable table) throws IOException { |
| List<MVSchema> schemas = getSchemas(); |
| for (MVSchema schema : schemas) { |
| for (RelationIdentifier relatedTable : schema.getRelatedTables()) { |
| if (relatedTable.getDatabaseName().equalsIgnoreCase(table.getDatabaseName()) && |
| relatedTable.getTableName().equalsIgnoreCase(table.getTableName())) { |
| return true; |
| } |
| } |
| } |
| return false; |
| } |
| |
| /** |
| * It gives all mv schemas of a given table. |
| * For show mv command. |
| */ |
| public List<MVSchema> getSchemasOnTable(CarbonTable table) |
| throws IOException { |
| List<MVSchema> schemasOnTable = new ArrayList<>(); |
| List<MVSchema> schemas = getSchemas(); |
| for (MVSchema schema : schemas) { |
| boolean isSchemaOnTable = false; |
| for (RelationIdentifier relatedTable : schema.getRelatedTables()) { |
| if (relatedTable.getDatabaseName().equalsIgnoreCase(table.getDatabaseName()) && |
| relatedTable.getTableName().equalsIgnoreCase(table.getTableName())) { |
| isSchemaOnTable = true; |
| break; |
| } |
| } |
| if (isSchemaOnTable) { |
| schemasOnTable.add(schema); |
| } |
| } |
| return schemasOnTable; |
| } |
| |
| /** |
| * It gives all mv schemas of a given table. |
| * For show mv command. |
| */ |
| public List<MVSchema> getSchemasOnTable(String databaseName, |
| CarbonTable carbonTable) throws IOException { |
| return schemaProvider.getSchemas(this, databaseName, carbonTable); |
| } |
| |
| /** |
| * It gives all mv schemas from store. |
| */ |
| public List<MVSchema> getSchemas() throws IOException { |
| List<MVSchema> schemas = new ArrayList<>(); |
| for (String database : this.getDatabases()) { |
| schemas.addAll(this.getSchemas(database)); |
| } |
| return schemas; |
| } |
| |
| /** |
| * It gives all mv schemas from store. |
| */ |
| public List<MVSchema> getSchemas(String databaseName) throws IOException { |
| return schemaProvider.getSchemas(this, databaseName); |
| } |
| |
| public MVSchema getSchema(String databaseName, String viewName) throws IOException { |
| return schemaProvider.getSchema(this, databaseName, viewName); |
| } |
| |
| /** |
| * Saves the mv schema to storage |
| * |
| * @param viewSchema mv schema |
| */ |
| public void createSchema(String databaseName, MVSchema viewSchema) |
| throws IOException { |
| schemaProvider.saveSchema(this, databaseName, viewSchema); |
| } |
| |
| /** |
| * Drops the mv schema from storage |
| * |
| * @param viewName index name |
| */ |
| public void deleteSchema(String databaseName, String viewName) throws IOException { |
| schemaProvider.dropSchema(this, databaseName, viewName); |
| } |
| |
| /** |
| * Get the mv catalog. |
| */ |
| public MVCatalog<?> getCatalog() { |
| return catalog; |
| } |
| |
| /** |
| * Get the mv catalog. |
| */ |
| public MVCatalog<?> getCatalog( |
| MVCatalogFactory<?> catalogFactory, |
| boolean reload) throws IOException { |
| MVCatalog<?> catalog = this.catalog; |
| if (reload || catalog == null) { |
| synchronized (lock) { |
| catalog = this.catalog; |
| if (reload || catalog == null) { |
| catalog = catalogFactory.newCatalog(); |
| List<MVSchema> schemas = getSchemas(); |
| if (null == catalog) { |
| throw new RuntimeException("Internal Error."); |
| } |
| for (MVSchema schema : schemas) { |
| try { |
| catalog.registerSchema(schema); |
| } catch (Exception e) { |
| // Ignore the schema |
| LOGGER.error("Error while registering schema", e); |
| } |
| } |
| this.catalog = catalog; |
| } |
| } |
| } |
| return catalog; |
| } |
| |
| /** |
| * In case of compaction on mv table,this method will merge the segment list of main table |
| * and return updated segment mapping |
| * |
| * @param mergedLoadName to find which all segments are merged to new compacted segment |
| * @param viewSchema of mv table |
| * @param viewLoadMetadataDetails of mv table |
| * @return updated segment map after merging segment list |
| */ |
| @SuppressWarnings("unchecked") |
| public static String getUpdatedSegmentMap(String mergedLoadName, |
| MVSchema viewSchema, |
| LoadMetadataDetails[] viewLoadMetadataDetails) { |
| Map<String, List<String>> segmentMapping = new HashMap<>(); |
| List<RelationIdentifier> relationIdentifiers = viewSchema.getRelatedTables(); |
| for (RelationIdentifier relationIdentifier : relationIdentifiers) { |
| for (LoadMetadataDetails loadMetadataDetail : viewLoadMetadataDetails) { |
| if (loadMetadataDetail.getSegmentStatus() == SegmentStatus.COMPACTED) { |
| if (mergedLoadName.equalsIgnoreCase(loadMetadataDetail.getMergedLoadName())) { |
| Map segmentMap = new Gson().fromJson(loadMetadataDetail.getExtraInfo(), Map.class); |
| if (segmentMapping.isEmpty()) { |
| segmentMapping.putAll(segmentMap); |
| } else { |
| segmentMapping.get(relationIdentifier.getDatabaseName() + CarbonCommonConstants.POINT |
| + relationIdentifier.getTableName()).addAll( |
| (List<String>) segmentMap.get( |
| relationIdentifier.getDatabaseName() + CarbonCommonConstants.POINT |
| + relationIdentifier.getTableName())); |
| } |
| } |
| } |
| } |
| } |
| Gson gson = new Gson(); |
| return gson.toJson(segmentMapping); |
| } |
| |
| /** |
| * Get enabled mv status details |
| */ |
| public List<MVStatusDetail> getEnabledStatusDetails() throws IOException { |
| List<MVStatusDetail> statusDetails = new ArrayList<>(); |
| for (String database : this.getDatabases()) { |
| statusDetails.addAll(this.getEnabledStatusDetails(database)); |
| } |
| return statusDetails; |
| } |
| |
| /** |
| * Get enabled mv status details |
| */ |
| List<MVStatusDetail> getEnabledStatusDetails(String databaseName) |
| throws IOException { |
| List<MVStatusDetail> statusDetails = schemaProvider.getStatusDetails(this, databaseName); |
| List<MVStatusDetail> enabledStatusDetails = new ArrayList<>(statusDetails.size()); |
| for (MVStatusDetail statusDetail : statusDetails) { |
| if (statusDetail.getStatus() == MVStatus.ENABLED) { |
| enabledStatusDetails.add(statusDetail); |
| } |
| } |
| return enabledStatusDetails; |
| } |
| |
| public void setStatus(RelationIdentifier viewIdentifier, MVStatus viewStatus) |
| throws IOException { |
| MVSchema schema = getSchema( |
| viewIdentifier.getDatabaseName(), viewIdentifier.getTableName()); |
| if (schema != null) { |
| schemaProvider.updateStatus(this, Collections.singletonList(schema), viewStatus); |
| } |
| } |
| |
| public void setStatus(List<MVSchema> viewSchemas, MVStatus viewStatus) |
| throws IOException { |
| if (viewSchemas != null && !viewSchemas.isEmpty()) { |
| schemaProvider.updateStatus(this, viewSchemas, viewStatus); |
| } |
| } |
| |
| public void onDrop(String databaseName, String viewName) |
| throws IOException { |
| MVSchema viewSchema = getSchema(databaseName, viewName); |
| if (viewSchema != null) { |
| schemaProvider.updateStatus( |
| this, Collections.singletonList(viewSchema), MVStatus.DROPPED); |
| } |
| } |
| |
| /** |
| * This method will remove all segments of MV table in case of Insert-Overwrite/Update/Delete |
| * operations on main table |
| * |
| * @param schemas mv schemas |
| */ |
| public void onTruncate(List<MVSchema> schemas) |
| throws IOException { |
| for (MVSchema schema : schemas) { |
| if (!schema.isRefreshOnManual()) { |
| setStatus(schema.identifier, MVStatus.DISABLED); |
| } |
| RelationIdentifier relationIdentifier = schema.getIdentifier(); |
| SegmentStatusManager segmentStatusManager = new SegmentStatusManager(AbsoluteTableIdentifier |
| .from(relationIdentifier.getTablePath(), |
| relationIdentifier.getDatabaseName(), |
| relationIdentifier.getTableName())); |
| ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock(); |
| try { |
| if (carbonLock.lockWithRetries()) { |
| LOGGER.info("Acquired lock for table" + relationIdentifier.getDatabaseName() + "." |
| + relationIdentifier.getTableName() + " for table status updation"); |
| String metaDataPath = |
| CarbonTablePath.getMetadataPath(relationIdentifier.getTablePath()); |
| LoadMetadataDetails[] loadMetadataDetails = |
| SegmentStatusManager.readLoadMetadata(metaDataPath); |
| for (LoadMetadataDetails entry : loadMetadataDetails) { |
| entry.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE); |
| } |
| SegmentStatusManager.writeLoadDetailsIntoFile( |
| CarbonTablePath.getTableStatusFilePath(relationIdentifier.getTablePath()), |
| loadMetadataDetails); |
| } else { |
| LOGGER.error("Not able to acquire the lock for Table status updation for table " |
| + relationIdentifier.getDatabaseName() + "." + relationIdentifier |
| .getTableName()); |
| } |
| } finally { |
| if (carbonLock.unlock()) { |
| LOGGER.info( |
| "Table unlocked successfully after table status updation" + relationIdentifier |
| .getDatabaseName() + "." + relationIdentifier.getTableName()); |
| } else { |
| LOGGER.error( |
| "Unable to unlock Table lock for table" + relationIdentifier.getDatabaseName() |
| + "." + relationIdentifier.getTableName() |
| + " during table status updation"); |
| } |
| } |
| } |
| } |
| |
| } |