summaryrefslogtreecommitdiff
path: root/libgo/go/cmd/go/internal/par/work.go
diff options
context:
space:
mode:
Diffstat (limited to 'libgo/go/cmd/go/internal/par/work.go')
-rw-r--r--libgo/go/cmd/go/internal/par/work.go149
1 files changed, 149 insertions, 0 deletions
diff --git a/libgo/go/cmd/go/internal/par/work.go b/libgo/go/cmd/go/internal/par/work.go
new file mode 100644
index 00000000000..a568c86f60b
--- /dev/null
+++ b/libgo/go/cmd/go/internal/par/work.go
@@ -0,0 +1,149 @@
+// Copyright 2018 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package par implements parallel execution helpers.
+package par
+
+import (
+ "math/rand"
+ "sync"
+ "sync/atomic"
+)
+
+// Work manages a set of work items to be executed in parallel, at most once each.
+// The items in the set must all be valid map keys.
+type Work struct {
+ f func(interface{}) // function to run for each item
+ running int // total number of runners
+
+ mu sync.Mutex
+ added map[interface{}]bool // items added to set
+ todo []interface{} // items yet to be run
+ wait sync.Cond // wait when todo is empty
+ waiting int // number of runners waiting for todo
+}
+
+func (w *Work) init() {
+ if w.added == nil {
+ w.added = make(map[interface{}]bool)
+ }
+}
+
+// Add adds item to the work set, if it hasn't already been added.
+func (w *Work) Add(item interface{}) {
+ w.mu.Lock()
+ w.init()
+ if !w.added[item] {
+ w.added[item] = true
+ w.todo = append(w.todo, item)
+ if w.waiting > 0 {
+ w.wait.Signal()
+ }
+ }
+ w.mu.Unlock()
+}
+
+// Do runs f in parallel on items from the work set,
+// with at most n invocations of f running at a time.
+// It returns when everything added to the work set has been processed.
+// At least one item should have been added to the work set
+// before calling Do (or else Do returns immediately),
+// but it is allowed for f(item) to add new items to the set.
+// Do should only be used once on a given Work.
+func (w *Work) Do(n int, f func(item interface{})) {
+ if n < 1 {
+ panic("par.Work.Do: n < 1")
+ }
+ if w.running >= 1 {
+ panic("par.Work.Do: already called Do")
+ }
+
+ w.running = n
+ w.f = f
+ w.wait.L = &w.mu
+
+ for i := 0; i < n-1; i++ {
+ go w.runner()
+ }
+ w.runner()
+}
+
+// runner executes work in w until both nothing is left to do
+// and all the runners are waiting for work.
+// (Then all the runners return.)
+func (w *Work) runner() {
+ for {
+ // Wait for something to do.
+ w.mu.Lock()
+ for len(w.todo) == 0 {
+ w.waiting++
+ if w.waiting == w.running {
+ // All done.
+ w.wait.Broadcast()
+ w.mu.Unlock()
+ return
+ }
+ w.wait.Wait()
+ w.waiting--
+ }
+
+ // Pick something to do at random,
+ // to eliminate pathological contention
+ // in case items added at about the same time
+ // are most likely to contend.
+ i := rand.Intn(len(w.todo))
+ item := w.todo[i]
+ w.todo[i] = w.todo[len(w.todo)-1]
+ w.todo = w.todo[:len(w.todo)-1]
+ w.mu.Unlock()
+
+ w.f(item)
+ }
+}
+
+// Cache runs an action once per key and caches the result.
+type Cache struct {
+ m sync.Map
+}
+
+type cacheEntry struct {
+ done uint32
+ mu sync.Mutex
+ result interface{}
+}
+
+// Do calls the function f if and only if Do is being called for the first time with this key.
+// No call to Do with a given key returns until the one call to f returns.
+// Do returns the value returned by the one call to f.
+func (c *Cache) Do(key interface{}, f func() interface{}) interface{} {
+ entryIface, ok := c.m.Load(key)
+ if !ok {
+ entryIface, _ = c.m.LoadOrStore(key, new(cacheEntry))
+ }
+ e := entryIface.(*cacheEntry)
+ if atomic.LoadUint32(&e.done) == 0 {
+ e.mu.Lock()
+ if atomic.LoadUint32(&e.done) == 0 {
+ e.result = f()
+ atomic.StoreUint32(&e.done, 1)
+ }
+ e.mu.Unlock()
+ }
+ return e.result
+}
+
+// Get returns the cached result associated with key.
+// It returns nil if there is no such result.
+// If the result for key is being computed, Get does not wait for the computation to finish.
+func (c *Cache) Get(key interface{}) interface{} {
+ entryIface, ok := c.m.Load(key)
+ if !ok {
+ return nil
+ }
+ e := entryIface.(*cacheEntry)
+ if atomic.LoadUint32(&e.done) == 0 {
+ return nil
+ }
+ return e.result
+}