Orchestrator
- @sane.register(f, priority=0)[source]
Adds a Python callable to the list of registered functions in sane.
Any callable Python object which accepts an Orchestrator as the first positional argument may be registered. This is the primary way to have sane directly call Python code within a workflow. The aggregate list will then be invoked by an Orchestrator instance.
A priority can optionally be associated with this registration, corresponding to precedence in invocations. Priorities are handled in descending order, i.e. highest priority first. Equal priorities are evaluated in order of registration.
See Orchestrator.process_registered() for more info.
The decorator may be called with no priority, in which case the default is 0.
Example:
    import sane

    @sane.register
    def last( orch ):
        # default priority is 0
        pass

    @sane.register( priority=5 )
    def second( orch ):
        pass

    @sane.register( 99 )
    def first( orch ):
        pass
- Parameters:
f (Callable[[Orchestrator], None]) – Callable to register for future use when an Orchestrator instance loads the workflow. The calling instance will pass itself as the single positional argument to the registered callable.
priority (int)
- class sane.Orchestrator[source]
Bases: OptionLoader
Workflow controller containing all hosts and actions
The Orchestrator serves as the main entry point for constructing, managing, and executing workflows. It uses a simple DAG (using Action IDs) to orchestrate action scheduling.
The Orchestrator is also responsible for any intercommunication between hosts and actions, as both actions and hosts are (generally) unaware of each other. Additionally, the Orchestrator catalogues cumulative workflow runs to provide a final state of all actions, caching results between runs if not cleared.
User Interface
User Methods
User Attributes & Properties
- actions: utdict.UniqueTypedDict[Action]
The unique set of actions in this workflow.
The actions are stored in a unique-key type-enforced dictionary. Only Action instance objects may be stored (derived types are valid).
- hosts: utdict.UniqueTypedDict[Host]
The unique set of hosts in this workflow.
The hosts are stored in a unique-key type-enforced dictionary. Only Host instance objects may be stored (derived types are valid).
- property save_location: str
The directory used for saving any intermediary SaveState or workflow cache
The provided path does not need to exist yet, but must be in a location where the user of the workflow has adequate permissions.
Internal API
While not to be invoked by the user, these functions may be useful reference.
- property current_host: str
Returns the current host name (key for hosts) for this current workflow run.
This value is only valid after find_host() has been called. For normal users, this would be valid during run_actions().
- run_actions(action_id_list, as_host=None, continue_on_err=True, visualize=False)[source]
Run the workflow for the provided action id list and any dependencies
- Parameters:
action_id_list (List[str]) – A list of specifically requested ids from actions to run.
as_host (str) – The preferred host name or alias to run as, if provided.
continue_on_err (bool) – Continue workflow evaluation as best as possible even if an Action encounters an error.
visualize (bool) – Print out a CLI-friendly rendition of the dependency graph of actions to be run.
- add_search_paths(search_paths)[source]
Add a series of paths to search for workflow files. Cannot be used after load_paths() has been called.
- add_search_patterns(search_patterns)[source]
Add a series of Python re pattern strings as filters for finding workflow files. Cannot be used after load_paths() has been called.
- load_paths()[source]
Load workflow definitions from current search paths and filters
This is the primary load call after all necessary paths and filters have been set. The order of operations is as follows:
1. All search paths are added to sys.path.
2. All valid files matching at least one search filter across all paths are gathered.
3. Files are sorted based on file extension into .py and .json[c].
4. All .py files are loaded via load_py_files().
5. All registered calls (via @sane.register) are invoked in priority order via process_registered().
6. All .json[c] files are then loaded via load_config_files() (.json first, then .jsonc).
7. All patches are processed in priority order via process_patches().
- Return type:
None
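The extension sorting and the resulting load order can be sketched as follows. This is an assumption-laden illustration: `sort_by_extension` and the file names are hypothetical, and sane's own gathering and filtering logic is not shown.

```python
from pathlib import Path
from typing import Dict, Iterable, List


def sort_by_extension(files: Iterable[str]) -> Dict[str, List[Path]]:
    """Bucket gathered files by suffix; anything else is ignored in this sketch."""
    buckets: Dict[str, List[Path]] = {".py": [], ".json": [], ".jsonc": []}
    for name in files:
        path = Path(name)
        if path.suffix in buckets:
            buckets[path.suffix].append(path)
    return buckets


# Hypothetical files found under a ".sane" search path
found = [
    ".sane/hosts.json",
    ".sane/notes.txt",
    ".sane/overrides.jsonc",
    ".sane/tests/workflow_a.py",
]

buckets = sort_by_extension(found)
# Python files first, then .json, then .jsonc, mirroring the documented order
load_order = buckets[".py"] + buckets[".json"] + buckets[".jsonc"]
print([p.name for p in load_order])  # ['workflow_a.py', 'hosts.json', 'overrides.jsonc']
```

In the documented sequence, registered functions run between the Python and JSON loads, which is why a JSON file can refer to types defined in a `.py` workflow file.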
- load_py_files(files)[source]
Load the provided list of python files as modules dynamically
Files are evaluated relative to the first path that yields this file from the set of search paths added via add_search_paths().
An effective module name is generated from the relative path to the file from the respective path. This module name is then dynamically imported using importlib.import_module(), relying on the fact that load_paths() has added the search paths to sys.path.
Important
For workflows that use Python files with helper functions, classes, etc. in files separate from where a @sane.register occurs, this means that the provided search paths for this workflow can be treated as top-level searchable directories within your workflow's Python code.
For instance, consider the following layout:
    project/
    ├── .sane
    │   ├── helpers
    │   │   ├── custom_action.py
    │   │   └── custom_host.py
    │   └── tests
    │       └── workflow_a.py
    └── src
The workflow may be invoked using:
    sane_runner -p .sane -a my_action -r
Where my_action is defined in .sane/tests/workflow_a.py:
    import sane
    import helpers.custom_action #< Relative to .sane

    @sane.register
    def workflow_a( orch ):
        orch.add_action( helpers.custom_action.MyAction( "a" ) )
And helpers.custom_action.MyAction is defined in .sane/helpers/custom_action.py:
    import sane

    class MyAction( sane.Action ):
        def __init__( self, id ):
            super().__init__( id )
            # ... implementation ...
Since .sane is provided as a search path (and thus added to sys.path), we can treat the import of other modules within our search path as relative to it.
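How a module name might be derived from a file's path relative to the first matching search path can be sketched as below. The helper `module_name_for` is hypothetical and only illustrates the idea; sane's exact rules are not shown in this reference.

```python
from pathlib import Path
from typing import Iterable, Optional


def module_name_for(file_path: str, search_paths: Iterable[str]) -> Optional[str]:
    """Return a dotted module name relative to the first search path containing the file."""
    file = Path(file_path)
    for root in search_paths:
        try:
            relative = file.relative_to(root)
        except ValueError:
            continue  # the file is not located under this search path
        # Drop the ".py" suffix and join the remaining path parts with dots
        return ".".join(relative.with_suffix("").parts)
    return None


name = module_name_for(".sane/tests/workflow_a.py", ["other", ".sane"])
print(name)  # tests.workflow_a
```

A name produced this way could then be handed to `importlib.import_module()`, which resolves it only if the matching search path is already on `sys.path`.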
- load_config_files(files)[source]
Load the provided list of files as JSON files (JSON with //-style comments allowed) and call load_options() for each.
See load_core_options() for class-specific load implementation.
- load_options(options, origin=None)[source]
Base class implementation for loading of dict-based attributes into instance
Take an options dict of relevant attributes and load them via load_core_options() then load_extra_options(). The options dict should be modified in each call to remove processed fields so that at the very end of this method, any unused keys in the options dict may be logged.
The load_extra_options() method is meant to be overridden by the user so that load_core_options() may retain core underlying base class implementation details without the risk of base class loading not being called.
To keep track of every time this function is called and potentially modifies this instance, an origin may be provided, noting where the change is coming from.
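The core-then-extra loading pattern, with processed keys removed so leftovers can be reported, might look like the sketch below. `SketchLoader` and `CustomLoader` are hypothetical classes, and the `name` and `retries` options are invented for illustration. Working on a copy of the dict is also an assumption, since the real method may modify the caller's dict in place.

```python
import logging
from typing import Any, Dict, List, Optional

logger = logging.getLogger("option_loader_sketch")


class SketchLoader:
    """Hypothetical stand-in for OptionLoader; not sane's actual implementation."""

    def __init__(self) -> None:
        self.name = "unnamed"
        self.unused_keys: List[str] = []

    def load_options(self, options: Dict[str, Any], origin: Optional[str] = None) -> None:
        remaining = dict(options)  # assumption: copy so the caller's dict is untouched
        self.load_core_options(remaining, origin)
        self.load_extra_options(remaining, origin)
        # Whatever is left was consumed by neither step
        self.unused_keys = sorted(remaining)
        for key in self.unused_keys:
            logger.warning("Unused option %r from %s", key, origin)

    def load_core_options(self, options: Dict[str, Any], origin: Optional[str]) -> None:
        # Core fields are popped so they are not reported as unused
        if "name" in options:
            self.name = options.pop("name")

    def load_extra_options(self, options: Dict[str, Any], origin: Optional[str]) -> None:
        pass  # intended to be overridden by derived classes


class CustomLoader(SketchLoader):
    def __init__(self) -> None:
        super().__init__()
        self.retries = 0

    def load_extra_options(self, options: Dict[str, Any], origin: Optional[str]) -> None:
        self.retries = options.pop("retries", self.retries)


loader = CustomLoader()
loader.load_options({"name": "a", "retries": 3, "typo": 1}, origin="example.json")
print(loader.name, loader.retries, loader.unused_keys)  # a 3 ['typo']
```

Because the derived class overrides only `load_extra_options()`, the base handling of `name` cannot be skipped by forgetting a `super()` call.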
- load_core_options(options, origin)[source]
From OptionLoader.load_core_options:
Any processed field should be removed from the options dict, with everything else ignored. All listed options are cumulative and optional unless specified otherwise.
See load_options() for parameters.
From Orchestrator.load_core_options():
Load the provided options dict, creating any Host or Action as necessary and recording patches.
Below is the expected layout, where all fields are optional and "<>" fields are user-specified:
    {
      "hosts" :
      {
        "<host-name>" : { "type" : "<some_host_type>", ...host options... },
        ...other host declarations...
      },
      "actions" :
      {
        "<action-id>" : { "type" : "<some_action_type>", ...action options... },
        ...other action declarations...
      },
      "patches" :
      {
        "priority" : int,
        "hosts" : ...same as above *except* "type"...
        "actions" : ...same as above *except* "type"...
      }
    }
The "hosts" key is processed first, iterating over each "<host-name>" and its dict. Inside of this respective "<host-name>" dict, the "type" field informs which type of Host to create. If no "type" is specified, the default is Host. The "<host-name>" is used as the Host.name during instantiation.
Once the host instance is created, its respective dict is loaded via its own Host.load_options(). Then the created host is added with add_host().
Next, the "actions" key is processed in a similar fashion, except the default "type" is Action and the created action is added via add_action().
Hint
See search_type() for more info on how the "type" field should be specified.
Finally, the "patches" key is processed. A default priority of 0 is used if no priority is specified. Everything in the "patches" dict (except the "priority") is saved for later use in process_patches() in an internal patch priority queue. The content of this can generally be the same as when declaring "hosts" or "actions", with limitations left to the option-loading implementation of the type being patched (e.g. a derived Action may allow more or fewer fields in its load_options / load_core_options / load_extra_options). Each entry should correspond to an existing object in the workflow found in hosts or actions. Objects to be patched do not need to have been created via a JSON config file.
Hint
See process_patches() or process_patch_dict() for advanced usage of patching objects, including using patch filters.
Note
"type" is not a valid field in any of the "patches" sub-dicts as the options will be applied to existing object instances and "type" is only used for initial creation of objects in this method.
- search_type(type_str, noexcept=False)[source]
Match a type (as an input string) to an actual Python type
If at any point a search is successful, the function immediately returns the found type.
Search priority:
1. type_str using pydoc locate() (effectively search the current context for a type of that fully qualified name)
2. Split type_str on the last . in the name and search any user-loaded module that contains the prefix for an attribute matching the suffix. If no split occurs, all user modules are searched.
Valid type examples:
    import sane
    import user_mod.nested.foo # module foo has CustomType

    # ... in the context of this class ...
    self.search_type( "sane.Action" )
    self.search_type( "sane.host.Host" )
    self.search_type( "user_mod.nested.foo.CustomType" )
    # Using search method (2) if foo was loaded into the user modules by the workflow
    # since "foo" is a substring of "user_mod.nested.foo"
    self.search_type( "foo.CustomType" )
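The two-stage search can be approximated with the standard library alone. In this sketch, `search_type_sketch`, the `user_modules` mapping, and the module name `sketch_pkg.nested.sketch_widgets` are all hypothetical. Only `pydoc.locate()` and the substring test on the module name come from the description above.

```python
import collections
import pydoc
import types
from typing import Dict, Optional


def search_type_sketch(type_str: str, user_modules: Dict[str, types.ModuleType]) -> Optional[type]:
    # (1) Fully qualified lookup through pydoc.locate()
    found = pydoc.locate(type_str)
    if isinstance(found, type):
        return found

    # (2) Split on the last "." and look in user modules whose name contains the prefix.
    # Without a ".", the prefix is "" and every module name contains it.
    prefix, _, attr = type_str.rpartition(".")
    for module_name, module in user_modules.items():
        if prefix in module_name:
            candidate = getattr(module, attr, None)
            if isinstance(candidate, type):
                return candidate
    return None


# Hypothetical user module that is NOT importable by name, forcing fallback (2)
user_mod = types.ModuleType("sketch_pkg.nested.sketch_widgets")


class CustomType:
    pass


user_mod.CustomType = CustomType
modules = {user_mod.__name__: user_mod}

print(search_type_sketch("collections.OrderedDict", modules) is collections.OrderedDict)  # True
print(search_type_sketch("sketch_widgets.CustomType", modules) is CustomType)  # True
```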
- process_registered()[source]
Process functions registered via @sane.register in priority order
All registered functions are called in descending priority order (highest priority first), with equal priority resolved based on order of registration. The functions are called with this Orchestrator instance as the single argument.
- Return type:
None
- process_patches()[source]
Process JSON patches in priority order
Process the stored JSON patches read in from load_config_files() after both JSON and Python workflow files have been processed. First, they are sorted in priority order, highest value first. Then for each patch dict, process_patch_dict() is called using the path of the JSON file this patch came from as the origin.
- Return type:
None
- process_patch_dict(origin, patch)[source]
Process an individual patch dict without priority
Find the corresponding object(s) that already exist within the Orchestrator and call the respective load_options() of each object with the sub-dict as the value. The patch input argument should closely resemble the options argument in load_core_options(), with some minor caveats.
Following the processing order of load_core_options(), any patch for sane.Host is processed first, then sane.Action.
Patches are applied, for a respective attribute (hosts or actions), either to the object with a matching key in the attribute or, if a patch filter is used, to all objects with matching keys. If no key(s) are found, the patch is not applied.
When referencing an object to be patched, it must use the key for the respective attribute it is in. For hosts, it should be the Host.name used as a key in hosts, and for actions it should be the Action.id used as a key in actions.
When referencing objects to be patched via a filter, use a Python re regex wrapped in [].
As an example of a valid patch:
    {
      "hosts" :
      {
        "simple_host" : { ...things to patch... }
      },
      "actions" :
      {
        "[action_00[0-5]]" : { ...things to patch for up to 6 actions... }
      }
    }
[action_00[0-5]] is a patch filter with action_00[0-5] as the match regex.
Regardless of whether the patch is applied or not, the effects are logged.
- Parameters:
origin (str) – Where this patch originates from, file or source code.
patch (dict) – A dict similar to the options argument of load_core_options(), containing a collection of dicts corresponding to a patched object's load_options()
- Return type:
None
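Key resolution for a patch entry, including the `[...]` filter form, can be sketched as follows. The helper `keys_for_patch` is hypothetical, and using `re.fullmatch` is an assumption: this reference does not state whether sane matches the whole key or only part of it.

```python
import re
from typing import Iterable, List


def keys_for_patch(patch_key: str, existing_keys: Iterable[str]) -> List[str]:
    """Return the existing keys that a patch entry would apply to."""
    if patch_key.startswith("[") and patch_key.endswith("]"):
        # Patch filter: strip the outer brackets and treat the rest as a regex
        pattern = re.compile(patch_key[1:-1])
        # Assumption: the entire key must match the pattern
        return [key for key in existing_keys if pattern.fullmatch(key)]
    # Plain key: exact match only
    return [key for key in existing_keys if key == patch_key]


action_ids = ["action_000", "action_003", "action_005", "action_006", "build"]

print(keys_for_patch("[action_00[0-5]]", action_ids))  # ['action_000', 'action_003', 'action_005']
print(keys_for_patch("build", action_ids))             # ['build']
print(keys_for_patch("missing", action_ids))           # []
```

An empty result corresponds to the documented case where no key is found and the patch is not applied.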
- find_host(as_host)[source]
Finds the host to use for this workflow run
Cycle through all Host in hosts and check via Host.valid_host(), stopping on the first host that is valid. This then sets current_host.
- Parameters:
as_host (str) – The preferred host name or alias to use when checking validity. If set to None, a default will be used (see Host.valid_host())
- construct_dag()[source]
Constructs an internal DAG using Action.id from actions as nodes and graph edges from Action.dependencies
- Return type:
None
- traversal_list(action_id_list)[source]
Constructs the internal DAG and returns a traversal order consisting of { id : number of dependencies }
The traversal is a transitive reduction of the subgraph of the graph of actions, with connectivity informed by Action.dependencies, consisting of all actions with ids listed in action_id_list and any dependencies necessary to complete the subgraph. The returned traversal can then be used to walk through the transitive reduction by extracting all ids whose dependency count is zero, reducing each remaining id's dependency count by one for each removed id it depends on, and repeating the process. This is facilitated internally via a DAG.
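Walking a `{ id : number of dependencies }` traversal as described can be sketched with plain dicts. The action ids and the `walk` helper below are hypothetical, and the dependency map stands in for what Action.dependencies would provide.

```python
from typing import Dict, List

# Hypothetical dependency map: each id lists the ids it depends on
dependencies: Dict[str, List[str]] = {
    "fetch": [],
    "configure": ["fetch"],
    "build": ["configure"],
    "test": ["build"],
    "docs": ["fetch"],
}


def walk(traversal: Dict[str, int], deps: Dict[str, List[str]]) -> List[List[str]]:
    """Repeatedly extract ids with zero remaining dependencies, in waves."""
    remaining = dict(traversal)
    waves: List[List[str]] = []
    while remaining:
        ready = sorted(k for k, count in remaining.items() if count == 0)
        if not ready:
            raise ValueError("no runnable ids left: dependency cycle")
        for k in ready:
            del remaining[k]
        # Reduce each remaining count by one per removed id it depends on
        for k in remaining:
            remaining[k] -= sum(1 for dep in deps[k] if dep in ready)
        waves.append(ready)
    return waves


traversal = {k: len(v) for k, v in dependencies.items()}
print(walk(traversal, dependencies))
# [['fetch'], ['configure', 'docs'], ['build'], ['test']]
```

Each inner list is a set of actions that could run concurrently, since none of them depends on another id in the same wave.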
- __wake__
A synchronization primitive for coordinating workflow execution. All queued Actions and the current Host are provided a reference to this object before a workflow begins.
During workflow execution (run_actions()), the Orchestrator starts all Actions able to run then calls threading.Event.wait(). Further workflow execution evaluation will not continue until an object not in the main thread triggers this primitive.
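The wait-and-wake pattern described above can be sketched with `threading.Event` directly. The names `wake`, `run_action`, and the action ids are hypothetical, and the timeout is added here only so the sketch cannot hang. It is not a claim about how sane calls `wait()`.

```python
import threading
from typing import List, Set

wake = threading.Event()        # plays the role of the shared wake primitive
finished: List[str] = []
lock = threading.Lock()


def run_action(action_id: str) -> None:
    # Worker thread: record completion, then wake the main thread
    with lock:
        finished.append(action_id)
    wake.set()


pending: Set[str] = {"a", "b", "c"}
threads = [threading.Thread(target=run_action, args=(i,)) for i in sorted(pending)]
for t in threads:
    t.start()

while pending:
    # Main thread blocks until some worker signals (timeout is a safety net only)
    wake.wait(timeout=5.0)
    wake.clear()
    with lock:
        pending -= set(finished)

for t in threads:
    t.join()
print(sorted(finished))  # ['a', 'b', 'c']
```

Clearing the event before reading the shared state means a completion that arrives mid-check sets the event again, so the next `wait()` returns immediately and no wake-up is lost.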