You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
	
	
		
			176 lines
		
	
	
		
			4.1 KiB
		
	
	
	
		
			Go
		
	
		
		
			
		
	
	
			176 lines
		
	
	
		
			4.1 KiB
		
	
	
	
		
			Go
		
	
| 
											2 years ago
										 | // Copyright 2014 The Go Authors. All rights reserved.
 | ||
|  | // Use of this source code is governed by a BSD-style
 | ||
|  | // license that can be found in the LICENSE file.
 | ||
|  | 
 | ||
|  | package http2 | ||
|  | 
 | ||
|  | import ( | ||
|  | 	"errors" | ||
|  | 	"io" | ||
|  | 	"sync" | ||
|  | ) | ||
|  | 
 | ||
|  | // pipe is a goroutine-safe io.Reader/io.Writer pair. It's like
 | ||
|  | // io.Pipe except there are no PipeReader/PipeWriter halves, and the
 | ||
|  | // underlying buffer is an interface. (io.Pipe is always unbuffered)
 | ||
|  | type pipe struct { | ||
|  | 	mu       sync.Mutex | ||
|  | 	c        sync.Cond     // c.L lazily initialized to &p.mu
 | ||
|  | 	b        pipeBuffer    // nil when done reading
 | ||
|  | 	unread   int           // bytes unread when done
 | ||
|  | 	err      error         // read error once empty. non-nil means closed.
 | ||
|  | 	breakErr error         // immediate read error (caller doesn't see rest of b)
 | ||
|  | 	donec    chan struct{} // closed on error
 | ||
|  | 	readFn   func()        // optional code to run in Read before error
 | ||
|  | } | ||
|  | 
 | ||
// pipeBuffer is the storage a pipe reads from and writes to: any
// io.Reader/io.Writer that can also report how many bytes it
// currently holds.
type pipeBuffer interface {
	io.ReadWriter
	Len() int
}
|  | 
 | ||
|  | // setBuffer initializes the pipe buffer.
 | ||
|  | // It has no effect if the pipe is already closed.
 | ||
|  | func (p *pipe) setBuffer(b pipeBuffer) { | ||
|  | 	p.mu.Lock() | ||
|  | 	defer p.mu.Unlock() | ||
|  | 	if p.err != nil || p.breakErr != nil { | ||
|  | 		return | ||
|  | 	} | ||
|  | 	p.b = b | ||
|  | } | ||
|  | 
 | ||
|  | func (p *pipe) Len() int { | ||
|  | 	p.mu.Lock() | ||
|  | 	defer p.mu.Unlock() | ||
|  | 	if p.b == nil { | ||
|  | 		return p.unread | ||
|  | 	} | ||
|  | 	return p.b.Len() | ||
|  | } | ||
|  | 
 | ||
|  | // Read waits until data is available and copies bytes
 | ||
|  | // from the buffer into p.
 | ||
|  | func (p *pipe) Read(d []byte) (n int, err error) { | ||
|  | 	p.mu.Lock() | ||
|  | 	defer p.mu.Unlock() | ||
|  | 	if p.c.L == nil { | ||
|  | 		p.c.L = &p.mu | ||
|  | 	} | ||
|  | 	for { | ||
|  | 		if p.breakErr != nil { | ||
|  | 			return 0, p.breakErr | ||
|  | 		} | ||
|  | 		if p.b != nil && p.b.Len() > 0 { | ||
|  | 			return p.b.Read(d) | ||
|  | 		} | ||
|  | 		if p.err != nil { | ||
|  | 			if p.readFn != nil { | ||
|  | 				p.readFn()     // e.g. copy trailers
 | ||
|  | 				p.readFn = nil // not sticky like p.err
 | ||
|  | 			} | ||
|  | 			p.b = nil | ||
|  | 			return 0, p.err | ||
|  | 		} | ||
|  | 		p.c.Wait() | ||
|  | 	} | ||
|  | } | ||
|  | 
 | ||
|  | var errClosedPipeWrite = errors.New("write on closed buffer") | ||
|  | 
 | ||
|  | // Write copies bytes from p into the buffer and wakes a reader.
 | ||
|  | // It is an error to write more data than the buffer can hold.
 | ||
|  | func (p *pipe) Write(d []byte) (n int, err error) { | ||
|  | 	p.mu.Lock() | ||
|  | 	defer p.mu.Unlock() | ||
|  | 	if p.c.L == nil { | ||
|  | 		p.c.L = &p.mu | ||
|  | 	} | ||
|  | 	defer p.c.Signal() | ||
|  | 	if p.err != nil || p.breakErr != nil { | ||
|  | 		return 0, errClosedPipeWrite | ||
|  | 	} | ||
|  | 	return p.b.Write(d) | ||
|  | } | ||
|  | 
 | ||
|  | // CloseWithError causes the next Read (waking up a current blocked
 | ||
|  | // Read if needed) to return the provided err after all data has been
 | ||
|  | // read.
 | ||
|  | //
 | ||
|  | // The error must be non-nil.
 | ||
|  | func (p *pipe) CloseWithError(err error) { p.closeWithError(&p.err, err, nil) } | ||
|  | 
 | ||
|  | // BreakWithError causes the next Read (waking up a current blocked
 | ||
|  | // Read if needed) to return the provided err immediately, without
 | ||
|  | // waiting for unread data.
 | ||
|  | func (p *pipe) BreakWithError(err error) { p.closeWithError(&p.breakErr, err, nil) } | ||
|  | 
 | ||
|  | // closeWithErrorAndCode is like CloseWithError but also sets some code to run
 | ||
|  | // in the caller's goroutine before returning the error.
 | ||
|  | func (p *pipe) closeWithErrorAndCode(err error, fn func()) { p.closeWithError(&p.err, err, fn) } | ||
|  | 
 | ||
|  | func (p *pipe) closeWithError(dst *error, err error, fn func()) { | ||
|  | 	if err == nil { | ||
|  | 		panic("err must be non-nil") | ||
|  | 	} | ||
|  | 	p.mu.Lock() | ||
|  | 	defer p.mu.Unlock() | ||
|  | 	if p.c.L == nil { | ||
|  | 		p.c.L = &p.mu | ||
|  | 	} | ||
|  | 	defer p.c.Signal() | ||
|  | 	if *dst != nil { | ||
|  | 		// Already been done.
 | ||
|  | 		return | ||
|  | 	} | ||
|  | 	p.readFn = fn | ||
|  | 	if dst == &p.breakErr { | ||
|  | 		if p.b != nil { | ||
|  | 			p.unread += p.b.Len() | ||
|  | 		} | ||
|  | 		p.b = nil | ||
|  | 	} | ||
|  | 	*dst = err | ||
|  | 	p.closeDoneLocked() | ||
|  | } | ||
|  | 
 | ||
|  | // requires p.mu be held.
 | ||
|  | func (p *pipe) closeDoneLocked() { | ||
|  | 	if p.donec == nil { | ||
|  | 		return | ||
|  | 	} | ||
|  | 	// Close if unclosed. This isn't racy since we always
 | ||
|  | 	// hold p.mu while closing.
 | ||
|  | 	select { | ||
|  | 	case <-p.donec: | ||
|  | 	default: | ||
|  | 		close(p.donec) | ||
|  | 	} | ||
|  | } | ||
|  | 
 | ||
|  | // Err returns the error (if any) first set by BreakWithError or CloseWithError.
 | ||
|  | func (p *pipe) Err() error { | ||
|  | 	p.mu.Lock() | ||
|  | 	defer p.mu.Unlock() | ||
|  | 	if p.breakErr != nil { | ||
|  | 		return p.breakErr | ||
|  | 	} | ||
|  | 	return p.err | ||
|  | } | ||
|  | 
 | ||
|  | // Done returns a channel which is closed if and when this pipe is closed
 | ||
|  | // with CloseWithError.
 | ||
|  | func (p *pipe) Done() <-chan struct{} { | ||
|  | 	p.mu.Lock() | ||
|  | 	defer p.mu.Unlock() | ||
|  | 	if p.donec == nil { | ||
|  | 		p.donec = make(chan struct{}) | ||
|  | 		if p.err != nil || p.breakErr != nil { | ||
|  | 			// Already hit an error.
 | ||
|  | 			p.closeDoneLocked() | ||
|  | 		} | ||
|  | 	} | ||
|  | 	return p.donec | ||
|  | } |