diff --git a/.gitignore b/.gitignore index f2dd9554a12fd7acdc62e60e8eccae086f718be2..a883bc67a7866c2b07975cc23a2c8a60100a39fd 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,7 @@ # Output of the go coverage tool, specifically when used with LiteIDE *.out +/.idea/vcs.xml +/.idea/workspace.xml +/.idea/modules.xml +/.idea/ascend-hccl-controller.iml diff --git a/build/hccl-controller.yaml b/build/hccl-controller.yaml index 37c6fce5818f748173cf60b3ba1743afd03e8f05..c60c683e5b75d77e0db58a74df1864f250172766 100644 --- a/build/hccl-controller.yaml +++ b/build/hccl-controller.yaml @@ -47,7 +47,7 @@ spec: imagePullPolicy: Never command: [ "/bin/bash" ] args: [ "-c", "umask 027;hccl-controller --logtostderr=false --log_dir=/var/log/atlas_dls/hccl-controller - --log_file=/var/log/atlas_dls/hccl-controller/hccl-controller.log --stderrthreshold=1 -v=5" ] + --log_file=/var/log/atlas_dls/hccl-controller/hccl-controller.log --stderrthreshold=1 -v=3" ] volumeMounts: - name: device-hcclcontroller mountPath: /var/log/atlas_dls/hccl-controller diff --git a/build/test.sh b/build/test.sh index 4da647f7772c7745a8c14711c89d3ce6420d7e5c..f268a625d4e4be3fb44bca6956e1087c75d5f319 100644 --- a/build/test.sh +++ b/build/test.sh @@ -32,7 +32,6 @@ function mockgen_files() { mockgen k8s.io/client-go/kubernetes/typed/core/v1 CoreV1Interface >"${MOCK_TOP}"/mock_v1/corev1_mock.go mockgen volcano.sh/volcano/pkg/client/informers/externalversions/batch/v1alpha1 JobInformer >"${MOCK_TOP}"/mock_v1alpha1/former_mock.go mockgen k8s.io/client-go/kubernetes Interface >"${MOCK_TOP}"/mock_kubernetes/k8s_interface_mock.go - mockgen hccl-controller/pkg/ring-controller/controller WorkAgentInterface >"${MOCK_TOP}"/mock_controller/businessagent_mock.go mockgen k8s.io/client-go/tools/cache Indexer >"${MOCK_TOP}"/mock_cache/indexer_mock.go mockgen k8s.io/client-go/tools/cache SharedIndexInformer >"${MOCK_TOP}"/mock_cache/sharedInformer_mock.go } diff --git a/go.sum b/go.sum index ab3281e632f18f3ca5734563dae2b9d949e78b05..3ebf7586e66911b62fb8508abc11156563c2ddb1 100644 --- a/go.sum +++ b/go.sum @@ -226,6 +226,7 @@ github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/imdario/mergo v0.3.5 h1:JboBksRwiiAJWvIYJVo46AfV+IAIKZpfrSzVKj42R4Q= github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= +github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/jimstudt/http-authentication v0.0.0-20140401203705-3eca13d6893a/go.mod h1:wK6yTYYcgjHE1Z1QtXACPDjcFJyBskHEdagmnq3vsP8= github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= @@ -361,6 +362,7 @@ github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk= github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ= +github.com/spf13/cobra v0.0.5 h1:f0B+LkLX6DtmRH1isoNA9VTtNUK9K8xYd28JNNfOv/s= github.com/spf13/cobra v0.0.5/go.mod h1:3K3wKZymM7VvHMDS9+Akkh4K60UwM26emMESw8tLCHU= github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo= github.com/spf13/jwalterweatherman v1.1.0/go.mod 
h1:aNWZUN0dPAAO/Ljvb5BEdw96iTZ0EXowPYD95IqWIGo= @@ -570,6 +572,7 @@ k8s.io/apiextensions-apiserver v0.17.8/go.mod h1:5H/i0XiKizIE9SkoAQaU/ou31JJBIff k8s.io/apimachinery v0.16.9-beta.0/go.mod h1:Xk2vD2TRRpuWYLQNM6lT9R7DSFZUYG03SarNkbGrnKE= k8s.io/apimachinery v0.17.8 h1:zXvd8rYMAjRJXpILP9tdAiUnFIENM9EmHuE81apIoms= k8s.io/apimachinery v0.17.8/go.mod h1:Lg8zZ5iC/O8UjCqW6DNhcQG2m4TdjF9kwG3891OWbbA= +k8s.io/apiserver v0.17.8 h1:bazdS/BsMOo4SOh+EueJ0s34A1oHF+BQptI3+Dx9d3A= k8s.io/apiserver v0.17.8/go.mod h1:XU2YBi1I/v/P1R5lb0lEwSQ1rnXE01k7yxVtdIWH4Lo= k8s.io/cli-runtime v0.17.8/go.mod h1:YDS2GZU0dhHUPIh1tjex69MhR9Gt7//LqDN+XR4vbaA= k8s.io/client-go v0.16.9-beta.0/go.mod h1:At/mYjTE2j+YW6bFrgWMVMdi0KepVuRWCUiH4AR/URc= diff --git a/main.go b/main.go index 381c6011eecabc25110cb3a1186c94de64b93a58..880290902fdd005cde465b6a4ca8c931b3df373c 100644 --- a/main.go +++ b/main.go @@ -20,20 +20,23 @@ package main import ( "flag" "fmt" + "hccl-controller/pkg/ring-controller/agent" + "hccl-controller/pkg/ring-controller/model" "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/tools/cache" "os" "path/filepath" "time" "hccl-controller/pkg/resource-controller/signals" "hccl-controller/pkg/ring-controller/controller" - vkClientset "volcano.sh/volcano/pkg/client/clientset/versioned" - informers "volcano.sh/volcano/pkg/client/informers/externalversions" - "k8s.io/apimachinery/pkg/apis/meta/v1" + cinformers "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" "k8s.io/klog" + vkClientset "volcano.sh/volcano/pkg/client/clientset/versioned" + informers "volcano.sh/volcano/pkg/client/informers/externalversions" ) const ( @@ -53,8 +56,6 @@ var ( cmCheckTimeout int version bool jsonVersion string - // BuildName build name - BuildName string // BuildVersion build version BuildVersion string ) @@ -83,7 +84,7 @@ func main() { if jsonVersion != "v1" && jsonVersion != "v2" { klog.Fatalf("invalid json version value, should be v1/v2") } - controller.JSONVersion = jsonVersion + agent.JSONVersion = jsonVersion if version { fmt.Printf("HCCL-Controller version: %s \n", BuildVersion) @@ -111,27 +112,44 @@ func main() { if err != nil { klog.Fatalf("Error building job clientset: %s", err.Error()) } + jobInformerFactory, deploymentFactory := newInformerFactory(jobClient, kubeClient) + config := newConfig() + jobInformer := jobInformerFactory.Batch().V1alpha1().Jobs() + deploymentInformer := deploymentFactory.Apps().V1().Deployments() + cacheIndexer := make(map[string]cache.Indexer, 1) + cacheIndexer[model.VCJobType] = jobInformer.Informer().GetIndexer() + cacheIndexer[model.DeploymentType] = deploymentInformer.Informer().GetIndexer() + control := controller.NewController(kubeClient, jobClient, config, controller.InformerInfo{JobInformer: jobInformer, + DeployInformer: deploymentInformer, CacheIndexers: cacheIndexer}, stopCh) + go jobInformerFactory.Start(stopCh) + go deploymentFactory.Start(stopCh) + if err = control.Run(jobParallelism, monitorPerformance, stopCh); err != nil { + klog.Fatalf("Error running controller: %s", err.Error()) + } +} - labelSelector := labels.Set(map[string]string{controller.Key910: controller.Val910}).AsSelector().String() - jobInformerFactory := informers.NewSharedInformerFactoryWithOptions(jobClient, time.Second*30, - informers.WithTweakListOptions(func(options *v1.ListOptions) { - options.LabelSelector = labelSelector - })) - config := &controller.Config{ +func newConfig() *agent.Config { + return &agent.Config{ DryRun: dryRun, DisplayStatistic: displayStatistic, 
PodParallelism: podParallelism, CmCheckInterval: cmCheckInterval, CmCheckTimeout: cmCheckTimeout, } - controller := controller.NewController(kubeClient, jobClient, config, - jobInformerFactory.Batch().V1alpha1().Jobs(), stopCh) - - go jobInformerFactory.Start(stopCh) +} - if err = controller.Run(jobParallelism, monitorPerformance, stopCh); err != nil { - klog.Fatalf("Error running controller: %s", err.Error()) - } +func newInformerFactory(jobClient *vkClientset.Clientset, kubeClient *kubernetes.Clientset) ( + informers.SharedInformerFactory, cinformers.SharedInformerFactory) { + labelSelector := labels.Set(map[string]string{agent.Key910: agent.Val910}).AsSelector().String() + jobInformerFactory := informers.NewSharedInformerFactoryWithOptions(jobClient, time.Second*30, + informers.WithTweakListOptions(func(options *v1.ListOptions) { + options.LabelSelector = labelSelector + })) + deploymentFactory := cinformers.NewSharedInformerFactoryWithOptions(kubeClient, time.Second*30, + cinformers.WithTweakListOptions(func(options *v1.ListOptions) { + options.LabelSelector = labelSelector + })) + return jobInformerFactory, deploymentFactory } func init() { @@ -159,7 +177,7 @@ func init() { "Parallelism of pod events handling.") flag.IntVar(&cmCheckInterval, "cmCheckInterval", cmCheckIntervalConst, "Interval (seconds) to check job's configmap before building rank table.") - flag.IntVar(&cmCheckTimeout, "ceckTimeout", cmCheckTimeoutConst, + flag.IntVar(&cmCheckTimeout, "cmCheckTimeout", cmCheckTimeoutConst, "Maximum time (seconds) to check creation of job's configmap.") flag.BoolVar(&version, "version", false, "Query the verison of the program") diff --git a/mindx-dl/yamls/hccl-controller-v20.2.0.yaml b/mindx-dl/yamls/hccl-controller-v20.2.0.yaml index eaac2484368a4715583e725f62175fb8dd4ec2cf..d23b653ae58dfd3c32fcc7c7c97e6826a46ebc3c 100644 --- a/mindx-dl/yamls/hccl-controller-v20.2.0.yaml +++ b/mindx-dl/yamls/hccl-controller-v20.2.0.yaml @@ -47,7 +47,7 @@ spec: imagePullPolicy: Never command: [ "/bin/bash", "-c", "--" ] args: [ "umask 027;hccl-controller --logtostderr=false --log_dir=/var/log/atlas_dls/hccl-controller - --log_file=/var/log/atlas_dls/hccl-controller/hccl-controller.log --stderrthreshold=1 -v=5" ] + --log_file=/var/log/atlas_dls/hccl-controller/hccl-controller.log --stderrthreshold=1 -v=3" ] volumeMounts: - name: device-hcclcontroller mountPath: /var/log/atlas_dls/hccl-controller diff --git a/pkg/ring-controller/controller/businessagent.go b/pkg/ring-controller/agent/businessagent.go similarity index 30% rename from pkg/ring-controller/controller/businessagent.go rename to pkg/ring-controller/agent/businessagent.go index 8ebe8c45ff3ae35369e14bd7707997bde01ddb63..7a4771ea00db0bada3c452f0218f5c796982904f 100644 --- a/pkg/ring-controller/controller/businessagent.go +++ b/pkg/ring-controller/agent/businessagent.go @@ -14,17 +14,12 @@ * limitations under the License. 
*/ -// Package controller for run the logic -package controller +// Package agent for running the logic +package agent import ( "fmt" - "strings" - "sync" - "time" - - "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" + apiCoreV1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -35,62 +30,27 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" "k8s.io/klog" - - "volcano.sh/volcano/pkg/apis/batch/v1alpha1" + "strings" + "time" "reflect" - "strconv" ) -// Agent for all businessWorkers, responsibilities: -// * list/watch 910 pods, and assign each pod to corresponding handler -// (each business worker belongs to a volcano job, and contains a handler for building rank table) -type businessAgent struct { - // business worker for each volcano job - businessWorker map[string]*businessWorker - informerFactory informers.SharedInformerFactory - podInformer cache.SharedIndexInformer - podsIndexer cache.Indexer - kubeClientSet kubernetes.Interface - agentSwitch <-chan struct{} - - rwMu sync.RWMutex - - // event recorder - recorder record.EventRecorder - - workqueue workqueue.RateLimitingInterface - - // if print only, do not delete anything. - dryRun bool - - // if display progress of configmap updating - displayStatistic bool - - // Interval to check job's configmap before building rank table - cmCheckInterval int - - // Maximum time to check creation of job's configmap - cmCheckTimeout int -} - -type podIdentifier struct { - namespace string - name string - jobName string - eventType string -} - -// String to string +// String returns the podIdentifier as a string in the style: +// namespace:%s,name:%s,jobName:%s,eventType:%s func (p *podIdentifier) String() string { return fmt.Sprintf("namespace:%s,name:%s,jobName:%s,eventType:%s", p.namespace, p.name, p.jobName, p.eventType) } -var newBusinessAgent = func( +// NewBusinessAgent creates an agent. The agent is a framework in which all types of workers can be +// implemented through the Worker interface and run. +// The agent monitors pod events that carry a specific label and combines +// the tasks of the different workers at different times.
+var NewBusinessAgent = func( kubeClientSet kubernetes.Interface, recorder record.EventRecorder, config *Config, - stopCh <-chan struct{}) (*businessAgent, error) { + stopCh <-chan struct{}) (*BusinessAgent, error) { // create pod informer factory labelSelector := labels.Set(map[string]string{ @@ -102,20 +62,17 @@ var newBusinessAgent = func( })) // each worker share the same init parameters stored here - businessAgent := &businessAgent{ + businessAgent := &BusinessAgent{ informerFactory: podInformerFactory, podInformer: podInformerFactory.Core().V1().Pods().Informer(), - podsIndexer: podInformerFactory.Core().V1().Pods().Informer().GetIndexer(), - workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter( + PodsIndexer: podInformerFactory.Core().V1().Pods().Informer().GetIndexer(), + Workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter( retryMilliSecond*time.Millisecond, threeMinutes*time.Second), "Pods"), - kubeClientSet: kubeClientSet, - businessWorker: make(map[string]*businessWorker), - recorder: recorder, - dryRun: config.DryRun, - displayStatistic: config.DisplayStatistic, - cmCheckInterval: config.CmCheckInterval, - cmCheckTimeout: config.CmCheckTimeout, - agentSwitch: stopCh, + KubeClientSet: kubeClientSet, + BusinessWorker: make(map[string]Worker), + recorder: recorder, + Config: config, + agentSwitch: stopCh, } // when pod is added, annotation info is ready. No need to listen update event. @@ -143,40 +100,19 @@ var newBusinessAgent = func( return businessAgent, businessAgent.run(config.PodParallelism) } -func (b *businessAgent) enqueuePod(obj interface{}, eventType string) { +// enqueuePod generates the event information for a monitored pod event +// and puts it into the agent's work queue.
+func (b *BusinessAgent) enqueuePod(obj interface{}, eventType string) { var name string var err error - if name, err = b.nameGenerationFunc(obj, eventType); err != nil { + if name, err = nameGenerationFunc(obj, eventType); err != nil { klog.Errorf("pod key generation error: %v", err) return } - b.workqueue.AddRateLimited(name) -} - -func (b *businessAgent) nameGenerationFunc(obj interface{}, eventType string) (string, error) { - metaData, err := meta.Accessor(obj) - if err != nil { - return "", fmt.Errorf("object has no meta: %v", err) - } - labels := metaData.GetLabels() - return metaData.GetNamespace() + "/" + metaData.GetName() + "/" + labels[VolcanoJobNameKey] + "/" + eventType, nil -} - -func (b *businessAgent) splitKeyFunc(key string) (podInfo *podIdentifier, err error) { - parts := strings.Split(key, "/") - if len(parts) == splitNum { - podInfo := &podIdentifier{ - namespace: parts[0], - name: parts[1], - jobName: parts[2], - eventType: parts[3], - } - return podInfo, nil - } - return nil, fmt.Errorf("unexpected key format: %q", key) + b.Workqueue.AddRateLimited(name) } -func (b *businessAgent) run(threadiness int) error { +func (b *BusinessAgent) run(threadiness int) error { klog.V(L1).Info("Starting workers") for i := 0; i < threadiness; i++ { go wait.Until(b.runMasterWorker, time.Second, b.agentSwitch) @@ -186,181 +122,121 @@ func (b *businessAgent) run(threadiness int) error { return nil } -func (b *businessAgent) runMasterWorker() { +func (b *BusinessAgent) runMasterWorker() { for b.processNextWorkItem() { } } -func (b *businessAgent) processNextWorkItem() bool { - obj, shutdown := b.workqueue.Get() +func (b *BusinessAgent) processNextWorkItem() bool { + obj, shutdown := b.Workqueue.Get() if shutdown { return false } if !b.doWork(obj) { - b.workqueue.AddRateLimited(obj) + b.Workqueue.AddRateLimited(obj) } return true } -func (b *businessAgent) doWork(obj interface{}) bool { - defer b.workqueue.Done(obj) - podKeyInfo, done := b.preCheck(obj) - if podKeyInfo == nil { - return done +// doWork : Each POD time is resolved in detail. If the return value is false, it means that this POD event cannot be +// processed temporarily due to some factors and needs to be put into the queue to continue the next execution. +func (b *BusinessAgent) doWork(obj interface{}) bool { + // This value is deleted from the queue each time the doWork function is executed. + defer b.Workqueue.Done(obj) + // Check the validity of the value in the queue, and if it returns true, discard the value in the queue. 
+ podKeyInfo, retry := preCheck(obj) + if retry { + b.Workqueue.Forget(obj) + return retry } // get pod obj from lister - tmpObj, podExist, err := b.podsIndexer.GetByKey(podKeyInfo.namespace + "/" + podKeyInfo.name) + tmpObj, podExist, err := b.PodsIndexer.GetByKey(podKeyInfo.namespace + "/" + podKeyInfo.name) if err != nil { - b.workqueue.Forget(obj) + b.Workqueue.Forget(obj) klog.Errorf("syncing '%s' failed: failed to get obj from indexer", podKeyInfo) return true } - - b.rwMu.RLock() - defer b.rwMu.RUnlock() - bsnsWorker, workerExist := b.businessWorker[podKeyInfo.namespace+"/"+podKeyInfo.jobName] + // Lock to safely obtain worker data in the Map + b.RwMutex.RLock() + defer b.RwMutex.RUnlock() + bsnsWorker, workerExist := b.BusinessWorker[podKeyInfo.namespace+"/"+podKeyInfo.jobName] + klog.V(L4).Infof(" worker : \n %+v", b.BusinessWorker) if !workerExist { - return b.workerNotExistHandler(podExist, obj, podKeyInfo.String()) - } - - // if worker exist && pod exist, need check some special scenarios - pod, pass, done := b.convertAndCheckPod(obj, podExist, tmpObj, bsnsWorker, podKeyInfo) - if !pass { - return done - } - // if configmap status of worker struct is completed, no need to sync pod anymore - pass, done = b.updateConfigMap(obj, pod, podExist, podKeyInfo) - if !pass { - return done - } - b.workqueue.Forget(obj) - klog.V(L3).Infof("successfully synced '%s'", podKeyInfo) - return true -} - -func (b *businessAgent) preCheck(obj interface{}) (*podIdentifier, bool) { - var key string - var ok bool - if key, ok = obj.(string); !ok { - b.workqueue.Forget(obj) - klog.Errorf("expected string in workqueue but got %#v", obj) - return nil, true - } - podPathInfo, err := b.splitKeyFunc(key) - if err != nil || podPathInfo == nil { - b.workqueue.Forget(obj) - klog.Errorf("failed to split key: %v", err) - return nil, true + if !podExist { + b.Workqueue.Forget(obj) + klog.V(L3).Infof("syncing '%s' terminated: current obj is no longer exist", + podKeyInfo.String()) + return true + } + // llTODO: if someone create a single 910 pod without a job, how to handle? 
+ klog.V(L4).Infof("syncing '%s' delayed: corresponding job worker may be uninitialized", + podKeyInfo.String()) + return false } - return podPathInfo, false -} - -func (b *businessAgent) convertAndCheckPod(obj interface{}, podExist bool, tmpObj interface{}, - bsnsWorker *businessWorker, podInfo *podIdentifier) (newPod *v1.Pod, isPass bool, isOver bool) { - var pod *v1.Pod + // if worker exist but pod not exist, try again if !podExist { - return pod, false, true + return true } - var ok bool - pod, ok = tmpObj.(*v1.Pod) + pod, ok := tmpObj.(*apiCoreV1.Pod) if !ok { klog.Error("pod transform failed") - return nil, false, true - } - done, pass := b.checkPodCondition(pod, bsnsWorker, obj, podInfo) - if !pass { - return pod, false, done + return true } - return pod, true, false -} - -func (b *businessAgent) updateConfigMap(obj interface{}, pod *v1.Pod, podExist bool, - podInfo *podIdentifier) (pass, isOver bool) { - if configmapComplete := - b.businessWorker[podInfo.namespace+"/"+podInfo.jobName].configmapData.getStatus() == ConfigmapCompleted; configmapComplete { - b.workqueue.Forget(obj) - klog.V(L3).Infof("syncing '%s' terminated: corresponding rank table is completed", - podInfo) - return false, true - } + // if worker exist && pod exist, need check some special scenarios + klog.V(L4).Infof("successfully synced '%s'", podKeyInfo) - // start to sync current pod - if err := b.businessWorker[podInfo.namespace+"/"+podInfo.jobName].syncHandler(pod, podExist, podInfo); err != nil { - b.workqueue.Forget(obj) - klog.Errorf("error syncing '%s': %s", podInfo, err.Error()) - return false, true + forgetQueue, retry := bsnsWorker.doWorker(pod, podKeyInfo) + if forgetQueue { + b.Workqueue.Forget(obj) } - - return true, false + return retry } -func (b *businessAgent) checkPodCondition(pod *v1.Pod, bsnsWorker *businessWorker, - obj interface{}, podInfo *podIdentifier) (isOver, pass bool) { - - // scenario check A: For an identical job, create it immediately after deletion - // check basis: job uid + creationTimestamp - if !isReferenceJobSameWithBsnsWorker(pod, podInfo.jobName, bsnsWorker.jobUID) { - if pod.CreationTimestamp.Before(&bsnsWorker.jobCreationTimestamp) { - // old pod + new worker - b.workqueue.Forget(obj) - klog.V(L3).Infof("syncing '%s' terminated: corresponding job worker is no "+ - "longer exist (basis: job uid + creationTimestamp)", podInfo) - return true, false - } - // new pod + old worker - klog.V(L3).Infof("syncing '%s' delayed: corresponding job worker is "+ - "uninitialized (basis: job uid + creationTimestamp)", podInfo) - return false, false - - } - // scenario check B: job set restart policy, delete pod - // check basis: job version - version64, err := strconv.ParseInt(pod.Annotations[PodJobVersion], 10, 32) +// nameGenerationFunc: Generate the objects (Strings) to be put into the queue from POD metadata +func nameGenerationFunc(obj interface{}, eventType string) (string, error) { + metaData, err := meta.Accessor(obj) if err != nil { - b.workqueue.Forget(obj) - klog.Errorf("syncing '%s' failed, parse pod annotation error: %v", podInfo, err) - return true, false - } - version32 := int32(version64) - // job restart action will increase job version number - if version32 < bsnsWorker.jobVersion { - b.workqueue.Forget(obj) - klog.V(L3).Infof("syncing '%s' terminated: corresponding job worker "+ - "is no longer exist (basis: job version number)", podInfo) - return true, false + return "", fmt.Errorf("object has no meta: %v", err) } - if version32 > bsnsWorker.jobVersion { - 
klog.V(L3).Infof("syncing '%s' delayed: corresponding job worker "+ - "is uninitialized (basis: job version number)", podInfo) - return false, false + labelMaps := metaData.GetLabels() + return metaData.GetNamespace() + "/" + metaData.GetName() + "/" + getWorkName(labelMaps) + "/" + eventType, nil +} + +func splitWorkerKey(key string) (podInfo *podIdentifier, err error) { + parts := strings.Split(key, "/") + if len(parts) != splitNum { + return nil, fmt.Errorf("unexpected key format: %q", key) } - // scenario check C: if current pod use chip, its' device info may not be ready - // check basis: limits + annotations - if (podInfo.eventType == EventAdd || podInfo.eventType == EventUpdate) && !isPodAnnotationsReady(pod, - podInfo.String()) { - return false, false + podInfo = &podIdentifier{ + namespace: parts[0], + name: parts[1], + jobName: parts[2], + eventType: parts[3], } - return false, true + return podInfo, nil + } -func (b *businessAgent) workerNotExistHandler(podExist bool, obj interface{}, key string) bool { - if !podExist { - b.workqueue.Forget(obj) - klog.V(L3).Infof("syncing '%s' terminated: current obj is no longer exist", - key) - return true +func preCheck(obj interface{}) (*podIdentifier, bool) { + var key string + var ok bool + if key, ok = obj.(string); !ok { + klog.Errorf("expected string in WorkerQueue but got %#v", obj) + return nil, true + } + podPathInfo, err := splitWorkerKey(key) + if err != nil || podPathInfo == nil { + klog.Errorf("failed to split key: %v", err) + return nil, true } - // llTODO: if someone create a single 910 pod without a job, how to handle? - klog.V(L3).Infof("syncing '%s' delayed: corresponding job worker may be uninitialized", - key) - return false + return podPathInfo, false } -func isReferenceJobSameWithBsnsWorker(pod *v1.Pod, jobName, bsnsWorkerUID string) bool { +func isReferenceJobSameWithBsnsWorker(pod *apiCoreV1.Pod, jobName, bsnsWorkerUID string) bool { sameWorker := false for _, owner := range pod.OwnerReferences { if owner.Name == jobName && string(owner.UID) == bsnsWorkerUID { @@ -371,7 +247,7 @@ func isReferenceJobSameWithBsnsWorker(pod *v1.Pod, jobName, bsnsWorkerUID string return sameWorker } -func isPodAnnotationsReady(pod *v1.Pod, identifier string) bool { +func isPodAnnotationsReady(pod *apiCoreV1.Pod, identifier string) bool { useChip := false for _, container := range pod.Spec.Containers { quantity, exist := container.Resources.Limits[ResourceName] @@ -390,111 +266,23 @@ func isPodAnnotationsReady(pod *v1.Pod, identifier string) bool { return true } -// CheckConfigmapCreation check configmap -func (b *businessAgent) CheckConfigmapCreation(job *v1alpha1.Job) (*v1.ConfigMap, error) { - var cm *v1.ConfigMap - err := wait.PollImmediate(time.Duration(b.cmCheckInterval)*time.Second, time.Duration(b.cmCheckTimeout)*time.Second, - func() (bool, error) { - var errTmp error - cm, errTmp = b.kubeClientSet.CoreV1().ConfigMaps(job.Namespace). 
- Get(fmt.Sprintf("%s-%s", - ConfigmapPrefix, job.Name), metav1.GetOptions{}) - if errTmp != nil { - if errors.IsNotFound(errTmp) { - return false, nil - } - return true, fmt.Errorf("get configmap error: %v", errTmp) - } - return true, nil - }) - if err != nil { - return nil, fmt.Errorf("failed to get configmap for job %s/%s: %v", job.Namespace, job.Name, err) - } - label910, exist := (*cm).Labels[Key910] - if !exist || (exist && label910 != Val910) { - return nil, fmt.Errorf("invalid configmap label" + label910) - } - - return cm, nil -} - -// Make a new business worker -func (b *businessAgent) MakeBusinessWorker(job *v1alpha1.Job, r RankTable, replicasTotal int32) *businessWorker { - businessWorker := &businessWorker{ - kubeclientset: b.kubeClientSet, - podsIndexer: b.podsIndexer, - recorder: b.recorder, - dryRun: b.dryRun, - statisticSwitch: make(chan struct{}), - jobUID: string(job.UID), - jobVersion: job.Status.Version, - jobCreationTimestamp: job.CreationTimestamp, - jobNamespace: job.Namespace, - jobName: job.Name, - configmapName: fmt.Sprintf("%s-%s", ConfigmapPrefix, job.Name), - configmapData: r, - statisticStopped: false, - cachedPodNum: 0, - taskReplicasTotal: replicasTotal, - } - - return businessWorker -} - -// CreateBusinessWorker create worker -func (b *businessAgent) CreateBusinessWorker(job *v1alpha1.Job, ranktable RankTable, replicasTotal int32) error { - b.rwMu.Lock() - defer b.rwMu.Unlock() - - klog.V(L2).Infof("create business worker for %s/%s", job.Namespace, job.Name) - _, exist := b.businessWorker[job.Namespace+"/"+job.Name] - if exist { - klog.V(L2).Infof("business worker for %s/%s is already existed", job.Namespace, job.Name) - return nil - } - - // initialize business worker for current job - businessWorker := b.MakeBusinessWorker(job, ranktable, replicasTotal) - - // start to report rank table build statistic for current job - if b.displayStatistic { - go businessWorker.statistic(BuildStatInterval) - } - - // save current business worker - b.businessWorker[job.Namespace+"/"+job.Name] = businessWorker - klog.V(L2).Infof("create business worker for %s/%s success, %d pods need to be cached", - job.Namespace, job.Name, b.businessWorker[job.Namespace+"/"+job.Name].taskReplicasTotal) - - return nil -} - -// DeleteBusinessWorker delete businessworker -func (b *businessAgent) DeleteBusinessWorker(namespace string, name string) error { - b.rwMu.Lock() - defer b.rwMu.Unlock() +// DeleteWorker : Delete worker(namespace/name) from BusinessWorker map in agent +func DeleteWorker(namespace string, name string, agent *BusinessAgent) { + agent.RwMutex.Lock() + defer agent.RwMutex.Unlock() klog.V(L2).Infof("not exist + delete, current job is %s/%s", namespace, name) identifier := namespace + "/" + name - _, exist := b.businessWorker[identifier] + _, exist := agent.BusinessWorker[identifier] if !exist { - klog.V(L2).Infof("failed to delete business worker for %s/%s, it's not exist", namespace, + klog.V(L3).Infof("failed to delete business worker for %s/%s, it's not exist", namespace, name) - return nil + return } - if b.displayStatistic { - b.businessWorker[identifier].closeStatistic() + if agent.Config.DisplayStatistic { + agent.BusinessWorker[identifier].CloseStatistic() } - delete(b.businessWorker, identifier) - klog.V(L2).Infof("business worker for %s/%s is deleted", namespace, name) - - return nil -} - -// IsBusinessWorkerExist check worker if exist -func (b *businessAgent) IsBusinessWorkerExist(namespace string, name string) bool { - b.rwMu.Lock() - defer b.rwMu.Unlock() - 
_, exist := b.businessWorker[namespace+"/"+name] - return exist + delete(agent.BusinessWorker, identifier) + klog.V(L2).Infof("business worker for %s is deleted", identifier) + return } diff --git a/pkg/ring-controller/controller/businessagent_test.go b/pkg/ring-controller/agent/businessagent_test.go similarity index 82% rename from pkg/ring-controller/controller/businessagent_test.go rename to pkg/ring-controller/agent/businessagent_test.go index 100a464e9796c7d9848e160b53fe1c095ce39380..f0c0669cecb20f25526df76c1c825116dc21e5e6 100644 --- a/pkg/ring-controller/controller/businessagent_test.go +++ b/pkg/ring-controller/agent/businessagent_test.go @@ -15,12 +15,13 @@ */ // Package controller for run the logic -package controller +package agent import ( "fmt" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" + "hccl-controller/pkg/ring-controller/controller" "hccl-controller/pkg/ring-controller/controller/mock_cache" "hccl-controller/pkg/ring-controller/controller/mock_kubernetes" "hccl-controller/pkg/ring-controller/controller/mock_v1" @@ -36,7 +37,7 @@ import ( // Test_businessAgent_deleteBusinessWorker test deleteBusinessWorker func Test_businessAgent_deleteBusinessWorker(t *testing.T) { tests := []struct { - worker *businessAgent + worker *controller.BusinessAgent name string wantErr bool namespace string @@ -60,7 +61,7 @@ func Test_businessAgent_deleteBusinessWorker(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { if !tt.worker.dryRun { - tt.worker.businessWorker["vcjob/hccl-test3"] = newMockBusinessWorkerforStatistic(1, 1, false) + tt.worker.BusinessWorker["vcjob/hccl-test3"] = newMockBusinessWorkerforStatistic(1, 1, false) } if err := tt.worker.DeleteBusinessWorker(tt.namespace, tt.podName); (err != nil) != tt.wantErr { t.Errorf("deleteBusinessWorker() error = %v, wantErr %v", err, tt.wantErr) @@ -72,7 +73,7 @@ func Test_businessAgent_deleteBusinessWorker(t *testing.T) { // Test_businessAgent_isBusinessWorkerExist test isBusinessWorkerExist func Test_businessAgent_isBusinessWorkerExist(t *testing.T) { tests := []struct { - worker *businessAgent + worker *controller.BusinessAgent name string expect bool namespace string @@ -96,7 +97,7 @@ func Test_businessAgent_isBusinessWorkerExist(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { if !tt.worker.dryRun { - tt.worker.businessWorker["vcjob/hccl-test2"] = newMockBusinessWorkerforStatistic(1, 1, false) + tt.worker.BusinessWorker["vcjob/hccl-test2"] = newMockBusinessWorkerforStatistic(1, 1, false) } tt.worker.IsBusinessWorkerExist(tt.namespace, tt.podName) assert.Equal(t, !tt.worker.dryRun, tt.expect) @@ -104,45 +105,45 @@ func Test_businessAgent_isBusinessWorkerExist(t *testing.T) { } } -func createAgent(dryrun bool) *businessAgent { - return &businessAgent{ +func createAgent(dryrun bool) *controller.BusinessAgent { + return &controller.BusinessAgent{ informerFactory: nil, podInformer: nil, - podsIndexer: nil, - kubeClientSet: nil, - businessWorker: make(map[string]*businessWorker), + PodsIndexer: nil, + KubeClientSet: nil, + BusinessWorker: make(map[string]*controller.VCJobWorker), agentSwitch: nil, recorder: nil, workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter( - retryMilliSecond*time.Millisecond, threeMinutes*time.Second), "Pods"), + controller.retryMilliSecond*time.Millisecond, controller.threeMinutes*time.Second), "Pods"), dryRun: dryrun, displayStatistic: true, - cmCheckInterval: decimal, - cmCheckTimeout: 
decimal, + cmCheckInterval: controller.decimal, + cmCheckTimeout: controller.decimal, } } -func createAgentForController(dryrun bool) *businessAgent { - return &businessAgent{ +func createAgentForController(dryrun bool) *controller.BusinessAgent { + return &controller.BusinessAgent{ informerFactory: nil, podInformer: nil, - podsIndexer: nil, - kubeClientSet: nil, - businessWorker: make(map[string]*businessWorker), + PodsIndexer: nil, + KubeClientSet: nil, + BusinessWorker: make(map[string]*controller.VCJobWorker), agentSwitch: nil, recorder: nil, workqueue: nil, dryRun: dryrun, displayStatistic: true, - cmCheckInterval: decimal, - cmCheckTimeout: decimal, + cmCheckInterval: controller.decimal, + cmCheckTimeout: controller.decimal, } } // Test_businessAgent_createBusinessWorker test createBusinessWorker func Test_businessAgent_createBusinessWorker(t *testing.T) { tests := []struct { - worker *businessAgent + worker *controller.BusinessAgent name string }{ { @@ -157,7 +158,7 @@ func Test_businessAgent_createBusinessWorker(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { if !tt.worker.dryRun { - tt.worker.businessWorker["vcjob/hccl-test"] = newMockBusinessWorkerforStatistic(1, 1, false) + tt.worker.BusinessWorker["vcjob/hccl-test"] = newMockBusinessWorkerforStatistic(1, 1, false) } tt.worker.CreateBusinessWorker(mockJob()) }) @@ -178,7 +179,7 @@ func mockJob() *v1alpha1.Job { SchedulerName: "volcano", MinAvailable: 1, Queue: "default", - MaxRetry: three, + MaxRetry: controller.three, PriorityClassName: "", Tasks: mockTask(), }, @@ -223,7 +224,7 @@ func mockPod() *v1.Pod { }, }, Annotations: map[string]string{ - PodGroupKey: "default-test", + controller.PodGroupKey: "default-test", }, }, Spec: mockSpec(), @@ -238,10 +239,10 @@ func mockSpec() v1.PodSpec { Image: "", Resources: v1.ResourceRequirements{ Limits: v1.ResourceList{ - ResourceName: resource.MustParse("1"), + controller.ResourceName: resource.MustParse("1"), }, Requests: v1.ResourceList{ - ResourceName: resource.MustParse("1"), + controller.ResourceName: resource.MustParse("1"), }, }, }, @@ -260,13 +261,13 @@ func Test_businessAgent_doWork(t *testing.T) { pod.OwnerReferences[0].Name = "jobname" pod.OwnerReferences[0].UID = "11" mockIndexer.EXPECT().GetByKey(gomock.Any()).Return(pod.DeepCopy(), true, nil) - pod.Annotations[PodJobVersion] = "0" + pod.Annotations[controller.PodJobVersion] = "0" mockIndexer.EXPECT().GetByKey(gomock.Any()).Return(pod.DeepCopy(), true, nil) - pod.Annotations[PodJobVersion] = "2" + pod.Annotations[controller.PodJobVersion] = "2" mockIndexer.EXPECT().GetByKey(gomock.Any()).Return(pod.DeepCopy(), true, nil) - pod.Annotations[PodJobVersion] = "1" + pod.Annotations[controller.PodJobVersion] = "1" mockIndexer.EXPECT().GetByKey(gomock.Any()).Return(pod.DeepCopy(), true, nil) - pod.Annotations[PodDeviceKey] = "{\"pod_name\":\"0\",\"server_id\":\"127.0.0.1\"," + + pod.Annotations[controller.PodDeviceKey] = "{\"pod_name\":\"0\",\"server_id\":\"127.0.0.1\"," + "\"devices\":[{\"device_id\":\"0\",\"device_ip\":\"0.0.0.0\"}]}\n" mockIndexer.EXPECT().GetByKey(gomock.Any()).Return(pod.DeepCopy(), true, nil) namespaceKey := "vcjob/hccl-test/jobname/add" @@ -284,9 +285,9 @@ func Test_businessAgent_doWork(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - tt.workAgent.podsIndexer = mockIndexer + tt.workAgent.PodsIndexer = mockIndexer if tt.worker { - tt.workAgent.businessWorker["vcjob/jobname"] = + tt.workAgent.BusinessWorker["vcjob/jobname"] = 
newMockBusinessWorkerforStatistic(1, 1, false) } if got := tt.workAgent.doWork(tt.obj); got != tt.want { @@ -297,7 +298,7 @@ func Test_businessAgent_doWork(t *testing.T) { } type testCase struct { - workAgent *businessAgent + workAgent *controller.BusinessAgent obj interface{} name string want bool @@ -344,13 +345,13 @@ func Test_businessAgent_CheckConfigmapCreation(t *testing.T) { mockK8s := mock_kubernetes.NewMockInterface(ctrl) mockV1 := mock_v1.NewMockCoreV1Interface(ctrl) mockCm := mock_v1.NewMockConfigMapInterface(ctrl) - mockCm.EXPECT().Get(gomock.Any(), gomock.Any()).Return(mockConfigMap(), nil) + mockCm.EXPECT().Get(gomock.Any(), gomock.Any()).Return(controller.mockConfigMap(), nil) mockCm.EXPECT().Get(gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("mock error")) - cm := mockConfigMap() - cm.ObjectMeta.Labels[Key910] = "ascend=310" + cm := controller.mockConfigMap() + cm.ObjectMeta.Labels[controller.Key910] = "ascend=310" mockCm.EXPECT().Get(gomock.Any(), gomock.Any()).Return(cm, nil) - mockV1.EXPECT().ConfigMaps(gomock.Any()).Return(mockCm).Times(three) - mockK8s.EXPECT().CoreV1().Return(mockV1).Times(three) + mockV1.EXPECT().ConfigMaps(gomock.Any()).Return(mockCm).Times(controller.three) + mockK8s.EXPECT().CoreV1().Return(mockV1).Times(controller.three) tests := []struct { job *v1alpha1.Job want *v1.ConfigMap @@ -360,7 +361,7 @@ func Test_businessAgent_CheckConfigmapCreation(t *testing.T) { { name: "test", job: mockJob(), - want: mockConfigMap(), + want: controller.mockConfigMap(), wantErr: false, }, { @@ -377,7 +378,7 @@ func Test_businessAgent_CheckConfigmapCreation(t *testing.T) { }, } b := createAgent(false) - b.kubeClientSet = mockK8s + b.KubeClientSet = mockK8s for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { _, err := b.CheckConfigmapCreation(tt.job) diff --git a/pkg/ring-controller/controller/businessworker_test.go b/pkg/ring-controller/agent/businessworker_test.go similarity index 84% rename from pkg/ring-controller/controller/businessworker_test.go rename to pkg/ring-controller/agent/businessworker_test.go index cbab7fb4f32d942b08ddef3d88224e9af327a847..0f962a33ccef8fd892c2399ea6dff1afd6f63ae3 100644 --- a/pkg/ring-controller/controller/businessworker_test.go +++ b/pkg/ring-controller/agent/businessworker_test.go @@ -15,11 +15,12 @@ */ // Package controller for run the logic -package controller +package agent import ( "encoding/json" "github.com/golang/mock/gomock" + "hccl-controller/pkg/ring-controller/controller" "hccl-controller/pkg/ring-controller/controller/mock_kubernetes" "hccl-controller/pkg/ring-controller/controller/mock_v1" apiCoreV1 "k8s.io/api/core/v1" @@ -34,7 +35,7 @@ import ( func Test_businessWorker_statistic(t *testing.T) { tests := []struct { - worker *businessWorker + worker *controller.VCJobWorker name string }{ { @@ -47,7 +48,7 @@ func Test_businessWorker_statistic(t *testing.T) { }, { name: "test3:cachePod number don't equals task", - worker: newMockBusinessWorkerforStatistic(int32(two), 1, true), + worker: newMockBusinessWorkerforStatistic(int32(controller.two), 1, true), }, } for _, tt := range tests { @@ -59,14 +60,14 @@ func Test_businessWorker_statistic(t *testing.T) { b.statisticSwitch <- struct{}{} }() } - b.statistic(twosecond) + b.statistic(controller.twosecond) }) } } -func newMockBusinessWorkerforStatistic(cachedPodNum, taskReplicasTotal int32, statisticStopped bool) *businessWorker { - return &businessWorker{ +func newMockBusinessWorkerforStatistic(cachedPodNum, taskReplicasTotal int32, statisticStopped bool) 
VCJobWorker { + return &VCJobWorker{ statisticSwitch: make(chan struct{}), podsIndexer: nil, jobVersion: 1, @@ -146,7 +147,7 @@ func mockPodIdentify(event string) *podIdentifier { } } -func generateInstance(worker *businessWorker) { +func generateInstance(worker *controller.VCJobWorker) { var instance Instance deviceInfo := mockJSON() err := json.Unmarshal([]byte(deviceInfo), &instance) @@ -180,13 +181,13 @@ func Test_businessWorker_handleDeleteEvent(t *testing.T) { mockK8s := mock_kubernetes.NewMockInterface(ctrl) mockV1 := mock_v1.NewMockCoreV1Interface(ctrl) mockCm := mock_v1.NewMockConfigMapInterface(ctrl) - mockCm.EXPECT().Get(gomock.Any(), gomock.Any()).Return(mockConfigMap(), nil).Times(two) - mockCm.EXPECT().Update(gomock.Any()).Return(mockConfigMap(), nil).Times(two) - mockV1.EXPECT().ConfigMaps(gomock.Any()).Return(mockCm).Times(four) - mockK8s.EXPECT().CoreV1().Return(mockV1).Times(four) - job := mockJob() + mockCm.EXPECT().Get(gomock.Any(), gomock.Any()).Return(controller.mockConfigMap(), nil).Times(controller.two) + mockCm.EXPECT().Update(gomock.Any()).Return(controller.mockConfigMap(), nil).Times(controller.two) + mockV1.EXPECT().ConfigMaps(gomock.Any()).Return(mockCm).Times(controller.four) + mockK8s.EXPECT().CoreV1().Return(mockV1).Times(controller.four) + job := controller.mockJob() tests := []struct { - worker *businessWorker + worker *controller.VCJobWorker podInf *podIdentifier wantErr bool name string @@ -222,11 +223,11 @@ func Test_businessWorker_handleAddUpdateEvent(t *testing.T) { mockK8s, job, pod := initMock(t) var pods []*apiCoreV1.Pod pods = append(pods, pod.DeepCopy()) - pod.Annotations[PodDeviceKey] = mockJSON() + pod.Annotations[controller.PodDeviceKey] = mockJSON() pods = append(pods, pod.DeepCopy()) pod.Name = "test" pods = append(pods, pod.DeepCopy()) - pod.Annotations[PodDeviceKey] = "xxx" + pod.Annotations[controller.PodDeviceKey] = "xxx" pods = append(pods, pod.DeepCopy()) tests := []testCaseForWorker{ newTestCaseForWorker("test1:no device info,run cachezeroPodInfo,no error", false, @@ -253,12 +254,12 @@ func initMock(t *testing.T) (kubernetes.Interface, *v1alpha1.Job, *apiCoreV1.Pod mockK8s := mock_kubernetes.NewMockInterface(ctrl) mockV1 := mock_v1.NewMockCoreV1Interface(ctrl) mockCm := mock_v1.NewMockConfigMapInterface(ctrl) - mockCm.EXPECT().Get(gomock.Any(), gomock.Any()).Return(mockConfigMap(), nil).Times(two) - mockCm.EXPECT().Update(gomock.Any()).Return(mockConfigMap(), nil).Times(two) - mockV1.EXPECT().ConfigMaps(gomock.Any()).Return(mockCm).Times(four) - mockK8s.EXPECT().CoreV1().Return(mockV1).Times(four) - job := mockJob() - pod := mockPod() + mockCm.EXPECT().Get(gomock.Any(), gomock.Any()).Return(controller.mockConfigMap(), nil).Times(controller.two) + mockCm.EXPECT().Update(gomock.Any()).Return(controller.mockConfigMap(), nil).Times(controller.two) + mockV1.EXPECT().ConfigMaps(gomock.Any()).Return(mockCm).Times(controller.four) + mockK8s.EXPECT().CoreV1().Return(mockV1).Times(controller.four) + job := controller.mockJob() + pod := controller.mockPod() return mockK8s, job, pod } diff --git a/pkg/ring-controller/agent/deploymentworker.go b/pkg/ring-controller/agent/deploymentworker.go new file mode 100644 index 0000000000000000000000000000000000000000..9fd048f47b48f7c955242730b266e456d137873a --- /dev/null +++ b/pkg/ring-controller/agent/deploymentworker.go @@ -0,0 +1,116 @@ +/* + * Copyright(C) 2021. Huawei Technologies Co.,Ltd. All rights reserved. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package agent + +import ( + "fmt" + v1 "hccl-controller/pkg/ring-controller/ranktable/v1" + apiCoreV1 "k8s.io/api/core/v1" + "k8s.io/klog" + "strconv" + "time" +) + +// NewDeploymentWorker : to create Deployment Worker +func NewDeploymentWorker(agent *BusinessAgent, deploy DeployInfo, ranktable v1.RankTabler, + replicasTotal int32) *DeployWorker { + return &DeployWorker{WorkerInfo: WorkerInfo{kubeclientset: agent.KubeClientSet, podsIndexer: agent.PodsIndexer, + recorder: agent.recorder, dryRun: agent.dryRun, statisticSwitch: make(chan struct{}), + configmapName: fmt.Sprintf("%s-%s", ConfigmapPrefix, deploy.DeployName), + configmapData: ranktable, statisticStopped: false, cachedPodNum: 0, taskReplicasTotal: replicasTotal, + rankMap: make(map[string]int, 1)}, DeployInfo: deploy} +} + +func (w *DeployWorker) doWorker(pod *apiCoreV1.Pod, podInfo *podIdentifier) (forgetQueue, retry bool) { + // scenario check A: For an identical job, create it immediately after deletion + // check basis: job uid + creationTimestamp + if pod.CreationTimestamp.Before(&w.DeployCreationTimestamp) { + // old pod + new worker + klog.V(L3).Infof("syncing '%s' terminated: corresponding job worker is no "+ + "longer exist (basis: job uid + creationTimestamp)", podInfo) + return true, false + } + // scenario check C: if current pod use chip, its' device info may not be ready + // check basis: limits + annotations + if (podInfo.eventType == EventAdd || podInfo.eventType == EventUpdate) && !isPodAnnotationsReady(pod, + podInfo.String()) { + return false, false + } + if configmapComplete := + w.configmapData.GetStatus() == ConfigmapCompleted; configmapComplete { + klog.V(L3).Infof("syncing '%s' terminated: corresponding rank table is completed", + podInfo) + return true, true + } + + // start to sync current pod + if err := w.syncHandler(pod, podInfo); err != nil { + klog.Errorf("error syncing '%s': %s", podInfo, err.Error()) + return true, true + } + return true, true +} + +// Statistic : no need to add lock here, deviation from true value is acceptable +func (w *DeployWorker) Statistic(stopTime time.Duration) { + for { + select { + case c, ok := <-w.statisticSwitch: + if !ok { + klog.Error(c) + } + return + default: + if w.taskReplicasTotal == w.cachedPodNum { + klog.V(L1).Infof("rank table build progress for %s/%s is completed", + w.DeployNamespace, w.DeployName) + w.CloseStatistic() + return + } + klog.V(L1).Infof("rank table build progress for %s/%s: pods need to be cached = %d,"+ + "pods already cached = %d", w.DeployNamespace, w.DeployName, w.taskReplicasTotal, w.cachedPodNum) + time.Sleep(stopTime) + } + } +} + +func (w *DeployWorker) handleDeleteEvent(podInfo *podIdentifier) error { + klog.V(L3).Infof("current handleDeleteEvent pod is %s", podInfo) + + w.cmMu.Lock() + defer w.cmMu.Unlock() + rank, ok := w.rankMap[podInfo.namespace+"/"+podInfo.name] + + if !ok { + return fmt.Errorf("rank map not exist, key is %s/%s", podInfo.namespace, podInfo.name) + } 
+ rankIndex := strconv.Itoa(rank) + err := w.configmapData.RemovePodInfo(podInfo.namespace, rankIndex) + if err != nil { + return err + } + + klog.V(L3).Infof("start to remove data of pod %s/%s", podInfo.namespace, podInfo.name) + err = updateConfigMap(&w.WorkerInfo, podInfo.namespace) + if err != nil { + return err + } + w.modifyStatistics(-1) + klog.V(L3).Infof("data of pod %s/%s is removed", podInfo.namespace, podInfo.name) + + return nil +} diff --git a/pkg/ring-controller/agent/types.go b/pkg/ring-controller/agent/types.go new file mode 100644 index 0000000000000000000000000000000000000000..b4e731d69ad8f9451d1a92973d15b9b3545c17ac --- /dev/null +++ b/pkg/ring-controller/agent/types.go @@ -0,0 +1,190 @@ +/* + * Copyright(C) 2021. Huawei Technologies Co.,Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package agent + +import ( + v1 "hccl-controller/pkg/ring-controller/ranktable/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" + "sync" +) + +const ( + // Key910 to get Configmap + Key910 = "ring-controller.atlas" + // Val910 to get Configmap + Val910 = "ascend-910" + // ResourceName for 910 + ResourceName = "huawei.com/Ascend910" + // ConfigmapPrefix to get from configmap + ConfigmapPrefix = "rings-config" + // ConfigmapCompleted status + ConfigmapCompleted = "completed" + // ConfigmapInitializing status + ConfigmapInitializing = "initializing" + // ConfigmapKey configmap Data Name + ConfigmapKey = "hccl.json" + // VolcanoJobNameKey to get job name + VolcanoJobNameKey = "volcano.sh/job-name" + // PodJobVersion to get job version + PodJobVersion = "volcano.sh/job-version" + // PodDeviceKey pod annotation key + PodDeviceKey = "ascend.kubectl.kubernetes.io/ascend-910-configuration" + + // DeploymentNameKey pod label + DeploymentNameKey = "deploy-name" + // EventAdd event add + EventAdd = "add" + // EventUpdate event to update + EventUpdate = "update" + // EventDelete event to delete + EventDelete = "delete" + + // L1 log level 1 + L1 = 1 + // L2 log level 2 + L2 = 2 + // L3 log level 3 + L3 = 3 + // L4 log level 4 + L4 = 4 + retryMilliSecond = 5 + threeMinutes = 180 + splitNum = 4 +) + +var ( + // JSONVersion of hccl.json + JSONVersion = "v2" ) + +// BusinessAgent Agent for all businessWorkers, responsibilities: +// * list/watch 910 pods, and assign each pod to corresponding handler +// (each business worker belongs to a volcano job, and contains a handler for building rank table) +type BusinessAgent struct { + // Config Agent configuration file + Config *Config + // business worker for each volcano job + BusinessWorker map[string]Worker + informerFactory informers.SharedInformerFactory + podInformer cache.SharedIndexInformer + // PodsIndexer to get pod index by namespace&name + PodsIndexer cache.Indexer + // KubeClientSet : ClientSet to contact kube
apiServer + KubeClientSet kubernetes.Interface + agentSwitch <-chan struct{} + + // RwMutex : to lock Agent Resource eg. Workqueue & BusinessWorker + RwMutex sync.RWMutex + + // event recorder + recorder record.EventRecorder + // Workqueue : A queue with a limited rate.This queue is used to put pod event information + Workqueue workqueue.RateLimitingInterface + + // if print only, do not delete anything. + dryRun bool +} + +// Config controller init configure +type Config struct { + // DryRun:Is it a test + DryRun bool + // DisplayStatistic : a flag if starts to report rank table build statistic for job + DisplayStatistic bool + // PodParallelism : how many goroutine to run in the agent + PodParallelism int + // CmCheckInterval: ConfigMap Interval + CmCheckInterval int + // CmCheckTimeout :ConfigMap TimeOut + CmCheckTimeout int +} + +type podIdentifier struct { + namespace string + name string + jobName string + eventType string +} + +// VCJobWorker controller for each volcano job, list/watch corresponding pods and build configmap (rank table) +type VCJobWorker struct { + // WorkerInfo: normal Worker info + WorkerInfo + // JobInfo: VCJob Worker Info + JobInfo +} + +// JobInfo Job Worker Info +type JobInfo struct { + // JobVersion: When a job restart, JobVersion is needed to identify if a pod is old + // with respect to this job + JobVersion int32 + // JobUID: For an identical job, create it immediately after deletion, new + // vcjob Worker will cache old pod info without a identifier to distinguish + JobUID string + // JobCreationTimestamp: when pod reference job uid is different with uid of VCJobWorker + // creationTimestamp is needed to distinguish cases between: 1. old pod + new worker OR 2. new pod + old worker + JobCreationTimestamp metav1.Time + // JobNamespace: Job namespace + JobNamespace string + // JobName : Job name + JobName string +} + +// DeployWorker for deployment model +type DeployWorker struct { + // WorkerInfo: normal Worker info + WorkerInfo + // DeployInfo: Deployment Worker info + DeployInfo +} + +// WorkerInfo :normal Worker info +type WorkerInfo struct { + kubeclientset kubernetes.Interface + recorder record.EventRecorder + cmMu, statisticMu sync.Mutex + dryRun bool + statisticSwitch chan struct{} + + podsIndexer cache.Indexer + + configmapName string + configmapData v1.RankTabler + + statisticStopped bool + rankIndex int + rankMap map[string]int + cachedPodNum int32 + taskReplicasTotal int32 +} + +// DeployInfo : deployment Worker info +type DeployInfo struct { + // DeployCreationTimestamp: when pod reference job uid is different with uid of VCJobWorker + // creationTimestamp is needed to distinguish cases between: 1. old pod + new worker OR 2. new pod + old worker + DeployCreationTimestamp metav1.Time + // DeployNamespace :deployment namespace + DeployNamespace string + // DeployName : deployment name + DeployName string +} diff --git a/pkg/ring-controller/controller/businessworker.go b/pkg/ring-controller/agent/vcjobworker.go similarity index 33% rename from pkg/ring-controller/controller/businessworker.go rename to pkg/ring-controller/agent/vcjobworker.go index fb56db43362c1d76f3c51e918981cf7ea2d6dad2..c7437632e6004b4e52da950ee5096ef57826273b 100644 --- a/pkg/ring-controller/controller/businessworker.go +++ b/pkg/ring-controller/agent/vcjobworker.go @@ -14,112 +14,192 @@ * limitations under the License. 
*/ -// Package controller for logic -package controller +// Package agent for logic +package agent import ( "encoding/json" "fmt" + v1 "hccl-controller/pkg/ring-controller/ranktable/v1" apiCoreV1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/cache" - "k8s.io/client-go/tools/record" "k8s.io/klog" - "sync" + "strconv" + "strings" "time" ) -// controller for each volcano job, list/watch corresponding pods and build configmap (rank table) -type businessWorker struct { - kubeclientset kubernetes.Interface - recorder record.EventRecorder - cmMu, statisticMu sync.Mutex - dryRun bool - statisticSwitch chan struct{} - - podsIndexer cache.Indexer - - // jobVersion: When a job restart, jobVersion is needed to identify if a pod is old - // with respect to this job - jobVersion int32 - // jobUID: For an identical job, create it immediately after deletion, new - // businessWorker will cache old pod info without a identifier to distinguish - jobUID string - // jobCreationTimestamp: when pod reference job uid is different with uid of businessWorker - // creationTimestamp is needed to distinguish cases between: 1. old pod + new worker OR 2. new pod + old worker - jobCreationTimestamp metav1.Time - jobNamespace string - jobName string - configmapName string - configmapData RankTable - - statisticStopped bool - cachedPodNum int32 - taskReplicasTotal int32 +// Worker :The main function of Worker is to get the information of NPU from the generated POD, +// and then assemble it into a complete HCCL.JSON file. +type Worker interface { + doWorker(pod *apiCoreV1.Pod, podInfo *podIdentifier) (forgetQueue, retry bool) + Statistic(stopTime time.Duration) + WorkerCommon } -func (b *businessWorker) tableConstructionFinished() bool { - b.statisticMu.Lock() - defer b.statisticMu.Unlock() +// NewVCJobWorker : Generates a Worker that handles the VCJob type +func NewVCJobWorker(agent *BusinessAgent, job JobInfo, ranktable v1.RankTabler, replicasTotal int32) *VCJobWorker { + jobWorker := &VCJobWorker{WorkerInfo: WorkerInfo{kubeclientset: agent.KubeClientSet, podsIndexer: agent.PodsIndexer, + recorder: agent.recorder, dryRun: agent.dryRun, statisticSwitch: make(chan struct{}), + configmapName: fmt.Sprintf("%s-%s", ConfigmapPrefix, job.JobName), + configmapData: ranktable, statisticStopped: false, cachedPodNum: 0, taskReplicasTotal: replicasTotal, + rankMap: make(map[string]int, 1)}, JobInfo: job} + return jobWorker +} - return b.cachedPodNum == b.taskReplicasTotal +func (b *VCJobWorker) doWorker(pod *apiCoreV1.Pod, podInfo *podIdentifier) (forgetQueue, retry bool) { + // scenario check A: For an identical job, create it immediately after deletion + // check basis: job uid + creationTimestamp + if !isReferenceJobSameWithBsnsWorker(pod, podInfo.jobName, b.JobUID) { + if pod.CreationTimestamp.Before(&b.JobCreationTimestamp) { + // old pod + new worker + klog.V(L3).Infof("syncing '%s' terminated: corresponding job worker is no "+ + "longer exist (basis: job uid + creationTimestamp)", podInfo) + return true, false + } + // new pod + old worker + klog.V(L3).Infof("syncing '%s' delayed: corresponding job worker is "+ + "uninitialized (basis: job uid + creationTimestamp)", podInfo) + return false, false + + } + // scenario check B: job set restart policy, delete pod + // check basis: job version + version64, err := strconv.ParseInt(pod.Annotations[PodJobVersion], 10, 32) + if err != nil { + klog.Errorf("syncing '%s' failed, parse pod annotation error: %v", 
	podInfo, err)
+		return true, false
+	}
+	version32 := int32(version64)
+	// job restart action will increase job version number
+	if version32 < b.JobVersion {
+
+		klog.V(L3).Infof("syncing '%s' terminated: corresponding job worker "+
+			"no longer exists (basis: job version number)", podInfo)
+		return true, false
+	}
+	if version32 > b.JobVersion {
+		klog.V(L3).Infof("syncing '%s' delayed: corresponding job worker "+
+			"is uninitialized (basis: job version number)", podInfo)
+		return false, false
+	}
+	// scenario check C: if the current pod uses a chip, its device info may not be ready yet
+	// check basis: limits + annotations
+	if (podInfo.eventType == EventAdd || podInfo.eventType == EventUpdate) && !isPodAnnotationsReady(pod,
+		podInfo.String()) {
+		return false, false
+	}
+	if configmapComplete :=
+		b.configmapData.GetStatus() == ConfigmapCompleted; configmapComplete {
+		klog.V(L3).Infof("syncing '%s' terminated: corresponding rank table is completed",
+			podInfo)
+		return true, true
+	}
+
+	// start to sync current pod
+	if err := b.syncHandler(pod, podInfo); err != nil {
+		klog.Errorf("error syncing '%s': %s", podInfo, err.Error())
+		return true, true
+	}
+	return true, true
+}
+
+// Statistic : reports whether the rank table ConfigMap has been fully built, handling build completion and the
+// goroutine exit signal. No need to add a lock here; deviation from the true value is acceptable
+func (b *VCJobWorker) Statistic(stopTime time.Duration) {
+	for {
+		select {
+		case c, ok := <-b.statisticSwitch:
+			if !ok {
+				klog.Error(c)
+			}
+			return
+		default:
+			if b.taskReplicasTotal == b.cachedPodNum {
+				klog.V(L1).Infof("rank table build progress for %s/%s is completed",
+					b.JobNamespace, b.JobName)
+				b.CloseStatistic()
+				return
+			}
+			klog.V(L2).Infof("rank table build progress for %s/%s: pods need to be cached = %d, "+
+				"pods already cached = %d", b.JobNamespace, b.JobName, b.taskReplicasTotal, b.cachedPodNum)
+			time.Sleep(stopTime)
+		}
+	}
 }
-func (b *businessWorker) syncHandler(pod *apiCoreV1.Pod, podExist bool, podInfo *podIdentifier) error {
+// WorkerCommon : the common methods of Worker; a new Worker type that these defaults do not fit can
+// override them.
+type WorkerCommon interface {
+	handleAddUpdateEvent(podInfo *podIdentifier, pod *apiCoreV1.Pod) error
+	handleDeleteEvent(podInfo *podIdentifier) error
+	tableConstructionFinished() bool
+	endRankTableConstruction(string) error
+	modifyStatistics(diff int32)
+	// CloseStatistic : to close statisticSwitch chan
+	CloseStatistic()
+	syncHandler(pod *apiCoreV1.Pod, podInfo *podIdentifier) error
+}
+
+func (b *WorkerInfo) syncHandler(pod *apiCoreV1.Pod, podInfo *podIdentifier) error {
 	klog.V(L3).Infof("syncHandler start, current pod is %s", podInfo)
 	// if use 0 chip, end pod sync
 	if b.taskReplicasTotal == 0 && b.tableConstructionFinished() {
 		klog.V(L2).Infof("job %s/%s doesn't use d chip, rank table construction is finished",
-			b.jobNamespace, b.jobName)
-		if err := b.endRankTableConstruction(); err != nil {
+			podInfo.namespace, podInfo.jobName)
+		if err := b.endRankTableConstruction(pod.Namespace); err != nil {
 			return err
 		}
+		klog.V(L2).Infof("rank table for job %s/%s has finished construction", podInfo.namespace, podInfo.jobName)
 		return nil // need return directly
 	}
 	// dryRun is for test
 	if b.dryRun {
-		klog.V(L3).Infof("I'am handling %s, exist: %t", podInfo, podExist)
+		klog.V(L3).Infof("I'm handling %s", podInfo)
 		return nil
 	}
-	if (podInfo.eventType == EventAdd) && podExist {
-		err := b.handleAddUpdateEvent(podInfo, pod)
-		if err != nil {
-			return err
-		}
+	if podInfo.eventType == EventAdd {
+		return b.handleAddUpdateEvent(podInfo, pod)
+	}
-	if podInfo.eventType == EventDelete && !podExist {
-		err := b.handleDeleteEvent(podInfo)
-		if err != nil {
-			return err
-		}
+	if podInfo.eventType == EventDelete {
+		return b.handleDeleteEvent(podInfo)
+	}
-	klog.V(L3).Infof("undefined condition, pod: %s, exist: %t", podInfo, podExist)
+	klog.V(L3).Infof("undefined condition, pod: %s", podInfo)
 	return nil
 }
-func (b *businessWorker) handleAddUpdateEvent(podInfo *podIdentifier, pod *apiCoreV1.Pod) error {
-	klog.V(L3).Infof("current addUpdate pod is %s", podInfo)
+func (b *WorkerInfo) tableConstructionFinished() bool {
+	b.statisticMu.Lock()
+	defer b.statisticMu.Unlock()
+
+	return b.cachedPodNum == b.taskReplicasTotal
+}
+func (b *WorkerInfo) handleAddUpdateEvent(podInfo *podIdentifier, pod *apiCoreV1.Pod) error {
+	klog.V(L4).Infof("current addUpdate pod is %s", podInfo)
 	// because this annotation is already used to filter pods in previous step (podExist - scenario C)
 	// it can be used to identify if pod use chip here
 	deviceInfo, exist := pod.Annotations[PodDeviceKey]
-	klog.V(L3).Info("deviceId =>", deviceInfo)
-	klog.V(L4).Info("isExist ==>", exist)
+	klog.V(L3).Infof("deviceId => %s", deviceInfo)
+	klog.V(L4).Infof("isExist ==> %t", exist)
 	b.cmMu.Lock()
 	defer b.cmMu.Unlock()
-
-	err := b.configmapData.cachePodInfo(pod, deviceInfo)
+	b.rankMap[podInfo.namespace+"/"+podInfo.name] = b.rankIndex
+	err := b.configmapData.CachePodInfo(pod, deviceInfo, &b.rankIndex)
 	if err != nil {
 		return err
 	}
 	b.modifyStatistics(1)
+	klog.V(L3).Infof("rank table build progress for %s/%s: pods need to be cached = %d, "+
+		"pods already cached = %d", podInfo.namespace, podInfo.jobName, b.taskReplicasTotal, b.cachedPodNum)
 	// update configmap if finishing caching all pods' info
-	errs := updateWithFinish(b)
+	errs := updateWithFinish(b, podInfo.namespace)
 	if errs != nil {
 		return errs
 	}
@@ -127,19 +207,20 @@ func (b *businessWorker) handleAddUpdateEvent(podInfo *podIdentifier, pod *apiCo
 	return nil
 }
-func (b *businessWorker) handleDeleteEvent(podInfo *podIdentifier) error {
+func (b *WorkerInfo)
handleDeleteEvent(podInfo *podIdentifier) error { klog.V(L3).Infof("current handleDeleteEvent pod is %s", podInfo) b.cmMu.Lock() defer b.cmMu.Unlock() - - err := b.configmapData.removePodInfo(podInfo.namespace, podInfo.name) + split := strings.Split(podInfo.name, "-") + podID := split[len(split)-1] + err := b.configmapData.RemovePodInfo(podInfo.namespace, podID) if err != nil { return err } klog.V(L3).Infof("start to remove data of pod %s/%s", podInfo.namespace, podInfo.name) - err = b.updateConfigmap() + err = updateConfigMap(b, podInfo.namespace) if err != nil { return err } @@ -149,53 +230,54 @@ func (b *businessWorker) handleDeleteEvent(podInfo *podIdentifier) error { return nil } -func updateWithFinish(b *businessWorker) error { - if b.tableConstructionFinished() { - if err := b.endRankTableConstruction(); err != nil { - return err - } - } - return nil -} - -func checkPodCache(group *Group, pod *apiCoreV1.Pod) bool { - for _, instance := range group.InstanceList { - if instance.PodName == pod.Name { - klog.V(L3).Infof("ANOMALY: pod %s/%s is already cached", pod.Namespace, - pod.Name) - return true - } - } - return false -} - -func (b *businessWorker) endRankTableConstruction() error { - err := b.configmapData.setStatus(ConfigmapCompleted) +func (b *WorkerInfo) endRankTableConstruction(namespace string) error { + err := b.configmapData.SetStatus(ConfigmapCompleted) if err != nil { klog.Error("fail to set configmap status: %v", err) return err } - err = b.updateConfigmap() + err = updateConfigMap(b, namespace) if err != nil { klog.Error("update configmap failed") return err } - klog.V(L2).Infof("rank table for job %s/%s has finished construction", b.jobNamespace, b.jobName) return nil } -// statistic about how many pods have already cached -func (b *businessWorker) modifyStatistics(diff int32) { +// modifyStatistics statistic about how many pods have already cached +func (b *WorkerInfo) modifyStatistics(diff int32) { b.statisticMu.Lock() defer b.statisticMu.Unlock() b.cachedPodNum += diff - klog.V(L3).Infof("rank table build progress for %s/%s: pods need to be cached = %d, "+ - "pods already cached = %d", b.jobNamespace, b.jobName, b.taskReplicasTotal, b.cachedPodNum) + +} + +// CloseStatistic : to close statisticSwitch chan +func (b *WorkerInfo) CloseStatistic() { + if !b.statisticStopped { + close(b.statisticSwitch) + b.statisticStopped = true + } +} + +func updateWithFinish(b *WorkerInfo, namespace string) error { + if b.tableConstructionFinished() { + if err := b.endRankTableConstruction(namespace); err != nil { + return err + } + } + return nil +} + +func getWorkName(labels map[string]string) string { + if label, ok := labels[VolcanoJobNameKey]; ok { + return label + } + return labels[DeploymentNameKey] } -// update configmap's data field -func (b *businessWorker) updateConfigmap() error { - cm, err := b.kubeclientset.CoreV1().ConfigMaps(b.jobNamespace).Get(b.configmapName, metav1.GetOptions{}) +func updateConfigMap(w *WorkerInfo, namespace string) error { + cm, err := w.kubeclientset.CoreV1().ConfigMaps(namespace).Get(w.configmapName, metav1.GetOptions{}) if err != nil { return fmt.Errorf("get configmap error: %v", err) } @@ -205,46 +287,14 @@ func (b *businessWorker) updateConfigmap() error { return fmt.Errorf("invalid configmap label" + label910) } - dataByteArray, err := json.Marshal(b.configmapData) + dataByteArray, err := json.Marshal(w.configmapData) if err != nil { return fmt.Errorf("marshal configmap data error: %v", err) } cm.Data[ConfigmapKey] = string(dataByteArray[:]) 
- if _, err := b.kubeclientset.CoreV1().ConfigMaps(b.jobNamespace).Update(cm); err != nil { - return fmt.Errorf("failed to update ConfigMap for Job %s/%s: %v", b.jobNamespace, b.jobName, err) + if _, err := w.kubeclientset.CoreV1().ConfigMaps(namespace).Update(cm); err != nil { + return fmt.Errorf("failed to update ConfigMap for Job %v", err) } - return nil } - -func (b *businessWorker) closeStatistic() { - if !b.statisticStopped { - close(b.statisticSwitch) - b.statisticStopped = true - } -} - -// no need to add lock here, deviation from true value is acceptable -func (b *businessWorker) statistic(stopTime time.Duration) { - for { - select { - case c, ok := <-b.statisticSwitch: - if !ok { - klog.Error(c) - } - return - default: - if b.taskReplicasTotal == b.cachedPodNum { - klog.V(L1).Infof("rank table build progress for %s/%s is completed", - b.jobNamespace, b.jobName) - b.closeStatistic() - return - } - klog.V(L1).Infof("rank table build progress for %s/%s: pods need to be cached = %d,"+ - "pods already cached = %d", b.jobNamespace, b.jobName, b.taskReplicasTotal, b.cachedPodNum) - time.Sleep(stopTime) - } - } - -} diff --git a/pkg/ring-controller/controller/controller.go b/pkg/ring-controller/controller/controller.go index c70eddf12a39ace9c6fd77505c68c99630df6839..7fe26f7880adbfb42a10ce21d2a3d57956ba449e 100644 --- a/pkg/ring-controller/controller/controller.go +++ b/pkg/ring-controller/controller/controller.go @@ -19,16 +19,9 @@ package controller import ( "fmt" - "net" - "net/http" - "net/http/pprof" - "reflect" - "strconv" - "strings" - "time" - + "hccl-controller/pkg/ring-controller/agent" + "hccl-controller/pkg/ring-controller/model" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/meta" pkgutilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" @@ -38,56 +31,19 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" "k8s.io/klog" - "volcano.sh/volcano/pkg/apis/batch/v1alpha1" - v1alpha1apis "volcano.sh/volcano/pkg/apis/batch/v1alpha1" + "net" + "net/http" + "net/http/pprof" + "reflect" + "strings" + "time" clientset "volcano.sh/volcano/pkg/client/clientset/versioned" samplescheme "volcano.sh/volcano/pkg/client/clientset/versioned/scheme" - v1alpha1informers "volcano.sh/volcano/pkg/client/informers/externalversions/batch/v1alpha1" ) -// Controller initialize business agent -type Controller struct { - // component for recycle resources - businessAgent *businessAgent - - // kubeclientset is a standard kubernetes clientset - kubeclientset kubernetes.Interface - - // jobclientset is a clientset for volcano job - jobclientset clientset.Interface - - // component for resource batch/v1alpha1/Job - jobsSynced cache.InformerSynced - jobsIndexer cache.Indexer - - // workqueue is a rate limited work queue. This is used to queue work to be - // processed instead of performing it as soon as a change happens. This - // means we can ensure we only process a fixed amount of resources at a - // time, and makes it easy to ensure we are never processing the same item - // simultaneously in two different workers. - workqueue workqueue.RateLimitingInterface - // recorder is an event recorder for recording Event resources to the - // Kubernetes API. 
- recorder record.EventRecorder - workAgentInterface WorkAgentInterface -} - -// Config controller init configure -type Config struct { - DryRun bool - DisplayStatistic bool - PodParallelism int - CmCheckInterval int - CmCheckTimeout int -} - // NewController returns a new sample controller -func NewController( - kubeclientset kubernetes.Interface, - jobclientset clientset.Interface, - config *Config, - jobInformer v1alpha1informers.JobInformer, - stopCh <-chan struct{}) *Controller { +func NewController(kubeclientset kubernetes.Interface, jobclientset clientset.Interface, config *agent.Config, + informerInfo InformerInfo, stopCh <-chan struct{}) *Controller { // Create event broadcaster // Add ring-controller types to the default Kubernetes Scheme so Events can be // logged for ring-controller types. @@ -97,47 +53,32 @@ func NewController( eventBroadcaster.StartLogging(klog.Infof) eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")}) recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerName}) - businessAgent, err := newBusinessAgent(kubeclientset, recorder, config, stopCh) + agents, err := agent.NewBusinessAgent(kubeclientset, recorder, config, stopCh) if err != nil { klog.Fatalf("Error creating business agent: %s", err.Error()) } - controller := &Controller{ - kubeclientset: kubeclientset, - jobclientset: jobclientset, - jobsSynced: jobInformer.Informer().HasSynced, - jobsIndexer: jobInformer.Informer().GetIndexer(), - businessAgent: businessAgent, - workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Jobs"), - recorder: recorder, - workAgentInterface: businessAgent, + c := &Controller{ + kubeclientset: kubeclientset, + jobclientset: jobclientset, + jobsSynced: informerInfo.JobInformer.Informer().HasSynced, + deploySynced: informerInfo.DeployInformer.Informer().HasSynced, + workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "model"), + recorder: recorder, + agent: agents, + cacheIndexers: informerInfo.CacheIndexers, } - - klog.V(L1).Info("Setting up event handlers") - // Set up an event handler for when Job resources change - jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - controller.enqueueJob(obj, EventAdd) - }, - UpdateFunc: func(old, new interface{}) { - if !reflect.DeepEqual(old, new) { - controller.enqueueJob(new, EventUpdate) - } - }, - DeleteFunc: func(obj interface{}) { - controller.enqueueJob(obj, EventDelete) - }, - }) - return controller + informerInfo.addEventHandle(c) + return c } // Run will set up the event handlers for types we are interested in, as well // as syncing informer caches and starting workers. It will block until stopCh -// is closed, at which point it will shutdown the workqueue and wait for +// is closed, at which point it will shutdown the WorkQueue and wait for // workers to finish processing their current work items. 
 func (c *Controller) Run(threadiness int, monitorPerformance bool, stopCh <-chan struct{}) error {
 	defer pkgutilruntime.HandleCrash()
 	defer c.workqueue.ShutDown()
-	defer c.businessAgent.workqueue.ShuttingDown()
+	defer c.agent.Workqueue.ShuttingDown()
 	// monitor performance
 	if monitorPerformance {
 		go startPerformanceMonitorServer()
@@ -145,7 +86,9 @@ func (c *Controller) Run(threadiness int, monitorPerformance bool, stopCh <-chan
 	// Wait for the caches to be synced before starting workers
 	klog.V(L4).Info("Waiting for informer caches to sync")
-	if ok := cache.WaitForCacheSync(stopCh, c.jobsSynced); !ok {
+	ok := cache.WaitForCacheSync(stopCh, c.jobsSynced)
+	ok2 := cache.WaitForCacheSync(stopCh, c.deploySynced)
+	if !(ok && ok2) {
 		return fmt.Errorf("failed to wait for caches to sync")
 	}
@@ -164,9 +107,9 @@ func (c *Controller) Run(threadiness int, monitorPerformance bool, stopCh <-chan
 	return nil
 }
-// runWorker is a long-running function that will continually call the
+// runMasterWorker is a long-running function that will continually call the
 // processNextWorkItem function in order to read and process a message on the
-// workqueue.
+// WorkQueue.
 func (c *Controller) runMasterWorker() {
 	for c.processNextWorkItem() {
 	}
 }
@@ -190,35 +133,35 @@ func (c *Controller) processNextWorkItem() bool {
 		// put back on the workqueue and attempted again after a back-off
 		// period.
 		defer c.workqueue.Done(obj)
-		var key string
+		var mo model.ResourceEventHandler
 		var ok bool
 		// We expect strings to come off the workqueue. These are of the
 		// form namespace/name. We do this as the delayed nature of the
 		// workqueue means the items in the informer cache may actually be
 		// more up to date that when the item was initially put onto the
 		// workqueue.
-		if key, ok = obj.(string); !ok {
+		if mo, ok = obj.(model.ResourceEventHandler); !ok {
			// As the item in the workqueue is actually invalid, we call
			// Forget here else we'd go into a loop of attempting to
			// process a work item that is invalid.
			c.workqueue.Forget(obj)
-			pkgutilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
-			return nil
+			return fmt.Errorf("expected model.ResourceEventHandler in workqueue but got %#v", obj)
 		}
 		// Run the syncHandler, passing it the namespace/name string of the
-		// Job resource to be synced.
-		if err := c.syncHandler(key); err != nil {
+		// Job/Deployment resource to be synced.
+		if err := c.syncHandler(mo); err != nil {
			c.workqueue.Forget(obj)
-			return fmt.Errorf("error syncing '%s': %s", key, err.Error())
+			return fmt.Errorf("error syncing '%s': %s", mo.GetModelKey(), err.Error())
 		}
 		// Finally, if no error occurs we Forget this item so it does not
 		// get queued again until another change happens.
 		c.workqueue.Forget(obj)
-		klog.V(L2).Infof("Successfully synced '%s'", key)
+		klog.V(L4).Infof("Successfully synced %+v", mo)
 		return nil
 	}(obj)
 	if err != nil {
+		klog.Errorf("controller processNextWorkItem failed: %v", err)
 		pkgutilruntime.HandleError(err)
 		return true
 	}
@@ -230,166 +173,84 @@ func (c *Controller) processNextWorkItem() bool {
 // it into a namespace/name string which is then put onto the work queue. This method
 // should *not* be passed resources of any type other than Job.
func (c *Controller) enqueueJob(obj interface{}, eventType string) { - var key string - var err error - if key, err = c.KeyGenerationFunc(obj, eventType); err != nil { + models, err := model.Factory(obj, eventType, c.cacheIndexers) + if err != nil { pkgutilruntime.HandleError(err) return } - c.workqueue.AddRateLimited(key) -} - -// KeyGenerationFunc to generate key -func (c *Controller) KeyGenerationFunc(obj interface{}, eventType string) (string, error) { - metaData, err := meta.Accessor(obj) - - if err != nil { - return "", fmt.Errorf("object has no meta: %v", err) - } - - if len(metaData.GetNamespace()) > 0 { - return metaData.GetNamespace() + "/" + metaData.GetName() + "/" + eventType, nil - } - return metaData.GetName() + "/" + eventType, nil -} - -// SplitKeyFunc to splite key by format namespace,jobname,eventType -func (c *Controller) SplitKeyFunc(key string) (namespace, name, eventType string, err error) { - parts := strings.Split(key, "/") - switch len(parts) { - case 2: - // name only, no namespace - return "", parts[0], parts[1], nil - case 3: - // namespace and name - return parts[0], parts[1], parts[2], nil - default: - return "", "", "", fmt.Errorf("unexpected key format: %q", key) - } + c.workqueue.AddRateLimited(models) } -func (c *Controller) syncHandler(key string) error { - klog.V(L2).Infof("syncHandler start, current key is %s", key) - - namespace, name, eventType, err := c.SplitKeyFunc(key) +func (c *Controller) syncHandler(model model.ResourceEventHandler) error { + key := model.GetModelKey() + klog.V(L2).Infof("syncHandler start, current key is %v", key) + namespace, name, eventType, err := splitKeyFunc(key) if err != nil { return fmt.Errorf("failed to split key: %v", err) } - tempObj, exists, err := c.jobsIndexer.GetByKey(namespace + "/" + name) + _, exists, err := model.GetCacheIndex().GetByKey(namespace + "/" + name) if err != nil { return fmt.Errorf("failed to get obj from indexer: %s", key) } + if !exists { + if eventType == agent.EventDelete { + agent.DeleteWorker(namespace, name, c.agent) + } + return fmt.Errorf("undefined condition, eventType is %s, current key is %s", eventType, key) + } switch eventType { - case EventAdd: - err := c.eventAdd(exists, namespace, name, tempObj, key) + case agent.EventAdd: + klog.V(L2).Infof("exist + add, current job is %s/%s", namespace, name) + err := model.EventAdd(c.agent) if err != nil { return err } - case EventDelete: - if exists { - klog.V(L2).Infof("undefined condition, exist + delete, current key is %s", key) - return fmt.Errorf("undefined condition, exist + delete, current key is %s", key) - } - c.businessAgent.DeleteBusinessWorker(namespace, name) - case EventUpdate: + case agent.EventUpdate: // unnecessary to handle - err := c.eventUpdate(exists, tempObj, namespace, name, key) + err := model.EventUpdate(c.agent) if err != nil { return err } default: - // abnormal - klog.V(L2).Infof("undefined condition, eventType is %s, current key is %s", eventType, key) return fmt.Errorf("undefined condition, eventType is %s, current key is %s", eventType, key) } return nil } -func (c *Controller) eventAdd(exists bool, namespace string, name string, tempObj interface{}, key string) error { - if exists { - klog.V(L2).Infof("exist + add, current job is %s/%s", namespace, name) - // check if job's corresponding configmap is created successfully via volcano controller - job, ok := tempObj.(*v1alpha1apis.Job) - if !ok { - klog.Error("event add => failed, job transform not ok") - } - err := c.createBusinessWorker(job) - if err != nil { 
- return err - } - } - // abnormal - klog.V(L2).Infof("undefined condition, not exist + add, current key is %s", key) - return nil -} - -func (c *Controller) eventUpdate(exists bool, tempObj interface{}, namespace string, name string, key string) error { - if exists { - job, ok := tempObj.(*v1alpha1apis.Job) - if !ok { - klog.Error("update event -> failed") - } - if string(job.Status.State.Phase) == JobRestartPhase { - c.workAgentInterface.DeleteBusinessWorker(namespace, name) - return nil - } - if !c.businessAgent.IsBusinessWorkerExist(namespace, name) { - // for job update, if create business worker at job restart phase, the version will be incorrect - err := c.createBusinessWorker(job) - if err != nil { - return err +func (in *InformerInfo) addEventHandle(controller *Controller) { + eventHandlerFunc := cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + controller.enqueueJob(obj, agent.EventAdd) + }, + UpdateFunc: func(old, new interface{}) { + if !reflect.DeepEqual(old, new) { + controller.enqueueJob(new, agent.EventUpdate) } - } + }, + DeleteFunc: func(obj interface{}) { + controller.enqueueJob(obj, agent.EventDelete) + }, } - klog.V(L2).Infof("undefined condition, not exist + update, current key is %s", key) - return nil + in.JobInformer.Informer().AddEventHandler(eventHandlerFunc) + in.DeployInformer.Informer().AddEventHandler(eventHandlerFunc) } -func (c *Controller) createBusinessWorker(job *v1alpha1apis.Job) error { - // check if job's corresponding configmap is created successfully via volcano controller - cm, err := c.workAgentInterface.CheckConfigmapCreation(job) - if err != nil { - return err - } - - // retrieve configmap data - var configmapDataV1 RankTableV1 - var configmapDataV2 RankTableV2 - jobStartString := cm.Data[ConfigmapKey] - klog.V(L4).Info("jobstarting==>", jobStartString) - - var ranktable RankTable - groupList, replicasTotal, err := generateGrouplist(job) - if err != nil { - return fmt.Errorf("generate group list from job error: %v", err) - } - - if JSONVersion == "v1" { - err = configmapDataV1.unmarshalToRankTable(jobStartString) - if err != nil { - return err - } - ranktable = &RankTableV1{RankTableStatus: RankTableStatus{ConfigmapInitializing}, GroupCount: strconv.Itoa(len(job.Spec.Tasks)), - GroupList: groupList} - } else { - err = configmapDataV2.unmarshalToRankTable(jobStartString) - if err != nil { - return err - } - var serverList []*Server - ranktable = &RankTableV2{ServerCount: strconv.Itoa(len(serverList)), ServerList: serverList, - RankTableStatus: RankTableStatus{ConfigmapInitializing}, Version: "1.0"} - } - // create a business worker for current job - err = c.workAgentInterface.CreateBusinessWorker(job, ranktable, replicasTotal) - if err != nil { - return err +// splitKeyFunc to splite key by format namespace,jobname,eventType +func splitKeyFunc(key string) (namespace, name, eventType string, err error) { + parts := strings.Split(key, "/") + switch len(parts) { + case 2: + // name only, no namespace + return "", parts[0], parts[1], nil + case 3: + // namespace and name + return parts[0], parts[1], parts[2], nil + default: + return "", "", "", fmt.Errorf("unexpected key format: %q", key) } - - return nil } func startPerformanceMonitorServer() { @@ -404,32 +265,8 @@ func startPerformanceMonitorServer() { Addr: net.JoinHostPort("localhost", "6060"), Handler: mux, } - error := server.ListenAndServe() - if error != nil { - klog.Error(error) - } -} - -func generateGrouplist(job *v1alpha1.Job) ([]*Group, int32, error) { - var 
replicasTotal int32 - var groupList []*Group - for _, taskSpec := range job.Spec.Tasks { - var deviceTotal int32 - - for _, container := range taskSpec.Template.Spec.Containers { - quantity, exist := container.Resources.Limits[ResourceName] - quantityValue := int32(quantity.Value()) - if exist && quantityValue > 0 { - deviceTotal += quantityValue - } - } - deviceTotal *= taskSpec.Replicas - - var instanceList []*Instance - group := Group{GroupName: taskSpec.Name, DeviceCount: strconv.FormatInt(int64(deviceTotal), decimal), - InstanceCount: strconv.FormatInt(int64(taskSpec.Replicas), decimal), InstanceList: instanceList} - groupList = append(groupList, &group) - replicasTotal += taskSpec.Replicas + err := server.ListenAndServe() + if err != nil { + klog.Error(err) } - return groupList, replicasTotal, nil } diff --git a/pkg/ring-controller/controller/controller_test.go b/pkg/ring-controller/controller/controller_test.go index 0a0dd79f1a89af8e971a14a08cfe02f9984cfaf8..8620f6de443c43a9a0b651fabb46e9d51bab7e10 100644 --- a/pkg/ring-controller/controller/controller_test.go +++ b/pkg/ring-controller/controller/controller_test.go @@ -22,6 +22,7 @@ import ( "github.com/golang/mock/gomock" "github.com/prashantv/gostub" "github.com/stretchr/testify/assert" + "hccl-controller/pkg/ring-controller/agent" "hccl-controller/pkg/ring-controller/controller/mock_cache" "hccl-controller/pkg/ring-controller/controller/mock_controller" "hccl-controller/pkg/ring-controller/controller/mock_kubernetes" @@ -102,7 +103,7 @@ func TestController_createBusinessWorker(t *testing.T) { t.Run(tt.name, func(t *testing.T) { mockAgent.EXPECT().CheckConfigmapCreation(gomock.Any()).Return(tt.configMap, nil) c := &Controller{ - workAgentInterface: mockAgent, + agent: mockAgent, } if err := c.createBusinessWorker(&v1alpha1.Job{}); (err != nil) != tt.wantErr { t.Errorf("createBusinessWorker() error = %v, wantErr %v", err, tt.wantErr) @@ -118,7 +119,7 @@ func TestController_syncHandler(t *testing.T) { mockAgent.EXPECT().CreateBusinessWorker(gomock.Any()).Return(nil).Times(three) mockAgent.EXPECT().CheckConfigmapCreation(gomock.Any()).Return(mockConfigMap(), nil).Times(three) mockIndexr := mock_cache.NewMockIndexer(ctrl) - mockIndexr.EXPECT().GetByKey(gomock.Any()).Return(mockJob(), true, nil).Times(four) + mockIndexr.EXPECT().GetByKey(gomock.Any()).Return(agent.mockJob(), true, nil).Times(four) mockIndexr.EXPECT().GetByKey(gomock.Any()).Return(nil, false, fmt.Errorf("mock error")) mockIndexr.EXPECT().GetByKey(gomock.Any()).Return(nil, false, nil) // no pod existed @@ -134,9 +135,9 @@ func TestController_syncHandler(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { c := &Controller{ - workAgentInterface: mockAgent, - jobsIndexer: mockIndexr, - businessAgent: createAgent(true), + agent: mockAgent, + jobsIndexer: mockIndexr, + businessAgent: agent.createAgent(true), } if err := c.syncHandler(tt.key); (err != nil) != tt.wantErr { t.Errorf("syncHandler() error = %v, wantErr %v", err, tt.wantErr) @@ -188,7 +189,7 @@ func TestNewController(t *testing.T) { mockShared.EXPECT().AddEventHandler(gomock.Any()).Return() mockShared.EXPECT().GetIndexer().Return(nil) mockInformer.EXPECT().Informer().Return(mockShared).Times(three) - stub := gostub.StubFunc(&newBusinessAgent, createAgentForController(false), nil) + stub := gostub.StubFunc(&agent.newBusinessAgent, agent.createAgentForController(false), nil) defer stub.Reset() tests := []struct { want *Controller @@ -197,7 +198,7 @@ func TestNewController(t *testing.T) { { 
name: "normal situation,return controller instance", want: &Controller{ - businessAgent: createAgentForController(false), + businessAgent: agent.createAgentForController(false), }, }, } @@ -210,7 +211,7 @@ func TestNewController(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got := NewController(mockK8s, nil, config, mockInformer, make(chan struct{})) + got := NewController(mockK8s, nil, config, mockInformer, nil, make(chan struct{})) if !reflect.DeepEqual(got.businessAgent, tt.want.businessAgent) { t.Errorf("NewController() = %v, want %v", got, tt.want) } @@ -248,11 +249,11 @@ func TestController_Run(t *testing.T) { jobsSynced: func() bool { return true }, - jobsIndexer: nil, - businessAgent: createAgent(false), - workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Jobs"), - recorder: nil, - workAgentInterface: createAgent(false), + jobsIndexer: nil, + businessAgent: agent.createAgent(false), + workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Jobs"), + recorder: nil, + agent: agent.createAgent(false), } go func() { time.Sleep(1 * time.Second) @@ -274,7 +275,7 @@ func TestController_enqueueJob(t *testing.T) { }{ { name: "test1: Jod be added to queue", - obj: mockJob(), + obj: agent.mockJob(), eventType: "add", }, } @@ -291,14 +292,14 @@ func TestController_enqueueJob(t *testing.T) { func mockController() *Controller { return &Controller{ - kubeclientset: nil, - jobclientset: nil, - jobsSynced: nil, - jobsIndexer: nil, - businessAgent: createAgent(false), - workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Jobs"), - recorder: nil, - workAgentInterface: createAgent(false), + kubeclientset: nil, + jobclientset: nil, + jobsSynced: nil, + jobsIndexer: nil, + businessAgent: agent.createAgent(false), + workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Jobs"), + recorder: nil, + agent: agent.createAgent(false), } } @@ -306,7 +307,7 @@ func mockController() *Controller { func TestController_processNextWorkItem(t *testing.T) { ctrl := gomock.NewController(t) mockIndexr := mock_cache.NewMockIndexer(ctrl) - mockIndexr.EXPECT().GetByKey(gomock.Any()).Return(mockJob(), true, nil).Times(four) + mockIndexr.EXPECT().GetByKey(gomock.Any()).Return(agent.mockJob(), true, nil).Times(four) var controllers []*Controller contrl := mockController() contrl.workqueue.AddRateLimited("vcjob/testpod/delete") diff --git a/pkg/ring-controller/controller/types.go b/pkg/ring-controller/controller/types.go new file mode 100644 index 0000000000000000000000000000000000000000..f9a3f8e857af76aae6388e5fbbfeca2f5e5599b8 --- /dev/null +++ b/pkg/ring-controller/controller/types.go @@ -0,0 +1,82 @@ +/* + * Copyright(C) 2020. Huawei Technologies Co.,Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +// Package controller for controller +package controller + +import ( + "hccl-controller/pkg/ring-controller/agent" + v1 "k8s.io/client-go/informers/apps/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" + clientset "volcano.sh/volcano/pkg/client/clientset/versioned" + v1alpha1informers "volcano.sh/volcano/pkg/client/informers/externalversions/batch/v1alpha1" +) + +const ( + controllerName = "ring-controller" + // L1 log level 1 + L1 = 1 + // L2 log level 2 + L2 = 2 + + // L4 log level 4 + L4 = 4 + decimal = 10 + two = 2 + three = 3 + four = 4 + status = 200 + oneMinitue = 60 +) + +// Controller initialize business agent +type Controller struct { + // component for recycle resources + agent *agent.BusinessAgent + + cacheIndexers map[string]cache.Indexer + // kubeclientset is a standard kubernetes clientset + kubeclientset kubernetes.Interface + + // jobclientset is a clientset for volcano job + jobclientset clientset.Interface + + // component for resource batch/v1alpha1/Job + jobsSynced cache.InformerSynced + deploySynced cache.InformerSynced + // workqueue is a rate limited work queue. This is used to queue work to be + // processed instead of performing it as soon as a change happens. This + // means we can ensure we only process a fixed amount of resources at a + // time, and makes it easy to ensure we are never processing the same item + // simultaneously in two different workers. + workqueue workqueue.RateLimitingInterface + // recorder is an event recorder for recording Event resources to the + // Kubernetes API. + recorder record.EventRecorder +} + +// InformerInfo : Defining what the Controller will use +type InformerInfo struct { + // CacheIndexers : to store different type cache index + CacheIndexers map[string]cache.Indexer + // JobInformer : vcjob type informer + JobInformer v1alpha1informers.JobInformer + // DeployInformer : deployment type informer + DeployInformer v1.DeploymentInformer +} diff --git a/pkg/ring-controller/model/deployment.go b/pkg/ring-controller/model/deployment.go new file mode 100644 index 0000000000000000000000000000000000000000..f288862a432f9b3d5ea477b5e684e4bda32df120 --- /dev/null +++ b/pkg/ring-controller/model/deployment.go @@ -0,0 +1,105 @@ +/* + * Copyright(C) 2021. Huawei Technologies Co.,Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package model + +import ( + agent2 "hccl-controller/pkg/ring-controller/agent" + v1 "hccl-controller/pkg/ring-controller/ranktable/v1" + "k8s.io/klog" + "strconv" +) + +// GetReplicas : to return the replicas in deployment. 
+func (deploy *DeployModel) GetReplicas() string { + return strconv.Itoa(int(deploy.replicas)) +} + +// EventAdd : to handle deployment add event +func (deploy *DeployModel) EventAdd(agent *agent2.BusinessAgent) error { + // check if job's corresponding configmap is created successfully via volcano controller + cm, err := checkCMCreation(deploy.DeployNamespace, deploy.DeployName, agent.KubeClientSet, agent.Config) + if err != nil { + return err + } + + // retrieve configmap data + jobStartString := cm.Data[agent2.ConfigmapKey] + klog.V(L4).Info("jobstarting==>", jobStartString) + + ranktable, replicasTotal, err := RanktableFactory(deploy, jobStartString, agent2.JSONVersion) + if err != nil { + return err + } + deploymentWorker := agent2.NewDeploymentWorker(agent, deploy.DeployInfo, ranktable, replicasTotal) + + // create a business worker for current deployment + agent.RwMutex.Lock() + defer agent.RwMutex.Unlock() + + klog.V(L2).Infof("create business worker for %s/%s", deploy.DeployNamespace, deploy.DeployName) + _, exist := agent.BusinessWorker[deploy.DeployNamespace+"/"+deploy.DeployName] + if exist { + klog.V(L2).Infof("business worker for %s/%s is already existed", deploy.DeployNamespace, deploy.DeployName) + return nil + } + + // start to report rank table build statistic for current deployment + if agent.Config.DisplayStatistic { + go deploymentWorker.Statistic(BuildStatInterval) + } + + // save current business worker + agent.BusinessWorker[deploy.DeployNamespace+"/"+deploy.DeployName] = deploymentWorker + return nil +} + +// EventUpdate : to handle deployment update event +func (deploy *DeployModel) EventUpdate(agent *agent2.BusinessAgent) error { + agent.RwMutex.RLock() + _, exist := agent.BusinessWorker[deploy.DeployNamespace+"/"+deploy.DeployName] + agent.RwMutex.RUnlock() + if !exist { + // for pod update, the version will be incorrect + err := deploy.EventAdd(agent) + if err != nil { + return err + } + } + return nil +} + +// GenerateGrouplist to create GroupList. in ranktable v1 will use it. +func (deploy *DeployModel) GenerateGrouplist() ([]*v1.Group, int32, error) { + var groupList []*v1.Group + var deviceTotal int32 + + for _, container := range deploy.containers { + quantity, exist := container.Resources.Limits[agent2.ResourceName] + quantityValue := int32(quantity.Value()) + if exist && quantityValue > 0 { + deviceTotal += quantityValue + } + } + deviceTotal *= deploy.replicas + + var instanceList []*v1.Instance + group := v1.Group{GroupName: deploy.DeployName, DeviceCount: strconv.FormatInt(int64(deviceTotal), decimal), + InstanceCount: strconv.FormatInt(int64(deploy.replicas), decimal), InstanceList: instanceList} + groupList = append(groupList, &group) + + return groupList, deploy.replicas, nil +} diff --git a/pkg/ring-controller/model/types.go b/pkg/ring-controller/model/types.go new file mode 100644 index 0000000000000000000000000000000000000000..d9c15f9e67a151bc06bd37d0b41f26c197197aa8 --- /dev/null +++ b/pkg/ring-controller/model/types.go @@ -0,0 +1,67 @@ +/* + * Copyright(C) 2021. Huawei Technologies Co.,Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package model
+
+import (
+	"hccl-controller/pkg/ring-controller/agent"
+	apiCorev1 "k8s.io/api/core/v1"
+	"k8s.io/client-go/tools/cache"
+	"time"
+	v1alpha1apis "volcano.sh/volcano/pkg/apis/batch/v1alpha1"
+)
+
+const (
+
+	// L2 log level 2
+	L2 = 2
+	// L3 log level 3
+	L3 = 3
+	// L4 log level 4
+	L4 = 4
+
+	decimal = 10
+	// VCJobType resource type to watch: vcjob.
+	VCJobType = "vcjob"
+	// DeploymentType resource type to watch: deployment.
+	DeploymentType = "deployment"
+	// JobRestartPhase restart flag
+	JobRestartPhase = "Restarting"
+
+	// BuildStatInterval interval for reporting rank table build statistics
+	BuildStatInterval = 30 * time.Second
+)
+
+type modelCommon struct {
+	key          string
+	cacheIndexer cache.Indexer
+}
+
+// VCJobModel : to handle vcjob type
+type VCJobModel struct {
+	modelCommon
+	agent.JobInfo
+	jobPhase string
+	taskSpec []v1alpha1apis.TaskSpec
+}
+
+// DeployModel : to handle deployment type
+type DeployModel struct {
+	modelCommon
+	agent.DeployInfo
+	replicas   int32
+	containers []apiCorev1.Container
+}
diff --git a/pkg/ring-controller/model/vcjob.go b/pkg/ring-controller/model/vcjob.go
new file mode 100644
index 0000000000000000000000000000000000000000..2540c97a3df7280c75156138bc19560adf9d1d0d
--- /dev/null
+++ b/pkg/ring-controller/model/vcjob.go
@@ -0,0 +1,237 @@
+/*
+ * Copyright(C) 2021. Huawei Technologies Co.,Ltd. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package model
+
+import (
+	"fmt"
+	agent2 "hccl-controller/pkg/ring-controller/agent"
+	v1 "hccl-controller/pkg/ring-controller/ranktable/v1"
+	v2 "hccl-controller/pkg/ring-controller/ranktable/v2"
+	appsV1 "k8s.io/api/apps/v1"
+	apiCoreV1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/api/meta"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/klog"
+	"strconv"
+	"time"
+	v1alpha1apis "volcano.sh/volcano/pkg/apis/batch/v1alpha1"
+)
+
+// ResourceEventHandler defines the common functions the controller uses to handle each resource type.
+type ResourceEventHandler interface {
+	EventAdd(tagentInterface *agent2.BusinessAgent) error
+	EventUpdate(tagentInterface *agent2.BusinessAgent) error
+	GenerateGrouplist() ([]*v1.Group, int32, error)
+	GetReplicas() string
+	GetCacheIndex() cache.Indexer
+	GetModelKey() string
+}
+
+// GetModelKey: return model key.
+func (model *modelCommon) GetModelKey() string { + return model.key +} + +// GetCacheIndex: return CacheIndex +func (model *modelCommon) GetCacheIndex() cache.Indexer { + return model.cacheIndexer +} + +// GetReplicas : return vcjob replicas +func (job *VCJobModel) GetReplicas() string { + return strconv.Itoa(len(job.taskSpec)) +} + +// EventAdd to handle vcjob add event +func (job *VCJobModel) EventAdd(agent *agent2.BusinessAgent) error { + + agent.RwMutex.RLock() + klog.V(L2).Infof("create business worker for %s/%s", job.JobNamespace, job.JobName) + _, exist := agent.BusinessWorker[job.JobNamespace+"/"+job.JobName] + if exist { + agent.RwMutex.RUnlock() + klog.V(L2).Infof("business worker for %s/%s is already existed", job.JobNamespace, job.JobName) + return nil + } + agent.RwMutex.RUnlock() + + // check if job's corresponding configmap is created successfully via volcano controller + cm, err := checkCMCreation(job.JobNamespace, job.JobName, agent.KubeClientSet, agent.Config) + if err != nil { + return err + } + + // retrieve configmap data + jobStartString := cm.Data[agent2.ConfigmapKey] + klog.V(L3).Info("jobstarting==>", jobStartString) + + ranktable, replicasTotal, err := RanktableFactory(job, jobStartString, agent2.JSONVersion) + if err != nil { + return err + } + jobWorker := agent2.NewVCJobWorker(agent, job.JobInfo, ranktable, replicasTotal) + + // create a business worker for current job + agent.RwMutex.Lock() + defer agent.RwMutex.Unlock() + + // start to report rank table build statistic for current job + if agent.Config.DisplayStatistic { + go jobWorker.Statistic(BuildStatInterval) + } + + // save current business worker + agent.BusinessWorker[job.JobNamespace+"/"+job.JobName] = jobWorker + return nil +} + +// EventUpdate : to handle vcjob update event +func (job *VCJobModel) EventUpdate(agent *agent2.BusinessAgent) error { + if job.jobPhase == JobRestartPhase { + agent2.DeleteWorker(job.JobNamespace, job.JobName, agent) + return nil + } + agent.RwMutex.RLock() + _, exist := agent.BusinessWorker[job.JobNamespace+"/"+job.JobName] + agent.RwMutex.RUnlock() + if !exist { + // for job update, if create business worker at job restart phase, the version will be incorrect + err := job.EventAdd(agent) + if err != nil { + return err + } + } + return nil +} + +// GenerateGrouplist : to generate GroupList, ranktable v1 will use it. 
+func (job *VCJobModel) GenerateGrouplist() ([]*v1.Group, int32, error) { + var replicasTotal int32 + var groupList []*v1.Group + for _, taskSpec := range job.taskSpec { + var deviceTotal int32 + + for _, container := range taskSpec.Template.Spec.Containers { + quantity, exist := container.Resources.Limits[agent2.ResourceName] + quantityValue := int32(quantity.Value()) + if exist && quantityValue > 0 { + deviceTotal += quantityValue + } + } + deviceTotal *= taskSpec.Replicas + + var instanceList []*v1.Instance + group := v1.Group{GroupName: taskSpec.Name, DeviceCount: strconv.FormatInt(int64(deviceTotal), decimal), + InstanceCount: strconv.FormatInt(int64(taskSpec.Replicas), decimal), InstanceList: instanceList} + groupList = append(groupList, &group) + replicasTotal += taskSpec.Replicas + } + return groupList, replicasTotal, nil +} + +// checkCMCreation check configmap +func checkCMCreation(namespace, name string, kubeClientSet kubernetes.Interface, config *agent2.Config) ( + *apiCoreV1.ConfigMap, error) { + var cm *apiCoreV1.ConfigMap + err := wait.PollImmediate(time.Duration(config.CmCheckTimeout)*time.Second, + time.Duration(config.CmCheckTimeout)*time.Second, + func() (bool, error) { + var errTmp error + + cm, errTmp = kubeClientSet.CoreV1().ConfigMaps(namespace). + Get(fmt.Sprintf("%s-%s", + agent2.ConfigmapPrefix, name), metav1.GetOptions{}) + if errTmp != nil { + if errors.IsNotFound(errTmp) { + return false, nil + } + return true, fmt.Errorf("get configmap error: %v", errTmp) + } + return true, nil + }) + if err != nil { + return nil, fmt.Errorf("failed to get configmap for job %s/%s: %v", namespace, name, err) + } + label910, exist := (*cm).Labels[agent2.Key910] + if !exist || (exist && label910 != agent2.Val910) { + return nil, fmt.Errorf("invalid configmap label" + label910) + } + + return cm, nil +} + +// Factory to generate model +func Factory(obj interface{}, eventType string, indexers map[string]cache.Indexer) (ResourceEventHandler, error) { + metaData, err := meta.Accessor(obj) + if err != nil { + return nil, fmt.Errorf("object has no meta: %v", err) + } + key := metaData.GetName() + "/" + eventType + if len(metaData.GetNamespace()) > 0 { + key = metaData.GetNamespace() + "/" + metaData.GetName() + "/" + eventType + } + var model ResourceEventHandler + switch t := obj.(type) { + case *v1alpha1apis.Job: + model = &VCJobModel{modelCommon: modelCommon{key: key, cacheIndexer: indexers[VCJobType]}, + JobInfo: agent2.JobInfo{JobUID: string(t.UID), JobVersion: t.Status.Version, + JobCreationTimestamp: t.CreationTimestamp, JobNamespace: t.Namespace, JobName: t.Name}, + jobPhase: string(t.Status.State.Phase), taskSpec: t.Spec.Tasks} + case *appsV1.Deployment: + model = &DeployModel{modelCommon: modelCommon{key: key, cacheIndexer: indexers[DeploymentType]}, + containers: t.Spec.Template.Spec.Containers, replicas: *t.Spec.Replicas, + DeployInfo: agent2.DeployInfo{DeployNamespace: t.Namespace, DeployName: t.Name, + DeployCreationTimestamp: t.CreationTimestamp}} + default: + return nil, fmt.Errorf("job factory err, %s ", key) + } + + return model, nil +} + +// RanktableFactory : return the version type of ranktable according to your input parameters +func RanktableFactory(model ResourceEventHandler, jobStartString, JSONVersion string) (v1.RankTabler, int32, error) { + var ranktable v1.RankTabler + groupList, replicasTotal, err := model.GenerateGrouplist() + if err != nil { + return nil, 0, fmt.Errorf("generate group list from job error: %v", err) + } + + if JSONVersion == "v1" { + var 
configmapData v1.RankTable + err = configmapData.UnmarshalToRankTable(jobStartString) + if err != nil { + return nil, 0, err + } + ranktable = &v1.RankTable{RankTableStatus: v1.RankTableStatus{Status: agent2.ConfigmapInitializing}, + GroupCount: model.GetReplicas(), GroupList: groupList} + } else { + var configmapData v2.RankTable + err = configmapData.UnmarshalToRankTable(jobStartString) + if err != nil { + return nil, 0, err + } + var serverList []*v2.Server + ranktable = &v2.RankTable{ServerCount: strconv.Itoa(len(serverList)), ServerList: serverList, + RankTableStatus: v1.RankTableStatus{Status: agent2.ConfigmapInitializing}, Version: "1.0"} + } + return ranktable, replicasTotal, nil +} diff --git a/pkg/ring-controller/controller/ranktable.go b/pkg/ring-controller/ranktable/v1/ranktable.go similarity index 31% rename from pkg/ring-controller/controller/ranktable.go rename to pkg/ring-controller/ranktable/v1/ranktable.go index a18f12b7db0d4b2fd8b335265b4382cad5ac09e3..8137f2ef92c3b9862a57e055e362f9111744cfa8 100644 --- a/pkg/ring-controller/controller/ranktable.go +++ b/pkg/ring-controller/ranktable/v1/ranktable.go @@ -1,5 +1,5 @@ /* - * Copyright(C) 2020. Huawei Technologies Co.,Ltd. All rights reserved. + * Copyright(C) 2021. Huawei Technologies Co.,Ltd. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,84 +14,43 @@ * limitations under the License. */ -// Package controller for controller -package controller +package v1 import ( "encoding/json" "fmt" - "k8s.io/klog" - "strconv" - "strings" apiCoreV1 "k8s.io/api/core/v1" + "k8s.io/klog" ) -// RankTable interface to maintain properties -type RankTable interface { - unmarshalToRankTable(jsonString string) error - cachePodInfo(pod *apiCoreV1.Pod, deviceInfo string) error - removePodInfo(namespace string, name string) error - setStatus(status string) error - getStatus() string -} - -// RankTableStatus to hccl -type RankTableStatus struct { - Status string `json:"status"` // get hccl_json status -} - -// RankTableV1 to hccl -type RankTableV1 struct { - RankTableStatus - GroupList []*Group `json:"group_list"` // hccl group list - GroupCount string `json:"group_count, string"` // hccl_json grouoCount -} - -// Group to hccl -type Group struct { - InstanceList []*Instance `json:"instance_list"` // hccl InstaceList - GroupName string `json:"group_name"` // hccl GroupName - DeviceCount string `json:"device_count, string"` // hccl Devicecount - InstanceCount string `json:"instance_count, string"` // hccl Instance Count -} - -// Instance to hccl -type Instance struct { - Devices []Device `json:"devices"` // hccl Deviceid - PodName string `json:"pod_name"` // hccl PodName - ServerID string `json:"server_id"` // hccl servceId -} - -// Device to hccl -type Device struct { - DeviceID string `json:"device_id"` // hccl deviceId - DeviceIP string `json:"device_ip"` // hccl deviceid -} - -// RankTableV2 to hccl -type RankTableV2 struct { - RankTableStatus - ServerList []*Server `json:"server_list"` // hccl_json server list - ServerCount string `json:"server_count"` // hccl_json server count - Version string `json:"version"` // hccl_json version -} - -// Server to hccl -type Server struct { - DeviceList []*DeviceV2 `json:"device"` // device list in each server - ServerID string `json:"server_id"` // server id, represented by ip address - PodID string `json:"-"` // pod id, equal to the last integer of pod name +// RankTabler interface to maintain 
properties +type RankTabler interface { + // UnmarshalToRankTable:Unmarshal json string to RankTable + UnmarshalToRankTable(jsonString string) error + // CachePodInfo: cache pod info to RankTableV1 + CachePodInfo(pod *apiCoreV1.Pod, deviceInfo string, rankIndex *int) error + // RemovePodInfo : Remove pod info from RankTable + RemovePodInfo(namespace string, name string) error + // SetStatus Set status of RankTableStatus + SetStatus(status string) error + // GetStatus: Get status of RankTableStatus + GetStatus() string +} + +// SetStatus Set status of RankTableStatus +func (r *RankTableStatus) SetStatus(status string) error { + r.Status = status + return nil } -// DeviceV2 to hccl -type DeviceV2 struct { - Device - RankID string `json:"rank_id"` // rank id +// GetStatus : Get status of RankTableStatus +func (r *RankTableStatus) GetStatus() string { + return r.Status } -// Unmarshal json string to RankTable -func (r *RankTableStatus) unmarshalToRankTable(jsonString string) error { +// UnmarshalToRankTable : Unmarshal json string to RankTable +func (r *RankTableStatus) UnmarshalToRankTable(jsonString string) error { err := json.Unmarshal([]byte(jsonString), &r) if err != nil { return fmt.Errorf("parse configmap data error: %v", err) @@ -102,15 +61,12 @@ func (r *RankTableStatus) unmarshalToRankTable(jsonString string) error { return nil } -// Cache pod info to RankTableV1 -func (r *RankTableV1) cachePodInfo(pod *apiCoreV1.Pod, deviceInfo string) error { +// CachePodInfo : cache pod info to RankTableV1 +func (r *RankTable) CachePodInfo(pod *apiCoreV1.Pod, deviceInfo string, rankIndex *int) error { if len(r.GroupList) < 1 { return fmt.Errorf("grouplist of ranktable is empty") } group := r.GroupList[0] - if group.GroupName != pod.Annotations[PodGroupKey] { - return nil - } done := checkPodCache(group, pod) if done { return nil @@ -126,50 +82,13 @@ func (r *RankTableV1) cachePodInfo(pod *apiCoreV1.Pod, deviceInfo string) error } group.InstanceList = append(group.InstanceList, &instance) - + *rankIndex++ return nil } -// Cache pod info to RankTableV2 -func (r *RankTableV2) cachePodInfo(pod *apiCoreV1.Pod, deviceInfo string) error { - var instance Instance - var server Server - - if err := json.Unmarshal([]byte(deviceInfo), &instance); err != nil { - return fmt.Errorf("parse annotation of pod %s/%s error: %v", pod.Namespace, pod.Name, err) - } - rankFactor := len(instance.Devices) - - // Build new server-level struct from device info - server.ServerID = instance.ServerID - server.PodID = instance.PodName - podID, err := strconv.Atoi(server.PodID) - if err != nil { - return fmt.Errorf("parse name of pod %s/%s error: %v", pod.Namespace, pod.Name, err) - } - - for _, device := range instance.Devices { - var serverDevice DeviceV2 - serverDevice.DeviceID = device.DeviceID - serverDevice.DeviceIP = device.DeviceIP - serverDevice.RankID = strconv.Itoa(podID*rankFactor + len(server.DeviceList)) - - server.DeviceList = append(server.DeviceList, &serverDevice) - } - - r.ServerList = append(r.ServerList, &server) - r.ServerCount = strconv.Itoa(len(r.ServerList)) - - return nil -} - -// Remove pod info from RankTableV1 -func (r *RankTableV1) removePodInfo(namespace string, name string) error { +// RemovePodInfo : Remove pod info from RankTable +func (r *RankTable) RemovePodInfo(namespace string, podID string) error { hasInfoToRemove := false - - // Get last bit of pod name as podID - splited := strings.Split(name, "-") - podID := splited[len(splited)-1] for _, group := range r.GroupList { for idx, instance := 
range group.InstanceList { // current pod's info is already cached, start to remove @@ -186,47 +105,19 @@ func (r *RankTableV1) removePodInfo(namespace string, name string) error { } } if !hasInfoToRemove { - klog.V(L3).Infof("no data of pod %s/%s can be removed", namespace, name) - return nil + return fmt.Errorf("no data of pod %s/%s can be removed", namespace, podID) } return nil } -// Remove pod info from RankTableV2 -func (r *RankTableV2) removePodInfo(namespace string, name string) error { - hasInfoToRemove := false - - // Get last bit of pod name as podID - splited := strings.Split(name, "-") - podID := splited[len(splited)-1] - serverList := r.ServerList - for idx, server := range serverList { - if server.PodID == podID { - length := len(serverList) - serverList[idx] = serverList[length-1] - serverList = serverList[:length-1] - hasInfoToRemove = true - break +func checkPodCache(group *Group, pod *apiCoreV1.Pod) bool { + for _, instance := range group.InstanceList { + if instance.PodName == pod.Name { + klog.V(L3).Infof("ANOMALY: pod %s/%s is already cached", pod.Namespace, + pod.Name) + return true } } - - if !hasInfoToRemove { - klog.V(L3).Infof("no data of pod %s/%s can be removed", namespace, name) - return nil - } - r.ServerCount = strconv.Itoa(len(r.ServerList)) - - return nil -} - -// Set status of RankTableStatus -func (r *RankTableStatus) setStatus(status string) error { - r.Status = status - return nil -} - -// Get status of RankTableStatus -func (r *RankTableStatus) getStatus() string { - return r.Status + return false } diff --git a/pkg/ring-controller/controller/type.go b/pkg/ring-controller/ranktable/v1/types.go similarity index 41% rename from pkg/ring-controller/controller/type.go rename to pkg/ring-controller/ranktable/v1/types.go index bc8f131e3553f7242036820b3332d49e02fa8c30..2acb7d2f3550c50966ed6178b3cef6854d81d8d5 100644 --- a/pkg/ring-controller/controller/type.go +++ b/pkg/ring-controller/ranktable/v1/types.go @@ -1,5 +1,5 @@ /* - * Copyright(C) 2020. Huawei Technologies Co.,Ltd. All rights reserved. + * Copyright(C) 2021. Huawei Technologies Co.,Ltd. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,48 +14,23 @@ * limitations under the License. 
diff --git a/pkg/ring-controller/controller/type.go b/pkg/ring-controller/ranktable/v1/types.go
similarity index 41%
rename from pkg/ring-controller/controller/type.go
rename to pkg/ring-controller/ranktable/v1/types.go
index bc8f131e3553f7242036820b3332d49e02fa8c30..2acb7d2f3550c50966ed6178b3cef6854d81d8d5 100644
--- a/pkg/ring-controller/controller/type.go
+++ b/pkg/ring-controller/ranktable/v1/types.go
@@ -1,5 +1,5 @@
 /*
- * Copyright(C) 2020. Huawei Technologies Co.,Ltd. All rights reserved.
+ * Copyright(C) 2021. Huawei Technologies Co.,Ltd. All rights reserved.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -14,48 +14,23 @@
  * limitations under the License.
  */
 
-// Package controller for controller
-package controller
-
-import (
-	"time"
-)
+package v1
 
 const (
-	// Key910 to get Configmap
-	Key910 = "ring-controller.atlas"
-	// Val910 to get Configmap
-	Val910 = "ascend-910" // Val910 to get Configmap
-	// ResourceName for 910
-	ResourceName = "huawei.com/Ascend910"
-	controllerName = "ring-controller"
-	// ConfigmapPrefix to get from configmap
-	ConfigmapPrefix = "rings-config"
+	// ResourceName NPU resource Name
+	ResourceName = "huawei.com/Ascend910"
 	// ConfigmapCompleted Staus
 	ConfigmapCompleted = "completed"
 	// ConfigmapInitializing status
 	ConfigmapInitializing = "initializing"
-	// ConfigmapKey configmap Data Name
-	ConfigmapKey = "hccl.json"
-	// VolcanoJobNameKey to get job name
-	VolcanoJobNameKey = "volcano.sh/job-name"
-	// PodJobVersion to get job version
-	PodJobVersion = "volcano.sh/job-version"
-	// PodDeviceKey Pod annoation Key
-	PodDeviceKey = "ascend.kubectl.kubernetes.io/ascend-910-configuration"
-	// PodGroupKey to get Group key
-	PodGroupKey = "volcano.sh/task-spec"
-	// JobRestartPhase restart flage
-	JobRestartPhase = "Restarting"
+
 	// EventAdd event add
 	EventAdd = "add"
 	// EventUpdate event to update
 	EventUpdate = "update"
 	// EventDelete event to delete
 	EventDelete = "delete"
-	// BuildStatInterval 1 * time.Minute
-	BuildStatInterval = 30 * time.Second
-	serverIP = "serverIp"
+
 	// L1 log level 1
 	L1 = 1
 	// L2 log level 2
@@ -63,21 +38,38 @@ const (
 	// L3 log level 3
 	L3 = 3
 	// L4 log level 4
-	L4 = 4
-	retryMilliSecond = 5
-	threeMinutes = 180
-	splitNum = 4
-	decimal = 10
-	two = 2
-	twosecond = 2 * time.Second
-	three = 3
-	four = 4
-	eight = 8
-	status = 200
-	oneMinitue = 60
+	L4 = 4
 )
 
-var (
-	// JSONVersion of hccl.json
-	JSONVersion = "v2"
-)
+// RankTableStatus to hccl
+type RankTableStatus struct {
+	Status string `json:"status"` // get hccl_json status
+}
+
+// RankTable to hccl
+type RankTable struct {
+	RankTableStatus
+	GroupList  []*Group `json:"group_list"`          // hccl group list
+	GroupCount string   `json:"group_count, string"` // hccl_json groupCount
+}
+
+// Group to hccl
+type Group struct {
+	InstanceList  []*Instance `json:"instance_list"`          // hccl InstanceList
+	GroupName     string      `json:"group_name"`             // hccl GroupName
+	DeviceCount   string      `json:"device_count, string"`   // hccl DeviceCount
+	InstanceCount string      `json:"instance_count, string"` // hccl InstanceCount
+}
+
+// Instance to hccl
+type Instance struct {
+	Devices  []Device `json:"devices"`   // hccl Device
+	PodName  string   `json:"pod_name"`  // hccl PodName
+	ServerID string   `json:"server_id"` // hccl serverId
+}
+
+// Device to hccl
+type Device struct {
+	DeviceID string `json:"device_id"` // hccl deviceId
+	DeviceIP string `json:"device_ip"` // hccl deviceIp
+}
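To make the struct tags concrete, here is a hypothetical hccl.json v1 payload in the shape these types describe, decoded with plain encoding/json; the addresses, ids and names are invented for illustration.

package main

import (
	"encoding/json"
	"fmt"

	ranktablev1 "hccl-controller/pkg/ring-controller/ranktable/v1"
)

func main() {
	// Invented example of a completed v1 rank table, matching the json tags above.
	const sample = `{
	  "status": "completed",
	  "group_count": "1",
	  "group_list": [{
	    "group_name": "job1",
	    "device_count": "2",
	    "instance_count": "1",
	    "instance_list": [{
	      "pod_name": "0",
	      "server_id": "10.0.0.1",
	      "devices": [
	        {"device_id": "0", "device_ip": "192.168.100.101"},
	        {"device_id": "1", "device_ip": "192.168.100.102"}
	      ]
	    }]
	  }]
	}`

	var table ranktablev1.RankTable
	if err := json.Unmarshal([]byte(sample), &table); err != nil {
		fmt.Println("unmarshal failed:", err)
		return
	}
	if len(table.GroupList) == 0 || len(table.GroupList[0].InstanceList) == 0 {
		fmt.Println("empty rank table")
		return
	}
	fmt.Println(table.Status, table.GroupList[0].InstanceList[0].ServerID) // completed 10.0.0.1
}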
diff --git a/pkg/ring-controller/ranktable/v2/ranktable.go b/pkg/ring-controller/ranktable/v2/ranktable.go
new file mode 100644
index 0000000000000000000000000000000000000000..389a93cc8362e19e0d51bd3299d794089a7d474a
--- /dev/null
+++ b/pkg/ring-controller/ranktable/v2/ranktable.go
@@ -0,0 +1,76 @@
+/*
+ * Copyright(C) 2021. Huawei Technologies Co.,Ltd. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package v2
+
+import (
+	"encoding/json"
+	"fmt"
+	v1 "hccl-controller/pkg/ring-controller/ranktable/v1"
+	apiCoreV1 "k8s.io/api/core/v1"
+	"strconv"
+)
+
+// CachePodInfo : Cache pod info to RankTableV2
+func (r *RankTable) CachePodInfo(pod *apiCoreV1.Pod, deviceInfo string, rankIndex *int) error {
+	var instance v1.Instance
+	var server Server
+
+	if err := json.Unmarshal([]byte(deviceInfo), &instance); err != nil {
+		return fmt.Errorf("parse annotation of pod %s/%s error: %v", pod.Namespace, pod.Name, err)
+	}
+	rankFactor := len(instance.Devices)
+
+	// Build new server-level struct from device info
+	server.ServerID = instance.ServerID
+	server.PodID = instance.PodName
+
+	for _, device := range instance.Devices {
+		var serverDevice Device
+		serverDevice.DeviceID = device.DeviceID
+		serverDevice.DeviceIP = device.DeviceIP
+		serverDevice.RankID = strconv.Itoa(*rankIndex*rankFactor + len(server.DeviceList))
+
+		server.DeviceList = append(server.DeviceList, &serverDevice)
+	}
+
+	r.ServerList = append(r.ServerList, &server)
+	r.ServerCount = strconv.Itoa(len(r.ServerList))
+	*rankIndex++
+	return nil
+}
+
+// RemovePodInfo : Remove pod info from RankTableV2
+func (r *RankTable) RemovePodInfo(namespace string, podID string) error {
+	hasInfoToRemove := false
+	serverList := r.ServerList
+	for idx, server := range serverList {
+		if server.PodID == podID {
+			length := len(serverList)
+			serverList[idx] = serverList[length-1]
+			serverList = serverList[:length-1]
+			hasInfoToRemove = true
+			break
+		}
+	}
+
+	if !hasInfoToRemove {
+		return fmt.Errorf("no data of pod %s/%s can be removed", namespace, podID)
+	}
+	r.ServerCount = strconv.Itoa(len(r.ServerList))
+
+	return nil
+}
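The rank numbering in CachePodInfo above is the core of the v2 change: rank_id = rankIndex*rankFactor + (position in the server's device list), where rankIndex counts pods cached so far and rankFactor is the number of devices reported by the pod. A small worked sketch, with invented pod names and addresses:

package main

import (
	"fmt"

	ranktablev2 "hccl-controller/pkg/ring-controller/ranktable/v2"
	apiCoreV1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	table := &ranktablev2.RankTable{Version: "1.0"} // version value is illustrative
	rankIndex := 0

	// Two hypothetical pods with two NPUs each, so rankFactor is 2 for both.
	annotations := []string{
		`{"pod_name":"0","server_id":"10.0.0.1","devices":[{"device_id":"0","device_ip":"192.168.100.101"},{"device_id":"1","device_ip":"192.168.100.102"}]}`,
		`{"pod_name":"1","server_id":"10.0.0.2","devices":[{"device_id":"0","device_ip":"192.168.100.103"},{"device_id":"1","device_ip":"192.168.100.104"}]}`,
	}
	for i, deviceInfo := range annotations {
		pod := &apiCoreV1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: fmt.Sprintf("job1-%d", i)}}
		if err := table.CachePodInfo(pod, deviceInfo, &rankIndex); err != nil {
			fmt.Println("cache pod info failed:", err)
			return
		}
	}

	// First server gets rank_id 0 and 1 (0*2+0, 0*2+1); second server gets 2 and 3 (1*2+0, 1*2+1).
	for _, server := range table.ServerList {
		for _, device := range server.DeviceList {
			fmt.Printf("server %s device %s rank %s\n", server.ServerID, device.DeviceID, device.RankID)
		}
	}
}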
diff --git a/pkg/ring-controller/controller/agent_interface.go b/pkg/ring-controller/ranktable/v2/types.go
similarity index 41%
rename from pkg/ring-controller/controller/agent_interface.go
rename to pkg/ring-controller/ranktable/v2/types.go
index 62f0badb47072646b3550791748cc4df4737e272..8281ef89906a5e7fd8c6f7ec3adbaa6d657ee122 100644
--- a/pkg/ring-controller/controller/agent_interface.go
+++ b/pkg/ring-controller/ranktable/v2/types.go
@@ -1,5 +1,5 @@
 /*
- * Copyright(C) 2020. Huawei Technologies Co.,Ltd. All rights reserved.
+ * Copyright(C) 2021. Huawei Technologies Co.,Ltd. All rights reserved.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -14,18 +14,27 @@
  * limitations under the License.
  */
 
-// Package controller for run the logic
-package controller
+package v2
 
-import (
-	v1 "k8s.io/api/core/v1"
-	"volcano.sh/volcano/pkg/apis/batch/v1alpha1"
-)
+import v1 "hccl-controller/pkg/ring-controller/ranktable/v1"
 
-// WorkAgentInterface businesswork interface
-type WorkAgentInterface interface {
-	CheckConfigmapCreation(job *v1alpha1.Job) (*v1.ConfigMap, error)
-	CreateBusinessWorker(job *v1alpha1.Job, ranktable RankTable, replicasTotal int32) error
-	DeleteBusinessWorker(namespace string, name string) error
-	IsBusinessWorkerExist(namespace string, name string) bool
+// RankTable : ranktable of v2
+type RankTable struct {
+	v1.RankTableStatus
+	ServerList  []*Server `json:"server_list"`  // hccl_json server list
+	ServerCount string    `json:"server_count"` // hccl_json server count
+	Version     string    `json:"version"`      // hccl_json version
+}
+
+// Server to hccl
+type Server struct {
+	DeviceList []*Device `json:"device"`    // device list in each server
+	ServerID   string    `json:"server_id"` // server id, represented by ip address
+	PodID      string    `json:"-"`         // pod id, equal to the last integer of pod name
+}
+
+// Device to hccl
+type Device struct {
+	v1.Device
+	RankID string `json:"rank_id"` // rank id
 }
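For symmetry with the v1 example earlier, a sketch of what these v2 types marshal to; all values are invented, and note that PodID stays internal to the controller because of its json:"-" tag.

package main

import (
	"encoding/json"
	"fmt"

	ranktablev1 "hccl-controller/pkg/ring-controller/ranktable/v1"
	ranktablev2 "hccl-controller/pkg/ring-controller/ranktable/v2"
)

func main() {
	table := ranktablev2.RankTable{
		RankTableStatus: ranktablev1.RankTableStatus{Status: "completed"},
		ServerCount:     "1",
		Version:         "1.0",
		ServerList: []*ranktablev2.Server{{
			ServerID: "10.0.0.1",
			PodID:    "0", // dropped from the JSON by the json:"-" tag
			DeviceList: []*ranktablev2.Device{
				{Device: ranktablev1.Device{DeviceID: "0", DeviceIP: "192.168.100.101"}, RankID: "0"},
				{Device: ranktablev1.Device{DeviceID: "1", DeviceIP: "192.168.100.102"}, RankID: "1"},
			},
		}},
	}

	out, err := json.MarshalIndent(table, "", "  ")
	if err != nil {
		fmt.Println("marshal failed:", err)
		return
	}
	// Prints the v2 hccl.json layout: status, server_list (device/rank_id per NPU),
	// server_count and version.
	fmt.Println(string(out))
}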