Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add rejected status when receiving task to check for insufficient resources #204

Open
wants to merge 14 commits into
base: releases
Choose a base branch
from
2 changes: 1 addition & 1 deletion build/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ const UBITaskImageIntelCpu = "filswan/ubi-worker-cpu-intel:latest"
const UBITaskImageIntelGpu = "filswan/ubi-worker-gpu-intel:latest"
const UBITaskImageAmdCpu = "filswan/ubi-worker-cpu-amd:latest"
const UBITaskImageAmdGpu = "filswan/ubi-worker-gpu-amd:latest"
const UBIResourceExporterDockerImage = "filswan/resource-exporter:v11.3.2"
const UBIResourceExporterDockerImage = "filswan/resource-exporter:v12.0.0-beta"
const TraefikServerDockerImage = "traefik:v2.10"

func UserVersion() string {
Expand Down
2 changes: 2 additions & 0 deletions cmd/computing-provider/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,8 @@ func getColor(status int) []tablewriter.Colors {
rowColor = []tablewriter.Colors{{tablewriter.Bold, tablewriter.FgHiMagentaColor}}
case models.JOB_RECEIVED_STATUS:
rowColor = []tablewriter.Colors{{tablewriter.Bold, tablewriter.FgHiBlueColor}}
case models.JOB_REJECTED_STATUS:
rowColor = []tablewriter.Colors{{tablewriter.Bold, tablewriter.FgRedColor}}
default:
rowColor = []tablewriter.Colors{{tablewriter.Bold, tablewriter.FgHiCyanColor}}
}
Expand Down
35 changes: 0 additions & 35 deletions internal/computing/cron_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,37 +279,6 @@ func (task *CronTask) watchExpiredTask() {
}
continue
}
} else {
// compatible with space_uuid
spaceUuidDeployName := constants.K8S_DEPLOY_NAME_PREFIX + strings.ToLower(job.SpaceUuid)
if _, ok = deployOnK8s[spaceUuidDeployName]; ok {
if NewJobService().GetJobEntityBySpaceUuid(job.SpaceUuid) > 0 && time.Now().Unix() < job.ExpireTime {
if job.Status != models.JOB_RUNNING_STATUS {
foundDeployment, err := k8sService.k8sClient.AppsV1().Deployments(job.NameSpace).Get(context.TODO(), job.K8sDeployName, metav1.GetOptions{})
if err != nil {
continue
}
if foundDeployment.Status.AvailableReplicas > 0 {
job.PodStatus = models.POD_RUNNING_STATUS
job.Status = models.JOB_RUNNING_STATUS
NewJobService().UpdateJobEntityByJobUuid(job)
}
}
continue
}

var nameSpace = job.NameSpace
if job.Status == models.JOB_TERMINATED_STATUS || job.Status == models.JOB_COMPLETED_STATUS {
if strings.TrimSpace(nameSpace) == "" {
nameSpace = constants.K8S_NAMESPACE_NAME_PREFIX + strings.ToLower(job.WalletAddress)
}

if err = DeleteJob(nameSpace, job.SpaceUuid, "cron-task abnormal state, compatible with space_uuid"); err != nil {
logs.GetLogger().Errorf("failed to use spaceUuid: %s delete job, error: %v", job.SpaceUuid, err)
}
continue
}
}
}

if job.DeleteAt == models.DELETED_FLAG {
Expand Down Expand Up @@ -359,10 +328,6 @@ func (task *CronTask) watchExpiredTask() {
continue
}

if err = DeleteJob(job.NameSpace, job.SpaceUuid, "compatible with old versions, cron-task abnormal state"); err != nil {
logs.GetLogger().Errorf("failed to use spaceUuid: %s delete job, error: %v", job.SpaceUuid, err)
continue
}
deleteSpaceIdAndJobUuid[job.JobUuid] = job.SpaceUuid + "_" + job.JobUuid
}
}
Expand Down
13 changes: 6 additions & 7 deletions internal/computing/docker_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/swanchain/go-computing-provider/constants"
"io"
"os"
"os/exec"
"path/filepath"
"regexp"
"strings"
Expand Down Expand Up @@ -302,11 +301,11 @@ func (ds *DockerService) CleanResourceForDocker() {
}
}

cmd := exec.Command("docker", "system", "prune", "-f")
if err = cmd.Run(); err != nil {
logs.GetLogger().Errorf("failed to clean resource, error: %+v", err)
return
}
ctx := context.Background()
danglingFilters := filters.NewArgs()
danglingFilters.Add("dangling", "true")
ds.c.ImagesPrune(ctx, danglingFilters)
ds.c.ContainersPrune(ctx, filters.NewArgs())
}

func (ds *DockerService) PullImage(imageName string) error {
Expand Down Expand Up @@ -334,7 +333,7 @@ func (ds *DockerService) PullImage(imageName string) error {
}

func (ds *DockerService) CheckRunningContainer(containerName string) (bool, error) {
containers, err := ds.c.ContainerList(context.Background(), container.ListOptions{})
containers, err := ds.c.ContainerList(context.Background(), container.ListOptions{All: true})
if err != nil {
logs.GetLogger().Errorf("listing containers failed, error: %v", err)
return false, err
Expand Down
71 changes: 48 additions & 23 deletions internal/computing/ecp_image_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (*ImageJobService) CheckJobCondition(c *gin.Context) {
}
}

receive, _, _, _, _, err := checkResourceForImage(job.Resource)
receive, _, _, _, _, _, err := checkResourceForImage(job.Uuid, job.Resource)
if receive {
c.JSON(http.StatusOK, util.CreateSuccessResponse(map[string]interface{}{
"price": totalCost,
Expand Down Expand Up @@ -192,13 +192,17 @@ func (imageJob *ImageJobService) DeployJob(c *gin.Context) {
}

if !checkPriceFlag {
if job.Price == "-1" && job.JobType == models.MiningJobType {
taskEntity.Status = models.TASK_REJECTED_STATUS
NewTaskService().SaveTaskEntity(taskEntity)
}
logs.GetLogger().Errorf("bid below the set price, job_uuid: %s, pid: %s, need: %0.4f", job.Uuid, job.Price, totalCost)
c.JSON(http.StatusBadRequest, util.CreateErrorResponse(util.BelowPriceError))
return
}
}

isReceive, _, needCpu, _, indexs, err := checkResourceForImage(job.Resource)
isReceive, _, needCpu, _, indexs, noAvailableMsgs, err := checkResourceForImage(job.Uuid, job.Resource)
if err != nil {
if job.Price == "-1" && job.JobType == models.MiningJobType {
taskEntity.Status = models.TASK_FAILED_STATUS
Expand All @@ -212,7 +216,8 @@ func (imageJob *ImageJobService) DeployJob(c *gin.Context) {
taskEntity.Status = models.TASK_REJECTED_STATUS
NewTaskService().SaveTaskEntity(taskEntity)
}
c.JSON(http.StatusInternalServerError, util.CreateErrorResponse(util.NoAvailableResourcesError))
logs.GetLogger().Warnf("job_uuid: %s, name: %s, msg: %s", job.Uuid, job.Name, strings.Join(noAvailableMsgs, ";"))
c.JSON(http.StatusInternalServerError, util.CreateErrorResponse(util.NoAvailableResourcesError, strings.Join(noAvailableMsgs, ";")))
return
}

Expand Down Expand Up @@ -768,10 +773,10 @@ func checkPriceForDocker(userPrice string, duration int, resource *models.Hardwa
return userPayPrice >= totalCost, totalCost, nil
}

func checkResourceForImage(resource *models.HardwareResource) (bool, string, int64, int64, []string, error) {
func checkResourceForImage(jobUud string, resource *models.HardwareResource) (bool, string, int64, int64, []string, []string, error) {
list, err := NewEcpJobService().GetEcpJobList([]string{models.CreatedStatus, models.RunningStatus})
if err != nil {
return false, "", 0, 0, nil, err
return false, "", 0, 0, nil, nil, err
}

var taskGpuMap = make(map[string][]string)
Expand All @@ -784,12 +789,12 @@ func checkResourceForImage(resource *models.HardwareResource) (bool, string, int
dockerService := NewDockerService()
containerLogStr, err := dockerService.ContainerLogs("resource-exporter")
if err != nil {
return false, "", 0, 0, nil, err
return false, "", 0, 0, nil, nil, err
}

var nodeResource models.NodeResource
if err := json.Unmarshal([]byte(containerLogStr), &nodeResource); err != nil {
return false, "", 0, 0, nil, err
return false, "", 0, 0, nil, nil, err
}

needCpu := resource.CPU
Expand Down Expand Up @@ -846,25 +851,45 @@ func checkResourceForImage(resource *models.HardwareResource) (bool, string, int

logs.GetLogger().Infof("checkResourceForImage: needCpu: %d, needMemory: %.2f, needStorage: %.2f, needGpu: %d, gpuName: %s", needCpu, needMemory, needStorage, resource.GPU, resource.GPUModel)
logs.GetLogger().Infof("checkResourceForImage: remainingCpu: %d, remainingMemory: %.2f, remainingStorage: %.2f, remainingGpu: %+v", remainderCpu, remainderMemory, remainderStorage, gpuMap)
if needCpu <= remainderCpu && needMemory <= remainderMemory && needStorage <= remainderStorage {
if resource.GPUModel != "" {
var flag bool
for k, gd := range gpuMap {
if strings.ToUpper(k) == resource.GPUModel && gd.num > 0 {
indexs = gd.indexs
flag = true
break
}
}
if flag {
return true, nodeResource.CpuName, needCpu, int64(needMemory), indexs, nil
} else {
return false, nodeResource.CpuName, needCpu, int64(needMemory), indexs, nil

var noAvailableStr []string
if remainderCpu < needCpu {
noAvailableStr = append(noAvailableStr, fmt.Sprintf("cpu need: %d, remainder: %d", needCpu, remainderCpu))
}
if remainderMemory < needMemory {
noAvailableStr = append(noAvailableStr, fmt.Sprintf("memory need: %f, remainder: %f", needMemory, remainderMemory))
}
if remainderStorage < needStorage {
noAvailableStr = append(noAvailableStr, fmt.Sprintf("storage need: %f, remainder: %f", needStorage, remainderStorage))
}

if resource.GPUModel != "" {
var newGpuIndex []string
var flag bool
for k, gd := range gpuMap {
if strings.ToUpper(k) == resource.GPUModel && gd.num > 0 {
newGpuIndex = gd.indexs
flag = true
break
}
}
return true, nodeResource.CpuName, needCpu, int64(needMemory), indexs, nil
if flag {
return true, nodeResource.CpuName, needCpu, int64(needMemory), newGpuIndex, nil, nil
} else {
noAvailableStr = append(noAvailableStr, fmt.Sprintf("gpu need name:%s, num:%d, remainder:%d", resource.GPUModel, resource.GPU, len(newGpuIndex)))
logs.GetLogger().Warnf("the task_uuid: %s resource is not available. Reason: %s",
jobUud, strings.Join(noAvailableStr, ";"))
return false, nodeResource.CpuName, needCpu, int64(needMemory), newGpuIndex, noAvailableStr, nil
}
}

if len(noAvailableStr) == 0 {
return true, nodeResource.CpuName, needCpu, int64(needMemory), indexs, nil, nil
}
return false, nodeResource.CpuName, needCpu, int64(needMemory), indexs, nil

logs.GetLogger().Warnf("the task_uuid: %s resource is not available. Reason: %s",
jobUud, strings.Join(noAvailableStr, ";"))
return false, nodeResource.CpuName, needCpu, int64(needMemory), indexs, noAvailableStr, nil
}

func parsePrice(priceStr string) (float64, error) {
Expand Down
10 changes: 9 additions & 1 deletion internal/computing/entity_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ func (taskServ TaskService) UpdateTaskEntityByTaskId(task *models.TaskEntity) (e
}

// UpdateTaskEntityByTaskUuId updates the task row whose uuid column matches task.Uuid.
// NOTE(review): this span is a rendered diff, not final source — the first return
// (struct-based Updates of the whole entity) is the removed line and the map-based
// return (writes only the status column) is its replacement; exactly one of the two
// exists in the real file. The map form forces GORM to persist status even when it
// is the zero value, which a struct-based Updates would silently skip.
func (taskServ TaskService) UpdateTaskEntityByTaskUuId(task *models.TaskEntity) (err error) {
return taskServ.Model(&models.TaskEntity{}).Where("uuid=?", task.Uuid).Updates(task).Error
return taskServ.Model(&models.TaskEntity{}).Where("uuid=?", task.Uuid).Updates(map[string]interface{}{
"status": task.Status,
}).Error
}

func (taskServ TaskService) GetTaskEntity(taskId int64) (*models.TaskEntity, error) {
Expand Down Expand Up @@ -102,6 +104,12 @@ func (jobServ JobService) UpdateJobEntityByJobUuid(job *models.JobEntity) (err e
return jobServ.Where("job_uuid=? and delete_at=?", job.JobUuid, models.UN_DELETEED_FLAG).Updates(job).Error
}

// UpdateJobEntityStatusByJobUuid sets only the status column of the job row
// matching jobUuid. The map form makes GORM write the column even when status
// is the zero value (a struct-based Updates would skip zero-value fields).
// NOTE(review): unlike UpdateJobEntityByJobUuid this query does not filter on
// delete_at — confirm whether soft-deleted rows should also be excluded here.
func (jobServ JobService) UpdateJobEntityStatusByJobUuid(jobUuid string, status int) (err error) {
return jobServ.Where("job_uuid=?", jobUuid).Updates(map[string]interface{}{
"status": status,
}).Error
}

// UpdateJobResultUrlByJobUuid stores resultUrl in the result_url column of the
// non-deleted (delete_at == UN_DELETEED_FLAG) job row identified by jobUuid.
func (jobServ JobService) UpdateJobResultUrlByJobUuid(jobUuid string, resultUrl string) (err error) {
return jobServ.Model(&models.JobEntity{}).Where("job_uuid=? and delete_at=?", jobUuid, models.UN_DELETEED_FLAG).Update("result_url", resultUrl).Error
}
Expand Down
2 changes: 1 addition & 1 deletion internal/computing/k8s_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -953,7 +953,7 @@ func (s *K8sService) UpdateContainerLogToFile(jobUuid string) {
return
}

err = wait.PollUntilContextTimeout(context.Background(), 2*time.Second, 3*time.Minute, true, func(ctx context.Context) (done bool, err error) {
err = wait.PollUntilContextTimeout(context.Background(), 3*time.Second, 10*time.Minute, true, func(ctx context.Context) (done bool, err error) {
k8sService := NewK8sService()
pods, err := k8sService.k8sClient.CoreV1().Pods(jobEntity.NameSpace).List(context.TODO(), metaV1.ListOptions{
LabelSelector: fmt.Sprintf("lad_app=%s", jobUuid),
Expand Down
Loading