Client-go是认识Kubernetes开发接口的良好载体,理解Client-go的运行机制有助于理解Kubernetes的运行机制。其本身的设计模式和理念也值得学习和借鉴。
ResourceVersion
etcd默认保留5分钟以内的变更记录,每个资源发生变更都会更新一个更大的资源版本ResourceVersion,ResourceVersion是一个所有资源类型共享的全局变量。
对于watch请求来说,你可以指定一个resourceVersion=0来获取5分钟以内的任意变更记录及其之后,这种表现很奇怪,所以不建议指定0。可以指定一个resourceVersion来获取这个资源版本之后的变更记录,但这个资源版本早于5分钟以内保留的最小版本,则会回复一个410状态码,如果大于最大版本,则可能会一直等下去,直到超时。
对于list,请求后会返回一个Kind=XXList的资源类型,XXList这种资源类型是按照惯例附带创建的,比如Pod和PodList,如果你写过CRD应该能明白了;items字段内包含资源列表,metadata包含的了resourceVersion,但这个resourceVersion是PodList的资源版本,而不是Pod的资源版本,指定resourceVersion=0来获取任意的PodList,也可以指定一个resourceVersion来获取这个资源版本或之后的PodList,如果指定的resourceVersion小于当前最新资源版本,它总是返回最新的PodList,如果大于则返回504状态码。但如果你指定了limit参数或resourceVersionMatch=Excat,就意味着apiserver必须精准匹配你填写的resourceVersion,这时候就和watch一样了,如果找不到指定的resourceVersion(可能是超过了5分钟),则会返回410状态码。关于resourceVersion的解释可以看官方文档,虽然官方文档写得不清晰,但你可以结合我上面说的来理解。
变更事件有四种:ADD, DELETE, MODIFY, BOOKMARK。前面三个容易看懂,但第四个BOOKMARK是干什么的?正如前面所说etcd只保留5分钟的变更记录,万一客户端很长时间内都没有watch到变更,然后断连之后又重连到apiserver时,客户端可能按常规的把上次收到的resourceVersion传到url里,但这个resourceVersion已经是一个过期的资源版本,apiserver找不到资源版本,就会回复一个410状态码。那么这时客户端为了能获取最新的资源版本号就不得不先list一次。为了防止这种情况,apiserver会定期发送BOOKMARK事件,BOOKMARK将包含一个当前最新的资源版本号,尽管这个版本号对应的资源类型并不是你监听的那种,但这样是为了客户端能更新最新的资源版本号。
API访问例子
这里提供API访问的例子,首先是list,通过resourceVersion=0来获取任意版本的PodList 1
2
3
4
5
6
7
8
9
10
11# curl -s "http://127.0.0.1:8001/api/v1/namespaces/default/pods?resourceVersion=0" 2>&1 | head
{
"kind": "PodList",
"apiVersion": "v1",
"metadata": {
"selfLink": "/api/v1/namespaces/default/pods",
"resourceVersion": "1084093"
},
"items": [
{
"metadata": {1
2
3
4
5
6
7
8
9# curl -s "http://127.0.0.1:8001/api/v1/namespaces/default/pods?resourceVersion=1083300&resourceVersionMatch=Exact" 2>&1 | head
{
"kind": "Status",
"apiVersion": "v1",
"metadata": {},
"status": "Failure",
"message": "The resourceVersion for the provided list is too old.",
"reason": "Expired",
"code": 4101
2
3
4
5
6
7
8
9
10
11
12
13# curl -s "http://127.0.0.1:8001/api/v1/namespaces/default/pods?watch=1&resourceVersion=1075698" 2>&1 | jq | head -n 15
{
"type": "MODIFIED",
"object": {
"kind": "Pod",
"apiVersion": "v1",
"metadata": {
"name": "centos",
"namespace": "default",
"selfLink": "/api/v1/namespaces/default/pods/centos",
"uid": "2c357511-a557-4246-b77c-4ffc5d8efcb4",
"resourceVersion": "1075702",
"creationTimestamp": "2021-08-14T09:36:26Z",1
2
3
4
5
6# curl -s "http://127.0.0.1:8001/api/v1/namespaces/default/pods?watch=1&resourceVersion=1075698" 2>&1 | jq .object.metadata.resourceVersion
"1075702"
"1075740"
"1075774"
"1083264"
"1083273"
Informer
Informer的意思是通知器,内部会以list和watch形式请求API Server,来监控对应资源,当资源有更新时就会调用对应处理函数。一个Informer只处理一种资源类型。 1
2
3
4
5
6
7
8
9
10podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", v1.NamespaceAll, fields.Everything())
store, controller := cache.NewInformer(podListWatcher, &v1.Pod{}, 0,
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {},
UpdateFunc: func(old interface{}, new interface{}) {},
DeleteFunc: func(obj interface{}) {},
})
controller.Run(stopCh)
controller.Run()
之后就会阻塞协程,一直等到回调处理函数。
Store
NewInformer时返回了一个store,顾名思义就是存储了资源信息的存储器, 1
2
3
4
5
6
7
8
9
10type Store interface {
Add(obj interface{}) error
Update(obj interface{}) error
Delete(obj interface{}) error
List() []interface{}
ListKeys() []string
Get(obj interface{}) (item interface{}, exists bool, err error)
GetByKey(key string) (item interface{}, exists bool, err error)
Replace([]interface{}, string) error
Resync() error
DeltaFIFO
这个结构体由两部分组成,一个是Delta,所有更新的对象都会包装到这个结构里面,并被DeltaFIFO用一个Map存储 [src], 1
2
3
4
5
6
7
8
9
10
11
12type Delta struct {
Type DeltaType // Delta类型,增、删、减、同步
Object interface{} // 对象
}
type DeltaType string // Delta的类型用字符串表达
const (
Added DeltaType = "Added" // 增加
Updated DeltaType = "Updated" // 更新
Deleted DeltaType = "Deleted" // 删除
Sync DeltaType = "Sync" // 同步
)1
2
3
4
5
6
7
8
type Queue interface {
Store
Pop(PopProcessFunc) (interface{}, error)
AddIfNotPresent(interface{}) error
HasSynced() bool
Close()
}
- items的值Deltas本质上是一个Delta数组,为什么是数组后面再解释;items的key则是对象的一个字符串表示,是通过另一个成员keyFunc算出来的,具体视传进来的函数是什么。
- queue是用一个字符串数组模拟队列,装的是对象的key。
1
2
3
4
5
6
7
8type DeltaFIFO struct {
// ...
items map[string]Deltas
keyFunc KeyFunc
queue []string
}
type Deltas []Delta
threadSafeMap
Informer的threadSafeMap会被DeltaFIFO所持有,成员变量是knownObjects,每次Resync时都会顺便遍历threadSafeMap,把它和最新的list作比较,已经不存在的对象会调用queueActionLocked()
触发一次Deleted
事件,以便于将其从threadSafeMap删除 [src]。其中,删除时并不是马上删除,而是在原本的对象上套一个DeletedFinalStateUnknown
结构体 [src]。
Pop
DeltaFIFO的关键方法Pop()
会被controller调用 [src],Pop函数会调用一个信号量一直阻塞 [src],直到队里中有元素,所有尝试往队列添加元素的操作后面都会用该信号量发起广播,来唤醒阻塞。Pop内部会调用传进来的process方法进行回调处理对象 [src],这个process方法的主体在这里可以看到,当尝试往threadSafeMap更新值失败时就会返回err,DeltaFIFO发现是ErrRequeue则会重新加入队列。
HasSynced
HasSynced()方法会检查第一次list资源后是否全部Pop出来 [src],Controller的HasSynced()其实就是调用了这个HasSynced() [src]。
官方的 client-go/examples/workqueue 例子里面,会一直等待这个函数返回true,才开始runWorker [src],再说具体一点就是,第一次list资源后,等到所有资源都从DeltaFIFO中pop出来,并且全部回调处理函数,例子中的处理函数就是把对象都放进workqueue里面,然后才开始从workqueue里面消费数据。
Resync
Resync()做的事情就是把threadSafeMap里面的所有对象都触发一次Sync事件 [src],很多Controller会定时调用以便于把资源同步到期望状态。理论上,如果你的资源的更新逻辑处理得足够好,其实是不用Resync的,Resync更像是一种弥补逻辑处理漏掉的万金油。还是要强调一下Resync不会访问apiserver,下面提到的ReList才会。
Deltas
Deltas作为一个数组其实是为了处理多时间内同一个对象被多次操作的情况。比如新增对象时产生一个Added Delta,然后短时间内又删除该对象,又产生了一个Deleted Delta;通过对象的key,如果发现队列中已经有这个对象,那么会把Delta插入它的Deltas数组里 [src]。
一个新Delta插入Deltas数组后会有一个合并操作,当数组最后两个元素都是Deleted Delta,意味着一个对象被连续执行两次删除操作,这是多余的,所以会去掉后面那个Delta。而为什么Add Delta不需要处理合并,是因为资源的namespace/name的唯一性由apiserver保证,如果namespace/name冲突,apiserver层面就会返回错误,轮不到client-go去处理。这里额外提一句,成员变量keyFunc大部分情况下都会传入cache.MetaNamespaceKeyFunc()
函数,它算出来的key的格式就是{namespace}/{name}
。
Indexer
Indexer就是索引器,是为Store建立索引的。 1
2
3
4
5
6
7
8type Indexer interface {
Store
Index(indexName string, obj interface{}) ([]interface{}, error)
IndexKeys(indexName, indexKey string) ([]string, error)
ListIndexFuncValues(indexName string) []string
ByIndex(indexName, indexKey string) ([]interface{}, error)
GetIndexers() Indexers
AddIndexers(newIndexers Indexers) error1
items, err := indexer.Index("namespace", &metav1.ObjectMeta{Namespace: "default"})
NewIndexerInformer()
顺便创建出来。
threadSafeMap实现了一套比较复杂的索引分类,它支持外部用户通过AddIndexers()
接口传入计算索引key的函数,然后每次更新数据都会更新其索引,简单的说就是一个用若干Map实现的小型索引数据库。但目前K8s的所有使用场景下,只有两种索引Key计算函数,一个是cache.MetaNamespaceKeyFunc()
[src],以namespace建立索引,使得我们可以通过namespace获取其所有资源;另一种是indexByPodNodeName()
,只在deamon controller里面使用,以nodeName建立索引 [src]。
IndexerInformer
IndexerInformer与Informer不同的是,它需要在最后传入一个indexers,这个可以用默认的那个cache.Indexers{}
,其实它本质上就是一个Map,只是建了一个别名而已。IndexerInformer返回的是indexer而非store,但其实从源码看来,构造的时候都会构造一个threadSafeMap,只是返回了不同的抽象类型而已 [src]。 1
2
3
4
5
6
7
8indexer, controller := cache.NewIndexerInformer(podListWatcher, &v1.Pod{}, 0,
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {},
UpdateFunc: func(old interface{}, new interface{}) {},
DeleteFunc: func(obj interface{}) {},
}, cache.Indexers{})
controller.Run(stopCh)
Reflector
Reflector的工作就是通过Lister和Watcher得到的数据放进DeltaFIFO里。Reflector的数据类型如下,只保留几个关键的, 1
2
3
4
5
6type Reflector struct {
expectedType reflect.Type
store Store
listerWatcher ListerWatcher
resyncPeriod time.Duration
// ...v1.Pod{}
等。 - store 这里将传入DeltaFIFO - listerWatcher Lister和Watcher的结合体,负责从apiserver拉取列表和监听资源变化,这个结构具体后面再讲。 - resyncPeriod 全量同步的时间间隔
ListAndWatch
Reflector中的一个函数ListAndWatch()会负责三件事情,一件是list资源(从apiserver获取资源列表,并全量同步),第二件是开一个协程去定时resync(全量同步),第三件是watch监听资源。
第一次list资源会设置资源版本号为空 [src],旧版会设为0 [src],拉完后就更新资源版本 [src],后面watch的时候只要关心比这个资源版本大的资源。list的时候会把ListWatch对象包裹在pager对象里 [src],这个对象的作用是控制分页查询,比如资源对象太多时,为了防止过大的网络IO,pager可以通过控制url的limit和continue参数来指定一次请求获取的资源数量 [src]。
Resync的时候实际上会调用DeltaFIFO的Resync函数 [src],这个上面说过了。
watch的时候会开启一个死循环 [src],ListerWatcher会返回要一个watch对象及其内部的一条channel,没有数据时则一直阻塞监听channel,只要有新资源变化就会停止阻塞 [src],然后就根据事件类型往DeltaFIFO里面更新数据 [src],最后会更新最新资源版本。
每次向apiserver发起watch请求,如果大概8分钟内都没有任何事件,则apiserver会主动断开连接,断开连接则会关闭watch对象的channel [src],Reflector监听channel结束,然后会再次构建watch对象并发起watch请求。
ListAndWatch()会被Run()调用。Run()里面把ListAndWatch()包裹在了一个重试函数wait.Until()里面,ListAndWatch()正常情况下是死循环,一旦ListAndWatch()发送错误就会返回,wait.Until()在指定时间后又会重新执行ListAndWatch() [src]。这一步也叫所谓的ReList。再一次list资源时会尝试传入一个上次list到或最新watch到的资源版本,但并不保证可以成功list,比如watch到的Pod的资源版本和PodList的资源版本没有任何关联,Pod的更新不代表PodList的更新,这里只是尝试一下而已,如果list失败了就把url参数resourceVersion置为空 [src],这样就能拉最新的列表。
类型检查
Reflector是反射器的意思,Reflector确实做了反射,它把ListerWatcher得到的对象反射出具体对象,然后与成员期望对象expectedType进行比较 [src],若类型不符则不放入DeltaFIFO里。
ListerWatcher
ListerWatcher分别继承了Lister和Watcher的接口,而ListWatch结构体则实现了ListerWatcher接口, 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24type Lister interface {
List(options metav1.ListOptions) (runtime.Object, error)
}
type Watcher interface {
Watch(options metav1.ListOptions) (watch.Interface, error)
}
type ListerWatcher interface {
Lister
Watcher
}
// 结构体
type ListWatch struct {
ListFunc ListFunc
WatchFunc WatchFunc
}
// 仅调用成员函数变量
func (lw *ListWatch) List(options metav1.ListOptions) (runtime.Object, error) {
return lw.ListFunc(options)
}
func (lw *ListWatch) Watch(options metav1.ListOptions) (watch.Interface, error) {
return lw.WatchFunc(options)
}
ListWatch可以通过cache.NewListWatchFromClient()
构建 [src], 1
podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", v1.NamespaceDefault, fields.Everything())
cache.NewFilteredListWatchFromClient()
构建的 [src],所谓的ListFunc和WatchFunc,内部其实是发起http请求,请求是由RestClient发起的。
注意的是cache.NewListWatchFromClient()
这个接口只是cache包里面提供的接口,但到了后面讲到SharedIndexInformerFactory的时候,内部资源接口都被封装好了,其内部不会用这个。
RestClient和restclient.Request
RestClient继承了http.Client,下面是其结构体,保留部分重要参数, 1
2
3
4
5
6
7
8
9
10
11type RESTClient struct {
base *url.URL
versionedAPIPath string
createBackoffMgr func() BackoffManager
rateLimiter flowcontrol.RateLimiter
Client *http.Client
// ...
}
- base:APIServer的地址,比如https://192.168.1.2:6443
- versionedAPIPath:资源访问路径,比如/apis/apps/v1
- createBackoffMgr:退避管理器的构造函数,这部分在workqueuq那里再详细讲
- rateLimiter:限速器,这部分在workqueuq那里再详细讲
- Client:标准库http客户端
RESTClient其实特别简单,最终还是会传入restclient.Request
结构体体里面,
1
2
3
4
5type Request struct {
c *RESTClient
pathPrefix string
// ...
}
Request通过函数式编程来构造最终的请求url,参考NewFilteredListWatchFromClient里面的实现: 1
2
3c.Get().Namespace(namespace).Resource(resource).VersionedParams(&options, metav1.ParameterCodec).Do(context.TODO()).Get()
c.Get().Namespace(namespace).Resource(resource).VersionedParams(&options, metav1.ParameterCodec).Watch(context.TODO())RESTClient.Get()
之后就返回一个restclient.Request
。对于Get请求,在Do()
内部会发起http请求,然后返回一个Result的结构体,[src],后面的Get()方法不是发起请求而是把得到的对象返回。
对于Watch,在Watch()
里面,还是发起http请求,只不过它是一个长连接,可以持续从http.Response读取数据 [src],这时会构造一个StreamWatcher对象 [src],它通过NewStreamWatcher()
方法构造,方法内部会开启一个协程无限循环并阻塞读取数据 [src],然后把读取的数据放入channel,提供给外部读取。
额外提一句,StreamWatcher的定义不在client-go里,而是在apimachinery这个仓库里,有兴趣的话可以了解一下这个仓库。
Controller
Controller的结构如下: 1
2
3
4
5
6
7
8
9
10
11
12
13
14type controller struct {
config Config
reflector *Reflector
// ..
}
type Config struct {
Queue
ListerWatcher
Process ProcessFunc
ObjectType runtime.Object
FullResyncPeriod time.Duration
// ...
}Process
函数里面。这一点在DeltaFIFO章节有提过,就不多说了。
上面提到的Informer和IndexInformer本质上就是一个Controller [src]。
SharedIndexInformer
SharedIndexInformer在IndexInformer基础上又加了一个Shared,我的理解是他把Controller和indexer整合在一起,并且可以传入多个handler,所以就叫SharedIndexInformer。
使用如下,只返回一个SharedIndexInformer。 1
2
3
4
5
6
7
8
9s := cache.NewSharedIndexInformer(podListWatcher, &v1.Pod{}, 0, cache.Indexers{})
s.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {},
UpdateFunc: func(old interface{}, new interface{}) {},
DeleteFunc: func(obj interface{}) {},
})
s.Run(stopCh)
结构如下: 1
2
3
4
5
6type sharedIndexInformer struct {
indexer Indexer
controller Controller
processor *sharedProcessor
// ...
}1
2
3
4
5
6
7
8type SharedInformer interface {
AddEventHandler(handler ResourceEventHandler)
GetStore() Store
GetController() Controller
Run(stopCh <-chan struct{})
HasSynced() bool
// ...
}AddEventHandler()
说明可以添加多个handler,每个handler会被一个ProcessListener结构包裹 [src],所有ProcessListener最终会放入成员变量processor里 [src]。如果SharedInformer已经Run()起来后,再给它新增handler,则会把indexer的所有对象都给该handler处理一次 [src]。
在Run()函数的工作就是构造controller并调用其Run()而已,controller的关键函数Process传进的是HandleDeltas() [src]。该函数会调用processor的distribute()函数,把对象分发给所有ProcessListener,并调用相关处理函数 [src]。
ProcessListener里面有个比较有意思的缓冲实现,当外部加入对象是会放进addCh,当外部处理时会从nextCh取,那么理论上只要弄一个channel就好了呀?但问题在于nextCh和用户自定义的handler是同一协程下执行的,用户可能写了效率很差的代码,导致从nextCh读取速度比写入速度慢,进而导致nextCh空间占满,最终导致阻塞。 1
2
3
4
5type processorListener struct {
nextCh chan interface{}
addCh chan interface{}
pendingNotifications buffer.RingGrowing
// ...
ProcessListener使用了一个pendingNotifications无限缓冲区,先把addCh得到的对象写入pendingNotifications,再把对象从pendingNotifications取出存入nextCh,源码里把这部分转换写得非常精妙,可以参考一下 [src]。
SharedInformerFactory
SharedInformerFactory可以说是k8s资源访问的完全集合体,它能访问所有官方的API资源。可以先看看SharedInformerFactory的接口 [src]: 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24type SharedInformerFactory interface {
internalinterfaces.SharedInformerFactory
ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
Admissionregistration() admissionregistration.Interface
Internal() apiserverinternal.Interface
Apps() apps.Interface
Autoscaling() autoscaling.Interface
Batch() batch.Interface
Certificates() certificates.Interface
Coordination() coordination.Interface
Core() core.Interface
Discovery() discovery.Interface
Events() events.Interface
Extensions() extensions.Interface
Flowcontrol() flowcontrol.Interface
Networking() networking.Interface
Node() node.Interface
Policy() policy.Interface
Rbac() rbac.Interface
Scheduling() scheduling.Interface
Storage() storage.Interface
}
还有一个同名接口 [src]: 1
2
3
4
5// client-go/informers/internalinterfaces/factory_interfaces.go
type SharedInformerFactory interface {
Start(stopCh <-chan struct{})
InformerFor(obj runtime.Object, newFunc NewInformerFunc) cache.SharedIndexInformer
}
- Start() 很明显就是用来启动的
- InformerFor() 可以看出是用来构造SharedIndexInformer的
看一下结构体,这里只保留重要的参数: 1
2
3
4
5
6
7
8type sharedInformerFactory struct {
client kubernetes.Interface
defaultResync time.Duration
informers map[reflect.Type]cache.SharedIndexInformer
startedInformers map[reflect.Type]bool
// ...
}
- client: kubeernetes的clientset,它是一个包装了各种RestClient的结合 [src],client-go里面把所有官方API Group及其资源类型都做了RestClient包装,省去了自己从头构造RestClient。
- defaultResync: resync的时间间隔
- informers: 资源类型 到 其Informer的映射
- startedInformers: 记录informer是否已经启动的Map
构造函数特别简单,只要传入clientset即可和一个resync的时间间隔, 1
2kubeClient, err := kubernetes.NewForConfig(cfg)
kubeInformerFactory := informers.NewSharedInformerFactory(kubeClient, time.Second*30)
其实构造一个SharedInformerFactory基本没有做什么工作,只有当你尝试获取某种资源的时候,才会开始构造Informer,比如我们需要一个访问deployment的Informer,就这么调用, 1
2deploymentInformer := kubeInformerFactory.Apps().V1().Deployments()
informer := deploymentInformer.Informer()Deployments()
干了什么 [src], 1
2
3func (v *version) Deployments() DeploymentInformer {
return &deploymentInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions}
}1
2
3
4type DeploymentInformer interface {
Informer() cache.SharedIndexInformer
Lister() v1.DeploymentLister
}1
2
3func (f *deploymentInformer) Informer() cache.SharedIndexInformer {
return f.factory.InformerFor(&appsv1.Deployment{}, f.defaultInformer)
}
InformerFor的实现有点长,我就不贴代码了,主要就是先判断一下成员变量informers里面有没有这个资源对应的Informer存在,如果不存在,就利用传进来的构造函数构造一个并存起来 [src],最后返回构造出来的informer。
这时候就都串起来了,SharedInformerFactory在你调用到对应资源时,会先检查一下自己的缓存里是否已经创建了该资源对应的informer,如果创建了就直接返回informer;如果没有创建则用预先定义好的构造函数创建一个并存起来,也会返回这个informer。
这里之所以把整个流程捋一遍,是为了明白SharedInformerFactory是如何做资源访问集合的,了解这一点后你就能明白官方的代码生成工具code-generator生成的东西是干什么用的。看到这里就知道了生成的generated下的三个目录clientset, informers, listers的代码分别在哪里被使用。
SharedInformerFactory还有一个Start()接口,内部实现就是遍历成员informers列表,然后为每一个informer开一个协程去执行它的Run()方法 [src]。
Workqueue 和 RateLimiting
如果有看过官方的例子,就会发现用一个Workqueue把从Informer获取到的对象缓存起来是常规做法,然后会开若干线程去消费Workqueue队列。 1
2
3
4
5
6
7
8
9
10
11
12
13
14deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
key, _ := cache.MetaNamespaceKeyFunc(obj)
workqueue.Add(key)
},
UpdateFunc: func(old, new interface{}) {
key, _ := cache.MetaNamespaceKeyFunc(new)
controller.handleObject(key)
},
DeleteFunc: func(obj interface{}) {
key, _ := cache.MetaNamespaceKeyFunc(obj)
workqueue.Add(key)
},
})
插入时会调用Add(item)
。还有一个插入接口是AddRateLimited(item)
,主要用于延迟对象的插入,在controller里面主要是用来延迟多次插入的对象。主循环可能处理某个对象失败了,可能是因为闪断等不可抗因素,为了不影响处理后面的对象,这时会再插到队列最后,但这可能会导致不断插入又不断取出的死循环,所以对于再次插入的对象会给予一定延迟,延迟时间随插入次数不断增大。
要记录对象的延迟就需要Map缓存,所以需要一个额外的接口去清理缓存,需要用接口Forget(item)
。
获取数据时会调用Get()
,队列内部还一个正在处理的processing缓存,每当你Get出这个对象时,就会把对象插入processing缓存中,如果要清理缓存,必须调用Done(item)
接口。你不能往队列插入一个已经存在于processing缓存的对象。
因为队列会被多线程消费,可能导致同一个对象重复加入队列又同时被多个线程处理,所以在Add(item)
时,如果发现一个对象已经存在队里中,需要用一个脏数据缓存dirty先存起来,在Done(item)
函数中,把对象从队列移除后,会把该对象从dirty缓存取出再加入队列。
构造
构造方法有两种,一种是不指定名称,一种是指定名称 [src], 1
2workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Foos")
AddRateLimited
内部调用了AddAfter,即延迟多少时间后再插入队列。 1
2
3func (q *rateLimitingType) AddRateLimited(item interface{}) {
q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21// AddAfter adds the given item to the work queue after the given delay
func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
// don't add if we're already shutting down
if q.ShuttingDown() {
return
}
q.metrics.retry()
// immediately add things with no delay
if duration <= 0 {
q.Add(item)
return
}
select {
case <-q.stopCh:
// unblock if ShutDown() is called
case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
}
}func (q *delayingType) waitingLoop()
看到,
关键在于rateLimiter.When
如何计算时间, 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
exp := r.failures[item]
r.failures[item] = r.failures[item] + 1
// The backoff is capped such that 'calculated' value never overflows.
backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
if backoff > math.MaxInt64 {
return r.maxDelay
}
calculated := time.Duration(backoff)
if calculated > r.maxDelay {
return r.maxDelay
}
return calculated
}failures
,对象第一次进来时获取空对象exp=0,以后每次进来exp+1,延迟时间的公式是delay = base * 2 ^ exp
。
Discovery
Discovery的作用其实用来发现集群中的一些版本、API组和资源。 1
2
3
4
5
6
7type DiscoveryInterface interface {
RESTClient() restclient.Interface
ServerGroupsInterface
ServerResourcesInterface
ServerVersionInterface
OpenAPISchemaInterface
}
- ServerGroupsInterface:获取所有API组metav1.APIGroupList
- ServerResourcesInterface:用于获取某个组的资源列表metav1.APIResourceList
- ServerVersionInterface:用于获取集群版本
- OpenAPISchemaInterface:用于获取所有API的语法文档
如果想知道,OpenAPISchemaInterface获取的是什么东西,可以尝试把apiserver代理到8080端口,然后访问下面的url,看看test.log里面输出了什么。 1
2kubectl proxy --port=8080
curl "http://localhost:8080/openapi/v2" | jq . > test.log
看看具体实现: 1
2
3
4type DiscoveryClient struct {
restClient restclient.Interface
LegacyPrefix string
}
- restClient:http客户端
- LegacyPrefix:核心API的前缀,默认就是/api。k8s默认核心api的前缀是/api/{version},而非核心的则是/apis/{group}/{version} [src]
具体方法就是实现了DiscoveryInterface的方法,其实就是url的拼接,然后访问apiserver获取数据 [src]。构造可以调用NewDiscoveryClient() [src]。
还有一个带缓存的接口: 1
2
3
4
5type CachedDiscoveryInterface interface {
DiscoveryInterface
Fresh() bool
Invalidate()
}
- Fresh():用于判断是否需要重新获取数据
- Invalidate():设置需要重新获取数据
Fresh()这个函数名看起来是动词,像是用来重新拉取数据的接口,但在官方的实现里面这个只是一个返回布尔值的判断。
可以参考memCacheClient的实现 [src]。内部有一个变量cacheValid用来记录数据是否合法,当你通过DiscoveryInterface的接口获取数据时,若发现cacheValid=false,则重新拉取数据,拉完后把cacheValid设为true。而Invalidate()函数则负责把cacheValid设为false和清理缓存。
RESTMapper
RESTMapper这个接口会时常用到,虽然不在client-go包里,而是在apimachinery包里,但还是拿出来说一下: 1
2
3
4
5
6
7
8
9
10
11
12
13
14type RESTMapper interface {
KindFor(resource schema.GroupVersionResource) (schema.GroupVersionKind, error)
KindsFor(resource schema.GroupVersionResource) ([]schema.GroupVersionKind, error)
ResourceFor(input schema.GroupVersionResource) (schema.GroupVersionResource, error)
ResourcesFor(input schema.GroupVersionResource) ([]schema.GroupVersionResource, error)
RESTMapping(gk schema.GroupKind, versions ...string) (*RESTMapping, error)
RESTMappings(gk schema.GroupKind, versions ...string) ([]*RESTMapping, error)
// ...
}
- KindFor: 通过一个可能残缺的GroupVersionResource获取一个具体的GroupVersionKind,如果有多个就error
- KindsFor:通过一个可能残缺的GroupVersionResource获取可能的GroupVersionKind列表
- ResourceFor和ResourcesFor和上面相似。
- RESTMapping:指定group, kind和version,获取一个对应的RESTMapping
- RESTMappings:指定group, kind和version,获取一个对应的RESTMapping列表
既然它叫REST,可以想象它的实现可以通过RESTClient从apiserver获取数据来组装这些接口,但官方的从来没有这么去实现,DefaultRESTMapper是需要通过Add()和AddSpecific()从外部手动添加的。而DeferredDiscoveryRESTMapper则是传进一个Discovery,从Discovery获取数据。
RESTMapping的构造可以参考controller-manager源码 [src]。podautoscaler就是通过RESTMapper和配置里指定的Group和Kind来找到其指向的资源实例的 [src]。
Comments