blob: 3f3d3ff5aed5488a77414ca3cabd91ef77f245f3 [file] [log] [blame]
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package grpc_test
import (
"context"
"encoding/base64"
"io"
"path/filepath"
"runtime"
"time"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
grpclib "google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
"github.com/apache/skywalking-banyandb/banyand/discovery"
"github.com/apache/skywalking-banyandb/banyand/liaison/grpc"
"github.com/apache/skywalking-banyandb/banyand/measure"
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/query"
"github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/banyand/stream"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/test"
teststream "github.com/apache/skywalking-banyandb/pkg/test/stream"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
var _ = Describe("Stream", func() {
var path string
var gracefulStop, deferFunc func()
var conn *grpclib.ClientConn
var listenClientURL, listenPeerURL string
BeforeEach(func() {
var err error
path, deferFunc, err = test.NewSpace()
Expect(err).NotTo(HaveOccurred())
listenClientURL, listenPeerURL, err = test.NewEtcdListenUrls()
Expect(err).NotTo(HaveOccurred())
})
It("is a plain server", func() {
By("Verifying an empty server")
flags := []string{
"--stream-root-path=" + path, "--measure-root-path=" + path, "--metadata-root-path=" + path,
"--etcd-listen-client-url=" + listenClientURL, "--etcd-listen-peer-url=" + listenPeerURL,
}
gracefulStop = setup(true, flags)
var err error
conn, err = grpclib.Dial("localhost:17912", grpclib.WithTransportCredentials(insecure.NewCredentials()))
Expect(err).NotTo(HaveOccurred())
streamWrite(conn)
Eventually(func() (int, error) {
return streamQuery(conn)
}).Should(Equal(1))
_ = conn.Close()
gracefulStop()
By("Verifying an existing server")
gracefulStop = setup(false, flags)
conn, err = grpclib.Dial("localhost:17912", grpclib.WithTransportCredentials(insecure.NewCredentials()))
Expect(err).NotTo(HaveOccurred())
Eventually(func() int {
num, err := streamQuery(conn)
if err != nil {
GinkgoWriter.Printf("stream query err: %v \n", err)
return 0
}
return num
}, defaultEventuallyTimeout).Should(Equal(1))
})
It("is a TLS server", func() {
flags := []string{
"--tls=true", "--stream-root-path=" + path, "--measure-root-path=" + path,
"--metadata-root-path=" + path, "--etcd-listen-client-url=" + listenClientURL,
"--etcd-listen-peer-url=" + listenPeerURL,
}
_, currentFile, _, _ := runtime.Caller(0)
basePath := filepath.Dir(currentFile)
certFile := filepath.Join(basePath, "testdata/server_cert.pem")
keyFile := filepath.Join(basePath, "testdata/server_key.pem")
flags = append(flags, "--cert-file="+certFile)
flags = append(flags, "--key-file="+keyFile)
addr := "localhost:17913"
flags = append(flags, "--addr="+addr)
gracefulStop = setup(true, flags)
creds, err := credentials.NewClientTLSFromFile(certFile, "localhost")
Expect(err).NotTo(HaveOccurred())
conn, err = grpclib.Dial(addr, grpclib.WithTransportCredentials(creds))
Expect(err).NotTo(HaveOccurred())
streamWrite(conn)
Eventually(func() (int, error) {
return streamQuery(conn)
}, defaultEventuallyTimeout).Should(Equal(1))
})
AfterEach(func() {
_ = conn.Close()
gracefulStop()
deferFunc()
})
})
func setup(loadMetadata bool, flags []string) func() {
// Init `Discovery` module
repo, err := discovery.NewServiceRepo(context.Background())
Expect(err).NotTo(HaveOccurred())
// Init `Queue` module
pipeline, err := queue.NewQueue(context.TODO(), repo)
Expect(err).NotTo(HaveOccurred())
// Init `Metadata` module
metaSvc, err := metadata.NewService(context.TODO())
Expect(err).NotTo(HaveOccurred())
// Init `Stream` module
streamSvc, err := stream.NewService(context.TODO(), metaSvc, repo, pipeline)
Expect(err).NotTo(HaveOccurred())
// Init `Measure` module
measureSvc, err := measure.NewService(context.TODO(), metaSvc, repo, pipeline)
Expect(err).NotTo(HaveOccurred())
// Init `Query` module
q, err := query.NewExecutor(context.TODO(), streamSvc, measureSvc, metaSvc, repo, pipeline)
Expect(err).NotTo(HaveOccurred())
tcp := grpc.NewServer(context.TODO(), pipeline, repo, metaSvc)
if loadMetadata {
return test.SetUpModules(
flags,
repo,
pipeline,
metaSvc,
&preloadStreamService{metaSvc: metaSvc},
&preloadMeasureService{metaSvc: metaSvc},
streamSvc,
measureSvc,
q,
tcp,
)
}
return test.SetUpModules(
flags,
repo,
pipeline,
metaSvc,
streamSvc,
measureSvc,
q,
tcp,
)
}
type preloadStreamService struct {
metaSvc metadata.Service
}
func (p *preloadStreamService) Name() string {
return "preload-stream"
}
func (p *preloadStreamService) PreRun() error {
return teststream.PreloadSchema(p.metaSvc.SchemaRegistry())
}
func writeStreamData() *streamv1.WriteRequest {
bb, _ := base64.StdEncoding.DecodeString("YWJjMTIzIT8kKiYoKSctPUB+")
return pbv1.NewStreamWriteRequestBuilder().
ID("1").
Metadata("default", "sw").
Timestamp(timestamp.NowMilli()).
TagFamily(bb).
TagFamily(
"trace_id-xxfff.111",
0,
"webapp_id",
"10.0.0.1_id",
"/home_id",
300,
1622933202000000000,
).
Build()
}
func queryStreamCriteria(baseTs time.Time) *streamv1.QueryRequest {
return pbv1.NewStreamQueryRequestBuilder().
Limit(10).
Offset(0).
Metadata("default", "sw").
TimeRange(baseTs.Add(-10*time.Minute), baseTs.Add(10*time.Minute)).
Projection("searchable", "trace_id").
Build()
}
func streamWrite(conn *grpclib.ClientConn) {
c := streamv1.NewStreamServiceClient(conn)
ctx := context.Background()
var writeClient streamv1.StreamService_WriteClient
Eventually(func(g Gomega) {
var err error
writeClient, err = c.Write(ctx)
g.Expect(err).NotTo(HaveOccurred())
}).Should(Succeed())
Eventually(func() error {
return writeClient.Send(writeStreamData())
}).ShouldNot(HaveOccurred())
Expect(writeClient.CloseSend()).Should(Succeed())
Eventually(func() error {
_, err := writeClient.Recv()
return err
}).Should(Equal(io.EOF))
}
func streamQuery(conn *grpclib.ClientConn) (int, error) {
c := streamv1.NewStreamServiceClient(conn)
ctx := context.Background()
resp, err := c.Query(ctx, queryStreamCriteria(timestamp.NowMilli()))
return len(resp.GetElements()), err
}