Commit 529e748a authored by Brad Davidson, committed by Brad Davidson

Move apiserver ready wait into common channel

Splits server startup into prepare/start phases. The server's agent is now started after the server is prepared, but before it is started. This allows us to properly bootstrap the executor before starting server components, and to use the executor to provide a shared channel to wait on for apiserver readiness. With this in place, four separate callers of WaitForAPIServerReady can be replaced with reads from a common ready channel.

Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
parent 2c133692
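
The change rests on a standard Go idiom: a channel that is closed exactly once broadcasts readiness to every receiver, so a single readiness probe can gate any number of components. Below is a minimal, self-contained sketch of that idiom; the names are illustrative only, not k3s code.

// Sketch of the closed-channel broadcast pattern this commit adopts:
// one goroutine closes a shared channel when the apiserver becomes
// ready, and any number of waiters block on it, replacing several
// independent polling loops.
package main

import (
	"fmt"
	"sync"
	"time"
)

var (
	ready     chan struct{}
	readyOnce sync.Once
)

// APIServerReadyChan mirrors the shape of the accessor added to the
// executor package: callers receive from the returned channel instead
// of running their own readiness poll.
func APIServerReadyChan() <-chan struct{} {
	return ready
}

func main() {
	ready = make(chan struct{})

	// Stand-in for the real readiness probe against the apiserver.
	go func() {
		time.Sleep(100 * time.Millisecond) // pretend to wait for readyz
		readyOnce.Do(func() { close(ready) })
	}()

	var wg sync.WaitGroup
	for _, name := range []string{"scheduler", "controller-manager", "tunnel", "kubelet"} {
		wg.Add(1)
		go func(name string) {
			defer wg.Done()
			<-APIServerReadyChan() // a closed channel unblocks every reader
			fmt.Println(name, "starting")
		}(name)
	}
	wg.Wait()
}

Because receiving from a closed channel never blocks, components started after the channel is closed proceed immediately; that property is what lets the four former WaitForAPIServerReady call sites collapse into plain channel reads.
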
@@ -174,10 +174,27 @@ func run(ctx context.Context, cfg cmds.Agent, proxy proxy.Proxy) error {
 		return err
 	}
 
-	if err := util.WaitForAPIServerReady(ctx, nodeConfig.AgentConfig.KubeConfigKubelet, util.DefaultAPIServerReadyTimeout); err != nil {
-		return pkgerrors.WithMessage(err, "failed to wait for apiserver ready")
-	}
-
+	go func() {
+		<-executor.APIServerReadyChan()
+		if err := startNetwork(ctx, nodeConfig); err != nil {
+			logrus.Fatalf("Failed to start networking: %v", err)
+		}
+
+		// By default, the server is responsible for notifying systemd
+		// On agent-only nodes, the agent will notify systemd
+		if notifySocket != "" {
+			logrus.Info(version.Program + " agent is up and running")
+			os.Setenv("NOTIFY_SOCKET", notifySocket)
+			systemd.SdNotify(true, "READY=1\n")
+		}
+	}()
+
+	return nil
+}
+
+// startNetwork updates the network annotations on the node, and starts flannel
+// and the kube-router netpol controller, if enabled.
+func startNetwork(ctx context.Context, nodeConfig *daemonconfig.Node) error {
 	// Use the kubelet kubeconfig to update annotations on the local node
 	kubeletClient, err := util.GetClientSet(nodeConfig.AgentConfig.KubeConfigKubelet)
 	if err != nil {
@@ -200,16 +217,7 @@ func run(ctx context.Context, cfg cmds.Agent, proxy proxy.Proxy) error {
 		}
 	}
 
-	// By default, the server is responsible for notifying systemd
-	// On agent-only nodes, the agent will notify systemd
-	if notifySocket != "" {
-		logrus.Info(version.Program + " agent is up and running")
-		os.Setenv("NOTIFY_SOCKET", notifySocket)
-		systemd.SdNotify(true, "READY=1\n")
-	}
-
-	<-ctx.Done()
-	return ctx.Err()
+	return nil
 }
 
 // getConntrackConfig uses the kube-proxy code to parse the user-provided kube-proxy-arg values, and
@@ -258,8 +266,7 @@ func getConntrackConfig(nodeConfig *daemonconfig.Node) (*kubeproxyconfig.KubeProxyConntrackConfiguration, error) {
 
 // RunStandalone bootstraps the executor, but does not run the kubelet or containerd.
 // This allows other bits of code that expect the executor to be set up properly to function
-// even when the agent is disabled. It will only return in case of error or context
-// cancellation.
+// even when the agent is disabled.
 func RunStandalone(ctx context.Context, cfg cmds.Agent) error {
 	proxy, err := createProxyAndValidateToken(ctx, &cfg)
 	if err != nil {
@@ -298,13 +305,11 @@ func RunStandalone(ctx context.Context, cfg cmds.Agent) error {
 		}
 	}
 
-	<-ctx.Done()
-	return ctx.Err()
+	return nil
 }
 
 // Run sets up cgroups, configures the LB proxy, and triggers startup
-// of containerd and kubelet. It will only return in case of error or context
-// cancellation.
+// of containerd and kubelet.
 func Run(ctx context.Context, cfg cmds.Agent) error {
 	if err := cgroups.Validate(); err != nil {
 		return err
...
@@ -17,6 +17,7 @@ import (
 	"github.com/k3s-io/k3s/pkg/agent/proxy"
 	"github.com/k3s-io/k3s/pkg/clientaccess"
 	daemonconfig "github.com/k3s-io/k3s/pkg/daemons/config"
+	"github.com/k3s-io/k3s/pkg/daemons/executor"
 	"github.com/k3s-io/k3s/pkg/util"
 	"github.com/k3s-io/k3s/pkg/version"
 	"github.com/rancher/remotedialer"
@@ -94,11 +95,9 @@ func Setup(ctx context.Context, config *daemonconfig.Node, proxy proxy.Proxy) error {
 		startTime: time.Now().Truncate(time.Second),
 	}
 
-	apiServerReady := make(chan struct{})
+	rbacReady := make(chan struct{})
 	go func() {
-		if err := util.WaitForAPIServerReady(ctx, config.AgentConfig.KubeConfigKubelet, util.DefaultAPIServerReadyTimeout); err != nil {
-			logrus.Fatalf("Tunnel watches failed to wait for apiserver ready: %v", err)
-		}
+		<-executor.APIServerReadyChan()
 		if err := util.WaitForRBACReady(ctx, config.AgentConfig.KubeConfigK3sController, util.DefaultAPIServerReadyTimeout, authorizationv1.ResourceAttributes{
 			Namespace: metav1.NamespaceDefault,
 			Verb:      "list",
@@ -107,14 +106,14 @@ func Setup(ctx context.Context, config *daemonconfig.Node, proxy proxy.Proxy) error {
 			logrus.Fatalf("Tunnel watches failed to wait for RBAC: %v", err)
 		}
 
-		close(apiServerReady)
+		close(rbacReady)
 	}()
 
 	// We don't need to run the tunnel authorizer if the container runtime endpoint is /dev/null,
 	// signifying that this is an agentless server that will not register a node.
 	if config.ContainerRuntimeEndpoint != "/dev/null" {
 		// Allow the kubelet port, as published via our node object.
-		go tunnel.setKubeletPort(ctx, apiServerReady)
+		go tunnel.setKubeletPort(ctx, rbacReady)
 
 		switch tunnel.mode {
 		case daemonconfig.EgressSelectorModeCluster:
@@ -122,7 +121,7 @@ func Setup(ctx context.Context, config *daemonconfig.Node, proxy proxy.Proxy) error {
 			tunnel.clusterAuth(config)
 		case daemonconfig.EgressSelectorModePod:
 			// In Pod mode, we watch pods assigned to this node, and allow their addresses, as well as ports used by containers with host network.
-			go tunnel.watchPods(ctx, apiServerReady, config)
+			go tunnel.watchPods(ctx, rbacReady, config)
 		}
 	}
 
@@ -165,7 +164,7 @@ func Setup(ctx context.Context, config *daemonconfig.Node, proxy proxy.Proxy) error {
 	wg := &sync.WaitGroup{}
 
-	go tunnel.watchEndpoints(ctx, apiServerReady, wg, tlsConfig, config, proxy)
+	go tunnel.watchEndpoints(ctx, rbacReady, wg, tlsConfig, config, proxy)
 
 	wait := make(chan int, 1)
 	go func() {
@@ -184,8 +183,8 @@ func Setup(ctx context.Context, config *daemonconfig.Node, proxy proxy.Proxy) error {
 }
 
 // setKubeletPort retrieves the configured kubelet port from our node object
-func (a *agentTunnel) setKubeletPort(ctx context.Context, apiServerReady <-chan struct{}) {
-	<-apiServerReady
+func (a *agentTunnel) setKubeletPort(ctx context.Context, rbacReady <-chan struct{}) {
+	<-rbacReady
 
 	wait.PollUntilContextTimeout(ctx, time.Second, util.DefaultAPIServerReadyTimeout, true, func(ctx context.Context) (bool, error) {
 		var readyTime metav1.Time
@@ -231,7 +230,7 @@ func (a *agentTunnel) clusterAuth(config *daemonconfig.Node) {
 
 // watchPods watches for pods assigned to this node, adding their IPs to the CIDR list.
 // If the pod uses host network, we instead add the
-func (a *agentTunnel) watchPods(ctx context.Context, apiServerReady <-chan struct{}, config *daemonconfig.Node) {
+func (a *agentTunnel) watchPods(ctx context.Context, rbacReady <-chan struct{}, config *daemonconfig.Node) {
 	for _, ip := range config.AgentConfig.NodeIPs {
 		if cidr, err := util.IPToIPNet(ip); err == nil {
 			logrus.Infof("Tunnel authorizer adding Node IP %s", cidr)
@@ -239,7 +238,7 @@ func (a *agentTunnel) watchPods(ctx context.Context, apiServerReady <-chan struct{}, config *daemonconfig.Node) {
 		}
 	}
 
-	<-apiServerReady
+	<-rbacReady
 
 	nodeName := os.Getenv("NODE_NAME")
 	pods := a.client.CoreV1().Pods(metav1.NamespaceNone)
@@ -308,11 +307,11 @@ func (a *agentTunnel) watchPods(ctx context.Context, apiServerReady <-chan struct{}, config *daemonconfig.Node) {
 // WatchEndpoints attempts to create tunnels to all supervisor addresses. Once the
 // apiserver is up, go into a watch loop, adding and removing tunnels as endpoints come
 // and go from the cluster.
-func (a *agentTunnel) watchEndpoints(ctx context.Context, apiServerReady <-chan struct{}, wg *sync.WaitGroup, tlsConfig *tls.Config, node *daemonconfig.Node, proxy proxy.Proxy) {
+func (a *agentTunnel) watchEndpoints(ctx context.Context, rbacReady <-chan struct{}, wg *sync.WaitGroup, tlsConfig *tls.Config, node *daemonconfig.Node, proxy proxy.Proxy) {
 	syncProxyAddresses := a.getProxySyncer(ctx, wg, tlsConfig, proxy)
 	refreshFromSupervisor := getAPIServersRequester(node, proxy, syncProxyAddresses)
 
-	<-apiServerReady
+	<-rbacReady
 
 	endpoints := a.client.CoreV1().Endpoints(metav1.NamespaceDefault)
 	fieldSelector := fields.Set{metav1.ObjectNameField: "kubernetes"}.String()
...
@@ -130,5 +130,10 @@ func Run(ctx *cli.Context) error {
 		return https.Start(ctx, nodeConfig, nil)
 	}
 
-	return agent.Run(contextCtx, cfg)
+	if err := agent.Run(contextCtx, cfg); err != nil {
+		return err
+	}
+
+	<-contextCtx.Done()
+	return contextCtx.Err()
 }
@@ -18,6 +18,7 @@ import (
 	"github.com/k3s-io/k3s/pkg/cli/cmds"
 	"github.com/k3s-io/k3s/pkg/clientaccess"
 	"github.com/k3s-io/k3s/pkg/daemons/config"
+	"github.com/k3s-io/k3s/pkg/daemons/executor"
 	"github.com/k3s-io/k3s/pkg/datadir"
 	"github.com/k3s-io/k3s/pkg/etcd"
 	k3smetrics "github.com/k3s-io/k3s/pkg/metrics"
@@ -513,28 +514,10 @@ func run(app *cli.Context, cfg *cmds.Server, leaderControllers server.CustomControllers, controllers server.CustomControllers) error {
 
 	ctx := signals.SetupSignalContext()
 
-	if err := server.StartServer(ctx, &serverConfig, cfg); err != nil {
+	if err := server.PrepareServer(ctx, &serverConfig, cfg); err != nil {
 		return err
 	}
 
-	go cmds.WriteCoverage(ctx)
-
-	go func() {
-		if !serverConfig.ControlConfig.DisableAPIServer {
-			<-serverConfig.ControlConfig.Runtime.APIServerReady
-			logrus.Info("Kube API server is now running")
-			serverConfig.ControlConfig.Runtime.StartupHooksWg.Wait()
-		}
-		if !serverConfig.ControlConfig.DisableETCD {
-			<-serverConfig.ControlConfig.Runtime.ETCDReady
-			logrus.Info("ETCD server is now running")
-		}
-		logrus.Info(version.Program + " is up and running")
-		os.Setenv("NOTIFY_SOCKET", notifySocket)
-		systemd.SdNotify(true, "READY=1\n")
-	}()
-
 	url := fmt.Sprintf("https://%s:%d", serverConfig.ControlConfig.BindAddressOrLoopback(false, true), serverConfig.ControlConfig.SupervisorPort)
 	token, err := clientaccess.FormatToken(serverConfig.ControlConfig.Runtime.AgentToken, serverConfig.ControlConfig.Runtime.ServerCA)
 	if err != nil {
@@ -604,10 +587,38 @@ func run(app *cli.Context, cfg *cmds.Server, leaderControllers server.CustomControllers, controllers server.CustomControllers) error {
 	if cfg.DisableAgent {
 		agentConfig.ContainerRuntimeEndpoint = "/dev/null"
-		return agent.RunStandalone(ctx, agentConfig)
+		if err := agent.RunStandalone(ctx, agentConfig); err != nil {
+			return err
+		}
+	} else {
+		if err := agent.Run(ctx, agentConfig); err != nil {
+			return err
+		}
+	}
+
+	go cmds.WriteCoverage(ctx)
+
+	go func() {
+		if !serverConfig.ControlConfig.DisableETCD {
+			<-serverConfig.ControlConfig.Runtime.ETCDReady
+			logrus.Info("ETCD server is now running")
+		}
+		if !serverConfig.ControlConfig.DisableAPIServer {
+			<-executor.APIServerReadyChan()
+			logrus.Info("Kube API server is now running")
+			serverConfig.ControlConfig.Runtime.StartupHooksWg.Wait()
+		}
+		logrus.Info(version.Program + " is up and running")
+		os.Setenv("NOTIFY_SOCKET", notifySocket)
+		systemd.SdNotify(true, "READY=1\n")
+	}()
+
+	if err := server.StartServer(ctx, &serverConfig, cfg); err != nil {
+		return err
 	}
 
-	return agent.Run(ctx, agentConfig)
+	<-ctx.Done()
+	return ctx.Err()
 }
 
 // validateNetworkConfig ensures that the network configuration values make sense.
...
@@ -27,16 +27,17 @@ type Cluster struct {
 	cnFilterFunc func(...string) []string
 }
 
-// Start creates the dynamic tls listener, http request handler,
-// handles starting and writing/reading bootstrap data, and returns a channel
+// ListenAndServe creates the dynamic tls listener, registers http request
+// handlers, and starts the supervisor API server loop.
+func (c *Cluster) ListenAndServe(ctx context.Context) error {
+	// Set up the dynamiclistener and http request handlers
+	return c.initClusterAndHTTPS(ctx)
+}
+
+// Start handles writing/reading bootstrap data, and returns a channel
 // that will be closed when datastore is ready. If embedded etcd is in use,
 // a secondary call to Cluster.save is made.
 func (c *Cluster) Start(ctx context.Context) (<-chan struct{}, error) {
-	// Set up the dynamiclistener and http request handlers
-	if err := c.initClusterAndHTTPS(ctx); err != nil {
-		return nil, pkgerrors.WithMessage(err, "init cluster datastore and https")
-	}
-
 	if c.config.DisableETCD {
 		ready := make(chan struct{})
 		defer close(ready)
...
@@ -256,6 +256,7 @@ type Control struct {
 	SANSecurity bool
 	PrivateIP   string
 	Runtime     *ControlRuntime `json:"-"`
+	Cluster     Cluster         `json:"-"`
 }
 
 // BindAddressOrLoopback returns an IPv4 or IPv6 address suitable for embedding in
@@ -313,7 +314,6 @@ type ControlRuntimeBootstrap struct {
 
 type ControlRuntime struct {
 	ControlRuntimeBootstrap
-	APIServerReady        <-chan struct{}
 	ContainerRuntimeReady <-chan struct{}
 	ETCDReady             <-chan struct{}
 	StartupHooksWg        *sync.WaitGroup
@@ -382,6 +382,12 @@ type ControlRuntime struct {
 	EtcdConfig endpoint.ETCDConfig
 }
 
+type Cluster interface {
+	Bootstrap(ctx context.Context, reset bool) error
+	ListenAndServe(ctx context.Context) error
+	Start(ctx context.Context) (<-chan struct{}, error)
+}
+
 type CoreFactory interface {
 	Core() core.Interface
 	Sync(ctx context.Context) error
...
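
The Cluster interface added above lets the config package describe cluster operations without importing their implementation; the concrete value is assigned elsewhere in this diff (config.Cluster = cluster.New(config)), presumably because the cluster package already imports the config package and a direct import would create a cycle. A toy, self-contained sketch of that shape follows; the types here are stand-ins, not the real k3s packages.

// Sketch of the interface-field decoupling: code that holds a *Control
// can drive the cluster through the interface, while only the wiring
// code needs to know the concrete type.
package main

import (
	"context"
	"fmt"
)

// What the config package declares.
type Cluster interface {
	Bootstrap(ctx context.Context, reset bool) error
	ListenAndServe(ctx context.Context) error
	Start(ctx context.Context) (<-chan struct{}, error)
}

type Control struct {
	Cluster Cluster // populated by the caller that knows the concrete type
}

// A stand-in for the concrete cluster implementation.
type fakeCluster struct{}

func (fakeCluster) Bootstrap(ctx context.Context, reset bool) error { return nil }
func (fakeCluster) ListenAndServe(ctx context.Context) error        { return nil }
func (fakeCluster) Start(ctx context.Context) (<-chan struct{}, error) {
	ready := make(chan struct{})
	close(ready) // pretend the datastore is ready immediately
	return ready, nil
}

func main() {
	cfg := &Control{Cluster: fakeCluster{}}
	if ready, err := cfg.Cluster.Start(context.Background()); err == nil {
		<-ready
		fmt.Println("datastore ready")
	}
}
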
@@ -36,7 +36,9 @@ import (
 	_ "k8s.io/component-base/metrics/prometheus/restclient"
 )
 
-func Server(ctx context.Context, cfg *config.Control) error {
+// Prepare loads bootstrap data from the datastore and sets up the initial
+// tunnel server request handler and stub authenticator.
+func Prepare(ctx context.Context, cfg *config.Control) error {
 	rand.Seed(time.Now().UTC().UnixNano())
 
 	logsapi.ReapplyHandling = logsapi.ReapplyHandlingIgnoreUnchanged
@@ -62,6 +64,18 @@ func Server(ctx context.Context, cfg *config.Control) error {
 	}
 	cfg.Runtime.Authenticator = auth
 
+	return nil
+}
+
+// Server starts the apiserver and whatever other control-plane components are
+// not disabled on this node.
+func Server(ctx context.Context, cfg *config.Control) error {
+	if ready, err := cfg.Cluster.Start(ctx); err != nil {
+		return pkgerrors.WithMessage(err, "failed to start cluster")
+	} else {
+		cfg.Runtime.ETCDReady = ready
+	}
+
 	if !cfg.DisableAPIServer {
 		go waitForAPIServerHandlers(ctx, cfg.Runtime)
@@ -70,12 +84,6 @@ func Server(ctx context.Context, cfg *config.Control) error {
 		}
 	}
 
-	// Wait for an apiserver to become available before starting additional controllers,
-	// even if we're not running an apiserver locally.
-	if err := waitForAPIServerInBackground(ctx, cfg.Runtime); err != nil {
-		return err
-	}
-
 	if !cfg.DisableScheduler {
 		if err := scheduler(ctx, cfg); err != nil {
 			return err
@@ -139,7 +147,7 @@ func controllerManager(ctx context.Context, cfg *config.Control) error {
 	args := config.GetArgs(argsMap, cfg.ExtraControllerArgs)
 
 	logrus.Infof("Running kube-controller-manager %s", config.ArgString(args))
-	return executor.ControllerManager(ctx, cfg.Runtime.APIServerReady, args)
+	return executor.ControllerManager(ctx, args)
 }
 
 func scheduler(ctx context.Context, cfg *config.Control) error {
@@ -165,22 +173,12 @@ func scheduler(ctx context.Context, cfg *config.Control) error {
 	args := config.GetArgs(argsMap, cfg.ExtraSchedulerAPIArgs)
 
-	schedulerNodeReady := make(chan struct{})
+	nodeReady := make(chan struct{})
 
 	go func() {
-		defer close(schedulerNodeReady)
-
-	apiReadyLoop:
-		for {
-			select {
-			case <-ctx.Done():
-				return
-			case <-cfg.Runtime.APIServerReady:
-				break apiReadyLoop
-			case <-time.After(30 * time.Second):
-				logrus.Infof("Waiting for API server to become available to start kube-scheduler")
-			}
-		}
+		defer close(nodeReady)
+
+		<-executor.APIServerReadyChan()
 
 		// If we're running the embedded cloud controller, wait for it to untaint at least one
 		// node (usually, the local node) before starting the scheduler to ensure that it
@@ -194,7 +192,7 @@ func scheduler(ctx context.Context, cfg *config.Control) error {
 	}()
 
 	logrus.Infof("Running kube-scheduler %s", config.ArgString(args))
-	return executor.Scheduler(ctx, schedulerNodeReady, args)
+	return executor.Scheduler(ctx, nodeReady, args)
 }
 
 func apiServer(ctx context.Context, cfg *config.Control) error {
@@ -287,6 +285,9 @@ func defaults(config *config.Control) {
 	}
 }
 
+// prepare sets up the server data-dir, calls deps.GenServerDeps to
+// set paths, extracts the cluster bootstrap data to the
+// configured paths, and starts the supervisor listener.
 func prepare(ctx context.Context, config *config.Control) error {
 	defaults(config)
 
@@ -306,8 +307,8 @@ func prepare(ctx context.Context, config *config.Control) error {
 	deps.CreateRuntimeCertFiles(config)
 
-	cluster := cluster.New(config)
-	if err := cluster.Bootstrap(ctx, config.ClusterReset); err != nil {
+	config.Cluster = cluster.New(config)
+	if err := config.Cluster.Bootstrap(ctx, config.ClusterReset); err != nil {
 		return pkgerrors.WithMessage(err, "failed to bootstrap cluster data")
 	}
 
@@ -315,10 +316,8 @@ func prepare(ctx context.Context, config *config.Control) error {
 		return pkgerrors.WithMessage(err, "failed to generate server dependencies")
 	}
 
-	if ready, err := cluster.Start(ctx); err != nil {
-		return pkgerrors.WithMessage(err, "failed to start cluster")
-	} else {
-		config.Runtime.ETCDReady = ready
-	}
+	if err := config.Cluster.ListenAndServe(ctx); err != nil {
+		return pkgerrors.WithMessage(err, "failed to start supervisor listener")
+	}
 
 	return nil
@@ -385,17 +384,7 @@ func cloudControllerManager(ctx context.Context, cfg *config.Control) error {
 	go func() {
 		defer close(ccmRBACReady)
 
-	apiReadyLoop:
-		for {
-			select {
-			case <-ctx.Done():
-				return
-			case <-cfg.Runtime.APIServerReady:
-				break apiReadyLoop
-			case <-time.After(30 * time.Second):
-				logrus.Infof("Waiting for API server to become available to start cloud-controller-manager")
-			}
-		}
+		<-executor.APIServerReadyChan()
 
 		logrus.Infof("Waiting for cloud-controller-manager privileges to become available")
 		for {
@@ -438,43 +427,6 @@ func waitForAPIServerHandlers(ctx context.Context, runtime *config.ControlRuntime) {
 	runtime.APIServer = handler
 }
 
-func waitForAPIServerInBackground(ctx context.Context, runtime *config.ControlRuntime) error {
-	done := make(chan struct{})
-	runtime.APIServerReady = done
-
-	go func() {
-		defer close(done)
-
-	etcdLoop:
-		for {
-			select {
-			case <-ctx.Done():
-				return
-			case <-runtime.ETCDReady:
-				break etcdLoop
-			case <-time.After(30 * time.Second):
-				logrus.Infof("Waiting for etcd server to become available")
-			}
-		}
-
-		logrus.Infof("Waiting for API server to become available")
-		for {
-			select {
-			case <-ctx.Done():
-				return
-			case err := <-promise(func() error { return util.WaitForAPIServerReady(ctx, runtime.KubeConfigSupervisor, 30*time.Second) }):
-				if err != nil {
-					logrus.Infof("Waiting for API server to become available")
-					continue
-				}
-				return
-			}
-		}
-	}()
-
-	return nil
-}
-
 func promise(f func() error) <-chan error {
 	c := make(chan error, 1)
 	go func() {
@@ -487,7 +439,6 @@ func promise(f func() error) <-chan error {
 
 // waitForUntaintedNode watches nodes, waiting to find one not tainted as
 // uninitialized by the external cloud provider.
 func waitForUntaintedNode(ctx context.Context, kubeConfig string) error {
-
 	restConfig, err := util.GetRESTConfig(kubeConfig)
 	if err != nil {
 		return err
...
@@ -43,6 +43,7 @@ func init() {
 }
 
 func (e *Embedded) Bootstrap(ctx context.Context, nodeConfig *daemonconfig.Node, cfg cmds.Agent) error {
+	e.apiServerReady = util.APIServerReadyChan(ctx, nodeConfig.AgentConfig.KubeConfigK3sController, util.DefaultAPIServerReadyTimeout)
 	e.nodeConfig = nodeConfig
 
 	go func() {
@@ -72,17 +73,12 @@ func (e *Embedded) Kubelet(ctx context.Context, args []string) error {
 	command.SetArgs(args)
 
 	go func() {
+		<-e.APIServerReadyChan()
 		defer func() {
 			if err := recover(); err != nil {
 				logrus.WithField("stack", string(debug.Stack())).Fatalf("kubelet panic: %v", err)
 			}
 		}()
-		// The embedded executor doesn't need the kubelet to come up to host any components, and
-		// having it come up on servers before the apiserver is available causes a lot of log spew.
-		// Agents don't have access to the server's apiReady channel, so just wait directly.
-		if err := util.WaitForAPIServerReady(ctx, e.nodeConfig.AgentConfig.KubeConfigKubelet, util.DefaultAPIServerReadyTimeout); err != nil {
-			logrus.Fatalf("Kubelet failed to wait for apiserver ready: %v", err)
-		}
 		err := command.ExecuteContext(ctx)
 		if err != nil && !errors.Is(err, context.Canceled) {
 			logrus.Errorf("kubelet exited: %v", err)
@@ -99,6 +95,7 @@ func (e *Embedded) KubeProxy(ctx context.Context, args []string) error {
 	command.SetArgs(daemonconfig.GetArgs(platformKubeProxyArgs(e.nodeConfig), args))
 
 	go func() {
+		<-e.APIServerReadyChan()
 		defer func() {
 			if err := recover(); err != nil {
 				logrus.WithField("stack", string(debug.Stack())).Fatalf("kube-proxy panic: %v", err)
@@ -142,12 +139,13 @@ func (*Embedded) APIServer(ctx context.Context, etcdReady <-chan struct{}, args []string) error {
 	return nil
 }
 
-func (e *Embedded) Scheduler(ctx context.Context, apiReady <-chan struct{}, args []string) error {
+func (e *Embedded) Scheduler(ctx context.Context, nodeReady <-chan struct{}, args []string) error {
 	command := sapp.NewSchedulerCommand(ctx.Done())
 	command.SetArgs(args)
 
 	go func() {
-		<-apiReady
+		<-e.APIServerReadyChan()
+		<-nodeReady
 		defer func() {
 			if err := recover(); err != nil {
 				logrus.WithField("stack", string(debug.Stack())).Fatalf("scheduler panic: %v", err)
@@ -164,12 +162,12 @@ func (e *Embedded) Scheduler(ctx context.Context, apiReady <-chan struct{}, args []string) error {
 	return nil
 }
 
-func (*Embedded) ControllerManager(ctx context.Context, apiReady <-chan struct{}, args []string) error {
+func (e *Embedded) ControllerManager(ctx context.Context, args []string) error {
 	command := cmapp.NewControllerManagerCommand()
 	command.SetArgs(args)
 
 	go func() {
-		<-apiReady
+		<-e.APIServerReadyChan()
 		defer func() {
 			if err := recover(); err != nil {
 				logrus.WithField("stack", string(debug.Stack())).Fatalf("controller-manager panic: %v", err)
@@ -243,3 +241,10 @@ func (e *Embedded) Containerd(ctx context.Context, cfg *daemonconfig.Node) error {
 func (e *Embedded) Docker(ctx context.Context, cfg *daemonconfig.Node) error {
 	return cridockerd.Run(ctx, cfg)
 }
+
+func (e *Embedded) APIServerReadyChan() <-chan struct{} {
+	if e.apiServerReady == nil {
+		panic("executor not bootstrapped")
+	}
+	return e.apiServerReady
+}
@@ -13,8 +13,11 @@ import (
 	"go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp"
 )
 
+// Embedded is defined here so that we can use embedded.ETCD even when the rest
+// of the embedded executor is disabled by build flags
 type Embedded struct {
-	nodeConfig *daemonconfig.Node
+	apiServerReady <-chan struct{}
+	nodeConfig     *daemonconfig.Node
 }
 
 func (e *Embedded) ETCD(ctx context.Context, args ETCDConfig, extraArgs []string) error {
...
@@ -26,13 +26,14 @@ type Executor interface {
 	KubeProxy(ctx context.Context, args []string) error
 	APIServerHandlers(ctx context.Context) (authenticator.Request, http.Handler, error)
 	APIServer(ctx context.Context, etcdReady <-chan struct{}, args []string) error
-	Scheduler(ctx context.Context, apiReady <-chan struct{}, args []string) error
-	ControllerManager(ctx context.Context, apiReady <-chan struct{}, args []string) error
+	Scheduler(ctx context.Context, nodeReady <-chan struct{}, args []string) error
+	ControllerManager(ctx context.Context, args []string) error
 	CurrentETCDOptions() (InitialOptions, error)
 	ETCD(ctx context.Context, args ETCDConfig, extraArgs []string) error
 	CloudControllerManager(ctx context.Context, ccmRBACReady <-chan struct{}, args []string) error
 	Containerd(ctx context.Context, node *daemonconfig.Node) error
 	Docker(ctx context.Context, node *daemonconfig.Node) error
+	APIServerReadyChan() <-chan struct{}
 }
 
 type ETCDConfig struct {
@@ -154,12 +155,12 @@ func APIServer(ctx context.Context, etcdReady <-chan struct{}, args []string) error {
 	return executor.APIServer(ctx, etcdReady, args)
 }
 
-func Scheduler(ctx context.Context, apiReady <-chan struct{}, args []string) error {
-	return executor.Scheduler(ctx, apiReady, args)
+func Scheduler(ctx context.Context, nodeReady <-chan struct{}, args []string) error {
+	return executor.Scheduler(ctx, nodeReady, args)
 }
 
-func ControllerManager(ctx context.Context, apiReady <-chan struct{}, args []string) error {
-	return executor.ControllerManager(ctx, apiReady, args)
+func ControllerManager(ctx context.Context, args []string) error {
+	return executor.ControllerManager(ctx, args)
 }
 
 func CurrentETCDOptions() (InitialOptions, error) {
@@ -181,3 +182,7 @@ func Containerd(ctx context.Context, config *daemonconfig.Node) error {
 func Docker(ctx context.Context, config *daemonconfig.Node) error {
 	return executor.Docker(ctx, config)
 }
+
+func APIServerReadyChan() <-chan struct{} {
+	return executor.APIServerReadyChan()
+}
@@ -17,6 +17,7 @@ import (
 	"github.com/k3s-io/k3s/pkg/clientaccess"
 	"github.com/k3s-io/k3s/pkg/daemons/config"
 	"github.com/k3s-io/k3s/pkg/daemons/control"
+	"github.com/k3s-io/k3s/pkg/daemons/executor"
 	"github.com/k3s-io/k3s/pkg/datadir"
 	"github.com/k3s-io/k3s/pkg/deploy"
 	"github.com/k3s-io/k3s/pkg/node"
@@ -44,7 +45,10 @@ func ResolveDataDir(dataDir string) (string, error) {
 	return filepath.Join(dataDir, "server"), err
 }
 
-func StartServer(ctx context.Context, config *Config, cfg *cmds.Server) error {
+// PrepareServer prepares the server for operation. This includes setting paths
+// in ControlConfig, creating any certificates not extracted from the bootstrap
+// data, and binding request handlers.
+func PrepareServer(ctx context.Context, config *Config, cfg *cmds.Server) error {
 	if err := setupDataDirAndChdir(&config.ControlConfig); err != nil {
 		return err
 	}
@@ -53,6 +57,19 @@ func StartServer(ctx context.Context, config *Config, cfg *cmds.Server) error {
 		return err
 	}
 
+	if err := control.Prepare(ctx, &config.ControlConfig); err != nil {
+		return err
+	}
+
+	config.ControlConfig.Runtime.Handler = handlers.NewHandler(ctx, &config.ControlConfig, cfg)
+
+	return nil
+}
+
+// StartServer starts whatever control-plane and etcd components are enabled by
+// the current server configuration, runs startup hooks, starts controllers,
+// and writes the admin kubeconfig.
+func StartServer(ctx context.Context, config *Config, cfg *cmds.Server) error {
 	if err := control.Server(ctx, &config.ControlConfig); err != nil {
 		return pkgerrors.WithMessage(err, "starting kubernetes")
 	}
@@ -60,11 +77,10 @@ func StartServer(ctx context.Context, config *Config, cfg *cmds.Server) error {
 	wg := &sync.WaitGroup{}
 	wg.Add(len(config.StartupHooks))
 
-	config.ControlConfig.Runtime.Handler = handlers.NewHandler(ctx, &config.ControlConfig, cfg)
 	config.ControlConfig.Runtime.StartupHooksWg = wg
 
 	shArgs := cmds.StartupHookArgs{
-		APIServerReady:       config.ControlConfig.Runtime.APIServerReady,
+		APIServerReady:       executor.APIServerReadyChan(),
 		KubeConfigSupervisor: config.ControlConfig.Runtime.KubeConfigSupervisor,
 		Skips:                config.ControlConfig.Skips,
 		Disables:             config.ControlConfig.Disables,
@@ -87,7 +103,7 @@ func startOnAPIServerReady(ctx context.Context, config *Config) {
 	select {
 	case <-ctx.Done():
 		return
-	case <-config.ControlConfig.Runtime.APIServerReady:
+	case <-executor.APIServerReadyChan():
 		if err := runControllers(ctx, config); err != nil {
 			logrus.Fatalf("failed to start controllers: %v", err)
 		}
...
@@ -104,6 +104,22 @@ func WaitForAPIServerReady(ctx context.Context, kubeconfigPath string, timeout time.Duration) error {
 	return nil
 }
 
+// APIServerReadyChan wraps WaitForAPIServerReady, returning a channel that
+// is closed when the apiserver is ready. If the apiserver does not become
+// ready within the expected duration, a fatal error is raised.
+func APIServerReadyChan(ctx context.Context, kubeConfig string, timeout time.Duration) <-chan struct{} {
+	ready := make(chan struct{})
+
+	go func() {
+		defer close(ready)
+		if err := WaitForAPIServerReady(ctx, kubeConfig, timeout); err != nil {
+			logrus.Fatalf("Failed to wait for API server to become ready: %v", err)
+		}
+	}()
+
+	return ready
+}
+
 type genericAccessReviewRequest func(context.Context) (*authorizationv1.SubjectAccessReviewStatus, error)
 
 // WaitForRBACReady polls an AccessReview request until it returns an allowed response. If the user
...
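
The APIServerReadyChan helper added above turns a blocking wait into a channel by closing it from a goroutine. Here is a self-contained sketch of the same wrapper shape, with the readiness poll stubbed out; note that the real helper calls logrus.Fatalf on error, while this sketch only logs so it stays runnable.

package main

import (
	"context"
	"fmt"
	"time"
)

// waitReady stands in for util.WaitForAPIServerReady, which blocks while
// polling the apiserver's readiness endpoint.
func waitReady(ctx context.Context) error {
	select {
	case <-time.After(50 * time.Millisecond): // pretend the probe succeeded
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// apiServerReadyChan mirrors the wrapper shape: run the blocking wait in
// a goroutine and close the channel when it returns.
func apiServerReadyChan(ctx context.Context) <-chan struct{} {
	ready := make(chan struct{})
	go func() {
		defer close(ready)
		if err := waitReady(ctx); err != nil {
			fmt.Println("failed to wait for apiserver:", err)
		}
	}()
	return ready
}

func main() {
	<-apiServerReadyChan(context.Background())
	fmt.Println("apiserver ready; starting dependent components")
}
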