blob: b74b84b158ec0cbdcc7acd261f3769f24de33107 [file] [log] [blame]
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tester
import (
"fmt"
"time"
"github.com/coreos/etcd/functional/rpcpb"
"go.uber.org/zap"
)
// Stresser defines stressing client operations.
type Stresser interface {
// Stress starts to stress the etcd cluster
Stress() error
// Pause stops the stresser from sending requests to etcd. Resume by calling Stress.
Pause() map[string]int
// Close releases all of the Stresser's resources.
Close() map[string]int
// ModifiedKeys reports the number of keys created and deleted by stresser
ModifiedKeys() int64
}
// newStresser creates stresser from a comma separated list of stresser types.
func newStresser(clus *Cluster, m *rpcpb.Member) (stressers []Stresser) {
stressers = make([]Stresser, len(clus.Tester.Stressers))
for i, stype := range clus.Tester.Stressers {
clus.lg.Info(
"creating stresser",
zap.String("type", stype),
zap.String("endpoint", m.EtcdClientEndpoint),
)
switch stype {
case "KV":
// TODO: Too intensive stressing clients can panic etcd member with
// 'out of memory' error. Put rate limits in server side.
stressers[i] = &keyStresser{
stype: rpcpb.Stresser_KV,
lg: clus.lg,
m: m,
keySize: int(clus.Tester.StressKeySize),
keyLargeSize: int(clus.Tester.StressKeySizeLarge),
keySuffixRange: int(clus.Tester.StressKeySuffixRange),
keyTxnSuffixRange: int(clus.Tester.StressKeySuffixRangeTxn),
keyTxnOps: int(clus.Tester.StressKeyTxnOps),
clientsN: int(clus.Tester.StressClients),
rateLimiter: clus.rateLimiter,
}
case "LEASE":
stressers[i] = &leaseStresser{
stype: rpcpb.Stresser_LEASE,
lg: clus.lg,
m: m,
numLeases: 10, // TODO: configurable
keysPerLease: 10, // TODO: configurable
rateLimiter: clus.rateLimiter,
}
case "ELECTION_RUNNER":
reqRate := 100
args := []string{
"election",
fmt.Sprintf("%v", time.Now().UnixNano()), // election name as current nano time
"--dial-timeout=10s",
"--endpoints", m.EtcdClientEndpoint,
"--total-client-connections=10",
"--rounds=0", // runs forever
"--req-rate", fmt.Sprintf("%v", reqRate),
}
stressers[i] = newRunnerStresser(
rpcpb.Stresser_ELECTION_RUNNER,
m.EtcdClientEndpoint,
clus.lg,
clus.Tester.RunnerExecPath,
args,
clus.rateLimiter,
reqRate,
)
case "WATCH_RUNNER":
reqRate := 100
args := []string{
"watcher",
"--prefix", fmt.Sprintf("%v", time.Now().UnixNano()), // prefix all keys with nano time
"--total-keys=1",
"--total-prefixes=1",
"--watch-per-prefix=1",
"--endpoints", m.EtcdClientEndpoint,
"--rounds=0", // runs forever
"--req-rate", fmt.Sprintf("%v", reqRate),
}
stressers[i] = newRunnerStresser(
rpcpb.Stresser_WATCH_RUNNER,
m.EtcdClientEndpoint,
clus.lg,
clus.Tester.RunnerExecPath,
args,
clus.rateLimiter,
reqRate,
)
case "LOCK_RACER_RUNNER":
reqRate := 100
args := []string{
"lock-racer",
fmt.Sprintf("%v", time.Now().UnixNano()), // locker name as current nano time
"--endpoints", m.EtcdClientEndpoint,
"--total-client-connections=10",
"--rounds=0", // runs forever
"--req-rate", fmt.Sprintf("%v", reqRate),
}
stressers[i] = newRunnerStresser(
rpcpb.Stresser_LOCK_RACER_RUNNER,
m.EtcdClientEndpoint,
clus.lg,
clus.Tester.RunnerExecPath,
args,
clus.rateLimiter,
reqRate,
)
case "LEASE_RUNNER":
args := []string{
"lease-renewer",
"--ttl=30",
"--endpoints", m.EtcdClientEndpoint,
}
stressers[i] = newRunnerStresser(
rpcpb.Stresser_LEASE_RUNNER,
m.EtcdClientEndpoint,
clus.lg,
clus.Tester.RunnerExecPath,
args,
clus.rateLimiter,
0,
)
}
}
return stressers
}