fix: fix ConfigClient data race (#483)
This commit is contained in:
parent
5cb07119d4
commit
dec3976954
|
@ -25,9 +25,11 @@ import (
|
|||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/aliyun/alibaba-cloud-sdk-go/services/kms"
|
||||
|
||||
"github.com/nacos-group/nacos-sdk-go/clients/cache"
|
||||
"github.com/nacos-group/nacos-sdk-go/clients/nacos_client"
|
||||
"github.com/nacos-group/nacos-sdk-go/common/constant"
|
||||
|
@ -46,7 +48,7 @@ type ConfigClient struct {
|
|||
mutex sync.Mutex
|
||||
configProxy ConfigProxy
|
||||
configCacheDir string
|
||||
currentTaskCount int
|
||||
currentTaskCount int32
|
||||
cacheMap cache.ConcurrentMap
|
||||
schedulerMap cache.ConcurrentMap
|
||||
}
|
||||
|
@ -253,7 +255,7 @@ func (client *ConfigClient) DeleteConfig(param vo.ConfigParam) (deleted bool, er
|
|||
return client.configProxy.DeleteConfigProxy(param, clientConfig.NamespaceId, clientConfig.AccessKey, clientConfig.SecretKey)
|
||||
}
|
||||
|
||||
//Cancel Listen Config
|
||||
// Cancel Listen Config
|
||||
func (client *ConfigClient) CancelListenConfig(param vo.ConfigParam) (err error) {
|
||||
clientConfig, err := client.GetClientConfig()
|
||||
if err != nil {
|
||||
|
@ -263,13 +265,14 @@ func (client *ConfigClient) CancelListenConfig(param vo.ConfigParam) (err error)
|
|||
client.cacheMap.Remove(util.GetConfigCacheKey(param.DataId, param.Group, clientConfig.NamespaceId))
|
||||
logger.Infof("Cancel listen config DataId:%s Group:%s", param.DataId, param.Group)
|
||||
remakeId := int(math.Ceil(float64(client.cacheMap.Count()) / float64(perTaskConfigSize)))
|
||||
if remakeId < client.currentTaskCount {
|
||||
currentTaskCount := int(atomic.LoadInt32(&client.currentTaskCount))
|
||||
if remakeId < currentTaskCount {
|
||||
client.remakeCacheDataTaskId(remakeId)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
//Remake cache data taskId
|
||||
// Remake cache data taskId
|
||||
func (client *ConfigClient) remakeCacheDataTaskId(remakeId int) {
|
||||
for i := 0; i < remakeId; i++ {
|
||||
count := 0
|
||||
|
@ -335,9 +338,9 @@ func (client *ConfigClient) ListenConfig(param vo.ConfigParam) (err error) {
|
|||
return
|
||||
}
|
||||
|
||||
//Delay Scheduler
|
||||
//initialDelay the time to delay first execution
|
||||
//delay the delay between the termination of one execution and the commencement of the next
|
||||
// Delay Scheduler
|
||||
// initialDelay the time to delay first execution
|
||||
// delay the delay between the termination of one execution and the commencement of the next
|
||||
func (client *ConfigClient) delayScheduler(t *time.Timer, delay time.Duration, taskId string, execute func() error) {
|
||||
for {
|
||||
if v, ok := client.schedulerMap.Get(taskId); ok {
|
||||
|
@ -354,31 +357,31 @@ func (client *ConfigClient) delayScheduler(t *time.Timer, delay time.Duration, t
|
|||
}
|
||||
}
|
||||
|
||||
//Listen for the configuration executor
|
||||
// Listen for the configuration executor
|
||||
func (client *ConfigClient) listenConfigExecutor() func() error {
|
||||
return func() error {
|
||||
listenerSize := client.cacheMap.Count()
|
||||
taskCount := int(math.Ceil(float64(listenerSize) / float64(perTaskConfigSize)))
|
||||
|
||||
if taskCount > client.currentTaskCount {
|
||||
for i := client.currentTaskCount; i < taskCount; i++ {
|
||||
currentTaskCount := int(atomic.LoadInt32(&client.currentTaskCount))
|
||||
if taskCount > currentTaskCount {
|
||||
for i := currentTaskCount; i < taskCount; i++ {
|
||||
client.schedulerMap.Set(strconv.Itoa(i), true)
|
||||
go client.delayScheduler(time.NewTimer(1*time.Millisecond), 10*time.Millisecond, strconv.Itoa(i), client.longPulling(i))
|
||||
}
|
||||
client.currentTaskCount = taskCount
|
||||
} else if taskCount < client.currentTaskCount {
|
||||
for i := taskCount; i < client.currentTaskCount; i++ {
|
||||
atomic.StoreInt32(&client.currentTaskCount, int32(taskCount))
|
||||
} else if taskCount < currentTaskCount {
|
||||
for i := taskCount; i < currentTaskCount; i++ {
|
||||
if _, ok := client.schedulerMap.Get(strconv.Itoa(i)); ok {
|
||||
client.schedulerMap.Set(strconv.Itoa(i), false)
|
||||
}
|
||||
}
|
||||
client.currentTaskCount = taskCount
|
||||
atomic.StoreInt32(&client.currentTaskCount, int32(taskCount))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
//Long polling listening configuration
|
||||
// Long polling listening configuration
|
||||
func (client *ConfigClient) longPulling(taskId int) func() error {
|
||||
return func() error {
|
||||
var listeningConfigs string
|
||||
|
@ -438,7 +441,7 @@ func (client *ConfigClient) longPulling(taskId int) func() error {
|
|||
|
||||
}
|
||||
|
||||
//Execute the Listener callback func()
|
||||
// Execute the Listener callback func()
|
||||
func (client *ConfigClient) callListener(changed, tenant string) {
|
||||
changedDecoded, _ := url.QueryUnescape(changed)
|
||||
changedConfigs := strings.Split(changedDecoded, "\u0001")
|
||||
|
|
Loading…
Reference in New Issue