Schemas

The executor schemas are Pydantic models that carry provenance metadata across the full simulation lifecycle. Every execution produces an ExperimentRecord automatically — no additional instrumentation is required from the modeller.


ExperimentRecord

The central provenance object. Created before execute_lifecycle is called and mutated in place by load, run, and save.

After a completed run, the record contains:

| Field | Populated by | Description |
|---|---|---|
| `experiment_id` | automatically | UUID generated at creation |
| `created_at` | automatically | ISO 8601 timestamp |
| `source.checksum` | `load()` | SHA-256 of the input dataset |
| `parameters` | CLI / API | Resolved model parameters |
| `output_path` | `save()` | URI of the output file |
| `output_sha256` | `save()` | SHA-256 of the output file |
| `metrics` | `execute_lifecycle` | Per-phase timings plus model-specific metrics |
| `artifacts` | `save()` | Named checksums of extra artifacts |
| `logs` | any phase | Free-form execution log |
| `status` | `save()` | `"completed"` or `"failed"` (starts as `"pending"`) |

Example — reading a saved record

from dissmodel.executor.schemas import ExperimentRecord
from pathlib import Path

record = ExperimentRecord.model_validate_json(
    Path("output.record.json").read_text()
)

print(record.experiment_id)
print(record.metrics["time_load_sec"])
print(record.source.checksum)
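Because the record stores output checksums, a finished run can be verified by recomputing the digest of the output file and comparing it with `record.output_sha256`. A minimal stdlib sketch; the helper name `sha256_of` is hypothetical, not part of dissmodel:

```python
import hashlib
from pathlib import Path

def sha256_of(path: Path, chunk_size: int = 1 << 20) -> str:
    # Stream the file in chunks so large rasters never need to fit in memory.
    digest = hashlib.sha256()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()

# To verify a run, compare against the checksum stored in the record:
#   assert sha256_of(Path(record.output_path)) == record.output_sha256
```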

Example — building a record manually (tests / scripts)

from dissmodel.executor.schemas import ExperimentRecord, DataSource

record = ExperimentRecord(
    model_name   = "my_model",
    model_commit = "abc123",
    code_version = "0.4.0",
    source       = DataSource(type="local", uri="data/input.gpkg"),
    parameters   = {"end_time": 20, "resolution": 100.0},
)
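For tests or scripts that should not depend on Pydantic at all, the same payload can be sketched as a plain dict; the field names mirror the ExperimentRecord schema below, and the values here are purely illustrative:

```python
import json
import uuid
from datetime import datetime, timezone

# Field names mirror ExperimentRecord; values are illustrative only.
record = {
    "experiment_id": uuid.uuid4().hex,
    "created_at":    datetime.now(timezone.utc).isoformat(),
    "model_name":    "my_model",
    "model_commit":  "abc123",
    "code_version":  "0.4.0",
    "source":     {"type": "local", "uri": "data/input.gpkg", "checksum": ""},
    "parameters": {"end_time": 20, "resolution": 100.0},
    "artifacts":  {},
    "metrics":    {},
    "status":     "pending",
    "logs":       [],
}

# This JSON text has the shape ExperimentRecord.model_validate_json() reads back.
text = json.dumps(record, indent=2)
```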

DataSource

Describes the input dataset. The checksum field is filled by load() after the file is read — not before.

DataSource(
    type     = "s3",           # "local" | "s3" | "http" | "bdc_stac"
    uri      = "s3://bucket/input.tif",
    checksum = "",             # filled by load()
)

JobRequest

Used by the platform API to submit a simulation job. The platform resolves the executor from model_name and builds the ExperimentRecord before dispatching to the worker queue.

from dissmodel.executor.schemas import JobRequest

job = JobRequest(
    model_name    = "coastal_raster",
    input_dataset = "s3://dissmodel-inputs/grid.zip",
    input_format  = "auto",
    parameters    = {"end_time": 88},
)
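The request body can also be built without the helper class; a hedged sketch using only the standard library, where the field names mirror the JobRequest schema below and the host name is a placeholder:

```python
import json
from urllib import request

# Field names mirror JobRequest; the host below is a placeholder.
payload = {
    "model_name":    "coastal_raster",
    "input_dataset": "s3://dissmodel-inputs/grid.zip",
    "input_format":  "auto",
    "parameters":    {"end_time": 88},
    "priority":      "normal",
}

req = request.Request(
    "https://platform.example/submit_job",
    data=json.dumps(payload).encode("utf-8"),
    headers={"Content-Type": "application/json"},
    method="POST",
)
# request.urlopen(req) would dispatch the job; not executed here.
```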

API Reference

dissmodel.executor.schemas.ExperimentRecord

Bases: BaseModel

Source code in dissmodel/executor/schemas.py
class ExperimentRecord(BaseModel):
    # Identity
    experiment_id: str      = Field(default_factory=lambda: __import__("uuid").uuid4().hex)
    created_at:    datetime = Field(default_factory=datetime.utcnow)

    # Provenance
    model_name:    str  = ""
    model_commit:  str  = ""
    code_version:  str  = ""
    resolved_spec: dict = {}

    # Input
    source:       DataSource = Field(default_factory=DataSource)
    input_format: str        = "auto"
    column_map:   dict       = {}
    band_map:     dict       = {}
    parameters:   dict       = {}

    # Output
    output_path: str | None = None

    artifacts: dict[str, str] = {}
    # e.g.: {"output": "sha256...", "report": "sha256...", "plot": "sha256..."}
    # The "output" key is the primary one, used to verify reproducibility.

    metrics: dict = {}
    status:  str  = "pending"
    logs:    list[str] = []

    # ── compatibility with existing executors ─────────────────────────────────

    @property
    def output_sha256(self) -> str | None:
        """Compat: returns artifacts["output"] if it exists."""
        return self.artifacts.get("output")

    @output_sha256.setter
    def output_sha256(self, value: str | None) -> None:
        """Compat: writes to artifacts["output"] so existing executors keep working."""
        if value is not None:
            self.artifacts["output"] = value

    # ── helpers ───────────────────────────────────────────────────────────────

    def add_log(self, msg: str) -> None:
        self.logs.append(msg)

    def add_artifact(self, name: str, checksum: str) -> None:
        """
        Registers an artifact together with its checksum.

        Executor usage:
            record.add_artifact("report", write_text(md, uri))
            record.add_artifact("plot",   write_bytes(buf, uri))
            record.add_artifact("output", write_bytes(tif, uri))
        """
        self.artifacts[name] = checksum

output_sha256 property writable

Compat: returns artifacts["output"] if it exists.

add_artifact(name, checksum)

Registers an artifact together with its checksum.

Executor usage: record.add_artifact("report", write_text(md, uri)) record.add_artifact("plot", write_bytes(buf, uri)) record.add_artifact("output", write_bytes(tif, uri))

Source code in dissmodel/executor/schemas.py
def add_artifact(self, name: str, checksum: str) -> None:
    """
    Registers an artifact together with its checksum.

    Executor usage:
        record.add_artifact("report", write_text(md, uri))
        record.add_artifact("plot",   write_bytes(buf, uri))
        record.add_artifact("output", write_bytes(tif, uri))
    """
    self.artifacts[name] = checksum

dissmodel.executor.schemas.DataSource

Bases: BaseModel

Source code in dissmodel/executor/schemas.py
class DataSource(BaseModel):
    type:       str = "local"   # 'local' | 's3' | 'http' | 'bdc_stac'
    uri:        str = ""
    collection: str = ""
    version:    str = ""
    checksum:   str = ""

dissmodel.executor.schemas.JobRequest

Bases: BaseModel

Payload for POST /submit_job (platform only).

Source code in dissmodel/executor/schemas.py
class JobRequest(BaseModel):
    """Payload for POST /submit_job (platform only)."""

    model_name:    str
    input_dataset: str
    input_format:  Literal["tiff", "vector", "auto"] = "auto"
    parameters:    dict = {}
    column_map:    dict = {}
    band_map:      dict = {}
    priority:      Literal["low", "normal", "high"] = "normal"