Custom Actions

This guide explains how to derive your own workflow actions from sane.Action, how to make them configurable from both Python and JSON, and how the SANE runtime executes them.

This material is intended to be a standalone advanced user guide. If you want more introductory coverage of Python or JSON workflows, see the Python Interfacing and JSON Interface sections.

Note

Users should aim to never directly manage or change Action.state, Action.status, or anything within the Action Internal API. These attributes and methods are provided for advanced usage far beyond normal custom workflow classes. If you find yourself using them beyond read-only, consider the design of your custom Action and workflow.

Warning

Users should NEVER try to manage host-managed resources or dependency completion & state. It is the responsibility of the Host and Orchestrator to guarantee the correctness of each, respectively.

Why derive sane.Action?

Use a custom sane.Action subclass when the default action behavior is not enough for your workflow. In SANE, the default run() executes a command from config using execute_subprocess(), but a custom class can instead:

  • implement domain-specific logic in run()

  • generate structured outputs

  • expose new top-level options through load_extra_options()

  • manage more complex setup, resources usage, and host-specific information

A derived action is still a workflow object, so it must be added to the sane.Orchestrator to participate in the workflow.

Tyically Overriden Methods

For most custom Action classes, the common extension points are those listed under Customizable Functions in the Action User Interface:

Danger

Do not override launch(). It contains the workflow framework for subprocess execution, logging, state tracking, and Orchestrator wake-up.

The typical pattern is:

  1. Create a subclass of sane.Action.

  2. Add any custom attributes in __init__().

  3. Override load_extra_options() to support custom JSON/Python options.

  4. Override run() to perform your work.

  5. Add an instance of the new Action type to the Orchestrator.

Using Custom Action

The following example shows a minimal custom action that writes a message to standard output multiple times.

import sane

class RepeatMessageAction( sane.Action ):
  def __init__( self, id ):
    super().__init__( id )
    self.message = "Hello"
    self.count = 1

  def load_extra_options( self, options, origin ):
    self.message = options.pop( "message", self.message )
    self.count = options.pop( "count", self.count )
    super().load_extra_options( options, origin )

  def run( self ) -> int:
    for i in range( self.count ):
      self.log( f"[{self.id}] {self.message}" )
    self.outputs[ "lines_written" ] = self.count
    # This is the exit code of the subprocess
    return 0

Key things to note in the above example:

  • RepeatMessageAction inherits from sane.Action.

  • load_extra_options consumes custom keys from the options dictionary.

  • super().load_extra_options(options, origin) preserves the base class option loading behavior.

  • run is the actual place where the action does work.

  • self.outputs can be used to publish results for later dependencies.

Using a custom action from Python

A custom action may be configured directly from Python without JSON. The workflow merely needs to instantiate the class, call load_options() or otherwise modify the instance, and add the action to the Orchestrator.

import sane

@sane.register
def workflow( orch ):
  action = RepeatMessageAction( "repeat_message" )
  action.load_options(
    {
      "message" : "SANE is running custom actions",
      "count"   : 3
    }
  )
  orch.add_action( action )

Using a custom action from JSON

Custom action subclasses can also be instantiated from JSON configuration. SANE resolves the "type" field using search_type():

{
  "actions":
  {
    "repeat_message":
    {
      "type"              : "my_project.actions.RepeatMessageAction",
      "environment"       : "gnu",
      "message"           : "custom action via json",
      "count"             : 4,
      "dependencies"      : { "prepare": "afterok" }
    }
  }
}

Advanced runtime hooks

SANE separates workflow launch-time behavior from subprocess runtime behavior. Action execution is divided into two phases:

  1. Per-action lifecycle methods executed in the orchestrator context
  • called in the main process around launch()

  • useful for saving metadata or validating the action before the subprocess starts

  1. Per-action execution methods executed in a separate subprocess context
  • called inside the isolated action subprocess around run()

  • useful for Action-subprocess scoped work such as preparing a temporary workspace or post processing data outside the run method.

The following diagram illustrates the SANE workflow execution model. A single host executes its runtime hook methods once per workflow, while multiple actions each independently execute their runtime hook methods.

digraph sane_workflow_lifecycle {
    rankdir=TB;
    node [shape=box, style=rounded];
    ranksep=.25;
    load    [label="Load Workflow JSON"];
    build   [label="Instantiate Hosts & Actions"];
    resolve [label="Resolve Dependency Graph"];

    hpre  [label="Host.pre_launch()"];
    hpost [label="Host.post_launch()"];

    load -> build -> resolve -> hpre;
    subgraph cluster_actions {
        labeljust="l"
        label="Per-Action Lifecycle\n(Parallel for Each Action)";
        style=dashed;

        subgraph cluster_action_a {
            labeljust="r"
            label="Action A Lifecycle";
            style=rounded;

            a_preL  [label="Action A: pre_launch()"];
            a_postL [label="Action A: post_launch()"];

            subgraph cluster_action_a_exec {
                label="Execution Phase (Subprocess Context)";
                style=dotted;
                fontsize=10;

                a_pre  [label="pre_run()"];
                a_run  [label="run()"];
                a_post [label="post_run()"];

                a_pre -> a_run -> a_post;
            }

            a_preL -> a_pre;
            a_post -> a_postL;
        }

        subgraph cluster_action_b {
            labeljust="l"
            label="Action B Lifecycle";
            style=rounded;

            b_preL  [label="Action B: pre_launch()"];
            b_postL [label="Action B: post_launch()"];

            subgraph cluster_action_b_exec {
                label="Execution Phase (Subprocess Context)";
                style=dotted;
                fontsize=10;

                b_pre  [label="pre_run()"];
                b_run  [label="run()"];
                b_post [label="post_run()"];

                b_pre -> b_run -> b_post;
            }

            b_preL -> b_pre;
            b_post -> b_postL;
        }
    }

    hpre -> a_preL;
    hpre -> b_preL;

    a_postL -> hpost;
    b_postL -> hpost;
}

Hint

The shared Action mutex is held during *_launch() calls, so other running Action objects will not interfere.

Execution phases and contexts

The workflow execution follows a defined lifecycle. Each action participates in two phases:

Orchestrator Phase (main workflow process):

The Orchestrator calls pre_launch(), then spawns a new subprocess to execute the action, and finally calls post_launch() with the subprocess results. The Orchestrator holds the action state and may manage multiple actions in parallel.

Subprocess Phase (isolated action process):

The action subprocess calls pre_run(), then run(), and finally post_run(). The Environment is already set up at this point, and the Action has access to its own isolated runtime and environment variables.

Runtime Hook Examples

pre_launch

Called before the action subprocess starts. Use this to:

  • Validate inputs or configuration before spending resources on execution

  • Prepare existing data files or temporary directories

  • Log metadata about the action

Example: validation in pre_launch

Here, pre_launch validates that required input files exist before the subprocess is spawned:

class FileProcessorAction( sane.Action ):
  def __init__( self, id ):
    super().__init__( id )
    self.input_file = None

  def load_extra_options( self, options, origin ):
    self.input_file = options.pop( "input_file", self.input_file )
    super().load_extra_options( options, origin )

  def pre_launch( self ):
    """Validate input file exists before launching subprocess"""
    import os
    if self.input_file is None:
      self.log( "No input_file specified", level=40 )
      return False
    if not os.path.isfile( self.input_file ):
      self.log( f"Input file not found: {self.input_file}", level=40 )
      return False
    self.log( f"Input file validated: {self.input_file}", level=20 )
    return True

  def run( self ) -> int:
    # At this point, we know the file exists
    with open( self.input_file, 'r' ) as f:
      lines = len( f.readlines() )
    self.outputs[ "lines" ] = lines
    return 0
post_launch

Called after the action subprocess finishes with the return code and captured output. Use this to:

  • Process or summarize action results further

  • Save intermediate outputs to shared storage

  • Clean up non-host-managed resources allocated in pre_launch

  • Handle errors and decide whether to mark the action as success or failure beyond exit code from run

Return False from post_launch to mark the action as failed.

Example: result processing in post_launch

Here, post_launch processes the subprocess output and saves a summary:

class TestAction( sane.Action ):
  def run( self ) -> int:
    # Use *_exec_raw() functions to allow subprocess STDOUT to be logged with
    # timestamps + context or if preferred just raw output
    self.push_exec_raw( False )
    # Run tests and capture stdout
    retval, content = self.execute_subprocess(
      "python", [ "-m", "pytest", "--tb=short", "tests/" ],
      capture=True,
      verbose=True
    )
    self.pop_exec_raw()
    self.outputs[ "test_output" ] = content
    return retval

  def post_launch( self, retval, content ):
    """Process test results and save summary"""
    if retval == 0:
      self.log( "All tests passed", level=20 )
      self.outputs[ "summary" ] = "PASS"
    else:
      self.log( "Some tests failed", level=40 )
      self.outputs[ "summary" ] = "FAIL"
      # You could write a report here beyond the simple XML/JSON/CLI reports SANE provides
    return True  # Mark action as success even on test failure
pre_run

Called within the subprocess before run(). Use this to:

  • Set up the subprocess environment (e.g., create temp directories)

  • Source additional environment scripts

  • Validate the runtime environment

post_run

Called within the subprocess after run() with the return code. Use this to:

  • Clean up temporary files created in pre_run

  • Process output captured by run

  • Log final results

Example: environment setup in pre_run / post_run

Here, pre_run sets up a temporary working directory within the subprocess - note that these methods all execute in the same context one immediately after the other:

class ScratchWorkAction( sane.Action ):
  def pre_run( self ):
    """Create a temporary scratch directory for this action"""
    import tempfile
    import os
    # Assigning a new variable at this scope for run() is okay because
    # this is executing within the same process scope
    self.scratch = tempfile.mkdtemp( prefix=self.id )
    self.log( f"Created scratch directory: {self.scratch}" )

  def run( self ) -> int:
    # self.scratch was setup in pre_run()
    work_file = os.path.join( self.scratch, "intermediate.txt" )
    # Use scratch directory for temp work
    with open( work_file, 'w' ) as f:
      f.write( "intermediate results" )
    self.outputs[ "work_file" ] = work_file
    return 0

  def post_run( self, retval ):
    """Clean up the scratch directory"""
    import shutil
    import os
    if hasattr( self, "scratch" ) and os.path.isdir( self.scratch ):
      shutil.rmtree( self.scratch )
      self.log( f"Cleaned up scratch directory: {self.scratch}" )

Dependency and output access

At runtime, the Action dependencies are available through the dependencies property. Each dependency entry contains the parent Action’s info().

This is useful when implementing Actions that consume the results of prior steps. The default dereferencing syntax also supports expressions such as:

  • ${{ dependencies.prepare.outputs.some_file }}

  • ${{ dependencies.prepare.config.foo }}

This can be extremely useful when chaining custom Action classes together to consume the values of predecessors.

Important

Any changes that you make to the Action object within any of the *run() methods will not be visible in the main process (and thus not propagate as dependency info() or post_launch()) EXCEPT Action.outputs.

The Action.outputs attribute is serialized right after Action.post_run() and read back in on Action completion.

Summary

A custom sane.Action subclass gives you the most flexible way to add workflow-specific behavior to your SANE workflow. Use the action hooks for runtime control, load extra options to support JSON and Python inputs, and keep in mind when & where a particular method executes during the workflow.

  • Override run() for custom work, and NEVER modify launch()

  • Use outputs to publish results for later actions.

  • Use *_launch() methods for context in the main process

  • Use *_run() methods for context in the Action.run() subprocess

  • Extend JSON interface with your custom class by using load_extra_options().

  • Use execute_subprocess() instead of raw subprocess calls if you want SANE logging integration.

Remember - for simple command-based actions, the inherited default run() may be sufficient with config["command"] and config["arguments"].