使用 Future 自带的 cancel 方法可以取消进程池中的任务,但是只能取消还没有开始运行的任务;对于正在运行的任务,cancel 方法无效。如果需要取消正在运行的任务,可以使用 psutil 模块来终止工作进程。
import os
import random
import time
from concurrent.futures import FIRST_COMPLETED, ProcessPoolExecutor, wait
def do_task(i: int, min_sleep: int = 3, max_sleep: int = 4):
    """Simulate a slow unit of work inside a pool worker process.

    Prints a start line containing the worker's pid, sleeps for a random
    whole number of seconds, prints a completion line, and returns a result.

    Args:
        i: Task index; used in the log lines and in the returned values.
        min_sleep: Inclusive lower bound of the sleep duration, in seconds.
        max_sleep: Inclusive upper bound of the sleep duration, in seconds.

    Returns:
        The tuple ``(f"Task-{i}", i * 2)``.
    """
    print(f"Running: task-{i}, pid: {os.getpid()}")
    # randint is inclusive on both ends, so the defaults give 3 or 4 seconds.
    duration = random.randint(min_sleep, max_sleep)
    time.sleep(duration)
    print(f"Task: {i} sleep {duration}")
    return (f"Task-{i}", i * 2)
def run_pool(n: int):
    """Show that ``Future.cancel`` cannot stop tasks that are already running.

    Submits ``n * 2`` ``do_task`` calls to a pool of ``n`` worker processes,
    waits until at least one future completes, prints the completed results,
    then tries to cancel every remaining future and prints whether each
    cancellation succeeded.

    Args:
        n: Number of worker processes; twice as many tasks are submitted.
    """
    # The context manager guarantees the executor is shut down (and its
    # workers joined) even if ``result()`` re-raises a task's exception.
    with ProcessPoolExecutor(n) as pool:
        fs = [pool.submit(do_task, i) for i in range(n * 2)]
        for future in fs:
            print(future)

        print("1. --------------------------------")
        done, not_done = wait(fs, return_when=FIRST_COMPLETED)
        for task in done:
            task_name, task_ret = task.result()
            print(task_name, task_ret)

        print("2. --------------------------------")
        for task in not_done:
            # cancel() returns False when the future can no longer be
            # cancelled, e.g. because it is already running.
            status = task.cancel()
            print(task, "canceled", status)

        print("3. --------------------------------")
        # Printed before the pool drains: futures that could not be cancelled
        # keep running, and leaving the ``with`` block waits for them.
        print("All tasks finished.")
if __name__ == "__main__":
    # Guarded so that worker processes which re-import this module do not
    # start a pool of their own.
    run_pool(n=2)
<Future at 0x7f0dac951510 state=running>
<Future at 0x7f0dac952410 state=running>
<Future at 0x7f0dac9528d0 state=pending>
<Future at 0x7f0dac952a50 state=pending>
1. --------------------------------
Running: task-0, pid: 79416
Running: task-1, pid: 79417
Task: 1 sleep 3
Running: task-2, pid: 79417
Task-1 2
2. --------------------------------
<Future at 0x7f0dac951510 state=running> canceled False
<Future at 0x7f0dac9528d0 state=running> canceled False
<Future at 0x7f0dac952a50 state=running> canceled False
3. --------------------------------
All tasks finished.
Task: 0 sleep 4
Running: task-3, pid: 79416
Task: 2 sleep 3
Task: 3 sleep 3
import os
import random
import signal
import time
from concurrent.futures import FIRST_COMPLETED, ProcessPoolExecutor, wait
import psutil
def force_cancel():
    """Send SIGTERM to every descendant process of the current process.

    Walks the process tree below the current pid (children, grandchildren,
    and so on) and signals each one. A process that has already exited by the
    time it is signalled is skipped silently.
    """
    parent = psutil.Process(os.getpid())
    for child in parent.children(recursive=True):
        try:
            child.send_signal(signal.SIGTERM)
        except psutil.NoSuchProcess:
            # The child exited between being listed and being signalled;
            # there is nothing left to stop.
            continue
        # The signal has only been sent here; this does not wait for the
        # process to actually exit.
        print(f"Child {child.pid} terminated.")
def do_task(i: int, min_sleep: int = 3, max_sleep: int = 4):
    """Simulate a slow unit of work inside a pool worker process.

    Prints a start line containing the worker's pid, sleeps for a random
    whole number of seconds, prints a completion line, and returns a result.

    Args:
        i: Task index; used in the log lines and in the returned values.
        min_sleep: Inclusive lower bound of the sleep duration, in seconds.
        max_sleep: Inclusive upper bound of the sleep duration, in seconds.

    Returns:
        The tuple ``(f"Task-{i}", i * 2)``.
    """
    print(f"Running: task-{i}, pid: {os.getpid()}")
    # randint is inclusive on both ends, so the defaults give 3 or 4 seconds.
    duration = random.randint(min_sleep, max_sleep)
    time.sleep(duration)
    print(f"Task: {i} sleep {duration}")
    return (f"Task-{i}", i * 2)
def run_pool(n: int):
    """Run a pool, try to cancel leftover tasks, then terminate the workers.

    Submits ``n * 2`` ``do_task`` calls to a pool of ``n`` worker processes,
    waits until at least one future completes, prints the completed results,
    attempts ``Future.cancel`` on the rest (printing each outcome), and
    finally calls ``force_cancel`` to signal the worker processes directly.

    Args:
        n: Number of worker processes; twice as many tasks are submitted.
    """
    # The context manager guarantees the executor is shut down even if
    # ``result()`` re-raises a task's exception.
    with ProcessPoolExecutor(n) as pool:
        fs = [pool.submit(do_task, i) for i in range(n * 2)]
        for future in fs:
            print(future)

        print("1. --------------------------------")
        done, not_done = wait(fs, return_when=FIRST_COMPLETED)
        for task in done:
            task_name, task_ret = task.result()
            print(task_name, task_ret)

        print("2. --------------------------------")
        for task in not_done:
            # cancel() returns False when the future can no longer be
            # cancelled, e.g. because it is already running.
            status = task.cancel()
            print(task, "canceled", status)

        print("3. --------------------------------")
        print("All tasks finished.")

        # Signal the workers before leaving the ``with`` block: exiting first
        # would wait for the uncancelled tasks to run to completion, which is
        # exactly what force_cancel() is meant to prevent.
        force_cancel()
if __name__ == "__main__":
    # Guarded so that worker processes which re-import this module do not
    # start a pool of their own.
    run_pool(n=2)
<Future at 0x7f9fa512e550 state=running>
<Future at 0x7f9fa4cbb690 state=pending>
<Future at 0x7f9fa4cbbc10 state=pending>
<Future at 0x7f9fa4cbbd50 state=pending>
1. --------------------------------
Running: task-0, pid: 76807
Running: task-1, pid: 76808
Task: 0 sleep 4
Task: 1 sleep 4
Running: task-2, pid: 76807
Running: task-3, pid: 76808
Task-1 2
Task-0 0
2. --------------------------------
<Future at 0x7f9fa4cbbc10 state=running> canceled False
<Future at 0x7f9fa4cbbd50 state=running> canceled False
3. --------------------------------
All tasks finished.
Child 76807 terminated.
Child 76808 terminated.