diff options
Diffstat (limited to 'libgo/go/runtime/chan.go')
-rw-r--r-- | libgo/go/runtime/chan.go | 90 |
1 files changed, 69 insertions, 21 deletions
diff --git a/libgo/go/runtime/chan.go b/libgo/go/runtime/chan.go index 291fe0013d1..549e56620b6 100644 --- a/libgo/go/runtime/chan.go +++ b/libgo/go/runtime/chan.go @@ -133,6 +133,21 @@ func chanbuf(c *hchan, i uint) unsafe.Pointer { return add(c.buf, uintptr(i)*uintptr(c.elemsize)) } +// full reports whether a send on c would block (that is, the channel is full). +// It uses a single word-sized read of mutable state, so although +// the answer is instantaneously true, the correct answer may have changed +// by the time the calling function receives the return value. +func full(c *hchan) bool { + // c.dataqsiz is immutable (never written after the channel is created) + // so it is safe to read at any time during channel operation. + if c.dataqsiz == 0 { + // Assumes that a pointer read is relaxed-atomic. + return c.recvq.first == nil + } + // Assumes that a uint read is relaxed-atomic. + return c.qcount == c.dataqsiz +} + // entry point for c <- x from compiled code //go:nosplit func chansend1(c *hchan, elem unsafe.Pointer) { @@ -177,7 +192,7 @@ func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { // // After observing that the channel is not closed, we observe that the channel is // not ready for sending. Each of these observations is a single word-sized read - // (first c.closed and second c.recvq.first or c.qcount depending on kind of channel). + // (first c.closed and second full()). // Because a closed channel cannot transition from 'ready for sending' to // 'not ready for sending', even if the channel is closed between the two observations, // they imply a moment between the two when the channel was both not yet closed @@ -186,9 +201,10 @@ func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { // // It is okay if the reads are reordered here: if we observe that the channel is not // ready for sending and then observe that it is not closed, that implies that the - // channel wasn't closed during the first observation. - if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) || - (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) { + // channel wasn't closed during the first observation. However, nothing here + // guarantees forward progress. We rely on the side effects of lock release in + // chanrecv() and closechan() to update this thread's view of c.closed and full(). + if !block && c.closed == 0 && full(c) { return false } @@ -250,7 +266,7 @@ func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { gp.waiting = mysg gp.param = nil c.sendq.enqueue(mysg) - goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3) + gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) // Ensure the value being sent is kept alive until the // receiver copies it out. The sudog has a pointer to the // stack object, but sudogs aren't considered as roots of the @@ -262,6 +278,7 @@ func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { throw("G waiting list is corrupted") } gp.waiting = nil + gp.activeStackChans = false if gp.param == nil { if c.closed == 0 { throw("chansend: spurious wakeup") @@ -417,6 +434,16 @@ func closechan(c *hchan) { } } +// empty reports whether a read from c would block (that is, the channel is +// empty). It uses a single atomic read of mutable state. +func empty(c *hchan) bool { + // c.dataqsiz is immutable. + if c.dataqsiz == 0 { + return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil + } + return atomic.Loaduint(&c.qcount) == 0 +} + // entry points for <- c from compiled code //go:nosplit func chanrecv1(c *hchan, elem unsafe.Pointer) { @@ -457,21 +484,33 @@ func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) } // Fast path: check for failed non-blocking operation without acquiring the lock. - // - // After observing that the channel is not ready for receiving, we observe that the - // channel is not closed. Each of these observations is a single word-sized read - // (first c.sendq.first or c.qcount, and second c.closed). - // Because a channel cannot be reopened, the later observation of the channel - // being not closed implies that it was also not closed at the moment of the - // first observation. We behave as if we observed the channel at that moment - // and report that the receive cannot proceed. - // - // The order of operations is important here: reversing the operations can lead to - // incorrect behavior when racing with a close. - if !block && (c.dataqsiz == 0 && c.sendq.first == nil || - c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) && - atomic.Load(&c.closed) == 0 { - return + if !block && empty(c) { + // After observing that the channel is not ready for receiving, we observe whether the + // channel is closed. + // + // Reordering of these checks could lead to incorrect behavior when racing with a close. + // For example, if the channel was open and not empty, was closed, and then drained, + // reordered reads could incorrectly indicate "open and empty". To prevent reordering, + // we use atomic loads for both checks, and rely on emptying and closing to happen in + // separate critical sections under the same lock. This assumption fails when closing + // an unbuffered channel with a blocked send, but that is an error condition anyway. + if atomic.Load(&c.closed) == 0 { + // Because a channel cannot be reopened, the later observation of the channel + // being not closed implies that it was also not closed at the moment of the + // first observation. We behave as if we observed the channel at that moment + // and report that the receive cannot proceed. + return + } + // The channel is irreversibly closed. Re-check whether the channel has any pending data + // to receive, which could have arrived between the empty and closed checks above. + // Sequential consistency is also required here, when racing with such a send. + if empty(c) { + // The channel is irreversibly closed and empty. + if ep != nil { + typedmemclr(c.elemtype, ep) + } + return true, false + } } var t0 int64 @@ -543,13 +582,14 @@ func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) mysg.c = c gp.param = nil c.recvq.enqueue(mysg) - goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3) + gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2) // someone woke us up if mysg != gp.waiting { throw("G waiting list is corrupted") } gp.waiting = nil + gp.activeStackChans = false if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } @@ -616,6 +656,14 @@ func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { goready(gp, skip+1) } +func chanparkcommit(gp *g, chanLock unsafe.Pointer) bool { + // There are unlocked sudogs that point into gp's stack. Stack + // copying must lock the channels of those sudogs. + gp.activeStackChans = true + unlock((*mutex)(chanLock)) + return true +} + // compiler implements // // select { |