營銷型網(wǎng)站有哪些出名的沈陽seo關(guān)鍵詞排名
singlefligt使用方法和源碼解讀
介紹
- sync.once保證其整個生命周期內(nèi)只調(diào)用一次;而singleflight則可以保證在一定范圍內(nèi)其只調(diào)用一次。
背景|使用場景
- 應(yīng)對緩存擊穿:加鎖可以解決這個問題,但是加鎖不太靈活(不能控制訪問頻率之類的),singlefilght可以通過定時清除的方式限制頻率
- 去除重復(fù)請求:當一定時間范圍內(nèi)存在了大量的重復(fù)請求,可以考慮使用:一致性hash負載均衡+singlefilght收束請求。用戶根據(jù)key使用一致性hash請求到特定的服務(wù)機器上,服務(wù)對請求執(zhí)行??
singlefilght
??后,再去請求下游,以此收束重復(fù)請求。
用法
基礎(chǔ)用法:合并請求,合并第一個請求執(zhí)行過程中到達的所有請求。
從下面的代碼和輸出可以看出每次的輸出query...
?都是基本上每10ms一次,而其中穿插了很多次打印結(jié)果,代表著在函數(shù)真正執(zhí)行過程中所有的請求都是被攔截住了,當函數(shù)執(zhí)行完時,會將所有的?**結(jié)果共享?給這個期間到來的所有請求**。
package mainimport ("fmt""golang.org/x/sync/singleflight""log""math/rand""time"
)func getData(id int64) string {log.Printf("query...%d\n", id)time.Sleep(10 * time.Millisecond) // 模擬一個比較耗時的操作,10msreturn "liwenzhou.com" + fmt.Sprintf("%d", id)
}func main() {log.SetFlags(log.Lmicroseconds)g := new(singleflight.Group)var i int64 = 0for {time.Sleep(time.Duration(rand.Intn(5)) * time.Millisecond)// 調(diào)用go func() {icopy := ii++v1, _, shared := g.Do("getData", func() (interface{}, error) {ret := getData(icopy)return ret, nil})log.Printf("call: v1:%v, shared:%v , i:%d\n", v1, shared, i)}()}}/**
query...1
1st call: v1:liwenzhou.com1, shared:true
2nd call: v2:liwenzhou.com1, shared:true
*/
? test21 git:(main) ? go run main.go
20:11:31.995346 query...0
20:11:32.005800 call: v1:liwenzhou.com0, shared:true , i:4
20:11:32.005799 call: v1:liwenzhou.com0, shared:true , i:4
20:11:32.005804 call: v1:liwenzhou.com0, shared:true , i:4
20:11:32.005807 call: v1:liwenzhou.com0, shared:true , i:4
20:11:32.006386 query...4
20:11:32.016671 call: v1:liwenzhou.com4, shared:true , i:9
20:11:32.016687 call: v1:liwenzhou.com4, shared:true , i:9
20:11:32.016691 call: v1:liwenzhou.com4, shared:true , i:9
20:11:32.016693 call: v1:liwenzhou.com4, shared:true , i:9
20:11:32.016694 call: v1:liwenzhou.com4, shared:true , i:9
20:11:32.017366 query...9
20:11:32.027418 call: v1:liwenzhou.com9, shared:true , i:16
20:11:32.027433 call: v1:liwenzhou.com9, shared:true , i:16
20:11:32.027436 call: v1:liwenzhou.com9, shared:true , i:16
20:11:32.027437 call: v1:liwenzhou.com9, shared:true , i:16
進階用法1:超時控制DoChan
?
在基礎(chǔ)方法中,在某一次請求執(zhí)行過程中,所有到來的新的請求都會阻塞等待這個請求的執(zhí)行結(jié)果。如果真正執(zhí)行的過程超時出錯了,其他并發(fā)的請求就只能等待。
考慮到singleflight庫通常用于避免緩存擊穿,需要查詢外部數(shù)據(jù)庫,這樣的出錯、超時的場景是必須要考慮的。
比如正常是10ms,但是超時時間設(shè)置的是50ms。由于并發(fā)數(shù)量并不需要真正為1,因此想12ms就停止阻塞。
// 使用DoChan進行超時控制
func CtrTimeout(ctx context.Context, req interface{}){ch := g.DoChan(key, func() (interface{}, error) {return call(ctx, req)})select {case <-time.After(500 * time.Millisecond): returncase <-ctx.Done()returncase ret := <-ch: go handle(ret)}
}
進階用法2:手動合并請求頻率控制
在基礎(chǔ)用法中,請求合并的頻率是 一個請求的執(zhí)行時間 ,希望達到自己控制合并的時間,不限制為一個請求的執(zhí)行時間。
方法:另起一個協(xié)程刪除對應(yīng)的key
?,一般是在go.Do中另起Forget
?一次即可。
// 另外啟用協(xié)程定時刪除key,提高請求下游次數(shù),提高成功率
func CtrRate(ctx context.Context, req interface{}){res, _, shared := g.Do(key, func() (interface{}, error) {// 另外其一個goroutine,等待一段時間后,刪除key// 刪除key后的調(diào)用,會重新執(zhí)行Dogo func() {time.Sleep(10 * time.Millisecond)g.Forget(key)}()return call(ctx, req)})handle(res)
}
總結(jié)
singlefligt可以合并多個請求達到限頻率的目的??梢允褂?code>DoChan?方法或Forget
?來手動控制請求的頻率和超時返回。
源碼解讀
singleflight的源碼就一個文件,因此就放在備注里了:
// call is an in-flight or completed singleflight.Do call
type call struct {wg sync.WaitGroup //并發(fā)調(diào)用的的時候,一個協(xié)程執(zhí)行其它協(xié)程阻塞,用于執(zhí)行完之后通知其他阻塞的協(xié)程// These fields are written once before the WaitGroup is done// and are only read after the WaitGroup is done.val interface{} //保存執(zhí)行的結(jié)果值err error //保存執(zhí)行過程中的錯誤,只會寫入一次// These fields are read and written with the singleflight// mutex held before the WaitGroup is done, and are read but// not written after the WaitGroup is done.dups int //記錄并發(fā)數(shù)量,用于執(zhí)行后返回時候共享的結(jié)果(Shared)chans []chan<- Result //用于支持結(jié)果通過channel返回出去
}// Group represents a class of work and forms a namespace in
// which units of work can be executed with duplicate suppression.
type Group struct {mu sync.Mutex // protects m 保證m的并發(fā)安全m map[string]*call // lazily initialized //m代表任務(wù),一個key-val代表一個任務(wù)正在執(zhí)行// 任務(wù)執(zhí)行完之后會從map里面被刪除
}// Result holds the results of Do, so they can be passed
// on a channel.
// 保存結(jié)果的結(jié)構(gòu)體,這樣才能支持通過channel返回結(jié)果
type Result struct { Val interface{}Err errorShared bool //是否和其他協(xié)程共享的結(jié)果
}// Do executes and returns the results of the given function, making
// sure that only one execution is in-flight for a given key at a
// time. If a duplicate comes in, the duplicate caller waits for the
// original to complete and receives the same results.
// The return value shared indicates whether v was given to multiple callers.
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {g.mu.Lock()if g.m == nil {g.m = make(map[string]*call)}if c, ok := g.m[key]; ok {c.dups++g.mu.Unlock()c.wg.Wait()if e, ok := c.err.(*panicError); ok {panic(e)} else if c.err == errGoexit {runtime.Goexit()}return c.val, c.err, true}c := new(call)c.wg.Add(1)g.m[key] = cg.mu.Unlock()g.doCall(c, key, fn)return c.val, c.err, c.dups > 0
}// DoChan is like Do but returns a channel that will receive the
// results when they are ready.
//
// The returned channel will not be closed.
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {ch := make(chan Result, 1)g.mu.Lock()if g.m == nil {g.m = make(map[string]*call)}if c, ok := g.m[key]; ok {c.dups++c.chans = append(c.chans, ch)g.mu.Unlock()return ch}c := &call{chans: []chan<- Result{ch}}c.wg.Add(1)g.m[key] = cg.mu.Unlock()go g.doCall(c, key, fn)return ch
}// doCall handles the single call for a key.
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {normalReturn := falserecovered := false// use double-defer to distinguish panic from runtime.Goexit,// more details see https://golang.org/cl/134395// 這塊設(shè)計非常有意思,主要是針對兩種異常退出的情況使用了雙重defer來保證正確處理,具體見下面注釋defer func() {// the given function invoked runtime.Goexitif !normalReturn && !recovered { //結(jié)合下面代碼分析,一定是出現(xiàn)了exitc.err = errGoexit}g.mu.Lock()defer g.mu.Unlock()c.wg.Done() //通知其他協(xié)程可以拿結(jié)果了if g.m[key] == c { //刪掉任務(wù)delete(g.m, key)}if e, ok := c.err.(*panicError); ok {// In order to prevent the waiting channels from being blocked forever,// needs to ensure that this panic cannot be recovered.if len(c.chans) > 0 {go panic(e)select {} // Keep this goroutine around so that it will appear in the crash dump.} else {panic(e)}} else if c.err == errGoexit {// Already in the process of goexit, no need to call again} else {// Normal returnfor _, ch := range c.chans {ch <- Result{c.val, c.err, c.dups > 0}}}}()func() {defer func() {if !normalReturn {//到這里說明:要么發(fā)生panic,要么發(fā)生exit// Ideally, we would wait to take a stack trace until we've determined// whether this is a panic or a runtime.Goexit.//// Unfortunately, the only way we can distinguish the two is to see// whether the recover stopped the goroutine from terminating, and by// the time we know that, the part of the stack trace relevant to the// panic has been discarded.if r := recover(); r != nil { //panic就賦值c.err = newPanicError(r)}}}()c.val, c.err = fn()normalReturn = true //到這里說明:沒有發(fā)生panic,任務(wù)里面沒有g(shù)o exit}()//執(zhí)行不到這里的話說明是exitif !normalReturn { //要么發(fā)生panic,要么發(fā)生exit recovered = true //結(jié)合分析------> 一定是panic,所以賦值}
}// Forget tells the singleflight to forget about a key. Future calls
// to Do for this key will call the function rather than waiting for
// an earlier call to complete.
func (g *Group) Forget(key string) {g.mu.Lock()delete(g.m, key)g.mu.Unlock()
}
?
雙重defer來確定是exit
?還是發(fā)生了panic。
這塊設(shè)計非常有意思,主要是針對兩種異常退出的情況使用了雙重defer來保證
總結(jié)
singleflight使用map
?來隔離不同的任務(wù),map
?的key存在性標識任務(wù)是否在執(zhí)行(執(zhí)行完會馬上從map中刪除)。
對于一個任務(wù),使用call結(jié)構(gòu)體進行管理,其主要用于:控制并發(fā)(waitGroup
?),執(zhí)行,和傳遞結(jié)果(支持channel
?傳遞)。
執(zhí)行中很有意思的一點是使用了雙重defer來判斷異常退出是??panic
??退出還是調(diào)用了??exit()
? ?退出。
?