从源码解析Go exec timeout 实现机制

科技资讯 投稿 7100 0 评论

从源码解析Go exec timeout 实现机制

1. 背景

测试使用golang exec 执行命令,并配置过期时间,测试脚本如下。
现象:执行脚本后,到超时时间后并为超时退出,反而阻塞住了

func TestExecWithTimeout(t *testing.T {
	ctx, cancel := context.WithTimeout(context.Background(, 5*time.Second
	defer cancel(

	start := time.Now(
	cmd := exec.CommandContext(ctx, "sh", "-c", "echo start && sleep 10 && echo end."
	out, err := cmd.CombinedOutput(
	fmt.Printf("error: [%v]\n", err
	fmt.Printf("out: [%v]\n", string(out

	if ctx.Err( == context.DeadlineExceeded {
		fmt.Printf("ctx.Err: [%v]\n", ctx.Err(
	}

	fmt.Printf("process end: %v", time.Since(start
}
error: [signal: killed]
out: [start
]
ctx.Err: [context deadline exceeded]
process end: 10.010193583sPASS

在用ps查看进程状态,发现,超时时间到达后,sh进程就被kill了,而由ssh启动的sleep 10命令还在进行中,且该进程被1号进程接管,变成了僵尸进程。当我们手动执行kill命令后,程序退出。

~/workpro//mkskit ❯ ps -ef | grep "sleep"
  501 85600 85597   0  3:34下午 ??         0:00.00 sh -c echo start && sleep 20 && echo end.
  501 85601 85600   0  3:34下午 ??         0:00.00 sleep 20
  501 85608 17687   0  3:34下午 ttys004    0:00.00 grep --color=auto --exclude-dir=.bzr --exclude-dir=CVS --exclude-dir=.git --exclude-dir=.hg --exclude-dir=.svn --exclude-dir=.idea --exclude-dir=.tox sleep
~/workpro//mkskit ❯ ps -ef | grep "sleep"
  501 85601     1   0  3:34下午 ??         0:00.00 sleep 20
  501 85652 17687   0  3:35下午 ttys004    0:00.00 grep --color=auto --exclude-dir=.bzr --exclude-dir=CVS --exclude-dir=.git --exclude-dir=.hg --exclude-dir=.svn --exclude-dir=.idea --exclude-dir=.tox sleep

2. 原因分析

从现象推测,exec 创建了两个进程,超时后 sh进程退出,但是sleep进程还在变成了僵尸进程,sleep进程还未退出,导致整改go主进程阻塞。

2.1 源码-分析

// CombinedOutput runs the command and returns its combined standard
// output and standard error.
func (c *Cmd CombinedOutput( ([]byte, error {
	if c.Stdout != nil {
		return nil, errors.New("exec: Stdout already set"
	}
	if c.Stderr != nil {
		return nil, errors.New("exec: Stderr already set"
	}
	var b bytes.Buffer
	c.Stdout = &b
	c.Stderr = &b
	err := c.Run(
	return b.Bytes(, err
}

// 执行命令,并等待命令执行完成
func (c *Cmd Run( error {
	if err := c.Start(; err != nil {
		return err
	}
	return c.Wait(
}

CombinedOutput 方法会初始化一个字节缓冲区,并标准输出和标准错误导向该缓冲区。然后启动执行命令。
Run方法中调用Start和Wait方法:

    Start方法用于启动子进程,启动后立即返回
  • Wait方法则阻塞,等待子进程结束并回收资源

2.1.1 Wait 方法

// Wait waits for the command to exit and waits for any copying to
// stdin or copying from stdout or stderr to complete.
//
// The command must have been started by Start.
//
// The returned error is nil if the command runs, has no problems
// copying stdin, stdout, and stderr, and exits with a zero exit
// status.
//
// If the command fails to run or doesn't complete successfully, the
// error is of type *ExitError. Other error types may be
// returned for I/O problems.
//
// If any of c.Stdin, c.Stdout or c.Stderr are not an *os.File, Wait also waits
// for the respective I/O loop copying to or from the process to complete.
//
// Wait releases any resources associated with the Cmd.
func (c *Cmd Wait( error {
	if c.Process == nil {
		return errors.New("exec: not started"
	}
	if c.ProcessState != nil {
		return errors.New("exec: Wait was already called"
	}
	state, err := c.Process.Wait(
	if err == nil && !state.Success( {
		err = &ExitError{ProcessState: state}
	}
	c.ProcessState = state

	// Wait for the pipe-copying goroutines to complete.
	var copyError error
	for range c.goroutine {
		if err := <-c.goroutineErrs; err != nil && copyError == nil {
			copyError = err
		}
	}
	c.goroutine = nil // Allow the goroutines' closures to be GC'd.

	if c.ctxErr != nil {
		interruptErr := <-c.ctxErr
		// If c.Process.Wait returned an error, prefer that.
		// Otherwise, report any error from the interrupt goroutine.
		if interruptErr != nil && err == nil {
			err = interruptErr
		}
	}
	// Report errors from the copying goroutines only if the program otherwise
	// exited normally on its own. Otherwise, the copying error may be due to the
	// abnormal termination.
	if err == nil {
		err = copyError
	}

	c.closeDescriptors(c.closeAfterWait
	c.closeAfterWait = nil

	return err
}

根据debug可知,阻塞发生在第34行, err := <-c.goroutineErrs;这句,从goroutineErrs中读取错误信息并返回第一次错误给调用者。而 <-ch从通道中获取数据阻塞的原因只有发送发未准备好,那么goroutineErr对应的发送方是谁呢?

2.1.2 Start 方法

// Start starts the specified command but does not wait for it to complete.
//
// If Start returns successfully, the c.Process field will be set.
//
// After a successful call to Start the Wait method must be called in
// order to release associated system resources.
func (c *Cmd Start( error {
	if c.Path == "" && c.Err == nil && c.lookPathErr == nil {
		c.Err = errors.New("exec: no command"
	}
	if c.Err != nil || c.lookPathErr != nil {
		c.closeDescriptors(c.closeAfterStart
		c.closeDescriptors(c.closeAfterWait
		if c.lookPathErr != nil {
			return c.lookPathErr
		}
		return c.Err
	}
	if runtime.GOOS == "windows" {
		lp, err := lookExtensions(c.Path, c.Dir
		if err != nil {
			c.closeDescriptors(c.closeAfterStart
			c.closeDescriptors(c.closeAfterWait
			return err
		}
		c.Path = lp
	}
	if c.Process != nil {
		return errors.New("exec: already started"
	}
	if c.ctx != nil {
		select {
		case <-c.ctx.Done(:
			c.closeDescriptors(c.closeAfterStart
			c.closeDescriptors(c.closeAfterWait
			return c.ctx.Err(
		default:
		}
	}

	c.childFiles = make([]*os.File, 0, 3+len(c.ExtraFiles
	type F func(*Cmd (*os.File, error
	for _, setupFd := range []F{(*Cmd.stdin, (*Cmd.stdout, (*Cmd.stderr} {
		fd, err := setupFd(c
		if err != nil {
			c.closeDescriptors(c.closeAfterStart
			c.closeDescriptors(c.closeAfterWait
			return err
		}
		c.childFiles = append(c.childFiles, fd
	}
	c.childFiles = append(c.childFiles, c.ExtraFiles...

	env, err := c.environ(
	if err != nil {
		return err
	}

	c.Process, err = os.StartProcess(c.Path, c.argv(, &os.ProcAttr{
		Dir:   c.Dir,
		Files: c.childFiles,
		Env:   env,
		Sys:   c.SysProcAttr,
	}
	if err != nil {
		c.closeDescriptors(c.closeAfterStart
		c.closeDescriptors(c.closeAfterWait
		return err
	}

	c.closeDescriptors(c.closeAfterStart

	// Don't allocate the goroutineErrs channel unless there are goroutines to fire.
	if len(c.goroutine > 0 {
		errc := make(chan error, len(c.goroutine
		c.goroutineErrs = errc
		for _, fn := range c.goroutine {
			go func(fn func( error {
				errc <- fn(
			}(fn
		}
	}

	c.ctxErr = c.watchCtx(

	return nil
}

从第74-82行可以看到,创建了一个len(c.goroutine的channel,然后启动goroutine执行c.goroutine中的方法并将错误写入errc,由wait的现象可知,应该是这个fn(调用阻塞了。
继续追踪,c.goroutine是在哪里被赋值的。同样是在Start方法中
主要看第43行,学习到了,原来函数内还能定义类型。这里主要是调用了三个函数来初始化stdin, stderr, stdout。

func (c *Cmd stdout( (f *os.File, err error {
	return c.writerDescriptor(c.Stdout
}

func (c *Cmd stderr( (f *os.File, err error {
    // 如果stdrr 和 stdout 是同一个输出目标 则跳过
	if c.Stderr != nil && interfaceEqual(c.Stderr, c.Stdout {
		return c.childFiles[1], nil
	}
	return c.writerDescriptor(c.Stderr
}

func (c *Cmd writerDescriptor(w io.Writer (f *os.File, err error {
    // case1
	if w == nil {
		f, err = os.OpenFile(os.DevNull, os.O_WRONLY, 0
		if err != nil {
			return
		}
		c.closeAfterStart = append(c.closeAfterStart, f
		return
	}

    // case2
	if f, ok := w.(*os.File; ok {
		return f, nil
	}

    // case3
	pr, pw, err := os.Pipe(
	if err != nil {
		return
	}

	c.closeAfterStart = append(c.closeAfterStart, pw
	c.closeAfterWait = append(c.closeAfterWait, pr
	c.goroutine = append(c.goroutine, func( error {
		_, err := io.Copy(w, pr
		pr.Close( // in case io.Copy stopped due to write error
		return err
	}
	return pw, nil
}

两个函数都调用了writerDescriptor,看代码主要由三个分支逻辑

    case1:如果没有制定stderr或者stdout,就直接写入os.DevNull 即 /dev/null
  • case2:如果制定的stderr或stdout 是*os.File类型也直接返回,会把输出写入文件
  • case3:最后一种情况,创建管道,返回写端点,在Start函数中的第59行,子进程和管道fd进行绑定。关键在第38行,goroutine中绑定了一个func(,从管道的读端点读取数据并copy到指定的stderr或者stdout(均实现Writer。
// Implementations must not retain p.
type Writer interface {
	Write(p []byte (n int, err error
}
io.Copy方法会一直阻塞到reader被关闭才会返回,这也是为什么会产生阻塞到原因了。在Wait方法到注释中可以得知

// If any of c.Stdin, c.Stdout or c.Stderr are not an *os.File, Wait also waits
// for the respective I/O loop copying to or from the process to complete.
//
// Wait releases any resources associated with the Cmd.
func (c *Cmd Wait( error {}

到这里了解到了阻塞到根本原因,那超时子进程被kill是在哪里触发的。在看Start方法时,可以看到最后一行有一个watchCtx调用

// watchCtx conditionally starts a goroutine that waits until either c.ctx is
// done or c.Process.Wait has completed (called from Wait.
// If c.ctx is done first, the goroutine terminates c.Process.
//
// If a goroutine was started, watchCtx returns a channel on which its result
// must be received.
func (c *Cmd watchCtx( <-chan error {
	if c.ctx == nil {
		return nil
	}

	errc := make(chan error
	go func( {
		select {
		case errc <- nil:
			return
		case <-c.ctx.Done(:
		}

		var err error
		if killErr := c.Process.Kill(; killErr == nil {
			// We appear to have successfully delivered a kill signal, so any
			// program behavior from this point may be due to ctx.
			err = c.ctx.Err(
		} else if !errors.Is(killErr, os.ErrProcessDone {
			err = wrappedError{
				prefix: "exec: error sending signal to Cmd",
				err:    killErr,
			}
		}
		errc <- err
	}(

	return errc
}

可以看到,启动了一个goroutine,绑定了一个 error chan,selec监听ctx的状态,当ctx超时或者提前cancel了,则会出发process.kill,将子进程关闭。
正常情况下,当ctx超时,子进程会被kill,此时管道的写入端点自然会被关闭,io.Copy则会在copy完成后正常返回,给e.errch中发送一个nil,Wait方法则中c.errch中读取到nil。在Wait方法中的第26、40、44行可以看到,err最终被赋值为一个state第结构体对象。
但是在开始的demo中,除了sh这个子进程之外还启动了一个sleep子进程,context超时后,sleep进程依旧在运行,并持有管道的写端点,导致io.Copy阻塞。

2.2 相关流程

3. 解决方案

3.1 使用*os.File类型接收输入输出

可避免阻塞,但存在的问题:

    需要额外处理输出,如: 从文件读取并写入到需要的地方
  • 程序退出后,子子进程被1号进程托管称为僵尸进程
func TestExecUseFileOutput(t *testing.T {
	ctx, cancel := context.WithTimeout(context.Background(, 8*time.Second
	defer cancel(

	cmd := exec.CommandContext(ctx, "sh", "-c", "echo start && sleep 15 && echo end."
	combinedOutput, err := ioutil.TempFile("", "stdouterr"
	if err != nil {
		fmt.Println(err
		return
	}

	defer func( {
		_ = os.Remove(combinedOutput.Name(
	}(
	cmd.Stdout = combinedOutput
	cmd.Stderr = combinedOutput
	err = cmd.Run(
	if err != nil {
		fmt.Println(err
		return
	}

	_, err = combinedOutput.Seek(0, 0
	var b bytes.Buffer
	_, err = io.Copy(&b, combinedOutput
	if err != nil {
		fmt.Println(err
		return
	}

	fmt.Println("output:", b.String(
	fmt.Printf("ctx.Err: [%v]\n", ctx.Err(
	fmt.Printf("error: [%v]\n", err
}

3.2 避免产生子进程

linux执行脚本的5种方式

    使用绝对路径执行:/root/sleep.sh
  • 使用相对路径:./sleep.sh 需要x权限
  • 使用sh或者bash命令来执行:bash /root/sleep.sh
  • 使用.(空格脚本名称来执行: . ./sleep.sh
  • 使用source来执行(一般用来生效配置文件): source /root/sleep.sh

因为go中没有shell环境,如果执行复杂的命令,肯定只是用bash 或者 sh方式运行,肯定会产生一个新的进程,so这个方法无效

3.3 sh -c

原理:
单条命令时:启动bash进程后发现是一个简单的命令,在不fork新进程的情况下直接调用exec执行命令,然后将子shell替换为sleep命令。
多条命令时:需要使用子shell 来处理 && 操作符,它需要等待第一个命令终止的SIGCHLD,然后根据第一个命令的exit status 决定是否需要运行第二个命令,因此不能将子shell替换为sleep命令。

3.4 kill -pid kill进程组

linux kill(2 指定 pid 为负数时会给这个进程组中的所有进程发送信号

If pid is less than -1, then sig is sent to every process in the        
process group whose ID is -pid.
func TestExecuteInOnPgid(t *testing.T {
	ctx, cancel := context.WithTimeout(context.Background(, 8*time.Second
	defer cancel(

	cmd := exec.CommandContext(ctx, "sh", "-c", "echo start && sleep 15 && echo end."
	cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
	go func( {
		select {
		case <-ctx.Done(:
			if err := syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL; err != nil {
				fmt.Println("kill failed: ", err
			}
		}
	}(

	output, err := cmd.CombinedOutput(
	if err != nil {
		fmt.Println(err
		return
	}

	fmt.Println("output:", string(output
	fmt.Printf("ctx.Err : [%v]\n", ctx.Err(
	fmt.Printf("error   : [%v]\n", err
}

3.5 社区提案

该问题其实很早就存在了,最早可以追溯到这个 2017 年的 Issue #23019,不过为了保持向后兼容,在方案上一直没有达成共识,最新提案见这个 Issue #50436,根据 #53400 中的最新消息,该提案可能会在 Go 1.20 中实现。
大致方案为在 exec.Cmd 中添加一个 Interrupt(os.Signal 字段,在 context 超时后将这个信号发送给子进程以关闭所有子进程。

	// Context is the context that controls the lifetime of the command
	// (typically the one passed to CommandContext.
	Context context.Context

	// If Interrupt is non-nil, Context must also be non-nil and Interrupt will be
	// sent to the child process when Context is done.
	//
	// If the command exits with a success code after the Interrupt signal has
	// been sent, Wait and similar methods will return Context.Err(
	// instead of nil.
	//
	// If the Interrupt signal is not supported on the current platform
	// (for example, if it is os.Interrupt on Windows, Start may fail
	// (and return a non-nil error.
	Interrupt os.Signal

	// If WaitDelay is non-zero, the command's I/O pipes will be closed after
	// WaitDelay has elapsed after either the command's process has exited or
	// (if Context is non-nil Context is done, whichever occurs first.
	// If the command's process is still running after WaitDelay has elapsed,
	// it will be terminated with os.Kill before the pipes are closed.
	//
	// If the command exits with a success code after pipes are closed due to
	// WaitDelay and no Interrupt signal has been sent, Wait and similar methods
	// will return ErrWaitDelay instead of nil.
	//
	// If WaitDelay is zero (the default, I/O pipes will be read until EOF,
	// which might not occur until orphaned subprocesses of the command have
	// also closed their descriptors for the pipes.
	WaitDelay time.Duration

4. 总结

现象

使用 os/exec 执行 shell 脚本并设置超时时间,然后到超时时间之后程序并未超时退出,反而一直阻塞。

原因

如果存在子子进程,占有管道则会导致 kill 掉子进程后管道依旧未能释放,读取输出的 goroutine 被阻塞,最终导致程序超时后也无法返回。

解决

package ssh

import "syscall"

// 实现Cancel方法
func (e *SomeExecuteCmdForward Cancel( {
	if pgid := -e.pid; pgid < 0 {
		_ = syscall.Kill(pgid, syscall.SIGKILL
		e.pid = 0
	}
}

// cmd.SysProcAttr = newSysProcAttr(
func newSysProcAttr( *syscall.SysProcAttr {
	return &syscall.SysProcAttr{
		Setpgid: true,
	}
}

kill -- -pid

5. 思考

c.closeDescriptors(c.closeAfterStart,管道还可以继续读写?
在close之前已经将文件句柄传递给子进程了,相当于此时fd的引用计数为2,start调用的是关闭主进程引用的文件句柄,相当于引用计数-1。直到管道写入端close管道后,管道读取端就可以收到管道被关闭的信号,结束读取。
追踪closeDescriptors底层调用函数,可以看到也是进行引用计数-1,直到为0就关闭。

// Close closes the FD. The underlying file descriptor is closed by the
// destroy method when there are no remaining references.
func (fd *FD Close( error {
	if !fd.fdmu.increfAndClose( {
		return errClosing(fd.isFile
	}

	// Unblock any I/O.  Once it all unblocks and returns,
	// so that it cannot be referring to fd.sysfd anymore,
	// the final decref will close fd.sysfd. This should happen
	// fairly quickly, since all the I/O is non-blocking, and any
	// attempts to block in the pollDesc will return errClosing(fd.isFile.
	fd.pd.evict(

    // 关键代码
	// The call to decref will call destroy if there are no other
	// references.
	err := fd.decref(

	// Wait until the descriptor is closed. If this was the only
	// reference, it is already closed. Only wait if the file has
	// not been set to blocking mode, as otherwise any current I/O
	// may be blocking, and that would block the Close.
	// No need for an atomic read of isBlocking, increfAndClose means
	// we have exclusive access to fd.
	if fd.isBlocking == 0 {
		runtime_Semacquire(&fd.csema
	}

	return err
}

// decref removes a reference from fd.
// It also closes fd when the state of fd is set to closed and there
// is no remaining reference.
func (fd *FD decref( error {
	if fd.fdmu.decref( {
		return fd.destroy(
	}
	return nil
}


https://stackoverflow.com/questions/70175281/what-is-the-purpose-of-closeafterstart-in-exec

6. exec案例

6.1 实时读取标准输出

func TestReadStdoutRealTime(t *testing.T {
	cmd := exec.Command("ping", "-c", "5", "192.168.0.1"
	stdout, _ := cmd.StdoutPipe(
	cmd.Start(

	collect := func(output io.Reader error {
		scanner := bufio.NewScanner(output
		scanner.Split(bufio.SplitFunc(bufio.ScanLines
		for scanner.Scan( {
			if scanner.Err( != nil {
				fmt.Println(scanner.Err(
				return scanner.Err(
			}
			line := scanner.Text(
			fmt.Println(line
		}

		return nil
	}

	if err := collect(stdout; err != nil {
		fmt.Println("collect stdout failed", err
	}

	if err := cmd.Wait(; err != nil {
		fmt.Println("wait exec failed", err.Error(
	}
}

6.2 实时读取标准输出(错误用法

func collector(ctx context.Context, output io.Reader error {
	scanner := bufio.NewReader(output
	for {
		// 当异步收集日志比较慢,但命令执行很快就退出后,会将pipe关闭,导致管道无法读取,
        // 报错 read |0: file already closed
		// time.Sleep(2 * time.Second
		select {
		case <-ctx.Done(:
			if ctx.Err( != nil {
				return ctx.Err(
			}

			return errors.New("process existed"
		default:
			readStr, readErr := scanner.ReadString('\n'
			if readErr != nil {
				if readErr != io.EOF {
					return readErr
				}

				return nil
			}
			fmt.Println(readStr
		}
	}
}

func TestReadStdoutWrongExample(t *testing.T {
	ctx, cancel := context.WithTimeout(context.Background(, 20*time.Second
	defer cancel(
	cmd := exec.CommandContext(ctx, "ping", "-c", "5", "192.168.0.1"

	stdout, _ := cmd.StdoutPipe(
	cmd.Start(

	go func( {
		err := collector(ctx, stdout
		if err != nil {
			fmt.Println(err
		}
	}(

	if err := cmd.Wait(; err != nil {
		fmt.Println("wait exec failed", err
	}
}

7. 参考

Go exec 包执行命令超时失效问题分析及解决方案

编程笔记 » 从源码解析Go exec timeout 实现机制

赞同 (35) or 分享 (0)
游客 发表我的评论   换个身份
取消评论

表情
(0)个小伙伴在吐槽