This article analyzes the kubelet's event handling mechanism from the point of view of the kubelet source code.

1. Entry point of kubelet event handling

// RunKubelet is responsible for setting up and running a kubelet.  It is used in three different applications:
//   1 Integration tests
//   2 Kubelet binary
//   3 Standalone 'kubernetes' binary
// Eventually, #2 will be replaced with instances of #3

// The first step is the call to makeEventRecorder; this is where the event handling machinery is created.
func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
	...
	// Setup event recorder if required.
	makeEventRecorder(kubeDeps, nodeName)
  ...
}

makeEventRecorder does the following:

(1) initialize the EventBroadcaster

(2) initialize the Recorder

(3) log events locally

(4) send events to the apiserver

// makeEventRecorder sets up kubeDeps.Recorder if it's nil. It's a no-op otherwise.
func makeEventRecorder(kubeDeps *kubelet.Dependencies, nodeName types.NodeName) {
	if kubeDeps.Recorder != nil {
		return
	}
  // initialize the EventBroadcaster
	eventBroadcaster := record.NewBroadcaster()
  // initialize the Recorder
	kubeDeps.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: componentKubelet, Host: string(nodeName)})
  // log events to the local kubelet log
	eventBroadcaster.StartLogging(glog.V(3).Infof)
	
	// send events to the apiserver
	if kubeDeps.EventClient != nil {
		glog.V(4).Infof("zoux-test Sending events to api server.")
		eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeDeps.EventClient.Events("")})
	} else {
		glog.Warning("zoux-test No api server defined - no events will be sent to API server.")
	}
}

The rest of this article digs into these four points.


2. Initializing the EventBroadcaster

Initializing the EventBroadcaster simply creates an eventBroadcasterImpl object, which is the implementation of the EventBroadcaster interface.

// Creates a new event broadcaster.
func NewBroadcaster() EventBroadcaster {
	return &eventBroadcasterImpl{watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), defaultSleepDuration}
}

const maxTriesPerEvent = 12
var defaultSleepDuration = 10 * time.Second
const maxQueuedEvents = 1000


// EventBroadcaster knows how to receive events and send them to any EventSink, watcher, or log.
type EventBroadcaster interface {
	// StartEventWatcher starts sending events received from this EventBroadcaster to the given
	// event handler function. The return value can be ignored or used to stop recording, if
	// desired.
	StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface

	// StartRecordingToSink starts sending events received from this EventBroadcaster to the given
	// sink. The return value can be ignored or used to stop recording, if desired.
	StartRecordingToSink(sink EventSink) watch.Interface

	// StartLogging starts sending events received from this EventBroadcaster to the given logging
	// function. The return value can be ignored or used to stop recording, if desired.
	StartLogging(logf func(format string, args ...interface{})) watch.Interface

	// NewRecorder returns an EventRecorder that can be used to send events to this EventBroadcaster
	// with the event source set to the given event source.
	NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder
}

NewBroadcaster returns an EventBroadcaster, which is an interface with the following four methods:

  • StartEventWatcher(): receives the events produced by the various modules and processes each one with the handler function supplied by the caller.
  • StartRecordingToSink(): built on StartEventWatcher(); receives events and sends them to the apiserver.
  • StartLogging(): also built on StartEventWatcher(); receives events and writes them to the log.
  • NewRecorder(): creates an EventRecorder with the given EventSource, which identifies which component on which node produced the events.

This is quite similar to the observer pattern.

The Broadcaster is the hub, and it has two event consumers:

The first is StartLogging, which simply prints events to the log.

The second is StartRecordingToSink, which sends events to the apiserver.

With the consumers in place, the remaining question is where events come from. That is what NewRecorder is for.

A Recorder is a client: it pushes events to the hub over a channel for processing.
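
To make the hub/client relationship concrete, here is a minimal standalone sketch (not from the kubelet source, and only a sketch assuming the client-go record package of this era); the pod, node name, and message are invented for illustration:

package main

import (
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/tools/record"
)

func main() {
	broadcaster := record.NewBroadcaster()

	// consumer: prints every event, playing the role of StartLogging / StartRecordingToSink
	broadcaster.StartEventWatcher(func(e *v1.Event) {
		fmt.Printf("%s %s %s\n", e.Type, e.Reason, e.Message)
	})

	// producer: a recorder bound to this component and node, like kubelet's statusManager
	recorder := broadcaster.NewRecorder(scheme.Scheme,
		v1.EventSource{Component: "kubelet", Host: "node-1"})

	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "nginx", Namespace: "default"}}
	recorder.Event(pod, v1.EventTypeNormal, "Pulled", "Container image already present on machine")

	// the event travels producer -> channel -> watcher asynchronously, so give it a moment
	time.Sleep(time.Second)
}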


Next, let's look at how eventBroadcasterImpl implements this machinery (the four methods).


2.1 NewRecorder

Purpose: create an event client. The kubelet's statusManager, for example, is such a client: it only produces events and hands them to the kubelet's broadcaster.

NewRecorder simply returns a recorderImpl object.

// NewRecorder returns an EventRecorder that records events with the given event source.
func (eventBroadcaster *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder {
   return &recorderImpl{scheme, source, eventBroadcaster.Broadcaster, clock.RealClock{}}
}


type recorderImpl struct {
	scheme *runtime.Scheme
	source v1.EventSource
	*watch.Broadcaster
	clock clock.Clock
}


The core method of recorderImpl is generateEvent.

func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations map[string]string, timestamp metav1.Time, eventtype, reason, message string) {
	ref, err := ref.GetReference(recorder.scheme, object)
	if err != nil {
		glog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v' '%v'", object, err, eventtype, reason, message)
		return
	}

	if !validateEventType(eventtype) {
		glog.Errorf("Unsupported event type: '%v'", eventtype)
		return
	}

	event := recorder.makeEvent(ref, annotations, eventtype, reason, message)
	event.Source = recorder.source
 
  // here the event is sent to the Broadcaster (asynchronously)
	go func() {
		// NOTE: events should be a non-blocking operation
		defer utilruntime.HandleCrash()
		recorder.Action(watch.Added, event)
	}()
}

// the event is pushed into the Broadcaster's incoming channel
// Action distributes the given event among all watchers.
func (m *Broadcaster) Action(action EventType, obj runtime.Object) {
	m.incoming <- Event{action, obj}
}

The other methods (Event, Eventf, PastEventf, AnnotatedEventf) all end up calling generateEvent, which sends the event into the Broadcaster's channel.

func (recorder *recorderImpl) Event(object runtime.Object, eventtype, reason, message string) {
	recorder.generateEvent(object, nil, metav1.Now(), eventtype, reason, message)
}


func (recorder *recorderImpl) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
	recorder.Event(object, eventtype, reason, fmt.Sprintf(messageFmt, args...))
}


func (recorder *recorderImpl) PastEventf(object runtime.Object, timestamp metav1.Time, eventtype, reason, messageFmt string, args ...interface{}) {
	recorder.generateEvent(object, nil, timestamp, eventtype, reason, fmt.Sprintf(messageFmt, args...))
}


func (recorder *recorderImpl) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) {
	recorder.generateEvent(object, annotations, metav1.Now(), eventtype, reason, fmt.Sprintf(messageFmt, args...))
}


func (recorder *recorderImpl) makeEvent(ref *v1.ObjectReference, annotations map[string]string, eventtype, reason, message string) *v1.Event {
	t := metav1.Time{Time: recorder.clock.Now()}
	namespace := ref.Namespace
	if namespace == "" {
		namespace = metav1.NamespaceDefault
	}
	return &v1.Event{
		ObjectMeta: metav1.ObjectMeta{
			Name:        fmt.Sprintf("%v.%x", ref.Name, t.UnixNano()),
			Namespace:   namespace,
			Annotations: annotations,
		},
		InvolvedObject: *ref,
		Reason:         reason,
		Message:        message,
		FirstTimestamp: t,
		LastTimestamp:  t,
		Count:          1,
		Type:           eventtype,
	}
}


Now that we have seen how a client sends events to the hub, let's look at how the hub processes them.

2.2 StartEventWatcher

As the implementation shows, this function simply ranges over a channel and calls the supplied handler for every event it receives.

// StartEventWatcher starts sending events received from this EventBroadcaster to the given event handler function.
// The return value can be ignored or used to stop recording, if desired.
func (eventBroadcaster *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
	watcher := eventBroadcaster.Watch()
	go func() {
		defer utilruntime.HandleCrash()
		for watchEvent := range watcher.ResultChan() {
			event, ok := watchEvent.Object.(*v1.Event)
			if !ok {
				// This is all local, so there's no reason this should
				// ever happen.
				continue
			}
			eventHandler(event)
		}
	}()
	return watcher
}

2.3 StartLogging

StartLogging calls StartEventWatcher() to receive events and prints each one to the log.

// StartLogging starts sending events received from this EventBroadcaster to the given logging function.
// The return value can be ignored or used to stop recording, if desired.
func (eventBroadcaster *eventBroadcasterImpl) StartLogging(logf func(format string, args ...interface{})) watch.Interface {
   return eventBroadcaster.StartEventWatcher(
      func(e *v1.Event) {
         logf("zoux-test Event(%#v): type: '%v' reason: '%v' %v", e.InvolvedObject, e.Type, e.Reason, e.Message)
      })
}

For example, in the kubelet log:

I0626 05:58:27.213752   36267 server.go:460] zoux-test Event(v1.ObjectReference{Kind:"Node", Namespace:"", Name:"10.212.1.4", UID:"10.212.1.4", APIVersion:"", ResourceVersion:"", FieldPath:""}): type: 'Warning' reason: 'MissingClusterDNS' kubelet does not have ClusterDNS IP configured and cannot create Pod using "ClusterFirst" policy. Falling back to "Default" policy.

2.4 StartRecordingToSink

StartRecordingToSink calls StartEventWatcher() to receive events and sends each one to the apiserver.

The handler passed to StartEventWatcher here is a closure that calls recordToSink.

// StartRecordingToSink starts sending events received from the specified eventBroadcaster to the given sink.
// The return value can be ignored or used to stop recording, if desired.
// TODO: make me an object with parameterizable queue length and retry interval
func (eventBroadcaster *eventBroadcasterImpl) StartRecordingToSink(sink EventSink) watch.Interface {
	// The default math/rand package functions aren't thread safe, so create a
	// new Rand object for each StartRecording call.
	randGen := rand.New(rand.NewSource(time.Now().UnixNano()))
	eventCorrelator := NewEventCorrelator(clock.RealClock{})
	return eventBroadcaster.StartEventWatcher(
		func(event *v1.Event) {
			recordToSink(sink, event, eventCorrelator, randGen, eventBroadcaster.sleepDuration)
		})
}

So every event ends up being handled by recordToSink.


3. Sending events to the apiserver: recordToSink

In short, it does three things:

(1) aggregate the event

(2) rate-limit sending with a token bucket

(3) send the event through the REST API

func recordToSink(sink EventSink, event *v1.Event, eventCorrelator *EventCorrelator, randGen *rand.Rand, sleepDuration time.Duration) {
	// Make a copy before modification, because there could be multiple listeners.
	// Events are safe to copy like this.
	eventCopy := *event
	event = &eventCopy
	glog.Errorf("zoux-test event is '%#v'", event)
	// aggregate the event and consult the rate limiter
	result, err := eventCorrelator.EventCorrelate(event)
	if err != nil {
		utilruntime.HandleError(err)
	}
	glog.Errorf("zoux-test result is '%#v'", result)
	// if no token was granted, skip the event entirely
	if result.Skip {
		glog.Errorf("zoux-test event '%#v' is Skipped",event)
		return
	}
	// retry at most 12 times (const maxTriesPerEvent = 12)
	tries := 0
	for {
		glog.Errorf("zoux-test starting send event '%#v' ",event)
		// send to the apiserver
		if recordEvent(sink, result.Event, result.Patch, result.Event.Count > 1, eventCorrelator) {
			break
		}
		glog.Errorf("zoux-test starting send event  failed'%#v' ",event)
		tries++
		if tries >= maxTriesPerEvent {
			glog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event)
			break
		}
		// Randomize the first sleep so that various clients won't all be
		// synced up if the master goes down.
		if tries == 1 {
			time.Sleep(time.Duration(float64(sleepDuration) * randGen.Float64()))
		} else {
			time.Sleep(sleepDuration)
		}
	}
}


// EventCorrelate filters, aggregates, counts, and de-duplicates all incoming events
func (c *EventCorrelator) EventCorrelate(newEvent *v1.Event) (*EventCorrelateResult, error) {
	if newEvent == nil {
		return nil, fmt.Errorf("event is nil")
	}
	// 1. aggregate first, in two steps: EventAggregate and eventObserve
	aggregateEvent, ckey := c.aggregator.EventAggregate(newEvent)
	observedEvent, patch, err := c.logger.eventObserve(aggregateEvent, ckey)
	
	// 2. filterFunc applies the token bucket algorithm to decide whether there are enough tokens to send the event
	if c.filterFunc(observedEvent) {
		return &EventCorrelateResult{Skip: true}, nil
	}
	return &EventCorrelateResult{Event: observedEvent, Patch: patch}, err
}

3.1 Event aggregation

Aggregation happens in two steps, EventAggregate and eventObserve. EventAggregate does the following:

Step 1: build keys from the event. The crux of aggregation is deciding whether two events are of the same kind; only events of the same kind can be aggregated, and the keys built here are what that comparison uses.

There are two keys. aggregateKey is the concatenation of event.Source, event.InvolvedObject, event.Type and event.Reason.

localKey is simply event.Message.

Step 2: look up the cached record for aggregateKey, or create a new one if none exists.

Step 3: insert the localKey from step 1 into the record's localKeys and update the cache. A record collects the messages seen under one aggregateKey, so two events count as identical only when both aggregateKey and localKey match.

Step 4: if record.localKeys has fewer than maxEvents entries (default 10), the number of distinct messages under this aggregateKey has not hit the threshold, so no aggregation is needed and the event is returned unchanged. Once record.localKeys reaches maxEvents, too many events with the same reason have been produced in a short time: the oldest localKey is popped (PopAny) so the set never grows past maxEvents, a new aggregated event is assembled and returned, and aggregateKey is returned as the cache key (so the aggregate can be overwritten).

// EventAggregate checks if a similar event has been seen according to the
// aggregation configuration (max events, max interval, etc) and returns:
//
// - The (potentially modified) event that should be created
// - The cache key for the event, for correlation purposes. This will be set to
//   the full key for normal events, and to the result of
//   EventAggregatorMessageFunc for aggregate events.
func (e *EventAggregator) EventAggregate(newEvent *v1.Event) (*v1.Event, string) {
	now := metav1.NewTime(e.clock.Now())
	var record aggregateRecord
	
	// eventKey is the full cache key for this event
	eventKey := getEventKey(newEvent)
	// aggregateKey is for the aggregate event, if one is needed.
	// 1. the keyFunc here is EventAggregatorByReasonFunc
	aggregateKey, localKey := e.keyFunc(newEvent)

	// Do we have a record of similar events in our cache?
	e.Lock()
	defer e.Unlock()
	value, found := e.cache.Get(aggregateKey)
	if found {
		record = value.(aggregateRecord)
	}

	// Is the previous record too old? If so, make a fresh one. Note: if we didn't
	// find a similar record, its lastTimestamp will be the zero value, so we
	// create a new one in that case.
	maxInterval := time.Duration(e.maxIntervalInSeconds) * time.Second
	interval := now.Time.Sub(record.lastTimestamp.Time)
	if interval > maxInterval {
		record = aggregateRecord{localKeys: sets.NewString()}
	}

	// Write the new event into the aggregation record and put it on the cache
	record.localKeys.Insert(localKey)
	record.lastTimestamp = now
	e.cache.Add(aggregateKey, record)

	// If we are not yet over the threshold for unique events, don't correlate them
	if uint(record.localKeys.Len()) < e.maxEvents {
		return newEvent, eventKey
	}

	// do not grow our local key set any larger than max
	record.localKeys.PopAny()

	// create a new aggregate event, and return the aggregateKey as the cache key
	// (so that it can be overwritten.)
	eventCopy := &v1.Event{
		ObjectMeta: metav1.ObjectMeta{
			Name:      fmt.Sprintf("%v.%x", newEvent.InvolvedObject.Name, now.UnixNano()),
			Namespace: newEvent.Namespace,
		},
		Count:          1,
		FirstTimestamp: now,
		InvolvedObject: newEvent.InvolvedObject,
		LastTimestamp:  now,
		Message:        e.messageFunc(newEvent),   // the aggregated message is prefixed with "(combined from similar events): "
		Type:           newEvent.Type,
		Reason:         newEvent.Reason,
		Source:         newEvent.Source,
	}
	return eventCopy, aggregateKey
}

// EventAggregratorByReasonMessageFunc returns an aggregate message by prefixing the incoming message
func EventAggregatorByReasonMessageFunc(event *v1.Event) string {
	return "(combined from similar events): " + event.Message
}

// EventAggregatorByReasonFunc aggregates events by exact match on event.Source, event.InvolvedObject, event.Type and event.Reason
func EventAggregatorByReasonFunc(event *v1.Event) (string, string) {
	return strings.Join([]string{
		event.Source.Component,
		event.Source.Host,
		event.InvolvedObject.Kind,
		event.InvolvedObject.Namespace,
		event.InvolvedObject.Name,
		string(event.InvolvedObject.UID),
		event.InvolvedObject.APIVersion,
		event.Type,
		event.Reason,
	},
		""), event.Message
}


eventObserve works as follows:

It calls e.lastEventObservationFromCache(key) to look up the cache. If a cached entry exists, the event takes over the cached name/resourceVersion/firstTimestamp, its count is bumped to count+1, and a strategic merge patch is generated against a copy whose count, lastTimestamp and message are zeroed out, so the patch carries roughly just those three fields; this is what sink.Patch later applies to the existing Event object instead of creating a new one. Finally the event's eventLog entry is written back into the cache.

// eventObserve records an event, or updates an existing one if key is a cache hit
func (e *eventLogger) eventObserve(newEvent *v1.Event, key string) (*v1.Event, []byte, error) {
	var (
		patch []byte
		err   error
	)
	eventCopy := *newEvent
	event := &eventCopy

	e.Lock()
	defer e.Unlock()

	// Check if there is an existing event we should update
	lastObservation := e.lastEventObservationFromCache(key)

	// If we found a result, prepare a patch
	if lastObservation.count > 0 {
		// update the event based on the last observation so patch will work as desired
		event.Name = lastObservation.name
		event.ResourceVersion = lastObservation.resourceVersion
		event.FirstTimestamp = lastObservation.firstTimestamp
		event.Count = int32(lastObservation.count) + 1

		eventCopy2 := *event
		eventCopy2.Count = 0
		eventCopy2.LastTimestamp = metav1.NewTime(time.Unix(0, 0))
		eventCopy2.Message = ""

		newData, _ := json.Marshal(event)
		oldData, _ := json.Marshal(eventCopy2)
		patch, err = strategicpatch.CreateTwoWayMergePatch(oldData, newData, event)
	}

	// record our new observation
	e.cache.Add(
		key,
		eventLog{
			count:           uint(event.Count),
			firstTimestamp:  event.FirstTimestamp,
			name:            event.Name,
			resourceVersion: event.ResourceVersion,
		},
	)
	return event, patch, err
}


3.2 Event filtering / acquiring a token

Note: if there are not enough tokens for an event, the event does not wait; it is simply skipped. This is the root cause of events never reaching the apiserver.

The steps for sending an event through the token bucket:

  1. Find the token bucket that corresponds to the event.

In k8s each event source+object pair has its own token bucket. A pod is such an object, so in effect every pod has a bucket of its own, cached inside the kubelet.

If the object does not have a bucket yet, one is created on the spot.

The bucket's core parameters are qps, burst, tokens and durTime:

qps: 1/300, meaning one token is produced every 300 seconds.

burst: 25, the capacity of the bucket.

tokens: how many tokens were left at the last observation.

durTime: how much time has passed since the last observation.


  2. Use these four parameters to decide whether the event may be sent (see the sketch below).

The computation is:

(1) determine how many tokens this send needs: count (one per event)

(2) compute the currently available tokens: curToken = tokens + durTime*qps, capped at burst

(3) if count <= curToken, the event may be sent; otherwise it is skipped directly.
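
Before the real code, here is a minimal conceptual sketch of that arithmetic. It is not the flowcontrol rate limiter the kubelet actually uses (that is created in Filter below); the bucket type and its fields are invented to mirror the parameters just listed:

import (
	"math"
	"time"
)

// bucket mirrors the conceptual parameters above; illustration only.
type bucket struct {
	qps    float64   // tokens produced per second, e.g. 1.0 / 300
	burst  float64   // bucket capacity, e.g. 25
	tokens float64   // tokens left at the last observation
	last   time.Time // time of the last observation
}

// tryAccept consumes one token if available and reports whether the event may be sent.
func (b *bucket) tryAccept(now time.Time) bool {
	durTime := now.Sub(b.last).Seconds()
	// refill: curToken = tokens + durTime*qps, capped at burst
	cur := math.Min(b.tokens+durTime*b.qps, b.burst)
	b.last = now
	if cur < 1 { // each event needs exactly one token
		b.tokens = cur
		return false // not enough tokens: the event is skipped, never queued
	}
	b.tokens = cur - 1
	return true
}

With qps = 1/300 and burst = 25, a burst of roughly 30 events, as in the experiment in section 4, drains the bucket, and the surplus events are silently dropped.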

The main call chain is as follows:

// Filter controls that a given source+object are not exceeding the allowed rate.
func (f *EventSourceObjectSpamFilter) Filter(event *v1.Event) bool {
	var record spamRecord

	// controls our cached information about this event (source+object)
	eventKey := getSpamKey(event)

	glog.Errorf("zoux-test eventKey is '%#v'", eventKey)
	// do we have a record of similar events in our cache?
	f.Lock()
	defer f.Unlock()
	value, found := f.cache.Get(eventKey)
	if found {
		record = value.(spamRecord)
		glog.Errorf("zoux-test found record.rateLimiter is '%#v'", record.rateLimiter, record.rateLimiter)
	}

	// verify we have a rate limiter for this record
	if record.rateLimiter == nil {
		record.rateLimiter = flowcontrol.NewTokenBucketRateLimiterWithClock(f.qps, f.burst, f.clock)
	}
	glog.Errorf("zoux-test record.rateLimiter is '%#v'", record.rateLimiter)

	// ensure we have available rate
	filter := !record.rateLimiter.TryAccept()

	glog.Errorf("zoux-test filter is '%#v'", filter)


	// update the cache
	f.cache.Add(eventKey, record)

	return filter
}

3.3 Sending events through the REST API

Whether we even get here depends on step 2: if the token bucket had no token available, the event has already been skipped. The core code is below.

sink.Patch and sink.Create are the functions that actually send the event.

// recordEvent attempts to write event to a sink. It returns true if the event
// was successfully recorded or discarded, false if it should be retried.
// If updateExistingEvent is false, it creates a new event, otherwise it updates
// existing event.
func recordEvent(sink EventSink, event *v1.Event, patch []byte, updateExistingEvent bool, eventCorrelator *EventCorrelator) bool {
	var newEvent *v1.Event
	var err error
	if updateExistingEvent {
		glog.Errorf("zoux-test event '%#v' is exist",event)
		newEvent, err = sink.Patch(event, patch)
	}
	// Update can fail because the event may have been removed and it no longer exists.
	if !updateExistingEvent || (updateExistingEvent && isKeyNotFoundError(err)) {
		// Making sure that ResourceVersion is empty on creation
		event.ResourceVersion = ""
		newEvent, err = sink.Create(event)
		glog.Errorf("zoux-test newEvent '%#v' , err is %v",newEvent,err)
	}
	if err == nil {
		// we need to update our event correlator with the server returned state to handle name/resourceversion
		eventCorrelator.UpdateState(newEvent)
		return true
	}


	// If we can't contact the server, then hold everything while we keep trying.
	// Otherwise, something about the event is malformed and we should abandon it.
	switch err.(type) {
	case *restclient.RequestConstructionError:
		// We will construct the request the same next time, so don't keep trying.
		glog.Errorf("Unable to construct event '%#v': '%v' (will not retry!)", event, err)
		return true
	case *errors.StatusError:
		if errors.IsAlreadyExists(err) {
			glog.V(5).Infof("Server rejected event '%#v': '%v' (will not retry!)", event, err)
		} else {
			glog.Errorf("Server rejected event '%#v': '%v' (will not retry!)", event, err)
		}
		return true
	case *errors.UnexpectedObjectError:
		// We don't expect this; it implies the server's response didn't match a
		// known pattern. Go ahead and retry.
	default:
		// This case includes actual http transport errors. Go ahead and retry.
	}
	glog.Errorf("Unable to write event: '%v' (may retry after sleeping)", err)
	return false
}

// k8s.io/client-go/kubernetes/typed/core/v1/event_expansion.go
func (e *EventSinkImpl) Patch(event *v1.Event, data []byte) (*v1.Event, error) {
	return e.Interface.PatchWithEventNamespace(event, data)
}


// the event is sent through the REST client
// PatchWithEventNamespace modifies an existing event. It returns the copy of
// the event that the server returns, or an error. The namespace and name of the
// target event is deduced from the incompleteEvent. The namespace must either
// match this event client's namespace, or this event client must have been
// created with the "" namespace.
func (e *events) PatchWithEventNamespace(incompleteEvent *v1.Event, data []byte) (*v1.Event, error) {
	glog.Errorf("zoux-test v1 starting patch event in PatchWithEventNamespace")
	if e.ns != "" && incompleteEvent.Namespace != e.ns {
		return nil, fmt.Errorf("can't patch an event with namespace '%v' in namespace '%v'", incompleteEvent.Namespace, e.ns)
	}
	result := &v1.Event{}
	err := e.client.Patch(types.StrategicMergePatchType).
		NamespaceIfScoped(incompleteEvent.Namespace, len(incompleteEvent.Namespace) > 0).
		Resource("events").
		Name(incompleteEvent.Name).
		Body(data).
		Do().
		Into(result)
	glog.Errorf("zoux-test v1 post event in PatchWithEventNamespace result is '%#v'", result)
	return result, err
}
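
For completeness, the Create path that recordEvent uses for brand-new events is analogous. The sketch below is paraphrased from the same event_expansion.go file (treat it as a sketch rather than verbatim source): sink.Create delegates to CreateWithEventNamespace, which POSTs the event to the events resource.

// Sketch of the Create path (paraphrased, not verbatim source):
// EventSinkImpl.Create hands the event to CreateWithEventNamespace,
// which issues a POST against the "events" resource.
func (e *EventSinkImpl) Create(event *v1.Event) (*v1.Event, error) {
	return e.Interface.CreateWithEventNamespace(event)
}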


4. An interesting phenomenon

Based on the analysis above, the token bucket's qps is very low. Could a flood of uninteresting events then use up the tokens, so that an important event has no token left and never gets sent? The answer is yes, and the following scenario demonstrates it.

Scenario: create a pod with 10 containers, one of which keeps getting the pod OOM-killed.

The goal is to let the Pulled / Created / Started events consume the tokens, so that the later OOM-related events are skipped for lack of tokens.

The pod yaml is as follows:

apiVersion: v1
kind: Pod
metadata:
  name: nginx
spec:
  hostNetwork: true
  containers:
    - name: busbox
      image: polinux/stress
      imagePullPolicy: IfNotPresent
      resources:
        limits:
          cpu: 1
          memory: 200Mi
        requests:
          cpu: 1
          memory: 100Mi
      command: ["stress"]
      args: ["--vm", "1", "--vm-bytes", "3000M", "--vm-hang", "1"]
    - name: busbox1
      image: busybox
      imagePullPolicy: IfNotPresent
      command:
        - sleep
        - "36000"
    - name: busbox2
      image: busybox
      imagePullPolicy: IfNotPresent
      command:
        - sleep
        - "36000"
    - name: busbox3
      image: busybox
      imagePullPolicy: IfNotPresent
      command:
        - sleep
        - "36000"
    - name: busbox4
      image: busybox
      imagePullPolicy: IfNotPresent
      command:
        - sleep
        - "36000"
    - name: busbox5
      image: busybox
      imagePullPolicy: IfNotPresent
      command:
        - sleep
        - "36000"
    - name: busbox6
      image: busybox
      imagePullPolicy: IfNotPresent
      command:
        - sleep
        - "36000"
    - name: busbox7
      image: busybox
      imagePullPolicy: IfNotPresent
      command:
        - sleep
        - "36000"
    - name: busbox8
      image: busybox
      imagePullPolicy: IfNotPresent
      command:
        - sleep
        - "36000"
    - name: busbox9
      image: busybox
      imagePullPolicy: IfNotPresent
      command:
        - sleep
        - "36000"

After creating the pod, make the following observations (ideally in several terminals at once):

(1) Watch the pod status: the pod goes OOMKilled and then CrashLoopBackOff.

root@cld-pmaster1-1699:/home/zouxiang# kubectl get pod -w


NAME    READY   STATUS      RESTARTS   AGE
nginx   9/10    OOMKilled   0          22s
nginx   9/10   OOMKilled   1     22s
nginx   9/10   CrashLoopBackOff   1     22s
nginx   10/10   Running   2     36s
nginx   9/10   OOMKilled   2     37s
nginx   9/10   CrashLoopBackOff   2     49s

(2) kubectl describe pod nginx

Only the Events section is shown here. Notice that no OOM-related event appears.

Events:
  Type    Reason     Age   From                    Message
  ----    ------     ----  ----                    -------
  Normal  Scheduled  53s   default-scheduler       Successfully assigned default/nginx to 10.212.195.74
  Normal  Pulled     52s   kubelet, 10.212.195.74  Container image "polinux/stress" already present on machine
  Normal  Created    51s   kubelet, 10.212.195.74  Created container
  Normal  Started    51s   kubelet, 10.212.195.74  Started container
  Normal  Pulled     51s   kubelet, 10.212.195.74  Container image "busybox" already present on machine
  Normal  Created    50s   kubelet, 10.212.195.74  Created container
  Normal  Pulled     49s   kubelet, 10.212.195.74  Container image "busybox" already present on machine
  Normal  Started    49s   kubelet, 10.212.195.74  Started container
  Normal  Created    48s   kubelet, 10.212.195.74  Created container
  Normal  Started    47s   kubelet, 10.212.195.74  Started container
  Normal  Pulled     47s   kubelet, 10.212.195.74  Container image "busybox" already present on machine
  Normal  Created    47s   kubelet, 10.212.195.74  Created container
  Normal  Started    46s   kubelet, 10.212.195.74  Started container
  Normal  Pulled     46s   kubelet, 10.212.195.74  Container image "busybox" already present on machine
  Normal  Created    45s   kubelet, 10.212.195.74  Created container
  Normal  Started    44s   kubelet, 10.212.195.74  Started container
  Normal  Pulled     44s   kubelet, 10.212.195.74  Container image "busybox" already present on machine
  Normal  Created    44s   kubelet, 10.212.195.74  Created container
  Normal  Started    43s   kubelet, 10.212.195.74  Started container
  Normal  Pulled     43s   kubelet, 10.212.195.74  Container image "busybox" already present on machine
  Normal  Created    42s   kubelet, 10.212.195.74  Created container
  Normal  Started    41s   kubelet, 10.212.195.74  Started container
  Normal  Pulled     41s   kubelet, 10.212.195.74  Container image "busybox" already present on machine
  Normal  Created    41s   kubelet, 10.212.195.74  Created container
  Normal  Started    39s   kubelet, 10.212.195.74  Started container
  Normal  Pulled     39s   kubelet, 10.212.195.74  Container image "busybox" already present on machine

(3) kubectl get event -w likewise shows no OOM-related event.

root@cld-pmaster1-1699:/home/zouxiang# kubectl get event -w


0s    Normal   Scheduled   Pod   Successfully assigned default/nginx to 10.212.195.74
0s    Normal   Pulled   Pod   Container image "polinux/stress" already present on machine
0s    Normal   Created   Pod   Created container
0s    Normal   Started   Pod   Started container
0s    Normal   Pulled   Pod   Container image "busybox" already present on machine
0s    Normal   Created   Pod   Created container
0s    Normal   Started   Pod   Started container
0s    Normal   Pulled   Pod   Container image "busybox" already present on machine
0s    Normal   Created   Pod   Created container
0s    Normal   Started   Pod   Started container
0s    Normal   Pulled   Pod   Container image "busybox" already present on machine
0s    Normal   Created   Pod   Created container
0s    Normal   Started   Pod   Started container
0s    Normal   Pulled   Pod   Container image "busybox" already present on machine
0s    Normal   Created   Pod   Created container
0s    Normal   Started   Pod   Started container
0s    Normal   Pulled   Pod   Container image "busybox" already present on machine
0s    Normal   Created   Pod   Created container
0s    Normal   Started   Pod   Started container
0s    Normal   Pulled   Pod   Container image "busybox" already present on machine
0s    Normal   Created   Pod   Created container
0s    Normal   Started   Pod   Started container
0s    Normal   Pulled   Pod   Container image "busybox" already present on machine
0s    Normal   Created   Pod   Created container
0s    Normal   Started   Pod   Started container
0s    Normal   Pulled   Pod   Container image "busybox" already present on machine

(4) Check the kubelet log. I added quite a few of my own (zoux-test) log lines; see the recordToSink function in section 3 for where they live.

// the Created container event was skipped
E0626 21:21:31.605289   36267 event.go:140] zoux-test event '&v1.Event{TypeMeta:v1.TypeMeta{Kind:"", APIVersion:""}, ObjectMeta:v1.ObjectMeta{Name:"nginx.168c247e91d19615", GenerateName:"", Namespace:"default", SelfLink:"", UID:"", ResourceVersion:"", Generation:0, CreationTimestamp:v1.Time{Time:time.Time{wall:0x0, ext:0, loc:(*time.Location)(nil)}}, DeletionTimestamp:(*v1.Time)(nil), DeletionGracePeriodSeconds:(*int64)(nil), Labels:map[string]string(nil), Annotations:map[string]string(nil), OwnerReferences:[]v1.OwnerReference(nil), Initializers:(*v1.Initializers)(nil), Finalizers:[]string(nil), ClusterName:""}, InvolvedObject:v1.ObjectReference{Kind:"Pod", Namespace:"default", Name:"nginx", UID:"570f873f-d681-11eb-921a-52540019dd70", APIVersion:"v1", ResourceVersion:"11565452", FieldPath:"spec.containers{busbox}"}, Reason:"Created", Message:"Created container", Source:v1.EventSource{Component:"kubelet", Host:"10.212.195.74"}, FirstTimestamp:v1.Time{Time:time.Time{wall:0xc02de856e40cc815, ext:169113605992047, loc:(*time.Location)(0x65a7120)}}, LastTimestamp:v1.Time{Time:time.Time{wall:0xc02de856e40cc815, ext:169113605992047, loc:(*time.Location)(0x65a7120)}}, Count:1, Type:"Normal", EventTime:v1.MicroTime{Time:time.Time{wall:0x0, ext:0, loc:(*time.Location)(nil)}}, Series:(*v1.EventSeries)(nil), Action:"", Related:(*v1.ObjectReference)(nil), ReportingController:"", ReportingInstance:""}' is Skipped

// the Started container event was also skipped
E0626 21:21:32.260935   36267 event.go:140] zoux-test event '&v1.Event{TypeMeta:v1.TypeMeta{Kind:"", APIVersion:""}, ObjectMeta:v1.ObjectMeta{Name:"nginx.168c247eb8e6cfb1", GenerateName:"", Namespace:"default", SelfLink:"", UID:"", ResourceVersion:"", Generation:0, CreationTimestamp:v1.Time{Time:time.Time{wall:0x0, ext:0, loc:(*time.Location)(nil)}}, DeletionTimestamp:(*v1.Time)(nil), DeletionGracePeriodSeconds:(*int64)(nil), Labels:map[string]string(nil), Annotations:map[string]string(nil), OwnerReferences:[]v1.OwnerReference(nil), Initializers:(*v1.Initializers)(nil), Finalizers:[]string(nil), ClusterName:""}, InvolvedObject:v1.ObjectReference{Kind:"Pod", Namespace:"default", Name:"nginx", UID:"570f873f-d681-11eb-921a-52540019dd70", APIVersion:"v1", ResourceVersion:"11565452", FieldPath:"spec.containers{busbox}"}, Reason:"Started", Message:"Started container", Source:v1.EventSource{Component:"kubelet", Host:"10.212.195.74"}, FirstTimestamp:v1.Time{Time:time.Time{wall:0xc02de8570f8737b1, ext:169114261694459, loc:(*time.Location)(0x65a7120)}}, LastTimestamp:v1.Time{Time:time.Time{wall:0xc02de8570f8737b1, ext:169114261694459, loc:(*time.Location)(0x65a7120)}}, Count:1, Type:"Normal", EventTime:v1.MicroTime{Time:time.Time{wall:0x0, ext:0, loc:(*time.Location)(nil)}}, Series:(*v1.EventSeries)(nil), Action:"", Related:(*v1.ObjectReference)(nil), ReportingController:"", ReportingInstance:""}' is Skipped

// the BackOff event was also skipped
E0626 21:21:33.554220   36267 event.go:140] zoux-test event '&v1.Event{TypeMeta:v1.TypeMeta{Kind:"", APIVersion:""}, ObjectMeta:v1.ObjectMeta{Name:"nginx.168c247f05fbd51f", GenerateName:"", Namespace:"default", SelfLink:"", UID:"", ResourceVersion:"", Generation:0, CreationTimestamp:v1.Time{Time:time.Time{wall:0x0, ext:0, loc:(*time.Location)(nil)}}, DeletionTimestamp:(*v1.Time)(nil), DeletionGracePeriodSeconds:(*int64)(nil), Labels:map[string]string(nil), Annotations:map[string]string(nil), OwnerReferences:[]v1.OwnerReference(nil), Initializers:(*v1.Initializers)(nil), Finalizers:[]string(nil), ClusterName:""}, InvolvedObject:v1.ObjectReference{Kind:"Pod", Namespace:"default", Name:"nginx", UID:"570f873f-d681-11eb-921a-52540019dd70", APIVersion:"v1", ResourceVersion:"11565452", FieldPath:"spec.containers{busbox}"}, Reason:"BackOff", Message:"Back-off restarting failed container", Source:v1.EventSource{Component:"kubelet", Host:"10.212.195.74"}, FirstTimestamp:v1.Time{Time:time.Time{wall:0xc02de8576101731f, ext:169115554917743, loc:(*time.Location)(0x65a7120)}}, LastTimestamp:v1.Time{Time:time.Time{wall:0xc02de8576101731f, ext:169115554917743, loc:(*time.Location)(0x65a7120)}}, Count:1, Type:"Warning", EventTime:v1.MicroTime{Time:time.Time{wall:0x0, ext:0, loc:(*time.Location)(nil)}}, Series:(*v1.EventSeries)(nil), Action:"", Related:(*v1.ObjectReference)(nil), ReportingController:"", ReportingInstance:""}' is Skipped

Conclusion: with these settings the qps is simply too low, so some events never reach the apiserver. With burst = 25 and one new token every 300 seconds, the roughly 30 Pulled/Created/Started events produced while starting 10 containers drain the pod's bucket, and the OOM/BackOff events that follow are dropped.


5. Summary

(1) We walked through the kubelet's event handling mechanism, including the aggregation and filtering flows. Beyond a better understanding of the kubelet, the design itself is worth borrowing.

(2) The filtering flow exposes a very interesting phenomenon that can be fatal in some situations, for example when a cluster relies on events for important alerts: if an important event is never uploaded, the alert never fires.