From 7d1d3c3c8e75ff23cc476c169870ceaa2ea19cfc Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Wed, 28 Jan 2026 08:09:12 +0100 Subject: [PATCH 01/10] Only terminate tasks on queue when cancel_futures is set to true --- src/executorlib/task_scheduler/base.py | 2 +- src/executorlib/task_scheduler/file/shared.py | 27 ++++++++++--------- 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/src/executorlib/task_scheduler/base.py b/src/executorlib/task_scheduler/base.py index 8940fd7a..c43c2fb5 100644 --- a/src/executorlib/task_scheduler/base.py +++ b/src/executorlib/task_scheduler/base.py @@ -198,7 +198,7 @@ def shutdown(self, wait: bool = True, *, cancel_futures: bool = False): if cancel_futures and self._future_queue is not None: cancel_items_in_queue(que=self._future_queue) if self._process is not None and self._future_queue is not None: - self._future_queue.put({"shutdown": True, "wait": wait}) + self._future_queue.put({"shutdown": True, "wait": wait, "cancel_futures": cancel_futures}) if wait and isinstance(self._process, Thread): self._process.join() self._future_queue.join() diff --git a/src/executorlib/task_scheduler/file/shared.py b/src/executorlib/task_scheduler/file/shared.py index 177e28cd..b612eb9d 100644 --- a/src/executorlib/task_scheduler/file/shared.py +++ b/src/executorlib/task_scheduler/file/shared.py @@ -97,19 +97,20 @@ def execute_tasks_h5( for key, value in memory_dict.items() if not value.done() } - if ( - terminate_function is not None - and terminate_function == terminate_subprocess - ): - for task in process_dict.values(): - terminate_function(task=task) - elif terminate_function is not None: - for queue_id in process_dict.values(): - terminate_function( - queue_id=queue_id, - config_directory=pysqa_config_directory, - backend=backend, - ) + if task_dict["cancel_futures"]: + if ( + terminate_function is not None + and terminate_function == terminate_subprocess + ): + for task in process_dict.values(): + terminate_function(task=task) + elif terminate_function is not None: + for queue_id in process_dict.values(): + terminate_function( + queue_id=queue_id, + config_directory=pysqa_config_directory, + backend=backend, + ) future_queue.task_done() future_queue.join() break From 4a191d049448e7a62d61d8a461c411b550e3ef23 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Wed, 28 Jan 2026 08:47:40 +0100 Subject: [PATCH 02/10] Add cancel_futures_on_shutdown to FileTaskScheduler --- src/executorlib/task_scheduler/file/shared.py | 5 ++++- src/executorlib/task_scheduler/file/task_scheduler.py | 6 ++++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/src/executorlib/task_scheduler/file/shared.py b/src/executorlib/task_scheduler/file/shared.py index b612eb9d..74d30138 100644 --- a/src/executorlib/task_scheduler/file/shared.py +++ b/src/executorlib/task_scheduler/file/shared.py @@ -57,6 +57,7 @@ def execute_tasks_h5( backend: Optional[str] = None, disable_dependencies: bool = False, pmi_mode: Optional[str] = None, + cancel_futures_on_shutdown: bool = True, ) -> None: """ Execute tasks stored in a queue using HDF5 files. @@ -72,6 +73,8 @@ def execute_tasks_h5( backend (str, optional): name of the backend used to spawn tasks. disable_dependencies (boolean): Disable resolving future objects during the submission. pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only) + cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes on + shutdown. Returns: None @@ -97,7 +100,7 @@ def execute_tasks_h5( for key, value in memory_dict.items() if not value.done() } - if task_dict["cancel_futures"]: + if task_dict["cancel_futures"] and cancel_futures_on_shutdown: if ( terminate_function is not None and terminate_function == terminate_subprocess diff --git a/src/executorlib/task_scheduler/file/task_scheduler.py b/src/executorlib/task_scheduler/file/task_scheduler.py index d12480d1..1ca5d8d9 100644 --- a/src/executorlib/task_scheduler/file/task_scheduler.py +++ b/src/executorlib/task_scheduler/file/task_scheduler.py @@ -35,6 +35,7 @@ def __init__( backend: Optional[str] = None, disable_dependencies: bool = False, pmi_mode: Optional[str] = None, + cancel_futures_on_shutdown: bool = True, ): """ Initialize the FileExecutor. @@ -50,6 +51,8 @@ def __init__( backend (str, optional): name of the backend used to spawn tasks. disable_dependencies (boolean): Disable resolving future objects during the submission. pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None + cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes + on shutdown. """ super().__init__(max_cores=None) default_resource_dict = { @@ -73,6 +76,7 @@ def __init__( "backend": backend, "disable_dependencies": disable_dependencies, "pmi_mode": pmi_mode, + "cancel_futures_on_shutdown": cancel_futures_on_shutdown, } self._set_process( Thread( @@ -98,6 +102,7 @@ def create_file_executor( init_function: Optional[Callable] = None, disable_dependencies: bool = False, execute_function: Callable = execute_with_pysqa, + cancel_futures_on_shutdown: bool = True, ): if block_allocation: raise ValueError( @@ -128,4 +133,5 @@ def create_file_executor( execute_function=execute_function, terminate_function=terminate_function, pmi_mode=pmi_mode, + cancel_futures_on_shutdown=cancel_futures_on_shutdown, ) From 48cca33b18450285163a9bce9ed7e0162f48a098 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Wed, 28 Jan 2026 08:48:04 +0100 Subject: [PATCH 03/10] Add cancel_futures_on_shutdown in __init__() --- src/executorlib/executor/flux.py | 19 +++++++++++++++++++ src/executorlib/executor/single.py | 18 ++++++++++++++++++ src/executorlib/executor/slurm.py | 19 +++++++++++++++++++ src/executorlib/standalone/inputcheck.py | 12 ++++++++++++ 4 files changed, 68 insertions(+) diff --git a/src/executorlib/executor/flux.py b/src/executorlib/executor/flux.py index f06d6570..bb379164 100644 --- a/src/executorlib/executor/flux.py +++ b/src/executorlib/executor/flux.py @@ -2,6 +2,7 @@ from executorlib.executor.base import BaseExecutor from executorlib.standalone.inputcheck import ( + check_cancel_futures_on_shutdown, check_command_line_argument_lst, check_init_function, check_log_obj_size, @@ -67,6 +68,8 @@ class FluxJobExecutor(BaseExecutor): plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. export_workflow_filename (str): Name of the file to store the exported workflow graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. + cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes on + shutdown. Examples: ``` @@ -108,6 +111,7 @@ def __init__( plot_dependency_graph_filename: Optional[str] = None, export_workflow_filename: Optional[str] = None, log_obj_size: bool = False, + cancel_futures_on_shutdown: bool = False, ): """ The executorlib.FluxJobExecutor leverages either the message passing interface (MPI), the SLURM workload manager @@ -156,6 +160,8 @@ def __init__( plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. export_workflow_filename (str): Name of the file to store the exported workflow graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. + cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes + on shutdown. """ default_resource_dict: dict = { @@ -187,6 +193,7 @@ def __init__( block_allocation=block_allocation, init_function=init_function, log_obj_size=log_obj_size, + cancel_futures_on_shutdown=cancel_futures_on_shutdown, ), max_cores=max_cores, refresh_rate=refresh_rate, @@ -212,6 +219,7 @@ def __init__( block_allocation=block_allocation, init_function=init_function, log_obj_size=log_obj_size, + cancel_futures_on_shutdown=cancel_futures_on_shutdown, ) ) @@ -261,6 +269,8 @@ class FluxClusterExecutor(BaseExecutor): plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. export_workflow_filename (str): Name of the file to store the exported workflow graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. + cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes + on shutdown. Examples: ``` @@ -300,6 +310,7 @@ def __init__( plot_dependency_graph_filename: Optional[str] = None, export_workflow_filename: Optional[str] = None, log_obj_size: bool = False, + cancel_futures_on_shutdown: bool = True, ): """ The executorlib.FluxClusterExecutor leverages either the message passing interface (MPI), the SLURM workload @@ -346,6 +357,8 @@ def __init__( plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. export_workflow_filename (str): Name of the file to store the exported workflow graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. + cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes + on shutdown. """ default_resource_dict: dict = { @@ -366,6 +379,7 @@ def __init__( import pysqa # noqa if block_allocation: + check_cancel_futures_on_shutdown(cancel_futures_on_shutdown=cancel_futures_on_shutdown) from executorlib.task_scheduler.interactive.spawner_pysqa import ( create_pysqa_block_allocation_scheduler, ) @@ -405,6 +419,7 @@ def __init__( block_allocation=block_allocation, init_function=init_function, disable_dependencies=disable_dependencies, + cancel_futures_on_shutdown=cancel_futures_on_shutdown, ) ) else: @@ -445,6 +460,7 @@ def create_flux_executor( block_allocation: bool = False, init_function: Optional[Callable] = None, log_obj_size: bool = False, + cancel_futures_on_shutdown: bool = False, ) -> Union[OneProcessTaskScheduler, BlockAllocationTaskScheduler]: """ Create a flux executor @@ -483,6 +499,8 @@ def create_flux_executor( of the individual function. init_function (None): optional function to preset arguments for functions which are submitted later log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. + cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes on + shutdown. Returns: InteractiveStepExecutor/ InteractiveExecutor @@ -504,6 +522,7 @@ def create_flux_executor( check_command_line_argument_lst( command_line_argument_lst=resource_dict.get("slurm_cmd_args", []) ) + check_cancel_futures_on_shutdown(cancel_futures_on_shutdown=cancel_futures_on_shutdown) if "openmpi_oversubscribe" in resource_dict: del resource_dict["openmpi_oversubscribe"] if "slurm_cmd_args" in resource_dict: diff --git a/src/executorlib/executor/single.py b/src/executorlib/executor/single.py index d0140fa5..71cb9581 100644 --- a/src/executorlib/executor/single.py +++ b/src/executorlib/executor/single.py @@ -2,6 +2,7 @@ from executorlib.executor.base import BaseExecutor from executorlib.standalone.inputcheck import ( + check_cancel_futures_on_shutdown, check_command_line_argument_lst, check_gpus_per_worker, check_init_function, @@ -60,6 +61,8 @@ class SingleNodeExecutor(BaseExecutor): plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. export_workflow_filename (str): Name of the file to store the exported workflow graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. + cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes on + shutdown. Examples: ``` @@ -97,6 +100,7 @@ def __init__( plot_dependency_graph_filename: Optional[str] = None, export_workflow_filename: Optional[str] = None, log_obj_size: bool = False, + cancel_futures_on_shutdown: bool = False, ): """ The executorlib.SingleNodeExecutor leverages either the message passing interface (MPI), the SLURM workload @@ -142,6 +146,8 @@ def __init__( plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. export_workflow_filename (str): Name of the file to store the exported workflow graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. + cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes + on shutdown. """ default_resource_dict: dict = { @@ -169,6 +175,7 @@ def __init__( block_allocation=block_allocation, init_function=init_function, log_obj_size=log_obj_size, + cancel_futures_on_shutdown=cancel_futures_on_shutdown, ), max_cores=max_cores, refresh_rate=refresh_rate, @@ -190,6 +197,7 @@ def __init__( block_allocation=block_allocation, init_function=init_function, log_obj_size=log_obj_size, + cancel_futures_on_shutdown=cancel_futures_on_shutdown, ) ) @@ -232,6 +240,8 @@ class TestClusterExecutor(BaseExecutor): plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. export_workflow_filename (str): Name of the file to store the exported workflow graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. + cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes on + shutdown. Examples: ``` @@ -269,6 +279,7 @@ def __init__( plot_dependency_graph_filename: Optional[str] = None, export_workflow_filename: Optional[str] = None, log_obj_size: bool = False, + cancel_futures_on_shutdown: bool = True, ): """ The executorlib.api.TestClusterExecutor is designed to test the file based communication used in the @@ -307,6 +318,8 @@ def __init__( plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. export_workflow_filename (str): Name of the file to store the exported workflow graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. + cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes + on shutdown. """ default_resource_dict: dict = { @@ -346,6 +359,7 @@ def __init__( init_function=init_function, disable_dependencies=disable_dependencies, execute_function=execute_in_subprocess, + cancel_futures_on_shutdown=cancel_futures_on_shutdown, ) ) else: @@ -379,6 +393,7 @@ def create_single_node_executor( block_allocation: bool = False, init_function: Optional[Callable] = None, log_obj_size: bool = False, + cancel_futures_on_shutdown: bool = False, ) -> Union[OneProcessTaskScheduler, BlockAllocationTaskScheduler]: """ Create a single node executor @@ -413,6 +428,8 @@ def create_single_node_executor( of the individual function. init_function (None): optional function to preset arguments for functions which are submitted later log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. + cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes on + shutdown. Returns: InteractiveStepExecutor/ InteractiveExecutor @@ -429,6 +446,7 @@ def create_single_node_executor( check_command_line_argument_lst( command_line_argument_lst=resource_dict.get("slurm_cmd_args", []) ) + check_cancel_futures_on_shutdown(cancel_futures_on_shutdown=cancel_futures_on_shutdown) if "threads_per_core" in resource_dict: del resource_dict["threads_per_core"] if "gpus_per_core" in resource_dict: diff --git a/src/executorlib/executor/slurm.py b/src/executorlib/executor/slurm.py index 1631c914..eb5848ac 100644 --- a/src/executorlib/executor/slurm.py +++ b/src/executorlib/executor/slurm.py @@ -2,6 +2,7 @@ from executorlib.executor.base import BaseExecutor from executorlib.standalone.inputcheck import ( + check_cancel_futures_on_shutdown, check_init_function, check_log_obj_size, check_plot_dependency_graph, @@ -65,6 +66,8 @@ class SlurmClusterExecutor(BaseExecutor): plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. export_workflow_filename (str): Name of the file to store the exported workflow graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. + cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes on + shutdown. Examples: ``` @@ -104,6 +107,7 @@ def __init__( plot_dependency_graph_filename: Optional[str] = None, export_workflow_filename: Optional[str] = None, log_obj_size: bool = False, + cancel_futures_on_shutdown: bool = False, ): """ The executorlib.SlurmClusterExecutor leverages either the message passing interface (MPI), the SLURM workload @@ -150,6 +154,8 @@ def __init__( plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. export_workflow_filename (str): Name of the file to store the exported workflow graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. + cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes + on shutdown. """ default_resource_dict: dict = { @@ -170,6 +176,7 @@ def __init__( import pysqa # noqa if block_allocation: + check_cancel_futures_on_shutdown(cancel_futures_on_shutdown=cancel_futures_on_shutdown) from executorlib.task_scheduler.interactive.spawner_pysqa import ( create_pysqa_block_allocation_scheduler, ) @@ -210,6 +217,7 @@ def __init__( block_allocation=block_allocation, init_function=init_function, disable_dependencies=disable_dependencies, + cancel_futures_on_shutdown=cancel_futures_on_shutdown, ) ) else: @@ -281,6 +289,8 @@ class SlurmJobExecutor(BaseExecutor): plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. export_workflow_filename (str): Name of the file to store the exported workflow graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. + cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes on + shutdown. Examples: ``` @@ -319,6 +329,7 @@ def __init__( plot_dependency_graph_filename: Optional[str] = None, export_workflow_filename: Optional[str] = None, log_obj_size: bool = False, + cancel_futures_on_shutdown: bool = False, ): """ The executorlib.SlurmJobExecutor leverages either the message passing interface (MPI), the SLURM workload @@ -368,6 +379,8 @@ def __init__( plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. export_workflow_filename (str): Name of the file to store the exported workflow graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. + cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes + on shutdown. """ default_resource_dict: dict = { @@ -396,6 +409,7 @@ def __init__( block_allocation=block_allocation, init_function=init_function, log_obj_size=log_obj_size, + cancel_futures_on_shutdown=cancel_futures_on_shutdown, ), max_cores=max_cores, refresh_rate=refresh_rate, @@ -418,6 +432,7 @@ def __init__( block_allocation=block_allocation, init_function=init_function, log_obj_size=log_obj_size, + cancel_futures_on_shutdown=cancel_futures_on_shutdown ) ) @@ -432,6 +447,7 @@ def create_slurm_executor( block_allocation: bool = False, init_function: Optional[Callable] = None, log_obj_size: bool = False, + cancel_futures_on_shutdown: bool = False, ) -> Union[OneProcessTaskScheduler, BlockAllocationTaskScheduler]: """ Create a SLURM executor @@ -471,6 +487,8 @@ def create_slurm_executor( of the individual function. init_function (None): optional function to preset arguments for functions which are submitted later log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. + cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes on + shutdown. Returns: InteractiveStepExecutor/ InteractiveExecutor @@ -483,6 +501,7 @@ def create_slurm_executor( resource_dict["log_obj_size"] = log_obj_size resource_dict["pmi_mode"] = pmi_mode check_init_function(block_allocation=block_allocation, init_function=init_function) + check_cancel_futures_on_shutdown(cancel_futures_on_shutdown=cancel_futures_on_shutdown) if block_allocation: resource_dict["init_function"] = init_function max_workers = validate_number_of_cores( diff --git a/src/executorlib/standalone/inputcheck.py b/src/executorlib/standalone/inputcheck.py index 6f6ab763..d08d21c5 100644 --- a/src/executorlib/standalone/inputcheck.py +++ b/src/executorlib/standalone/inputcheck.py @@ -17,6 +17,18 @@ def check_oversubscribe(oversubscribe: bool) -> None: ) +def check_cancel_futures_on_shutdown( + cancel_futures_on_shutdown: bool, +) -> None: + """ + Check if cancel_futures_on_shutdown is True and raise a ValueError if it is. + """ + if cancel_futures_on_shutdown: + raise ValueError( + "The cancel_futures_on_shutdown parameter is only supported for the executorlib.FluxClusterExecutor and executorlib.SlurmClusterExecutor." + ) + + def check_command_line_argument_lst(command_line_argument_lst: list[str]) -> None: """ Check if command_line_argument_lst is not empty and raise a ValueError if it is. From fb7ec7dd5f042a6a23a907e1a07a1a0315657da9 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 28 Jan 2026 07:49:50 +0000 Subject: [PATCH 04/10] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/executorlib/executor/flux.py | 18 +++++++++++------- src/executorlib/executor/single.py | 8 +++++--- src/executorlib/executor/slurm.py | 16 ++++++++++------ src/executorlib/task_scheduler/base.py | 4 +++- src/executorlib/task_scheduler/file/shared.py | 2 +- .../task_scheduler/file/task_scheduler.py | 2 +- 6 files changed, 31 insertions(+), 19 deletions(-) diff --git a/src/executorlib/executor/flux.py b/src/executorlib/executor/flux.py index bb379164..db3e9ca3 100644 --- a/src/executorlib/executor/flux.py +++ b/src/executorlib/executor/flux.py @@ -68,7 +68,7 @@ class FluxJobExecutor(BaseExecutor): plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. export_workflow_filename (str): Name of the file to store the exported workflow graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. - cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes on + cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes on shutdown. Examples: @@ -160,7 +160,7 @@ def __init__( plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. export_workflow_filename (str): Name of the file to store the exported workflow graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. - cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes + cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes on shutdown. """ @@ -269,7 +269,7 @@ class FluxClusterExecutor(BaseExecutor): plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. export_workflow_filename (str): Name of the file to store the exported workflow graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. - cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes + cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes on shutdown. Examples: @@ -357,7 +357,7 @@ def __init__( plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. export_workflow_filename (str): Name of the file to store the exported workflow graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. - cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes + cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes on shutdown. """ @@ -379,7 +379,9 @@ def __init__( import pysqa # noqa if block_allocation: - check_cancel_futures_on_shutdown(cancel_futures_on_shutdown=cancel_futures_on_shutdown) + check_cancel_futures_on_shutdown( + cancel_futures_on_shutdown=cancel_futures_on_shutdown + ) from executorlib.task_scheduler.interactive.spawner_pysqa import ( create_pysqa_block_allocation_scheduler, ) @@ -499,7 +501,7 @@ def create_flux_executor( of the individual function. init_function (None): optional function to preset arguments for functions which are submitted later log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. - cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes on + cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes on shutdown. Returns: @@ -522,7 +524,9 @@ def create_flux_executor( check_command_line_argument_lst( command_line_argument_lst=resource_dict.get("slurm_cmd_args", []) ) - check_cancel_futures_on_shutdown(cancel_futures_on_shutdown=cancel_futures_on_shutdown) + check_cancel_futures_on_shutdown( + cancel_futures_on_shutdown=cancel_futures_on_shutdown + ) if "openmpi_oversubscribe" in resource_dict: del resource_dict["openmpi_oversubscribe"] if "slurm_cmd_args" in resource_dict: diff --git a/src/executorlib/executor/single.py b/src/executorlib/executor/single.py index 71cb9581..67d08c60 100644 --- a/src/executorlib/executor/single.py +++ b/src/executorlib/executor/single.py @@ -146,7 +146,7 @@ def __init__( plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. export_workflow_filename (str): Name of the file to store the exported workflow graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. - cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes + cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes on shutdown. """ @@ -318,7 +318,7 @@ def __init__( plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. export_workflow_filename (str): Name of the file to store the exported workflow graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. - cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes + cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes on shutdown. """ @@ -446,7 +446,9 @@ def create_single_node_executor( check_command_line_argument_lst( command_line_argument_lst=resource_dict.get("slurm_cmd_args", []) ) - check_cancel_futures_on_shutdown(cancel_futures_on_shutdown=cancel_futures_on_shutdown) + check_cancel_futures_on_shutdown( + cancel_futures_on_shutdown=cancel_futures_on_shutdown + ) if "threads_per_core" in resource_dict: del resource_dict["threads_per_core"] if "gpus_per_core" in resource_dict: diff --git a/src/executorlib/executor/slurm.py b/src/executorlib/executor/slurm.py index eb5848ac..31d98316 100644 --- a/src/executorlib/executor/slurm.py +++ b/src/executorlib/executor/slurm.py @@ -154,7 +154,7 @@ def __init__( plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. export_workflow_filename (str): Name of the file to store the exported workflow graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. - cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes + cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes on shutdown. """ @@ -176,7 +176,9 @@ def __init__( import pysqa # noqa if block_allocation: - check_cancel_futures_on_shutdown(cancel_futures_on_shutdown=cancel_futures_on_shutdown) + check_cancel_futures_on_shutdown( + cancel_futures_on_shutdown=cancel_futures_on_shutdown + ) from executorlib.task_scheduler.interactive.spawner_pysqa import ( create_pysqa_block_allocation_scheduler, ) @@ -289,7 +291,7 @@ class SlurmJobExecutor(BaseExecutor): plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. export_workflow_filename (str): Name of the file to store the exported workflow graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. - cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes on + cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes on shutdown. Examples: @@ -379,7 +381,7 @@ def __init__( plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. export_workflow_filename (str): Name of the file to store the exported workflow graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. - cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes + cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes on shutdown. """ @@ -432,7 +434,7 @@ def __init__( block_allocation=block_allocation, init_function=init_function, log_obj_size=log_obj_size, - cancel_futures_on_shutdown=cancel_futures_on_shutdown + cancel_futures_on_shutdown=cancel_futures_on_shutdown, ) ) @@ -501,7 +503,9 @@ def create_slurm_executor( resource_dict["log_obj_size"] = log_obj_size resource_dict["pmi_mode"] = pmi_mode check_init_function(block_allocation=block_allocation, init_function=init_function) - check_cancel_futures_on_shutdown(cancel_futures_on_shutdown=cancel_futures_on_shutdown) + check_cancel_futures_on_shutdown( + cancel_futures_on_shutdown=cancel_futures_on_shutdown + ) if block_allocation: resource_dict["init_function"] = init_function max_workers = validate_number_of_cores( diff --git a/src/executorlib/task_scheduler/base.py b/src/executorlib/task_scheduler/base.py index c43c2fb5..bba2622a 100644 --- a/src/executorlib/task_scheduler/base.py +++ b/src/executorlib/task_scheduler/base.py @@ -198,7 +198,9 @@ def shutdown(self, wait: bool = True, *, cancel_futures: bool = False): if cancel_futures and self._future_queue is not None: cancel_items_in_queue(que=self._future_queue) if self._process is not None and self._future_queue is not None: - self._future_queue.put({"shutdown": True, "wait": wait, "cancel_futures": cancel_futures}) + self._future_queue.put( + {"shutdown": True, "wait": wait, "cancel_futures": cancel_futures} + ) if wait and isinstance(self._process, Thread): self._process.join() self._future_queue.join() diff --git a/src/executorlib/task_scheduler/file/shared.py b/src/executorlib/task_scheduler/file/shared.py index 74d30138..330ac5c5 100644 --- a/src/executorlib/task_scheduler/file/shared.py +++ b/src/executorlib/task_scheduler/file/shared.py @@ -73,7 +73,7 @@ def execute_tasks_h5( backend (str, optional): name of the backend used to spawn tasks. disable_dependencies (boolean): Disable resolving future objects during the submission. pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only) - cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes on + cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes on shutdown. Returns: diff --git a/src/executorlib/task_scheduler/file/task_scheduler.py b/src/executorlib/task_scheduler/file/task_scheduler.py index 1ca5d8d9..509d5745 100644 --- a/src/executorlib/task_scheduler/file/task_scheduler.py +++ b/src/executorlib/task_scheduler/file/task_scheduler.py @@ -51,7 +51,7 @@ def __init__( backend (str, optional): name of the backend used to spawn tasks. disable_dependencies (boolean): Disable resolving future objects during the submission. pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None - cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes + cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes on shutdown. """ super().__init__(max_cores=None) From 80bab776c23b444039331f2c6c98b8c31a2a992d Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Wed, 28 Jan 2026 14:18:11 +0100 Subject: [PATCH 05/10] Update flux.py --- src/executorlib/executor/flux.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/executorlib/executor/flux.py b/src/executorlib/executor/flux.py index db3e9ca3..8e725d8b 100644 --- a/src/executorlib/executor/flux.py +++ b/src/executorlib/executor/flux.py @@ -379,9 +379,6 @@ def __init__( import pysqa # noqa if block_allocation: - check_cancel_futures_on_shutdown( - cancel_futures_on_shutdown=cancel_futures_on_shutdown - ) from executorlib.task_scheduler.interactive.spawner_pysqa import ( create_pysqa_block_allocation_scheduler, ) From fa012d6d0171bfb2bf2199bda30171828446b4d3 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Wed, 28 Jan 2026 14:20:29 +0100 Subject: [PATCH 06/10] Update slurm.py --- src/executorlib/executor/slurm.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/executorlib/executor/slurm.py b/src/executorlib/executor/slurm.py index 31d98316..5eef177b 100644 --- a/src/executorlib/executor/slurm.py +++ b/src/executorlib/executor/slurm.py @@ -176,9 +176,6 @@ def __init__( import pysqa # noqa if block_allocation: - check_cancel_futures_on_shutdown( - cancel_futures_on_shutdown=cancel_futures_on_shutdown - ) from executorlib.task_scheduler.interactive.spawner_pysqa import ( create_pysqa_block_allocation_scheduler, ) From 9d6ee4da46867dd8a083117d85758bc5d502b4c1 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Wed, 28 Jan 2026 14:36:39 +0100 Subject: [PATCH 07/10] Update test_standalone_inputcheck.py --- tests/test_standalone_inputcheck.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tests/test_standalone_inputcheck.py b/tests/test_standalone_inputcheck.py index 38fa896c..fbd659c1 100644 --- a/tests/test_standalone_inputcheck.py +++ b/tests/test_standalone_inputcheck.py @@ -1,6 +1,7 @@ import unittest from executorlib.standalone.inputcheck import ( + check_cancel_futures_on_shutdown, check_command_line_argument_lst, check_gpus_per_worker, check_oversubscribe, @@ -24,6 +25,10 @@ class TestInputCheck(unittest.TestCase): + def test_check_cancel_futures_on_shutdown(self): + with self.assertRaises(ValueError): + check_cancel_futures_on_shutdown(cancel_futures_on_shutdown=True) + def test_check_command_line_argument_lst(self): with self.assertRaises(ValueError): check_command_line_argument_lst(command_line_argument_lst=["a"]) From c2b7af476fe99f2914a4d4c2272afc0235e31f3f Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Wed, 28 Jan 2026 16:30:33 +0100 Subject: [PATCH 08/10] fix --- src/executorlib/task_scheduler/file/shared.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/executorlib/task_scheduler/file/shared.py b/src/executorlib/task_scheduler/file/shared.py index 330ac5c5..c731ac93 100644 --- a/src/executorlib/task_scheduler/file/shared.py +++ b/src/executorlib/task_scheduler/file/shared.py @@ -100,7 +100,7 @@ def execute_tasks_h5( for key, value in memory_dict.items() if not value.done() } - if task_dict["cancel_futures"] and cancel_futures_on_shutdown: + if task_dict["cancel_futures"] or cancel_futures_on_shutdown: if ( terminate_function is not None and terminate_function == terminate_subprocess From 8531773fc6df2f295821d7b87432b53052671b61 Mon Sep 17 00:00:00 2001 From: jan-janssen Date: Wed, 28 Jan 2026 21:07:26 +0100 Subject: [PATCH 09/10] fixes --- src/executorlib/executor/flux.py | 2 +- src/executorlib/executor/single.py | 2 +- src/executorlib/task_scheduler/file/shared.py | 19 ++++++++++++++++--- .../task_scheduler/file/task_scheduler.py | 2 +- 4 files changed, 19 insertions(+), 6 deletions(-) diff --git a/src/executorlib/executor/flux.py b/src/executorlib/executor/flux.py index 8e725d8b..a962c8b9 100644 --- a/src/executorlib/executor/flux.py +++ b/src/executorlib/executor/flux.py @@ -310,7 +310,7 @@ def __init__( plot_dependency_graph_filename: Optional[str] = None, export_workflow_filename: Optional[str] = None, log_obj_size: bool = False, - cancel_futures_on_shutdown: bool = True, + cancel_futures_on_shutdown: bool = False, ): """ The executorlib.FluxClusterExecutor leverages either the message passing interface (MPI), the SLURM workload diff --git a/src/executorlib/executor/single.py b/src/executorlib/executor/single.py index 67d08c60..040125a8 100644 --- a/src/executorlib/executor/single.py +++ b/src/executorlib/executor/single.py @@ -279,7 +279,7 @@ def __init__( plot_dependency_graph_filename: Optional[str] = None, export_workflow_filename: Optional[str] = None, log_obj_size: bool = False, - cancel_futures_on_shutdown: bool = True, + cancel_futures_on_shutdown: bool = False, ): """ The executorlib.api.TestClusterExecutor is designed to test the file based communication used in the diff --git a/src/executorlib/task_scheduler/file/shared.py b/src/executorlib/task_scheduler/file/shared.py index c731ac93..41938204 100644 --- a/src/executorlib/task_scheduler/file/shared.py +++ b/src/executorlib/task_scheduler/file/shared.py @@ -57,7 +57,7 @@ def execute_tasks_h5( backend: Optional[str] = None, disable_dependencies: bool = False, pmi_mode: Optional[str] = None, - cancel_futures_on_shutdown: bool = True, + cancel_futures_on_shutdown: bool = False, ) -> None: """ Execute tasks stored in a queue using HDF5 files. @@ -89,7 +89,7 @@ def execute_tasks_h5( with contextlib.suppress(queue.Empty): task_dict = future_queue.get_nowait() if task_dict is not None and "shutdown" in task_dict and task_dict["shutdown"]: - if task_dict["wait"]: + if task_dict["wait"] and not cancel_futures_on_shutdown: while len(memory_dict) > 0: memory_dict = { key: _check_task_output( @@ -100,7 +100,7 @@ def execute_tasks_h5( for key, value in memory_dict.items() if not value.done() } - if task_dict["cancel_futures"] or cancel_futures_on_shutdown: + if not task_dict["cancel_futures"] and not cancel_futures_on_shutdown: if ( terminate_function is not None and terminate_function == terminate_subprocess @@ -114,6 +114,19 @@ def execute_tasks_h5( config_directory=pysqa_config_directory, backend=backend, ) + else: + memory_dict = { + key: _check_task_output( + task_key=key, + future_obj=value, + cache_directory=cache_dir_dict[key], + ) + for key, value in memory_dict.items() + if not value.done() + } + for value in memory_dict.values(): + if not value.done(): + value.cancel() future_queue.task_done() future_queue.join() break diff --git a/src/executorlib/task_scheduler/file/task_scheduler.py b/src/executorlib/task_scheduler/file/task_scheduler.py index 509d5745..25d238bf 100644 --- a/src/executorlib/task_scheduler/file/task_scheduler.py +++ b/src/executorlib/task_scheduler/file/task_scheduler.py @@ -35,7 +35,7 @@ def __init__( backend: Optional[str] = None, disable_dependencies: bool = False, pmi_mode: Optional[str] = None, - cancel_futures_on_shutdown: bool = True, + cancel_futures_on_shutdown: bool = False, ): """ Initialize the FileExecutor. From cc573ae1efcc01d1c28d44668456733e097377c4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Wed, 28 Jan 2026 22:06:39 +0100 Subject: [PATCH 10/10] try different tests --- tests/test_fluxclusterexecutor.py | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/tests/test_fluxclusterexecutor.py b/tests/test_fluxclusterexecutor.py index 04680c34..7d391a2d 100644 --- a/tests/test_fluxclusterexecutor.py +++ b/tests/test_fluxclusterexecutor.py @@ -95,6 +95,32 @@ def test_executor_blockallocation_echo(self): self.assertTrue(fs1.done()) self.assertTrue(fs2.done()) + def test_executor_cancel_future_on_shutdown(self): + with FluxClusterExecutor( + resource_dict={"cores": 1, "cwd": "executorlib_cache"}, + block_allocation=False, + cache_directory="executorlib_cache", + pmi_mode=pmi, + cancel_futures_on_shutdown=True, + ) as exe: + cloudpickle_register(ind=1) + fs1 = exe.submit(echo, 1) + self.assertFalse(fs1.done()) + self.assertTrue(fs1.cancelled()) + sleep(2) + with FluxClusterExecutor( + resource_dict={"cores": 1, "cwd": "executorlib_cache"}, + block_allocation=False, + cache_directory="executorlib_cache", + pmi_mode=pmi, + cancel_futures_on_shutdown=True, + ) as exe: + cloudpickle_register(ind=1) + fs1 = exe.submit(echo, 1) + self.assertEqual(fs1.result(), 1) + self.assertEqual(len(os.listdir("executorlib_cache")), 4) + self.assertTrue(fs1.done()) + def test_executor_no_cwd(self): with FluxClusterExecutor( resource_dict={"cores": 2},