From Python Workflow to Compute Payload
Sophios provides a clean path from Python-authored CWL all the way to a schema-validated compute submission payload.
The key idea is simple:
build a CWL tool in Python,
compose it into a Workflow,
ask the workflow API for the compiled CWL and job inputs in memory,
wrap that compiled result in ComputeWorkflowPayload,
submit it when you are ready.
You do not need to hand-build JSON.
You do not need to write an intermediate .cwl file just to produce the
submission request body.
A runnable version of this pattern lives in examples/scripts/compute_payload_workflow.py.
If you want a larger example that starts from the Ichnaea autosegmentation CLT and carries that tool all the way through workflow construction and compute submission, see ichnaea_compact_compute.
What This Pattern Gives You
This split gives you clear checkpoints:
CommandLineTool(...) keeps tool authoring structured and readable.
Workflow(...) keeps step wiring explicit and reviewable.
Workflow.get_cwl_workflow() gives you the exact compiled workflow plus job inputs.
ComputeWorkflowPayload.get_compute_payload() validates that request against the checked-in compute schema.
That last point matters. Schema validation catches payload-shape mistakes before
you submit the request. The schema lives at
src/sophios/compute_payload_schema.json.
Minimal mental model
Think in terms of layers:
tool_builder defines a single CWL tool
the workflow Python API composes tools into a CWL workflow
ComputeWorkflowPayload packages that compiled workflow for a compute service
Each layer owns one job. That keeps the implementation understandable and the user-facing API focused.
Full example
from datetime import datetime

from sophios.apis.python.tool_builder import (
    CommandLineTool,
    Input,
    Inputs,
    Output,
    Outputs,
    cwl,
)
from sophios.apis.python.workflow import (
    Step,
    Workflow,
)
from sophios.compute_payload import (
    ComputeConfig,
    ComputeWorkflowPayload,
    OutputConfig,
    SlurmConfig,
    ToilConfig,
)


def build_emit_text_tool() -> CommandLineTool:
    inputs = Inputs(
        message=Input(cwl.string, position=1)
        .label("Message")
        .doc("Text to print to stdout."),
    )
    outputs = Outputs(
        text_file=Output(cwl.file, glob="stdout.txt")
        .label("Captured stdout")
        .doc("Text emitted by the tool, captured as a file."),
    )
    return (
        CommandLineTool("emit_text", inputs, outputs)
        .describe("Emit text", "Generated CLT that prints one message.")
        .base_command("python", "-c")
        .argument("import sys; print(sys.argv[1])", position=0)
        .stdout("stdout.txt")
    )


def build_workflow(message: str) -> Workflow:
    emit_step = Step(build_emit_text_tool(), step_name="emit_text")
    workflow = Workflow([emit_step], "compute_payload_workflow_demo")
    emit_step.inputs.message = message
    workflow.outputs.text_file = emit_step.outputs.text_file
    return workflow


workflow = build_workflow("hello from compute")
compiled = workflow.get_cwl_workflow()
cwl_workflow = {
    key: value
    for key, value in compiled.items()
    if key not in {"name", "yaml_inputs"}
}
cwl_job_inputs = dict(compiled["yaml_inputs"])

payload = ComputeWorkflowPayload(
    cwl_workflow=cwl_workflow,
    cwl_job_inputs=cwl_job_inputs,
    workflow_id=f"{workflow.process_name}__{datetime.now():%Y_%m_%d_%H.%M.%S}__",
)
compute_json = payload.get_compute_payload()
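The workflow_id in the example combines the workflow's process name with a timestamp. A minimal sketch of only that formatting step follows, using a fixed datetime so the result is reproducible. The helper name make_workflow_id is hypothetical and is not part of Sophios.

```python
from datetime import datetime


def make_workflow_id(process_name: str, now: datetime) -> str:
    """Build an ID in the same format as the full example (hypothetical helper)."""
    # The text after the colon is a strftime-style format spec applied to `now`.
    return f"{process_name}__{now:%Y_%m_%d_%H.%M.%S}__"


example_id = make_workflow_id(
    "compute_payload_workflow_demo", datetime(2024, 1, 2, 3, 4, 5)
)
print(example_id)  # compute_payload_workflow_demo__2024_01_02_03.04.05__
```

Passing the timestamp in as an argument, rather than calling datetime.now() inside the helper, keeps the ID format easy to test.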
Why this shape is useful
There are three design choices here that are worth keeping in mind.
1. The workflow stays in memory
workflow.get_cwl_workflow() returns a plain Python object with:
the compiled CWL workflow document
the generated yaml_inputs payload
That is exactly what the compute payload layer needs.
So instead of rebuilding the request manually, you split the compiled object once
at the boundary and hand the two pieces to ComputeWorkflowPayload.
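The split described here can be wrapped in a small helper. This is a sketch that runs on a plain dictionary standing in for the compiled object. The helper name split_compiled and the contents of fake_compiled are illustrative assumptions; only the "name" and "yaml_inputs" keys come from the full example.

```python
from typing import Any

# Keys that belong to the compiled object's envelope rather than to the CWL
# document itself, as in the full example.
NON_CWL_KEYS = frozenset({"name", "yaml_inputs"})


def split_compiled(compiled: dict[str, Any]) -> tuple[dict[str, Any], dict[str, Any]]:
    """Split a compiled object into (cwl_workflow, cwl_job_inputs)."""
    cwl_workflow = {k: v for k, v in compiled.items() if k not in NON_CWL_KEYS}
    cwl_job_inputs = dict(compiled["yaml_inputs"])  # shallow copy of the inputs
    return cwl_workflow, cwl_job_inputs


# Stand-in for the object returned by workflow.get_cwl_workflow(); the field
# values are invented for illustration only.
fake_compiled = {
    "name": "compute_payload_workflow_demo",
    "cwlVersion": "v1.2",
    "class": "Workflow",
    "steps": {},
    "yaml_inputs": {"message": "hello from compute"},
}

cwl_workflow, cwl_job_inputs = split_compiled(fake_compiled)
```

Doing the split in one place means the rest of the code only ever handles the two pieces that ComputeWorkflowPayload expects.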
2. The payload object stays focused
The core constructor only needs:
cwl_workflow
cwl_job_inputs
optionally workflow_id
That keeps the compute layer loosely coupled to the Python workflow API.
It does not need to know what a Workflow is. It only needs the compiled output.
In this example the message is bound directly to emit_step.inputs.message.
That is deliberate: it produces a real cwlJobInputs payload immediately, which
is the most useful shape for validating the submission boundary.
3. Validation happens before submission
This line is the validation boundary:
compute_json = payload.get_compute_payload()
That call renders the payload and validates it against the checked-in compute schema.
If the payload shape drifts from the schema, it fails here, before any network call.
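The validate-then-submit ordering can be illustrated without Sophios at all. The sketch below uses a hand-written shape check as a stand-in for the real schema validation. The field name "cwlWorkflow", the ValueError exception type, and both function names are assumptions for illustration; the actual rules live in compute_payload_schema.json.

```python
from typing import Any, Callable

# Hypothetical required fields. "cwlJobInputs" is named in this document;
# "cwlWorkflow" is an assumed counterpart, not taken from the real schema.
REQUIRED_FIELDS = {"cwlWorkflow": dict, "cwlJobInputs": dict}


def validate_shape(payload: dict[str, Any]) -> dict[str, Any]:
    """Raise ValueError if a required field is missing or has the wrong type."""
    for field, expected in REQUIRED_FIELDS.items():
        if field not in payload:
            raise ValueError(f"missing required field: {field}")
        if not isinstance(payload[field], expected):
            raise ValueError(f"{field} must be a {expected.__name__}")
    return payload


def validate_then_send(
    payload: dict[str, Any], send: Callable[[dict[str, Any]], None]
) -> bool:
    """Call send() only if the payload passes validation."""
    try:
        checked = validate_shape(payload)
    except ValueError as err:
        print(f"payload rejected before submission: {err}")
        return False
    send(checked)
    return True


# A list's append method stands in for the network call.
sent: list[dict[str, Any]] = []
ok = validate_then_send({"cwlWorkflow": {}, "cwlJobInputs": {}}, sent.append)
bad = validate_then_send({"cwlWorkflow": {}}, sent.append)
```

The malformed payload never reaches send(), which is the property the validation boundary is meant to guarantee.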
Optional compute configuration
Most workflows only need the default payload shape:
payload = ComputeWorkflowPayload(
    cwl_workflow=cwl_workflow,
    cwl_job_inputs=cwl_job_inputs,
)
When you do need compute-specific settings, add a ComputeConfig:
from sophios.compute_payload import (
    ComputeConfig,
    OutputConfig,
    SlurmConfig,
    ToilConfig,
)

payload = ComputeWorkflowPayload(
    cwl_workflow=cwl_workflow,
    cwl_job_inputs=cwl_job_inputs,
    workflow_id="demo_job",
    compute_config=ComputeConfig(
        toil=ToilConfig(log_level="INFO"),
        output=OutputConfig.from_json(
            mode="userSpecified",
            outputDir="/tmp/compute-demo-out",
        ),
        slurm=SlurmConfig(partition="normal_gpu", cpus_per_task=4),
    ),
)
That keeps compute-specific concerns explicit without leaking them into the workflow API.
If you prefer the more Pythonic helpers, OutputConfig.user_specified(...) and
OutputConfig.workflow_declared() still work too.
Submission
Submission is intentionally a separate concern:
from sophios.compute_submit import submit_compute_json, submit_compute_payload

# Either submit the payload object...
retval = submit_compute_payload(payload, "http://127.0.0.1:7998/compute/")
# ...or submit the already rendered and validated JSON.
retval = submit_compute_json(compute_json, "http://127.0.0.1:7998/compute/")
Submission behavior is intentionally narrow:
send the validated payload
poll /status/ until the job reaches a started or terminal state
print logs only after the job reaches RUNNING
That makes the client behavior predictable and easy to inspect.
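The polling behavior listed above can be sketched as a simple loop. This is not the Sophios client. "RUNNING" is the only state name taken from this document; the other state names, the function name, and its parameters are assumptions, and a fake status source stands in for the HTTP call to the status endpoint.

```python
import time
from typing import Callable

# "RUNNING" is named in this document; the remaining state names are assumed.
STARTED_STATES = {"RUNNING"}
TERMINAL_STATES = {"COMPLETED", "FAILED", "CANCELLED"}


def poll_until_started_or_done(
    get_status: Callable[[], str],
    interval_s: float = 1.0,
    max_polls: int = 60,
) -> str:
    """Poll get_status() until it reports a started or terminal state."""
    for _ in range(max_polls):
        status = get_status()
        if status in STARTED_STATES or status in TERMINAL_STATES:
            return status
        time.sleep(interval_s)
    raise TimeoutError(f"job did not start within {max_polls} polls")


# Fake status source standing in for a request to the status endpoint.
fake_statuses = iter(["PENDING", "PENDING", "RUNNING"])
final_status = poll_until_started_or_done(lambda: next(fake_statuses), interval_s=0.0)

# Logs are only worth requesting once the job is actually running.
if final_status == "RUNNING":
    print("job is running; logs can be fetched now")
```

Bounding the loop with max_polls is a choice made for this sketch so that a job that never starts cannot block the caller indefinitely.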
Run the example
From the repository root:
python examples/scripts/compute_payload_workflow.py
The script validates the generated CLT and writes a compute payload JSON file by
default. To submit the payload, set SUBMIT_URL near the top of the script.
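Writing a payload to disk for inspection can be sketched with the standard library alone. This assumes the validated payload is a JSON-serializable dictionary; the helper name, the output file name, and the placeholder payload are hypothetical and may differ from what the example script actually does.

```python
import json
import tempfile
from pathlib import Path
from typing import Any


def write_payload(payload: dict[str, Any], path: Path) -> Path:
    """Write a payload dict to `path` as indented UTF-8 JSON."""
    text = json.dumps(payload, indent=2, sort_keys=True) + "\n"
    path.write_text(text, encoding="utf-8")
    return path


# Placeholder payload; a real one would come from payload.get_compute_payload().
demo_payload = {"cwlJobInputs": {"message": "hello from compute"}}

with tempfile.TemporaryDirectory() as tmp:
    out_path = write_payload(demo_payload, Path(tmp) / "compute_payload.json")
    # Read the file back to confirm it round-trips unchanged.
    round_tripped = json.loads(out_path.read_text(encoding="utf-8"))
```

Keeping the file on disk makes it possible to review or diff the exact request body before anything is submitted.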
Summary
The intended flow is now:
author tools with tool_builder
compose them with the workflow Python API
compile in memory with Workflow.get_cwl_workflow()
package and validate with ComputeWorkflowPayload
submit only when the payload is already known to match the schema
That gives you a path from Python authoring to compute submission without raw JSON assembly, while keeping the submitted payload visible and schema-checked.