ZOOKEEPER-4794: Reduce the ZKDatabase#committedLog memory usage.
Reduce the committed log memory usage.
Fix ci.
Reviewers: eolivelli, hangc0276, anmolnar
Author: horizonzy
Closes #2115 from horizonzy/reduce-committed-log-memory-usage
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
index 10111c8..27fa4e2 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
@@ -19,7 +19,6 @@
package org.apache.zookeeper.server;
import static java.nio.charset.StandardCharsets.UTF_8;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
@@ -169,24 +168,16 @@ public boolean isThrottlable() {
&& this.type != OpCode.createSession;
}
- private transient byte[] serializeData;
-
- @SuppressFBWarnings(value = "EI_EXPOSE_REP")
public byte[] getSerializeData() {
if (this.hdr == null) {
return null;
}
-
- if (this.serializeData == null) {
- try {
- this.serializeData = Util.marshallTxnEntry(this.hdr, this.txn, this.txnDigest);
- } catch (IOException e) {
- LOG.error("This really should be impossible.", e);
- this.serializeData = new byte[32];
- }
+ try {
+ return Util.marshallTxnEntry(this.hdr, this.txn, this.txnDigest);
+ } catch (IOException e) {
+ LOG.error("This really should be impossible.", e);
+ return new byte[32];
}
-
- return this.serializeData;
}
/**
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/TxnLogProposalIterator.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/TxnLogProposalIterator.java
index 847e3b2..2d6ecb6 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/TxnLogProposalIterator.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/TxnLogProposalIterator.java
@@ -58,20 +58,19 @@ public boolean hasNext() {
@Override
public Proposal next() {
- Proposal p = new Proposal();
+ Proposal p;
try {
byte[] serializedData = Util.marshallTxnEntry(itr.getHeader(), itr.getTxn(), itr.getDigest());
QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, itr.getHeader().getZxid(), serializedData, null);
- p.packet = pp;
- p.request = null;
-
+ p = new Proposal(pp);
// This is the only place that can throw IO exception
hasNext = itr.next();
} catch (IOException e) {
LOG.error("Unable to read txnlog from disk", e);
hasNext = false;
+ p = new Proposal();
}
return p;
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java
index eaad05c..d98c97f 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java
@@ -54,9 +54,8 @@
import org.apache.zookeeper.server.persistence.FileTxnSnapLog.PlayBackListener;
import org.apache.zookeeper.server.persistence.SnapStream;
import org.apache.zookeeper.server.persistence.TxnLog.TxnIterator;
-import org.apache.zookeeper.server.quorum.Leader;
import org.apache.zookeeper.server.quorum.Leader.Proposal;
-import org.apache.zookeeper.server.quorum.QuorumPacket;
+import org.apache.zookeeper.server.quorum.Leader.PureRequestProposal;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.apache.zookeeper.server.util.SerializeUtils;
import org.apache.zookeeper.txn.TxnDigest;
@@ -323,19 +322,15 @@ public void addCommittedProposal(Request request) {
wl.lock();
if (committedLog.size() > commitLogCount) {
committedLog.remove();
- minCommittedLog = committedLog.peek().packet.getZxid();
+ minCommittedLog = committedLog.peek().getZxid();
}
if (committedLog.isEmpty()) {
minCommittedLog = request.zxid;
maxCommittedLog = request.zxid;
}
- byte[] data = request.getSerializeData();
- QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);
- Proposal p = new Proposal();
- p.packet = pp;
- p.request = request;
+ PureRequestProposal p = new PureRequestProposal(request);
committedLog.add(p);
- maxCommittedLog = p.packet.getZxid();
+ maxCommittedLog = p.getZxid();
} finally {
wl.unlock();
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
index 3b9c827..0b57bb1 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
@@ -88,14 +88,60 @@ public class Leader extends LearnerMaster {
public static class Proposal extends SyncedLearnerTracker {
- public QuorumPacket packet;
- public Request request;
+ private QuorumPacket packet;
+ protected Request request;
+
+ public Proposal() {
+ }
+
+ public Proposal(QuorumPacket packet) {
+ this.packet = packet;
+ }
+
+ public Proposal(Request request, QuorumPacket packet) {
+ this.request = request;
+ this.packet = packet;
+ }
+
+ public QuorumPacket getQuorumPacket() {
+ return packet;
+ }
+
+ public Request getRequest() {
+ return request;
+ }
+
+ public long getZxid() {
+ return packet.getZxid();
+ }
@Override
public String toString() {
return packet.getType() + ", " + packet.getZxid() + ", " + request;
}
+ }
+ public static class PureRequestProposal extends Proposal {
+
+ public PureRequestProposal(Request request) {
+ this.request = request;
+ }
+
+ @Override
+ public QuorumPacket getQuorumPacket() {
+ byte[] data = request.getSerializeData();
+ return new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);
+ }
+
+ @Override
+ public long getZxid() {
+ return request.zxid;
+ }
+
+ @Override
+ public String toString() {
+ return request.toString();
+ }
}
// log ack latency if zxid is a multiple of ackLoggingFrequency. If <=0, disable logging.
@@ -1258,9 +1304,7 @@ public Proposal propose(Request request) throws XidRolloverException {
proposalStats.setLastBufferSize(data.length);
QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);
- Proposal p = new Proposal();
- p.packet = pp;
- p.request = request;
+ Proposal p = new Proposal(request, pp);
synchronized (this) {
p.addQuorumVerifier(self.getQuorumVerifier());
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java
index e9d5cd4..049336a 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java
@@ -957,7 +957,7 @@ protected long queueCommittedProposals(Iterator<Proposal> itr, long peerLastZxid
while (itr.hasNext()) {
Proposal propose = itr.next();
- long packetZxid = propose.packet.getZxid();
+ long packetZxid = propose.getZxid();
// abort if we hit the limit
if ((maxZxid != null) && (packetZxid > maxZxid)) {
break;
@@ -1020,7 +1020,7 @@ protected long queueCommittedProposals(Iterator<Proposal> itr, long peerLastZxid
// Since this is already a committed proposal, we need to follow
// it by a commit packet
- queuePacket(propose.packet);
+ queuePacket(propose.getQuorumPacket());
queueOpPacket(Leader.COMMIT, packetZxid);
queuedZxid = packetZxid;
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/flexible/QuorumOracleMaj.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/flexible/QuorumOracleMaj.java
index 01f3a82..b3e7fa2 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/flexible/QuorumOracleMaj.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/flexible/QuorumOracleMaj.java
@@ -123,18 +123,18 @@ public boolean revalidateOutstandingProp(Leader self, ArrayList<Leader.Proposal>
LOG.debug("Start Revalidation outstandingProposals");
try {
while (outstandingProposal.size() >= 1) {
- outstandingProposal.sort((o1, o2) -> (int) (o1.packet.getZxid() - o2.packet.getZxid()));
+ outstandingProposal.sort((o1, o2) -> (int) (o1.getZxid() - o2.getZxid()));
Leader.Proposal p;
int i = 0;
while (i < outstandingProposal.size()) {
p = outstandingProposal.get(i);
- if (p.request.zxid > lastCommitted) {
- LOG.debug("Re-validate outstanding proposal: 0x{} size:{} lastCommitted:{}", Long.toHexString(p.request.zxid), outstandingProposal.size(), Long.toHexString(lastCommitted));
- if (!self.tryToCommit(p, p.request.zxid, null)) {
+ if (p.getZxid() > lastCommitted) {
+ LOG.debug("Re-validate outstanding proposal: 0x{} size:{} lastCommitted:{}", Long.toHexString(p.getZxid()), outstandingProposal.size(), Long.toHexString(lastCommitted));
+ if (!self.tryToCommit(p, p.getZxid(), null)) {
break;
} else {
- lastCommitted = p.request.zxid;
+ lastCommitted = p.getZxid();
outstandingProposal.remove(p);
}
}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LeaderWithObserverTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LeaderWithObserverTest.java
index f9de451..518dcef 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LeaderWithObserverTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LeaderWithObserverTest.java
@@ -25,6 +25,7 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import java.io.File;
+import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.Set;
@@ -143,7 +144,9 @@ public void testWaitForNewLeaderAck() throws Exception {
long zxid = leader.zk.getZxid();
// things needed for waitForNewLeaderAck to run (usually in leader.lead(), but we're not running leader here)
- leader.newLeaderProposal.packet = new QuorumPacket(0, zxid, null, null);
+ Field field = Leader.Proposal.class.getDeclaredField("packet");
+ field.setAccessible(true);
+ field.set(leader.newLeaderProposal, new QuorumPacket(0, zxid, null, null));
leader.newLeaderProposal.addQuorumVerifier(peer.getQuorumVerifier());
Set<Long> ackSet = leader.newLeaderProposal.qvAcksetPairs.get(0).getAckset();
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java
index bbf3636..4320271 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java
@@ -86,14 +86,14 @@ public long getDataTreeLastProcessedZxid() {
public long getmaxCommittedLog() {
if (!committedLog.isEmpty()) {
- return committedLog.getLast().packet.getZxid();
+ return committedLog.getLast().getZxid();
}
return 0;
}
public long getminCommittedLog() {
if (!committedLog.isEmpty()) {
- return committedLog.getFirst().packet.getZxid();
+ return committedLog.getFirst().getZxid();
}
return 0;
}
@@ -107,7 +107,7 @@ public ReentrantReadWriteLock getLogLock() {
}
public Iterator<Proposal> getProposalsFromTxnLog(long peerZxid, long limit) {
- if (peerZxid >= txnLog.peekFirst().packet.getZxid()) {
+ if (peerZxid >= txnLog.peekFirst().getZxid()) {
return txnLog.iterator();
} else {
return Collections.emptyIterator();
@@ -150,10 +150,10 @@ public Long answer(InvocationOnMock invocation) {
}
Proposal createProposal(long zxid) {
- Proposal p = new Proposal();
- p.packet = new QuorumPacket();
- p.packet.setZxid(zxid);
- p.packet.setType(Leader.PROPOSAL);
+ QuorumPacket packet = new QuorumPacket();
+ packet.setZxid(zxid);
+ packet.setType(Leader.PROPOSAL);
+ Proposal p = new Proposal(packet);
return p;
}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/GetProposalFromTxnTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/GetProposalFromTxnTest.java
index 010d69b..a85e76d 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/GetProposalFromTxnTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/GetProposalFromTxnTest.java
@@ -107,7 +107,7 @@ public void testGetProposalFromTxn() throws Exception {
while (itr.hasNext()) {
Proposal proposal = itr.next();
TxnLogEntry logEntry = SerializeUtils.deserializeTxn(
- proposal.packet.getData());
+ proposal.getQuorumPacket().getData());
TxnHeader hdr = logEntry.getHeader();
Record rec = logEntry.getTxn();
if (hdr.getType() == OpCode.create) {
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/LocalSessionRequestTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/LocalSessionRequestTest.java
index 1c1c72e..16a470c 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/LocalSessionRequestTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/LocalSessionRequestTest.java
@@ -82,8 +82,8 @@ private void validateRequestLog(long sessionId, int peerId) {
QuorumPeer peer = qb.getPeerList().get(peerId);
ZKDatabase db = peer.getActiveServer().getZKDatabase();
for (Proposal p : db.getCommittedLog()) {
- assertFalse(p.request.sessionId == sessionId,
- "Should not see " + Request.op2String(p.request.type)
+ assertFalse(p.getRequest().sessionId == sessionId,
+ "Should not see " + Request.op2String(p.getRequest().type)
+ " request from local session 0x" + session + " on the " + peerType);
}
}