blob: b4267c0c3c7a1e749dff629d70c846d778d7dfc3 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.catalog;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.impala.thrift.TCatalogObject;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
/**
* Represents a log of deleted catalog objects.
*
* There are currently two use cases for this log:
*
* a) Processing catalog updates in the impalads
* The impalad catalog cache can be modified by either a state store update or by a
* direct update that applies the result of a catalog operation to the cache
* out-of-band of a state store update. This thread safe log tracks the divergence
* (due to direct updates to the cache) of this impalad's cache from the last state
* store update. This log is needed to ensure work is never undone. For example,
* consider the following sequence of events:
* t1: [Direct Update] - Add item A - (Catalog Version 9)
* t2: [Direct Update] - Drop item A - (Catalog Version 10)
* t3: [StateStore Update] - (From Catalog Version 9)
* This log is used to ensure the state store update in t3 does not undo the drop in t2.
* Currently this only tracks objects that were dropped, since the catalog cache can be
* queried to check if an object was added. TODO: Also track object additions from async
* operations. This could be used to "replay" the log in the case of a catalog reset
* ("invalidate metadata"). Currently, the catalog may briefly go back in time if
* "invalidate metadata" is run concurrently with async catalog operations.
*
* b) Building catalog topic updates in the catalogd
* The catalogd uses this log to identify catalog objects that have been deleted
* since the last catalog topic update. Once the catalog topic update is constructed,
* the old entries in the log are garbage collected to prevent the log from growing
* indefinitely.
*/
public class CatalogDeltaLog {
  // Removed catalog objects keyed by the catalog version at which each object was
  // removed, ordered by that version. The map is only touched from the synchronized
  // methods of this class.
  private final TreeMap<Long, TCatalogObject> removedCatalogObjects_ =
      new TreeMap<Long, TCatalogObject>();

  /**
   * Adds a new item to the map of removed catalog objects, keyed by the version the
   * object reports via getCatalog_version(). An entry already logged under the same
   * version is replaced.
   */
  public synchronized void addRemovedObject(TCatalogObject catalogObject) {
    Preconditions.checkNotNull(catalogObject);
    removedCatalogObjects_.put(catalogObject.getCatalog_version(), catalogObject);
  }

  /**
   * Retrieves all the removed catalog objects with versions in range
   * (fromVersion, toVersion], ordered by version. The returned list is an immutable
   * snapshot and is not affected by later changes to the log.
   *
   * The range is expressed with exclusive/inclusive bound flags instead of adding one
   * to each bound, so passing Long.MAX_VALUE as 'toVersion' selects every entry after
   * 'fromVersion' rather than overflowing.
   *
   * @throws IllegalArgumentException if fromVersion is greater than toVersion.
   */
  public synchronized List<TCatalogObject> retrieveObjects(long fromVersion,
      long toVersion) {
    return ImmutableList.<TCatalogObject>copyOf(
        removedCatalogObjects_.subMap(fromVersion, false, toVersion, true).values());
  }

  /**
   * Given the current catalog version, removes all items with catalogVersion <
   * currentCatalogVersion. Such objects do not need to be tracked in the delta
   * log anymore because they are consistent with the state store's view of the
   * catalog. Items at currentCatalogVersion or later are kept.
   */
  public synchronized void garbageCollect(long currentCatalogVersion) {
    // headMap() is a live view with an exclusive upper bound, so clearing it drops
    // exactly the stale entries in place. It is a no-op when nothing is old enough.
    removedCatalogObjects_.headMap(currentCatalogVersion).clear();
  }

  /**
   * Checks if a matching catalog object (as decided by Catalog.keyEquals()) was removed
   * at or after this object's catalog version. Returns true if such a removal is in the
   * log, false otherwise.
   *
   * Note that the version bound is inclusive: a removal logged at exactly the same
   * catalog version as 'catalogObject' also counts as a match.
   */
  public synchronized boolean wasObjectRemovedAfter(TCatalogObject catalogObject) {
    Preconditions.checkNotNull(catalogObject);
    // Only removals logged at this object's version or later are candidates.
    SortedMap<Long, TCatalogObject> candidates =
        removedCatalogObjects_.tailMap(catalogObject.getCatalog_version());
    for (Map.Entry<Long, TCatalogObject> removal : candidates.entrySet()) {
      if (Catalog.keyEquals(catalogObject, removal.getValue())) return true;
    }
    return false;
  }
}