blob: 9ace37cac8f96f749473828b6a9dcc0600d38a47 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sentry.provider.db.service.persistent;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.sentry.provider.db.service.thrift.SentryConfigurationException;
import org.apache.sentry.provider.db.service.thrift.TSentryStoreRecord;
import org.apache.sentry.provider.db.service.thrift.TStoreSnapshot;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
/**
* The FileLog class abstracts out the log file and all operations on it
* When the FileLog is instantiated, It first checks if there is an existing
* log file associated at the configured path. If yes, then the Client will
* not be allowed to write to the log unless all existing LogEntries have been
* iterated over using the standard iterator abstractions of {@link #hasNext()}
* and {@link #next()}. Once all the entries are read out, it will open an
* Output stream to the log file and clients can start logging.
* The FileLog also support Log snapshots. If it see a special snapshot
* record, the current log file is first closed and renamed. Once the snapshot
* entry has been successfully written to a new log, it will delete the
* old log file.
*/
public class FileLog {
// The default Java ObjectOutputStream cannot be appended to since it
// writes a special header whenever you open the stream. We need this header
// only when a new file is opened. When a store is shutdown and re-opened
// again, subsequent appends should not write the header, else reading the
// file again will throw a StreamCorruptedException.
public class AppendingObjectOutputStream extends ObjectOutputStream {
public AppendingObjectOutputStream(OutputStream out) throws IOException {
super(out);
}
@Override
protected void writeStreamHeader() throws IOException {
// do not write a header
reset();
}
}
public static String SENTRY_FILE_LOG_STORE_LOCATION =
"sentry.file.log.store.location";
private static String COMMIT_LOG_FILE = "commit.log";
public static class Entry {
public final long seqId;
public final TSentryStoreRecord record;
public Entry(long seqId, TSentryStoreRecord record) {
this.seqId = seqId;
this.record = record;
}
}
private volatile boolean isReady = false;
private ObjectInputStream commitOis = null;
private Entry nextEntry = null;
private File logDir;
private ObjectOutputStream commitLog;
private final TSerializer serializer;
private final TDeserializer deserializer;
public FileLog(Configuration conf)
throws SentryConfigurationException, FileNotFoundException, IOException {
String currentDir = System.getProperty("user.dir");
logDir = new File(conf.get(SENTRY_FILE_LOG_STORE_LOCATION, currentDir));
TProtocolFactory protoFactory = new TCompactProtocol.Factory();
serializer = new TSerializer(protoFactory);
deserializer = new TDeserializer(protoFactory);
if (logDir.exists()) {
if (!logDir.isDirectory()) {
throw new SentryConfigurationException(
"Dir [" + logDir.getAbsolutePath() + "] exists and is not a directory !!");
} else {
if (new File(logDir, COMMIT_LOG_FILE).exists()) {
commitOis =
new ObjectInputStream(
new FileInputStream(new File(logDir, COMMIT_LOG_FILE)));
} else {
commitLog =
new ObjectOutputStream(
new FileOutputStream(new File(logDir, COMMIT_LOG_FILE)));
isReady = true;
}
}
} else {
isReady = true;
boolean created = logDir.mkdirs();
if (!created) {
throw new RuntimeException(
"Could not create store directory [" + logDir.getAbsolutePath() + "]");
}
commitLog =
new ObjectOutputStream(new FileOutputStream(new File(logDir,
COMMIT_LOG_FILE)));
}
}
public boolean hasNext() {
if (isReady) {
return false;
}
if (nextEntry != null) {
return true;
}
try {
nextEntry = getNextEntry();
} catch (EOFException e) {
isReady = true;
try {
commitOis.close();
} catch (Exception e2) {
System.out.println("Got ex : " + e2.getMessage());
}
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TException e) {
throw new RuntimeException(e);
}
return !isReady;
}
private Entry getNextEntry() throws IOException, TException {
long seqId = commitOis.readLong();
int numRecordBytes = commitOis.readInt();
byte[] recBytes = new byte[numRecordBytes];
commitOis.readFully(recBytes);
TSentryStoreRecord record = new TSentryStoreRecord();
deserializer.deserialize(record, recBytes);
return new Entry(seqId, record);
}
// Must be called only after a hasNext();
public Entry next() {
Entry e = nextEntry;
nextEntry = null;
return e;
}
public void log(long seqId, TSentryStoreRecord record) {
if (!isReady) {
throw new RuntimeException("FileLog is not ready for writing yet !!");
}
try {
prepareForSnapshotIfNeeded(seqId, record);
if (commitLog == null) {
commitLog =
new AppendingObjectOutputStream(
new FileOutputStream(new File(logDir, COMMIT_LOG_FILE), true));
}
byte[] recBytes = serializer.serialize(record);
commitLog.writeLong(seqId);
commitLog.writeInt(recBytes.length);
commitLog.write(recBytes);
commitLog.flush();
} catch (Exception e) {
throw new RuntimeException(
"Could not log record with id [" + seqId + "] !!", e);
}
commitIfSnapshot(seqId, record);
}
// Truncate current log file and write the snapshot record
private void prepareForSnapshotIfNeeded(long seqId, TSentryStoreRecord record)
throws IOException {
if (record.getSnapshot() != null) {
// Close current log
if (commitLog != null) {
commitLog.flush();
commitLog.close();
}
// Copy current log to temp
boolean renameSuccess =
new File(logDir, COMMIT_LOG_FILE)
.renameTo(
new File(logDir, COMMIT_LOG_FILE + "_tmp_" + seqId));
if (!renameSuccess) {
throw new IOException("Could not Prepare for snapshot !!");
}
commitLog = new ObjectOutputStream(
new FileOutputStream(new File(logDir, COMMIT_LOG_FILE), true));;
}
}
private void commitIfSnapshot(long seqId, TSentryStoreRecord record) {
if (record.getSnapshot() != null) {
new File(logDir, COMMIT_LOG_FILE + "_tmp_" + seqId).delete();
}
}
public void close() {
if (commitLog != null) {
try {
commitLog.flush();
commitLog.close();
} catch (IOException e) {
System.out.println("Cound not close file : " + e.getMessage());
}
}
}
}