Custom Actions
This guide explains how to derive your own workflow actions from sane.Action,
how to make them configurable from both Python and JSON, and how the SANE runtime executes them.
This material is intended to be a standalone advanced user guide. If you want more introductory coverage of Python or JSON workflows, see the Python Interfacing and JSON Interface sections.
Note
Users should aim to never directly manage or change Action.state,
Action.status, or anything within the Action Internal API.
These attributes and methods are provided for advanced usage far beyond normal
custom workflow classes. If you find yourself using them beyond read-only,
consider the design of your custom Action and workflow.
Warning
Users should NEVER try to manage host-managed resources or dependency completion
& state. It is the responsibility of the Host and Orchestrator
to guarantee the correctness of each, respectively.
Why derive sane.Action?
Use a custom sane.Action subclass when the default action behavior
is not enough for your workflow. In SANE, the default run()
executes a command from config using
execute_subprocess(), but a custom class can instead:
implement domain-specific logic in
run()generate structured
outputsexpose new top-level options through
load_extra_options()manage more complex setup, resources usage, and host-specific information
A derived action is still a workflow object, so it must be added to the
sane.Orchestrator to participate in the workflow.
Tyically Overriden Methods
For most custom Action classes, the common extension points are those
listed under Customizable Functions in the Action User Interface:
Danger
Do not override launch(). It contains the workflow
framework for subprocess execution, logging, state tracking, and Orchestrator
wake-up.
The typical pattern is:
Create a subclass of
sane.Action.Add any custom attributes in
__init__().Override
load_extra_options()to support custom JSON/Python options.Override
run()to perform your work.Add an instance of the new Action type to the
Orchestrator.
Using Custom Action
The following example shows a minimal custom action that writes a message to standard output multiple times.
import sane
class RepeatMessageAction( sane.Action ):
def __init__( self, id ):
super().__init__( id )
self.message = "Hello"
self.count = 1
def load_extra_options( self, options, origin ):
self.message = options.pop( "message", self.message )
self.count = options.pop( "count", self.count )
super().load_extra_options( options, origin )
def run( self ) -> int:
for i in range( self.count ):
self.log( f"[{self.id}] {self.message}" )
self.outputs[ "lines_written" ] = self.count
# This is the exit code of the subprocess
return 0
Key things to note in the above example:
RepeatMessageActioninherits fromsane.Action.load_extra_optionsconsumes custom keys from theoptionsdictionary.super().load_extra_options(options, origin)preserves the base class option loading behavior.runis the actual place where the action does work.self.outputscan be used to publish results for later dependencies.
Using a custom action from Python
A custom action may be configured directly from Python without JSON. The workflow
merely needs to instantiate the class, call load_options()
or otherwise modify the instance, and add the action to the Orchestrator.
import sane
@sane.register
def workflow( orch ):
action = RepeatMessageAction( "repeat_message" )
action.load_options(
{
"message" : "SANE is running custom actions",
"count" : 3
}
)
orch.add_action( action )
Using a custom action from JSON
Custom action subclasses can also be instantiated from JSON configuration.
SANE resolves the "type" field using search_type():
{
"actions":
{
"repeat_message":
{
"type" : "my_project.actions.RepeatMessageAction",
"environment" : "gnu",
"message" : "custom action via json",
"count" : 4,
"dependencies" : { "prepare": "afterok" }
}
}
}
Advanced runtime hooks
SANE separates workflow launch-time behavior from subprocess runtime behavior. Action execution is divided into two phases:
- Per-action lifecycle methods executed in the orchestrator context
called in the main process around
launch()useful for saving metadata or validating the action before the subprocess starts
- Per-action execution methods executed in a separate subprocess context
called inside the isolated action subprocess around
run()useful for Action-subprocess scoped work such as preparing a temporary workspace or post processing data outside the run method.
The following diagram illustrates the SANE workflow execution model. A single host executes its runtime hook methods once per workflow, while multiple actions each independently execute their runtime hook methods.
![digraph sane_workflow_lifecycle {
rankdir=TB;
node [shape=box, style=rounded];
ranksep=.25;
load [label="Load Workflow JSON"];
build [label="Instantiate Hosts & Actions"];
resolve [label="Resolve Dependency Graph"];
hpre [label="Host.pre_launch()"];
hpost [label="Host.post_launch()"];
load -> build -> resolve -> hpre;
subgraph cluster_actions {
labeljust="l"
label="Per-Action Lifecycle\n(Parallel for Each Action)";
style=dashed;
subgraph cluster_action_a {
labeljust="r"
label="Action A Lifecycle";
style=rounded;
a_preL [label="Action A: pre_launch()"];
a_postL [label="Action A: post_launch()"];
subgraph cluster_action_a_exec {
label="Execution Phase (Subprocess Context)";
style=dotted;
fontsize=10;
a_pre [label="pre_run()"];
a_run [label="run()"];
a_post [label="post_run()"];
a_pre -> a_run -> a_post;
}
a_preL -> a_pre;
a_post -> a_postL;
}
subgraph cluster_action_b {
labeljust="l"
label="Action B Lifecycle";
style=rounded;
b_preL [label="Action B: pre_launch()"];
b_postL [label="Action B: post_launch()"];
subgraph cluster_action_b_exec {
label="Execution Phase (Subprocess Context)";
style=dotted;
fontsize=10;
b_pre [label="pre_run()"];
b_run [label="run()"];
b_post [label="post_run()"];
b_pre -> b_run -> b_post;
}
b_preL -> b_pre;
b_post -> b_postL;
}
}
hpre -> a_preL;
hpre -> b_preL;
a_postL -> hpost;
b_postL -> hpost;
}](../../_images/graphviz-e47c7358f0e990e1bedc1c6bf79df349d70c77f9.png)
Hint
The shared Action mutex is held during *_launch() calls, so other
running Action objects will not interfere.
Execution phases and contexts
The workflow execution follows a defined lifecycle. Each action participates in two phases:
- Orchestrator Phase (main workflow process):
The
Orchestratorcallspre_launch(), then spawns a new subprocess to execute the action, and finally callspost_launch()with the subprocess results. TheOrchestratorholds the action state and may manage multiple actions in parallel.- Subprocess Phase (isolated action process):
The action subprocess calls
pre_run(), thenrun(), and finallypost_run(). TheEnvironmentis already set up at this point, and theActionhas access to its own isolated runtime and environment variables.
Runtime Hook Examples
- pre_launch
Called before the action subprocess starts. Use this to:
Validate inputs or configuration before spending resources on execution
Prepare existing data files or temporary directories
Log metadata about the action
Example: validation in pre_launch
Here, pre_launch validates that required input files exist before the
subprocess is spawned:
class FileProcessorAction( sane.Action ):
def __init__( self, id ):
super().__init__( id )
self.input_file = None
def load_extra_options( self, options, origin ):
self.input_file = options.pop( "input_file", self.input_file )
super().load_extra_options( options, origin )
def pre_launch( self ):
"""Validate input file exists before launching subprocess"""
import os
if self.input_file is None:
self.log( "No input_file specified", level=40 )
return False
if not os.path.isfile( self.input_file ):
self.log( f"Input file not found: {self.input_file}", level=40 )
return False
self.log( f"Input file validated: {self.input_file}", level=20 )
return True
def run( self ) -> int:
# At this point, we know the file exists
with open( self.input_file, 'r' ) as f:
lines = len( f.readlines() )
self.outputs[ "lines" ] = lines
return 0
- post_launch
Called after the action subprocess finishes with the return code and captured output. Use this to:
Process or summarize action results further
Save intermediate outputs to shared storage
Clean up non-host-managed resources allocated in
pre_launchHandle errors and decide whether to mark the action as success or failure beyond exit code from run
Return
Falsefrompost_launchto mark the action as failed.
Example: result processing in post_launch
Here, post_launch processes the subprocess output and saves a summary:
class TestAction( sane.Action ):
def run( self ) -> int:
# Use *_exec_raw() functions to allow subprocess STDOUT to be logged with
# timestamps + context or if preferred just raw output
self.push_exec_raw( False )
# Run tests and capture stdout
retval, content = self.execute_subprocess(
"python", [ "-m", "pytest", "--tb=short", "tests/" ],
capture=True,
verbose=True
)
self.pop_exec_raw()
self.outputs[ "test_output" ] = content
return retval
def post_launch( self, retval, content ):
"""Process test results and save summary"""
if retval == 0:
self.log( "All tests passed", level=20 )
self.outputs[ "summary" ] = "PASS"
else:
self.log( "Some tests failed", level=40 )
self.outputs[ "summary" ] = "FAIL"
# You could write a report here beyond the simple XML/JSON/CLI reports SANE provides
return True # Mark action as success even on test failure
- pre_run
Called within the subprocess before
run(). Use this to:Set up the subprocess environment (e.g., create temp directories)
Source additional environment scripts
Validate the runtime environment
- post_run
Called within the subprocess after
run()with the return code. Use this to:Clean up temporary files created in
pre_runProcess output captured by
runLog final results
Example: environment setup in pre_run / post_run
Here, pre_run sets up a temporary working directory within the subprocess -
note that these methods all execute in the same context one immediately after the other:
class ScratchWorkAction( sane.Action ):
def pre_run( self ):
"""Create a temporary scratch directory for this action"""
import tempfile
import os
# Assigning a new variable at this scope for run() is okay because
# this is executing within the same process scope
self.scratch = tempfile.mkdtemp( prefix=self.id )
self.log( f"Created scratch directory: {self.scratch}" )
def run( self ) -> int:
# self.scratch was setup in pre_run()
work_file = os.path.join( self.scratch, "intermediate.txt" )
# Use scratch directory for temp work
with open( work_file, 'w' ) as f:
f.write( "intermediate results" )
self.outputs[ "work_file" ] = work_file
return 0
def post_run( self, retval ):
"""Clean up the scratch directory"""
import shutil
import os
if hasattr( self, "scratch" ) and os.path.isdir( self.scratch ):
shutil.rmtree( self.scratch )
self.log( f"Cleaned up scratch directory: {self.scratch}" )
Dependency and output access
At runtime, the Action dependencies are available through the
dependencies property. Each dependency entry contains the
parent Action’s info().
This is useful when implementing Actions that consume the results of prior steps. The default dereferencing syntax also supports expressions such as:
${{ dependencies.prepare.outputs.some_file }}${{ dependencies.prepare.config.foo }}
This can be extremely useful when chaining custom Action classes together
to consume the values of predecessors.
Important
Any changes that you make to the Action object within any of the
*run() methods will not be visible in the main process (and thus not propagate
as dependency info() or post_launch()) EXCEPT
Action.outputs.
The Action.outputs attribute is serialized right after Action.post_run()
and read back in on Action completion.
Summary
A custom sane.Action subclass gives you the most flexible way to add
workflow-specific behavior to your SANE workflow. Use the action hooks for runtime control,
load extra options to support JSON and Python inputs, and keep in mind when & where
a particular method executes during the workflow.
Use
outputsto publish results for later actions.Use
*_launch()methods for context in the main processUse
*_run()methods for context in theAction.run()subprocessExtend JSON interface with your custom class by using
load_extra_options().Use
execute_subprocess()instead of raw subprocess calls if you want SANE logging integration.
Remember - for simple command-based actions, the inherited default run()
may be sufficient with config["command"] and config["arguments"].