[ISSUE #293] reset pull call back by new pull request event time
[ISSUE #293] reset pull call back by new pull request event time
diff --git a/src/consumer/DefaultMQPushConsumerImpl.cpp b/src/consumer/DefaultMQPushConsumerImpl.cpp
index 12fddd3..39b3ccd 100644
--- a/src/consumer/DefaultMQPushConsumerImpl.cpp
+++ b/src/consumer/DefaultMQPushConsumerImpl.cpp
@@ -790,7 +790,8 @@
m_PullCallback[msgQueue] = new AsyncPullCallback(this, request);
}
AsyncPullCallback* asyncPullCallback = m_PullCallback[msgQueue];
- if (asyncPullCallback && asyncPullCallback->getPullRequest().expired()) {
+ if (asyncPullCallback) {
+ // maybe the pull request has dropped before, replace event time.
asyncPullCallback->setPullRequest(pullRequest);
}
return asyncPullCallback;
diff --git a/src/consumer/PullRequest.cpp b/src/consumer/PullRequest.cpp
index e578840..9795880 100644
--- a/src/consumer/PullRequest.cpp
+++ b/src/consumer/PullRequest.cpp
@@ -208,7 +208,8 @@
uint64 interval = m_lastPullTimestamp + MAX_PULL_IDLE_TIME;
if (interval <= UtilAll::currentTimeMillis()) {
LOG_WARN("PullRequest for [%s] has been expired %lld ms,m_lastPullTimestamp = %lld ms",
- m_messageQueue.toString().c_str(), UtilAll::currentTimeMillis() - interval, m_lastPullTimestamp);
+ m_messageQueue.toString().c_str(), UtilAll::currentTimeMillis() - m_lastPullTimestamp,
+ m_lastPullTimestamp);
return true;
}
return false;
diff --git a/src/consumer/Rebalance.cpp b/src/consumer/Rebalance.cpp
index 6dbb35c..18c8b2f 100644
--- a/src/consumer/Rebalance.cpp
+++ b/src/consumer/Rebalance.cpp
@@ -522,6 +522,7 @@
int64 nextOffset = computePullFromWhere(*itAdd);
if (nextOffset >= 0) {
pullRequest->setNextOffset(nextOffset);
+ pullRequest->setDropped(false);
changed = true;
addPullRequest(*itAdd, pullRequest);
pullRequestsToAdd.push_back(pullRequest);