You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

122 lines
3.4 KiB
Go

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package service_base
import (
"app/internal/msg_inner"
"core/tools"
"time"
)
// LB_TYPE identifies the load-balancing role a LoadBalanceManager is running in.
type LB_TYPE int32
// Load-balancing roles stored in LoadBalanceManager.lbType.
const (
// LBTypeNone is the zero value: no role has been started yet.
LBTypeNone LB_TYPE = iota
// LBTypeFrontend is the role set by StartAsFrontend.
LBTypeFrontend
// LBTypeBackend is the role set by StartAsBackend.
LBTypeBackend
)
// Retry and interval settings used by the frontend and backend timers.
// The interval values are multiplied by time.Second where they are used.
const (
CHECK_ALIVE_RETRY_MAX = 3 // retry count for the passive liveness check
FRONTEND_CHECKALIVE_INTERVAL = 30 // interval between frontend liveness checks, in seconds
FRONTEND_ROUTER_REFRESH_INTERVAL = 60 // interval between frontend route refreshes, in seconds
BAKCEND_REPORT_INTERVAL = 30 // interval between active backend reports, in seconds (identifier is spelled "BAKCEND" as declared)
)
// NewLoadBalanceManager returns a LoadBalanceManager whose backend table is
// allocated and empty. The role is left unset (LBTypeNone) until one of the
// Start* methods is called.
func NewLoadBalanceManager() ILoadBalanceManager {
	mgr := new(LoadBalanceManager)
	mgr.backendMap = map[string]*LoadBalanceNode{}
	return mgr
}
var _ ILoadBalanceManager = (*LoadBalanceManager)(nil) // compile-time check that the interface is implemented
// ILoadBalanceManager is the interface returned by NewLoadBalanceManager.
// A manager is initialised with Init and then started in exactly one role.
type ILoadBalanceManager interface {
// Init attaches the owning service and returns the manager.
Init(serviceBase IServiceBase) ILoadBalanceManager
StartAsFrontend() // start in the frontend role
StartAsBackend(frontendAppId string) // start in the backend role
GetBestBackend() (addr, appId string) // get the routed backend address; the current strategy picks the best backend by the rank the backends report
}
// LoadBalanceManager implements ILoadBalanceManager. Depending on lbType it
// either tracks backends (frontend role) or reports itself to a frontend
// (backend role).
type LoadBalanceManager struct {
// serviceBase is set by Init; the Start* methods obtain the actor and config from it.
serviceBase IServiceBase
// frontendAppId is the destination of the periodic reports; set by StartAsBackend.
frontendAppId string
// bestBackendAppId is the app id chosen by the frontend refresh timer; GetBestBackend uses it as the key into backendMap.
bestBackendAppId string
// backendMap holds the known backend nodes. It is not populated in the code visible here.
backendMap map[string]*LoadBalanceNode
// lbType is the current role; it stays LBTypeNone until a Start* method runs.
lbType LB_TYPE
}
// LoadBalanceNode is the frontend's record of a single backend.
type LoadBalanceNode struct {
// appId is the target of CheckAlive requests and the id returned by GetBestBackend.
appId string
// address is the backend address returned by GetBestBackend.
address string
// rank is compared by the refresh timer; the node with the lowest rank is selected.
rank float64
// isAlive is written by the liveness-check timer started in StartAsFrontend.
isAlive bool
}
// Init stores serviceBase on the manager, calls initAPI, and returns the
// manager itself so the call can be chained.
// NOTE(review): initAPI is defined outside this view; presumably it registers
// the manager's message handlers — confirm.
func (s *LoadBalanceManager) Init(serviceBase IServiceBase) ILoadBalanceManager {
// Assigned before initAPI runs so that initAPI can rely on s.serviceBase being set.
s.serviceBase = serviceBase
s.initAPI()
return s
}
// StartAsFrontend switches the manager into the frontend role and installs two
// timers on the service actor:
//
//   - every FRONTEND_CHECKALIVE_INTERVAL seconds a liveness probe is started
//     for each node in backendMap;
//   - every FRONTEND_ROUTER_REFRESH_INTERVAL seconds the node with the lowest
//     rank is recorded in bestBackendAppId.
//
// It asserts that no role has been started yet (lbType == LBTypeNone).
// The -1 passed to AddTimer presumably means "repeat until cancelled" — the
// probe callback cancels its own timer, which is consistent with that; confirm
// against the actor API.
func (s *LoadBalanceManager) StartAsFrontend() {
	tools.AssertTrue(s.lbType == LBTypeNone, "StartAsFrontend fail, lbType is not LBTypeNone")
	s.lbType = LBTypeFrontend
	act := s.serviceBase.Actor()

	// Liveness probing.
	act.AddTimer(time.Second*FRONTEND_CHECKALIVE_INTERVAL, -1, func(dt int64) {
		for _, node := range s.backendMap {
			// Per-iteration copy. The closures below are invoked later by the
			// actor, and before Go 1.22 the range variable is shared by all
			// iterations, so without this every probe would target whichever
			// node the loop visited last.
			node := node

			// timerId and retry are declared inside the loop body, so each
			// node's probe has its own pair.
			var timerId int64
			retry := CHECK_ALIVE_RETRY_MAX

			// The probe timer is registered from a message sent to the actor
			// itself rather than directly from this timer callback.
			act.Send(act.Id(), func() {
				timerId = act.AddTimer(time.Second, -1, func(dt int64) {
					_, err := act.RequestWait(node.appId, &msg_inner.CheckAlive{})
					if err == nil {
						act.CancelTimer(timerId)
						node.isAlive = true
						return
					}
					// One initial attempt plus CHECK_ALIVE_RETRY_MAX retries
					// must fail before the node is marked dead.
					retry--
					if retry < 0 {
						act.CancelTimer(timerId)
						node.isAlive = false
					}
				})
			})
		}
	})

	// Best-backend selection.
	act.AddTimer(time.Second*FRONTEND_ROUTER_REFRESH_INTERVAL, -1, func(dt int64) {
		if len(s.backendMap) == 0 {
			// Nothing to choose from; the previous selection is left as is.
			return
		}
		// NOTE(review): selection looks only at rank; isAlive is not consulted
		// here or in GetBestBackend, so a node marked dead can still be
		// chosen. Left unchanged because how nodes are created and
		// initialised is not visible from this function.
		var bestAppId string
		var bestRank float64
		for _, node := range s.backendMap {
			// An empty bestAppId doubles as the "nothing chosen yet" marker.
			if bestAppId == "" || node.rank < bestRank {
				bestAppId, bestRank = node.appId, node.rank
			}
		}
		s.bestBackendAppId = bestAppId
	})
}
// StartAsBackend switches the manager into the backend role and installs a
// timer that sends a LoadBalanceReport to frontendAppId every
// BAKCEND_REPORT_INTERVAL seconds.
//
// The reported address combines this host's outbound IP with the port taken
// from the "client_addr" config entry. When that entry is absent the address
// is reported as an empty string. Rank is currently always reported as 0.
//
// It asserts that no role has been started yet (lbType == LBTypeNone).
func (s *LoadBalanceManager) StartAsBackend(frontendAppId string) {
	// The assertion message names this method so a failure points at the
	// right call site.
	tools.AssertTrue(s.lbType == LBTypeNone, "StartAsBackend fail, lbType is not LBTypeNone")
	s.lbType = LBTypeBackend
	s.frontendAppId = frontendAppId

	// Only the port of client_addr is kept; the host part is replaced by the
	// outbound IP.
	var addr string
	if s.serviceBase.Config().Has("client_addr") {
		_, port := tools.AddressSplit(s.serviceBase.Config().String("client_addr"))
		addr = tools.AddressMerge(tools.GetOutboundIP().String(), port)
	}

	act := s.serviceBase.Actor()
	act.AddTimer(time.Second*BAKCEND_REPORT_INTERVAL, -1, func(dt int64) {
		report := &msg_inner.LoadBalanceReport{
			Rank:    0,
			Address: addr,
		}
		act.Send(s.frontendAppId, report)
	})
}
// GetBestBackend returns the address and app id of the backend currently
// recorded in bestBackendAppId. Both results are empty strings when that id
// has no entry in backendMap. It asserts that the manager is running in the
// frontend role.
func (s *LoadBalanceManager) GetBestBackend() (addr, appId string) {
	tools.AssertTrue(s.lbType == LBTypeFrontend, "GetBestBackend fail, lbType is not LBTypeFrontend")
	best, found := s.backendMap[s.bestBackendAppId]
	if !found {
		return "", ""
	}
	return best.address, best.appId
}