blob: 01f68411340a5d6ad520891ddb735d72967bc668 [file] [log] [blame]
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rpcpb
import (
"context"
"crypto/tls"
"fmt"
"net/url"
"os"
"time"
"github.com/coreos/etcd/clientv3"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/pkg/transport"
"github.com/coreos/etcd/snapshot"
"github.com/dustin/go-humanize"
"go.uber.org/zap"
grpc "google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
// ElectionTimeout returns an election timeout duration.
func (m *Member) ElectionTimeout() time.Duration {
return time.Duration(m.Etcd.ElectionTimeoutMs) * time.Millisecond
}
// DialEtcdGRPCServer creates a raw gRPC connection to an etcd member.
func (m *Member) DialEtcdGRPCServer(opts ...grpc.DialOption) (*grpc.ClientConn, error) {
dialOpts := []grpc.DialOption{
grpc.WithTimeout(5 * time.Second),
grpc.WithBlock(),
}
secure := false
for _, cu := range m.Etcd.AdvertiseClientURLs {
u, err := url.Parse(cu)
if err != nil {
return nil, err
}
if u.Scheme == "https" { // TODO: handle unix
secure = true
}
}
if secure {
// assume save TLS assets are already stord on disk
tlsInfo := transport.TLSInfo{
CertFile: m.ClientCertPath,
KeyFile: m.ClientKeyPath,
TrustedCAFile: m.ClientTrustedCAPath,
// TODO: remove this with generated certs
// only need it for auto TLS
InsecureSkipVerify: true,
}
tlsConfig, err := tlsInfo.ClientConfig()
if err != nil {
return nil, err
}
creds := credentials.NewTLS(tlsConfig)
dialOpts = append(dialOpts, grpc.WithTransportCredentials(creds))
} else {
dialOpts = append(dialOpts, grpc.WithInsecure())
}
dialOpts = append(dialOpts, opts...)
return grpc.Dial(m.EtcdClientEndpoint, dialOpts...)
}
// CreateEtcdClientConfig creates a client configuration from member.
func (m *Member) CreateEtcdClientConfig(opts ...grpc.DialOption) (cfg *clientv3.Config, err error) {
secure := false
for _, cu := range m.Etcd.AdvertiseClientURLs {
var u *url.URL
u, err = url.Parse(cu)
if err != nil {
return nil, err
}
if u.Scheme == "https" { // TODO: handle unix
secure = true
}
}
cfg = &clientv3.Config{
Endpoints: []string{m.EtcdClientEndpoint},
DialTimeout: 10 * time.Second,
DialOptions: opts,
}
if secure {
// assume save TLS assets are already stord on disk
tlsInfo := transport.TLSInfo{
CertFile: m.ClientCertPath,
KeyFile: m.ClientKeyPath,
TrustedCAFile: m.ClientTrustedCAPath,
// TODO: remove this with generated certs
// only need it for auto TLS
InsecureSkipVerify: true,
}
var tlsConfig *tls.Config
tlsConfig, err = tlsInfo.ClientConfig()
if err != nil {
return nil, err
}
cfg.TLS = tlsConfig
}
return cfg, err
}
// CreateEtcdClient creates a client from member.
func (m *Member) CreateEtcdClient(opts ...grpc.DialOption) (*clientv3.Client, error) {
cfg, err := m.CreateEtcdClientConfig(opts...)
if err != nil {
return nil, err
}
return clientv3.New(*cfg)
}
// CheckCompact ensures that historical data before given revision has been compacted.
func (m *Member) CheckCompact(rev int64) error {
cli, err := m.CreateEtcdClient()
if err != nil {
return fmt.Errorf("%v (%q)", err, m.EtcdClientEndpoint)
}
defer cli.Close()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
wch := cli.Watch(ctx, "\x00", clientv3.WithFromKey(), clientv3.WithRev(rev-1))
wr, ok := <-wch
cancel()
if !ok {
return fmt.Errorf("watch channel terminated (endpoint %q)", m.EtcdClientEndpoint)
}
if wr.CompactRevision != rev {
return fmt.Errorf("got compact revision %v, wanted %v (endpoint %q)", wr.CompactRevision, rev, m.EtcdClientEndpoint)
}
return nil
}
// Defrag runs defragmentation on this member.
func (m *Member) Defrag() error {
cli, err := m.CreateEtcdClient()
if err != nil {
return fmt.Errorf("%v (%q)", err, m.EtcdClientEndpoint)
}
defer cli.Close()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
_, err = cli.Defragment(ctx, m.EtcdClientEndpoint)
cancel()
return err
}
// RevHash fetches current revision and hash on this member.
func (m *Member) RevHash() (int64, int64, error) {
conn, err := m.DialEtcdGRPCServer()
if err != nil {
return 0, 0, err
}
defer conn.Close()
mt := pb.NewMaintenanceClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
resp, err := mt.Hash(ctx, &pb.HashRequest{}, grpc.FailFast(false))
cancel()
if err != nil {
return 0, 0, err
}
return resp.Header.Revision, int64(resp.Hash), nil
}
// Rev fetches current revision on this member.
func (m *Member) Rev(ctx context.Context) (int64, error) {
cli, err := m.CreateEtcdClient()
if err != nil {
return 0, fmt.Errorf("%v (%q)", err, m.EtcdClientEndpoint)
}
defer cli.Close()
resp, err := cli.Status(ctx, m.EtcdClientEndpoint)
if err != nil {
return 0, err
}
return resp.Header.Revision, nil
}
// Compact compacts member storage with given revision.
// It blocks until it's physically done.
func (m *Member) Compact(rev int64, timeout time.Duration) error {
cli, err := m.CreateEtcdClient()
if err != nil {
return fmt.Errorf("%v (%q)", err, m.EtcdClientEndpoint)
}
defer cli.Close()
ctx, cancel := context.WithTimeout(context.Background(), timeout)
_, err = cli.Compact(ctx, rev, clientv3.WithCompactPhysical())
cancel()
return err
}
// IsLeader returns true if this member is the current cluster leader.
func (m *Member) IsLeader() (bool, error) {
cli, err := m.CreateEtcdClient()
if err != nil {
return false, fmt.Errorf("%v (%q)", err, m.EtcdClientEndpoint)
}
defer cli.Close()
resp, err := cli.Status(context.Background(), m.EtcdClientEndpoint)
if err != nil {
return false, err
}
return resp.Header.MemberId == resp.Leader, nil
}
// WriteHealthKey writes a health key to this member.
func (m *Member) WriteHealthKey() error {
cli, err := m.CreateEtcdClient()
if err != nil {
return fmt.Errorf("%v (%q)", err, m.EtcdClientEndpoint)
}
defer cli.Close()
// give enough time-out in case expensive requests (range/delete) are pending
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
_, err = cli.Put(ctx, "health", "good")
cancel()
if err != nil {
return fmt.Errorf("%v (%q)", err, m.EtcdClientEndpoint)
}
return nil
}
// SaveSnapshot downloads a snapshot file from this member, locally.
// It's meant to requested remotely, so that local member can store
// snapshot file on local disk.
func (m *Member) SaveSnapshot(lg *zap.Logger) (err error) {
// remove existing snapshot first
if err = os.RemoveAll(m.SnapshotPath); err != nil {
return err
}
var ccfg *clientv3.Config
ccfg, err = m.CreateEtcdClientConfig()
if err != nil {
return fmt.Errorf("%v (%q)", err, m.EtcdClientEndpoint)
}
lg.Info(
"snapshot save START",
zap.String("member-name", m.Etcd.Name),
zap.Strings("member-client-urls", m.Etcd.AdvertiseClientURLs),
zap.String("snapshot-path", m.SnapshotPath),
)
now := time.Now()
mgr := snapshot.NewV3(lg)
if err = mgr.Save(context.Background(), *ccfg, m.SnapshotPath); err != nil {
return err
}
took := time.Since(now)
var fi os.FileInfo
fi, err = os.Stat(m.SnapshotPath)
if err != nil {
return err
}
var st snapshot.Status
st, err = mgr.Status(m.SnapshotPath)
if err != nil {
return err
}
m.SnapshotInfo = &SnapshotInfo{
MemberName: m.Etcd.Name,
MemberClientURLs: m.Etcd.AdvertiseClientURLs,
SnapshotPath: m.SnapshotPath,
SnapshotFileSize: humanize.Bytes(uint64(fi.Size())),
SnapshotTotalSize: humanize.Bytes(uint64(st.TotalSize)),
SnapshotTotalKey: int64(st.TotalKey),
SnapshotHash: int64(st.Hash),
SnapshotRevision: st.Revision,
Took: fmt.Sprintf("%v", took),
}
lg.Info(
"snapshot save END",
zap.String("member-name", m.SnapshotInfo.MemberName),
zap.Strings("member-client-urls", m.SnapshotInfo.MemberClientURLs),
zap.String("snapshot-path", m.SnapshotPath),
zap.String("snapshot-file-size", m.SnapshotInfo.SnapshotFileSize),
zap.String("snapshot-total-size", m.SnapshotInfo.SnapshotTotalSize),
zap.Int64("snapshot-total-key", m.SnapshotInfo.SnapshotTotalKey),
zap.Int64("snapshot-hash", m.SnapshotInfo.SnapshotHash),
zap.Int64("snapshot-revision", m.SnapshotInfo.SnapshotRevision),
zap.String("took", m.SnapshotInfo.Took),
)
return nil
}
// RestoreSnapshot restores a cluster from a given snapshot file on disk.
// It's meant to requested remotely, so that local member can load the
// snapshot file from local disk.
func (m *Member) RestoreSnapshot(lg *zap.Logger) (err error) {
if err = os.RemoveAll(m.EtcdOnSnapshotRestore.DataDir); err != nil {
return err
}
if err = os.RemoveAll(m.EtcdOnSnapshotRestore.WALDir); err != nil {
return err
}
lg.Info(
"snapshot restore START",
zap.String("member-name", m.Etcd.Name),
zap.Strings("member-client-urls", m.Etcd.AdvertiseClientURLs),
zap.String("snapshot-path", m.SnapshotPath),
)
now := time.Now()
mgr := snapshot.NewV3(lg)
err = mgr.Restore(snapshot.RestoreConfig{
SnapshotPath: m.SnapshotInfo.SnapshotPath,
Name: m.EtcdOnSnapshotRestore.Name,
OutputDataDir: m.EtcdOnSnapshotRestore.DataDir,
OutputWALDir: m.EtcdOnSnapshotRestore.WALDir,
PeerURLs: m.EtcdOnSnapshotRestore.AdvertisePeerURLs,
InitialCluster: m.EtcdOnSnapshotRestore.InitialCluster,
InitialClusterToken: m.EtcdOnSnapshotRestore.InitialClusterToken,
SkipHashCheck: false,
// TODO: set SkipHashCheck it true, to recover from existing db file
})
took := time.Since(now)
lg.Info(
"snapshot restore END",
zap.String("member-name", m.SnapshotInfo.MemberName),
zap.Strings("member-client-urls", m.SnapshotInfo.MemberClientURLs),
zap.String("snapshot-path", m.SnapshotPath),
zap.String("snapshot-file-size", m.SnapshotInfo.SnapshotFileSize),
zap.String("snapshot-total-size", m.SnapshotInfo.SnapshotTotalSize),
zap.Int64("snapshot-total-key", m.SnapshotInfo.SnapshotTotalKey),
zap.Int64("snapshot-hash", m.SnapshotInfo.SnapshotHash),
zap.Int64("snapshot-revision", m.SnapshotInfo.SnapshotRevision),
zap.String("took", took.String()),
zap.Error(err),
)
return err
}