Adding an Engine
Macaw is engine-agnostic. Adding a new STT or TTS engine requires implementing the backend interface, creating a model manifest, and writing tests.
There are two paths:
| Path | When to use | Changes to Macaw |
|---|---|---|
| External (recommended) | Third-party or community engines | Zero |
| Built-in | Engines shipped with the package | Factory + engines.py + pyproject.toml |
Both paths share the same backend interface, lifecycle, and required patterns.
Backend Lifecycle
Every engine goes through this lifecycle inside the gRPC worker subprocess:
_create_backend(engine) # instantiate (no heavy work)
│
▼
backend.load(model_path, config) # load model into GPU/CPU
│
▼
backend.post_load_hook() # optional: load auxiliary models
│
▼
warmup (3 inference passes) # prime JIT/CUDA caches
│
▼
serve gRPC requests # synthesize() / transcribe_file() / transcribe_stream()
│
▼
backend.unload() # free GPU memory, cleanup

Lifecycle guarantees
- load() is where you check for missing dependencies (import guard).
- post_load_hook() is for auxiliary models that depend on the main model.
- If load() or post_load_hook() raises, the worker fails to start. This is correct.
- unload() must release all GPU memory (call release_gpu_memory()).
Complete TTS Example
Minimal TTS backend (simplified from macaw/workers/tts/chatterbox.py):
"""TTS backend for MyTTS engine.
my-tts-lib is an optional dependency -- the import is guarded.
"""
from __future__ import annotations
import asyncio
from typing import TYPE_CHECKING
import numpy as np
from macaw._audio_constants import TTS_DEFAULT_SAMPLE_RATE
from macaw._types import TTSEngineCapabilities, VoiceInfo
from macaw.exceptions import ModelLoadError, TTSEngineError, TTSSynthesisError
from macaw.logging import get_logger
from macaw.workers.torch_utils import release_gpu_memory, resolve_device
from macaw.workers.tts.audio_utils import CHUNK_SIZE_BYTES, float32_to_pcm16_bytes
from macaw.workers.tts.interface import TTSBackend
if TYPE_CHECKING:
from collections.abc import AsyncIterator
# --- Import guard: fail at load() time, not at import time ---
try:
from my_tts_lib import MyTTSModel as _MyTTSModel
except ImportError:
_MyTTSModel = None
logger = get_logger("worker.tts.my_engine")
class MyTTSBackend(TTSBackend):
"""TTS backend using MyTTS."""
def __init__(self) -> None:
self._model: object | None = None
self._model_path: str = ""
self._sample_rate: int = TTS_DEFAULT_SAMPLE_RATE
async def capabilities(self) -> TTSEngineCapabilities:
return TTSEngineCapabilities(
supports_streaming=False,
supports_voice_cloning=False,
)
async def load(self, model_path: str, config: dict[str, object]) -> None:
# Check import guard FIRST
if _MyTTSModel is None:
msg = (
"my-tts-lib is not installed. "
"Install with: pip install macaw-openvoice[my-tts]"
)
raise ModelLoadError(model_path, msg)
device = resolve_device(str(config.get("device", "auto")))
# Blocking model load -> run in executor to avoid blocking the event loop
loop = asyncio.get_running_loop()
try:
self._model = await loop.run_in_executor(
None,
lambda: _MyTTSModel.from_pretrained(model_path, device=device),
)
except Exception as exc:
raise ModelLoadError(model_path, str(exc)) from exc
self._model_path = model_path
logger.info("model_loaded", model_path=model_path, device=device)
# AsyncGenerator is a subtype of AsyncIterator but mypy doesn't recognize
# yield-based overrides. The type: ignore is correct and expected.
async def synthesize( # type: ignore[override, misc]
self,
text: str,
voice: str = "default",
*,
sample_rate: int = TTS_DEFAULT_SAMPLE_RATE,
speed: float = 1.0,
options: dict[str, object] | None = None,
) -> AsyncIterator[bytes]:
if self._model is None:
msg = "Model not loaded. Call load() first."
raise ModelLoadError("unknown", msg)
if not text.strip():
raise TTSSynthesisError(self._model_path, "Empty text")
# Blocking inference -> run in executor
loop = asyncio.get_running_loop()
try:
audio_bytes = await loop.run_in_executor(
None,
lambda: _synthesize_blocking(self._model, text, voice),
)
except (TTSSynthesisError, TTSEngineError):
raise
except Exception as exc:
raise TTSEngineError(self._model_path, str(exc)) from exc
if len(audio_bytes) == 0:
raise TTSEngineError(self._model_path, "Synthesis returned empty audio")
# Yield in chunks for gRPC streaming
for i in range(0, len(audio_bytes), CHUNK_SIZE_BYTES):
yield audio_bytes[i : i + CHUNK_SIZE_BYTES]
async def voices(self) -> list[VoiceInfo]:
return [
VoiceInfo(
voice_id="default",
name="default",
language="en",
gender="neutral",
),
]
async def unload(self) -> None:
if self._model is not None:
del self._model
self._model = None
release_gpu_memory()
self._model_path = ""
logger.info("model_unloaded")
async def health(self) -> dict[str, str]:
if self._model is not None:
return {"status": "ok"}
return {"status": "not_loaded"}
# --- Pure helper (runs in executor thread) ---
def _synthesize_blocking(model: object, text: str, voice: str) -> bytes:
"""Run synthesis in a blocking context (called via run_in_executor)."""
from macaw.workers.torch_utils import get_inference_context
with get_inference_context():
wav = model.generate(text, voice=voice) # type: ignore[attr-defined]
audio_array = np.asarray(wav, dtype=np.float32).squeeze()
    return float32_to_pcm16_bytes(audio_array)

Streaming TTS
The synthesize() method returns an AsyncIterator[bytes] — not a single bytes object. This enables streaming with low TTFB (Time to First Byte). Yield audio chunks as they become available rather than waiting for the full synthesis to complete.
Complete STT Example
Minimal STT backend (simplified from macaw/workers/stt/qwen3_asr.py):
"""STT backend for MyASR engine.
my-asr-lib is an optional dependency -- the import is guarded.
"""
from __future__ import annotations
import asyncio
from typing import TYPE_CHECKING
import numpy as np
from macaw._audio_constants import STT_SAMPLE_RATE
from macaw._types import (
BatchResult,
EngineCapabilities,
SegmentDetail,
STTArchitecture,
TranscriptSegment,
)
from macaw.exceptions import AudioFormatError, ModelLoadError
from macaw.logging import get_logger
from macaw.workers.audio_utils import pcm_bytes_to_float32
from macaw.workers.stt.interface import STTBackend
from macaw.workers.torch_utils import release_gpu_memory, resolve_device
if TYPE_CHECKING:
from collections.abc import AsyncIterator
# --- Import guard ---
try:
from my_asr_lib import MyASRModel as _MyASRModel
except ImportError:
_MyASRModel = None
logger = get_logger("worker.stt.my_asr")
class MyASRBackend(STTBackend):
"""STT backend using MyASR (encoder-decoder)."""
def __init__(self) -> None:
self._model: object | None = None
self._model_path: str = ""
@property
def architecture(self) -> STTArchitecture:
return STTArchitecture.ENCODER_DECODER
async def load(self, model_path: str, config: dict[str, object]) -> None:
if _MyASRModel is None:
msg = (
"my-asr-lib is not installed. "
"Install with: pip install macaw-openvoice[my-asr]"
)
raise ModelLoadError(model_path, msg)
device = resolve_device(str(config.get("device", "auto")))
loop = asyncio.get_running_loop()
try:
self._model = await loop.run_in_executor(
None,
lambda: _MyASRModel.from_pretrained(model_path, device=device),
)
except Exception as exc:
raise ModelLoadError(model_path, str(exc)) from exc
self._model_path = model_path
logger.info("model_loaded", model_path=model_path, device=device)
async def capabilities(self) -> EngineCapabilities:
return EngineCapabilities(
supports_hot_words=False,
supports_initial_prompt=False,
supports_batch=True,
supports_word_timestamps=False,
max_concurrent_sessions=1,
)
async def transcribe_file(
self,
audio_data: bytes,
language: str | None = None,
initial_prompt: str | None = None,
hot_words: list[str] | None = None,
temperature: float = 0.0,
word_timestamps: bool = False,
) -> BatchResult:
if self._model is None:
msg = "Model not loaded. Call load() first."
raise ModelLoadError("unknown", msg)
if not audio_data:
raise AudioFormatError("Empty audio")
audio_array = pcm_bytes_to_float32(audio_data)
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(
None,
lambda: _transcribe_blocking(self._model, audio_array, language),
)
duration = float(len(audio_array)) / STT_SAMPLE_RATE
return BatchResult(
text=result["text"],
language=result["language"],
duration=duration,
segments=(
SegmentDetail(id=0, start=0.0, end=duration, text=result["text"]),
),
)
# AsyncGenerator is a subtype of AsyncIterator but mypy doesn't recognize
# yield-based overrides. The type: ignore is correct and expected.
async def transcribe_stream( # type: ignore[override, misc]
self,
audio_chunks: AsyncIterator[bytes],
language: str | None = None,
initial_prompt: str | None = None,
hot_words: list[str] | None = None,
) -> AsyncIterator[TranscriptSegment]:
if self._model is None:
msg = "Model not loaded. Call load() first."
raise ModelLoadError("unknown", msg)
# Accumulate-and-transcribe pattern for encoder-decoder
buffer: list[np.ndarray] = []
threshold_samples = int(5.0 * STT_SAMPLE_RATE)
total_samples = 0
segment_id = 0
async for chunk in audio_chunks:
if not chunk:
break
audio = pcm_bytes_to_float32(chunk)
buffer.append(audio)
total_samples += len(audio)
if total_samples >= threshold_samples:
accumulated = np.concatenate(buffer)
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(
None,
lambda: _transcribe_blocking(
self._model, accumulated, language
),
)
yield TranscriptSegment(
text=result["text"],
is_final=True,
segment_id=segment_id,
start_ms=0,
end_ms=int(float(total_samples) / STT_SAMPLE_RATE * 1000),
language=result["language"],
)
segment_id += 1
buffer = []
total_samples = 0
# Flush remaining audio
if buffer:
accumulated = np.concatenate(buffer)
if len(accumulated) > 0:
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(
None,
lambda: _transcribe_blocking(
self._model, accumulated, language
),
)
yield TranscriptSegment(
text=result["text"],
is_final=True,
segment_id=segment_id,
start_ms=0,
end_ms=int(float(total_samples) / STT_SAMPLE_RATE * 1000),
language=result["language"],
)
async def unload(self) -> None:
if self._model is not None:
del self._model
self._model = None
release_gpu_memory()
self._model_path = ""
logger.info("model_unloaded")
async def health(self) -> dict[str, str]:
if self._model is not None:
return {"status": "ok"}
return {"status": "not_loaded"}
# --- Pure helper (runs in executor thread) ---
def _transcribe_blocking(
model: object, audio: np.ndarray, language: str | None
) -> dict[str, str]:
from macaw.workers.torch_utils import get_inference_context
with get_inference_context():
result = model.transcribe( # type: ignore[attr-defined]
audio=(audio, STT_SAMPLE_RATE), language=language,
)
    return {"text": str(result.text).strip(), "language": str(result.language)}

Architecture Property (STT)
STT backends must declare their architecture via a read-only property. The runtime uses this to adapt the streaming pipeline automatically:
| Architecture | LocalAgreement | Cross-segment Context | Native Partials |
|---|---|---|---|
encoder-decoder | Yes (confirms tokens across passes) | Yes (224 tokens from previous segment) | No |
ctc | No (not needed) | No (initial_prompt not supported) | Yes |
streaming-native | No (not needed) | No | Yes |
from macaw._types import STTArchitecture
@property
def architecture(self) -> STTArchitecture:
    return STTArchitecture.ENCODER_DECODER

Set the right architecture
Choosing the wrong architecture causes incorrect streaming behavior. If your engine produces native partial transcripts, use ctc or streaming-native. If it needs multiple inference passes to produce stable output, use encoder-decoder.
Required Patterns
Import Guard
Engine dependencies are optional. Guard the import at module level and check in load():
try:
from some_engine import Model as _Model
except ImportError:
_Model = None
class MyBackend(TTSBackend):
async def load(self, model_path: str, config: dict[str, object]) -> None:
if _Model is None:
msg = (
"some-engine is not installed. "
"Install with: pip install macaw-openvoice[some-engine]"
)
raise ModelLoadError(model_path, msg)
        # ... proceed with loading

Why this matters
Workers run as subprocesses. If an import fails at module level, the subprocess crashes with a confusing traceback. The import guard gives a clear ModelLoadError with install instructions.
Executor Pattern
All blocking calls (model loading, inference) must run in an executor to avoid blocking the asyncio event loop:
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(
None, # default ThreadPoolExecutor
lambda: blocking_function(args),
)

Use get_inference_context() inside the blocking helper to wrap inference in torch.inference_mode() (or nullcontext() if torch is unavailable):
def _my_blocking_inference(model: object, data: object) -> object:
from macaw.workers.torch_utils import get_inference_context
with get_inference_context():
        return model.predict(data)  # type: ignore[attr-defined]

Why run_in_executor?
The gRPC server shares the event loop with health checks, graceful shutdown, and concurrent requests. A blocking call on the event loop freezes everything. The executor runs your blocking code in a separate thread.
Error Handling
Use the correct exception type depending on the error source:
| Exception | When | gRPC Status |
|---|---|---|
ModelLoadError | Model cannot be loaded (missing file, OOM, dependency) | Worker fails to start |
TTSSynthesisError | Client input error (empty text, bad params) | INVALID_ARGUMENT |
TTSEngineError | Server-side failure (GPU crash, empty output) | INTERNAL |
AudioFormatError | Invalid audio input (STT) | INVALID_ARGUMENT |
try:
result = await loop.run_in_executor(None, lambda: blocking_fn())
except (TTSSynthesisError, TTSEngineError):
raise # re-raise known exceptions as-is
except Exception as exc:
    raise TTSEngineError(self._model_path, str(exc)) from exc  # wrap unknown

Never let raw exceptions escape -- they become unhelpful gRPC UNKNOWN errors.
AsyncGenerator Typing
When your method uses yield, Python creates an AsyncGenerator (which IS-A AsyncIterator at runtime). mypy does not accept this as a valid override of methods declared with AsyncIterator return type.
Add # type: ignore[override, misc] to the method signature:
async def synthesize( # type: ignore[override, misc]
self,
text: str,
voice: str = "default",
*,
sample_rate: int = TTS_DEFAULT_SAMPLE_RATE,
speed: float = 1.0,
options: dict[str, object] | None = None,
) -> AsyncIterator[bytes]:
yield some_bytesThis applies to both TTSBackend.synthesize() and STTBackend.transcribe_stream(). Do not try to "fix" this by changing the ABC signature or wrapping the generator.
Shared Utilities
Reuse these instead of reimplementing:
| Module | Function / Constant | Purpose |
|---|---|---|
macaw.workers.torch_utils | resolve_device(device_str) | Resolves "auto" to "cuda:0" or "cpu" |
macaw.workers.torch_utils | get_inference_context() | torch.inference_mode() or nullcontext() |
macaw.workers.torch_utils | release_gpu_memory() | torch.cuda.empty_cache() (safe if no torch) |
macaw.workers.audio_utils | pcm_bytes_to_float32(data) | 16-bit PCM bytes to float32 numpy [-1, 1] |
macaw.workers.tts.audio_utils | float32_to_pcm16_bytes(arr) | Float32 numpy to 16-bit PCM bytes |
macaw.workers.tts.audio_utils | CHUNK_SIZE_BYTES | TTS chunk size (lazy-resolved from settings) |
macaw._audio_constants | STT_SAMPLE_RATE (16000) | Standard STT input sample rate |
macaw._audio_constants | TTS_DEFAULT_SAMPLE_RATE (24000) | Standard TTS output sample rate |
macaw._types | TTSEngineCapabilities | TTS capabilities dataclass |
macaw._types | EngineCapabilities | STT capabilities dataclass |
macaw._types | VoiceInfo | Voice metadata (voice_id, name, language, gender) |
macaw.logging | get_logger(name) | Structured logger (structlog) |
macaw.exceptions | ModelLoadError, TTSSynthesisError, etc. | Typed domain exceptions |
Path A: External Engine
External engines live in their own Python package and require zero changes to the Macaw runtime. This is the recommended approach for community engines.
Steps
1. Create your Python package with the backend class implementing STTBackend or TTSBackend (see examples above).
2. Create a macaw.yaml manifest alongside your model files:
name: my-custom-tts
version: "1.0.0"
engine: my-tts-engine
type: tts
python_package: my_company.engines.custom_tts
resources:
memory_mb: 2048
gpu_recommended: true
engine_config:
  device: auto

3. Install your package in the same environment as Macaw.
4. Place the manifest in a directory that the Macaw registry can scan. Each model directory should contain one macaw.yaml at its root.
5. Write tests (unit + integration).
How external loading works
When python_package is set in the manifest, the runtime:
- Imports the module via importlib.import_module(python_package)
- Scans for exactly one concrete subclass of the target ABC (STTBackend or TTSBackend)
- Instantiates and returns it
If zero or multiple subclasses are found, a ModelLoadError is raised. The module path must be a valid Python dotted path.
Path B: Built-In Engine
Built-in engines are part of the Macaw package and require changes to the factory, engine registry, and dependency declarations.
Step 1: Implement the Backend
Create a new file in macaw/workers/tts/ or macaw/workers/stt/. Follow the complete examples above and all patterns in the Required Patterns section.
Step 2: Create a Model Manifest
name: my-model-large
version: "1.0.0"
engine: my-engine
type: stt
resources:
memory_mb: 4096
gpu_recommended: true
capabilities:
architecture: encoder-decoder
streaming: true
hot_words: false
initial_prompt: true
batch_inference: true
word_timestamps: true
engine_config:
beam_size: 5
vad_filter: false
compute_type: float16
  device: auto

vad_filter must be false
Always set vad_filter: false in the manifest. The runtime handles VAD independently. Enabling the engine's internal VAD duplicates work and causes incorrect behavior.
Step 3: Register in the Factory
Add one if block with a lazy import in macaw/workers/{stt,tts}/main.py:
def _create_backend(engine: str, *, python_package: str | None = None) -> TTSBackend:
if python_package:
from macaw.workers._engine_loader import load_external_backend
from macaw.workers.tts.interface import TTSBackend as _TTSBackend
return load_external_backend(python_package, _TTSBackend)
if engine == "kokoro":
from macaw.workers.tts.kokoro import KokoroBackend
return KokoroBackend()
if engine == "qwen3-tts":
from macaw.workers.tts.qwen3 import Qwen3TTSBackend
return Qwen3TTSBackend()
if engine == "chatterbox":
from macaw.workers.tts.chatterbox import ChatterboxTurboBackend
return ChatterboxTurboBackend()
if engine == "my-tts": # <-- add your engine here
from macaw.workers.tts.my_engine import MyTTSBackend
return MyTTSBackend()
msg = f"Unsupported TTS engine: {engine}"
    raise ValueError(msg)

Lazy imports
Use lazy imports inside each if branch. This way, engine dependencies are only loaded when that specific engine is requested. Users who don't use your engine don't need to install its dependencies.
Step 4: Add to ENGINE_PACKAGE
Map the engine name to its Python package in macaw/engines.py:
ENGINE_PACKAGE: dict[str, str] = {
"faster-whisper": "faster_whisper",
"kokoro": "kokoro",
"qwen3-tts": "qwen_tts",
"qwen3-asr": "qwen_asr",
"chatterbox": "chatterbox",
"my-tts": "my_tts_lib", # <-- add this
}

This mapping is used by is_engine_available() to check whether the engine's dependency is installed before spawning a worker.
Step 5: Declare Dependencies
Add the engine dependency as an optional extra in pyproject.toml:
[project.optional-dependencies]
my-tts = ["my-tts-lib>=1.0"]
# Users install with:
# pip install macaw-openvoice[my-tts]

If the library lacks type stubs, add a mypy override:
[[tool.mypy.overrides]]
module = "my_tts_lib.*"
ignore_missing_imports = true

Step 6: Write Tests
Test your backend in isolation with mocked inference:
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from macaw.workers.tts.my_engine import MyTTSBackend
class TestMyTTSBackend:
async def test_capabilities(self):
backend = MyTTSBackend()
caps = await backend.capabilities()
assert caps.supports_streaming is False
assert caps.supports_voice_cloning is False
async def test_load_raises_when_dependency_missing(self):
backend = MyTTSBackend()
with patch("macaw.workers.tts.my_engine._MyTTSModel", None):
with pytest.raises(Exception, match="not installed"):
await backend.load("/fake/path", {"device": "cpu"})
async def test_synthesize_empty_text_raises(self):
backend = MyTTSBackend()
backend._model = MagicMock()
backend._model_path = "/fake"
with pytest.raises(Exception, match="Empty text"):
async for _ in backend.synthesize(" "):
pass
async def test_health_not_loaded(self):
backend = MyTTSBackend()
status = await backend.health()
assert status["status"] == "not_loaded"
async def test_health_loaded(self):
backend = MyTTSBackend()
backend._model = MagicMock()
status = await backend.health()
        assert status["status"] == "ok"

Post-Load Hook
Both STTBackend and TTSBackend provide an optional post_load_hook() method that runs after load() and before warmup. Use it to load auxiliary models that depend on the main model:
class MyTTSBackend(TTSBackend):
async def load(self, model_path: str, config: dict[str, object]) -> None:
self._model = load_main_model(model_path)
async def post_load_hook(self) -> None:
# Load vocoder that requires the main model's config
        self._vocoder = load_vocoder(self._model.config)

Common use cases: vocoder loading, speaker embedding extractors, language-specific adapters, punctuation models.
Checklist
Before submitting your engine:
- Implements all abstract methods from STTBackend or TTSBackend
- Import guard at module level (try/except ImportError)
- load() raises ModelLoadError when dependency is missing
- All blocking calls use run_in_executor
- unload() calls release_gpu_memory()
- No raw exceptions escape (all wrapped in typed domain exceptions)
- # type: ignore[override, misc] on generator methods
- architecture property returns the correct type (STT only)
- capabilities() accurately reflects engine features
- vad_filter: false in the manifest
- macaw.yaml includes required fields: name, version, engine, type, resources
- Optional dependency declared in pyproject.toml (built-in only)
- Engine registered in factory and ENGINE_PACKAGE (built-in only)
- Unit tests with mocked inference
- Integration tests marked with @pytest.mark.integration
- make check passes (ruff + mypy)
What You Don't Need to Touch
The engine-agnostic design means you do not modify:
| Component | Reason |
|---|---|
| API Server | Routes are engine-agnostic |
| Session Manager | Adapts automatically via architecture property |
| VAD Pipeline | Runs before audio reaches the engine |
| Preprocessing | Engines receive normalized PCM 16kHz |
| Postprocessing (ITN) | Runs after transcription, independent of engine |
| Scheduler | Routes requests by model name, not engine type |
| CLI | Commands work with any registered model |