Executor

The dissmodel.executor module provides the standardised interface for packaging, deploying, and reproducing simulations. It separates scientific logic from execution infrastructure, so the same model code runs locally via CLI or remotely via the platform API without modification.

The module exposes three building blocks:

Component           Description
ModelExecutor       Abstract base class defining the four-phase lifecycle
ExecutorRegistry    Central registry mapping model names to executor classes
execute_lifecycle   Canonical orchestration function used by CLI and platform

Lifecycle

Every executor follows the same four-phase lifecycle, orchestrated by execute_lifecycle:

validate(record)            # stateless pre-flight checks — no I/O
data = load(record)         # resolve URI, apply column/band maps, return data
result = run(data, record)  # simulation — no I/O here
record = save(result, record)

Each phase is timed independently. Timings are written to record.metrics automatically (time_validate_sec, time_load_sec, time_run_sec, time_save_sec, time_total_sec).
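
The pattern can be illustrated with a self-contained sketch: a toy executor plus a simplified mimic of execute_lifecycle (not the real implementation, which is reproduced in full further down):

```python
import time

class EchoExecutor:
    """Toy executor used only to illustrate phase order; not a real model."""
    def validate(self, record):
        pass                                # stateless pre-flight, no I/O
    def load(self, record):
        return record["input"]              # the single I/O-in phase
    def run(self, data, record):
        return data.upper()                 # "simulation" — no I/O
    def save(self, result, record):
        record["output"] = result
        record["status"] = "completed"
        return record

def sketch_lifecycle(executor, record):
    """Simplified mimic: run the four phases in order, timing each one."""
    timings = {}

    t0 = time.perf_counter()
    executor.validate(record)
    timings["time_validate_sec"] = round(time.perf_counter() - t0, 3)

    t0 = time.perf_counter()
    data = executor.load(record)
    timings["time_load_sec"] = round(time.perf_counter() - t0, 3)

    t0 = time.perf_counter()
    result = executor.run(data, record)
    timings["time_run_sec"] = round(time.perf_counter() - t0, 3)

    t0 = time.perf_counter()
    record = executor.save(result, record)
    timings["time_save_sec"] = round(time.perf_counter() - t0, 3)

    timings["time_total_sec"] = round(sum(timings.values()), 3)
    return record, timings

record, timings = sketch_lifecycle(EchoExecutor(), {"input": "hello"})
# record now holds output="HELLO" and status="completed";
# timings contains the five time_*_sec keys
```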


Minimal implementation

import geopandas as gpd
from dissmodel.executor import ModelExecutor, ExperimentRecord
from dissmodel.executor.cli import run_cli
from dissmodel.io import load_dataset, save_dataset


class MyExecutor(ModelExecutor):
    name = "my_model"

    def load(self, record: ExperimentRecord) -> gpd.GeoDataFrame:
        gdf, checksum = load_dataset(record.source.uri)
        record.source.checksum = checksum
        return gdf

    def run(self, data: gpd.GeoDataFrame, record: ExperimentRecord) -> gpd.GeoDataFrame:
        # data is already loaded — no I/O here
        params = record.parameters
        # ... simulation logic ...
        return data

    def save(self, result, record: ExperimentRecord) -> ExperimentRecord:
        uri = record.output_path or "output.gpkg"
        checksum = save_dataset(result, uri)
        record.output_path   = uri
        record.output_sha256 = checksum
        record.status        = "completed"
        return record


if __name__ == "__main__":
    run_cli(MyExecutor)

Auto-registration

Subclasses of ModelExecutor that define a name class attribute are registered automatically in ExecutorRegistry via Python's __init_subclass__. No boilerplate is required:

from dissmodel.executor import ExecutorRegistry

ExecutorRegistry.list()          # ["my_model", ...]
ExecutorRegistry.get("my_model") # → MyExecutor class
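
The mechanism itself is plain __init_subclass__; a standalone sketch of how it works (illustrative class names, not the real dissmodel ones):

```python
class Registry:
    """Minimal stand-in for ExecutorRegistry."""
    _executors: dict[str, type] = {}

    @classmethod
    def register(cls, executor_cls: type) -> None:
        cls._executors[executor_cls.name] = executor_cls

class BaseExecutor:
    """Minimal stand-in for ModelExecutor's registration hook."""
    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        if hasattr(cls, "name"):          # only named subclasses register
            Registry.register(cls)

class MyExecutor(BaseExecutor):
    name = "my_model"

# Defining the class was enough — no explicit register() call:
# Registry._executors now maps "my_model" to MyExecutor
```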

The name attribute is also the key used in the TOML model registry:

[model]
class   = "my_model"
package = "my_package"

execute_lifecycle

execute_lifecycle is the single source of truth for orchestration. It is used by both dissmodel/executor/cli.py and the platform job_runner.py, ensuring behavioural parity without code duplication.

from dissmodel.executor import execute_lifecycle

executor = MyExecutor()
record, timings = execute_lifecycle(executor, record)

print(timings)
# {
#   "time_validate_sec": 0.0,
#   "time_load_sec": 1.243,
#   "time_run_sec": 0.872,
#   "time_save_sec": 0.004,
#   "time_total_sec": 2.119
# }

API Reference

dissmodel.executor.model_executor.ModelExecutor

Bases: ABC

Base interface for DisSModel executors.

Subclasses register themselves automatically in ExecutorRegistry via __init_subclass__ — no boilerplate required.

The platform orchestrates the full lifecycle in order:

validate(record)          # stateless pre-flight checks
data = load(record)       # I/O — called once by the platform
result = run(data, record) # simulation — no I/O here
record = save(result, record)
Minimal implementation

class MyExecutor(ModelExecutor):
    name = "my_model"

def load(self, record: ExperimentRecord):
    return gpd.read_file(record.source.uri)

def run(self, data, record: ExperimentRecord):
    gdf = data  # already loaded — no I/O here
    # ... run simulation ...
    return gdf

def save(self, result, record: ExperimentRecord) -> ExperimentRecord:
    record.status = "completed"
    return record
CLI usage

if __name__ == "__main__":
    from dissmodel.executor.cli import run_cli
    run_cli(MyExecutor)

Source code in dissmodel/executor/model_executor.py
class ModelExecutor(ABC):
    """
    Base interface for DisSModel executors.

    Subclasses register themselves automatically in ExecutorRegistry
    via __init_subclass__ — no boilerplate required.

    The platform orchestrates the full lifecycle in order:

        validate(record)          # stateless pre-flight checks
        data = load(record)       # I/O — called once by the platform
        result = run(data, record) # simulation — no I/O here
        record = save(result, record)

    Minimal implementation
    ----------------------
    class MyExecutor(ModelExecutor):
        name = "my_model"

        def load(self, record: ExperimentRecord):
            return gpd.read_file(record.source.uri)

        def run(self, data, record: ExperimentRecord):
            gdf = data  # already loaded — no I/O here
            # ... run simulation ...
            return gdf

        def save(self, result, record: ExperimentRecord) -> ExperimentRecord:
            record.status = "completed"
            return record

    CLI usage
    ---------
    if __name__ == "__main__":
        from dissmodel.executor.cli import run_cli
        run_cli(MyExecutor)
    """

    # Static class attribute — the registry key.
    # Must be a plain string, never a property.
    name: ClassVar[str]

    def __init_subclass__(cls, **kwargs: object) -> None:
        super().__init_subclass__(**kwargs)
        from .registry import ExecutorRegistry
        if hasattr(cls, "name"):
            ExecutorRegistry.register(cls)

    # ── Required lifecycle methods ────────────────────────────────────────────

    @abstractmethod
    def load(self, record: ExperimentRecord) -> Any:
        """
        Load and resolve the input dataset.

        Called by the platform before run(). The return value is passed
        directly as the first argument to run() — no second load occurs.

        Responsibilities:
        - Resolve URI (s3://, http://, local path)
        - Apply column_map (vector) or band_map (raster)
        - Fill record.source.checksum with sha256 of the loaded data
        - Return data in the format expected by run()
        """

    @abstractmethod
    def run(self, data: Any, record: ExperimentRecord) -> Any:
        """
        Execute the simulation.

        `data` is the direct return value of load(), injected by the platform.
        No I/O should happen here — all loading is done by load().

        Receives record with resolved_spec and parameters already merged.
        Returns raw result — format defined by the subclass and
        consumed by save().
        """

    @abstractmethod
    def save(self, result: Any, record: ExperimentRecord) -> ExperimentRecord:
        """
        Persist the result and return the updated record.

        Responsibilities:
        - Save output to destination (local path or s3://)
        - Fill record.output_path and record.output_sha256
        - Set record.status = "completed"
        - Return the complete record
        """

    # ── Optional hook ─────────────────────────────────────────────────────────

    def validate(self, record: ExperimentRecord) -> None:
        """
        Stateless pre-flight checks on the record — no data loading.

        Called by the platform before load(). Override to check canonical
        columns/bands, value ranges, parameter constraints, etc.
        Raise ValueError with an actionable message if invalid.

        Default implementation: no-op.
        """

    # ── Utilities available to subclasses ─────────────────────────────────────

    def _resolve_uri(self, uri: str) -> str:
        """
        Resolve any URI to a local path accessible by the executor.

        s3://bucket/key  → downloads to /tmp/<filename>, returns path
        http(s)://...    → downloads to /tmp/<filename>, returns path
        /local/path      → returns as-is
        """
        import os
        import urllib.request

        if uri.startswith("s3://"):
            from dissmodel.io._storage import get_default_client
            minio       = get_default_client()
            bucket, key = uri[5:].split("/", 1)
            local_path  = f"/tmp/{os.path.basename(key)}"
            minio.fget_object(bucket, key, local_path)
            return local_path

        if uri.startswith("http://") or uri.startswith("https://"):
            filename   = uri.split("/")[-1].split("?")[0]
            local_path = f"/tmp/{filename}"
            urllib.request.urlretrieve(uri, local_path)
            return local_path

        return uri

    @staticmethod
    def _sha256(path_or_bytes) -> str:
        """Compute sha256 of a file path or bytes."""
        import hashlib
        if isinstance(path_or_bytes, bytes):
            return hashlib.sha256(path_or_bytes).hexdigest()
        with open(path_or_bytes, "rb") as f:
            return hashlib.sha256(f.read()).hexdigest()
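
_sha256 yields the same digest for a bytes payload and for a file containing those bytes; a quick standard-library check of that property:

```python
import hashlib
import os
import tempfile

payload = b"dissmodel"

# bytes branch
digest_bytes = hashlib.sha256(payload).hexdigest()

# file-path branch
with tempfile.NamedTemporaryFile(delete=False) as f:
    f.write(payload)
    path = f.name
with open(path, "rb") as f:
    digest_file = hashlib.sha256(f.read()).hexdigest()
os.unlink(path)

# the two branches agree on the same 64-hex-character digest
```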

load(record) abstractmethod

Load and resolve the input dataset.

Called by the platform before run(). The return value is passed directly as the first argument to run() — no second load occurs.

Responsibilities:

- Resolve URI (s3://, http://, local path)
- Apply column_map (vector) or band_map (raster)
- Fill record.source.checksum with sha256 of the loaded data
- Return data in the format expected by run()

Source code in dissmodel/executor/model_executor.py
@abstractmethod
def load(self, record: ExperimentRecord) -> Any:
    """
    Load and resolve the input dataset.

    Called by the platform before run(). The return value is passed
    directly as the first argument to run() — no second load occurs.

    Responsibilities:
    - Resolve URI (s3://, http://, local path)
    - Apply column_map (vector) or band_map (raster)
    - Fill record.source.checksum with sha256 of the loaded data
    - Return data in the format expected by run()
    """

run(data, record) abstractmethod

Execute the simulation.

data is the direct return value of load(), injected by the platform. No I/O should happen here — all loading is done by load().

Receives record with resolved_spec and parameters already merged. Returns raw result — format defined by the subclass and consumed by save().

Source code in dissmodel/executor/model_executor.py
@abstractmethod
def run(self, data: Any, record: ExperimentRecord) -> Any:
    """
    Execute the simulation.

    `data` is the direct return value of load(), injected by the platform.
    No I/O should happen here — all loading is done by load().

    Receives record with resolved_spec and parameters already merged.
    Returns raw result — format defined by the subclass and
    consumed by save().
    """

save(result, record) abstractmethod

Persist the result and return the updated record.

Responsibilities:

- Save output to destination (local path or s3://)
- Fill record.output_path and record.output_sha256
- Set record.status = "completed"
- Return the complete record

Source code in dissmodel/executor/model_executor.py
@abstractmethod
def save(self, result: Any, record: ExperimentRecord) -> ExperimentRecord:
    """
    Persist the result and return the updated record.

    Responsibilities:
    - Save output to destination (local path or s3://)
    - Fill record.output_path and record.output_sha256
    - Set record.status = "completed"
    - Return the complete record
    """

validate(record)

Stateless pre-flight checks on the record — no data loading.

Called by the platform before load(). Override to check canonical columns/bands, value ranges, parameter constraints, etc. Raise ValueError with an actionable message if invalid.

Default implementation: no-op.

Source code in dissmodel/executor/model_executor.py
def validate(self, record: ExperimentRecord) -> None:
    """
    Stateless pre-flight checks on the record — no data loading.

    Called by the platform before load(). Override to check canonical
    columns/bands, value ranges, parameter constraints, etc.
    Raise ValueError with an actionable message if invalid.

    Default implementation: no-op.
    """
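
A sketch of what an override might look like, using hypothetical parameter names and a SimpleNamespace stand-in for ExperimentRecord:

```python
from types import SimpleNamespace

REQUIRED_PARAMS = {"beta", "gamma"}   # hypothetical model parameters

def validate(self, record) -> None:
    """Example override: parameter constraint check, no data loading."""
    missing = REQUIRED_PARAMS - set(record.parameters)
    if missing:
        raise ValueError(f"missing required parameters: {sorted(missing)}")

record = SimpleNamespace(parameters={"beta": 0.3})   # stand-in record
error_text = ""
try:
    validate(None, record)
except ValueError as e:
    error_text = str(e)
# error_text names the missing parameter, "gamma"
```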

dissmodel.executor.registry.ExecutorRegistry

Central registry mapping model names to executor classes.

Registration is automatic — subclasses of ModelExecutor register themselves via __init_subclass__ without any boilerplate.

Source code in dissmodel/executor/registry.py
class ExecutorRegistry:
    """
    Central registry mapping model names to executor classes.

    Registration is automatic — subclasses of ModelExecutor register
    themselves via __init_subclass__ without any boilerplate.
    """

    _executors: dict[str, type[ModelExecutor]] = {}

    @classmethod
    def register(cls, executor_cls: type[ModelExecutor]) -> None:
        """Called automatically by ModelExecutor.__init_subclass__."""
        cls._executors[executor_cls.name] = executor_cls

    @classmethod
    def get(cls, name: str) -> type[ModelExecutor]:
        """
        Resolve executor class by name.
        Raises KeyError with a clear message if not registered.
        """
        if name not in cls._executors:
            available = ", ".join(cls._executors) or "none"
            raise KeyError(
                f"Executor '{name}' not registered. "
                f"Available: {available}"
            )
        return cls._executors[name]

    @classmethod
    def list(cls) -> list[str]:
        """Return all registered executor names."""
        return list(cls._executors.keys())

get(name) classmethod

Resolve executor class by name. Raises KeyError with a clear message if not registered.

Source code in dissmodel/executor/registry.py
@classmethod
def get(cls, name: str) -> type[ModelExecutor]:
    """
    Resolve executor class by name.
    Raises KeyError with a clear message if not registered.
    """
    if name not in cls._executors:
        available = ", ".join(cls._executors) or "none"
        raise KeyError(
            f"Executor '{name}' not registered. "
            f"Available: {available}"
        )
    return cls._executors[name]
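
The failure path can be exercised with a standalone mimic of the lookup (illustrative, not the real class):

```python
class MiniRegistry:
    """Stand-in reproducing ExecutorRegistry.get's error behaviour."""
    _executors = {"my_model": object}

    @classmethod
    def get(cls, name):
        if name not in cls._executors:
            available = ", ".join(cls._executors) or "none"
            raise KeyError(
                f"Executor '{name}' not registered. Available: {available}"
            )
        return cls._executors[name]

message = ""
try:
    MiniRegistry.get("typo_model")
except KeyError as e:
    message = str(e)
# the error text names both the bad key and the registered names
```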

list() classmethod

Return all registered executor names.

Source code in dissmodel/executor/registry.py
@classmethod
def list(cls) -> list[str]:
    """Return all registered executor names."""
    return list(cls._executors.keys())

register(executor_cls) classmethod

Called automatically by ModelExecutor.__init_subclass__.

Source code in dissmodel/executor/registry.py
@classmethod
def register(cls, executor_cls: type[ModelExecutor]) -> None:
    """Called automatically by ModelExecutor.__init_subclass__."""
    cls._executors[executor_cls.name] = executor_cls

dissmodel.executor.runner.execute_lifecycle(executor, record)

Canonical lifecycle orchestration for DisSModel executors.

Runs validate → load → run → save in order, times each phase independently, and populates record.metrics with the results.

Used by both the CLI runner and the platform job_runner to ensure behavioural parity without code duplication.

Parameters:

    executor : ModelExecutor (required)
        An instantiated ModelExecutor subclass.

    record : ExperimentRecord (required)
        The ExperimentRecord for this job. May be mutated in-place by
        load() (e.g. source.checksum) and save() (output_path, status).

Returns:

    record : ExperimentRecord
        The completed ExperimentRecord with status, output_path, and
        metrics populated.

    timings : dict[str, float]
        Dict with individual phase times and total: time_validate_sec,
        time_load_sec, time_run_sec, time_save_sec, time_total_sec.

Source code in dissmodel/executor/runner.py
def execute_lifecycle(
    executor: "ModelExecutor",
    record: "ExperimentRecord",
) -> tuple["ExperimentRecord", dict[str, float]]:
    """
    Canonical lifecycle orchestration for DisSModel executors.

    Runs validate → load → run → save in order, times each phase
    independently, and populates record.metrics with the results.

    Used by both the CLI runner and the platform job_runner to ensure
    behavioural parity without code duplication.

    Parameters
    ----------
    executor:
        An instantiated ModelExecutor subclass.
    record:
        The ExperimentRecord for this job. May be mutated in-place by
        load() (e.g. source.checksum) and save() (output_path, status).

    Returns
    -------
    record:
        The completed ExperimentRecord with status, output_path, and
        metrics populated.
    timings:
        Dict with individual phase times and total:
        time_validate_sec, time_load_sec, time_run_sec,
        time_save_sec, time_total_sec.
    """
    t0 = time.perf_counter()
    executor.validate(record)
    t_val = time.perf_counter() - t0

    t0 = time.perf_counter()
    data = executor.load(record)
    t_load = time.perf_counter() - t0

    t0 = time.perf_counter()
    result = executor.run(data, record)
    t_run = time.perf_counter() - t0

    t0 = time.perf_counter()
    record = executor.save(result, record)
    t_save = time.perf_counter() - t0

    t_total = t_val + t_load + t_run + t_save

    timings: dict[str, float] = {
        "time_validate_sec": round(t_val,   3),
        "time_load_sec":     round(t_load,  3),
        "time_run_sec":      round(t_run,   3),
        "time_save_sec":     round(t_save,  3),
        "time_total_sec":    round(t_total, 3),
    }

    record.metrics.update(timings)
    return record, timings