Action

class sane.DependencyType[source]

Types of dependencies between actions from child to parent

AFTEROK = 'afterok'

after successful run (this is the default)

AFTERNOTOK = 'afternotok'

after failure

AFTERANY = 'afterany'

after either failure or success

AFTER = 'after'

after the step starts
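
The four dependency types can be read as a small decision rule. The following is only a standalone sketch that does not import sane: the DependencyType class is a stand-in mirroring the values listed above, and dependency_met() with its three boolean parameters is a hypothetical helper, not part of the documented API.

```python
from enum import Enum


class DependencyType(str, Enum):
    # Stand-in mirroring the documented values; not imported from sane
    AFTEROK = "afterok"
    AFTERNOTOK = "afternotok"
    AFTERANY = "afterany"
    AFTER = "after"


def dependency_met(dep_type, parent_started, parent_finished, parent_succeeded):
    """Hypothetical helper: may the child run given its parent's progress?"""
    dep_type = DependencyType(dep_type)  # accepts the enum or its str value
    if dep_type is DependencyType.AFTER:
        return parent_started
    if not parent_finished:
        return False
    if dep_type is DependencyType.AFTEROK:
        return parent_succeeded
    if dep_type is DependencyType.AFTERNOTOK:
        return not parent_succeeded
    return True  # AFTERANY: finished, success or failure
```

How the Orchestrator actually evaluates these types is not shown in this section, so treat the rule above as an interpretation of the one-line descriptions.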

class sane.ActionState[source]

States that an action may be in

PENDING = 'pending'

queued for running

RUNNING = 'running'

currently executing

FINISHED = 'finished'

completed regardless of success

INACTIVE = 'inactive'

never queued or run, idle

SKIPPED = 'skipped'

never run due to dependencies missing

ERROR = 'error'

reserved for internal use, use action status for action failure

class sane.ActionStatus[source]

Final status of an action

Note

The SUBMITTED status is considered “completed” in the context of action queueing, but the final SUCCESS or FAILURE is left to the host implementation.

SUCCESS = 'success'

completed successfully

FAILURE = 'failure'

completed unsuccessfully

SUBMITTED = 'submitted'

submitted to host, current context action completed but execution left to host

NONE = 'none'

no completion status to report

class sane.Action[source]

Bases: SaveState, ResourceRequestor

A single task

An Action is the singular unit within workflows that performs work. Actions will always be in one of a finite set of states with an associated status.

  • Actions can specify an Environment necessary to run

  • Actions can specify resources necessary to run

  • Actions can specify dependencies necessary to run

  • Actions must have unique IDs within a workflow

  • Actions will always execute under separate processes from the sane.Orchestrator

User Interface

User Methods

__init__(id)[source]

Create an Action with unique ID

add_dependencies(*args)[source]

Add dependencies to this Action

Use this function to add dependencies at any time before the workflow is run. This can be called before or after Actions are added to the workflow, but not after the workflow has started running. No checks (valid graph topology, existence of the dependency, and so on) are performed here. These checks are done by the Orchestrator (see Orchestrator.construct_dag()).

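Any number of dependencies may be listed as either:

  (a) the str id of the parent Action, or

  (b) a tuple of the parent Action id and a DependencyType (or its str value)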

Note

If no DependencyType is provided, i.e. calling in manner (a), then the default is DependencyType.AFTEROK

The following code block shows a few valid example calls:

import sane

@sane.register
def register_actions( orch ):
  a = sane.Action( "a" )
  b = sane.Action( "bee" )
  c = sane.Action( "c" )

  orch.add_action( a )
  # The provided ID string MUST match the Action.id, not the object variable name
  #                    vvv
  a.add_dependencies( "bee", ( "c", "afterok" ) )
  b.add_dependencies( ( "c", sane.DependencyType.AFTEROK ) )
  # This is a valid call but does not form a valid DAG
  c.add_dependencies( "a" )

Parameters:

args (list[ str | tuple[ str, str | DependencyType ] ]) – variable length argument list with each entry corresponding to a dependency

Return type:

None
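
The two accepted argument forms can be normalized into the dict layout described for the dependencies property. This is a minimal standalone sketch, assuming that layout. normalize_dependencies() is a hypothetical name and the DependencyType class is a stand-in rather than the sane import.

```python
from enum import Enum


class DependencyType(str, Enum):
    # Stand-in mirroring the documented values; not imported from sane
    AFTEROK = "afterok"
    AFTERNOTOK = "afternotok"
    AFTERANY = "afterany"
    AFTER = "after"


def normalize_dependencies(*args):
    """Hypothetical helper: turn str / tuple entries into {id: {"dep_type": ...}}."""
    deps = {}
    for arg in args:
        if isinstance(arg, str):
            # Form (a): only an id, so the default type applies
            parent_id, dep_type = arg, DependencyType.AFTEROK
        else:
            # Form (b): (id, DependencyType or its str value)
            parent_id, dep_type = arg
            dep_type = DependencyType(dep_type)
        deps[parent_id] = {"dep_type": dep_type}
    return deps


deps = normalize_dependencies("bee", ("c", "afternotok"))
```

An unknown str value such as "sometime" would raise ValueError in this sketch. Whether sane validates at this point is not stated above.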

add_resource_requirements(resource_dict)[source]

Add resource requirements to this requestor

Hint

See Resource for syntax on default supported values

Add an arbitrary dict of key-value pairs to this requestor. The key-value pairs in this dict will eventually be requested from a ResourceRequestor for:

If the value in a key-value pair is of type dict, it will be considered an override resource request specific to the name of the key. This override dict will be kept separately in an internal location to be used later. Multiple nested dict overrides are not allowed.

See resources() for more info.

As an example:

{
  "cpus"  : 12,
  "mem"   : "1gb",
  "slots" : 1,
  "specific_provider" :
  {
    "cpus" : 36,
    "mem"  : "3gb"
  }
}

The values of "specific_provider" will only be used (in addition to any unmodified values in the top-level dict) if the current_host matches this key.

Note

While the resource_dict can hold arbitrary values, it is up to the ResourceProvider (i.e. sane.Host) to be able to provide these resources.

Notably, any resources that do not follow the Resource syntax are left strictly to the provider class implementation. The default classes do not support values outside of Resource unless specified.

Parameters:

resource_dict (dict)
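
The separation of override dicts from top-level values might look roughly like the sketch below. It is standalone and does not use sane: split_resource_request() is a hypothetical name, and raising ValueError on nested overrides is an assumption about how "not allowed" is enforced.

```python
def split_resource_request(resource_dict):
    """Hypothetical helper: separate plain requests from per-key override dicts."""
    base, overrides = {}, {}
    for key, value in resource_dict.items():
        if isinstance(value, dict):
            # A dict value is an override specific to the name of the key
            if any(isinstance(inner, dict) for inner in value.values()):
                raise ValueError(f"nested override not allowed under '{key}'")
            overrides[key] = dict(value)
        else:
            base[key] = value
    return base, overrides


base, overrides = split_resource_request(
    {
        "cpus": 12,
        "mem": "1gb",
        "slots": 1,
        "specific_provider": {"cpus": 36, "mem": "3gb"},
    }
)
```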

User Attributes & Properties

config: dict

A user-defined dict that can hold anything picklable

Automatic config loading and dereferencing is supported for this attribute. This is meant as a general information container without the need for defining a custom Action with custom attributes.

Reserved keywords in this dict are:

keyword        usage
"command"      command to execute when Action is run
"arguments"    arguments to use in command execution

If a custom Action overrides run(), then there are no reserved keywords as the above keywords are specific to the default base class run() implementation.

outputs: dict

A user-provided dict of picklable output values that should be valid upon Action completion

These outputs will be provided to direct dependencies of this Action within their dependencies property during execution. Any further use of this attribute is left to user implementation.

environment: str

The Environment.name of the Environment that the current host should provide for this Action.

property host_info: dict

Info dict provided from the current_host using the Host.info

property info: dict

Action info dict (dereferenced) provided to direct dependencies via dependencies at runtime

The default info dict provided is:

{
  "config"  : config,
  "outputs" : outputs
}

property id: str

The unique ID of this Action.

property dependencies: dict

A copy of the internal dependencies dict with relevant info about these direct dependencies

The workflow uses a \(child \rightarrow N parents\) relation, where an Action records which parent Actions it is dependent on.

Prior to workflow runtime, this dict contains parent Action dependencies by id and their corresponding DependencyType as "dep_type" in a sub-dict:

{
  "<parent-id-a>" : { "dep_type" : DependencyType },
  "<parent-id-b>" : { "dep_type" : DependencyType },
  ...
}

During runtime, this dict is modified to include the parent Action.info within the sub-dict. By default this would then allow access to dependencies’ outputs and config:

{
  "<parent-id-a>" :
  {
    "dep_type" : DependencyType,
    # The exact contents below come from info()
    "config"  : config,
    "outputs" : outputs
  },
  ...
}
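
As an illustration of the runtime layout, a child Action could read a parent's output as sketched here. The dict literal is made-up sample data in the documented shape, and parent_output() is a hypothetical convenience function, not a sane method.

```python
# Sample data in the runtime shape described above (values are invented)
dependencies = {
    "action_b": {
        "dep_type": "afterok",
        "config": {"resolution": "high"},
        "outputs": {"some_file": "fill_this_in"},
    },
}


def parent_output(dependencies, parent_id, key, default=None):
    """Hypothetical helper: fetch one output value of a direct dependency."""
    return dependencies.get(parent_id, {}).get("outputs", {}).get(key, default)


some_file = parent_output(dependencies, "action_b", "some_file")
missing = parent_output(dependencies, "action_z", "some_file", default="n/a")
```

Because the property returns a copy, changes made to this dict would not be expected to reach the parent Action.
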

resources(override=None)[source]

Return a copy of the current resources requested

During workflow execution, the current_host will be provided as the override value when requesting resources from the sane.Host via resources.ResourceProvider.acquire_resources()

Parameters:

override (str) – name of the override to apply. If an override dict with this name exists in the resources, return a copy of the resources recursively updated to prioritize the overridden values.

Return type:

dict
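
The override lookup can be sketched as follows, assuming the requests are stored as a base dict plus a dict of named overrides (an assumption about the internal layout). resolve_resources() is a hypothetical name. Since nested overrides are not allowed, a flat update stands in for the recursive update mentioned above.

```python
import copy


def resolve_resources(base, overrides, override=None):
    """Hypothetical helper: copy base requests, then apply one named override."""
    resolved = copy.deepcopy(base)
    if override is not None and override in overrides:
        # Overridden keys win; untouched base keys are kept
        resolved.update(copy.deepcopy(overrides[override]))
    return resolved


base = {"cpus": 12, "mem": "1gb", "slots": 1}
overrides = {"specific_provider": {"cpus": 36, "mem": "3gb"}}

on_specific = resolve_resources(base, overrides, "specific_provider")
on_other = resolve_resources(base, overrides, "other_host")
```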

local

Control if the resources requested should be provided from a local pool if a NonLocalProvider is used. If set to None then resource delegation is left to the provider, True to force local, and False to force non-local. If the provider is a normal ResourceProvider this option has no effect.

Customizable Functions

Action and Host classes are designed to be modified by users.

While any function could generally be overwritten with some care, certain functions within each class are designed specifically for user modification without the need to worry about internal logic.

Defining these functions does not require calling super or other intrinsic Python knowledge beyond the interface of the function and any user logic.

extra_requirements_met(dependency_actions)[source]

Check if any extra user-imposed requirements are met

Return True to signify this Action is ready to run, or False to note that a requirement was not satisfied, in which case this Action cannot run and will be SKIPPED. The default always returns True.

Caution

For delayed requirements that may be fulfilled at a later time (e.g. non-local host evaluation) refer to the source code for using sane.action.RequirementState.PENDING. If used, an external entity (often a non-local host) MUST retrigger Orchestrator.__wake__ when this requirement may be affected to ensure the workflow does not silently hang with the Action never running.

Parameters:

dependency_actions (dict[str, Action]) – all Actions this instance depends on

Return type:

bool
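
A user override might look like the sketch below. To stay self-contained it uses a minimal stand-in Action class instead of importing sane. NeedsMeshAction and the "mesh" output key are invented for illustration.

```python
class Action:
    """Minimal stand-in for sane.Action with only what this sketch needs."""

    def __init__(self, id):
        self.id = id
        self.outputs = {}

    def extra_requirements_met(self, dependency_actions):
        return True  # documented default


class NeedsMeshAction(Action):
    """Hypothetical Action that only runs if every parent advertises a mesh."""

    def extra_requirements_met(self, dependency_actions):
        return all("mesh" in parent.outputs for parent in dependency_actions.values())


parent = Action("make_mesh")
child = NeedsMeshAction("solve")

before = child.extra_requirements_met({"make_mesh": parent})
parent.outputs["mesh"] = "grid.msh"
after = child.extra_requirements_met({"make_mesh": parent})
```

With no parents at all, all() over an empty sequence is True, which matches the default behavior.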

load_extra_options(options, origin=None)[source]

Load any extra options after load_core_options().

Any processed field should be removed from the options dict, with everything else ignored.

See load_options() for parameters.

pre_launch()[source]

Called before save() and execution of action_launcher.py. See launch()

Return type:

None

pre_run()[source]

Called just before run() within the context of the Action subprocess

Return type:

None

run()[source]

The main execution of the Action, performed within an isolated subprocess

This function will be called from within action_launcher.py under an isolated subprocess with the requested environment already setup().

Users may override this function in a derived Action as the primary way to change how an Action runs. The default implementation of this function fully dereferences the config dict and executes config["command"] and config["arguments"] in another subprocess via execute_subprocess().

Hint

The subprocess of running config["command"] in the default implementation of this function will inherit the Environment.setup() settings. See subprocess.Popen env=None for more info.

Returns:

The return value of running this action. Used by action_launcher.py as the exit() code. Thus, anything aside from 0 is FAILURE. The default returns the return value of execute_subprocess() from running config["command"]

Return type:

int
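
The default behavior described above, reduced to its core, could be approximated as in this sketch. It uses the standard library subprocess.run in place of execute_subprocess() and skips dereferencing. run_from_config() is a hypothetical name, not the sane implementation.

```python
import subprocess
import sys


def run_from_config(config):
    """Hypothetical stand-in for the default run(): execute command + arguments."""
    cmd = [config["command"]] + [str(arg) for arg in config.get("arguments", [])]
    completed = subprocess.run(cmd)
    # 0 maps to success, anything else to FAILURE per the description above
    return completed.returncode


ok = run_from_config({"command": sys.executable, "arguments": ["-c", "pass"]})
bad = run_from_config(
    {"command": sys.executable, "arguments": ["-c", "import sys; sys.exit(3)"]}
)
```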

post_run(retval)[source]

Called just after run() within the context of the Action subprocess

Return type:

None

post_launch(retval, content)[source]

Called after execution of action_launcher.py with the output of execute_subprocess(). See launch()

Returns:

If return is False, Action is assumed to have a FAILURE

Return type:

bool

Helper Functions

resolve_path(input_path, base_path=None)[source]

Resolve a path using the base path if the input path is relative, otherwise use only the input path

Parameters:
  • input_path (str) – relative or absolute path

  • base_path (str) – base path to evaluate relative paths from

Returns:

absolute path of input_path

Return type:

str
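
A rough equivalent using only os.path is sketched below. Falling back to the current working directory when base_path is None is an assumption, since the default behavior is not spelled out above.

```python
import os


def resolve_path(input_path, base_path=None):
    """Sketch: absolute inputs are kept, relative ones are joined to base_path."""
    if os.path.isabs(input_path):
        return os.path.normpath(input_path)
    if base_path is None:
        base_path = os.getcwd()  # assumed fallback, not confirmed by the docs
    return os.path.abspath(os.path.join(base_path, input_path))


base = os.path.abspath("base_dir")
joined = resolve_path("out", base)
untouched = resolve_path(base, "/ignored")
```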

resolve_path_exists(input_path, base_path=None, allow_dry_run=True)[source]

Wrapper on resolve_path() that also checks if the resolved directory exists, raising an exception if not

Parameters:
  • allow_dry_run (bool) – during dry runs do not fail if resolved path does not exist

  • input_path (str)

  • base_path (str)

Return type:

str

file_exists_in_path(input_path, file, allow_dry_run=True)[source]

Check if a specified file exists within a provided path

Parameters:
  • input_path (str) – path to check for file evaluated from current working directory

  • file (str) – regular file to check for

  • allow_dry_run (bool) – during dry runs do not fail if resolved path does not exist

dereference_str(input_str, log=True, noexcept=False)[source]

Dereference an input string using GitHub Actions style syntax scoped to the current object

Continuously dereferences strings within the current object until no more substitutions can be made. All attributes and properties can be referenced, but dereferencing will work best with attributes that are dict, list, str, or int values.

Nested referencing can be achieved with . operator (key as next field)
Index referencing can be achieved with [] operator (positive integer)

Valid syntax examples:

action_a = sane.Action( "action_a" )
action_b = sane.Action( "action_b" )
action_a.add_dependencies( "action_b" )

action_a.config["foo"] = "1"
action_a.config["bar"] = "2"
action_a.config["zoo"] = [ 3, 4 ]
action_a.config["boo"] = 7
action_a.config["moo"] = { "loo" : [ { "goo" : "5", "hoo" : [ 6, "${{ config.boo }}" ] }, 0, 0, 0 ] }
action_a.outputs["outfile"] = "something"
action_a.add_resource_requirements( { "cpus" : 12, "mem" : "1gb" } )

action_b.outputs["some_file"] = "fill_this_in"

# Within the context of action_a these are valid
"${{ config.foo }}"       => "1"
"${{ config.bar }}"       => "2"
"${{ config.zoo[1] }}"    => "4"
"${{ resources.cpus }}"   => "12"
"${{ outputs.outfile }}"  => "something"
# At runtime this would be valid, with this value if the value in action_b has not changed
"${{ dependencies.action_b.outputs.some_file }}" => "fill_this_in"

# A complex dereference
"${{ config.moo.loo[0].hoo[1] }}" => "7"

Attention

During the substitution, if indexing to the next attribute yields None an Exception will be thrown. Thus, at the time of dereferencing, the string input MUST be valid.

Parameters:
  • log – enable logging

  • noexcept – disable exceptions and instead allow failed dereference

  • input_str (str)

Returns:

string fully dereferenced

Return type:

str
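
To make the substitution loop concrete, here is a small standalone sketch of this style of dereferencing over a plain dict scope. It is not the sane implementation: the regular expressions, lookup(), and the scope argument are all assumptions made for illustration, and logging and noexcept handling are omitted.

```python
import re

# "${{ path }}" with optional inner whitespace
_REF = re.compile(r"\$\{\{\s*([^{}]+?)\s*\}\}")
# Either a name between dots or a "[n]" index
_TOKEN = re.compile(r"([^.\[\]]+)|\[(\d+)\]")


def lookup(scope, path):
    """Walk scope using '.' for keys and '[n]' for list indices."""
    current = scope
    for name, index in _TOKEN.findall(path):
        current = current[int(index)] if index else current[name]
    return current


def dereference_str(input_str, scope):
    """Substitute repeatedly until the string stops changing."""
    previous = None
    while previous != input_str:
        previous = input_str
        input_str = _REF.sub(lambda m: str(lookup(scope, m.group(1))), input_str)
    return input_str


scope = {
    "config": {
        "foo": "1",
        "zoo": [3, 4],
        "boo": 7,
        "moo": {"loo": [{"goo": "5", "hoo": [6, "${{ config.boo }}"]}, 0, 0, 0]},
    },
    "resources": {"cpus": 12},
}
```

In this sketch a missing key or index raises KeyError or IndexError, which loosely corresponds to the exception behavior noted above.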

dereference(obj, log=True, noexcept=False)[source]

Fully dereference all strings within the obj passed in

For dict and list objects, each will be iterated over and this function will be recursively called for each iterated value (not key), with the result assigned back in place, presumably modified. For str objects dereference_str() will be called.

For all other object types, the obj will be unmodified.

The obj is then returned, modified if necessary (and possible) to contain only fully dereferenced strings.

Returns:

obj with all strings dereferenced
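
The recursion described above can be sketched independently of sane as a walker that takes the string handler as an argument. The deref_str parameter is an assumption made to keep the example self-contained. In the real class the method would call dereference_str() on itself.

```python
def dereference(obj, deref_str):
    """Sketch: apply deref_str to every string value inside dicts and lists."""
    if isinstance(obj, str):
        return deref_str(obj)
    if isinstance(obj, dict):
        for key in obj:
            # Only values are processed; keys are left untouched
            obj[key] = dereference(obj[key], deref_str)
    elif isinstance(obj, list):
        for i, value in enumerate(obj):
            obj[i] = dereference(value, deref_str)
    return obj  # other types pass through unmodified


data = {"a": ["x", 1, {"b": "y"}], "c": 2}
result = dereference(data, str.upper)
```

Here str.upper merely stands in for a real dereferencing function so the effect on nested values is easy to see.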

Internal API

The following documentation is provided for advanced use in the creation of custom Actions.

__timestamp__

The start time of the Action.launch() in ISO format

__time__

The total execution time of Action.launch()

dry_run

Stub launch() to skip execution of run()

property logfile: str

Full recording of logs for this action

property runlog: str

Absolute path to logfile capturing this Action.run() output

property origins

A copy of all the workflow file paths (and line numbers if applicable) that formed this object

property status: ActionStatus

The current ActionStatus of this Action.

property state: ActionState

The current ActionState of this Action.

property results: dict

Default results to be saved in the workflow cache returned as a dict

By default the following is returned

{
  "state"     : state,          # string repr
  "status"    : status,         # string repr
  "origins"   : origins,
  "outputs"   : outputs,        # dereferenced
  "timestamp" : __timestamp__,  # if available
  "time"      : __time__,       # if available
}

When set, the provided dict should match the above format

load_options(options, origin=None)[source]

Base class implementation for loading of dict-based attributes into instance

Take an options dict of relevant attributes and load them via load_core_options() then load_extra_options(). The options dict should be modified in each call to remove processed fields so that at the very end of this method, any unused keys in the options dict may be logged.

The load_extra_options() is meant as a user-overwritable method so that load_core_options() may retain core underlying base class implementation details without the risk of base class loading not being called.

To keep track of each call to this function, which may modify this instance, an origin may be provided noting where the change is coming from.

Parameters:
  • options (dict) –

    A dict of class-specific attributes.

    Important

    The options dict is modified such that only unused values are left in it at the end of this method

  • origin (str) – A string identifier of where this load is coming from
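
The pop-as-you-process pattern can be sketched with a small stand-in class. Everything here is illustrative: OptionLoaderSketch, the "threads" key, and returning the leftover keys (instead of logging them) are assumptions, not the sane implementation.

```python
class OptionLoaderSketch:
    """Hypothetical stand-in showing core-then-extra option loading."""

    def __init__(self):
        self.environment = None
        self.threads = 1
        self.origins = []

    def load_core_options(self, options, origin):
        # Core fields are removed from options once processed
        if "environment" in options:
            self.environment = options.pop("environment")

    def load_extra_options(self, options, origin=None):
        # User-defined field, also removed once processed
        if "threads" in options:
            self.threads = int(options.pop("threads"))

    def load_options(self, options, origin=None):
        if origin is not None:
            self.origins.append(origin)
        self.load_core_options(options, origin)
        self.load_extra_options(options, origin)
        return sorted(options)  # unused keys a real implementation might log


loader = OptionLoaderSketch()
options = {"environment": "gnu", "threads": "4", "typo_key": True}
unused = loader.load_options(options, origin="workflow.json")
```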

load_core_options(options, origin)[source]

From OptionLoader.load_core_options:

Any processed field should be removed from the options dict, with everything else ignored. All listed options are cumulative and optional unless specified otherwise.

See load_options() for parameters.

From Action.load_core_options():

Load Action settings from the provided options dict, all keys are optional.

The following keys are loaded verbatim into their respective attribute:

  • "environment" => environment

  • "working_directory" => working_directory

The following key is loaded and calls recursive_update() to preserve any unmodified existing values:

The following key is loaded directly to add_dependencies() as key-value tuple pairs via dict.items()

  • "dependencies"

An example options dict

{
  "environment" : "gnu",
  # Recall that config is a generic dict
  "config"      : { "foo" : [ 1, 2, 3 ], "bar" : "file" },
  # if loading via plain-text (e.g. JSON), use the text value
  # of DependencyType as noted in add_dependencies()
  "dependencies" : { "action_b" : "afterok", "action_c" : "afternotok" }
}

From ResourceRequestor.load_core_options:

Load ResourceRequestor resource requirements

The following key is loaded verbatim into add_resource_requirements():

  • "resources"

The following key is loaded if possible, defaulting to None:

execute_subprocess(cmd, arguments=None, logfile=None, verbose=False, dry_run=False, capture=False, shell=False, log_level=19)[source]

Execution wrapper for running a command using subprocess.Popen

Notably, this wrapper handles:

  • command and argument aggregation

  • internal, logfile, and verbose logging with flushing in realtime

  • wrapping of stdout using internal log levels

  • returning return value along with captured stdout (if capture enabled)

Prefer using this function if stdout log wrapping is desired. If none of the features above are of use, any subprocess function call can be used, but may or may not appear in the log output as desired. All output is logged, however.

Parameters:
  • cmd (str) – command to execute

  • arguments (list) – list of arguments, will be str cast

  • logfile (str) – optional logfile to write command output to

  • verbose – optional output the command stdout and stderr to terminal

  • dry_run – optional do not call subprocess.Popen, i.e. stub this call

  • capture – optional capture stderr/stdout to str return

  • shell – optional treat execution as shell command (see subprocess.Popen for more detail)

Returns:

tuple of execution return value and any stderr/stdout capture

Return type:

tuple[int, str]
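
A stripped-down wrapper in the same spirit is sketched below using only subprocess.Popen from the standard library. It omits logfile, verbose, shell, and log-level handling, and returning (0, "") for a dry run is an assumption rather than documented behavior.

```python
import subprocess
import sys


def execute_subprocess(cmd, arguments=None, capture=False, dry_run=False):
    """Sketch: run cmd with str-cast arguments, optionally capturing output."""
    args = [cmd] + [str(arg) for arg in (arguments or [])]
    if dry_run:
        return 0, ""  # assumed stub result; Popen is never called
    captured = []
    with subprocess.Popen(
        args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True
    ) as proc:
        # Read line by line so output could be logged as it arrives
        for line in proc.stdout:
            if capture:
                captured.append(line)
    return proc.returncode, "".join(captured)


retval, output = execute_subprocess(sys.executable, ["-c", "print('hi')"], capture=True)
dry = execute_subprocess(sys.executable, ["-c", "raise SystemExit(1)"], dry_run=True)
```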

launch(working_directory, launch_wrapper=None)[source]

Main entry point for executing an Action within a workflow

Coordinates the current state and status of the action whilst executing, prepares the Action for the action_launcher.py execution proxy, and adjusts the call to execute_subprocess() if a launch_wrapper is provided.

__timestamp__ and total execution __time__ are recorded immediately at the beginning and end of this function, not just around the execute_subprocess() call.

The order of operations, as it concerns the user, is:

  1. pre_launch() (shared Action mutex locked around this call)

  2. save()

  3. resolve internal launch command (action_launcher.py) and launch_wrapper

  4. execute_subprocess() of resolved command, capturing to runlog using dry_run if set

  5. final state and status recorded

  6. post_launch() called with output of (4) (shared Action mutex locked around this call)

  7. __orch_wake__() the Orchestrator

  8. return output of (4)

If an unexpected Exception occurs at any point outside of the subprocess.Popen call within execute_subprocess(), the following occurs:

  1. set_state_error() called

  2. shared Action mutex force unlocked

  3. __orch_wake__() the Orchestrator

  4. re-raise the same Exception

Danger

Avoid modifying or overriding this function in any derived Action

Parameters:
  • working_directory (str) – Base directory to evaluate working_directory from. Final resolved path is used to evaluate the internal _launch_cmd or launch_wrapper command, if provided.

  • launch_wrapper (tuple[str, list[str]]) – tuple pair of command and list of arguments to use as a prefix to execute_subprocess(). The wrapper command becomes the new command; the wrapper arguments, then the previous command and its arguments, are passed as arguments in that order.

Returns:

tuple of execute_subprocess() of this Action’s run() (or launch_wrapper output if provided)

Return type:

tuple[int,str]
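
The prefixing performed for a launch_wrapper can be illustrated with a small standalone function. apply_launch_wrapper() is a hypothetical name, and the "srun" command with its flags is only an example wrapper, not something sane provides.

```python
def apply_launch_wrapper(cmd, arguments, launch_wrapper=None):
    """Sketch: prefix (cmd, arguments) with the wrapper command and arguments."""
    if launch_wrapper is None:
        return cmd, list(arguments)
    wrapper_cmd, wrapper_args = launch_wrapper
    # New command is the wrapper; its arguments come first, then the old command
    return wrapper_cmd, list(wrapper_args) + [cmd] + list(arguments)


plain = apply_launch_wrapper("python", ["action_launcher.py"])
wrapped = apply_launch_wrapper(
    "python", ["action_launcher.py"], launch_wrapper=("srun", ["-n", "1"])
)
```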

set_status_success()[source]

Quickset state to ActionState.FINISHED and status to ActionStatus.SUCCESS

Typically set by launch() after Action completion.

Return type:

None

set_status_failure()[source]

Quickset state to ActionState.FINISHED and status to ActionStatus.FAILURE

Typically set by launch() after Action completion.

Return type:

None

set_state_pending()[source]

Quickset state to ActionState.PENDING and status to ActionStatus.NONE

Typically set by the Orchestrator during workflow startup

Return type:

None

set_state_skipped()[source]

Quickset state to ActionState.SKIPPED and status to ActionStatus.NONE

Set by the Orchestrator during workflow running if deemed necessary

Return type:

None

set_state_error()[source]

Internal, only to report SANE errors

Return type:

None

__orch_wake__()[source]

Wake up the Orchestrator from another thread.

This should be used as an event trigger to induce re-evaluation of completed Actions in the current workflow run. See Orchestrator.__wake__ for more info.

Return type:

None