Skip to content
20 changes: 20 additions & 0 deletions src/executorlib/executor/flux.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from executorlib.executor.base import BaseExecutor
from executorlib.standalone.inputcheck import (
check_cancel_futures_on_shutdown,
check_command_line_argument_lst,
check_init_function,
check_log_obj_size,
Expand Down Expand Up @@ -67,6 +68,8 @@ class FluxJobExecutor(BaseExecutor):
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes on
shutdown.

Examples:
```
Expand Down Expand Up @@ -108,6 +111,7 @@ def __init__(
plot_dependency_graph_filename: Optional[str] = None,
export_workflow_filename: Optional[str] = None,
log_obj_size: bool = False,
cancel_futures_on_shutdown: bool = False,
):
"""
The executorlib.FluxJobExecutor leverages either the message passing interface (MPI), the SLURM workload manager
Expand Down Expand Up @@ -156,6 +160,8 @@ def __init__(
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes
on shutdown.

"""
default_resource_dict: dict = {
Expand Down Expand Up @@ -187,6 +193,7 @@ def __init__(
block_allocation=block_allocation,
init_function=init_function,
log_obj_size=log_obj_size,
cancel_futures_on_shutdown=cancel_futures_on_shutdown,
),
max_cores=max_cores,
refresh_rate=refresh_rate,
Expand All @@ -212,6 +219,7 @@ def __init__(
block_allocation=block_allocation,
init_function=init_function,
log_obj_size=log_obj_size,
cancel_futures_on_shutdown=cancel_futures_on_shutdown,
)
)

Expand Down Expand Up @@ -261,6 +269,8 @@ class FluxClusterExecutor(BaseExecutor):
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes
on shutdown.

Examples:
```
Expand Down Expand Up @@ -300,6 +310,7 @@ def __init__(
plot_dependency_graph_filename: Optional[str] = None,
export_workflow_filename: Optional[str] = None,
log_obj_size: bool = False,
cancel_futures_on_shutdown: bool = False,
):
"""
The executorlib.FluxClusterExecutor leverages either the message passing interface (MPI), the SLURM workload
Expand Down Expand Up @@ -346,6 +357,8 @@ def __init__(
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes
on shutdown.

"""
default_resource_dict: dict = {
Expand Down Expand Up @@ -405,6 +418,7 @@ def __init__(
block_allocation=block_allocation,
init_function=init_function,
disable_dependencies=disable_dependencies,
cancel_futures_on_shutdown=cancel_futures_on_shutdown,
)
)
else:
Expand Down Expand Up @@ -445,6 +459,7 @@ def create_flux_executor(
block_allocation: bool = False,
init_function: Optional[Callable] = None,
log_obj_size: bool = False,
cancel_futures_on_shutdown: bool = False,
) -> Union[OneProcessTaskScheduler, BlockAllocationTaskScheduler]:
"""
Create a flux executor
Expand Down Expand Up @@ -483,6 +498,8 @@ def create_flux_executor(
of the individual function.
init_function (None): optional function to preset arguments for functions which are submitted later
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes on
shutdown.

Returns:
InteractiveStepExecutor/ InteractiveExecutor
Expand All @@ -504,6 +521,9 @@ def create_flux_executor(
check_command_line_argument_lst(
command_line_argument_lst=resource_dict.get("slurm_cmd_args", [])
)
check_cancel_futures_on_shutdown(
cancel_futures_on_shutdown=cancel_futures_on_shutdown
)
if "openmpi_oversubscribe" in resource_dict:
del resource_dict["openmpi_oversubscribe"]
if "slurm_cmd_args" in resource_dict:
Expand Down
20 changes: 20 additions & 0 deletions src/executorlib/executor/single.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from executorlib.executor.base import BaseExecutor
from executorlib.standalone.inputcheck import (
check_cancel_futures_on_shutdown,
check_command_line_argument_lst,
check_gpus_per_worker,
check_init_function,
Expand Down Expand Up @@ -60,6 +61,8 @@ class SingleNodeExecutor(BaseExecutor):
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes on
shutdown.

Examples:
```
Expand Down Expand Up @@ -97,6 +100,7 @@ def __init__(
plot_dependency_graph_filename: Optional[str] = None,
export_workflow_filename: Optional[str] = None,
log_obj_size: bool = False,
cancel_futures_on_shutdown: bool = False,
):
"""
The executorlib.SingleNodeExecutor leverages either the message passing interface (MPI), the SLURM workload
Expand Down Expand Up @@ -142,6 +146,8 @@ def __init__(
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes
on shutdown.

"""
default_resource_dict: dict = {
Expand Down Expand Up @@ -169,6 +175,7 @@ def __init__(
block_allocation=block_allocation,
init_function=init_function,
log_obj_size=log_obj_size,
cancel_futures_on_shutdown=cancel_futures_on_shutdown,
),
max_cores=max_cores,
refresh_rate=refresh_rate,
Expand All @@ -190,6 +197,7 @@ def __init__(
block_allocation=block_allocation,
init_function=init_function,
log_obj_size=log_obj_size,
cancel_futures_on_shutdown=cancel_futures_on_shutdown,
)
)

Expand Down Expand Up @@ -232,6 +240,8 @@ class TestClusterExecutor(BaseExecutor):
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes on
shutdown.

Examples:
```
Expand Down Expand Up @@ -269,6 +279,7 @@ def __init__(
plot_dependency_graph_filename: Optional[str] = None,
export_workflow_filename: Optional[str] = None,
log_obj_size: bool = False,
cancel_futures_on_shutdown: bool = False,
):
"""
The executorlib.api.TestClusterExecutor is designed to test the file based communication used in the
Expand Down Expand Up @@ -307,6 +318,8 @@ def __init__(
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes
on shutdown.

"""
default_resource_dict: dict = {
Expand Down Expand Up @@ -346,6 +359,7 @@ def __init__(
init_function=init_function,
disable_dependencies=disable_dependencies,
execute_function=execute_in_subprocess,
cancel_futures_on_shutdown=cancel_futures_on_shutdown,
)
)
else:
Expand Down Expand Up @@ -379,6 +393,7 @@ def create_single_node_executor(
block_allocation: bool = False,
init_function: Optional[Callable] = None,
log_obj_size: bool = False,
cancel_futures_on_shutdown: bool = False,
) -> Union[OneProcessTaskScheduler, BlockAllocationTaskScheduler]:
"""
Create a single node executor
Expand Down Expand Up @@ -413,6 +428,8 @@ def create_single_node_executor(
of the individual function.
init_function (None): optional function to preset arguments for functions which are submitted later
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes on
shutdown.

Returns:
InteractiveStepExecutor/ InteractiveExecutor
Expand All @@ -429,6 +446,9 @@ def create_single_node_executor(
check_command_line_argument_lst(
command_line_argument_lst=resource_dict.get("slurm_cmd_args", [])
)
check_cancel_futures_on_shutdown(
cancel_futures_on_shutdown=cancel_futures_on_shutdown
)
if "threads_per_core" in resource_dict:
del resource_dict["threads_per_core"]
if "gpus_per_core" in resource_dict:
Expand Down
20 changes: 20 additions & 0 deletions src/executorlib/executor/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from executorlib.executor.base import BaseExecutor
from executorlib.standalone.inputcheck import (
check_cancel_futures_on_shutdown,
check_init_function,
check_log_obj_size,
check_plot_dependency_graph,
Expand Down Expand Up @@ -65,6 +66,8 @@ class SlurmClusterExecutor(BaseExecutor):
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes on
shutdown.

Examples:
```
Expand Down Expand Up @@ -104,6 +107,7 @@ def __init__(
plot_dependency_graph_filename: Optional[str] = None,
export_workflow_filename: Optional[str] = None,
log_obj_size: bool = False,
cancel_futures_on_shutdown: bool = False,
):
"""
The executorlib.SlurmClusterExecutor leverages either the message passing interface (MPI), the SLURM workload
Expand Down Expand Up @@ -150,6 +154,8 @@ def __init__(
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes
on shutdown.

"""
default_resource_dict: dict = {
Expand Down Expand Up @@ -210,6 +216,7 @@ def __init__(
block_allocation=block_allocation,
init_function=init_function,
disable_dependencies=disable_dependencies,
cancel_futures_on_shutdown=cancel_futures_on_shutdown,
)
)
else:
Expand Down Expand Up @@ -281,6 +288,8 @@ class SlurmJobExecutor(BaseExecutor):
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes on
shutdown.

Examples:
```
Expand Down Expand Up @@ -319,6 +328,7 @@ def __init__(
plot_dependency_graph_filename: Optional[str] = None,
export_workflow_filename: Optional[str] = None,
log_obj_size: bool = False,
cancel_futures_on_shutdown: bool = False,
):
"""
The executorlib.SlurmJobExecutor leverages either the message passing interface (MPI), the SLURM workload
Expand Down Expand Up @@ -368,6 +378,8 @@ def __init__(
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
export_workflow_filename (str): Name of the file to store the exported workflow graph in.
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes
on shutdown.

"""
default_resource_dict: dict = {
Expand Down Expand Up @@ -396,6 +408,7 @@ def __init__(
block_allocation=block_allocation,
init_function=init_function,
log_obj_size=log_obj_size,
cancel_futures_on_shutdown=cancel_futures_on_shutdown,
),
max_cores=max_cores,
refresh_rate=refresh_rate,
Expand All @@ -418,6 +431,7 @@ def __init__(
block_allocation=block_allocation,
init_function=init_function,
log_obj_size=log_obj_size,
cancel_futures_on_shutdown=cancel_futures_on_shutdown,
)
)

Expand All @@ -432,6 +446,7 @@ def create_slurm_executor(
block_allocation: bool = False,
init_function: Optional[Callable] = None,
log_obj_size: bool = False,
cancel_futures_on_shutdown: bool = False,
) -> Union[OneProcessTaskScheduler, BlockAllocationTaskScheduler]:
"""
Create a SLURM executor
Expand Down Expand Up @@ -471,6 +486,8 @@ def create_slurm_executor(
of the individual function.
init_function (None): optional function to preset arguments for functions which are submitted later
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes on
shutdown.

Returns:
InteractiveStepExecutor/ InteractiveExecutor
Expand All @@ -483,6 +500,9 @@ def create_slurm_executor(
resource_dict["log_obj_size"] = log_obj_size
resource_dict["pmi_mode"] = pmi_mode
check_init_function(block_allocation=block_allocation, init_function=init_function)
check_cancel_futures_on_shutdown(
cancel_futures_on_shutdown=cancel_futures_on_shutdown
)
if block_allocation:
resource_dict["init_function"] = init_function
max_workers = validate_number_of_cores(
Expand Down
12 changes: 12 additions & 0 deletions src/executorlib/standalone/inputcheck.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,18 @@ def check_oversubscribe(oversubscribe: bool) -> None:
)


def check_cancel_futures_on_shutdown(
    cancel_futures_on_shutdown: bool,
) -> None:
    """
    Reject the cancel_futures_on_shutdown option by raising a ValueError when it is enabled.

    Args:
        cancel_futures_on_shutdown (bool): Whether to cancel pending futures and the corresponding Python processes
                                           on shutdown.

    Raises:
        ValueError: If cancel_futures_on_shutdown evaluates to True.
    """
    # Guard clause: a falsy flag is always acceptable, nothing to validate.
    if not cancel_futures_on_shutdown:
        return None
    raise ValueError(
        "The cancel_futures_on_shutdown parameter is only supported for the executorlib.FluxClusterExecutor and executorlib.SlurmClusterExecutor."
    )


def check_command_line_argument_lst(command_line_argument_lst: list[str]) -> None:
"""
Check if command_line_argument_lst is not empty and raise a ValueError if it is.
Expand Down
4 changes: 3 additions & 1 deletion src/executorlib/task_scheduler/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,9 @@ def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
if cancel_futures and self._future_queue is not None:
cancel_items_in_queue(que=self._future_queue)
if self._process is not None and self._future_queue is not None:
self._future_queue.put({"shutdown": True, "wait": wait})
self._future_queue.put(
{"shutdown": True, "wait": wait, "cancel_futures": cancel_futures}
)
if wait and isinstance(self._process, Thread):
self._process.join()
self._future_queue.join()
Expand Down
Loading
Loading