爱豆吧!

idouba@beta.

kubernetes liveness probe 流程

1 概述

kubernetes提供了的Probe可以进行健康检查。

https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-probes/

对pod中的每个容器通过配置liveness或者readiness。

当liveness probe failed后,该container会杀掉,并重新创建;而readinessProbe失败,则该pod ip 会从service的endpoints列表中删除,即隔离到该后端的请求。

如liveness 配置如下:

livenessProbe:
  httpGet:
    port: 10252
    path: “/healthz”
  initialDelaySeconds: 15
  timeoutSeconds: 15

文中尝试端到端的看下整个过程有哪些组件参与进来,怎么配合工作的。

2 配置

pkg/api/types.go#Probe结构描述了Probe的一个定义。其中Handler是执行的动作,initialDelaySeconds表示容器启动后延迟多少秒初始化probe,考虑一般应用启动需要一定时间。periodSeconds 表示周期检查的间隔,默认10秒,最小1秒。timeoutSeconds会告诉健康检查等待多长时间,默认1秒,最小1秒。successThreshold表示连续探测多少次成功才算成功。failureThreshold表示连续探测多少次失败才算失败。默认是3.

type Probe struct {
Handler json:",inline"
InitialDelaySeconds int32 json:"initialDelaySeconds,omitempty"
TimeoutSeconds int32 json:"timeoutSeconds,omitempty"
PeriodSeconds int32 json:"periodSeconds,omitempty"
SuccessThreshold int32 json:"successThreshold,omitempty"
FailureThreshold int32 json:"failureThreshold,omitempty"
}

探测动作Handler支持httpget tcd 和exec三种动作。

httpGet对应一个http请求,主要是配置http端口和path;TCPSocket对应一个TCP请求,主要是配置一个TCP端口,EXEC表示执行一个命令。各个handler详细的定义不看了。

type Handler struct {
Exec *ExecAction json:"exec,omitempty"
// HTTPGet specifies the http request to perform.
// +optional
HTTPGet *HTTPGetAction json:"httpGet,omitempty"
// TCPSocket specifies an action involving a TCP port.
// TODO: implement a realistic TCP lifecycle hook
// +optional
TCPSocket *TCPSocketAction json:"tcpSocket,omitempty"
}

下面看下probe的探测是怎么被执行的,探测结果又是如何处理的。

在kubelet启动的时候根据配置定期执行配置的Probe,写状态。

pkg/kubelet/kubelet.go# NewMainKubelet

klet.probeManager = prober.NewManager(
klet.statusManager,
klet.livenessManager,
klet.runner,
containerRefManager,
kubeDeps.Recorder)

kubelete创建时会创建一个prober_manager. 负责pod 的probe。主要工作是:对每个容器创建一个worker,worker周期的执行探测并保存探测结果。

pkg/kubelet/prober/prober_manager.go

kubelet的addpod的操作HandlePodAdditions会触发prober_manager的AddPod,对pod中每个容器检查是否配置了readiness或者liveness的probe,并创建启动对应的worker。

kubelete创建时会创建一个prober_manager. 负责pod 的probe。主要工作是:对每个容器创建一个worker,worker周期的执行探测并保存探测结果。

pkg/kubelet/prober/prober_manager.go

func (m *manager) AddPod(pod *api.Pod) {
for _, c := range pod.Spec.Containers {
if c.ReadinessProbe != nil {
key.probeType = readiness
w := newWorker(m, readiness, pod, c)
m.workers[key] = w
go w.run()
}
if c.LivenessProbe != nil {
key.probeType = liveness
w := newWorker(m, liveness, pod, c)
m.workers[key] = w
go w.run()
}

根据probe中配置的PeriodSeconds周期执行probe动作。

pkg/kubelet/prober/worker.go # doProbe

func (w *worker) doProbe() (keepGoing bool) {
result, err := w.probeManager.prober.probe(w.probeType, w.pod, status, w.container, w.containerID, prev)
w.resultsManager.Set(w.containerID, result, w.pod)

执行prober中的探测动作,并将探测结果保持到resultsManager中。

忽略细节,调用过程是; pkg/kubelet/prober/prober.go#probe –> pkg/kubelet/prober/prober.go#runProbeWithRetries –> pkg/kubelet/prober/prober.go#runProbe

根据配置的是HTTPGET、Exec还是TCPSocket执行不同的探测。

func (pb *prober) runProbe(p *api.Probe, pod *api.Pod, status api.PodStatus, container api.Container, containerID kubecontainer.ContainerID) (probe.Result, string, error) {
if p.Exec != nil {
return pb.exec.Probe(pb.newExecInContainer(container, containerID, p.Exec.Command, timeout))
}
if p.HTTPGet != nil {
return pb.http.Probe(url, headers, timeout)
}
if p.TCPSocket != nil {
return pb.tcp.Probe(status.PodIP, port, timeout)
}
}<

分别对应pkg/probe/http/http.go,pkg/probe/tcp/tcp.go,pkg/probe/exec/exec.go 三种探测方式。各种方式也就是检查执行结果是否有错。

Tcp探测看连接后是否error

func DoTCPProbe(addr string, timeout time.Duration) (probe.Result, string, error) {
conn, err := net.DialTimeout(tcp, addr, timeout)
if err != nil {
// Convert errors to failures to handle timeouts.
return probe.Failure, err.Error(), nil
}

http探测看StatusCode

func DoHTTPProbe(url *url.URL, headers http.Header, client HTTPGetInterface) (probe.Result, string, error) {
req, err := http.NewRequest(GET, url.String(), nil)
if err != nil {
// Convert errors into failures to catch timeouts.
return probe.Failure, err.Error(), nil
}
res, err := client.Do(req)
if res.StatusCode &gt;= http.StatusOK && res.StatusCode &lt; http.StatusBadRequest {
return probe.Success, body, nil
}

exec探测看执行返回码

func (pr execProber) Probe(e exec.Cmd) (probe.Result, string, error) {
data, err := e.CombinedOutput()
if err != nil {
exit, ok := err.(exec.ExitError)
if ok {
if exit.ExitStatus() == 0 {
return probe.Success, string(data), nil
} else {
return probe.Failure, string(data), nil
}
}
return probe.Unknown, , err

至此探测流程很简单的看完了,关注下探测的结果的保存。

在prober_manager中,根据不同的探测类型,构造不同的worker,结果会保存到对应的prober_manager的readinessManager或者livenessManger中。

 4 根据状态 action

pkg/kubelet/kubelet.go#NewMainKubelet

kubelet启动时候会构造podWorker

klet.podWorkers = newPodWorkers(klet.syncPod, klet.syncPodWithProcess, kubeDeps.Recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache, klet.processCache)

syncPod是kubelet一个比较核心的方法,做了包括更新为static pod创建mirrorpod,为pod创建数据目录等操作,还有就是调用容器runtime的SyncPod的回调。

result := kl.containerRuntime.SyncPod(pod, apiPodStatus, podStatus, pullSecrets, kl.backOff)
kl.reasonCache.Update(pod.UID, result)

pkg/kubelet/container/runtime.go

定义了对pod的各种操作。是容器运行时需要实现的接口。

type Runtime interface {
Type() string
Status() (*RuntimeStatus, error)
GetPods(all bool) ([]*Pod, error)
GarbageCollect(gcPolicy ContainerGCPolicy, allSourcesReady bool) error
SyncPod(pod *api.Pod, apiPodStatus api.PodStatus, podStatus *PodStatus, pullSecrets []api.Secret, backOff *flowcontrol.Backoff) PodSyncResult
KillPod(pod *api.Pod, runningPod Pod, gracePeriodOverride *int64) error
GetPodStatus(uid types.UID, name, namespace string) (*PodStatus, error)
GetNetNS(containerID ContainerID) (string, error)
GetPodContainerID(*Pod) (ContainerID, error)
GetContainerLogs(pod *api.Pod, containerID ContainerID, logOptions *api.PodLogOptions, stdout, stderr io.Writer) (err error)
DeleteContainer(containerID ContainerID) error
UpdatePodCIDR(podCIDR string) error
}

两种实现pkg/kubelet/dockertools/docker_manager.go 中的DockerManager 和

pkg/kubelet/rkt/rkt.go中Runtime

如pkg/kubelet/dockertools/docker_manager.go # SyncPod

containerChanges, err := dm.computePodContainerChanges(pod, podStatus)
for _, containerStatus := range runningContainerStatues {
_, keep := containerChanges.ContainersToKeep[kubecontainer.DockerID(containerStatus.ID.ID)]
if !keep && !keepInit {
if err := dm.KillContainerInPod(containerStatus.ID, podContainer, pod, killMessage, nil); }
}

pkg/kubelet/dockertools/docker_manager.go# computePodContainerChanges

liveness, found := dm.livenessManager.Get(containerStatus.ID)
if !found || liveness == proberesults.Success {
containersToKeep[containerID] = index
continue
}
if pod.Spec.RestartPolicy != api.RestartPolicyNever {
message := fmt.Sprintf(pod %q container %q is unhealthy, it will be killed and re-created., format.Pod(pod), container.Name)
glog.Info(message)
containersToStart[index] = message
}

从livenessmanger获取对应container的状态,状态不是Success的都会被kill掉,在这里就是不在containersToKeep和initContainersToKeep的 container都会被杀掉

pkg/kubelet/rkt/rkt.go 中的Runtime的处理也是类似,从livenessmanager中获取状态,并执行杀pod的动作。

liveness, found := r.livenessManager.Get(c.ID)
if found && liveness != proberesults.Success && pod.Spec.RestartPolicy != api.RestartPolicyNever {
glog.Infof(Pod %q container %q is unhealthy, it will be killed and re-created., format.Pod(pod), container.Name)
restartPod = true
}
delete(unidentifiedContainers, c.ID)
}
if restartPod {
if err = r.KillPod(pod, runningPod, nil); }

5 总结

对于文章前probe的处理过程,加上执行的主体,整个过程应该描述如下:

Kubelet 在延迟initialDelaySeconds后以periodSeconds为周期,执行定义的探测(包括httpGet,TcpSocket或者exec),并保存探测结果。对于liveness,kubelet会读取该探测结果,杀掉对应容器。