diff --git a/src/executorlib/executor/flux.py b/src/executorlib/executor/flux.py index f06d6570..4eb0d61f 100644 --- a/src/executorlib/executor/flux.py +++ b/src/executorlib/executor/flux.py @@ -9,6 +9,7 @@ check_plot_dependency_graph, check_pmi, check_refresh_rate, + check_wait_on_shutdown, validate_number_of_cores, ) from executorlib.task_scheduler.interactive.blockallocation import ( @@ -67,6 +68,7 @@ class FluxJobExecutor(BaseExecutor): plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. export_workflow_filename (str): Name of the file to store the exported workflow graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. + wait (bool): Whether to wait for the completion of all tasks before shutting down the executor. Examples: ``` @@ -108,6 +110,7 @@ def __init__( plot_dependency_graph_filename: Optional[str] = None, export_workflow_filename: Optional[str] = None, log_obj_size: bool = False, + wait: bool = True, ): """ The executorlib.FluxJobExecutor leverages either the message passing interface (MPI), the SLURM workload manager @@ -156,6 +159,7 @@ def __init__( plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. export_workflow_filename (str): Name of the file to store the exported workflow graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. + wait (bool): Whether to wait for the completion of all tasks before shutting down the executor. """ default_resource_dict: dict = { @@ -187,6 +191,7 @@ def __init__( block_allocation=block_allocation, init_function=init_function, log_obj_size=log_obj_size, + wait=wait, ), max_cores=max_cores, refresh_rate=refresh_rate, @@ -212,6 +217,7 @@ def __init__( block_allocation=block_allocation, init_function=init_function, log_obj_size=log_obj_size, + wait=wait, ) ) @@ -261,6 +267,7 @@ class FluxClusterExecutor(BaseExecutor): plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. export_workflow_filename (str): Name of the file to store the exported workflow graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. + wait (bool): Whether to wait for the completion of all tasks before shutting down the executor. Examples: ``` @@ -300,6 +307,7 @@ def __init__( plot_dependency_graph_filename: Optional[str] = None, export_workflow_filename: Optional[str] = None, log_obj_size: bool = False, + wait: bool = True, ): """ The executorlib.FluxClusterExecutor leverages either the message passing interface (MPI), the SLURM workload @@ -346,6 +354,7 @@ def __init__( plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. export_workflow_filename (str): Name of the file to store the exported workflow graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. + wait (bool): Whether to wait for the completion of all tasks before shutting down the executor. """ default_resource_dict: dict = { @@ -405,6 +414,7 @@ def __init__( block_allocation=block_allocation, init_function=init_function, disable_dependencies=disable_dependencies, + wait=wait, ) ) else: @@ -445,6 +455,7 @@ def create_flux_executor( block_allocation: bool = False, init_function: Optional[Callable] = None, log_obj_size: bool = False, + wait: bool = True, ) -> Union[OneProcessTaskScheduler, BlockAllocationTaskScheduler]: """ Create a flux executor @@ -483,6 +494,7 @@ def create_flux_executor( of the individual function. init_function (None): optional function to preset arguments for functions which are submitted later log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. + wait (bool): Whether to wait for the completion of all tasks before shutting down the executor. Returns: InteractiveStepExecutor/ InteractiveExecutor @@ -504,6 +516,7 @@ def create_flux_executor( check_command_line_argument_lst( command_line_argument_lst=resource_dict.get("slurm_cmd_args", []) ) + check_wait_on_shutdown(wait_on_shutdown=wait) if "openmpi_oversubscribe" in resource_dict: del resource_dict["openmpi_oversubscribe"] if "slurm_cmd_args" in resource_dict: diff --git a/src/executorlib/executor/single.py b/src/executorlib/executor/single.py index d0140fa5..8e9e0650 100644 --- a/src/executorlib/executor/single.py +++ b/src/executorlib/executor/single.py @@ -7,6 +7,7 @@ check_init_function, check_plot_dependency_graph, check_refresh_rate, + check_wait_on_shutdown, validate_number_of_cores, ) from executorlib.standalone.interactive.spawner import MpiExecSpawner @@ -60,6 +61,7 @@ class SingleNodeExecutor(BaseExecutor): plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. export_workflow_filename (str): Name of the file to store the exported workflow graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. + wait (bool): Whether to wait for the completion of all tasks before shutting down the executor. Examples: ``` @@ -97,6 +99,7 @@ def __init__( plot_dependency_graph_filename: Optional[str] = None, export_workflow_filename: Optional[str] = None, log_obj_size: bool = False, + wait: bool = True, ): """ The executorlib.SingleNodeExecutor leverages either the message passing interface (MPI), the SLURM workload @@ -142,6 +145,7 @@ def __init__( plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. export_workflow_filename (str): Name of the file to store the exported workflow graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. + wait (bool): Whether to wait for the completion of all tasks before shutting down the executor. """ default_resource_dict: dict = { @@ -169,6 +173,7 @@ def __init__( block_allocation=block_allocation, init_function=init_function, log_obj_size=log_obj_size, + wait=wait, ), max_cores=max_cores, refresh_rate=refresh_rate, @@ -190,6 +195,7 @@ def __init__( block_allocation=block_allocation, init_function=init_function, log_obj_size=log_obj_size, + wait=wait, ) ) @@ -232,6 +238,7 @@ class TestClusterExecutor(BaseExecutor): plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. export_workflow_filename (str): Name of the file to store the exported workflow graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. + wait (bool): Whether to wait for the completion of all tasks before shutting down the executor. Examples: ``` @@ -269,6 +276,7 @@ def __init__( plot_dependency_graph_filename: Optional[str] = None, export_workflow_filename: Optional[str] = None, log_obj_size: bool = False, + wait: bool = True, ): """ The executorlib.api.TestClusterExecutor is designed to test the file based communication used in the @@ -307,6 +315,7 @@ def __init__( plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. export_workflow_filename (str): Name of the file to store the exported workflow graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. + wait (bool): Whether to wait for the completion of all tasks before shutting down the executor. """ default_resource_dict: dict = { @@ -346,6 +355,7 @@ def __init__( init_function=init_function, disable_dependencies=disable_dependencies, execute_function=execute_in_subprocess, + wait=wait, ) ) else: @@ -379,6 +389,7 @@ def create_single_node_executor( block_allocation: bool = False, init_function: Optional[Callable] = None, log_obj_size: bool = False, + wait: bool = True, ) -> Union[OneProcessTaskScheduler, BlockAllocationTaskScheduler]: """ Create a single node executor @@ -413,6 +424,7 @@ def create_single_node_executor( of the individual function. init_function (None): optional function to preset arguments for functions which are submitted later log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. + wait (bool): Whether to wait for the completion of all tasks before shutting down the executor. Returns: InteractiveStepExecutor/ InteractiveExecutor @@ -429,6 +441,7 @@ def create_single_node_executor( check_command_line_argument_lst( command_line_argument_lst=resource_dict.get("slurm_cmd_args", []) ) + check_wait_on_shutdown(wait_on_shutdown=wait) if "threads_per_core" in resource_dict: del resource_dict["threads_per_core"] if "gpus_per_core" in resource_dict: diff --git a/src/executorlib/executor/slurm.py b/src/executorlib/executor/slurm.py index 1631c914..f021a78b 100644 --- a/src/executorlib/executor/slurm.py +++ b/src/executorlib/executor/slurm.py @@ -6,6 +6,7 @@ check_log_obj_size, check_plot_dependency_graph, check_refresh_rate, + check_wait_on_shutdown, validate_number_of_cores, ) from executorlib.task_scheduler.interactive.blockallocation import ( @@ -65,6 +66,7 @@ class SlurmClusterExecutor(BaseExecutor): plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. export_workflow_filename (str): Name of the file to store the exported workflow graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. + wait (bool): Whether to wait for the completion of all tasks before shutting down the executor. Examples: ``` @@ -104,6 +106,7 @@ def __init__( plot_dependency_graph_filename: Optional[str] = None, export_workflow_filename: Optional[str] = None, log_obj_size: bool = False, + wait: bool = True, ): """ The executorlib.SlurmClusterExecutor leverages either the message passing interface (MPI), the SLURM workload @@ -150,6 +153,7 @@ def __init__( plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. export_workflow_filename (str): Name of the file to store the exported workflow graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. + wait (bool): Whether to wait for the completion of all tasks before shutting down the executor. """ default_resource_dict: dict = { @@ -210,6 +214,7 @@ def __init__( block_allocation=block_allocation, init_function=init_function, disable_dependencies=disable_dependencies, + wait=wait, ) ) else: @@ -281,6 +286,7 @@ class SlurmJobExecutor(BaseExecutor): plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. export_workflow_filename (str): Name of the file to store the exported workflow graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. + wait (bool): Whether to wait for the completion of all tasks before shutting down the executor. Examples: ``` @@ -319,6 +325,7 @@ def __init__( plot_dependency_graph_filename: Optional[str] = None, export_workflow_filename: Optional[str] = None, log_obj_size: bool = False, + wait: bool = True, ): """ The executorlib.SlurmJobExecutor leverages either the message passing interface (MPI), the SLURM workload @@ -368,6 +375,7 @@ def __init__( plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. export_workflow_filename (str): Name of the file to store the exported workflow graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. + wait (bool): Whether to wait for the completion of all tasks before shutting down the executor. """ default_resource_dict: dict = { @@ -396,6 +404,7 @@ def __init__( block_allocation=block_allocation, init_function=init_function, log_obj_size=log_obj_size, + wait=wait, ), max_cores=max_cores, refresh_rate=refresh_rate, @@ -418,6 +427,7 @@ def __init__( block_allocation=block_allocation, init_function=init_function, log_obj_size=log_obj_size, + wait=wait, ) ) @@ -432,6 +442,7 @@ def create_slurm_executor( block_allocation: bool = False, init_function: Optional[Callable] = None, log_obj_size: bool = False, + wait: bool = True, ) -> Union[OneProcessTaskScheduler, BlockAllocationTaskScheduler]: """ Create a SLURM executor @@ -471,6 +482,7 @@ def create_slurm_executor( of the individual function. init_function (None): optional function to preset arguments for functions which are submitted later log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. + wait (bool): Whether to wait for the completion of all tasks before shutting down the executor. Returns: InteractiveStepExecutor/ InteractiveExecutor @@ -483,6 +495,7 @@ def create_slurm_executor( resource_dict["log_obj_size"] = log_obj_size resource_dict["pmi_mode"] = pmi_mode check_init_function(block_allocation=block_allocation, init_function=init_function) + check_wait_on_shutdown(wait_on_shutdown=wait) if block_allocation: resource_dict["init_function"] = init_function max_workers = validate_number_of_cores( diff --git a/src/executorlib/standalone/inputcheck.py b/src/executorlib/standalone/inputcheck.py index 6f6ab763..fdba845a 100644 --- a/src/executorlib/standalone/inputcheck.py +++ b/src/executorlib/standalone/inputcheck.py @@ -17,6 +17,18 @@ def check_oversubscribe(oversubscribe: bool) -> None: ) +def check_wait_on_shutdown( + wait_on_shutdown: bool, +) -> None: + """ + Check if wait_on_shutdown is False and raise a ValueError if it is. + """ + if not wait_on_shutdown: + raise ValueError( + "The wait_on_shutdown parameter is only supported for the executorlib.FluxClusterExecutor and executorlib.SlurmClusterExecutor." + ) + + def check_command_line_argument_lst(command_line_argument_lst: list[str]) -> None: """ Check if command_line_argument_lst is not empty and raise a ValueError if it is. diff --git a/src/executorlib/task_scheduler/base.py b/src/executorlib/task_scheduler/base.py index 8940fd7a..bba2622a 100644 --- a/src/executorlib/task_scheduler/base.py +++ b/src/executorlib/task_scheduler/base.py @@ -198,7 +198,9 @@ def shutdown(self, wait: bool = True, *, cancel_futures: bool = False): if cancel_futures and self._future_queue is not None: cancel_items_in_queue(que=self._future_queue) if self._process is not None and self._future_queue is not None: - self._future_queue.put({"shutdown": True, "wait": wait}) + self._future_queue.put( + {"shutdown": True, "wait": wait, "cancel_futures": cancel_futures} + ) if wait and isinstance(self._process, Thread): self._process.join() self._future_queue.join() diff --git a/src/executorlib/task_scheduler/file/shared.py b/src/executorlib/task_scheduler/file/shared.py index 177e28cd..c65409ca 100644 --- a/src/executorlib/task_scheduler/file/shared.py +++ b/src/executorlib/task_scheduler/file/shared.py @@ -57,6 +57,7 @@ def execute_tasks_h5( backend: Optional[str] = None, disable_dependencies: bool = False, pmi_mode: Optional[str] = None, + wait: bool = True, ) -> None: """ Execute tasks stored in a queue using HDF5 files. @@ -72,6 +73,7 @@ def execute_tasks_h5( backend (str, optional): name of the backend used to spawn tasks. disable_dependencies (boolean): Disable resolving future objects during the submission. pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only) + wait (bool): Whether to wait for the completion of all tasks before shutting down the executor. Returns: None @@ -86,7 +88,7 @@ def execute_tasks_h5( with contextlib.suppress(queue.Empty): task_dict = future_queue.get_nowait() if task_dict is not None and "shutdown" in task_dict and task_dict["shutdown"]: - if task_dict["wait"]: + if task_dict["wait"] and wait: while len(memory_dict) > 0: memory_dict = { key: _check_task_output( @@ -97,19 +99,33 @@ def execute_tasks_h5( for key, value in memory_dict.items() if not value.done() } - if ( - terminate_function is not None - and terminate_function == terminate_subprocess - ): - for task in process_dict.values(): - terminate_function(task=task) - elif terminate_function is not None: - for queue_id in process_dict.values(): - terminate_function( - queue_id=queue_id, - config_directory=pysqa_config_directory, - backend=backend, + if not task_dict["cancel_futures"] and wait: + if ( + terminate_function is not None + and terminate_function == terminate_subprocess + ): + for task in process_dict.values(): + terminate_function(task=task) + elif terminate_function is not None: + for queue_id in process_dict.values(): + terminate_function( + queue_id=queue_id, + config_directory=pysqa_config_directory, + backend=backend, + ) + else: + memory_dict = { + key: _check_task_output( + task_key=key, + future_obj=value, + cache_directory=cache_dir_dict[key], ) + for key, value in memory_dict.items() + if not value.done() + } + for value in memory_dict.values(): + if not value.done(): + value.cancel() future_queue.task_done() future_queue.join() break diff --git a/src/executorlib/task_scheduler/file/task_scheduler.py b/src/executorlib/task_scheduler/file/task_scheduler.py index d12480d1..e4cec50c 100644 --- a/src/executorlib/task_scheduler/file/task_scheduler.py +++ b/src/executorlib/task_scheduler/file/task_scheduler.py @@ -35,6 +35,7 @@ def __init__( backend: Optional[str] = None, disable_dependencies: bool = False, pmi_mode: Optional[str] = None, + wait: bool = True, ): """ Initialize the FileExecutor. @@ -50,6 +51,7 @@ def __init__( backend (str, optional): name of the backend used to spawn tasks. disable_dependencies (boolean): Disable resolving future objects during the submission. pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None + wait (bool): Whether to wait for the completion of all tasks before shutting down the executor. """ super().__init__(max_cores=None) default_resource_dict = { @@ -73,6 +75,7 @@ def __init__( "backend": backend, "disable_dependencies": disable_dependencies, "pmi_mode": pmi_mode, + "wait": wait, } self._set_process( Thread( @@ -98,6 +101,7 @@ def create_file_executor( init_function: Optional[Callable] = None, disable_dependencies: bool = False, execute_function: Callable = execute_with_pysqa, + wait: bool = True, ): if block_allocation: raise ValueError( @@ -128,4 +132,5 @@ def create_file_executor( execute_function=execute_function, terminate_function=terminate_function, pmi_mode=pmi_mode, + wait=wait, ) diff --git a/tests/unit/executor/test_flux_cluster.py b/tests/unit/executor/test_flux_cluster.py index 04680c34..48736c73 100644 --- a/tests/unit/executor/test_flux_cluster.py +++ b/tests/unit/executor/test_flux_cluster.py @@ -95,6 +95,32 @@ def test_executor_blockallocation_echo(self): self.assertTrue(fs1.done()) self.assertTrue(fs2.done()) + def test_executor_cancel_future_on_shutdown(self): + with FluxClusterExecutor( + resource_dict={"cores": 1, "cwd": "executorlib_cache"}, + block_allocation=False, + cache_directory="executorlib_cache", + pmi_mode=pmi, + wait=False, + ) as exe: + cloudpickle_register(ind=1) + fs1 = exe.submit(echo, 1) + self.assertFalse(fs1.done()) + self.assertTrue(fs1.cancelled()) + sleep(2) + with FluxClusterExecutor( + resource_dict={"cores": 1, "cwd": "executorlib_cache"}, + block_allocation=False, + cache_directory="executorlib_cache", + pmi_mode=pmi, + wait=False, + ) as exe: + cloudpickle_register(ind=1) + fs1 = exe.submit(echo, 1) + self.assertEqual(fs1.result(), 1) + self.assertEqual(len(os.listdir("executorlib_cache")), 4) + self.assertTrue(fs1.done()) + def test_executor_no_cwd(self): with FluxClusterExecutor( resource_dict={"cores": 2}, diff --git a/tests/unit/standalone/test_inputcheck.py b/tests/unit/standalone/test_inputcheck.py index 38fa896c..93e6b176 100644 --- a/tests/unit/standalone/test_inputcheck.py +++ b/tests/unit/standalone/test_inputcheck.py @@ -1,6 +1,7 @@ import unittest from executorlib.standalone.inputcheck import ( + check_wait_on_shutdown, check_command_line_argument_lst, check_gpus_per_worker, check_oversubscribe, @@ -24,6 +25,10 @@ class TestInputCheck(unittest.TestCase): + def test_check_wait_on_shutdown(self): + with self.assertRaises(ValueError): + check_wait_on_shutdown(wait_on_shutdown=False) + def test_check_command_line_argument_lst(self): with self.assertRaises(ValueError): check_command_line_argument_lst(command_line_argument_lst=["a"])