diff --git a/pkg/ring-controller/agent/businessagent.go b/pkg/ring-controller/agent/businessagent.go index 61b33dcebf32a8c8e3af50f4a1c2ca72bdb57f43..ec9756bd99385916c7b8b0d3fc083535780fadd3 100644 --- a/pkg/ring-controller/agent/businessagent.go +++ b/pkg/ring-controller/agent/businessagent.go @@ -42,7 +42,8 @@ import ( // String to return podIdentifier string style : // namespace:%s,name:%s,jobName:%s,eventType:%s func (p *podIdentifier) String() string { - return fmt.Sprintf("namespace:%s,name:%s,jobName:%s,eventType:%s", p.namespace, p.name, p.jobName, p.eventType) + return fmt.Sprintf("namespace:%s,name:%s,jobName:%s,jobUid: %s, eventType:%s", p.namespace, + p.name, p.jobName, p.jobUid, p.eventType) } // NewBusinessAgent to create a agent. Agent is a framework, all types of workers can be diff --git a/pkg/ring-controller/agent/types.go b/pkg/ring-controller/agent/types.go index e48bfbefb9c4fdf87294e5a8e54da153656e2f12..c754bda09a7a1e15fcf24e22f9adb8cfcafb5b00 100644 --- a/pkg/ring-controller/agent/types.go +++ b/pkg/ring-controller/agent/types.go @@ -45,6 +45,8 @@ const ( ConfigmapInitializing = "initializing" // ConfigmapKey configmap Data Name ConfigmapKey = "hccl.json" + // ConfigmapVersion hccl.json version Name + ConfigmapVersion = "version" // VolcanoJobNameKey to get job name VolcanoJobNameKey = "volcano.sh/job-name" // VolcanoJobNamespaceKey to get job namespace @@ -198,6 +200,7 @@ type WorkerInfo struct { cachedPods *sync.Map cachedPodNum int32 taskReplicasTotal int32 + resourceVersion int } // DeployInfo : deployment Worker info diff --git a/pkg/ring-controller/agent/vcjobworker.go b/pkg/ring-controller/agent/vcjobworker.go index 11e6ead89b140fff064266ec3ed424e1b43a49fd..c9c820665f48ab200c50667fc3b7b76ed66f18ec 100644 --- a/pkg/ring-controller/agent/vcjobworker.go +++ b/pkg/ring-controller/agent/vcjobworker.go @@ -47,7 +47,7 @@ type Worker interface { // NewVCJobWorker : Generates a Worker that handles the VCJob type func NewVCJobWorker(agent *BusinessAgent, job JobInfo, ranktable ranktablev1.RankTabler, - replicasTotal int32) *VCJobWorker { + replicasTotal int32, version int) *VCJobWorker { jobWorker := &VCJobWorker{ WorkerInfo: WorkerInfo{ kubeclientset: agent.KubeClientSet, @@ -63,6 +63,7 @@ func NewVCJobWorker(agent *BusinessAgent, job JobInfo, ranktable ranktablev1.Ran taskReplicasTotal: replicasTotal, cachedPods: &sync.Map{}, cachedIndex: newCachedIndex(int(replicasTotal)), + resourceVersion: version, }, JobInfo: job, } @@ -434,7 +435,7 @@ func (b *WorkerInfo) handleDeleteEvent(podInfo *podIdentifier) error { err := b.configmapData.RemovePodInfo(podInfo.namespace, podInfo.uid) if err != nil { - return err + hwlog.RunLog.Warn(err) } hwlog.RunLog.Infof("start to remove data of pod %s/%s", podInfo.namespace, podInfo.name) @@ -468,9 +469,11 @@ func (b *WorkerInfo) handleDeleteEvent(podInfo *podIdentifier) error { func (b *WorkerInfo) endRankTableConstruction(namespace string) error { b.configmapData.SetStatus(ConfigmapCompleted) b.configmapData.BeforeUpdate() + b.resourceVersion++ hwlog.RunLog.Infof("job is ready, start to update configmap(%s/%s) to completed", namespace, b.configmapName) if err := updateConfigMap(b, namespace); err != nil { hwlog.RunLog.Error("update configmap failed") + b.resourceVersion-- return err } @@ -531,11 +534,12 @@ func updateConfigMap(w *WorkerInfo, namespace string) error { return fmt.Errorf("marshal configmap data error: %v", err) } cm.Data[ConfigmapKey] = string(dataByteArray[:]) + cm.Data[ConfigmapVersion] = strconv.Itoa(w.resourceVersion) if _, err = w.kubeclientset.CoreV1().ConfigMaps(namespace).Update(context.TODO(), cm, metav1.UpdateOptions{}); err != nil { return fmt.Errorf("failed to update ConfigMap for Job %v", err) } - hwlog.RunLog.Debugf("new cm ranktable %s", cm.Data[ConfigmapKey]) + hwlog.RunLog.Debugf("new cm ranktable %s, version: %d", cm.Data[ConfigmapKey], w.resourceVersion) return nil } diff --git a/pkg/ring-controller/model/vcjob.go b/pkg/ring-controller/model/vcjob.go index d916875cece8422011ad76c44d57a9c97010c32d..da0f56cfd85bf42313e4fd068840d79ce12fc5f0 100644 --- a/pkg/ring-controller/model/vcjob.go +++ b/pkg/ring-controller/model/vcjob.go @@ -73,7 +73,6 @@ func (model *modelCommon) GetUID() types.UID { // EventAdd to handle vcjob add event func (job *VCJobModel) EventAdd(businessAgent *agent.BusinessAgent) error { - businessAgent.RwMutex.RLock() hwlog.RunLog.Infof("create business worker for %s/%s", job.JobNamespace, job.JobName) _, exist := businessAgent.BusinessWorker[job.uid] @@ -82,44 +81,56 @@ func (job *VCJobModel) EventAdd(businessAgent *agent.BusinessAgent) error { hwlog.RunLog.Infof("business worker for %s/%s is already existed", job.JobNamespace, job.JobName) return nil } - - // check if job's corresponding configmap is created successfully via volcano controller - cm, err := checkCMCreation(job.JobNamespace, job.JobName, businessAgent.KubeClientSet, businessAgent.Config) + rst, version, err := job.getRanktableInfo(businessAgent) if err != nil { return err } - - // retrieve configmap data - jobStartString, ok := cm.Data[agent.ConfigmapKey] - if !ok { - return errors.New("the key of " + agent.ConfigmapKey + " does not exist") - } - var rst ranktablev1.RankTableStatus - if err = rst.UnmarshalToRankTable(jobStartString); err != nil { - return err - } - hwlog.RunLog.Debugf("jobStarting: %#v", jobStartString) - ranktable, replicasTotal, err := RanktableFactory(job, rst, agent.GetJSONVersion()) if err != nil { return err } - jobWorker := agent.NewVCJobWorker(businessAgent, job.JobInfo, ranktable, replicasTotal) - + jobWorker := agent.NewVCJobWorker(businessAgent, job.JobInfo, ranktable, replicasTotal, version) // create a business worker for current job businessAgent.RwMutex.Lock() defer businessAgent.RwMutex.Unlock() - // start to report rank table build statistic for current job if businessAgent.Config.DisplayStatistic { go jobWorker.Statistic(BuildStatInterval) } - // save current business worker businessAgent.BusinessWorker[job.uid] = jobWorker return nil } +func (job *VCJobModel) getRanktableInfo(businessAgent *agent.BusinessAgent) (ranktablev1.RankTableStatus, int, + error) { + cm, err := checkCMCreation(job.JobNamespace, job.JobName, businessAgent.KubeClientSet, businessAgent.Config) + if err != nil { + return ranktablev1.RankTableStatus{}, 0, err + } + // retrieve configmap data + jobStartString, ok := cm.Data[agent.ConfigmapKey] + if !ok { + return ranktablev1.RankTableStatus{}, 0, errors.New("the key of " + agent.ConfigmapKey + " does not exist") + } + hwlog.RunLog.Debugf("jobStarting: %#v", jobStartString) + var rst ranktablev1.RankTableStatus + if err = rst.UnmarshalToRankTable(jobStartString); err != nil { + return ranktablev1.RankTableStatus{}, 0, err + } + hwlog.RunLog.Debugf("rankTable status: %#v", rst) + versionStr, ok := cm.Data[agent.ConfigmapVersion] + if !ok { + versionStr = "-1" + } + hwlog.RunLog.Debugf("ranktable version: %s", versionStr) + version, err := strconv.Atoi(versionStr) + if err != nil { + return ranktablev1.RankTableStatus{}, 0, err + } + return rst, version, err +} + // EventUpdate : to handle vcjob update event func (job *VCJobModel) EventUpdate(businessAgent *agent.BusinessAgent) error { businessAgent.RwMutex.RLock() diff --git a/pkg/ring-controller/ranktable/v2/types.go b/pkg/ring-controller/ranktable/v2/types.go index d17b5c6401727f4e7d8b3981131436a07c292b29..73d21a391cc6de822ef18994d874757c31826bcf 100644 --- a/pkg/ring-controller/ranktable/v2/types.go +++ b/pkg/ring-controller/ranktable/v2/types.go @@ -25,7 +25,7 @@ import ( // RankTable : ranktable of v2 type RankTable struct { v1.RankTableStatus - Servers *sync.Map + Servers *sync.Map `json:"-"` ServerList []*Server `json:"server_list"` // hccl_json server list ServerCount string `json:"server_count"` // hccl_json server count Version string `json:"version"` // hccl_json version