// Mirror of https://github.com/nclabteam/THPA.git (synced 2025-02-12 10:31:54 +00:00)
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"math"
	"net/http"
	"os"
	"path/filepath"
	"runtime"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/comail/colog"
	"github.com/julienschmidt/httprouter"
	"gopkg.in/yaml.v2"
	v1autoscaling "k8s.io/api/autoscaling/v1"
	"k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/klog"
	"k8s.io/kubernetes/pkg/apis/autoscaling"
	schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/extender/v1"
)

const (
	versionPath      = "/version"
	apiPrefix        = "/scheduler"
	bindPath         = apiPrefix + "/bind"
	preemptionPath   = apiPrefix + "/preemption"
	predicatesPrefix = apiPrefix + "/predicates"
	prioritiesPrefix = apiPrefix + "/priorities"
	EPS              = 0.1
)

// safeIntValueMap wraps a string->int map together with a mutex. The accessor
// methods below do not lock; callers hold mu themselves. Note that copying the
// struct copies the mutex but shares the underlying map.
type safeIntValueMap struct {
	mu          sync.RWMutex
	intValueMap map[string]int
}

// safeFloatValueMap is the float64 counterpart of safeIntValueMap.
type safeFloatValueMap struct {
	mu               sync.RWMutex
	stringFloat64Map map[string]float64
}

// safeAppTrafficInfo maps an application name to its per-node traffic map.
type safeAppTrafficInfo struct {
	mu             sync.RWMutex
	appTrafficInfo map[string]safeFloatValueMap
}

// safePodsDistributionMap maps an application name to the number of pods
// still to be scheduled on each node.
type safePodsDistributionMap struct {
	mu                       sync.RWMutex
	podsDistributionRatioMap map[string]safeIntValueMap
}

// safeUpdatedCheckMap records, per application, whether the distribution maps
// have been refreshed since the last HPA upscale event.
type safeUpdatedCheckMap struct {
	mu           sync.RWMutex
	isMapUpdated map[string]bool
}

// Config mirrors the config.yaml file read in initConfigs.
type Config struct {
	Application struct {
		Name        string `yaml:"name"`
		NameSpace   string `yaml:"namespace"`
		DefaultPods string `yaml:"defaultPods"`
	}
	K8sConfigPath struct {
		Path string `yaml:"path"`
	}
}

var (
	version string // injected via ldflags at build time

	// Phuc
	sHpaObject = "HorizontalPodAutoscaler"
	// example message: HorizontalPodAutoscaler--app-example1--New size: 7; reason: cpu resource utilization (percentage of request) above target
	bInit        = false
	bCalculating = false

	neighborsInfo  map[string]map[string]float64
	node1Neighbors map[string]float64
	node2Neighbors map[string]float64
	node3Neighbors map[string]float64

	//podsDistributionRatioMap safeIntValueMap
	//nodesTrafficMap safeFloatValueMap
	podsDistributionInfo safePodsDistributionMap
	appTrafficInfo       safeAppTrafficInfo
	appUpdateInfoMap     safeUpdatedCheckMap

	// global variables
	customConfig           Config
	config                 *rest.Config
	clientset              *kubernetes.Clientset
	appPodOptions          metav1.ListOptions
	defaultApplicationPods int
	start                  time.Time
	// end Phuc

	// Filter, Priority, Bind, and Preemption handlers for scheduling
	TruePredicate Predicate

	ZeroPriority Prioritize

	NoBind Bind

	EchoPreemption Preemption

	podsDis map[string]int
)

// StringToLevel maps a LOG_LEVEL environment value to a colog level.
func StringToLevel(levelStr string) colog.Level {
	switch level := strings.ToUpper(levelStr); level {
	case "TRACE":
		return colog.LTrace
	case "DEBUG":
		return colog.LDebug
	case "INFO":
		return colog.LInfo
	case "WARNING":
		return colog.LWarning
	case "ERROR":
		return colog.LError
	case "ALERT":
		return colog.LAlert
	default:
		log.Printf("warning: LOG_LEVEL=%q is empty or invalid, falling back to \"INFO\".\n", level)
		return colog.LInfo
	}
}

func main() {

	// Init configs
	//podsDis = make(map[string]int) // Un-comment to hard-code the pods distribution
	//podsDis["1node1"] = 7
	//podsDis["2node2"] = 1
	//podsDis["3node3"] = 1
	initConfigs()
	start = time.Now()
	var errAtoi error
	defaultApplicationPods, errAtoi = strconv.Atoi(customConfig.Application.DefaultPods)
	if errAtoi != nil {
		log.Printf("warning: defaultPods %q is not a number, falling back to 0", customConfig.Application.DefaultPods)
	}

	// Init test set
	bInit = true
	//TODO we must update neighbors for each node in another func (a sketch of such a function follows main below)
	// 1node1 neighbors (values are delays to each neighbor)
	node1Neighbors = make(map[string]float64)
	node1Neighbors["2node2"] = 5
	node1Neighbors["3node3"] = 20
	log.Printf("node1Neighbors len = %d", len(node1Neighbors))
	// 2node2 neighbors
	node2Neighbors = make(map[string]float64)
	node2Neighbors["1node1"] = 5
	node2Neighbors["3node3"] = 10
	// 3node3 neighbors
	node3Neighbors = make(map[string]float64)
	node3Neighbors["2node2"] = 10
	node3Neighbors["1node1"] = 20

	neighborsInfo = make(map[string]map[string]float64)
	neighborsInfo["1node1"] = node1Neighbors
	neighborsInfo["2node2"] = node2Neighbors
	neighborsInfo["3node3"] = node3Neighbors

	appTrafficInfo = safeAppTrafficInfo{
		mu:             sync.RWMutex{},
		appTrafficInfo: make(map[string]safeFloatValueMap),
	}
	podsDistributionInfo = safePodsDistributionMap{
		mu:                       sync.RWMutex{},
		podsDistributionRatioMap: make(map[string]safeIntValueMap),
	}
	appUpdateInfoMap = safeUpdatedCheckMap{
		mu:           sync.RWMutex{},
		isMapUpdated: make(map[string]bool),
	}

	colog.SetDefaultLevel(colog.LInfo)
	colog.SetMinLevel(colog.LInfo)
	colog.SetFormatter(&colog.StdFormatter{
		Colors: true,
		Flag:   log.Ldate | log.Ltime | log.Lshortfile,
	})
	colog.Register()
	level := StringToLevel(os.Getenv("LOG_LEVEL"))
	log.Print("Log level was set to ", strings.ToUpper(level.String()))
	colog.SetMinLevel(level)

	router := httprouter.New()
	AddVersion(router)

	predicates := []Predicate{TruePredicate}
	for _, p := range predicates {
		AddPredicate(router, p)
	}
	priorities := []Prioritize{ZeroPriority}
	for _, p := range priorities {
		AddPrioritize(router, p)
	}
	AddBind(router, NoBind)

	// watch is used to receive all events in the application namespace
	watch, err := clientset.CoreV1().Events(customConfig.Application.NameSpace).Watch(metav1.ListOptions{})
	if err != nil {
		log.Fatal(err.Error())
	}
	ch := watch.ResultChan()
	// This goroutine parses the events generated by the HPA object.
	go func() {
		for event := range ch {
			e, ok := event.Object.(*v1.Event)
			if !ok {
				continue
			}
			// Ignore events that are older than the extender itself
			if (start.Sub(e.FirstTimestamp.Time)).Minutes() > 1 {
				continue
			}
			if strings.Contains(e.Message, "old size") && strings.Contains(e.Message, "new size") {
				message := e.Message
				oldSizeMessagePart := message[strings.Index(message, "old size: "):strings.Index(message, ", new size")]
				oldSize, _ := strconv.Atoi(strings.ReplaceAll(oldSizeMessagePart, "old size: ", ""))
				newSizeMessagePart := message[strings.Index(message, "new size: "):strings.Index(message, ", reason")]
				newSize, _ := strconv.Atoi(strings.ReplaceAll(newSizeMessagePart, "new size: ", ""))
				if newSize < oldSize {
					// Only upscale events matter for pod placement
					continue
				}
				appName := e.InvolvedObject.Name
				appUpdateInfoMap.mu.Lock()
				appUpdateInfoMap.isMapUpdated[appName] = false
				appUpdateInfoMap.mu.Unlock()
				appTrafficInfo.mu.RLock()
				_, exist := appTrafficInfo.appTrafficInfo[appName]
				appTrafficInfo.mu.RUnlock()
				if !exist {
					appTrafficInfo.mu.Lock()
					podsDistributionInfo.mu.Lock()
					appTrafficInfo.appTrafficInfo[appName] = nodesTrafficMapInit()
					podsDistributionInfo.podsDistributionRatioMap[appName] = nodesPodsDistributionMapInit()
					appTrafficInfo.mu.Unlock()
					podsDistributionInfo.mu.Unlock()
				}
				log.Printf("goroutine id %d", goid())
				log.Print("============================")
				log.Print("====HPA upscale happens=====")
				log.Printf("=======New pods = %d========", newSize-oldSize)
				log.Print("============================")
				updateNodesTraffic(appTrafficInfo.appTrafficInfo[appName], appName)
				podsDistributionInfo.mu.Lock()
				calculatePodsDistribution(newSize-oldSize, appTrafficInfo.appTrafficInfo[appName], podsDistributionInfo.podsDistributionRatioMap[appName])
				podsDistributionInfo.mu.Unlock()
				appUpdateInfoMap.mu.Lock()
				appUpdateInfoMap.isMapUpdated[appName] = true
				appUpdateInfoMap.mu.Unlock()
			}
		}
	}()

	time.Sleep(2 * time.Second)

	log.Print("info: server starting on localhost port :8888")
	bInit = false // Prevent old events from affecting scheduling
	if err := http.ListenAndServe("localhost:8888", router); err != nil {
		log.Fatal(err)
	}
}
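
// updateNeighborsInfo is a minimal sketch of the neighbor-refresh function the
// TODO in main refers to, replacing the hard-coded test set. It assumes
// (hypothetically; not part of the original code) that each worker node is
// annotated with keys of the form "delay-to-<nodeName>" whose values are
// delays as floats. Adjust the annotation scheme to your environment.
func updateNeighborsInfo() {
	workerNodes, err := clientset.CoreV1().Nodes().List(metav1.ListOptions{
		LabelSelector: "node-role.kubernetes.io/worker=true",
	})
	if err != nil {
		log.Printf("warning: cannot list worker nodes: %v", err)
		return
	}
	fresh := make(map[string]map[string]float64)
	for _, node := range workerNodes.Items {
		neighbors := make(map[string]float64)
		for key, value := range node.ObjectMeta.Annotations {
			if !strings.HasPrefix(key, "delay-to-") {
				continue
			}
			delay, parseErr := strconv.ParseFloat(value, 64)
			if parseErr != nil {
				continue // skip malformed annotation values
			}
			neighbors[strings.TrimPrefix(key, "delay-to-")] = delay
		}
		fresh[node.Name] = neighbors
	}
	// Swap in the freshly built map; callers that read neighborsInfo
	// concurrently would additionally need their own synchronization.
	neighborsInfo = fresh
}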

func calculatePodsDistribution(iNumberOfNewPodScheduledByHPA int, nodesTrafficMap safeFloatValueMap, podsDistributionRatioMap safeIntValueMap) {
	log.Print("(Calculation Begin) Calculating pods distribution")
	// The latest traffic info for all nodes must be updated before this calculation
	sumTrafficFromMap := nodesTrafficMap.getValuesSum()
	allNodesHasSameTraffic := false
	if sumTrafficFromMap-EPS < 0 {
		// No measurable traffic anywhere: treat every node as equal
		log.Printf("Sum < EPS")
		nodesTrafficMap.equalizeValuesTo1()
		sumTrafficFromMap = nodesTrafficMap.getValuesSum()
		allNodesHasSameTraffic = true
	}
	sortedNodesNameBasedOnTraffic := getSortedMapKeysByValueDesc(nodesTrafficMap.getFloat64ValueMap())
	podsDistributionRatioMap.mu.Lock()
	iNumberOfNewPodScheduledByHPAClone := iNumberOfNewPodScheduledByHPA
	for iNumberOfNewPodScheduledByHPAClone > 0 {
		for i, nodeName := range sortedNodesNameBasedOnTraffic {
			if iNumberOfNewPodScheduledByHPAClone == 0 {
				break
			}
			// Each node receives new pods proportionally to its share of the total traffic
			numberOfPods := int(math.RoundToEven(float64(iNumberOfNewPodScheduledByHPA) * (nodesTrafficMap.getFloat64Value(nodeName) / sumTrafficFromMap)))
			if numberOfPods > iNumberOfNewPodScheduledByHPAClone {
				numberOfPods = iNumberOfNewPodScheduledByHPAClone
			}
			if numberOfPods == 0 && i == 0 {
				// Even the busiest node rounded to zero pods: fall back to round-robin
				allNodesHasSameTraffic = true
			}
			if allNodesHasSameTraffic {
				numberOfPods = 1
			}
			currentPodsOnNode := podsDistributionRatioMap.getInValue(nodeName)
			podsDistributionRatioMap.setIntValue(nodeName, currentPodsOnNode+numberOfPods)
			iNumberOfNewPodScheduledByHPAClone = iNumberOfNewPodScheduledByHPAClone - numberOfPods
		}
	}
	log.Print("(Calculation Done) Pods distribution calculated")
	podsDistributionRatioMap.printPodsDistributionInfo()
	podsDistributionRatioMap.mu.Unlock()
}
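
// Worked example (hypothetical numbers): with 3 new pods and traffic
// {1node1: 20, 2node2: 5, 3node3: 0} (sum 25), the first inner pass assigns
// RoundToEven(3*20/25) = 2 pods to 1node1 and RoundToEven(3*5/25) = 1 pod to
// 2node2; nothing is left for 3node3, so the final distribution is
// {1node1: 2, 2node2: 1, 3node3: 0}.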

func getAllRunningAppPod() int {
	// Get the pod list for the application; List returns a pointer, so check
	// the error before dereferencing and iterating
	podList, err := clientset.CoreV1().Pods(customConfig.Application.NameSpace).List(metav1.ListOptions{
		LabelSelector: "app=" + customConfig.Application.Name,
	})
	if err != nil {
		log.Printf("warning: cannot list application pods: %v", err)
		return 0
	}
	count := 0
	for _, pod := range podList.Items {
		if pod.Status.Phase == v1.PodRunning {
			count++
		}
	}
	log.Printf("info:func getAllRunningAppPod() => Current pods in cluster = %d", count)

	return count
}

func initConfigs() {

	log.Print("Initialize configuration")
	// Read configuration file
	log.Print("Read configuration file")
	dir, err := filepath.Abs(filepath.Dir(os.Args[0]))
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("Current dir = %s", dir)
	f, err := os.Open(dir + "/config.yaml")
	if err != nil {
		log.Fatal(err)
	}

	decoder := yaml.NewDecoder(f)
	err = decoder.Decode(&customConfig)
	if err != nil {
		log.Fatal(err)
	}

	if errWhenCloseFile := f.Close(); errWhenCloseFile != nil {
		log.Fatal(errWhenCloseFile)
	}
	log.Printf("config path %s", customConfig.K8sConfigPath.Path)
	// Build the Kubernetes client from the configured kubeconfig path
	config, err = clientcmd.BuildConfigFromFlags("", customConfig.K8sConfigPath.Path)
	if err != nil {
		log.Fatal(err)
	}
	clientset, err = kubernetes.NewForConfig(config)
	if err != nil {
		log.Fatal(err)
	}

	appPodOptions = metav1.ListOptions{
		LabelSelector: "app=" + customConfig.Application.Name,
	}
	initScheduleRules()

}
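
// A sample config.yaml matching the Config struct above (values are
// hypothetical; without explicit tags yaml.v2 lowercases the outer field
// names, hence the "application" and "k8sconfigpath" keys):
//
//	application:
//	  name: app-example1
//	  namespace: default
//	  defaultPods: "3"
//	k8sconfigpath:
//	  path: /root/.kube/config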

func initScheduleRules() {

	log.Print("Adding schedule rules")
	TruePredicate = Predicate{
		Name: "always_true",
		Func: func(pod v1.Pod, node v1.Node) (bool, error) {
			return true, nil
		},
	}

	ZeroPriority = Prioritize{
		Name: "traffic_neighbors_scoring",
		Func: func(pod v1.Pod, nodes []v1.Node) (*schedulerapi.HostPriorityList, error) {
			var priorityList schedulerapi.HostPriorityList
			isScheduled := false
			priorityList = make([]schedulerapi.HostPriority, len(nodes))
			//missingLabelErr := "Application " + pod.Name + " does not have app label"
			bMissingAppLabel := false
			labelMap := pod.ObjectMeta.Labels
			if labelMap == nil {
				bMissingAppLabel = true
			}
			if _, exist := labelMap["app"]; !exist {
				bMissingAppLabel = true
			}
			appName := labelMap["app"]
			isHPAReady := false
			hpa, err := clientset.AutoscalingV1().HorizontalPodAutoscalers("default").Get(appName, metav1.GetOptions{})
			if err == nil {
				// Check whether the HPA is ready; if it is not, every pod passed to
				// this extender prioritizer is an init pod
				var hpaConditions []v1autoscaling.HorizontalPodAutoscalerCondition
				if condAnnotation, ok := hpa.ObjectMeta.Annotations[autoscaling.HorizontalPodAutoscalerConditionsAnnotation]; ok {
					if err := json.Unmarshal([]byte(condAnnotation), &hpaConditions); err != nil {
						panic("Cannot get hpa conditions")
					}
					for _, condition := range hpaConditions {
						if condition.Type == v1autoscaling.ScalingActive {
							if condition.Status == "True" {
								isHPAReady = true
								log.Printf("### HPA %s is ready", hpa.Name)
							}
						}
					}
				}
			}
			if strings.Contains(pod.Name, appName) && isHPAReady && !bMissingAppLabel {
				isAppNameExistInPodsDisMap := false
				isAppNameExistInUpdateInfoMap := false
				podsDistributionInfo.mu.RLock()
				_, isAppNameExistInPodsDisMap = podsDistributionInfo.podsDistributionRatioMap[appName]
				podsDistributionInfo.mu.RUnlock()
				appUpdateInfoMap.mu.RLock()
				_, isAppNameExistInUpdateInfoMap = appUpdateInfoMap.isMapUpdated[appName]
				bIsMapUpdated := appUpdateInfoMap.isMapUpdated[appName]
				appUpdateInfoMap.mu.RUnlock()
				bLog1Time := true
				for !isAppNameExistInPodsDisMap || !isAppNameExistInUpdateInfoMap || !bIsMapUpdated {
					if bLog1Time {
						log.Print("Wait for all maps to be updated ...")
						bLog1Time = false
					}
					// Brief pause so this wait loop does not spin at full speed
					time.Sleep(100 * time.Millisecond)
					podsDistributionInfo.mu.RLock()
					_, isAppNameExistInPodsDisMap = podsDistributionInfo.podsDistributionRatioMap[appName]
					podsDistributionInfo.mu.RUnlock()
					appUpdateInfoMap.mu.RLock()
					_, isAppNameExistInUpdateInfoMap = appUpdateInfoMap.isMapUpdated[appName]
					bIsMapUpdated = appUpdateInfoMap.isMapUpdated[appName]
					appUpdateInfoMap.mu.RUnlock()
				}
				log.Print("All maps are updated")
				// Loop until one pod is placed so no pod leaves without a priority list
				for !isScheduled {
					podsDistributionInfo.mu.RLock()
					sortedNodeNameBasedOnPodsValue := getSortedMapKeysByPodsValueDesc(podsDistributionInfo.podsDistributionRatioMap[appName].intValueMap)
					podsDistributionInfo.mu.RUnlock()
					for _, nodeName := range sortedNodeNameBasedOnPodsValue {
						podsDistributionInfo.mu.RLock()
						podsOnNode := podsDistributionInfo.podsDistributionRatioMap[appName].intValueMap[nodeName]
						podsDistributionInfo.mu.RUnlock()
						if podsOnNode == 0 {
							continue
						}
						log.Print("Start to schedule 1 pod")
						if nodeExistOnList(nodeName, nodes) {
							// Give the target node the highest score, then its
							// neighbors in ascending order of delay
							iMaxScore := len(nodes) * 10
							iIndex := 0
							priorityList[iIndex] = schedulerapi.HostPriority{
								Host:  nodeName,
								Score: int64(iMaxScore),
							}
							log.Printf("Node %s has score = %d", nodeName, iMaxScore)
							iIndex += 1
							iMaxScore = iMaxScore - 10
							sortedNodeNameBasedOnDelayValue := getSortedMapKeysByValueAsc(neighborsInfo[nodeName])
							for _, neighbors := range sortedNodeNameBasedOnDelayValue {
								if nodeExistOnList(neighbors, nodes) {
									priorityList[iIndex] = schedulerapi.HostPriority{
										Host:  neighbors,
										Score: int64(iMaxScore),
									}
									log.Printf("Node %s has score = %d", neighbors, iMaxScore)
									iIndex++
									iMaxScore = iMaxScore - 10
								}
							}
							isScheduled = true
							podsDistributionInfo.mu.Lock()
							oldPodsValueOnNode := podsDistributionInfo.podsDistributionRatioMap[appName].intValueMap[nodeName]
							podsDistributionInfo.podsDistributionRatioMap[appName].intValueMap[nodeName] = oldPodsValueOnNode - 1
							podsDistributionInfo.mu.Unlock()
							break
						} else {
							// The target node was filtered out: score its neighbors
							// in ascending order of delay instead
							iMaxScore := len(nodes) * 10
							iIndex := 0
							sortedNodeNameBasedOnDelayValue := getSortedMapKeysByValueAsc(neighborsInfo[nodeName])
							for _, neighbors := range sortedNodeNameBasedOnDelayValue {
								if nodeExistOnList(neighbors, nodes) {
									priorityList[iIndex] = schedulerapi.HostPriority{
										Host:  neighbors,
										Score: int64(iMaxScore),
									}
									iIndex++
									iMaxScore = iMaxScore - 10
								}
							}
							isScheduled = true
							podsDistributionInfo.mu.Lock()
							oldPodsValueOnNode := podsDistributionInfo.podsDistributionRatioMap[appName].intValueMap[nodeName]
							podsDistributionInfo.podsDistributionRatioMap[appName].intValueMap[nodeName] = oldPodsValueOnNode - 1
							podsDistributionInfo.mu.Unlock()
							break
						}
					}
				}
			} else {
				log.Printf("No Prioritize schedule")
				iMaxScore := len(nodes) * 10
				log.Printf("Len of podsDis map = %d", len(podsDis))
				for k, v := range podsDis {
					if v == 0 {
						log.Printf("Node %s has no pods left in podsDis", k)
						continue
					}
					log.Printf("k = %s", k)
					for i, node := range nodes {
						log.Printf("i = %d", i)
						log.Printf("Filtered: node name = %s", node.Name)
						if node.Name == k {
							priorityList[i] = schedulerapi.HostPriority{
								Host:  node.Name,
								Score: int64(iMaxScore),
							}
						} else {
							priorityList[i] = schedulerapi.HostPriority{
								Host:  node.Name,
								Score: 0,
							}
						}
					}
					podsDis[k] = podsDis[k] - 1
					return &priorityList, nil
				}
			}
			podsDistributionInfo.mu.RLock()
			if _, exist := podsDistributionInfo.podsDistributionRatioMap[appName]; exist {
				isAllNodesRemainZeroPod := true
				for _, v := range podsDistributionInfo.podsDistributionRatioMap[appName].intValueMap {
					if v != 0 {
						isAllNodesRemainZeroPod = false
						break
					}
				}
				if isAllNodesRemainZeroPod {
					appUpdateInfoMap.mu.Lock()
					appUpdateInfoMap.isMapUpdated[appName] = false
					appUpdateInfoMap.mu.Unlock()
				}
			}
			podsDistributionInfo.mu.RUnlock()

			return &priorityList, nil
		},
	}

	NoBind = Bind{
		Func: func(podName string, podNamespace string, podUID types.UID, node string) error {
			return fmt.Errorf("this extender does not support Bind; leave 'BindVerb' empty in your ExtenderConfig")
		},
	}

	EchoPreemption = Preemption{
		Func: func(_ v1.Pod, _ map[string]*schedulerapi.Victims, nodeNameToMetaVictims map[string]*schedulerapi.MetaVictims) map[string]*schedulerapi.MetaVictims {
			return nodeNameToMetaVictims
		},
	}

}
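
// For reference, a legacy kube-scheduler Policy file wiring this extender in
// could look like the following sketch (the URL, weight, and verb paths are
// deployment-specific assumptions built from the constants at the top of this
// file; BindVerb stays empty because NoBind above rejects binding):
//
//	{
//	  "kind": "Policy",
//	  "apiVersion": "v1",
//	  "extenders": [{
//	    "urlPrefix": "http://localhost:8888/scheduler",
//	    "filterVerb": "predicates/always_true",
//	    "prioritizeVerb": "priorities/traffic_neighbors_scoring",
//	    "weight": 1,
//	    "enableHttps": false
//	  }]
//	}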

func getNodesTrafficInfoFromEndpoints(workerNodeName string, appName string) float64 {
	realEPName := ""
	endpoints, err := clientset.CoreV1().Endpoints("default").List(metav1.ListOptions{})
	if err != nil {
		klog.Infof("Cannot list endpoints: %v", err)
		return 0
	}
	for _, ep := range endpoints.Items {
		if strings.Contains(ep.Name, workerNodeName) && strings.Contains(ep.Name, appName) {
			realEPName = ep.Name
			break
		}
	}
	if realEPName == "" {
		klog.Infof("EP name = empty")
		return 0
	}
	realEP, err := clientset.CoreV1().Endpoints("default").Get(realEPName, metav1.GetOptions{})
	if err != nil {
		klog.Infof("Cannot get EP %s: %v", realEPName, err)
		return 0
	}
	annotation := realEP.ObjectMeta.Annotations
	if annotation == nil {
		klog.Infof("EP %s does not have traffic info annotations", realEPName)
		return 0
	}
	curNumRequests, _ := strconv.Atoi(annotation["numRequests"])
	klog.Infof("CurrentNumRequests = %d", curNumRequests)
	oldNumRequests, _ := strconv.Atoi(annotation["oldNumRequests"])
	klog.Infof("oldNumRequests = %d", oldNumRequests)
	trafficValue := float64(curNumRequests - oldNumRequests)
	klog.Infof("Traffic value from ep %s = %f", realEPName, trafficValue)
	// Note: this only marks the local copy; the reset flag is pushed to the
	// API server in updateOldTrafficForEPs, which calls Update on the object
	annotation["reset"] = "true"

	return trafficValue
}
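
// The per-node Endpoints annotations this function reads look like the
// following (keys are from the code above; values hypothetical):
//
//	metadata:
//	  annotations:
//	    numRequests: "120"
//	    oldNumRequests: "80"
//	    reset: "false"
//
// which yields a traffic value of 120 - 80 = 40 for that node.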

func updateNodesTraffic(nodesTrafficMap safeFloatValueMap, appName string) {
	// Refresh traffic for every worker node from the endpoint annotations
	//nodesTrafficMap.mu.Lock()
	log.Print("(Update begin) Updating nodes traffic")
	workerNodes, err := clientset.CoreV1().Nodes().List(metav1.ListOptions{
		LabelSelector: "node-role.kubernetes.io/worker=true",
	})
	if err != nil {
		log.Printf("warning: cannot list worker nodes: %v", err)
		return
	}
	// Get traffic for all worker nodes from their EPs
	for _, workerNode := range workerNodes.Items {
		nodesTrafficMap.setFloatValue(workerNode.Name, getNodesTrafficInfoFromEndpoints(workerNode.Name, appName))
	}
	log.Print("(Update Done) Updated nodes traffic")
	nodesTrafficMap.printNodesTrafficInfo()
	//nodesTrafficMap.mu.Unlock()
}

// getSortedMapKeysByValueAsc sorts map keys by their values, ascending.
func getSortedMapKeysByValueAsc(inputMap map[string]float64) []string {
	mapKeys := make([]string, 0, len(inputMap))
	for key := range inputMap {
		mapKeys = append(mapKeys, key)
	}
	sort.Slice(mapKeys, func(i, j int) bool {
		return inputMap[mapKeys[i]] < inputMap[mapKeys[j]]
	})

	klog.Info("Neighbors name sorted by delay asc")
	for i, key := range mapKeys {
		klog.Infof("- Index %d: %s", i, key)
	}

	return mapKeys
}

// getSortedMapKeysByPodsValueDesc sorts map keys by their values, descending.
func getSortedMapKeysByPodsValueDesc(inputMap map[string]int) []string {
	mapKeys := make([]string, 0, len(inputMap))
	for key := range inputMap {
		mapKeys = append(mapKeys, key)
	}
	sort.Slice(mapKeys, func(i, j int) bool {
		return inputMap[mapKeys[i]] > inputMap[mapKeys[j]]
	})

	return mapKeys
}

// getSortedMapKeysByValueDesc sorts map keys by their values, descending.
func getSortedMapKeysByValueDesc(inputMap map[string]float64) []string {
	mapKeys := make([]string, 0, len(inputMap))
	for key := range inputMap {
		mapKeys = append(mapKeys, key)
	}
	sort.Slice(mapKeys, func(i, j int) bool {
		return inputMap[mapKeys[i]] > inputMap[mapKeys[j]]
	})

	klog.Info("Sorted keys of traffic map")
	for i, key := range mapKeys {
		klog.Infof("- Index %d: %s", i, key)
	}

	return mapKeys
}
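
// For instance (hypothetical values), getSortedMapKeysByValueAsc on
// map[string]float64{"1node1": 20, "2node2": 5} returns ["2node2", "1node1"],
// while getSortedMapKeysByValueDesc on the same map returns
// ["1node1", "2node2"].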

func nodesTrafficMapInit() safeFloatValueMap {
	nodesTrafficMap := safeFloatValueMap{stringFloat64Map: make(map[string]float64)}
	workerNodes, err := clientset.CoreV1().Nodes().List(metav1.ListOptions{
		LabelSelector: "node-role.kubernetes.io/worker=true",
	})
	if err != nil {
		log.Printf("warning: cannot list worker nodes: %v", err)
		return nodesTrafficMap
	}
	// Start every worker node at zero traffic; real values come from updateNodesTraffic
	for _, workerNode := range workerNodes.Items {
		nodesTrafficMap.setFloatValue(workerNode.Name, 0.0)
	}
	return nodesTrafficMap
}

func nodesPodsDistributionMapInit() safeIntValueMap {
	nodesPodsDistributionMap := safeIntValueMap{intValueMap: make(map[string]int)}
	workerNodes, err := clientset.CoreV1().Nodes().List(metav1.ListOptions{
		LabelSelector: "node-role.kubernetes.io/worker=true",
	})
	if err != nil {
		log.Printf("warning: cannot list worker nodes: %v", err)
		return nodesPodsDistributionMap
	}
	// Start every worker node with zero pods to schedule
	for _, workerNode := range workerNodes.Items {
		nodesPodsDistributionMap.setIntValue(workerNode.Name, 0)
	}
	return nodesPodsDistributionMap
}

func calTotalFromMapValues(trafficMap map[string]float64) float64 {
	result := 0.0
	for _, value := range trafficMap {
		result += value
	}
	return result
}

func nodeExistOnList(a string, list []v1.Node) bool {
	for _, b := range list {
		if b.Name == a {
			return true
		}
	}
	return false
}

func equalizeNodeTraffic(cloneNodesTrafficMap map[string]float64) {
	for nodeName := range cloneNodesTrafficMap {
		cloneNodesTrafficMap[nodeName] = 1.0
	}
}

func updateOldTrafficForEPs() {
	workerNodesName := make([]string, 0)
	EPsName := make([]string, 0)
	workerNodes, err := clientset.CoreV1().Nodes().List(metav1.ListOptions{
		LabelSelector: "node-role.kubernetes.io/worker=true",
	})
	if err != nil {
		log.Printf("warning: cannot list worker nodes: %v", err)
		return
	}
	for _, worker := range workerNodes.Items {
		workerNodesName = append(workerNodesName, worker.Name)
	}
	if len(workerNodesName) == 0 {
		log.Printf("Worker nodes name list is empty")
	}
	endpoints, err := clientset.CoreV1().Endpoints("default").List(metav1.ListOptions{})
	if err != nil {
		log.Printf("warning: cannot list endpoints: %v", err)
		return
	}
	// EP names follow the pattern "<nodeName>-<appName>", so "-<appName>"
	// separates the node name from the rest
	epSuffix := "-" + customConfig.Application.Name
	for _, ep := range endpoints.Items {
		if !strings.Contains(ep.Name, customConfig.Application.Name) || ep.Name == customConfig.Application.Name {
			continue
		}
		if stringInSlice(ep.Name[0:strings.Index(ep.Name, epSuffix)], workerNodesName) {
			EPsName = append(EPsName, ep.Name)
			klog.Infof("EPsName value = %s", ep.Name)
		}
	}
	for _, epName := range EPsName {
		realEP, err := clientset.CoreV1().Endpoints("default").Get(epName, metav1.GetOptions{})
		if err != nil {
			klog.Infof("Cannot get EP %s: %v", epName, err)
			continue
		}
		annotation := realEP.ObjectMeta.Annotations
		if annotation == nil {
			klog.Infof("EP %s does not have traffic info annotations", epName)
			continue
		}
		annotation["reset"] = "true"
		realEP.ObjectMeta.Annotations = annotation
		_, err = clientset.CoreV1().Endpoints("default").Update(realEP)
		if err != nil {
			klog.Infof("Cannot update old num requests for EP %s", realEP.Name)
		}
	}
}
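
// Example of the name matching above (names hypothetical): with
// Application.Name = "app-example1", an Endpoints object called
// "1node1-app-example1" is split at "-app-example1" into the node name
// "1node1"; if that node is a known worker, the EP's "reset" annotation is
// set to "true" and pushed with Update.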

func stringInSlice(a string, list []string) bool {
	for _, b := range list {
		if b == a {
			return true
		}
	}
	return false
}

func isAllMapValueEqualToZero(inputMap map[string]int) bool {
	for _, v := range inputMap {
		if v != 0 {
			return false
		}
	}
	return true
}

func (c *safeIntValueMap) setIntValue(key string, value int) {
	// Callers must hold c.mu; this method does not lock by itself
	c.intValueMap[key] = value
}

func (c *safeIntValueMap) getInValue(key string) int {
	// Callers must hold c.mu (a read lock is enough)
	return c.intValueMap[key]
}

func (c *safeIntValueMap) getIntValueMap() map[string]int {
	return c.intValueMap
}

func (c *safeIntValueMap) printPodsDistributionInfo() {
	for k, v := range c.intValueMap {
		klog.Infof("Node %s has %d pods to schedule", k, v)
	}
}

func (c *safeFloatValueMap) setFloatValue(key string, value float64) {
	// Callers must hold c.mu; this method does not lock by itself
	c.stringFloat64Map[key] = value
}

func (c *safeFloatValueMap) getValuesSum() float64 {
	sum := 0.0
	for _, v := range c.stringFloat64Map {
		sum += v
	}
	return sum
}

func (c *safeFloatValueMap) equalizeValuesTo1() {
	// Callers must hold c.mu; sets every node's traffic to 1.0
	for k := range c.stringFloat64Map {
		c.stringFloat64Map[k] = 1.0
	}
}

func (c *safeFloatValueMap) getFloat64Value(key string) float64 {
	// Callers must hold c.mu (a read lock is enough)
	return c.stringFloat64Map[key]
}

func (c *safeFloatValueMap) getFloat64ValueMap() map[string]float64 {
	return c.stringFloat64Map
}

func (c *safeFloatValueMap) printNodesTrafficInfo() {
	for k, v := range c.stringFloat64Map {
		klog.Infof("Node %s has traffic %f", k, v)
	}
}
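
// Intended usage (a sketch): the embedded mutex guards the map, but the
// accessors above do not lock, so callers bracket access themselves:
//
//	m := safeIntValueMap{intValueMap: make(map[string]int)}
//	m.mu.Lock()
//	m.setIntValue("1node1", 2)
//	m.mu.Unlock()
//	m.mu.RLock()
//	n := m.getInValue("1node1")
//	m.mu.RUnlock()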

// goid parses the current goroutine id out of runtime.Stack output. This is a
// debugging aid only; it relies on the "goroutine N" stack header format.
func goid() int {
	var buf [64]byte
	n := runtime.Stack(buf[:], false)
	idField := strings.Fields(strings.TrimPrefix(string(buf[:n]), "goroutine "))[0]
	id, err := strconv.Atoi(idField)
	if err != nil {
		panic(fmt.Sprintf("cannot get goroutine id: %v", err))
	}
	return id
}