blob: 005ba4b05bd98ddbe00acd73ee659a8303eb3d6d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package api
import (
"bytes"
"context"
"encoding/json"
"io"
"net/http"
"net/http/httptest"
"testing"
"github.com/gin-gonic/gin"
"github.com/stretchr/testify/require"
"github.com/apache/kvrocks-controller/consts"
"github.com/apache/kvrocks-controller/server/middleware"
"github.com/apache/kvrocks-controller/store"
"github.com/apache/kvrocks-controller/store/engine"
)
func TestClusterBasics(t *testing.T) {
ns := "test-ns"
handler := &ClusterHandler{s: store.NewClusterStore(engine.NewMock())}
runCreate := func(t *testing.T, name string, expectedStatusCode int) {
testCreateRequest := &CreateClusterRequest{
Name: name,
Nodes: []string{"127.0.0.1:1234", "127.0.0.1:1235", "127.0.0.1:1236", "127.0.0.1:1237"},
Replicas: 2,
}
recorder := httptest.NewRecorder()
ctx := GetTestContext(recorder)
body, err := json.Marshal(testCreateRequest)
require.NoError(t, err)
ctx.Header(consts.HeaderDontCheckClusterMode, "yes")
ctx.Request.Body = io.NopCloser(bytes.NewBuffer(body))
ctx.Params = []gin.Param{{Key: "namespace", Value: ns}}
handler.Create(ctx)
require.Equal(t, expectedStatusCode, recorder.Code)
}
runGet := func(t *testing.T, name string, expectedStatusCode int) {
recorder := httptest.NewRecorder()
ctx := GetTestContext(recorder)
ctx.Set(consts.ContextKeyStore, handler.s)
ctx.Params = []gin.Param{{Key: "namespace", Value: ns}, {Key: "cluster", Value: name}}
middleware.RequiredCluster(ctx)
if recorder.Code != http.StatusOK {
return
}
handler.Get(ctx)
require.Equal(t, expectedStatusCode, recorder.Code)
}
runRemove := func(t *testing.T, name string, expectedStatusCode int) {
recorder := httptest.NewRecorder()
ctx := GetTestContext(recorder)
ctx.Set(consts.ContextKeyStore, handler.s)
ctx.Params = []gin.Param{{Key: "namespace", Value: ns}, {Key: "cluster", Value: name}}
middleware.RequiredCluster(ctx)
if recorder.Code != http.StatusOK {
return
}
handler.Remove(ctx)
require.Equal(t, expectedStatusCode, recorder.Code)
}
t.Run("create cluster", func(t *testing.T) {
runCreate(t, "test-cluster", http.StatusCreated)
runCreate(t, "test-cluster", http.StatusConflict)
})
t.Run("get cluster", func(t *testing.T) {
runGet(t, "test-cluster", http.StatusOK)
runGet(t, "not-exist", http.StatusNotFound)
})
t.Run("list cluster", func(t *testing.T) {
recorder := httptest.NewRecorder()
ctx := GetTestContext(recorder)
ctx.Set(consts.ContextKeyStore, handler.s)
ctx.Params = []gin.Param{{Key: "namespace", Value: ns}}
handler.List(ctx)
require.Equal(t, http.StatusOK, recorder.Code)
var rsp struct {
Data struct {
Clusters []string `json:"clusters"`
} `json:"data"`
}
require.NoError(t, json.Unmarshal(recorder.Body.Bytes(), &rsp))
require.ElementsMatch(t, []string{"test-cluster"}, rsp.Data.Clusters)
})
t.Run("migrate slot only", func(t *testing.T) {
recorder := httptest.NewRecorder()
ctx := GetTestContext(recorder)
ctx.Set(consts.ContextKeyStore, handler.s)
ctx.Params = []gin.Param{{Key: "namespace", Value: ns}, {Key: "cluster", Value: "test-cluster"}}
testMigrateReq := &MigrateSlotRequest{
Slot: 3,
SlotOnly: true,
Target: 1,
}
body, err := json.Marshal(testMigrateReq)
require.NoError(t, err)
ctx.Request.Body = io.NopCloser(bytes.NewBuffer(body))
before, err := handler.s.GetCluster(ctx, ns, "test-cluster")
require.NoError(t, err)
require.EqualValues(t, store.SlotRange{Start: 0, Stop: 8191}, before.Shards[0].SlotRanges[0])
require.EqualValues(t, store.SlotRange{Start: 8192, Stop: store.MaxSlotID}, before.Shards[1].SlotRanges[0])
middleware.RequiredCluster(ctx)
if recorder.Code != http.StatusOK {
return
}
handler.MigrateSlot(ctx)
require.Equal(t, http.StatusOK, recorder.Code)
after, err := handler.s.GetCluster(ctx, ns, "test-cluster")
require.NoError(t, err)
require.EqualValues(t, before.Version.Inc(), after.Version.Load())
require.Len(t, after.Shards[0].SlotRanges, 2)
require.EqualValues(t, store.SlotRange{Start: 0, Stop: 2}, after.Shards[0].SlotRanges[0])
require.EqualValues(t, store.SlotRange{Start: 4, Stop: 8191}, after.Shards[0].SlotRanges[1])
require.Len(t, after.Shards[1].SlotRanges, 2)
require.EqualValues(t, store.SlotRange{Start: 3, Stop: 3}, after.Shards[1].SlotRanges[0])
require.EqualValues(t, store.SlotRange{Start: 8192, Stop: store.MaxSlotID}, after.Shards[1].SlotRanges[1])
})
t.Run("remove cluster", func(t *testing.T) {
runRemove(t, "test-cluster", http.StatusNoContent)
runRemove(t, "not-exist", http.StatusNotFound)
})
}
func TestClusterImport(t *testing.T) {
ns := "test-ns"
clusterName := "test-cluster-import"
handler := &ClusterHandler{s: store.NewClusterStore(engine.NewMock())}
// cluster import must be done on a real cluster
testNodeAddr := "127.0.0.1:7770"
clusterNode := store.NewClusterNode(testNodeAddr, "")
cluster, err := store.NewCluster(clusterName, []string{testNodeAddr}, 1)
require.NoError(t, err)
ctx := context.Background()
require.NoError(t, cluster.Reset(ctx))
require.NoError(t, clusterNode.SyncClusterInfo(ctx, cluster))
defer func() {
// clean up the cluster information to avoid affecting other tests
require.NoError(t, cluster.Reset(ctx))
}()
var req struct {
Nodes []string `json:"nodes"`
}
req.Nodes = []string{testNodeAddr}
recorder := httptest.NewRecorder()
testCtx := GetTestContext(recorder)
body, err := json.Marshal(req)
require.NoError(t, err)
testCtx.Request.Body = io.NopCloser(bytes.NewBuffer(body))
testCtx.Params = []gin.Param{{Key: "namespace", Value: ns}, {Key: "cluster", Value: "test-cluster-import"}}
handler.Import(testCtx)
require.Equal(t, http.StatusOK, recorder.Code)
var rsp struct {
Data struct {
Cluster *store.Cluster `json:"cluster"`
} `json:"data"`
}
require.NoError(t, json.Unmarshal(recorder.Body.Bytes(), &rsp))
require.Len(t, rsp.Data.Cluster.Shards, 1)
require.Len(t, rsp.Data.Cluster.Shards[0].Nodes, 1)
require.Equal(t, testNodeAddr, rsp.Data.Cluster.Shards[0].Nodes[0].Addr())
}
// TestClusterMigrateData only test if the API works well here
// and for the migration status will be tested in the controller
func TestClusterMigrateData(t *testing.T) {
ns := "test-ns"
clusterName := "test-cluster"
handler := &ClusterHandler{s: store.NewClusterStore(engine.NewMock())}
cluster, err := store.NewCluster(clusterName, []string{"127.0.0.1:7770", "127.0.0.1:7771"}, 1)
require.NoError(t, err)
ctx := context.Background()
require.NoError(t, cluster.Reset(ctx))
defer func() {
require.NoError(t, cluster.Reset(ctx))
}()
for _, shard := range cluster.Shards {
for _, node := range shard.Nodes {
require.NoError(t, node.SyncClusterInfo(ctx, cluster))
}
}
recorder := httptest.NewRecorder()
reqCtx := GetTestContext(recorder)
reqCtx.Set(consts.ContextKeyStore, handler.s)
reqCtx.Params = []gin.Param{{Key: "namespace", Value: ns}, {Key: "cluster", Value: "test-cluster"}}
testMigrateReq := &MigrateSlotRequest{
Slot: 0,
Target: 1,
}
body, err := json.Marshal(testMigrateReq)
require.NoError(t, err)
reqCtx.Request.Body = io.NopCloser(bytes.NewBuffer(body))
middleware.RequiredCluster(reqCtx)
if recorder.Code != http.StatusOK {
return
}
handler.MigrateSlot(reqCtx)
require.Equal(t, http.StatusOK, recorder.Code)
gotCluster, err := handler.s.GetCluster(ctx, ns, "test-cluster")
require.NoError(t, err)
require.EqualValues(t, 1, gotCluster.Version.Load())
require.Len(t, gotCluster.Shards[0].SlotRanges, 1)
require.EqualValues(t, 0, gotCluster.Shards[0].MigratingSlot)
require.EqualValues(t, 1, gotCluster.Shards[0].TargetShardIndex)
}