Orchestrator

@sane.register(f, priority=0)

Adds a Python callable to the list of registered functions in sane

Any callable Python object which accepts Orchestrator as the first positional argument may be registered. This is the primary way to have sane directly call Python code within a workflow. The aggregate list will then be invoked by an Orchestrator instance.

A priority can optionally be associated with this registration, determining precedence in invocation. Priorities are handled in descending order, i.e. highest priority first. Equal priorities are evaluated in registration order.

See Orchestrator.process_registered() for more info.

The decorator may be called with no priority, in which case the default is 0.

Example:

import sane

@sane.register
def last( orch ):
  # default priority is 0
  pass

@sane.register( priority=5 )
def second( orch ):
  pass

@sane.register( 99 )
def first( orch ):
  pass

Parameters:
  • f (Callable[[Orchestrator], None]) – Callable to register for future use when an Orchestrator instance loads the workflow. The calling instance will pass itself as the single positional argument to the registered callable.

  • priority (int) – Precedence of this registration; higher values are invoked first. Defaults to 0.

class sane.Orchestrator

Bases: OptionLoader

Workflow controller containing all hosts and actions

The Orchestrator serves as the main entry point for constructing, managing, and executing workflows. It uses a simple directed acyclic graph (DAG) of Action IDs to schedule actions.

The Orchestrator is also responsible for any communication between hosts and actions, as both actions and hosts are (generally) unaware of each other. Additionally, the Orchestrator catalogues cumulative workflow runs to provide a final state of all actions, caching results between runs unless the cache is cleared.

User Interface

User Methods

add_action(action)

Adds an action to actions, using the action.id as the key

Parameters:

action (Action)

Return type:

None

add_host(host)

Adds a host to hosts, using the host.name as the key

Parameters:

host (Host)

Return type:

None

User Attributes & Properties

actions: utdict.UniqueTypedDict[Action]

The unique set of actions in this workflow.

The actions are stored in a unique-key, type-enforced dictionary. Only Action instances (including instances of derived types) may be stored.

hosts: utdict.UniqueTypedDict[Host]

The unique set of hosts in this workflow.

The hosts are stored in a unique-key, type-enforced dictionary. Only Host instances (including instances of derived types) may be stored.
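Both attributes are described as unique-key, type-enforced dictionaries. A minimal sketch of such a container is shown below. UniqueTypedDictSketch and the Action/MyAction stand-ins are hypothetical, and raising KeyError on a duplicate key is an assumption about how uniqueness might be enforced, not documented sane behavior.

```python
from typing import Any, Hashable


class UniqueTypedDictSketch(dict):
    """Hypothetical dict that only accepts one value type and never overwrites a key."""

    def __init__(self, value_type: type) -> None:
        super().__init__()
        self._value_type = value_type

    def __setitem__(self, key: Hashable, value: Any) -> None:
        # isinstance() also accepts instances of derived types
        if not isinstance(value, self._value_type):
            raise TypeError(f"expected {self._value_type.__name__}, got {type(value).__name__}")
        # Assumption: a duplicate key is an error rather than a silent overwrite
        if key in self:
            raise KeyError(f"duplicate key {key!r}")
        super().__setitem__(key, value)
        # Note: only item assignment is guarded; dict.update() and setdefault() bypass these checks


class Action:  # hypothetical stand-in for sane.Action
    def __init__(self, id: str) -> None:
        self.id = id


class MyAction(Action):  # a derived type is accepted
    pass


actions = UniqueTypedDictSketch(Action)
action = MyAction("a")
actions[action.id] = action  # roughly what add_action() is documented to do
```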

property save_location: str

The directory used for saving any intermediary SaveState or workflow cache

The provided path does not need to exist yet, but it must be in a location where the user running the workflow has adequate permissions.

property log_location: str

The directory used for saving out any workflow log output and final results

The provided path does not need to exist yet, but it must be in a location where the user running the workflow has adequate permissions. DO NOT change this value once any Action has been added.

property working_directory: str

The directory from which all paths and commands are evaluated

Internal API

While not intended to be invoked by the user, these functions may be a useful reference.

__init__()

property current_host: str

Returns the current host name (key for hosts) for the current workflow run

This value is only valid after find_host() has been called. For normal use, this means it is valid during run_actions().

property save_file: str

Absolute path to workflow cache save file, cannot be set

property results_file: str

Absolute path to final workflow results file, cannot be set

run_actions(action_id_list, as_host=None, continue_on_err=True, visualize=False)

Run the workflow for the provided action id list and any dependencies

Parameters:
  • action_id_list (List[str]) – The ids (keys in actions) of the actions specifically requested to run.

  • as_host (str) – The preferred host name or alias to run as, if provided.

  • continue_on_err (bool) – Continue workflow evaluation as best as possible even if an Action encounters an error.

  • visualize (bool) – Print out a CLI-friendly rendition of the dependency graph of actions to be run.

add_search_paths(search_paths)

Add a series of paths to search for workflow files. Cannot be used after load_paths() has been called.

Parameters:

search_paths (list[str]) – Paths to add to the workflow search and, later, to sys.path

Return type:

None

add_search_patterns(search_patterns)

Add a series of Python re regular expression strings as filters for finding workflow files. Cannot be used after load_paths() has been called.

Parameters:

search_patterns (List[str]) – Regular expressions used to filter filenames when searching for workflow files

Return type:

None

load_paths()

Load workflow definitions from current search paths and filters

This is the primary load call after all necessary paths and filters have been set. The order of operations is as follows:

  1. Add all search paths to sys.path

  2. All valid files matching at least one search filter across all paths are gathered.

  3. Files are sorted based on file extension into .py and .json[c]

  4. All .py files are loaded via load_py_files()

  5. All registered calls (via @sane.register) are invoked in priority order via process_registered()

  6. All .json[c] files are then loaded via load_config_files() (.json first, then .jsonc)

  7. All patches are processed in priority order via process_patches()

Return type:

None
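Steps 2 and 3 above, plus the .json-before-.jsonc ordering of step 6, can be sketched as a pure function over candidate file paths. This is an illustration under assumptions: gather_workflow_files_sketch is hypothetical, collecting the candidates from the search paths (e.g. with os.walk()) is left out, and matching each pattern with re.search() against the file's base name is a guess at how the filters are applied.

```python
import os
import re
from typing import List, Sequence, Tuple


def gather_workflow_files_sketch(
    candidates: Sequence[str], patterns: Sequence[str]
) -> Tuple[List[str], List[str]]:
    """Hypothetical: keep files matching any pattern, then split them into .py and .json[c] groups."""
    regexes = [re.compile(p) for p in patterns]
    # Step 2: keep a file if at least one search filter matches its base name (assumption)
    matched = [f for f in candidates if any(r.search(os.path.basename(f)) for r in regexes)]
    # Step 3: sort by file extension
    py_files = [f for f in matched if f.endswith(".py")]
    # Step 6 ordering: all .json files first, then all .jsonc files
    json_files = [f for f in matched if f.endswith(".json")]
    json_files += [f for f in matched if f.endswith(".jsonc")]
    return py_files, json_files


py_files, config_files = gather_workflow_files_sketch(
    [".sane/tests/workflow_a.py", ".sane/overrides.jsonc", ".sane/base.json", ".sane/README.md"],
    [r"\.py$", r"\.jsonc?$"],
)
# py_files     == [".sane/tests/workflow_a.py"]
# config_files == [".sane/base.json", ".sane/overrides.jsonc"]
```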

load_py_files(files)

Load the provided list of Python files as modules dynamically

Each file is resolved relative to the first search path (added via add_search_paths()) that contains it.

An effective module name is generated from the relative path to the file from the respective path. This module name is then dynamically imported using importlib.import_module(), relying on the fact that load_paths() has added the search paths to sys.path.

Important

For workflows that keep helper functions, classes, etc. in files separate from those containing @sane.register, this means that the search paths provided for the workflow can be treated as top-level importable directories within your workflow’s Python code.

For instance consider the following layout:

project/
├── .sane
│   ├── helpers
│   │   ├── custom_action.py
│   │   └── custom_host.py
│   └── tests
│       └── workflow_a.py
└── src

The workflow may be invoked using:

sane_runner -p .sane -a my_action -r

Where my_action is defined in .sane/tests/workflow_a.py:

import sane
import helpers.custom_action  #< Relative to .sane 

@sane.register
def workflow_a( orch ):
  orch.add_action( helpers.custom_action.MyAction( "my_action" ) )

And helpers.custom_action.MyAction is defined in .sane/helpers/custom_action.py:

import sane

class MyAction( sane.Action ):
  def __init__( self, id ):
    super().__init__( id )
  # ... implementation ...

Since .sane is provided as a search path (and thus added to sys.path), we can treat the import of other modules within our search path as relative to it.

Parameters:

files (List[str])
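The module-name generation described above can be sketched as follows. module_name_for_sketch is a hypothetical helper; it assumes the module name is simply the file's path relative to the first containing search path, with the extension dropped and path separators turned into dots.

```python
import os
from typing import Sequence


def module_name_for_sketch(file_path: str, search_paths: Sequence[str]) -> str:
    """Hypothetical: derive an importable module name from the first search path containing file_path."""
    for root in search_paths:
        rel = os.path.relpath(file_path, root)
        # A leading ".." means file_path is not inside this search path
        if rel.split(os.sep)[0] == os.pardir:
            continue
        stem, _ext = os.path.splitext(rel)
        return stem.replace(os.sep, ".")
    raise ValueError(f"{file_path!r} is not under any search path")


name = module_name_for_sketch(os.path.join(".sane", "helpers", "custom_action.py"), [".sane"])
# name == "helpers.custom_action"
```

With .sane on sys.path, importlib.import_module(name) would then import .sane/helpers/custom_action.py as helpers.custom_action, which matches the import helpers.custom_action line in the layout example.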

load_config_files(files)

Load the provided list of files as JSON files (JSON with //-style comments allowed) and call load_options() for each.

See load_core_options() for class-specific load implementation.

Parameters:

files (List[str])

load_options(options, origin=None)

Base class implementation for loading dict-based attributes into an instance

Takes an options dict of relevant attributes and loads them via load_core_options(), then load_extra_options(). Each call should remove the fields it processes from the options dict so that, at the very end of this method, any unused keys left in the dict can be logged.

load_extra_options() is meant to be overridden by users, so that load_core_options() can retain the underlying base class implementation without the risk of the base class loading being skipped.

To track each call that may modify this instance, an origin may be provided noting where the change comes from.

Parameters:
  • options (dict) –

    A dict of class-specific attributes.

    Important

    The options dict is modified such that only unused values are left in it at the end of this method

  • origin (str) – A string identifier of where this load is coming from
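The split between load_core_options() and load_extra_options() is a template-method pattern. The sketch below shows the documented flow under simplifying assumptions: OptionLoaderSketch and SketchAction are hypothetical classes, core_field and extra_field are made-up option names, and unused keys are simply reported through the logging module.

```python
import logging
from typing import Any, Dict, Optional

logger = logging.getLogger("option_loader_sketch")


class OptionLoaderSketch:
    """Hypothetical base class mirroring the documented load_options() flow."""

    def load_options(self, options: Dict[str, Any], origin: Optional[str] = None) -> None:
        self.load_core_options(options, origin)   # base-class fields first
        self.load_extra_options(options, origin)  # then user-defined fields
        if options:  # anything still present was not consumed by either step
            logger.warning("unused options from %s: %s", origin, sorted(options))

    def load_core_options(self, options: Dict[str, Any], origin: Optional[str]) -> None:
        pass

    def load_extra_options(self, options: Dict[str, Any], origin: Optional[str]) -> None:
        pass  # meant to be overridden by users


class SketchAction(OptionLoaderSketch):
    def __init__(self) -> None:
        self.core_field: Any = None
        self.extra_field: Any = None

    def load_core_options(self, options: Dict[str, Any], origin: Optional[str]) -> None:
        if "core_field" in options:
            self.core_field = options.pop("core_field")  # pop marks the field as processed
        super().load_core_options(options, origin)

    def load_extra_options(self, options: Dict[str, Any], origin: Optional[str]) -> None:
        if "extra_field" in options:
            self.extra_field = options.pop("extra_field")


opts = {"core_field": 1, "extra_field": 2, "typo_field": 3}
action = SketchAction()
action.load_options(opts, origin="example.jsonc")
# opts == {"typo_field": 3}, and a warning naming "typo_field" is logged
```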

load_core_options(options, origin)

From OptionLoader.load_core_options:

Any processed field should be removed from the options dict; everything else is ignored. All listed options are cumulative and optional unless specified otherwise.

See load_options() for parameters.

From Orchestrator.load_core_options():

Load the provided options dict, creating any Host or Action as necessary and recording patches.

Below is the expected layout, where all fields are optional and "<>" fields are user-specified:

{
  "hosts" :
  {
    "<host-name>" : { "type" : "<some_host_type>", ...host options... },
    ...other host declarations...
  },
  "actions" :
  {
    "<action-id>" : { "type" : "<some_action_type>", ...action options... },
    ...other action declarations...
  },
  "patches" :
  {
    "priority" : int,
    "hosts"   : ...same as above *except* "type"...,
    "actions" : ...same as above *except* "type"...
  }
}

The "hosts" key is processed first, iterating over each "<host-name>" and its dict. Inside of this respective "<host-name>" dict, the "type" field informs which type of Host to create. If no "type" is specified, the default is Host. The "<host-name>" is used as the Host.name during instantiation.

Once the host instance is created, its respective dict is loaded via its own Host.load_options(). The created host is then added with add_host().

Next, the "actions" key is processed in the same fashion, except that the default "type" is Action and each action is added via add_action().

Hint

See search_type() for more info on how the "type" field should be specified.

Finally, the "patches" key is processed. A default priority of 0 is used if no priority is specified. Everything in the "patches" dict except "priority" is saved in an internal patch priority queue for later use in process_patches(). Its content can generally be the same as when declaring "hosts" or "actions", subject to whatever the patched object’s type accepts when loading options (e.g. a derived Action may allow more or fewer fields in its load_options/load_core_options/load_extra_options). Each entry should correspond to an existing object in the workflow found in hosts or actions; objects to be patched do not need to have been created via a JSON config file.

Hint

See process_patches() or process_patch_dict() for advanced usage of patching objects, including using patch filters.
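The processing order described above (hosts, then actions, then patches) can be sketched without creating real objects. plan_config_sketch is hypothetical: instead of resolving "type" with search_type(), instantiating objects, and calling add_host()/add_action(), it returns what would be created, which keeps the example self-contained. It also assumes the default types are spelled "sane.Host" and "sane.Action", and some_option and my_mod.BuildAction are made-up names.

```python
from typing import Any, Dict, List, Tuple

Spec = Tuple[str, str, Dict[str, Any]]        # (name or id, type string, remaining options)
PatchEntry = Tuple[int, str, Dict[str, Any]]  # (priority, origin, patch dict)


def plan_config_sketch(
    options: Dict[str, Any], origin: str
) -> Tuple[List[Spec], List[Spec], List[PatchEntry]]:
    """Hypothetical: pop "hosts", "actions" and "patches" from options in the documented order."""
    hosts: List[Spec] = []
    for name, host_opts in options.pop("hosts", {}).items():
        host_opts = dict(host_opts)  # copy so popping "type" leaves the input dict untouched
        hosts.append((name, host_opts.pop("type", "sane.Host"), host_opts))

    actions: List[Spec] = []
    for action_id, action_opts in options.pop("actions", {}).items():
        action_opts = dict(action_opts)
        actions.append((action_id, action_opts.pop("type", "sane.Action"), action_opts))

    patches: List[PatchEntry] = []
    if "patches" in options:
        patch = dict(options.pop("patches"))
        patches.append((patch.pop("priority", 0), origin, patch))  # default priority is 0
    return hosts, actions, patches


config = {
    "hosts": {"local": {"some_option": True}},
    "actions": {"build": {"type": "my_mod.BuildAction"}},
    "patches": {"actions": {"build": {"some_option": False}}},
    "unknown": 1,
}
hosts, actions, patches = plan_config_sketch(config, "workflow.jsonc")
# config == {"unknown": 1}, left over for logging
```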

Note

"type" is not a valid field in any of the "patches" sub-dicts as the options will be applied to existing object instances and "type" is only used for initial creation of objects in this method.

Parameters:

  • options (dict)

  • origin (str)

search_type(type_str, noexcept=False)

Match a type name (given as a string) to an actual Python type

If at any point a search is successful, the function immediately returns the found type.

Search priority:

  1. Look up type_str with pydoc locate(), which effectively searches the current context for an object with that fully qualified name.

  2. Split type_str on the last . in the name and search every user-loaded module whose name contains the prefix for an attribute matching the suffix. If no split occurs, all user modules are searched.

Valid type examples:

import sane
import user_mod.nested.foo # module foo has CustomType

# ... in the context of this class ...
self.search_type( "sane.Action" )
self.search_type( "sane.host.Host" )
self.search_type( "user_mod.nested.foo.CustomType" )
# Using search method (2) if foo was loaded into the user modules by the workflow
# since "foo" is a substring of "user_mod.nested.foo"
self.search_type( "foo.CustomType" )

Returns:

type corresponding to the type_str

Parameters:

type_str (str)
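The two-step search can be sketched as below. search_type_sketch is hypothetical: user_modules stands in for whatever record the Orchestrator keeps of user-loaded modules, and raising LookupError when noexcept is False (or returning None when it is True) is an assumption about the noexcept flag.

```python
import pydoc
from types import ModuleType
from typing import Dict, Optional


def search_type_sketch(
    type_str: str, user_modules: Dict[str, ModuleType], noexcept: bool = False
) -> Optional[type]:
    # 1. Fully qualified lookup, importing modules as needed
    found = pydoc.locate(type_str)
    if isinstance(found, type):
        return found
    # 2. Split on the last "." and look for the suffix in modules whose name contains the prefix.
    #    With no "." the prefix is "", which every module name contains, so all are searched.
    prefix, _, suffix = type_str.rpartition(".")
    for mod_name, module in user_modules.items():
        if prefix in mod_name:
            candidate = getattr(module, suffix, None)
            if isinstance(candidate, type):
                return candidate
    if noexcept:
        return None
    raise LookupError(f"no type found for {type_str!r}")


# Stand-in for a user module loaded by the workflow (hypothetical names)
foo = ModuleType("user_mod.nested.foo")

class CustomType:
    pass

foo.CustomType = CustomType
resolved = search_type_sketch("foo.CustomType", {"user_mod.nested.foo": foo})
```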

process_registered()

Process functions registered via @sane.register in priority order

All registered functions are called in descending priority order (highest priority first), with equal priorities resolved by order of registration. Each function is called with this Orchestrator instance as its single argument.

Return type:

None

process_patches()

Process JSON patches in priority order

Process the JSON patches stored by load_config_files() after both JSON and Python workflow files have been processed. First, the patches are sorted in priority order, highest value first. Then process_patch_dict() is called for each patch dict, using the path of the JSON file the patch came from as the origin.

Return type:

None

process_patch_dict(origin, patch)

Process an individual patch dict without priority

Find the corresponding object(s) that already exist within the Orchestrator and call each object’s load_options() with the matching sub-dict. The patch input argument should closely resemble the options argument in load_core_options(), with some minor caveats.

Following the processing order of load_core_options(), any patch for sane.Host is processed first, then sane.Action.

Within each attribute (hosts or actions), a patch entry is applied either to the object whose key matches the entry’s key or, if the entry’s key is a patch filter, to every object whose key matches the filter. If no matching key is found, the patch entry is not applied.

When referencing an object to be patched, it must use the key for the respective attribute it is in. For hosts, it should be the Host.name used as a key in hosts, and for actions it should be the Action.id used as a key in actions.

When referencing objects to be patched via a filter, use a Python re regex wrapped in [].

As an example of a valid patch:

{
  "hosts" :
  {
    "simple_host" : { ...things to patch... }
  },
  "actions" :
  {
    "[action_00[0-5]]" : { ...things to patch for any of action_000 through action_005... }
  }
}

[action_00[0-5]] is a patch filter with action_00[0-5] as the match regex.

Whether or not a patch is applied, the outcome is logged.

Parameters:
  • origin (str) – Where this patch originates from, file or source code.

  • patch (dict) – A dict similar to the options of load_core_options(), containing a collection of dicts, each corresponding to a patched object’s load_options()

Return type:

None
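Key matching for exact keys and patch filters might be sketched as follows. matching_keys_sketch and apply_patch_sketch are hypothetical; plain dicts stand in for hosts/actions and dict.update() stands in for load_options(). Whether sane anchors the filter regex is not stated here, so the use of re.fullmatch() is an assumption.

```python
import re
from typing import Any, Dict, List


def matching_keys_sketch(patch_key: str, existing_keys: List[str]) -> List[str]:
    """Hypothetical: resolve a patch key to the existing keys it applies to."""
    if patch_key.startswith("[") and patch_key.endswith("]"):
        regex = re.compile(patch_key[1:-1])  # strip the surrounding [] to get the filter regex
        return [k for k in existing_keys if regex.fullmatch(k)]  # assumption: whole-key match
    return [patch_key] if patch_key in existing_keys else []


def apply_patch_sketch(
    patch: Dict[str, Any], objects: Dict[str, Dict[str, dict]], log: List[str]
) -> None:
    for attr in ("hosts", "actions"):  # hosts first, then actions
        for key, options in patch.get(attr, {}).items():
            targets = matching_keys_sketch(key, list(objects.get(attr, {})))
            if not targets:
                log.append(f"{attr}: no match for {key!r}, not applied")
            for target in targets:
                objects[attr][target].update(options)  # stand-in for load_options()
                log.append(f"{attr}: applied {key!r} to {target!r}")


objects = {
    "hosts": {"simple_host": {}},
    "actions": {"action_000": {}, "action_003": {}, "action_006": {}},
}
log: List[str] = []
apply_patch_sketch(
    {"hosts": {"simple_host": {"x": 1}}, "actions": {"[action_00[0-5]]": {"y": 2}, "missing": {}}},
    objects,
    log,
)
# action_000 and action_003 receive {"y": 2}; action_006 and "missing" are not patched
```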

find_host(as_host)

Finds the host to use for this workflow run

Iterate over all Host objects in hosts, checking each with Host.valid_host() and stopping at the first valid host. That host then becomes current_host.

Parameters:

as_host (str) – The preferred host name or alias to use when checking validity. If set to None, a default will be used (see Host.valid_host())

construct_dag()

Constructs an internal DAG using Action.id from actions as nodes and graph edges from Action.dependencies

Return type:

None

traversal_list(action_id_list)

Constructs the internal DAG and returns a traversal order consisting of { id : number of dependencies }

The traversal is the transitive reduction of the subgraph of actions (with edges given by Action.dependencies) made up of every action whose id is listed in action_id_list plus any dependencies needed to complete the subgraph. The returned traversal can be walked by extracting all ids whose count is zero, reducing each remaining id’s count by one for every extracted id it depends on, and repeating the process. Internally, this is facilitated via a DAG.

Parameters:

action_id_list (list[str]) – A list of Action.id to traverse to

Returns:

A dict of { Action.id : number of dependencies }

Return type:

dict[str, int]
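A minimal sketch of building such a traversal and walking it, assuming dependencies are given as a plain { id : [dependency ids] } dict. traversal_list_sketch and walk_sketch are hypothetical; walking also needs the reduced edges, which the real method keeps in its internal DAG, so the sketch returns them alongside the counts.

```python
from typing import Dict, List, Set, Tuple

Deps = Dict[str, List[str]]


def _reachable(deps: Deps, start: str) -> Set[str]:
    """All ids that start depends on, directly or transitively."""
    seen: Set[str] = set()
    stack = list(deps.get(start, []))
    while stack:
        node = stack.pop()
        if node not in seen:
            seen.add(node)
            stack.extend(deps.get(node, []))
    return seen


def traversal_list_sketch(deps: Deps, requested: List[str]) -> Tuple[Dict[str, int], Dict[str, Set[str]]]:
    # Subgraph: requested ids plus everything needed to complete them
    nodes: Set[str] = set(requested)
    for rid in requested:
        nodes |= _reachable(deps, rid)
    # Transitive reduction: drop u -> v when v is already reachable through another dependency of u
    reduced: Dict[str, Set[str]] = {}
    for u in nodes:
        direct = set(deps.get(u, []))
        reduced[u] = {v for v in direct if not any(v in _reachable(deps, w) for w in direct - {v})}
    return {u: len(reduced[u]) for u in nodes}, reduced


def walk_sketch(traversal: Dict[str, int], reduced: Dict[str, Set[str]]) -> List[List[str]]:
    remaining = dict(traversal)
    order: List[List[str]] = []
    while remaining:
        ready = sorted(u for u, count in remaining.items() if count == 0)
        if not ready:
            raise ValueError("dependency cycle detected")
        order.append(ready)
        for u in ready:
            del remaining[u]
        # Reduce each remaining id's count by one per extracted id it depends on
        for u in remaining:
            remaining[u] -= len(reduced[u] & set(ready))
    return order


deps = {"a": ["b", "c"], "b": ["c"], "c": [], "d": ["c"]}
counts, edges = traversal_list_sketch(deps, ["a"])
# counts == {"a": 1, "b": 1, "c": 0}  (a -> c is implied by a -> b -> c; "d" is not needed)
steps = walk_sketch(counts, edges)
# steps == [["c"], ["b"], ["a"]]
```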

__wake__

threading.Event

A synchronization primitive for coordinating workflow execution. All queued Actions and the current Host are provided a reference to this object before a workflow begins.

During workflow execution (run_actions()), the Orchestrator starts all Actions that are able to run, then calls threading.Event.wait(). Workflow evaluation does not continue until an object outside the main thread sets this event.
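The wait/wake handshake can be sketched with the standard library alone. run_until_done_sketch is hypothetical: worker threads stand in for Actions, each records its completion and then sets the shared threading.Event, while the main thread waits, clears the event, and re-checks the shared state. Because a worker records its completion before calling set(), and the main thread clears the event before re-checking, no completion is missed.

```python
import threading
import time
from typing import Dict, List


def run_until_done_sketch(durations: Dict[str, float]) -> List[str]:
    """Hypothetical: start one worker per entry and block the main thread on an Event until all finish."""
    wake = threading.Event()  # plays the role of __wake__
    lock = threading.Lock()   # guards the shared completion list
    finished: List[str] = []

    def worker(name: str, seconds: float) -> None:
        time.sleep(seconds)   # stand-in for an Action doing its work
        with lock:
            finished.append(name)
        wake.set()            # wake the main thread

    threads = [threading.Thread(target=worker, args=(n, s)) for n, s in durations.items()]
    for t in threads:
        t.start()
    while True:
        with lock:
            if len(finished) == len(threads):
                break
        wake.wait()           # block until some worker calls set()
        wake.clear()          # reset before re-checking so a later set() is not missed
    for t in threads:
        t.join()
    return finished


done = run_until_done_sketch({"slow_action": 0.05, "fast_action": 0.01})
```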