blob: 8f640a0d846a1b36c4fcab69c15f2bd5cfc63d3d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sentry.provider.db.service.persistent;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.sentry.SentryUserException;
import org.apache.sentry.provider.db.service.persistent.FileLog.Entry;
import org.apache.sentry.provider.db.service.thrift.TSentryStoreOp;
import org.apache.sentry.provider.db.service.thrift.TSentryStoreRecord;
import org.apache.thrift.TException;
/**
* An implementation of the {@link PersistentSentryStore}. The Persistence
* strategy used by this class is to log all write operations to a local file
* using the {@link FileLog} log abstraction. Each write operation is stamp
* with a monotonically +1 incrementing sequence Id. This guarantees that, after
* a restart, the Sentry Store can read the log in the same order it was
* written and would return to the same state it was before it was brought down.
* The logging is also write-behind (operations are logged only after it has
* been accepted by the underlying store) to ensure that erroneous operations
* that would bring down the Store are not logged.
*
* To limit the size of the log file, it requests the underlying SentryStore
* to provide it with a snapshot of the store after a configurable number of
* operations, which it writes to a new log file and and truncates the old one.
*
*/
public class SentryStoreWithFileLog extends
PersistentSentryStore<SentryStoreWithFileLog.FileLogContext> {
public static final int SENTRY_STORE_FILE_LOG_SNAPSHOT_THRESHOLD_DEF = 100;
public static final String SENTRY_STORE_FILE_LOG_SNAPSHOT_THRESHOLD =
"sentry.store.file.log.snapshot.threshold";
/**
* An implementation of the {@link PersistentContext} that is created prior
* to the operation and stores the write record.
*/
public static class FileLogContext implements
PersistentSentryStore.PersistentContext {
final long seqId;
final TSentryStoreRecord record;
FileLogContext(long seqId, TSentryStoreRecord record) {
this.seqId = seqId;
this.record = record;
}
}
protected final FileLog fileLog;
protected final AtomicLong lastSeenSeqId = new AtomicLong(0);
protected final int snapshotThreshold;
public SentryStoreWithFileLog(SentryStore sentryStore)
throws FileNotFoundException, IOException, TException, SentryUserException {
super(sentryStore);
snapshotThreshold =
getConfiguration().getInt(
SENTRY_STORE_FILE_LOG_SNAPSHOT_THRESHOLD,
SENTRY_STORE_FILE_LOG_SNAPSHOT_THRESHOLD_DEF);
fileLog = new FileLog(getConfiguration());
Entry ent = null;
while (fileLog.hasNext()) {
ent = fileLog.next();
applyRecord(ent.record);
}
if (ent != null) {
lastSeenSeqId.set(ent.seqId);
}
}
@Override
protected FileLogContext createRecord(TSentryStoreRecord record) {
return new FileLogContext(lastSeenSeqId.incrementAndGet(), record);
}
@Override
protected void onSuccess(FileLogContext context) {
fileLog.log(context.seqId,
getSnapshotIfRequired(context.seqId, context.record));
}
@Override
protected void onFailure(FileLogContext context) {
fileLog.log(context.seqId,
getSnapshotIfRequired(context.seqId,
new TSentryStoreRecord(TSentryStoreOp.NO_OP)));
}
protected TSentryStoreRecord getSnapshotIfRequired(long seqId, TSentryStoreRecord record) {
if ((seqId > 0) && (seqId % snapshotThreshold == 0)) {
if (record.getStoreOp() == TSentryStoreOp.SNAPSHOT) {
return record;
}
TSentryStoreRecord snapshotRecord = new TSentryStoreRecord(TSentryStoreOp.SNAPSHOT);
snapshotRecord.setSnapshot(getStore().toSnapshot());
return snapshotRecord;
} else {
return record;
}
}
@Override
public void stop() {
super.stop();
fileLog.close();
}
}