blob: 10cd842ed79cc85e853df72dc6a726d1a2b8878d [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// brpc - A framework to host and access services throughout Baidu.
// Date: Fri May 20 15:52:22 CST 2016
#include <sys/ioctl.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <gtest/gtest.h>
#include <gflags/gflags.h>
#include <google/protobuf/descriptor.h>
#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
#include "butil/time.h"
#include "butil/macros.h"
#include "brpc/socket.h"
#include "brpc/acceptor.h"
#include "brpc/server.h"
#include "brpc/controller.h"
#include "brpc/rtmp.h"
#include "brpc/amf.h"
// Test binary entry point: initializes googletest, parses gflags from the
// command line, then runs every registered test and returns its result.
int main(int argc, char* argv[]) {
    testing::InitGoogleTest(&argc, argv);
    GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
    const int exit_code = RUN_ALL_TESTS();
    return exit_code;
}
// Client stream used by the tests. It counts how often each callback fires
// and how many audio/video messages arrive, and exposes helper methods that
// assert on those counters.
class TestRtmpClientStream : public brpc::RtmpClientStream {
public:
    TestRtmpClientStream() {
        LOG(INFO) << __FUNCTION__;
    }

    // A stream must have been stopped exactly once by the time it is deleted.
    ~TestRtmpClientStream() {
        LOG(INFO) << __FUNCTION__;
        assertions_on_stop();
    }

    // Expects OnStop() to have been called exactly once.
    void assertions_on_stop() {
        ASSERT_EQ(1, _called_on_stop);
    }

    // Expects one first-message notification and at least one video and one
    // audio message.
    void assertions_on_successful_play() {
        ASSERT_EQ(1, _called_on_first_message);
        ASSERT_LT(0, _nvideomsg);
        ASSERT_LT(0, _naudiomsg);
    }

    // Expects that nothing was received and that the stream was stopped once.
    void assertions_on_failure() {
        ASSERT_EQ(0, _called_on_first_message);
        ASSERT_EQ(0, _nvideomsg);
        ASSERT_EQ(0, _naudiomsg);
        assertions_on_stop();
    }

    void OnFirstMessage() {
        _called_on_first_message += 1;
    }

    void OnStop() {
        _called_on_stop += 1;
    }

    void OnVideoMessage(brpc::RtmpVideoMessage* msg) {
        _nvideomsg += 1;
        // video data is ascii in UT, print it out.
        log_received(*msg);
    }

    void OnAudioMessage(brpc::RtmpAudioMessage* msg) {
        _naudiomsg += 1;
        // audio data is ascii in UT, print it out.
        log_received(*msg);
    }

private:
    // Logs a received message together with the peer address and stream id.
    template <typename Message>
    void log_received(Message& received) {
        LOG(INFO) << remote_side() << "|stream=" << stream_id()
                  << ": Got " << received << " data=" << received.data;
    }

    int _called_on_stop = 0;
    int _called_on_first_message = 0;
    int _nvideomsg = 0;
    int _naudiomsg = 0;
};
// Retrying client stream used by the tests. It counts OnStop, OnFirstMessage
// and OnPlayable callbacks and logs every audio/video message it receives.
class TestRtmpRetryingClientStream
    : public brpc::RtmpRetryingClientStream {
public:
    TestRtmpRetryingClientStream() {
        LOG(INFO) << __FUNCTION__;
    }

    // A stream must have been stopped exactly once by the time it is deleted.
    ~TestRtmpRetryingClientStream() {
        LOG(INFO) << __FUNCTION__;
        assertions_on_stop();
    }

    // Expects OnStop() to have been called exactly once.
    void assertions_on_stop() {
        ASSERT_EQ(1, _called_on_stop);
    }

    void OnStop() {
        _called_on_stop += 1;
    }

    void OnFirstMessage() {
        _called_on_first_message += 1;
    }

    void OnPlayable() {
        _called_on_playable += 1;
    }

    void OnVideoMessage(brpc::RtmpVideoMessage* msg) {
        // video data is ascii in UT, print it out.
        log_received(*msg);
    }

    void OnAudioMessage(brpc::RtmpAudioMessage* msg) {
        // audio data is ascii in UT, print it out.
        log_received(*msg);
    }

private:
    // Logs a received message together with the peer address and stream id.
    template <typename Message>
    void log_received(Message& received) {
        LOG(INFO) << remote_side() << "|stream=" << stream_id()
                  << ": Got " << received << " data=" << received.data;
    }

    int _called_on_stop = 0;
    int _called_on_first_message = 0;
    int _called_on_playable = 0;
};
// Stream name that the server-side test streams in this file reject with
// EPERM; tests use it as play/publish name to provoke a failure.
const char* UNEXIST_NAME = "unexist_stream";
// Server-side stream that answers play requests. After a play request is
// accepted it runs SendData() in a background bthread, which keeps sending
// audio/video messages until the thread is stopped.
// _state records the lifecycle: UNPLAYING -> PLAYING (set by OnPlay) and
// STOPPED (set by OnStop, possibly before OnPlay finishes).
class PlayingDummyStream : public brpc::RtmpServerStream {
public:
enum State {
STATE_UNPLAYING,
STATE_PLAYING,
STATE_STOPPED
};
// sleep_ms: when positive, OnPlay() sleeps this many milliseconds before
// starting the sending thread.
PlayingDummyStream(int64_t sleep_ms)
: _state(STATE_UNPLAYING), _sleep_ms(sleep_ms) {
LOG(INFO) << __FUNCTION__ << "(" << this << ")";
}
~PlayingDummyStream() {
LOG(INFO) << __FUNCTION__ << "(" << this << ")";
}
// Handles a play request. `done` is run when this function returns (via the
// ClosureGuard); on failure `status` carries the error.
void OnPlay(const brpc::RtmpPlayOptions& opt,
butil::Status* status,
google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);
LOG(INFO) << remote_side() << "|stream=" << stream_id()
<< ": Got play{stream_name=" << opt.stream_name
<< " start=" << opt.start
<< " duration=" << opt.duration
<< " reset=" << opt.reset << '}';
// The reserved name simulates a stream that does not exist.
if (opt.stream_name == UNEXIST_NAME) {
status->set_error(EPERM, "Unexist stream");
return;
}
if (_sleep_ms > 0) {
LOG(INFO) << "Sleep " << _sleep_ms
<< " ms before responding play request";
bthread_usleep(_sleep_ms * 1000L);
}
int rc = bthread_start_background(&_play_thread, NULL,
RunSendData, this);
if (rc) {
status->set_error(rc, "Fail to create thread");
return;
}
// The thread is started before the state is published. If OnStop() already
// moved the state to STOPPED, it did not see PLAYING and therefore did not
// stop the thread, so the thread has to be stopped and joined here.
State expected = STATE_UNPLAYING;
if (!_state.compare_exchange_strong(expected, STATE_PLAYING)) {
if (expected == STATE_STOPPED) {
bthread_stop(_play_thread);
bthread_join(_play_thread, NULL);
} else {
CHECK(false) << "Impossible";
}
}
}
// Marks the stream as stopped; stops and joins the sending thread only when
// OnPlay() had already published STATE_PLAYING.
void OnStop() {
LOG(INFO) << "OnStop of PlayingDummyStream=" << this;
if (_state.exchange(STATE_STOPPED) == STATE_PLAYING) {
bthread_stop(_play_thread);
bthread_join(_play_thread, NULL);
}
}
// Loop executed by the sending thread; defined after the class.
void SendData();
private:
// bthread entry point: `arg` is the PlayingDummyStream to run SendData() on.
static void* RunSendData(void* arg) {
((PlayingDummyStream*)arg)->SendData();
return NULL;
}
butil::atomic<State> _state;
// Only assigned by bthread_start_background() in OnPlay().
bthread_t _play_thread;
int64_t _sleep_ms;
};
void PlayingDummyStream::SendData() {
LOG(INFO) << "Enter SendData of PlayingDummyStream=" << this;
brpc::RtmpVideoMessage vmsg;
brpc::RtmpAudioMessage amsg;
vmsg.timestamp = 1000;
amsg.timestamp = 1000;
for (int i = 0; !bthread_stopped(bthread_self()); ++i) {
vmsg.timestamp += 20;
amsg.timestamp += 20;
vmsg.frame_type = brpc::FLV_VIDEO_FRAME_KEYFRAME;
vmsg.codec = brpc::FLV_VIDEO_AVC;
vmsg.data.clear();
vmsg.data.append(butil::string_printf("video_%d(ms_id=%u)",
i, stream_id()));
//failing to send is possible
SendVideoMessage(vmsg);
amsg.codec = brpc::FLV_AUDIO_AAC;
amsg.rate = brpc::FLV_SOUND_RATE_44100HZ;
amsg.bits = brpc::FLV_SOUND_16BIT;
amsg.type = brpc::FLV_SOUND_STEREO;
amsg.data.clear();
amsg.data.append(butil::string_printf("audio_%d(ms_id=%u)",
i, stream_id()));
SendAudioMessage(amsg);
bthread_usleep(1000000);
}
LOG(INFO) << "Quit SendData of PlayingDummyStream=" << this;
}
// RTMP service whose streams are PlayingDummyStream instances.
// sleep_ms is handed to every created stream and delays its play response.
class PlayingDummyService : public brpc::RtmpService {
public:
    PlayingDummyService(int64_t sleep_ms = 0)
        : _sleep_ms(sleep_ms) {
    }

private:
    // Called to create a server-side stream.
    virtual brpc::RtmpServerStream* NewStream(
        const brpc::RtmpConnectRequest&) {
        brpc::RtmpServerStream* created = new PlayingDummyStream(_sleep_ms);
        return created;
    }

    int64_t _sleep_ms;
};
// Server-side stream that accepts publish requests and counts the callbacks
// and audio/video messages it receives, so tests can inspect them afterwards.
class PublishStream : public brpc::RtmpServerStream {
public:
    // sleep_ms: when positive, OnPublish() sleeps this many milliseconds
    // before returning (and thereby before `done` is run).
    PublishStream(int64_t sleep_ms)
        : _sleep_ms(sleep_ms)
        , _called_on_stop(0)
        , _called_on_first_message(0)
        , _nvideomsg(0)
        , _naudiomsg(0) {
        LOG(INFO) << __FUNCTION__ << "(" << this << ")";
    }

    // A stream must have been stopped exactly once by the time it is deleted.
    ~PublishStream() {
        LOG(INFO) << __FUNCTION__ << "(" << this << ")";
        assertions_on_stop();
    }

    // Expects OnStop() to have been called exactly once.
    void assertions_on_stop() {
        ASSERT_EQ(1, _called_on_stop);
    }

    // Handles a publish request. `done` is run when this function returns
    // (via the ClosureGuard); on failure `status` carries the error.
    void OnPublish(const std::string& stream_name,
                   brpc::RtmpPublishType publish_type,
                   butil::Status* status,
                   google::protobuf::Closure* done) {
        brpc::ClosureGuard done_guard(done);
        LOG(INFO) << remote_side() << "|stream=" << stream_id()
                  << ": Got publish{stream_name=" << stream_name
                  << " type=" << brpc::RtmpPublishType2Str(publish_type)
                  << '}';
        // The reserved name simulates a stream that cannot be published to.
        if (stream_name == UNEXIST_NAME) {
            status->set_error(EPERM, "Unexist stream");
            return;
        }
        if (_sleep_ms > 0) {
            // This is the publish path; the message used to say "play request".
            LOG(INFO) << "Sleep " << _sleep_ms
                      << " ms before responding publish request";
            bthread_usleep(_sleep_ms * 1000L);
        }
    }

    void OnFirstMessage() {
        ++_called_on_first_message;
    }

    void OnStop() {
        LOG(INFO) << "OnStop of PublishStream=" << this;
        ++_called_on_stop;
    }

    void OnVideoMessage(brpc::RtmpVideoMessage* msg) {
        ++_nvideomsg;
        // video data is ascii in UT, print it out.
        LOG(INFO) << remote_side() << "|stream=" << stream_id()
                  << ": Got " << *msg << " data=" << msg->data;
    }

    void OnAudioMessage(brpc::RtmpAudioMessage* msg) {
        ++_naudiomsg;
        // audio data is ascii in UT, print it out.
        LOG(INFO) << remote_side() << "|stream=" << stream_id()
                  << ": Got " << *msg << " data=" << msg->data;
    }

private:
    int64_t _sleep_ms;
    // Counters read directly by the tests in this file.
    int _called_on_stop;
    int _called_on_first_message;
    int _nvideomsg;
    int _naudiomsg;
};
// RTMP service whose streams are PublishStream instances. Every created
// stream is also remembered (with a reference) so that tests can fetch the
// server-side streams and inspect their counters.
class PublishService : public brpc::RtmpService {
public:
    PublishService(int64_t sleep_ms = 0) : _sleep_ms(sleep_ms) {
        pthread_mutex_init(&_mutex, NULL);
    }

    ~PublishService() {
        pthread_mutex_destroy(&_mutex);
    }

    // Replaces the content of `out` with all streams created so far and
    // forgets them in this service.
    void move_created_streams(
        std::vector<butil::intrusive_ptr<PublishStream> >* out) {
        std::vector<butil::intrusive_ptr<PublishStream> > taken;
        {
            BAIDU_SCOPED_LOCK(_mutex);
            taken.swap(_created_streams);
        }
        // Whatever `out` held before is released when `taken` goes out of
        // scope, outside the lock.
        out->swap(taken);
    }

private:
    // Called to create a server-side stream.
    virtual brpc::RtmpServerStream* NewStream(
        const brpc::RtmpConnectRequest&) {
        PublishStream* const created = new PublishStream(_sleep_ms);
        {
            BAIDU_SCOPED_LOCK(_mutex);
            _created_streams.push_back(created);
        }
        return created;
    }

    int64_t _sleep_ms;
    // Guards _created_streams.
    pthread_mutex_t _mutex;
    std::vector<butil::intrusive_ptr<PublishStream> > _created_streams;
};
// Client stream used as the sub stream of a retrying stream: every callback
// it receives is forwarded to the RtmpMessageHandler given at construction.
class RtmpSubStream : public brpc::RtmpClientStream {
public:
    explicit RtmpSubStream(brpc::RtmpMessageHandler* mh)
        : _message_handler(mh) {}

    // @RtmpStreamBase
    // Lifecycle notifications.
    void OnFirstMessage();
    void OnStop();
    // Payload notifications.
    void OnVideoMessage(brpc::RtmpVideoMessage* msg);
    void OnAudioMessage(brpc::RtmpAudioMessage* msg);
    void OnSharedObjectMessage(brpc::RtmpSharedObjectMessage* msg);
    void OnMetaData(brpc::RtmpMetaData*, const butil::StringPiece&);

private:
    // Held in a unique_ptr, so the handler is deleted together with this stream.
    std::unique_ptr<brpc::RtmpMessageHandler> _message_handler;
};
void RtmpSubStream::OnFirstMessage() {
_message_handler->OnPlayable();
}
// Forwards metadata and its name unchanged to the handler.
void RtmpSubStream::OnMetaData(brpc::RtmpMetaData* obj,
                               const butil::StringPiece& name) {
    brpc::RtmpMessageHandler& handler = *_message_handler;
    handler.OnMetaData(obj, name);
}
// Forwards a shared-object message unchanged to the handler.
void RtmpSubStream::OnSharedObjectMessage(
    brpc::RtmpSharedObjectMessage* msg) {
    brpc::RtmpMessageHandler& handler = *_message_handler;
    handler.OnSharedObjectMessage(msg);
}
// Forwards an audio message unchanged to the handler.
void RtmpSubStream::OnAudioMessage(brpc::RtmpAudioMessage* msg) {
    brpc::RtmpMessageHandler& handler = *_message_handler;
    handler.OnAudioMessage(msg);
}
// Forwards a video message unchanged to the handler.
void RtmpSubStream::OnVideoMessage(brpc::RtmpVideoMessage* msg) {
    brpc::RtmpMessageHandler& handler = *_message_handler;
    handler.OnVideoMessage(msg);
}
void RtmpSubStream::OnStop() {
_message_handler->OnSubStreamStop(this);
}
// SubStreamCreator used by the retrying-stream tests: it creates
// RtmpSubStream objects and initializes them against a fixed RtmpClient.
class RtmpSubStreamCreator : public brpc::SubStreamCreator {
public:
    // `client` is stored as a plain pointer and later passed to
    // RtmpSubStream::Init(); it is not owned by this object.
    // explicit: a client pointer should never convert to a creator silently.
    explicit RtmpSubStreamCreator(const brpc::RtmpClient* client);
    ~RtmpSubStreamCreator();

    // @SubStreamCreator
    // Stores a new RtmpSubStream wrapping `message_handler` into `*sub_stream`.
    void NewSubStream(brpc::RtmpMessageHandler* message_handler,
                      butil::intrusive_ptr<brpc::RtmpStreamBase>* sub_stream);
    // Initializes `sub_stream` with the stored client and `*options`.
    void LaunchSubStream(brpc::RtmpStreamBase* sub_stream,
                         brpc::RtmpRetryingClientStreamOptions* options);

private:
    const brpc::RtmpClient* _client;
};
// Remembers the client that launched sub streams will be initialized with.
RtmpSubStreamCreator::RtmpSubStreamCreator(const brpc::RtmpClient* client)
: _client(client) {}
// Nothing to release: _client is not deleted here.
RtmpSubStreamCreator::~RtmpSubStreamCreator() = default;
// Creates a RtmpSubStream around `message_handler` and stores it in
// `*sub_stream`. Does nothing when `sub_stream` is NULL.
void RtmpSubStreamCreator::NewSubStream(
    brpc::RtmpMessageHandler* message_handler,
    butil::intrusive_ptr<brpc::RtmpStreamBase>* sub_stream) {
    if (sub_stream == NULL) {
        return;
    }
    sub_stream->reset(new RtmpSubStream(message_handler));
}
// Initializes `sub_stream` against the stored client, using the client-stream
// part of `*options`.
// `sub_stream` is expected to be a RtmpSubStream made by NewSubStream(). If
// it is NULL or of another type, the dynamic_cast yields NULL; that case is
// logged and skipped instead of dereferencing a null pointer.
void RtmpSubStreamCreator::LaunchSubStream(
    brpc::RtmpStreamBase* sub_stream,
    brpc::RtmpRetryingClientStreamOptions* options) {
    RtmpSubStream* const stream = dynamic_cast<RtmpSubStream*>(sub_stream);
    if (stream == NULL) {
        LOG(ERROR) << "sub_stream=" << static_cast<const void*>(sub_stream)
                   << " is not a RtmpSubStream, skip launching";
        return;
    }
    // Copies only the RtmpClientStreamOptions portion of the retrying options.
    brpc::RtmpClientStreamOptions client_options = *options;
    stream->Init(_client, client_options);
}
// Checks how brpc::ParseRtmpURL splits a URL into host, vhost, port, app and
// stream name: with and without the "rtmp://" scheme, with an explicit port,
// with repeated slashes, with a vhost query, and without a stream name.
TEST(RtmpTest, parse_rtmp_url) {
    // One row per URL. A NULL expectation means the field must be empty.
    struct UrlCase {
        const char* url;
        const char* host;
        const char* vhost;
        const char* port;
        const char* app;
        const char* stream_name;
    };
    static const UrlCase kCases[] = {
        { "rtmp://HOST/APP/STREAM",
          "HOST", NULL, "1935", "APP", "STREAM" },
        { "HOST/APP/STREAM",
          "HOST", NULL, "1935", "APP", "STREAM" },
        { "rtmp://HOST:8765//APP?vhost=abc///STREAM?queries",
          "HOST", "abc", "8765", "APP", "STREAM?queries" },
        { "HOST:8765//APP?vhost=abc///STREAM?queries",
          "HOST", "abc", "8765", "APP", "STREAM?queries" },
        { "HOST:8765//APP?vhost=abc///STREAM?queries/",
          "HOST", "abc", "8765", "APP", "STREAM?queries/" },
        { "HOST:8765/APP?vhost=abc",
          "HOST", "abc", "8765", "APP", NULL },
    };

    // The output pieces are shared by all cases, as they were in the
    // straight-line version of this test.
    butil::StringPiece host;
    butil::StringPiece vhost;
    butil::StringPiece port;
    butil::StringPiece app;
    butil::StringPiece stream_name;

    for (const UrlCase& c : kCases) {
        brpc::ParseRtmpURL(c.url,
                           &host, &vhost, &port, &app, &stream_name);
        ASSERT_EQ(c.host, host);
        if (c.vhost == NULL) {
            ASSERT_TRUE(vhost.empty());
        } else {
            ASSERT_EQ(c.vhost, vhost);
        }
        ASSERT_EQ(c.port, port);
        ASSERT_EQ(c.app, app);
        if (c.stream_name == NULL) {
            ASSERT_TRUE(stream_name.empty());
        } else {
            ASSERT_EQ(c.stream_name, stream_name);
        }
    }
}
// Round-trips AMF data through a string buffer: writes a string, a uint32,
// an object taken from a RtmpInfo and an object taken from an AMFObject,
// then reads them back in the same order and compares the values. The last
// object is read back into a RtmpInfo although it was written from an
// AMFObject.
TEST(RtmpTest, amf) {
    std::string command = "_result";

    brpc::RtmpInfo written_info;
    written_info.set_code("NetConnection.Connect"); // TODO
    written_info.set_level("error");
    written_info.set_description("heheda hello foobar");

    brpc::AMFObject written_obj;
    written_obj.SetString("code", "foo");
    written_obj.SetString("level", "bar");
    written_obj.SetString("description", "heheda");

    std::string wire;
    {
        // The output streams live only inside this scope, so they are
        // destroyed before `wire` is read.
        google::protobuf::io::StringOutputStream zc_out(&wire);
        brpc::AMFOutputStream amf_out(&zc_out);
        brpc::WriteAMFString(command, &amf_out);
        brpc::WriteAMFUint32(17, &amf_out);
        brpc::WriteAMFObject(written_info, &amf_out);
        ASSERT_TRUE(amf_out.good());
        brpc::WriteAMFObject(written_obj, &amf_out);
        ASSERT_TRUE(amf_out.good());
    }

    google::protobuf::io::ArrayInputStream zc_in(wire.data(), wire.size());
    brpc::AMFInputStream amf_in(&zc_in);

    std::string read_command;
    ASSERT_TRUE(brpc::ReadAMFString(&read_command, &amf_in));
    ASSERT_EQ(command, read_command);

    uint32_t read_number = 0;
    ASSERT_TRUE(brpc::ReadAMFUint32(&read_number, &amf_in));
    ASSERT_EQ(17u, read_number);

    brpc::RtmpInfo first_info;
    ASSERT_TRUE(brpc::ReadAMFObject(&first_info, &amf_in));
    ASSERT_EQ(written_info.code(), first_info.code());
    ASSERT_EQ(written_info.level(), first_info.level());
    ASSERT_EQ(written_info.description(), first_info.description());

    brpc::RtmpInfo second_info;
    ASSERT_TRUE(brpc::ReadAMFObject(&second_info, &amf_in));
    ASSERT_EQ("foo", second_info.code());
    ASSERT_EQ("bar", second_info.level());
    ASSERT_EQ("heheda", second_info.description());
}
// Plays NSTREAM streams from a PlayingDummyService server on port 8571 and,
// after 5 seconds, expects every client stream to have seen its first message
// plus at least one video and one audio message.
TEST(RtmpTest, successfully_play_streams) {
PlayingDummyService rtmp_service;
brpc::Server server;
brpc::ServerOptions server_opt;
server_opt.rtmp_service = &rtmp_service;
ASSERT_EQ(0, server.Start(8571, &server_opt));
brpc::RtmpClientOptions rtmp_opt;
rtmp_opt.app = "hello";
rtmp_opt.swfUrl = "anything";
rtmp_opt.tcUrl = "rtmp://heheda";
brpc::RtmpClient rtmp_client;
ASSERT_EQ(0, rtmp_client.Init("localhost:8571", rtmp_opt));
// Create multiple streams on the same client, each with its own play name.
const int NSTREAM = 2;
// NOTE(review): DestroyingPtr presumably destroys the stream when it goes out
// of scope, which is what makes the OnStop assertion in the stream's
// destructor pass - confirm.
brpc::DestroyingPtr<TestRtmpClientStream> cstreams[NSTREAM];
for (int i = 0; i < NSTREAM; ++i) {
cstreams[i].reset(new TestRtmpClientStream);
brpc::RtmpClientStreamOptions opt;
opt.play_name = butil::string_printf("play_name_%d", i);
//opt.publish_name = butil::string_printf("pub_name_%d", i);
opt.wait_until_play_or_publish_is_sent = true;
cstreams[i]->Init(&rtmp_client, opt);
}
// The server-side stream sends one audio and one video message per second,
// so several messages should arrive within this window.
sleep(5);
for (int i = 0; i < NSTREAM; ++i) {
cstreams[i]->assertions_on_successful_play();
}
LOG(INFO) << "Quiting program...";
}
// Tries to play UNEXIST_NAME, which PlayingDummyStream::OnPlay rejects with
// EPERM, and expects every client stream to have received nothing and to have
// been stopped exactly once.
TEST(RtmpTest, fail_to_play_streams) {
PlayingDummyService rtmp_service;
brpc::Server server;
brpc::ServerOptions server_opt;
server_opt.rtmp_service = &rtmp_service;
ASSERT_EQ(0, server.Start(8571, &server_opt));
brpc::RtmpClientOptions rtmp_opt;
rtmp_opt.app = "hello";
rtmp_opt.swfUrl = "anything";
rtmp_opt.tcUrl = "rtmp://heheda";
brpc::RtmpClient rtmp_client;
ASSERT_EQ(0, rtmp_client.Init("localhost:8571", rtmp_opt));
// Create multiple streams, all asking for the name the server rejects.
const int NSTREAM = 2;
brpc::DestroyingPtr<TestRtmpClientStream> cstreams[NSTREAM];
for (int i = 0; i < NSTREAM; ++i) {
cstreams[i].reset(new TestRtmpClientStream);
brpc::RtmpClientStreamOptions opt;
opt.play_name = UNEXIST_NAME;
opt.wait_until_play_or_publish_is_sent = true;
cstreams[i]->Init(&rtmp_client, opt);
}
// Give the rejection time to reach the client before asserting.
sleep(1);
for (int i = 0; i < NSTREAM; ++i) {
cstreams[i]->assertions_on_failure();
}
LOG(INFO) << "Quiting program...";
}
// Publishes NSTREAM streams to a PublishService server. Even-indexed client
// streams send REP video messages, odd-indexed ones send REP audio messages;
// afterwards the counters of the server-side streams are checked.
TEST(RtmpTest, successfully_publish_streams) {
PublishService rtmp_service;
brpc::Server server;
brpc::ServerOptions server_opt;
server_opt.rtmp_service = &rtmp_service;
ASSERT_EQ(0, server.Start(8571, &server_opt));
brpc::RtmpClientOptions rtmp_opt;
rtmp_opt.app = "hello";
rtmp_opt.swfUrl = "anything";
rtmp_opt.tcUrl = "rtmp://heheda";
brpc::RtmpClient rtmp_client;
ASSERT_EQ(0, rtmp_client.Init("localhost:8571", rtmp_opt));
// Create multiple streams, each publishing under its own name.
const int NSTREAM = 2;
brpc::DestroyingPtr<TestRtmpClientStream> cstreams[NSTREAM];
for (int i = 0; i < NSTREAM; ++i) {
cstreams[i].reset(new TestRtmpClientStream);
brpc::RtmpClientStreamOptions opt;
opt.publish_name = butil::string_printf("pub_name_%d", i);
opt.wait_until_play_or_publish_is_sent = true;
cstreams[i]->Init(&rtmp_client, opt);
}
// Send REP rounds of messages, 500ms apart.
const int REP = 5;
for (int i = 0; i < REP; ++i) {
brpc::RtmpVideoMessage vmsg;
vmsg.timestamp = 1000 + i * 20;
vmsg.frame_type = brpc::FLV_VIDEO_FRAME_KEYFRAME;
vmsg.codec = brpc::FLV_VIDEO_AVC;
vmsg.data.append(butil::string_printf("video_%d", i));
// Video goes to the even-indexed streams only.
for (int j = 0; j < NSTREAM; j += 2) {
ASSERT_EQ(0, cstreams[j]->SendVideoMessage(vmsg));
}
brpc::RtmpAudioMessage amsg;
amsg.timestamp = 1000 + i * 20;
amsg.codec = brpc::FLV_AUDIO_AAC;
amsg.rate = brpc::FLV_SOUND_RATE_44100HZ;
amsg.bits = brpc::FLV_SOUND_16BIT;
amsg.type = brpc::FLV_SOUND_STEREO;
amsg.data.append(butil::string_printf("audio_%d", i));
// Audio goes to the odd-indexed streams only.
for (int j = 1; j < NSTREAM; j += 2) {
ASSERT_EQ(0, cstreams[j]->SendAudioMessage(amsg));
}
bthread_usleep(500000);
}
// Take over the server-side streams. The index-based checks below assume the
// server created them in the same order as the client streams.
std::vector<butil::intrusive_ptr<PublishStream> > created_streams;
rtmp_service.move_created_streams(&created_streams);
ASSERT_EQ(NSTREAM, (int)created_streams.size());
for (int i = 0; i < NSTREAM; ++i) {
EXPECT_EQ(1, created_streams[i]->_called_on_first_message);
}
for (int j = 0; j < NSTREAM; j += 2) {
ASSERT_EQ(REP, created_streams[j]->_nvideomsg);
}
for (int j = 1; j < NSTREAM; j += 2) {
ASSERT_EQ(REP, created_streams[j]->_naudiomsg);
}
LOG(INFO) << "Quiting program...";
}
// Tries to publish under UNEXIST_NAME, which PublishStream::OnPublish rejects
// with EPERM. Expects the client streams to fail and the server-side streams
// to have been created but to have received nothing.
TEST(RtmpTest, failed_to_publish_streams) {
PublishService rtmp_service;
brpc::Server server;
brpc::ServerOptions server_opt;
server_opt.rtmp_service = &rtmp_service;
ASSERT_EQ(0, server.Start(8575, &server_opt));
brpc::RtmpClientOptions rtmp_opt;
rtmp_opt.app = "hello";
rtmp_opt.swfUrl = "anything";
rtmp_opt.tcUrl = "rtmp://heheda";
brpc::RtmpClient rtmp_client;
ASSERT_EQ(0, rtmp_client.Init("localhost:8575", rtmp_opt));
// Create multiple streams, all publishing under the name the server rejects.
const int NSTREAM = 2;
brpc::DestroyingPtr<TestRtmpClientStream> cstreams[NSTREAM];
for (int i = 0; i < NSTREAM; ++i) {
cstreams[i].reset(new TestRtmpClientStream);
brpc::RtmpClientStreamOptions opt;
opt.publish_name = UNEXIST_NAME;
opt.wait_until_play_or_publish_is_sent = true;
cstreams[i]->Init(&rtmp_client, opt);
}
// Give the rejection time to reach the client before asserting.
sleep(1);
for (int i = 0; i < NSTREAM; ++i) {
cstreams[i]->assertions_on_failure();
}
// One server-side stream per client stream must exist, all untouched.
std::vector<butil::intrusive_ptr<PublishStream> > created_streams;
rtmp_service.move_created_streams(&created_streams);
ASSERT_EQ(NSTREAM, (int)created_streams.size());
for (int i = 0; i < NSTREAM; ++i) {
ASSERT_EQ(0, created_streams[i]->_called_on_first_message);
ASSERT_EQ(0, created_streams[i]->_nvideomsg);
ASSERT_EQ(0, created_streams[i]->_naudiomsg);
}
LOG(INFO) << "Quiting program...";
}
// Initializes client streams against port 8572, for which this test starts no
// server, and expects each stream to be in the failed state (nothing
// received, stopped once) as soon as Init() returns.
TEST(RtmpTest, failed_to_connect_client_streams) {
brpc::RtmpClientOptions rtmp_opt;
rtmp_opt.app = "hello";
rtmp_opt.swfUrl = "anything";
rtmp_opt.tcUrl = "rtmp://heheda";
brpc::RtmpClient rtmp_client;
ASSERT_EQ(0, rtmp_client.Init("localhost:8572", rtmp_opt));
// Create multiple streams.
const int NSTREAM = 2;
brpc::DestroyingPtr<TestRtmpClientStream> cstreams[NSTREAM];
for (int i = 0; i < NSTREAM; ++i) {
cstreams[i].reset(new TestRtmpClientStream);
brpc::RtmpClientStreamOptions opt;
opt.play_name = butil::string_printf("play_name_%d", i);
opt.wait_until_play_or_publish_is_sent = true;
cstreams[i]->Init(&rtmp_client, opt);
// No sleep here: the failure is asserted right after Init().
cstreams[i]->assertions_on_failure();
}
LOG(INFO) << "Quiting program...";
}
// Calls Destroy() on client streams before Init(). Expects OnStop to fire
// once on Destroy(), the stream to be in STATE_DESTROYING, and a later Init()
// to leave the stream failed without calling OnStop a second time.
TEST(RtmpTest, destroy_client_streams_before_init) {
brpc::RtmpClientOptions rtmp_opt;
rtmp_opt.app = "hello";
rtmp_opt.swfUrl = "anything";
rtmp_opt.tcUrl = "rtmp://heheda";
brpc::RtmpClient rtmp_client;
ASSERT_EQ(0, rtmp_client.Init("localhost:8573", rtmp_opt));
// Create multiple streams.
const int NSTREAM = 2;
// Plain intrusive_ptr here: the test calls Destroy() explicitly.
butil::intrusive_ptr<TestRtmpClientStream> cstreams[NSTREAM];
for (int i = 0; i < NSTREAM; ++i) {
cstreams[i].reset(new TestRtmpClientStream);
cstreams[i]->Destroy();
ASSERT_EQ(1, cstreams[i]->_called_on_stop);
ASSERT_EQ(brpc::RtmpClientStream::STATE_DESTROYING, cstreams[i]->_state);
brpc::RtmpClientStreamOptions opt;
opt.play_name = butil::string_printf("play_name_%d", i);
opt.wait_until_play_or_publish_is_sent = true;
// Init() on an already destroyed stream.
cstreams[i]->Init(&rtmp_client, opt);
cstreams[i]->assertions_on_failure();
}
LOG(INFO) << "Quiting program...";
}
// Calls Destroy() on retrying client streams before Init(). Expects OnStop to
// fire once on Destroy() and not again when Init() is called afterwards.
TEST(RtmpTest, destroy_retrying_client_streams_before_init) {
brpc::RtmpClientOptions rtmp_opt;
rtmp_opt.app = "hello";
rtmp_opt.swfUrl = "anything";
rtmp_opt.tcUrl = "rtmp://heheda";
brpc::RtmpClient rtmp_client;
ASSERT_EQ(0, rtmp_client.Init("localhost:8573", rtmp_opt));
// Create multiple streams.
const int NSTREAM = 2;
butil::intrusive_ptr<TestRtmpRetryingClientStream> cstreams[NSTREAM];
for (int i = 0; i < NSTREAM; ++i) {
cstreams[i].reset(new TestRtmpRetryingClientStream);
cstreams[i]->Destroy();
ASSERT_EQ(1, cstreams[i]->_called_on_stop);
brpc::RtmpRetryingClientStreamOptions opt;
opt.play_name = butil::string_printf("play_name_%d", i);
// NOTE(review): `sc` is never deleted in this test; presumably Init() takes
// ownership of the creator - confirm.
brpc::SubStreamCreator* sc = new RtmpSubStreamCreator(&rtmp_client);
cstreams[i]->Init(sc, opt);
// Still exactly one OnStop after Init() on a destroyed stream.
ASSERT_EQ(1, cstreams[i]->_called_on_stop);
}
LOG(INFO) << "Quiting program...";
}
// Destroys client streams while the server is still delaying its play
// response (the service sleeps 2s in OnPlay). Expects OnStop not to fire
// before Destroy() and to have fired exactly once shortly after it.
TEST(RtmpTest, destroy_client_streams_during_creation) {
PlayingDummyService rtmp_service(2000/*sleep 2s*/);
brpc::Server server;
brpc::ServerOptions server_opt;
server_opt.rtmp_service = &rtmp_service;
ASSERT_EQ(0, server.Start(8574, &server_opt));
brpc::RtmpClientOptions rtmp_opt;
rtmp_opt.app = "hello";
rtmp_opt.swfUrl = "anything";
rtmp_opt.tcUrl = "rtmp://heheda";
brpc::RtmpClient rtmp_client;
ASSERT_EQ(0, rtmp_client.Init("localhost:8574", rtmp_opt));
// Create multiple streams.
const int NSTREAM = 2;
butil::intrusive_ptr<TestRtmpClientStream> cstreams[NSTREAM];
for (int i = 0; i < NSTREAM; ++i) {
cstreams[i].reset(new TestRtmpClientStream);
brpc::RtmpClientStreamOptions opt;
opt.play_name = butil::string_printf("play_name_%d", i);
// wait_until_play_or_publish_is_sent is left at its default here.
cstreams[i]->Init(&rtmp_client, opt);
ASSERT_EQ(0, cstreams[i]->_called_on_stop);
// 500ms is well inside the server's 2s delay, so creation is still pending.
usleep(500*1000);
ASSERT_EQ(0, cstreams[i]->_called_on_stop);
cstreams[i]->Destroy();
// Allow a short moment for OnStop to be delivered.
usleep(10*1000);
ASSERT_EQ(1, cstreams[i]->_called_on_stop);
}
LOG(INFO) << "Quiting program...";
}
// Same scenario as destroy_client_streams_during_creation, but with retrying
// client streams: Destroy() is called while the server is still delaying its
// play response, and OnStop must fire exactly once, only after Destroy().
TEST(RtmpTest, destroy_retrying_client_streams_during_creation) {
PlayingDummyService rtmp_service(2000/*sleep 2s*/);
brpc::Server server;
brpc::ServerOptions server_opt;
server_opt.rtmp_service = &rtmp_service;
ASSERT_EQ(0, server.Start(8574, &server_opt));
brpc::RtmpClientOptions rtmp_opt;
rtmp_opt.app = "hello";
rtmp_opt.swfUrl = "anything";
rtmp_opt.tcUrl = "rtmp://heheda";
brpc::RtmpClient rtmp_client;
ASSERT_EQ(0, rtmp_client.Init("localhost:8574", rtmp_opt));
// Create multiple streams.
const int NSTREAM = 2;
butil::intrusive_ptr<TestRtmpRetryingClientStream> cstreams[NSTREAM];
for (int i = 0; i < NSTREAM; ++i) {
cstreams[i].reset(new TestRtmpRetryingClientStream);
brpc::RtmpRetryingClientStreamOptions opt;
opt.play_name = butil::string_printf("play_name_%d", i);
// NOTE(review): `sc` is never deleted in this test; presumably Init() takes
// ownership of the creator - confirm.
brpc::SubStreamCreator* sc = new RtmpSubStreamCreator(&rtmp_client);
cstreams[i]->Init(sc, opt);
ASSERT_EQ(0, cstreams[i]->_called_on_stop);
// 500ms is well inside the server's 2s delay, so creation is still pending.
usleep(500*1000);
ASSERT_EQ(0, cstreams[i]->_called_on_stop);
cstreams[i]->Destroy();
// Allow a short moment for OnStop to be delivered.
usleep(10*1000);
ASSERT_EQ(1, cstreams[i]->_called_on_stop);
}
LOG(INFO) << "Quiting program...";
}
// Plays NSTREAM retrying streams, then stops the server and starts it again
// on the same port. Expects every stream to have seen OnFirstMessage once
// and OnPlayable twice (once before and once after the restart).
TEST(RtmpTest, retrying_stream) {
    PlayingDummyService rtmp_service;
    brpc::Server server;
    brpc::ServerOptions server_opt;
    server_opt.rtmp_service = &rtmp_service;
    ASSERT_EQ(0, server.Start(8576, &server_opt));
    brpc::RtmpClientOptions rtmp_opt;
    rtmp_opt.app = "hello";
    rtmp_opt.swfUrl = "anything";
    rtmp_opt.tcUrl = "rtmp://heheda";
    brpc::RtmpClient rtmp_client;
    ASSERT_EQ(0, rtmp_client.Init("localhost:8576", rtmp_opt));
    // Create multiple streams.
    const int NSTREAM = 2;
    brpc::DestroyingPtr<TestRtmpRetryingClientStream> cstreams[NSTREAM];
    for (int i = 0; i < NSTREAM; ++i) {
        cstreams[i].reset(new TestRtmpRetryingClientStream);
        brpc::RtmpRetryingClientStreamOptions opt;
        opt.play_name = butil::string_printf("name_%d", i);
        // NOTE(review): `sc` is never deleted in this test; presumably Init()
        // takes ownership of the creator - confirm.
        brpc::SubStreamCreator* sc = new RtmpSubStreamCreator(&rtmp_client);
        cstreams[i]->Init(sc, opt);
    }
    // Let the streams play for a while before taking the server down.
    sleep(3);
    LOG(INFO) << "Stopping server";
    server.Stop(0);
    server.Join();
    LOG(INFO) << "Stopped server and sleep for a while";
    sleep(3);
    // Bring the server back on the same port and give the streams time to
    // be re-created.
    ASSERT_EQ(0, server.Start(8576, &server_opt));
    sleep(3);
    for (int i = 0; i < NSTREAM; ++i) {
        ASSERT_EQ(1, cstreams[i]->_called_on_first_message);
        ASSERT_EQ(2, cstreams[i]->_called_on_playable);
    }
    LOG(INFO) << "Quiting program...";
}