取消进程池中正在运行的进程

关注微信公众号塔容万物

使用 Future 自带的 cancel() 方法可以取消进程池中的任务,但它只能取消尚未开始运行的任务;对于已经处于 running 状态的任务,cancel() 会返回 False,不起任何作用(从下面的输出可以看到,第一个任务完成时,其余任务都已处于 running 状态)。如果需要取消正在运行的任务,可以借助第三方库 psutil,向进程池的工作子进程发送终止信号来强制结束它们。

import os
import random
import time
from concurrent.futures import FIRST_COMPLETED, ProcessPoolExecutor, wait


def do_task(i: int, min_sleep: int = 3, max_sleep: int = 4) -> tuple:
    """Simulate a slow job by sleeping for a random whole number of seconds.

    Args:
        i: Task number; used in the log lines and in the returned value.
        min_sleep: Lower bound (inclusive) of the sleep duration in seconds.
        max_sleep: Upper bound (inclusive) of the sleep duration in seconds.

    Returns:
        A ``(name, value)`` tuple: ``("Task-<i>", i * 2)``.

    Raises:
        ValueError: If ``min_sleep`` is greater than ``max_sleep``
            (raised by ``random.randint``).
    """
    print(f"Running: task-{i}, pid: {os.getpid()}")
    # Named ``delay`` rather than ``sleep`` so it is not mistaken for time.sleep.
    delay = random.randint(min_sleep, max_sleep)
    time.sleep(delay)
    print(f"Task: {i} sleep {delay}")
    return (f"Task-{i}", i * 2)


def run_pool(n: int):
    """Show that ``Future.cancel()`` cannot stop tasks that are already running.

    Submits ``n * 2`` tasks to a pool of ``n`` worker processes, waits for the
    first one to complete, prints its result, then tries to cancel every
    future that has not completed and prints whether the cancel succeeded.

    Args:
        n: Number of worker processes in the pool.
    """
    # The context manager guarantees the executor is shut down even if a task
    # raises. Leaving the block waits for the tasks that are still running,
    # which is the same point at which their output appeared before.
    with ProcessPoolExecutor(n) as pool:
        fs = [pool.submit(do_task, i) for i in range(n * 2)]
        for future in fs:
            print(future)
        print("1. --------------------------------")
        done, not_done = wait(fs, return_when=FIRST_COMPLETED)
        for task in done:
            task_name, task_ret = task.result()
            print(task_name, task_ret)
        print("2. --------------------------------")
        for task in not_done:
            # cancel() returns False for a future that is already running.
            status = task.cancel()
            print(task, "canceled", status)
        print("3. --------------------------------")
        print("All tasks finished.")


if __name__ == "__main__":
    # Keep the entry point behind the guard: worker processes may need to
    # import this module, and must not start a pool of their own when they do.
    run_pool(n=2)
<Future at 0x7f0dac951510 state=running>
<Future at 0x7f0dac952410 state=running>
<Future at 0x7f0dac9528d0 state=pending>
<Future at 0x7f0dac952a50 state=pending>
1. --------------------------------
Running: task-0, pid: 79416
Running: task-1, pid: 79417
Task: 1 sleep 3
Running: task-2, pid: 79417
Task-1 2
2. --------------------------------
<Future at 0x7f0dac951510 state=running> canceled False
<Future at 0x7f0dac9528d0 state=running> canceled False
<Future at 0x7f0dac952a50 state=running> canceled False
3. --------------------------------
All tasks finished.
Task: 0 sleep 4
Running: task-3, pid: 79416
Task: 2 sleep 3
Task: 3 sleep 3
import os
import random
import signal
import time
from concurrent.futures import FIRST_COMPLETED, ProcessPoolExecutor, wait

import psutil


def force_cancel():
    """Send SIGTERM to every descendant process of the current process.

    Children are discovered recursively through psutil. The message is printed
    once the signal has been sent; this function does not wait for the child
    to actually exit.
    """
    parent = psutil.Process(os.getpid())
    for child in parent.children(recursive=True):
        try:
            child.send_signal(signal.SIGTERM)
        except psutil.NoSuchProcess:
            # The child exited between the listing and the signal; there is
            # nothing left to stop, so move on to the remaining children.
            continue
        print(f"Child {child.pid} terminated.")


def do_task(i: int, min_sleep: int = 3, max_sleep: int = 4) -> tuple:
    """Simulate a slow job by sleeping for a random whole number of seconds.

    Args:
        i: Task number; used in the log lines and in the returned value.
        min_sleep: Lower bound (inclusive) of the sleep duration in seconds.
        max_sleep: Upper bound (inclusive) of the sleep duration in seconds.

    Returns:
        A ``(name, value)`` tuple: ``("Task-<i>", i * 2)``.

    Raises:
        ValueError: If ``min_sleep`` is greater than ``max_sleep``
            (raised by ``random.randint``).
    """
    print(f"Running: task-{i}, pid: {os.getpid()}")
    # Named ``delay`` rather than ``sleep`` so it is not mistaken for time.sleep.
    delay = random.randint(min_sleep, max_sleep)
    time.sleep(delay)
    print(f"Task: {i} sleep {delay}")
    return (f"Task-{i}", i * 2)


def run_pool(n: int):
    """Cancel what can be cancelled, then forcibly stop the running workers.

    Submits ``n * 2`` tasks to a pool of ``n`` worker processes, waits for the
    first one to complete, prints its result, tries ``Future.cancel()`` on the
    rest, and finally calls ``force_cancel()`` to terminate the child
    processes that are still executing tasks.

    Args:
        n: Number of worker processes in the pool.
    """
    # The context manager guarantees the executor is shut down even if a task
    # raises.
    with ProcessPoolExecutor(n) as pool:
        fs = [pool.submit(do_task, i) for i in range(n * 2)]
        for future in fs:
            print(future)
        print("1. --------------------------------")
        done, not_done = wait(fs, return_when=FIRST_COMPLETED)
        for task in done:
            task_name, task_ret = task.result()
            print(task_name, task_ret)
        print("2. --------------------------------")
        for task in not_done:
            # cancel() returns False for a future that is already running.
            status = task.cancel()
            print(task, "canceled", status)
        print("3. --------------------------------")
        print("All tasks finished.")
        # Must run before the ``with`` block exits: the implicit shutdown
        # waits for running tasks, so the workers have to be killed first.
        force_cancel()


if __name__ == "__main__":
    # Keep the entry point behind the guard: worker processes may need to
    # import this module, and must not start a pool of their own when they do.
    run_pool(n=2)
<Future at 0x7f9fa512e550 state=running>
<Future at 0x7f9fa4cbb690 state=pending>
<Future at 0x7f9fa4cbbc10 state=pending>
<Future at 0x7f9fa4cbbd50 state=pending>
1. --------------------------------
Running: task-0, pid: 76807
Running: task-1, pid: 76808
Task: 0 sleep 4
Task: 1 sleep 4
Running: task-2, pid: 76807
Running: task-3, pid: 76808
Task-1 2
Task-0 0
2. --------------------------------
<Future at 0x7f9fa4cbbc10 state=running> canceled False
<Future at 0x7f9fa4cbbd50 state=running> canceled False
3. --------------------------------
All tasks finished.
Child 76807 terminated.
Child 76808 terminated.