| // Copyright 2010 Google Inc. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| #include "net/instaweb/apache/serf_url_async_fetcher.h" |
| |
| #include <algorithm> |
| #include <string> |
| #include <vector> |
| #include "apr_atomic.h" |
| #include "apr_pools.h" |
| #include "apr_strings.h" |
| #include "apr_version.h" |
| #include "base/basictypes.h" |
| #include "base/scoped_ptr.h" |
| #include "base/stl_util-inl.h" |
| #include "net/instaweb/apache/apr_file_system.h" |
| #include "net/instaweb/apache/apr_mutex.h" |
| #include "net/instaweb/apache/apr_timer.h" |
| #include "net/instaweb/util/public/google_message_handler.h" |
| #include "net/instaweb/util/public/gzip_inflater.h" |
| #include "net/instaweb/util/public/mock_timer.h" |
| #include "net/instaweb/util/public/string_writer.h" |
| #include "net/instaweb/util/public/simple_meta_data.h" |
| #include "net/instaweb/util/public/simple_stats.h" |
| #include "testing/gtest/include/gtest/gtest.h" |
| #include "third_party/serf/src/serf.h" |
| |
| namespace net_instaweb { |
| |
| namespace { |
| const char kProxy[] = ""; |
| const int kMaxMs = 10000; |
| const int kThreadedPollMs = 1000; |
| const int kWaitTimeoutMs = 5 * 1000; |
| const int kTimerAdvanceMs = 10; |
| |
| class SerfTestCallback : public UrlAsyncFetcher::Callback { |
| public: |
| explicit SerfTestCallback(AprMutex* mutex, const std::string& url) |
| : done_(false), |
| mutex_(mutex), |
| url_(url), |
| enable_threaded_(false) { |
| } |
| virtual ~SerfTestCallback() {} |
| virtual void Done(bool success) { |
| ScopedMutex lock(mutex_); |
| done_ = true; |
| } |
| bool IsDone() const { |
| ScopedMutex lock(mutex_); |
| return done_; |
| } |
| virtual bool EnableThreaded() const { |
| return enable_threaded_; |
| } |
| void set_enable_threaded(bool b) { enable_threaded_ = b; } |
| private: |
| bool done_; |
| AprMutex* mutex_; |
| std::string url_; |
| bool enable_threaded_; |
| |
| DISALLOW_COPY_AND_ASSIGN(SerfTestCallback); |
| }; |
| |
| } // namespace |
| |
| class SerfUrlAsyncFetcherTest: public ::testing::Test { |
| public: |
| static void SetUpTestCase() { |
| apr_initialize(); |
| atexit(apr_terminate); |
| } |
| |
| protected: |
| SerfUrlAsyncFetcherTest() { } |
| |
| virtual void SetUp() { |
| apr_pool_create(&pool_, NULL); |
| timer_.reset(new MockTimer(MockTimer::kApr_5_2010_ms)); |
| SerfUrlAsyncFetcher::Initialize(&statistics_); |
| serf_url_async_fetcher_.reset( |
| new SerfUrlAsyncFetcher(kProxy, pool_, &statistics_, |
| timer_.get())); |
| mutex_ = new AprMutex(pool_); |
| AddTestUrl("http://www.google.com/", "<!doctype html>"); |
| AddTestUrl("http://www.google.com/favicon.ico", |
| std::string("\000\000\001\000", 4)); |
| AddTestUrl("http://www.google.com/intl/en_ALL/images/logo.gif", "GIF"); |
| prev_done_count = 0; |
| } |
| |
| virtual void TearDown() { |
| // Need to free the fetcher before destroy the pool. |
| serf_url_async_fetcher_.reset(NULL); |
| timer_.reset(NULL); |
| STLDeleteElements(&request_headers_); |
| STLDeleteElements(&response_headers_); |
| STLDeleteElements(&contents_); |
| STLDeleteElements(&writers_); |
| STLDeleteElements(&callbacks_); |
| apr_pool_destroy(pool_); |
| delete mutex_; |
| } |
| |
| void AddTestUrl(const std::string& url, |
| const std::string& content_start) { |
| urls_.push_back(url); |
| content_starts_.push_back(content_start); |
| request_headers_.push_back(new SimpleMetaData); |
| response_headers_.push_back(new SimpleMetaData); |
| contents_.push_back(new std::string); |
| writers_.push_back(new StringWriter(contents_.back())); |
| callbacks_.push_back(new SerfTestCallback(mutex_, url)); |
| } |
| |
| void StartFetches(size_t begin, size_t end, bool enable_threaded) { |
| for (size_t idx = begin; idx < end; ++idx) { |
| SerfTestCallback* callback = callbacks_[idx]; |
| callback->set_enable_threaded(enable_threaded); |
| serf_url_async_fetcher_->StreamingFetch( |
| urls_[idx], *request_headers_[idx], response_headers_[idx], |
| writers_[idx], &message_handler_, callback); |
| } |
| } |
| |
| int CountCompletedFetches(size_t begin, size_t end) { |
| int completed = 0; |
| for (size_t idx = begin; idx < end; ++idx) { |
| if (callbacks_[idx]->IsDone()) { |
| ++completed; |
| } |
| } |
| return completed; |
| } |
| |
| void ValidateFetches(size_t begin, size_t end) { |
| for (size_t idx = begin; idx < end; ++idx) { |
| ASSERT_TRUE(callbacks_[idx]->IsDone()); |
| EXPECT_LT(static_cast<size_t>(0), contents_[idx]->size()); |
| EXPECT_EQ(200, response_headers_[idx]->status_code()); |
| EXPECT_EQ(content_starts_[idx], |
| contents_[idx]->substr(0, content_starts_[idx].size())); |
| } |
| } |
| |
| bool WaitTillDone(size_t begin, size_t end, int64 delay_ms) { |
| AprTimer timer; |
| bool done = false; |
| int64 now_ms = timer.NowMs(); |
| int64 end_ms = now_ms + delay_ms; |
| while (!done && (now_ms < end_ms)) { |
| int64 remaining_ms = end_ms - now_ms; |
| serf_url_async_fetcher_->Poll(1000 * remaining_ms); |
| size_t done_count = 0; |
| for (size_t idx = begin; idx < end; ++idx) { |
| if (callbacks_[idx]->IsDone()) { |
| ++done_count; |
| } |
| } |
| if (done_count != prev_done_count) { |
| prev_done_count = done_count; |
| done = (done_count == (end - begin)); |
| } |
| now_ms = timer.NowMs(); |
| } |
| return done; |
| } |
| |
| bool TestFetch(size_t begin, size_t end) { |
| StartFetches(begin, end, false); |
| timer_->advance_ms(kTimerAdvanceMs); |
| bool done = WaitTillDone(begin, end, kMaxMs); |
| ValidateFetches(begin, end); |
| return done; |
| } |
| |
| apr_pool_t* pool_; |
| std::vector<std::string> urls_; |
| std::vector<std::string> content_starts_; |
| std::vector<SimpleMetaData*> request_headers_; |
| std::vector<SimpleMetaData*> response_headers_; |
| std::vector<std::string*> contents_; |
| std::vector<StringWriter*> writers_; |
| std::vector<SerfTestCallback*> callbacks_; |
| // The fetcher to be tested. |
| scoped_ptr<SerfUrlAsyncFetcher> serf_url_async_fetcher_; |
| scoped_ptr<MockTimer> timer_; |
| SimpleStats statistics_; |
| GoogleMessageHandler message_handler_; |
| size_t prev_done_count; |
| AprMutex* mutex_; |
| |
| private: |
| DISALLOW_COPY_AND_ASSIGN(SerfUrlAsyncFetcherTest); |
| }; |
| |
| TEST_F(SerfUrlAsyncFetcherTest, FetchOneURL) { |
| EXPECT_TRUE(TestFetch(0, 1)); |
| EXPECT_FALSE(response_headers_[0]->IsGzipped()); |
| int request_count = |
| statistics_.GetVariable(SerfStats::kSerfFetchRequestCount)->Get(); |
| EXPECT_EQ(1, request_count); |
| int bytes_count = |
| statistics_.GetVariable(SerfStats::kSerfFetchByteCount)->Get(); |
| // google.com is changing every time, check if we get a rough number. |
| EXPECT_LT(8000, bytes_count); |
| int time_duration = |
| statistics_.GetVariable(SerfStats::kSerfFetchTimeDurationMs)->Get(); |
| EXPECT_EQ(kTimerAdvanceMs, time_duration); |
| } |
| |
| TEST_F(SerfUrlAsyncFetcherTest, FetchOneURLGzipped) { |
| request_headers_[0]->Add(HttpAttributes::kAcceptEncoding, |
| HttpAttributes::kGzip); |
| |
| // www.google.com doesn't respect our 'gzip' encoding request unless |
| // we have a reasonable user agent. |
| const char kDefaultUserAgent[] = |
| "Mozilla/5.0 (X11; U; Linux x86_64; en-US) " |
| "AppleWebKit/534.0 (KHTML, like Gecko) Chrome/6.0.408.1 Safari/534.0"; |
| |
| request_headers_[0]->Add(HttpAttributes::kUserAgent, |
| kDefaultUserAgent); |
| StartFetches(0, 1, false); |
| ASSERT_TRUE(WaitTillDone(0, 1, kMaxMs)); |
| ASSERT_TRUE(callbacks_[0]->IsDone()); |
| EXPECT_LT(static_cast<size_t>(0), contents_[0]->size()); |
| EXPECT_EQ(200, response_headers_[0]->status_code()); |
| ASSERT_TRUE(response_headers_[0]->IsGzipped()); |
| |
| GzipInflater inflater(GzipInflater::kGzip); |
| ASSERT_TRUE(inflater.Init()); |
| ASSERT_TRUE(inflater.SetInput(contents_[0]->data(), contents_[0]->size())); |
| ASSERT_TRUE(inflater.HasUnconsumedInput()); |
| int size = content_starts_[0].size(); |
| scoped_array<char> buf(new char[size]); |
| ASSERT_EQ(size, inflater.InflateBytes(buf.get(), size)); |
| EXPECT_EQ(content_starts_[0], std::string(buf.get(), size)); |
| } |
| |
| TEST_F(SerfUrlAsyncFetcherTest, FetchTwoURLs) { |
| EXPECT_TRUE(TestFetch(1, 3)); |
| int request_count = |
| statistics_.GetVariable(SerfStats::kSerfFetchRequestCount)->Get(); |
| EXPECT_EQ(2, request_count); |
| int bytes_count = |
| statistics_.GetVariable(SerfStats::kSerfFetchByteCount)->Get(); |
| // Maybe also need a rough number here. We will break if google's icon or logo |
| // changes. |
| EXPECT_EQ(9708, bytes_count); |
| int time_duration = |
| statistics_.GetVariable(SerfStats::kSerfFetchTimeDurationMs)->Get(); |
| EXPECT_EQ(2 * kTimerAdvanceMs, time_duration); |
| } |
| |
| TEST_F(SerfUrlAsyncFetcherTest, TestCancelThreeThreaded) { |
| StartFetches(0, 3, true); |
| } |
| |
| TEST_F(SerfUrlAsyncFetcherTest, TestCancelOneThreadedTwoSync) { |
| StartFetches(0, 1, true); |
| StartFetches(1, 3, false); |
| } |
| |
| TEST_F(SerfUrlAsyncFetcherTest, TestCancelTwoThreadedOneSync) { |
| StartFetches(0, 1, false), |
| StartFetches(1, 3, true); |
| } |
| |
| TEST_F(SerfUrlAsyncFetcherTest, TestWaitThreeThreaded) { |
| StartFetches(0, 3, true); |
| serf_url_async_fetcher_->WaitForInProgressFetches( |
| kWaitTimeoutMs, &message_handler_, |
| SerfUrlAsyncFetcher::kThreadedOnly); |
| } |
| |
| TEST_F(SerfUrlAsyncFetcherTest, TestThreeThreadedAsync) { |
| StartFetches(0, 1, true); |
| serf_url_async_fetcher_->WaitForInProgressFetches( |
| 10 /* milliseconds */, &message_handler_, |
| SerfUrlAsyncFetcher::kThreadedOnly); |
| StartFetches(1, 3, true); |
| |
| // In this test case, we are not going to call the explicit threaded |
| // wait function, WaitForInProgressFetches. We have initiated async |
| // fetches and we are hoping they will complete within a certain amount |
| // of time. If the system is running well then we they will finish |
| // within a 100ms or so, so we'll loop in 50ms sleep intervals until |
| // we hit a max. We'll give it 5 seconds before declaring failure. |
| const int kMaxSeconds = 5; |
| const int kPollTimeUs = 50000; |
| const int kPollsPerSecond = 1000000 / kPollTimeUs; |
| const int kMaxIters = kMaxSeconds * kPollsPerSecond; |
| int completed = 0; |
| for (int i = 0; (completed < 3) && (i < kMaxIters); ++i) { |
| usleep(kPollTimeUs); |
| completed = CountCompletedFetches(0, 3); |
| } |
| ASSERT_EQ(3, completed) << "Async fetches times out before completing"; |
| EXPECT_TRUE(TestFetch(0, 3)); |
| } |
| |
| TEST_F(SerfUrlAsyncFetcherTest, TestWaitOneThreadedTwoSync) { |
| StartFetches(0, 1, true); |
| StartFetches(1, 3, false); |
| serf_url_async_fetcher_->WaitForInProgressFetches( |
| kWaitTimeoutMs, &message_handler_, |
| SerfUrlAsyncFetcher::kThreadedAndMainline); |
| } |
| |
| TEST_F(SerfUrlAsyncFetcherTest, TestWaitTwoThreadedOneSync) { |
| StartFetches(0, 1, false), |
| StartFetches(1, 3, true); |
| serf_url_async_fetcher_->WaitForInProgressFetches( |
| kWaitTimeoutMs, &message_handler_, |
| SerfUrlAsyncFetcher::kThreadedAndMainline); |
| } |
| |
| TEST_F(SerfUrlAsyncFetcherTest, TestThreeThreaded) { |
| StartFetches(0, 3, true); |
| bool done = false; |
| for (int i = 0; !done && (i < 100); ++i) { |
| done = WaitTillDone(0, 3, kThreadedPollMs); |
| } |
| EXPECT_TRUE(done); |
| ValidateFetches(0, 3); |
| } |
| |
| TEST_F(SerfUrlAsyncFetcherTest, TestOneThreadedTwoSync) { |
| StartFetches(0, 1, true); |
| StartFetches(1, 3, false); |
| bool done = false; |
| for (int i = 0; !done && (i < 100); ++i) { |
| done = WaitTillDone(0, 3, kThreadedPollMs); |
| } |
| EXPECT_TRUE(done); |
| ValidateFetches(0, 3); |
| } |
| |
| TEST_F(SerfUrlAsyncFetcherTest, TestTwoThreadedOneSync) { |
| StartFetches(0, 1, false), |
| StartFetches(1, 3, true); |
| bool done = false; |
| for (int i = 0; !done && (i < 100); ++i) { |
| done = WaitTillDone(0, 3, kThreadedPollMs); |
| } |
| EXPECT_TRUE(done); |
| ValidateFetches(0, 3); |
| } |
| |
| } // namespace net_instaweb |