/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sentry.provider.db.service.persistent;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.Serializable;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.sentry.SentryUserException;
import org.apache.sentry.provider.db.service.persistent.FileLog.Entry;
import org.apache.sentry.provider.db.service.thrift.TSentryStoreOp;
import org.apache.sentry.provider.db.service.thrift.TSentryStoreRecord;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IAtomicLong;
import com.hazelcast.core.IQueue;
import com.hazelcast.core.ITopic;
import com.hazelcast.core.Member;
import com.hazelcast.core.Message;
import com.hazelcast.core.MessageListener;
/**
* A special subclass of the {@link SentryStoreWithFileLog} that uses the
* same persistence strategy but in addition to logging to a local
* {@link FileLog} will also publish the record to a distributed topic to be
* consumed by other peer Stores in a Sentry cluster.
*
* The class uses the Hazelcast library for:
* 1) Distributed counter for sequence Id
* 2) Distributed Topic with global ordering to replicate log entries to peer
* SentryStores
*
* Consistency Guarantees:
* Hazelcast counters are globally consistent, but since Hazelcast is
* optimized for availability rather than consistency, it can suffer from
* split-brain issues in the event of a network partition. This should not
* be much of an issue for small deployments of 2-3 instances deployed in
* the same availability zone, but strict consistency can be guaranteed by
* either:
* 1) Wrapping this store with a {@link SentryStoreWithDistributedLock},
* which uses the Curator library's distributed read-write lock based on
* Zookeeper. Zookeeper has a stricter consistency model since all writes
* go through a master.
* 2) Deploying SentryService in active-standby mode, which ensures writes
* are handled only by the active node. This can be trivially
* accomplished by setting the service discovery policy in the
* SentryPolicyServiceClient to 'STICKY' rather than the default
* 'ROUND-ROBIN'.
*
* Other Considerations:
* This implementation also supports new peers joining an existing cluster.
* The new peer is brought up to speed with the other members of the
* cluster in the following manner:
* 1) All members listen on a special 'catchUpRequest' topic.
* 2) When a new member starts up, it publishes a CatchUpRequest to the
* 'catchUpRequest' topic. The CatchUpRequest contains the last seen
* record's sequence number from its local FileLog.
* 3) The new member also creates a uniquely named distributed queue and
* waits for messages on it. This queue name is also included in the
* CatchUpRequest.
* 4) This request is received by all the existing members of the cluster;
* it is ignored by all members EXCEPT the oldest member of the cluster,
* which responds to the request by pushing the required records to the
* distributed queue.
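*
* A minimal usage sketch (hypothetical wiring; construction of the
* underlying SentryStore is assumed and elided):
* <pre>{@code
*   SentryStore base = ...; // backing store, configured elsewhere
*   SentryStoreWithReplicatedLog store =
*       new SentryStoreWithReplicatedLog(base); // joins cluster, catches up
*   // apply writes through 'store' as usual, then optionally block until
*   // this node has converged on the global sequence id:
*   store.waitForReplicationToComplete(10000);
* }</pre>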
*/
public class SentryStoreWithReplicatedLog extends SentryStoreWithFileLog {
private static final Logger LOGGER = LoggerFactory
.getLogger(SentryStoreWithReplicatedLog.class);
private static final int WAIT_SLEEP_MS = 500;
/**
* Only successful events are sent to the topic (failed records are sent as
* no-ops so that no gaps exist in the seqId).
* This is essentially write-behind logging: if the publisher crashes before
* writing to the log, the update is not applied to the remote nodes.
* This should be fine (it is similar to the case where the Sentry
* Service receives a message but dies while/before opening a db transaction).
*/
public static class LogEntry implements Serializable {
private static final long serialVersionUID = 8798360797372277777L;
long seqId = -1;
long nodeId = -1;
byte[] recordBytes;
public LogEntry() {}
public LogEntry(long seqId, long nodeId, byte[] recordBytes) {
this.seqId = seqId;
this.nodeId = nodeId;
this.recordBytes = recordBytes;
}
}
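/**
* Request published by a newly joined member asking the oldest cluster
* member to replay log records in the range [startSeqId, endSeqId] to a
* response queue suffixed with the requester's nodeId.
*/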
public static class CatchUpRequest implements Serializable {
private static final long serialVersionUID = -3345746198249394847L;
long startSeqId = -1;
long endSeqId = -1;
long nodeId = -1;
public CatchUpRequest() {}
public CatchUpRequest(long startSeqId, long endSeqId, long nodeId) {
this.startSeqId = startSeqId;
this.endSeqId = endSeqId;
this.nodeId = nodeId;
}
}
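/**
* Background worker that drains the entry queue and applies each
* replicated LogEntry to the local store in arrival order.
*/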
class LogEntryWorker implements Runnable {
@Override
public void run() {
while (true) {
try {
processLogEntry(entryQueue.take());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (SentryUserException e) {
throw new RuntimeException(e);
}
}
}
}
private final TSerializer serializer;
private final TDeserializer deserializer;
private final BlockingQueue<LogEntry> entryQueue =
new LinkedBlockingQueue<LogEntry>();
private Thread entryWorker;
// Distributed primitives shared via the Hazelcast cluster
private final HazelcastInstance hInst;
private final long nodeId;
private final IAtomicLong globalSeqId;
private final ITopic<LogEntry> dTopic;
private final ITopic<CatchUpRequest> catchUpReqTopic;
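/**
* Creates the store with a Hazelcast instance derived from the wrapped
* store's configuration.
*/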
public SentryStoreWithReplicatedLog(SentryStore sentryStore)
throws FileNotFoundException, IOException, TException,
SentryUserException {
this(sentryStore,
DistributedUtils.getHazelcastInstance(sentryStore.getConfiguration(),
true));
}
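/**
* Wires up the distributed primitives: a cluster-wide node id and sequence
* counter, the replication topic, and the catch-up request topic. Sends a
* catch-up request if the local FileLog is behind the global sequence id,
* then starts the log entry worker thread.
*/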
public SentryStoreWithReplicatedLog(SentryStore sentryStore,
final HazelcastInstance hInst) throws FileNotFoundException, IOException,
TException, SentryUserException {
super(sentryStore);
this.hInst = hInst;
TProtocolFactory protoFactory = new TCompactProtocol.Factory();
serializer = new TSerializer(protoFactory);
deserializer = new TDeserializer(protoFactory);
nodeId = hInst.getIdGenerator(DistributedUtils.SENTRY_STORE_NODEID).newId();
globalSeqId = hInst.getAtomicLong(DistributedUtils.SENTRY_STORE_SEQID);
dTopic = hInst.getTopic(DistributedUtils.SENTRY_DISTRIBUTED_TOPIC);
dTopic.addMessageListener(new MessageListener<SentryStoreWithReplicatedLog.LogEntry>() {
@Override
public void onMessage(Message<LogEntry> msg) {
LogEntry ent = msg.getMessageObject();
try {
entryQueue.put(ent);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
catchUpReqTopic = hInst.getTopic(DistributedUtils.SENTRY_CATCHUP_REQUEST_TOPIC);
requestCatchUpIfRequired();
catchUpReqTopic.addMessageListener(new MessageListener<CatchUpRequest>() {
@Override
public void onMessage(final Message<CatchUpRequest> req) {
LOGGER.info("Catchup request received["
+ req.getMessageObject().startSeqId + ", "
+ req.getMessageObject().endSeqId + ", "
+ req.getMessageObject().nodeId + "]");
handleCatchupRequest(req.getMessageObject(), hInst);
}
});
entryWorker = new Thread(new LogEntryWorker(), "Log Entry Worker");
entryWorker.start();
}
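/**
* Handles a CatchUpRequest from a newly joined member. Only the oldest
* cluster member responds: it waits (if needed) until its own log has
* reached the requested end sequence id, then replays the matching range
* of its FileLog into the requester's response queue.
*/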
private void handleCatchupRequest(CatchUpRequest req, HazelcastInstance hInst) {
Member oldestMember = hInst.getCluster().getMembers().iterator().next();
// Am I the oldest member?
if (oldestMember.localMember()) {
long myLastSeen = lastSeenSeqId.get();
if (req.endSeqId > myLastSeen) {
LOGGER.info("Waiting for seq Id[" + myLastSeen + ", " + req.endSeqId + "]");
try {
Thread.sleep(getConfiguration().getInt(
DistributedUtils.SENTRY_CATCHUP_WAIT_TIME,
DistributedUtils.SENTRY_CATCHUP_WAIT_TIME_DEF));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// Re-read: the value fetched before the wait would be stale here
myLastSeen = lastSeenSeqId.get();
}
// Still not updated?
if (req.endSeqId > myLastSeen) {
throw new RuntimeException(
"Haven't received the latest updates, cannot respond to catchup request!");
}
FileLog tempLog = null;
try {
tempLog = new FileLog(getStore().getConfiguration());
} catch (Exception e) {
throw new RuntimeException("Could not open FileLog to send catchup entries !!");
}
IQueue<LogEntry> respQueue =
hInst.getQueue(DistributedUtils.SENTRY_CATCHUP_RESPONSE_QUEUE + req.nodeId);
while (tempLog.hasNext()) {
Entry entry = tempLog.next();
if ((entry.seqId >= req.startSeqId) && (entry.seqId <= req.endSeqId)) {
try {
respQueue.offer(new LogEntry(entry.seqId, nodeId, serializer.serialize(entry.record)));
LOGGER.info("Sent Catchup entry [" + entry.seqId + ","
+ nodeId + ", " + req.nodeId + "]");
} catch (TException e) {
throw new RuntimeException("Could not send catchup entry !!", e);
}
}
}
}
}
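/**
* On startup, compares the local FileLog's last seen sequence id with the
* global counter. If the local log is behind, publishes a CatchUpRequest
* and blocks until the missing entries arrive on this node's response
* queue.
*/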
private void requestCatchUpIfRequired() throws SentryUserException {
long startSeqId = lastSeenSeqId.get() + 1;
long endSeqId = globalSeqId.get();
if (startSeqId <= endSeqId) {
LOGGER.info("Sending Catchup request ["
+ startSeqId + ", "
+ endSeqId + ", "
+ nodeId + "]");
// Send request for catchup entries
IQueue<LogEntry> respQueue =
hInst.getQueue(DistributedUtils.SENTRY_CATCHUP_RESPONSE_QUEUE + nodeId);
catchUpReqTopic.publish(
new CatchUpRequest(startSeqId, endSeqId, nodeId));
receiveCatchUpEntries(respQueue, endSeqId);
}
}
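/**
* Drains catch-up entries from the response queue until endSeqId is
* reached, applying each entry to the local store. Fails if an entry does
* not arrive within the configured wait time.
*/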
private void receiveCatchUpEntries(IQueue<LogEntry> catchUpQueue,
long endSeqId) throws SentryUserException {
long currSeqId = -1;
while (currSeqId < endSeqId) {
try {
LogEntry entry =
catchUpQueue.poll(
getConfiguration().getInt(
DistributedUtils.SENTRY_CATCHUP_WAIT_TIME,
DistributedUtils.SENTRY_CATCHUP_WAIT_TIME_DEF),
TimeUnit.MILLISECONDS);
if (entry == null) {
String msg =
"Haven't received all catchup entries [" + currSeqId + ", "
+ endSeqId + "]!";
LOGGER.error(msg);
throw new RuntimeException(msg);
}
LOGGER.info("Received catchup [" + entry.seqId + ", " + entry.nodeId + "]");
currSeqId = entry.seqId;
processLogEntry(entry);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// Stop polling; retrying on an interrupted thread would fail immediately
break;
}
}
catchUpQueue.destroy();
}
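/**
* Deserializes a replicated record and, unless this node was the
* publisher, applies it to the local store, appends it to the local
* FileLog and advances the last seen sequence id.
*/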
private void processLogEntry(LogEntry ent) throws SentryUserException {
TSentryStoreRecord record = new TSentryStoreRecord();
try {
deserializer.deserialize(record, ent.recordBytes);
} catch (TException e) {
String msg = "Could not de-serialize record [" + ent.seqId + "]!!";
Log.error(msg, e);
throw new RuntimeException(msg, e);
}
// No need to update the publisher (that's already done)
if (ent.nodeId != SentryStoreWithReplicatedLog.this.nodeId) {
try {
applyRecord(record);
fileLog.log(ent.seqId, getSnapshotIfRequired(ent.seqId, record));
lastSeenSeqId.set(ent.seqId);
} catch (SentryUserException e) {
String msg = "Could not apply de-serialized record [" + ent.seqId + "]!!";
Log.error(msg, e);
throw new RuntimeException(msg, e);
}
}
}
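/**
* Assigns the next globally unique sequence id from the distributed
* counter to the record being logged.
*/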
@Override
protected FileLogContext createRecord(TSentryStoreRecord record) {
return new FileLogContext(globalSeqId.incrementAndGet(), record);
}
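/**
* After a successful local commit and FileLog append, publishes the
* record to the replication topic so peers can apply it.
*/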
@Override
protected void onSuccess(FileLogContext context) {
super.onSuccess(context);
lastSeenSeqId.set(context.seqId);
try {
dTopic.publish(new LogEntry(context.seqId, nodeId, serializer
.serialize(context.record)));
} catch (TException e) {
throw new RuntimeException("Could not serialize Sentry record !!");
}
}
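/**
* Publishes a NO_OP record on failure so peers see a contiguous sequence
* of ids with no gaps.
*/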
@Override
protected void onFailure(FileLogContext context) {
// Publish a NO-OP record (since we don't want any gaps in the seqId)
super.onFailure(context);
lastSeenSeqId.set(context.seqId);
try {
dTopic.publish(new LogEntry(context.seqId, nodeId, serializer
.serialize(new TSentryStoreRecord(TSentryStoreOp.NO_OP))));
} catch (TException e) {
throw new RuntimeException("Could not serialize Sentry record !!");
}
}
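/**
* Blocks until this node's last seen sequence id catches up with the
* global counter, or until roughly timeInMs milliseconds have elapsed.
* Returns true if replication caught up, false on timeout or interrupt.
*/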
public boolean waitForReplicationToComplete(long timeInMs) {
long totalWait = 0;
while (true) {
if (lastSeenSeqId.get() == globalSeqId.get()) {
return true;
}
try {
Thread.sleep(WAIT_SLEEP_MS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
totalWait += WAIT_SLEEP_MS;
if (totalWait >= timeInMs) {
return false;
}
}
}
}