Skip to content

client-go 简介

3569字约12分钟

Go部署

2024-07-09

Kubernetes 是大家所非常熟悉的集群管理平台, 我们通常在 Web 界面上对集群内部的 pod 和 service 进行操作. 如果想要在程序代码中对集群中的 pod 进行操作或者获取其中某些容器的状态信息则可以使用 Kubernetes 官方给出的 API 接口. 由于这些接口的参数非常复杂, 所以通常我们会使用client-go 这个外部库来进行 API 的操作.

本文将详细介绍 client-go 的操作和源码实现. 文中涉及大量的源码, 未使用链接的原因是为了避免源码版本的变更.

0. Kubernetes API

文档

1. client-go 简介

1.1 client-go 说明

client-go 是一个调用 Kubernetes 集群资源对象 API 的客户端, 即通过 client-go 实现对 Kubernetes 集群中资源对象(包括 Deployment、Service、Ingress、Stateful Set、Pod、Namespace、Node 等)的增删改查等动作. 大部分对 Kubernetes 进行前置 API 封装的二次开发都通过 client-go 这个第三方包来实现.

client-g o官方文档

1.2 示例代码

func GetPods(clientSet *kubernetes.Clientset) {
	for {
		pods, err := clientSet.CoreV1().Pods("").List(context.Background(), metav1.ListOptions{})
		if err != nil {
			panic(err.Error())
		}
		log.Info().Msgf("There are %d pods in the cluster\n", len(pods.Items))
		time.Sleep(10 * time.Second)
	}
}

1.3 运行结果

=== RUN   TestGetPods
{"level":"info","time":"2024-07-05T15:57:22+08:00","message":"There are 4963 pods in the cluster\n"}
{"level":"info","time":"2024-07-05T15:57:33+08:00","message":"There are 4963 pods in the cluster\n"}
{"level":"info","time":"2024-07-05T15:57:45+08:00","message":"There are 4958 pods in the cluster\n"}
{"level":"info","time":"2024-07-05T15:57:57+08:00","message":"There are 4950 pods in the cluster\n"}
{"level":"info","time":"2024-07-05T15:58:08+08:00","message":"There are 4950 pods in the cluster\n"}
{"level":"info","time":"2024-07-05T15:58:20+08:00","message":"There are 4950 pods in the cluster\n"}
{"level":"info","time":"2024-07-05T15:58:32+08:00","message":"There are 4946 pods in the cluster\n"}

2. client-go 源码分析

client-go 源码

client-go 源码目录结构

  • The kubernetes package contains the clientset to access Kubernetes API.
  • The discovery package is used to discover APIs supported by a Kubernetes API server.
  • The dynamic package contains a dynamic client that can perform generic operations on arbitrary Kubernetes API objects.
  • The plugin/pkg/client/auth packages contain optional authentication plugins for obtaining credentials from external sources.
  • The transport package is used to set up auth and start a connection.
  • The tools/cache package is useful for writing controllers.

2.1 kubeconfig

kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")

获取 Kubernetes 配置文件 kubeconfig 的绝对路径. 一般路径为 $HOME/.kube/config. 该文件主要用来配置本地连接的 Kubernetes 集群.

config 内容示例:

apiVersion: v1
clusters:
- cluster:
    server: http://<kube-master-ip>:8080
  name: k8s
contexts:
- context:
    cluster: k8s
    namespace: default
    user: ""
  name: default
current-context: default
kind: Config
preferences: {}
users: []

2.2 rest.config

通过参数(master 的 url 或者 kubeconfig 路径)和 BuildConfigFromFlags 方法来获取 rest.Config 对象, 一般是通过参数 kubeconfig 的路径.

config,err := clientcmd.BuildConfigFromFlags("",*kubeconfig)

BuildConfigFromFlags 函数源码

BuildConfigFromFlags 函数源码

// BuildConfigFromFlags is a helper function that builds configs from a master
// url or a kubeconfig filepath. These are passed in as command line flags for cluster
// components. Warnings should reflect this usage. If neither masterUrl or kubeconfigPath
// are passed in we fallback to inClusterConfig. If inClusterConfig fails, we fallback
// to the default config.
func BuildConfigFromFlags(masterUrl, kubeconfigPath string) (*restclient.Config, error) {
	if kubeconfigPath == "" && masterUrl == "" {
		klog.Warning("Neither --kubeconfig nor --master was specified.  Using the inClusterConfig.  This might not work.")
		kubeconfig, err := restclient.InClusterConfig()
		if err == nil {
			return kubeconfig, nil
		}
		klog.Warning("error creating inClusterConfig, falling back to default config: ", err)
	}
	return NewNonInteractiveDeferredLoadingClientConfig(
		&ClientConfigLoadingRules{ExplicitPath: kubeconfigPath},
		&ConfigOverrides{ClusterInfo: clientcmdapi.Cluster{Server: masterUrl}}).ClientConfig()
}

2.3 clientset

通过 *rest.Config 参数和 NewForConfig 方法来获取 clientset 对象, clientset 是多个 client 的集合, 每个 client 可能包含不同版本的方法调用.

clientset, err := kubernetes.NewForConfig(config)

2.3.1 NewForConfig

NewForConfig 函数就是初始化 clientset 中的每个 client.

NewForConfig 源码

// NewForConfig creates a new Clientset for the given config.
// If config's RateLimiter is not set and QPS and Burst are acceptable,
// NewForConfig will generate a rate-limiter in configShallowCopy.
// NewForConfig is equivalent to NewForConfigAndClient(c, httpClient),
// where httpClient was generated with rest.HTTPClientFor(c).
func NewForConfig(c *rest.Config) (*Clientset, error) {
	configShallowCopy := *c

	if configShallowCopy.UserAgent == "" {
		configShallowCopy.UserAgent = rest.DefaultKubernetesUserAgent()
	}

	// share the transport between all clients
	httpClient, err := rest.HTTPClientFor(&configShallowCopy)
	if err != nil {
		return nil, err
	}

	return NewForConfigAndClient(&configShallowCopy, httpClient)
}

2.3.2 Clientset 的结构体

Clientset 的结构体

// Clientset contains the clients for groups.
type Clientset struct {
	*discovery.DiscoveryClient
	admissionregistrationV1       *admissionregistrationv1.AdmissionregistrationV1Client
	admissionregistrationV1alpha1 *admissionregistrationv1alpha1.AdmissionregistrationV1alpha1Client
	admissionregistrationV1beta1  *admissionregistrationv1beta1.AdmissionregistrationV1beta1Client
	internalV1alpha1              *internalv1alpha1.InternalV1alpha1Client
	appsV1                        *appsv1.AppsV1Client
	appsV1beta1                   *appsv1beta1.AppsV1beta1Client
	appsV1beta2                   *appsv1beta2.AppsV1beta2Client
	authenticationV1              *authenticationv1.AuthenticationV1Client
	authenticationV1alpha1        *authenticationv1alpha1.AuthenticationV1alpha1Client
	authenticationV1beta1         *authenticationv1beta1.AuthenticationV1beta1Client
	authorizationV1               *authorizationv1.AuthorizationV1Client
	authorizationV1beta1          *authorizationv1beta1.AuthorizationV1beta1Client
	autoscalingV1                 *autoscalingv1.AutoscalingV1Client
	autoscalingV2                 *autoscalingv2.AutoscalingV2Client
	autoscalingV2beta1            *autoscalingv2beta1.AutoscalingV2beta1Client
	autoscalingV2beta2            *autoscalingv2beta2.AutoscalingV2beta2Client
	batchV1                       *batchv1.BatchV1Client
	batchV1beta1                  *batchv1beta1.BatchV1beta1Client
	certificatesV1                *certificatesv1.CertificatesV1Client
	certificatesV1beta1           *certificatesv1beta1.CertificatesV1beta1Client
	certificatesV1alpha1          *certificatesv1alpha1.CertificatesV1alpha1Client
	coordinationV1beta1           *coordinationv1beta1.CoordinationV1beta1Client
	coordinationV1                *coordinationv1.CoordinationV1Client
	coreV1                        *corev1.CoreV1Client
	discoveryV1                   *discoveryv1.DiscoveryV1Client
	discoveryV1beta1              *discoveryv1beta1.DiscoveryV1beta1Client
	eventsV1                      *eventsv1.EventsV1Client
	eventsV1beta1                 *eventsv1beta1.EventsV1beta1Client
	extensionsV1beta1             *extensionsv1beta1.ExtensionsV1beta1Client
	flowcontrolV1alpha1           *flowcontrolv1alpha1.FlowcontrolV1alpha1Client
	flowcontrolV1beta1            *flowcontrolv1beta1.FlowcontrolV1beta1Client
	flowcontrolV1beta2            *flowcontrolv1beta2.FlowcontrolV1beta2Client
	flowcontrolV1beta3            *flowcontrolv1beta3.FlowcontrolV1beta3Client
	networkingV1                  *networkingv1.NetworkingV1Client
	networkingV1alpha1            *networkingv1alpha1.NetworkingV1alpha1Client
	networkingV1beta1             *networkingv1beta1.NetworkingV1beta1Client
	nodeV1                        *nodev1.NodeV1Client
	nodeV1alpha1                  *nodev1alpha1.NodeV1alpha1Client
	nodeV1beta1                   *nodev1beta1.NodeV1beta1Client
	policyV1                      *policyv1.PolicyV1Client
	policyV1beta1                 *policyv1beta1.PolicyV1beta1Client
	rbacV1                        *rbacv1.RbacV1Client
	rbacV1beta1                   *rbacv1beta1.RbacV1beta1Client
	rbacV1alpha1                  *rbacv1alpha1.RbacV1alpha1Client
	resourceV1alpha2              *resourcev1alpha2.ResourceV1alpha2Client
	schedulingV1alpha1            *schedulingv1alpha1.SchedulingV1alpha1Client
	schedulingV1beta1             *schedulingv1beta1.SchedulingV1beta1Client
	schedulingV1                  *schedulingv1.SchedulingV1Client
	storageV1beta1                *storagev1beta1.StorageV1beta1Client
	storageV1                     *storagev1.StorageV1Client
	storageV1alpha1               *storagev1alpha1.StorageV1alpha1Client
}

2.3.3 clientset.Interface

clientset 实现了以下的 Interface, 因此可以通过调用以下方法获得具体的 client. 例如:

pods, err := clientSet.CoreV1().Pods("").List(context.Background(), metav1.ListOptions{})
clientset 的方法集接口

clientet.Interface

type Interface interface {
	Discovery() discovery.DiscoveryInterface
	AdmissionregistrationV1() admissionregistrationv1.AdmissionregistrationV1Interface
	AdmissionregistrationV1alpha1() admissionregistrationv1alpha1.AdmissionregistrationV1alpha1Interface
	AdmissionregistrationV1beta1() admissionregistrationv1beta1.AdmissionregistrationV1beta1Interface
	InternalV1alpha1() internalv1alpha1.InternalV1alpha1Interface
	AppsV1() appsv1.AppsV1Interface
	AppsV1beta1() appsv1beta1.AppsV1beta1Interface
	AppsV1beta2() appsv1beta2.AppsV1beta2Interface
	AuthenticationV1() authenticationv1.AuthenticationV1Interface
	AuthenticationV1alpha1() authenticationv1alpha1.AuthenticationV1alpha1Interface
	AuthenticationV1beta1() authenticationv1beta1.AuthenticationV1beta1Interface
	AuthorizationV1() authorizationv1.AuthorizationV1Interface
	AuthorizationV1beta1() authorizationv1beta1.AuthorizationV1beta1Interface
	AutoscalingV1() autoscalingv1.AutoscalingV1Interface
	AutoscalingV2() autoscalingv2.AutoscalingV2Interface
	AutoscalingV2beta1() autoscalingv2beta1.AutoscalingV2beta1Interface
	AutoscalingV2beta2() autoscalingv2beta2.AutoscalingV2beta2Interface
	BatchV1() batchv1.BatchV1Interface
	BatchV1beta1() batchv1beta1.BatchV1beta1Interface
	CertificatesV1() certificatesv1.CertificatesV1Interface
	CertificatesV1beta1() certificatesv1beta1.CertificatesV1beta1Interface
	CertificatesV1alpha1() certificatesv1alpha1.CertificatesV1alpha1Interface
	CoordinationV1beta1() coordinationv1beta1.CoordinationV1beta1Interface
	CoordinationV1() coordinationv1.CoordinationV1Interface
	CoreV1() corev1.CoreV1Interface
	DiscoveryV1() discoveryv1.DiscoveryV1Interface
	DiscoveryV1beta1() discoveryv1beta1.DiscoveryV1beta1Interface
	EventsV1() eventsv1.EventsV1Interface
	EventsV1beta1() eventsv1beta1.EventsV1beta1Interface
	ExtensionsV1beta1() extensionsv1beta1.ExtensionsV1beta1Interface
	FlowcontrolV1alpha1() flowcontrolv1alpha1.FlowcontrolV1alpha1Interface
	FlowcontrolV1beta1() flowcontrolv1beta1.FlowcontrolV1beta1Interface
	FlowcontrolV1beta2() flowcontrolv1beta2.FlowcontrolV1beta2Interface
	FlowcontrolV1beta3() flowcontrolv1beta3.FlowcontrolV1beta3Interface
	NetworkingV1() networkingv1.NetworkingV1Interface
	NetworkingV1alpha1() networkingv1alpha1.NetworkingV1alpha1Interface
	NetworkingV1beta1() networkingv1beta1.NetworkingV1beta1Interface
	NodeV1() nodev1.NodeV1Interface
	NodeV1alpha1() nodev1alpha1.NodeV1alpha1Interface
	NodeV1beta1() nodev1beta1.NodeV1beta1Interface
	PolicyV1() policyv1.PolicyV1Interface
	PolicyV1beta1() policyv1beta1.PolicyV1beta1Interface
	RbacV1() rbacv1.RbacV1Interface
	RbacV1beta1() rbacv1beta1.RbacV1beta1Interface
	RbacV1alpha1() rbacv1alpha1.RbacV1alpha1Interface
	ResourceV1alpha2() resourcev1alpha2.ResourceV1alpha2Interface
	SchedulingV1alpha1() schedulingv1alpha1.SchedulingV1alpha1Interface
	SchedulingV1beta1() schedulingv1beta1.SchedulingV1beta1Interface
	SchedulingV1() schedulingv1.SchedulingV1Interface
	StorageV1beta1() storagev1beta1.StorageV1beta1Interface
	StorageV1() storagev1.StorageV1Interface
	StorageV1alpha1() storagev1alpha1.StorageV1alpha1Interface
}

2.4 CoreV1Client

我们以 clientset 中的 CoreV1Client 为例做分析

通过传入的配置信息 rest.Config 初始化 CoreV1Client 对象.

源码

cs.coreV1, err = corev1.NewForConfigAndClient(&configShallowCopy, httpClient)
if err != nil {
	return nil, err
}

2.4.1 coreV1.NewForConfig

源码

// NewForConfig creates a new CoreV1Client for the given config.
// NewForConfig is equivalent to NewForConfigAndClient(c, httpClient),
// where httpClient was generated with rest.HTTPClientFor(c).
func NewForConfig(c *rest.Config) (*CoreV1Client, error) {
	config := *c
	if err := setConfigDefaults(&config); err != nil {
		return nil, err
	}
	httpClient, err := rest.HTTPClientFor(&config)
	if err != nil {
		return nil, err
	}
	return NewForConfigAndClient(&config, httpClient)
}

coreV1.NewForConfig 方法本质是调用了 rest.HTTPClientFor(&config) 方法创建 *http.Client 对象, 即 CoreV1Client 的本质就是一个 http.Client 对象.

2.4.2 CoreV1Client 结构体

源码

// CoreV1Client is used to interact with features provided by the  group.
type CoreV1Client struct {
	restClient rest.Interface
}

CoreV1Client 实现了 CoreV1Interface 的接口, 从而对 Kubernetes 的资源对象进行操作:

源码

func (c *CoreV1Client) ComponentStatuses() ComponentStatusInterface {
	return newComponentStatuses(c)
}

func (c *CoreV1Client) ConfigMaps(namespace string) ConfigMapInterface {
	return newConfigMaps(c, namespace)
}

func (c *CoreV1Client) Endpoints(namespace string) EndpointsInterface {
	return newEndpoints(c, namespace)
}

func (c *CoreV1Client) Events(namespace string) EventInterface {
	return newEvents(c, namespace)
}

func (c *CoreV1Client) LimitRanges(namespace string) LimitRangeInterface {
	return newLimitRanges(c, namespace)
}

func (c *CoreV1Client) Namespaces() NamespaceInterface {
	return newNamespaces(c)
}

func (c *CoreV1Client) Nodes() NodeInterface {
	return newNodes(c)
}

func (c *CoreV1Client) PersistentVolumes() PersistentVolumeInterface {
	return newPersistentVolumes(c)
}

func (c *CoreV1Client) PersistentVolumeClaims(namespace string) PersistentVolumeClaimInterface {
	return newPersistentVolumeClaims(c, namespace)
}

func (c *CoreV1Client) Pods(namespace string) PodInterface {
	return newPods(c, namespace)
}

func (c *CoreV1Client) PodTemplates(namespace string) PodTemplateInterface {
	return newPodTemplates(c, namespace)
}

func (c *CoreV1Client) ReplicationControllers(namespace string) ReplicationControllerInterface {
	return newReplicationControllers(c, namespace)
}

func (c *CoreV1Client) ResourceQuotas(namespace string) ResourceQuotaInterface {
	return newResourceQuotas(c, namespace)
}

func (c *CoreV1Client) Secrets(namespace string) SecretInterface {
	return newSecrets(c, namespace)
}

func (c *CoreV1Client) Services(namespace string) ServiceInterface {
	return newServices(c, namespace)
}

func (c *CoreV1Client) ServiceAccounts(namespace string) ServiceAccountInterface {
	return newServiceAccounts(c, namespace)
}

2.4.3 CoreV1Interface

源码

type CoreV1Interface interface {
	RESTClient() rest.Interface
	ComponentStatusesGetter
	ConfigMapsGetter
	EndpointsGetter
	EventsGetter
	LimitRangesGetter
	NamespacesGetter
	NodesGetter
	PersistentVolumesGetter
	PersistentVolumeClaimsGetter
	PodsGetter
	PodTemplatesGetter
	ReplicationControllersGetter
	ResourceQuotasGetter
	SecretsGetter
	ServicesGetter
	ServiceAccountsGetter
}

CoreV1Interface 中包含了各种 Kubernetes 对象的调用接口, 例如 PodsGetter 是对 Kubernetes 中 pod 对象增删改查操作的接口. ServiceGetter 是对 Service 对象操作的接口.

2.4.4 PodsGetter

示例中的代码如下:

pods, err := clientSet.CoreV1().Pods("").List(context.Background(), metav1.ListOptions{})

CoreV1().Pods()

源码

func (c *CoreV1Client) Pods(namespace string) PodInterface {
	return newPods(c, namespace)
}

newPods()

源码

// newPods returns a Pods
func newPods(c *CoreV1Client, namespace string) *pods {
	return &pods{
		client: c.RESTClient(),
		ns:     namespace,
	}
}

CoreV1().Pods() 的方法实际上是调用了 newPods() 的方法, 创建了一个 pods 对象, pods 对象实现了 rest.Interface 接口, 即最终的实现本质是 RESTClient 的HTTP调用.

源码

// pods implements PodInterface
type pods struct {
	client rest.Interface
	ns     string
}

pods 对象实现了 PodInterface 接口. PodInterface 定义了 pods 对象的增删改查等方法.

源码

// PodInterface has methods to work with Pod resources.
type PodInterface interface {
	Create(ctx context.Context, pod *v1.Pod, opts metav1.CreateOptions) (*v1.Pod, error)
	Update(ctx context.Context, pod *v1.Pod, opts metav1.UpdateOptions) (*v1.Pod, error)
	UpdateStatus(ctx context.Context, pod *v1.Pod, opts metav1.UpdateOptions) (*v1.Pod, error)
	Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error
	DeleteCollection(ctx context.Context, opts metav1.DeleteOptions, listOpts metav1.ListOptions) error
	Get(ctx context.Context, name string, opts metav1.GetOptions) (*v1.Pod, error)
	List(ctx context.Context, opts metav1.ListOptions) (*v1.PodList, error)
	Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error)
	Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (result *v1.Pod, err error)
	Apply(ctx context.Context, pod *corev1.PodApplyConfiguration, opts metav1.ApplyOptions) (result *v1.Pod, err error)
	ApplyStatus(ctx context.Context, pod *corev1.PodApplyConfiguration, opts metav1.ApplyOptions) (result *v1.Pod, err error)
	UpdateEphemeralContainers(ctx context.Context, podName string, pod *v1.Pod, opts metav1.UpdateOptions) (*v1.Pod, error)

	PodExpansion
}

PodsGetter

PodsGetter 继承了 PodInterface 的接口.

源码

// PodsGetter has a method to return a PodInterface.
// A group's client should implement this interface.
type PodsGetter interface {
	Pods(namespace string) PodInterface
}

Pod().List()

pods.List 方法通过 RESTClient 的 HTTP 调用来实现对 Kubernetes 的 pod 资源的获取. 源码

// List takes label and field selectors, and returns the list of Pods that match those selectors.
func (c *pods) List(ctx context.Context, opts metav1.ListOptions) (result *v1.PodList, err error) {
	var timeout time.Duration
	if opts.TimeoutSeconds != nil {
		timeout = time.Duration(*opts.TimeoutSeconds) * time.Second
	}
	result = &v1.PodList{}
	err = c.client.Get().
		Namespace(c.ns).
		Resource("pods").
		VersionedParams(&opts, scheme.ParameterCodec).
		Timeout(timeout).
		Do(ctx).
		Into(result)
	return
}

至此分析了 clientset.CoreV1().Pods("").List(metaV1.ListOptions{}) 对 pod 资源获取的过程, 最终是调用 RESTClient 的方法实现.

2.5 RESTClient

下面分析 RESTClient 的创建过程及应用.

RESTClient 对象的创建同样是以来传入的 config 信息.

// NewForConfig creates a new CoreV1Client for the given config.
// NewForConfig is equivalent to NewForConfigAndClient(c, httpClient),
// where httpClient was generated with rest.HTTPClientFor(c).
func NewForConfig(c *rest.Config) (*CoreV1Client, error) {
	config := *c
	if err := setConfigDefaults(&config); err != nil {
		return nil, err
	}
	httpClient, err := rest.HTTPClientFor(&config)
	if err != nil {
		return nil, err
	}
	return NewForConfigAndClient(&config, httpClient)
}

2.5.1 rest.HTTPClientFor

// HTTPClientFor returns an http.Client that will provide the authentication
// or transport level security defined by the provided Config. Will return the
// default http.DefaultClient if no special case behavior is needed.
func HTTPClientFor(config *Config) (*http.Client, error) {
	transport, err := TransportFor(config)
	if err != nil {
		return nil, err
	}
	var httpClient *http.Client
	if transport != http.DefaultTransport || config.Timeout > 0 {
		httpClient = &http.Client{
			Transport: transport,
			Timeout:   config.Timeout,
		}
	} else {
		httpClient = http.DefaultClient
	}

	return httpClient, nil
}

2.5.2 rest.RESTClientForConfigAndClient

// RESTClientForConfigAndClient returns a RESTClient that satisfies the requested attributes on a
// client Config object.
// Unlike RESTClientFor, RESTClientForConfigAndClient allows to pass an http.Client that is shared
// between all the API Groups and Versions.
// Note that the http client takes precedence over the transport values configured.
// The http client defaults to the `http.DefaultClient` if nil.
func RESTClientForConfigAndClient(config *Config, httpClient *http.Client) (*RESTClient, error) {
	if config.GroupVersion == nil {
		return nil, fmt.Errorf("GroupVersion is required when initializing a RESTClient")
	}
	if config.NegotiatedSerializer == nil {
		return nil, fmt.Errorf("NegotiatedSerializer is required when initializing a RESTClient")
	}

	baseURL, versionedAPIPath, err := DefaultServerUrlFor(config)
	if err != nil {
		return nil, err
	}

	rateLimiter := config.RateLimiter
	if rateLimiter == nil {
		qps := config.QPS
		if config.QPS == 0.0 {
			qps = DefaultQPS
		}
		burst := config.Burst
		if config.Burst == 0 {
			burst = DefaultBurst
		}
		if qps > 0 {
			rateLimiter = flowcontrol.NewTokenBucketRateLimiter(qps, burst)
		}
	}

	var gv schema.GroupVersion
	if config.GroupVersion != nil {
		gv = *config.GroupVersion
	}
	clientContent := ClientContentConfig{
		AcceptContentTypes: config.AcceptContentTypes,
		ContentType:        config.ContentType,
		GroupVersion:       gv,
		Negotiator:         runtime.NewClientNegotiator(config.NegotiatedSerializer, gv),
	}

	restClient, err := NewRESTClient(baseURL, versionedAPIPath, clientContent, rateLimiter, httpClient)
	if err == nil && config.WarningHandler != nil {
		restClient.warningHandler = config.WarningHandler
	}
	return restClient, err
}

2.5.3 rest.NewRESTClient

// NewRESTClient creates a new RESTClient. This client performs generic REST functions
// such as Get, Put, Post, and Delete on specified paths.
func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ClientContentConfig, rateLimiter flowcontrol.RateLimiter, client *http.Client) (*RESTClient, error) {
	if len(config.ContentType) == 0 {
		config.ContentType = "application/json"
	}

	base := *baseURL
	if !strings.HasSuffix(base.Path, "/") {
		base.Path += "/"
	}
	base.RawQuery = ""
	base.Fragment = ""

	return &RESTClient{
		base:             &base,
		versionedAPIPath: versionedAPIPath,
		content:          config,
		createBackoffMgr: readExpBackoffConfig,
		rateLimiter:      rateLimiter,

		Client: client,
	}, nil
}

2.5.4 RESTClient

// RESTClient imposes common Kubernetes API conventions on a set of resource paths.
// The baseURL is expected to point to an HTTP or HTTPS path that is the parent
// of one or more resources.  The server should return a decodable API resource
// object, or an api.Status object which contains information about the reason for
// any failure.
//
// Most consumers should use client.New() to get a Kubernetes API client.
type RESTClient struct {
	// base is the root URL for all invocations of the client
	base *url.URL
	// versionedAPIPath is a path segment connecting the base URL to the resource root
	versionedAPIPath string

	// content describes how a RESTClient encodes and decodes responses.
	content ClientContentConfig

	// creates BackoffManager that is passed to requests.
	createBackoffMgr func() BackoffManager

	// rateLimiter is shared among all requests created by this client unless specifically
	// overridden.
	rateLimiter flowcontrol.RateLimiter

	// warningHandler is shared among all requests created by this client.
	// If not set, defaultWarningHandler is used.
	warningHandler WarningHandler

	// Set specific behavior of the client.  If not set http.DefaultClient will be used.
	Client *http.Client
}

2.5.5 rest.Interface

// Interface captures the set of operations for generically interacting with Kubernetes REST apis.
type Interface interface {
	GetRateLimiter() flowcontrol.RateLimiter
	Verb(verb string) *Request
	Post() *Request
	Put() *Request
	Patch(pt types.PatchType) *Request
	Get() *Request
	Delete() *Request
	APIVersion() schema.GroupVersion
}

在调用 HTTP 方法(Post(), Put(), Get(), Delete() )时, 实际上调用了 Verb(verb string)函数.

// Verb begins a request with a verb (GET, POST, PUT, DELETE).
//
// Example usage of RESTClient's request building interface:
// c, err := NewRESTClient(...)
// if err != nil { ... }
// resp, err := c.Verb("GET").
//
//	Path("pods").
//	SelectorParam("labels", "area=staging").
//	Timeout(10*time.Second).
//	Do()
//
// if err != nil { ... }
// list, ok := resp.(*api.PodList)
func (c *RESTClient) Verb(verb string) *Request {
	return NewRequest(c).Verb(verb)
}

Verb 函数调用了 NewRequest 方法, 最后调用 Do() 方法实现了一个 HTTP 请求获取 Result.

2.6 总结

client-go 对 kubernetes 资源对象的调用, 需要先获取 kubernetes 的配置信息, 即 $HOME/.kube/config. (或使用 Token)

整个调用的过程如下:

kubeconfig -> rest.config -> clientset -> 具体的 client(CoreV1Client) -> 具体的资源对象(pod)-> RESTClient -> http.Client -> HTTP 请求的发送及响应.

通过 clientset 中不同的 client 和 client 中不同资源对象的方法实现对 kubernetes 中资源对象的增删改查等操作, 常用的 client 有 CoreV1Client、AppsV1beta1Client、ExtensionsV1beta1Client 等.

3. client-go 对 Kubernetes 资源的调用

创建clientSet

//获取kubeconfig
kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
//创建config  
config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
//创建clientset
clientset, err := kubernetes.NewForConfig(config)
func NewKubernetesClient(c *Config) (*kubernetes.Clientset, error) {
	kubeConf := &rest.Config{
		Host:        fmt.Sprintf("%s:%d", c.Host, c.Port),
		BearerToken: c.Token,
		TLSClientConfig: rest.TLSClientConfig{
			Insecure: true,
		},
	}
	return kubernetes.NewForConfig(kubeConf)
}

3.1 deployment

//声明deployment对象
var deployment *v1beta1.Deployment
//构造deployment对象
//创建deployment
deployment, err := clientset.AppsV1beta1().Deployments(<namespace>).Create(<deployment>)
//更新deployment
deployment, err := clientset.AppsV1beta1().Deployments(<namespace>).Update(<deployment>)
//删除deployment
err := clientset.AppsV1beta1().Deployments(<namespace>).Delete(<deployment.Name>, &meta_v1.DeleteOptions{})
//查询deployment
deployment, err := clientset.AppsV1beta1().Deployments(<namespace>).Get(<deployment.Name>, meta_v1.GetOptions{})
//列出deployment
deploymentList, err := clientset.AppsV1beta1().Deployments(<namespace>).List(&meta_v1.ListOptions{})
//watch deployment
watchInterface, err := clientset.AppsV1beta1().Deployments(<namespace>).Watch(&meta_v1.ListOptions{})

3.2 service

//声明service对象
var service *v1.Service
//构造service对象
//创建service
service, err := clientset.CoreV1().Services(<namespace>).Create(<service>)
//更新service
service, err := clientset.CoreV1().Services(<namespace>).Update(<service>)
//删除service
err := clientset.CoreV1().Services(<namespace>).Delete(<service.Name>, &meta_v1.DeleteOptions{})
//查询service
service, err := clientset.CoreV1().Services(<namespace>).Get(<service.Name>, meta_v1.GetOptions{})
//列出service
serviceList, err := clientset.CoreV1().Services(<namespace>).List(&meta_v1.ListOptions{})
//watch service
watchInterface, err := clientset.CoreV1().Services(<namespace>).Watch(&meta_v1.ListOptions{})

3.3 ingress

//声明service对象
var service *v1.Service
//构造service对象
//创建service
service, err := clientset.CoreV1().Services(<namespace>).Create(<service>)
//更新service
service, err := clientset.CoreV1().Services(<namespace>).Update(<service>)
//删除service
err := clientset.CoreV1().Services(<namespace>).Delete(<service.Name>, &meta_v1.DeleteOptions{})
//查询service
service, err := clientset.CoreV1().Services(<namespace>).Get(<service.Name>, meta_v1.GetOptions{})
//列出service
serviceList, err := clientset.CoreV1().Services(<namespace>).List(&meta_v1.ListOptions{})
//watch service
watchInterface, err := clientset.CoreV1().Services(<namespace>).Watch(&meta_v1.ListOptions{})

replicaSet、pod、statefulset....... (kubernetes 中一般通过 deployment 来创建 replicaSet, 再通过 replicaSet 来控制 pod.)

通过以上对 Kubernetes 的资源对象的操作函数可以看出, 每个资源对象都有增删改查等方法, 基本调用逻辑类似. 一般二次开发只需要创建 deployment、service、ingress 三个资源对象即可, pod 对象由 deployment 包含的 replicaSet 来控制创建和删除. 函数调用的入参一般只有 NAMESPACE 和 kubernetesObject 两个参数, 部分操作有 Options 的参数. 在创建前, 需要对资源对象构造数据, 可以理解为编辑一个资源对象的 yaml 文件, 然后通过 kubectl create -f xxx.yaml 来创建对象.

变更历史

最后更新于: 查看全部变更历史
  • docs: update docs

    于 2024/11/21
  • docs: update docs

    于 2024/11/19
  • 更改navbar

    于 2024/11/4
  • 整理文章格式

    于 2024/11/1
  • 升级主题+整理文章格式

    于 2024/11/1
  • update

    于 2024/10/17
  • 新增文字+CRLF全部替换为LF

    于 2024/10/17
  • 给予文件夹顺序

    于 2024/9/24
  • 调整博客分类+修改about-me.md

    于 2024/9/24
  • 更改主题色+更改首页

    于 2024/9/23
  • 整理文章

    于 2024/9/23
  • first commit

    于 2024/9/20

Keep It Simple