2022-08-22 16:15:23 +09:00

817 lines
25 KiB

package main
import (
v1autoscaling ""
metav1 ""
schedulerapi ""
const (
versionPath = "/version"
apiPrefix = "/scheduler"
bindPath = apiPrefix + "/bind"
preemptionPath = apiPrefix + "/preemption"
predicatesPrefix = apiPrefix + "/predicates"
prioritiesPrefix = apiPrefix + "/priorities"
EPS = 0.1)
type safeIntValueMap struct {
mu sync.RWMutex
intValueMap map[string]int
type safeFloatValueMap struct {
mu sync.RWMutex
stringFloat64Map map[string]float64
type safeAppTrafficInfo struct {
mu sync.RWMutex
appTrafficInfo map[string]safeFloatValueMap
type safePodsDistributionMap struct {
mu sync.RWMutex
podsDistributionRatioMap map[string]safeIntValueMap
type safeUpdatedCheckMap struct {
mu sync.RWMutex
isMapUpdated map[string]bool
type Config struct {
Application struct {
Name string `yaml:"name"`
NameSpace string `yaml:"namespace"`
DefaultPods string `yaml:"defaultPods"`
K8sConfigPath struct {
Path string `yaml:"path"`
var (
version string // injected via ldflags at build time
sHpaObject = "HorizontalPodAutoscaler"
//example message HorizontalPodAutoscaler--app-example1--New size: 7; reason: cpu resource utilization (percentage of request) above target
bInit = false
bCalculating = false
neighborsInfo map[string]map[string]float64
node1Neighbors map[string]float64
node2Neighbors map[string]float64
node3Neighbors map[string]float64
//podsDistributionRatioMap safeIntValueMap
//nodesTrafficMap safeFloatValueMap
podsDistributionInfo safePodsDistributionMap
appTrafficInfo safeAppTrafficInfo
appUpdateInfoMap safeUpdatedCheckMap
//global variable
customConfig Config
config *rest.Config
clientset *kubernetes.Clientset
appPodOptions metav1.ListOptions
defaultApplicationPods int
start time.Time
// end Phuc
// Create Filter, Priority, .. for scheduling
TruePredicate Predicate
ZeroPriority Prioritize
NoBind Bind
EchoPreemption Preemption
podsDis map[string]int
func StringToLevel(levelStr string) colog.Level {
switch level := strings.ToUpper(levelStr); level {
case "TRACE":
return colog.LTrace
case "DEBUG":
return colog.LDebug
case "INFO":
return colog.LInfo
case "WARNING":
return colog.LWarning
case "ERROR":
return colog.LError
case "ALERT":
return colog.LAlert
log.Printf("warning: LOG_LEVEL=\"%s\" is empty or invalid, fallling back to \"INFO\".\n", level)
return colog.LInfo
func main() {
// Init configs
//podsDis = make(map[string]int) // Un-comment if you would like to hard code the pods distribution
//podsDis["1node1"] = 7
//podsDis["2node2"] = 1
//podsDis["3node3"] = 1
start = time.Now()
defaultApplicationPods, _ = strconv.Atoi(customConfig.Application.DefaultPods)
//Init test set
bInit = true
//TODO we must update neighbors for each node in another func
//1node1 neighbors
node1Neighbors = make(map[string]float64)
node1Neighbors["2node2"] = 5
node1Neighbors["3node3"] = 20
log.Printf("node1Neighbors len = %d", len(node1Neighbors))
//2node2 neighbors
node2Neighbors = make(map[string]float64)
node2Neighbors["1node1"] = 5
node2Neighbors["3node3"] = 10
//3node3 neighbors
node3Neighbors = make(map[string]float64)
node3Neighbors["2node2"] = 10
node3Neighbors["1node1"] = 20
neighborsInfo = make(map[string]map[string]float64)
neighborsInfo["1node1"] = node1Neighbors
neighborsInfo["2node2"] = node2Neighbors
neighborsInfo["3node3"] = node3Neighbors
appTrafficInfo = safeAppTrafficInfo{
mu: sync.RWMutex{},
appTrafficInfo: make(map[string]safeFloatValueMap),
podsDistributionInfo = safePodsDistributionMap{
mu: sync.RWMutex{},
podsDistributionRatioMap: make(map[string]safeIntValueMap),
appUpdateInfoMap = safeUpdatedCheckMap{
mu: sync.RWMutex{},
isMapUpdated: make(map[string]bool),
Colors: true,
Flag: log.Ldate | log.Ltime | log.Lshortfile,
level := StringToLevel(os.Getenv("LOG_LEVEL"))
log.Print("Log level was set to ", strings.ToUpper(level.String()))
router := httprouter.New()
predicates := []Predicate{TruePredicate}
for _, p := range predicates {
AddPredicate(router, p)
priorities := []Prioritize{ZeroPriority}
for _, p := range priorities {
AddPrioritize(router, p)
AddBind(router, NoBind)
// watch used to get all events
watch, err := clientset.CoreV1().Events(customConfig.Application.NameSpace).Watch(metav1.ListOptions{})
if err != nil {
ch := watch.ResultChan()
// this goroutine is used to parse all the events that generated by HPA object.
go func() {
for event := range ch {
e, _ := event.Object.(*v1.Event)
if (start.Sub(e.FirstTimestamp.Time)).Minutes() > 1 {
// This if statement is used to ignore old events
if strings.Contains(e.Message, "old size") && strings.Contains(e.Message, "new size") {
message := e.Message
oldSizeMessagePart := message[strings.Index(message, "old size: "):strings.Index(message, ", new size")]
oldSize, _ := strconv.Atoi(strings.ReplaceAll(oldSizeMessagePart, "old size: ", ""))
newSizeMessagePart := message[strings.Index(message, "new size: "):strings.Index(message, ", reason")]
newSize, _ := strconv.Atoi(strings.ReplaceAll(newSizeMessagePart, "new size: ", ""))
if newSize < oldSize {
appName := e.InvolvedObject.Name
appUpdateInfoMap.isMapUpdated[appName] = false
if _, exist := appTrafficInfo.appTrafficInfo[appName]; !exist {
appTrafficInfo.appTrafficInfo[appName] = nodesTrafficMapInit()
podsDistributionInfo.podsDistributionRatioMap[appName] = nodesPodsDistributionMapInit()
log.Printf("goroutine id %d", goid())
log.Print("====HPA Upscale happends====")
log.Printf("=======New pod = %d=========", newSize - oldSize)
updateNodesTraffic(appTrafficInfo.appTrafficInfo[appName], appName)
calculatePodsDistribution(newSize - oldSize, appTrafficInfo.appTrafficInfo[appName], podsDistributionInfo.podsDistributionRatioMap[appName])
appUpdateInfoMap.isMapUpdated[appName] = true
time.Sleep(2 * time.Second)
log.Print("info: server starting on localhost port :8888")
bInit = false // Prevent old events affect
if err := http.ListenAndServe("localhost:8888", router); err != nil {
func calculatePodsDistribution(iNumberOfNewPodScheduledByHPA int, nodesTrafficMap safeFloatValueMap, podsDistributionRatioMap safeIntValueMap) {
log.Print("(Calculation Begin) Calculating pods distribution")
// We need to update latest traffic info for all nodes before calculation
sumTrafficFromMap := nodesTrafficMap.getValuesSum()
allNodesHasSameTraffic := false
if sumTrafficFromMap - EPS < 0 {
log.Printf("Sum < EPS")
sumTrafficFromMap = nodesTrafficMap.getValuesSum()
allNodesHasSameTraffic = true
sortedNodesNameBasedOnTraffic := getSortedMapKeysByValueDesc(nodesTrafficMap.getFloat64ValueMap())
iNumberOfNewPodScheduledByHPAClone := iNumberOfNewPodScheduledByHPA
for iNumberOfNewPodScheduledByHPAClone > 0 {
for i, nodeName := range sortedNodesNameBasedOnTraffic {
if iNumberOfNewPodScheduledByHPAClone == 0 {
numberOfPods := int(math.RoundToEven(float64(iNumberOfNewPodScheduledByHPA) * (nodesTrafficMap.getFloat64Value(nodeName) / sumTrafficFromMap)))
if numberOfPods > iNumberOfNewPodScheduledByHPAClone {
numberOfPods = iNumberOfNewPodScheduledByHPAClone
if numberOfPods == 0 && i == 0 {
allNodesHasSameTraffic = true
if allNodesHasSameTraffic == true {
numberOfPods = 1
currentPodsOnNode := podsDistributionRatioMap.getInValue(nodeName)
podsDistributionRatioMap.setIntValue(nodeName, currentPodsOnNode + numberOfPods)
iNumberOfNewPodScheduledByHPAClone = iNumberOfNewPodScheduledByHPAClone - numberOfPods
log.Print("(Calculation Done) Pods distribution calculated")
func getAllRunningAppPod () int {
// get the pod list
podList, _ := clientset.CoreV1().Pods(customConfig.Application.NameSpace).List(metav1.ListOptions{
LabelSelector: "app=" + customConfig.Application.Name,
count := 0
for _, pod := range podList.Items {
if pod.Status.Phase == v1.PodRunning {
// List() returns a pointer to slice, derefernce it, before iterating
log.Printf("info:func GetAllActiveAppPod() => Current pods in cluster = %d", count)
return count
func initConfigs () {
log.Print("Initialize configuration")
//Read configuration file
log.Print("Read configuration file")
dir, err := filepath.Abs(filepath.Dir(os.Args[0]))
log.Printf("Current dir = %s", dir)
if err != nil {
f, err := os.Open(dir + "/config.yaml")
if err != nil {
decoder := yaml.NewDecoder(f)
err = decoder.Decode(&customConfig)
if err != nil {
if errWhenCloseFile := f.Close(); errWhenCloseFile != nil {
log.Printf("config path %s", customConfig.K8sConfigPath.Path)
// initConfigs
config , _ = clientcmd.BuildConfigFromFlags("", customConfig.K8sConfigPath.Path)
if config == nil {
log.Print("Config is nil")
clientset, _ = kubernetes.NewForConfig(config)
appPodOptions = metav1.ListOptions{
LabelSelector: "app=" + customConfig.Application.Name,
func initScheduleRules () {
log.Print("Adding schedule rules")
TruePredicate = Predicate{
Name: "always_true",
Func: func(pod v1.Pod, node v1.Node) (bool, error) {
return true, nil
ZeroPriority = Prioritize{
Name: "traffic_neighbors_scoring",
Func: func(pod v1.Pod, nodes []v1.Node) (*schedulerapi.HostPriorityList, error) {
var priorityList schedulerapi.HostPriorityList
isScheduled := false
priorityList = make([]schedulerapi.HostPriority, len(nodes))
//missingLabelErr := "Application " + pod.Name + "does not have app label"
bMissingAppLabel := false
labelMap := pod.ObjectMeta.Labels
if labelMap == nil {
bMissingAppLabel = true
if _, exist := labelMap["app"]; !exist {
bMissingAppLabel = true
appName := labelMap["app"]
isHPAReady := false
hpa, err := clientset.AutoscalingV1().HorizontalPodAutoscalers("default").Get(appName, metav1.GetOptions{})
if err == nil {
// The below code is used to check that hpa is ready or not => If it is not ready, it means that all pods pass to this extender prioritize is init pods
var hpaConditions []v1autoscaling.HorizontalPodAutoscalerCondition
if err := json.Unmarshal([]byte(hpa.ObjectMeta.Annotations[autoscaling.HorizontalPodAutoscalerConditionsAnnotation]), &hpaConditions); err != nil {
panic("Cannot get hpa conditions")
for _, condition := range hpaConditions {
if condition.Type == v1autoscaling.ScalingActive {
if condition.Status == "True" {
isHPAReady = true
log.Printf("### HPA %s is ready", hpa.Name)
if strings.Contains(pod.Name, appName) && isHPAReady == true && !bMissingAppLabel {
isAppNameExistInPodsDisMap := false
isAppNameExistInUpdateInfoMap := false
_, isAppNameExistInPodsDisMap = podsDistributionInfo.podsDistributionRatioMap[appName]
_, isAppNameExistInUpdateInfoMap = appUpdateInfoMap.isMapUpdated[appName]
bIsMapUpdated := appUpdateInfoMap.isMapUpdated[appName]
bLog1Time := true
for !isAppNameExistInPodsDisMap || !isAppNameExistInUpdateInfoMap || !bIsMapUpdated {
if bLog1Time {
log.Printf("Wait for all maps to be updated ...")
bLog1Time = false
_, isAppNameExistInPodsDisMap = podsDistributionInfo.podsDistributionRatioMap[appName]
_, isAppNameExistInUpdateInfoMap = appUpdateInfoMap.isMapUpdated[appName]
bIsMapUpdated = appUpdateInfoMap.isMapUpdated[appName]
log.Printf("All Maps are updated")
// Use for loop here to make sure that no anynomous host prioritize exist
for !isScheduled {
sortedNodeNameBasedOnPodsValue := getSortedMapKeysByPodsValueDesc(podsDistributionInfo.podsDistributionRatioMap[appName].intValueMap)
for _, nodeName := range sortedNodeNameBasedOnPodsValue {
podsOnNode := podsDistributionInfo.podsDistributionRatioMap[appName].intValueMap[nodeName]//podsDistributionRatioMap.getInValue(nodeName)
if podsOnNode == 0 {
log.Print("Start to schedule 1 pod")
if nodeExistOnList(nodeName, nodes) {
iMaxScore := len(nodes) * 10
iIndex := 0
priorityList[iIndex] = schedulerapi.HostPriority{
Host: nodeName,
Score: int64(iMaxScore),
log.Printf("Node %s has score = %d", nodeName, iMaxScore)
iIndex += 1
iMaxScore = iMaxScore - 10
sortedNodeNameBasedOnDelayValue := getSortedMapKeysByValueAsc(neighborsInfo[nodeName])
for _, neighbors := range sortedNodeNameBasedOnDelayValue {
if nodeExistOnList(neighbors, nodes) {
priorityList[iIndex] = schedulerapi.HostPriority{
Host: neighbors,
Score: int64(iMaxScore),
log.Printf("Node %s has score = %d", neighbors, iMaxScore)
iMaxScore = iMaxScore - 10
isScheduled = true
oldPodsValueOnNode := podsDistributionInfo.podsDistributionRatioMap[appName].intValueMap[nodeName]//podsDistributionRatioMap.getInValue(nodeName)
podsDistributionInfo.podsDistributionRatioMap[appName].intValueMap[nodeName] = oldPodsValueOnNode - 1
} else {
// Iterate all neighbors and give them score
iMaxScore := len(nodes) * 10
iIndex := 0
sortedNodeNameBasedOnDelayValue := getSortedMapKeysByValueAsc(neighborsInfo[nodeName])
for _, neighbors := range sortedNodeNameBasedOnDelayValue {
if nodeExistOnList(neighbors, nodes) {
priorityList[iIndex] = schedulerapi.HostPriority{
Host: neighbors,
Score: int64(iMaxScore),
iMaxScore = iMaxScore - 10
isScheduled = true
oldPodsValueOnNode := podsDistributionInfo.podsDistributionRatioMap[appName].intValueMap[nodeName]
podsDistributionInfo.podsDistributionRatioMap[appName].intValueMap[nodeName] = oldPodsValueOnNode - 1
} else {
log.Printf("No Prioritize schedule")
iMaxScore := len(nodes) * 10
log.Printf("Len of podsDis map = %d", len(podsDis))
for k, v := range podsDis {
if v == 0 {
log.Printf("k = %s", k)
for i, node := range nodes {
log.Printf("i = %d", i)
log.Printf("Filtered: node name = %s", node.Name)
if node.Name == k {
priorityList[i] = schedulerapi.HostPriority{
Host: node.Name,
Score: int64(iMaxScore),
} else {
priorityList[i] = schedulerapi.HostPriority{
Host: node.Name,
Score: 0,
podsDis[k] = podsDis[k] - 1
return &priorityList, nil
if _, exist := podsDistributionInfo.podsDistributionRatioMap[appName]; exist {
isAllNodesRemainZeroPod := true
for _, v := range podsDistributionInfo.podsDistributionRatioMap[appName].intValueMap {
if v != 0 {
isAllNodesRemainZeroPod = false
if isAllNodesRemainZeroPod {
appUpdateInfoMap.isMapUpdated[appName] = false
return &priorityList, nil
NoBind = Bind{
Func: func(podName string, podNamespace string, podUID types.UID, node string) error {
return fmt.Errorf("This extender doesn't support Bind. Please make 'BindVerb' be empty in your ExtenderConfig.")
EchoPreemption = Preemption{
Func: func(_ v1.Pod, _ map[string]*schedulerapi.Victims, nodeNameToMetaVictims map[string]*schedulerapi.MetaVictims, ) map[string]*schedulerapi.MetaVictims {
return nodeNameToMetaVictims
func getNodesTrafficInfoFromEndpoints(workerNodeName string, appName string) float64 {
realEPName := ""
endpoints, _ := clientset.CoreV1().Endpoints("default").List(metav1.ListOptions{})
for _, ep := range endpoints.Items {
if strings.Contains(ep.Name, workerNodeName) && strings.Contains(ep.Name, appName) {
realEPName = ep.Name
if realEPName == "" {
klog.Infof("EP name = empty")
return 0
realEP, _ := clientset.CoreV1().Endpoints("default").Get(realEPName, metav1.GetOptions{})
annotation := realEP.ObjectMeta.Annotations
if annotation == nil {
klog.Infof("EP %s does not have traffic info annotations", realEPName)
return 0
curNumRequests, _ := strconv.Atoi(annotation["numRequests"])
klog.Infof("CurrentNumRequests = %d", curNumRequests)
oldNumRequests, _ := strconv.Atoi(annotation["oldNumRequests"])
klog.Infof("oldNumRequests = %d", oldNumRequests)
trafficValue := float64(curNumRequests - oldNumRequests)
klog.Infof("Traffic value from ep %s = %f", realEPName, trafficValue)
annotation["reset"] = "true"
return trafficValue
func updateNodesTraffic(nodesTrafficMap safeFloatValueMap, appName string) {
//TODO update nodes traffic here by getting information from endpoint on each node
// Get all worker nodes
log.Print("(Update begin) Updating nodes traffic")
workerNodes, _ := clientset.CoreV1().Nodes().List(metav1.ListOptions{
LabelSelector: "",
// Get traffic for all worker nodes from EP
for _, workerNode := range workerNodes.Items {
nodesTrafficMap.setFloatValue(workerNode.Name, getNodesTrafficInfoFromEndpoints(workerNode.Name, appName))
log.Print("(Update Done) Updated nodes traffic")
// This func is used to sort map keys by their values
func getSortedMapKeysByValueAsc(inputMap map[string]float64) []string {
mapKeys := make([]string, 0, len(inputMap))
for key := range inputMap {
mapKeys = append(mapKeys, key)
sort.Slice(mapKeys, func(i, j int) bool {
return inputMap[mapKeys[i]] < inputMap[mapKeys[j]]
klog.Info("Neighbors name sorted by delay asc")
for i, key := range mapKeys {
klog.Infof("- Index %d: %s", i, key)
return mapKeys
// This func is used to sort map keys by their values
func getSortedMapKeysByPodsValueDesc(inputMap map[string]int) []string {
mapKeys := make([]string, 0, len(inputMap))
for key := range inputMap {
mapKeys = append(mapKeys, key)
sort.Slice(mapKeys, func(i, j int) bool {
return inputMap[mapKeys[i]] > inputMap[mapKeys[j]]
return mapKeys
func getSortedMapKeysByValueDesc(inputMap map[string]float64) []string {
mapKeys := make([]string, 0, len(inputMap))
for key := range inputMap {
mapKeys = append(mapKeys, key)
sort.Slice(mapKeys, func(i, j int) bool {
return inputMap[mapKeys[i]] > inputMap[mapKeys[j]]
klog.Info("Sorted keys of traffic map")
for i, key := range mapKeys {
klog.Infof("- Index %d: %s", i, key)
return mapKeys
func nodesTrafficMapInit() safeFloatValueMap {
nodesTrafficMap := safeFloatValueMap{stringFloat64Map: make(map[string]float64)}
workerNodes, _ := clientset.CoreV1().Nodes().List(metav1.ListOptions{
LabelSelector: "",
// Get traffic for all worker nodes from EP
for _, workerNode := range workerNodes.Items {
nodesTrafficMap.setFloatValue(workerNode.Name, 0.0)
return nodesTrafficMap
func nodesPodsDistributionMapInit() safeIntValueMap {
nodesPodsDistributionMap := safeIntValueMap{intValueMap: make(map[string]int)}
workerNodes, _ := clientset.CoreV1().Nodes().List(metav1.ListOptions{
LabelSelector: "",
// Get traffic for all worker nodes from EP
for _, workerNode := range workerNodes.Items {
nodesPodsDistributionMap.setIntValue(workerNode.Name, 0)
return nodesPodsDistributionMap
func calTotalFromMapValues(trafficMap map[string]float64) float64 {
result := 0.0
for _, value := range trafficMap {
result += value
return result
func nodeExistOnList(a string, list []v1.Node) bool {
for _, b := range list {
if b.Name == a {
return true
return false
func equalizeNodeTraffic(cloneNodesTrafficMap map[string]float64){
for nodeName, _ := range cloneNodesTrafficMap {
cloneNodesTrafficMap[nodeName] = 1.0
func updateOldTrafficForEPs() {
workerNodesName := make([]string, 0)
EPsName := make([]string, 0)
workerNodes, _ := clientset.CoreV1().Nodes().List(metav1.ListOptions{
LabelSelector: "",
for _, worker := range workerNodes.Items {
workerNodesName = append(workerNodesName, worker.Name)
if len(workerNodesName) == 0 {
log.Printf("Worker nodes name list is empty")
endpoints, _ := clientset.CoreV1().Endpoints("default").List(metav1.ListOptions{})
epPrefix := "-" + customConfig.Application.Name
for _, ep := range endpoints.Items {
if !strings.Contains(ep.Name, customConfig.Application.Name) || ep.Name == customConfig.Application.Name {
if stringInSlice(ep.Name[0:strings.Index(ep.Name, epPrefix)], workerNodesName) {
EPsName = append(EPsName, ep.Name)
klog.Infof("EPsName value = %s", ep.Name)
for _, epName := range EPsName {
realEP, _ := clientset.CoreV1().Endpoints("default").Get(epName, metav1.GetOptions{})
annotation := realEP.ObjectMeta.Annotations
if annotation == nil {
klog.Infof("EP %s does not have traffic info annotations", epName)
annotation["reset"] = "true"
realEP.ObjectMeta.Annotations = annotation
_, error := clientset.CoreV1().Endpoints("default").Update(realEP)
if error != nil {
klog.Infof("Can not update old num requests for EP %s", realEP.Name)
func stringInSlice(a string, list []string) bool {
for _, b := range list {
if b == a {
return true
return false
func isAllMapValueEqualToZero (inputMap map[string]int) bool {
for _, v := range inputMap {
if v != 0 {
return false
return true
func (c *safeIntValueMap) setIntValue(key string, value int) {
// Lock so only one goroutine at a time can access the map c.intValueMap
c.intValueMap[key] = value
func (c *safeIntValueMap) getInValue(key string) int {
// Lock so only one goroutine at a time can access the map c.v.
return c.intValueMap[key]
func (c *safeIntValueMap) getIntValueMap() map[string]int {
return c.intValueMap
func (c *safeIntValueMap) printPodsDistributionInfo() {
for k, v := range c.intValueMap {
klog.Infof("Node %s has %d pods to schedule", k, v)
func (c *safeFloatValueMap) setFloatValue(key string, value float64) {
// Lock so only one goroutine at a time can access the map c.stringFloat64Map
c.stringFloat64Map[key] = value
func (c *safeFloatValueMap) getValuesSum() float64 {
sum := 0.0
for _, v := range c.stringFloat64Map {
sum += v
return sum
func (c *safeFloatValueMap) equalizeValuesTo1() {
// Lock so only one goroutine at a time can access the map c.stringFloat64Map
for k, _ := range c.stringFloat64Map {
c.stringFloat64Map[k] = 1.0
func (c *safeFloatValueMap) getFloat64Value(key string) float64 {
// Lock so only one goroutine at a time can access the map c.v.
return c.stringFloat64Map[key]
func (c *safeFloatValueMap) getFloat64ValueMap() map[string]float64 {
return c.stringFloat64Map
func (c *safeFloatValueMap) printNodesTrafficInfo() {
for k, v := range c.stringFloat64Map {
klog.Infof("Node %s has traffic %f", k, v)
func goid() int {
var buf [64]byte
n := runtime.Stack(buf[:], false)
idField := strings.Fields(strings.TrimPrefix(string(buf[:n]), "goroutine "))[0]
id, err := strconv.Atoi(idField)
if err != nil {
panic(fmt.Sprintf("cannot get goroutine id: %v", err))
return id