blob: b3025c390a87f40eb432ea66e2ba1fb4c18bac51 [file] [log] [blame]
//go:build extensible_load_manager
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package pulsar
import (
"context"
"fmt"
"net/http"
"sync"
"testing"
"time"
"github.com/apache/pulsar-client-go/pulsaradmin"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
uAtomic "go.uber.org/atomic"
)
type ExtensibleLoadManagerTestSuite struct {
suite.Suite
}
func TestExtensibleLoadManagerTestSuite(t *testing.T) {
suite.Run(t, new(ExtensibleLoadManagerTestSuite))
}
const (
tenant = utils.PUBLICTENANT
namespace = utils.DEFAULTNAMESPACE
broker1URL = "pulsar://broker-1:6650"
broker2URL = "pulsar://broker-2:6650"
broker1LookupURL = "broker-1:8080"
broker2LookupURL = "broker-2:8080"
)
type mockCounter struct {
prometheus.Counter
count uAtomic.Int32
}
func (m *mockCounter) Inc() {
m.count.Inc()
}
func (suite *ExtensibleLoadManagerTestSuite) TestTopicUnload() {
type topicUnloadTestCase struct {
testCaseName string
adminURL string
clientEndpointFunc func(utils.LookupData) string
unloadEndpointFunc func(utils.LookupData) string
}
for _, scenario := range []topicUnloadTestCase{
{
testCaseName: "directConnection",
adminURL: "http://broker-1:8080",
clientEndpointFunc: func(lookupResult utils.LookupData) string {
return lookupResult.BrokerURL
},
unloadEndpointFunc: func(lookupResult utils.LookupData) string {
return lookupResult.HTTPURL
},
},
{
testCaseName: "proxyConnection",
adminURL: "http://proxy:8080",
clientEndpointFunc: func(utils.LookupData) string {
return "pulsar://proxy:6650"
},
unloadEndpointFunc: func(utils.LookupData) string {
return "http://proxy:8080"
},
},
} {
suite.T().Run(scenario.testCaseName, func(t *testing.T) {
testTopicUnload(t, scenario.adminURL, scenario.clientEndpointFunc, scenario.unloadEndpointFunc)
})
}
}
func testTopicUnload(t *testing.T, adminURL string,
clientEndpointFunc func(utils.LookupData) string,
unloadEndpointFunc func(utils.LookupData) string) {
req := assert.New(t)
admin, err := pulsaradmin.NewClient(&pulsaradmin.Config{WebServiceURL: adminURL})
req.NoError(err)
topicName, err := utils.GetTopicName(newTopicName())
req.NoError(err)
req.NotNil(topicName)
err = admin.Topics().Create(*topicName, 0)
req.NoError(err)
lookupResult, err := admin.Topics().Lookup(*topicName)
req.NoError(err)
req.NotEmpty(lookupResult.BrokerURL)
srcTopicBrokerURL := lookupResult.BrokerURL
req.Contains([...]string{broker1URL, broker2URL}, srcTopicBrokerURL)
var dstTopicBrokerURL string
if srcTopicBrokerURL == broker1URL {
dstTopicBrokerURL = broker2LookupURL
} else {
dstTopicBrokerURL = broker1LookupURL
}
bundleRange, err := admin.Topics().GetBundleRange(*topicName)
req.NoError(err)
req.NotEmpty(bundleRange)
clientURL := clientEndpointFunc(lookupResult)
pulsarClient, err := NewClient(ClientOptions{URL: clientURL})
req.NoError(err)
defer pulsarClient.Close()
producer, err := pulsarClient.CreateProducer(ProducerOptions{
Topic: topicName.String(),
})
req.NoError(err)
defer producer.Close()
consumer, err := pulsarClient.Subscribe(ConsumerOptions{
Topic: topicName.String(),
SubscriptionName: fmt.Sprintf("my-sub-%v", time.Now().Nanosecond()),
SubscriptionInitialPosition: SubscriptionPositionEarliest,
})
req.NoError(err)
defer consumer.Close()
pulsarClientImpl := pulsarClient.(*client)
lookupRequestCounterMock := mockCounter{}
pulsarClientImpl.metrics.LookupRequestsCount = &lookupRequestCounterMock
messageCountBeforeUnload := 100
messageCountDuringUnload := 100
messageCountAfterUnload := 100
messageCount := messageCountBeforeUnload + messageCountDuringUnload + messageCountAfterUnload
// Signals all goroutines have completed
wgRoutines := sync.WaitGroup{}
wgRoutines.Add(2)
// Signals unload has completed
wgUnload := sync.WaitGroup{}
wgUnload.Add(1)
// Signals both producer and consumer have processed `messageCountBeforeUnload` messages
wgSendAndReceiveMessages := sync.WaitGroup{}
wgSendAndReceiveMessages.Add(2)
// Producer
go func() {
defer wgRoutines.Done()
for i := 0; i < messageCount; i++ {
if i == messageCountBeforeUnload+messageCountDuringUnload {
wgUnload.Wait()
}
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
pm := ProducerMessage{Payload: []byte(fmt.Sprintf("hello-%d", i))}
_, err := producer.Send(ctx, &pm)
req.NoError(err)
req.NoError(ctx.Err())
if i == messageCountBeforeUnload {
wgSendAndReceiveMessages.Done()
}
}
}()
// Consumer
go func() {
defer wgRoutines.Done()
for i := 0; i < messageCount; i++ {
if i == messageCountBeforeUnload+messageCountDuringUnload {
wgUnload.Wait()
}
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
_, err := consumer.Receive(ctx)
req.NoError(err)
req.NoError(ctx.Err())
if i == messageCountBeforeUnload {
wgSendAndReceiveMessages.Done()
}
}
}()
// Unload the bundle, triggering the producers and consumers to reconnect to the specified broker.
wgSendAndReceiveMessages.Wait()
unloadEndpoint := unloadEndpointFunc(lookupResult)
unloadURL := fmt.Sprintf(
"/admin/v2/namespaces/%s/%s/%s/unload?destinationBroker=%s", tenant, namespace, bundleRange, dstTopicBrokerURL)
makeHTTPCall(t, http.MethodPut, unloadEndpoint+unloadURL, "")
wgUnload.Done()
wgRoutines.Wait()
req.Equal(int32(0), lookupRequestCounterMock.count.Load())
}