Commit d694dd1d authored by Brad Davidson, committed by Brad Davidson

Add periodic background snapshot reconcile

Interval is configurable with the new etcd-snapshot-reconcile-interval flag.

Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
parent bed1f668
...@@ -92,6 +92,7 @@ type Server struct { ...@@ -92,6 +92,7 @@ type Server struct {
EtcdExposeMetrics bool EtcdExposeMetrics bool
EtcdSnapshotDir string EtcdSnapshotDir string
EtcdSnapshotCron string EtcdSnapshotCron string
EtcdSnapshotReconcile time.Duration
EtcdSnapshotRetention int EtcdSnapshotRetention int
EtcdSnapshotCompress bool EtcdSnapshotCompress bool
EtcdListFormat string EtcdListFormat string
...@@ -390,6 +391,12 @@ var ServerFlags = []cli.Flag{ ...@@ -390,6 +391,12 @@ var ServerFlags = []cli.Flag{
Destination: &ServerConfig.EtcdSnapshotCron, Destination: &ServerConfig.EtcdSnapshotCron,
Value: "0 */12 * * *", Value: "0 */12 * * *",
}, },
&cli.DurationFlag{
Name: "etcd-snapshot-reconcile-interval",
Usage: "(db) Snapshot reconcile interval",
Destination: &ServerConfig.EtcdSnapshotReconcile,
Value: 10 * time.Minute,
},
&cli.IntFlag{ &cli.IntFlag{
Name: "etcd-snapshot-retention", Name: "etcd-snapshot-retention",
Usage: "(db) Number of snapshots to retain", Usage: "(db) Number of snapshots to retain",
......
...@@ -184,12 +184,19 @@ func run(app *cli.Context, cfg *cmds.Server, leaderControllers server.CustomCont ...@@ -184,12 +184,19 @@ func run(app *cli.Context, cfg *cmds.Server, leaderControllers server.CustomCont
serverConfig.ControlConfig.VModule = cmds.LogConfig.VModule serverConfig.ControlConfig.VModule = cmds.LogConfig.VModule
if !cfg.EtcdDisableSnapshots || cfg.ClusterReset { if !cfg.EtcdDisableSnapshots || cfg.ClusterReset {
if cfg.EtcdSnapshotReconcile <= 0 {
return errors.New("etcd-snapshot-reconcile-interval must be greater than 0s")
}
serverConfig.ControlConfig.EtcdSnapshotCompress = cfg.EtcdSnapshotCompress serverConfig.ControlConfig.EtcdSnapshotCompress = cfg.EtcdSnapshotCompress
serverConfig.ControlConfig.EtcdSnapshotName = cfg.EtcdSnapshotName serverConfig.ControlConfig.EtcdSnapshotName = cfg.EtcdSnapshotName
serverConfig.ControlConfig.EtcdSnapshotCron = cfg.EtcdSnapshotCron serverConfig.ControlConfig.EtcdSnapshotCron = cfg.EtcdSnapshotCron
serverConfig.ControlConfig.EtcdSnapshotDir = cfg.EtcdSnapshotDir serverConfig.ControlConfig.EtcdSnapshotDir = cfg.EtcdSnapshotDir
serverConfig.ControlConfig.EtcdSnapshotReconcile = metav1.Duration{Duration: cfg.EtcdSnapshotReconcile}
serverConfig.ControlConfig.EtcdSnapshotRetention = cfg.EtcdSnapshotRetention serverConfig.ControlConfig.EtcdSnapshotRetention = cfg.EtcdSnapshotRetention
if cfg.EtcdS3 { if cfg.EtcdS3 {
if cfg.EtcdS3Timeout <= 0 {
return errors.New("etcd-s3-timeout must be greater than 0s")
}
serverConfig.ControlConfig.EtcdS3 = &config.EtcdS3{ serverConfig.ControlConfig.EtcdS3 = &config.EtcdS3{
AccessKey: cfg.EtcdS3AccessKey, AccessKey: cfg.EtcdS3AccessKey,
Bucket: cfg.EtcdS3BucketName, Bucket: cfg.EtcdS3BucketName,
......
...@@ -3,7 +3,6 @@ package cluster ...@@ -3,7 +3,6 @@ package cluster
import ( import (
"context" "context"
"net/url" "net/url"
"runtime"
"strings" "strings"
"time" "time"
...@@ -44,53 +43,61 @@ func (c *Cluster) Start(ctx context.Context) (<-chan struct{}, error) { ...@@ -44,53 +43,61 @@ func (c *Cluster) Start(ctx context.Context) (<-chan struct{}, error) {
return ready, nil return ready, nil
} }
// start managed database (if necessary) // start managed etcd database; when kine is in use this is a no-op.
if err := c.start(ctx); err != nil { if err := c.start(ctx); err != nil {
return nil, pkgerrors.WithMessage(err, "start managed database") return nil, pkgerrors.WithMessage(err, "start managed database")
} }
// get the wait channel for testing managed database readiness // get the wait channel for testing etcd server readiness; when kine is in
ready, err := c.testClusterDB(ctx) // use the channel is closed immediately.
if err != nil { ready := c.testClusterDB(ctx)
return nil, err
}
// set c.config.Datastore and c.config.Runtime.EtcdConfig with values
// necessary to build etcd clients, and start kine listener if necessary.
if err := c.startStorage(ctx, false); err != nil { if err := c.startStorage(ctx, false); err != nil {
return nil, err return nil, err
} }
// if necessary, store bootstrap data to datastore // if necessary, store bootstrap data to datastore. saveBootstrap is only set
// when using kine, so this can be done before the ready channel has been closed.
if c.saveBootstrap { if c.saveBootstrap {
if err := Save(ctx, c.config, false); err != nil { if err := Save(ctx, c.config, false); err != nil {
return nil, err return nil, err
} }
} }
// at this point, if etcd is in use, it's bootstrapping is complete
// so save the bootstrap data. We will need for etcd to be up. If
// the save call returns an error, we panic since subsequent etcd
// snapshots will be empty.
if c.managedDB != nil { if c.managedDB != nil {
go func() { go func() {
for { for {
select { select {
case <-ready: case <-ready:
// always save to managed etcd, to ensure that any file modified locally are in sync with the datastore.
// this will panic if multiple keys exist, to prevent nodes from running with different bootstrap data.
if err := Save(ctx, c.config, false); err != nil { if err := Save(ctx, c.config, false); err != nil {
panic(err) panic(err)
} }
if !c.config.EtcdDisableSnapshots { if !c.config.EtcdDisableSnapshots {
_ = wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (bool, error) { // do an initial reconcile of snapshots with a fast retry until it succeeds
err := c.managedDB.ReconcileSnapshotData(ctx) wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (bool, error) {
if err != nil { if err := c.managedDB.ReconcileSnapshotData(ctx); err != nil {
logrus.Errorf("Failed to record snapshots for cluster: %v", err) logrus.Errorf("Failed to record snapshots for cluster: %v", err)
return false, nil
} }
return err == nil, nil return true, nil
}) })
// continue reconciling snapshots in the background at the configured interval.
// the interval is jittered by 5% to avoid all nodes reconciling at the same time.
wait.JitterUntilWithContext(ctx, func(ctx context.Context) {
if err := c.managedDB.ReconcileSnapshotData(ctx); err != nil {
logrus.Errorf("Failed to record snapshots for cluster: %v", err)
}
}, c.config.EtcdSnapshotReconcile.Duration, 0.05, false)
} }
return return
default: case <-ctx.Done():
runtime.Gosched() return
} }
} }
}() }()
......
...@@ -25,11 +25,11 @@ import ( ...@@ -25,11 +25,11 @@ import (
// testClusterDB returns a channel that will be closed when the datastore connection is available. // testClusterDB returns a channel that will be closed when the datastore connection is available.
// The datastore is tested for readiness every 5 seconds until the test succeeds. // The datastore is tested for readiness every 5 seconds until the test succeeds.
func (c *Cluster) testClusterDB(ctx context.Context) (<-chan struct{}, error) { func (c *Cluster) testClusterDB(ctx context.Context) <-chan struct{} {
result := make(chan struct{}) result := make(chan struct{})
if c.managedDB == nil { if c.managedDB == nil {
close(result) close(result)
return result, nil return result
} }
go func() { go func() {
...@@ -50,7 +50,7 @@ func (c *Cluster) testClusterDB(ctx context.Context) (<-chan struct{}, error) { ...@@ -50,7 +50,7 @@ func (c *Cluster) testClusterDB(ctx context.Context) (<-chan struct{}, error) {
} }
}() }()
return result, nil return result
} }
// start starts the database, unless a cluster reset has been requested, in which case // start starts the database, unless a cluster reset has been requested, in which case
......
...@@ -235,17 +235,18 @@ type Control struct { ...@@ -235,17 +235,18 @@ type Control struct {
ClusterResetRestorePath string ClusterResetRestorePath string
MinTLSVersion string MinTLSVersion string
CipherSuites []string CipherSuites []string
TLSMinVersion uint16 `json:"-"` TLSMinVersion uint16 `json:"-"`
TLSCipherSuites []uint16 `json:"-"` TLSCipherSuites []uint16 `json:"-"`
EtcdSnapshotName string `json:"-"` EtcdSnapshotName string `json:"-"`
EtcdDisableSnapshots bool `json:"-"` EtcdDisableSnapshots bool `json:"-"`
EtcdExposeMetrics bool `json:"-"` EtcdExposeMetrics bool `json:"-"`
EtcdSnapshotDir string `json:"-"` EtcdSnapshotDir string `json:"-"`
EtcdSnapshotCron string `json:"-"` EtcdSnapshotCron string `json:"-"`
EtcdSnapshotRetention int `json:"-"` EtcdSnapshotReconcile metav1.Duration `json:"-"`
EtcdSnapshotCompress bool `json:"-"` EtcdSnapshotRetention int `json:"-"`
EtcdListFormat string `json:"-"` EtcdSnapshotCompress bool `json:"-"`
EtcdS3 *EtcdS3 `json:"-"` EtcdListFormat string `json:"-"`
EtcdS3 *EtcdS3 `json:"-"`
ServerNodeName string ServerNodeName string
VLevel int VLevel int
VModule string VModule string
......
...@@ -31,6 +31,7 @@ import ( ...@@ -31,6 +31,7 @@ import (
healthpb "google.golang.org/grpc/health/grpc_health_v1" healthpb "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/reflection" "google.golang.org/grpc/reflection"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilnet "k8s.io/apimachinery/pkg/util/net" utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
) )
...@@ -67,6 +68,7 @@ func generateTestConfig() *config.Control { ...@@ -67,6 +68,7 @@ func generateTestConfig() *config.Control {
DataDir: "/tmp/k3s/", // Different than the default value DataDir: "/tmp/k3s/", // Different than the default value
EtcdSnapshotName: "etcd-snapshot", EtcdSnapshotName: "etcd-snapshot",
EtcdSnapshotCron: "0 */12 * * *", EtcdSnapshotCron: "0 */12 * * *",
EtcdSnapshotReconcile: metav1.Duration{Duration: 10 * time.Minute},
EtcdSnapshotRetention: 5, EtcdSnapshotRetention: 5,
EtcdS3: &config.EtcdS3{ EtcdS3: &config.EtcdS3{
Endpoint: "s3.amazonaws.com", Endpoint: "s3.amazonaws.com",
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment