Macaw OpenVoice
Guides

Adding an Engine

Macaw is engine-agnostic. Adding a new STT or TTS engine requires implementing the backend interface, creating a model manifest, and writing tests.

There are two paths:

| Path | When to use | Changes to Macaw |
| --- | --- | --- |
| External (recommended) | Third-party or community engines | Zero |
| Built-in | Engines shipped with the package | Factory + engines.py + pyproject.toml |

Both paths share the same backend interface, lifecycle, and required patterns.

Backend Lifecycle

Every engine goes through this lifecycle inside the gRPC worker subprocess:

_create_backend(engine)          # instantiate (no heavy work)
backend.load(model_path, config) # load model into GPU/CPU
backend.post_load_hook()         # optional: load auxiliary models
warmup (3 inference passes)      # prime JIT/CUDA caches
serve gRPC requests              # synthesize() / transcribe_file() / transcribe_stream()
backend.unload()                 # free GPU memory, cleanup

Lifecycle guarantees

  • load() is where you check for missing dependencies (import guard).
  • post_load_hook() is for auxiliary models that depend on the main model.
  • If load() or post_load_hook() raises, the worker fails to start. This fail-fast behavior is intentional: a half-loaded worker must never serve requests.
  • unload() must release all GPU memory (call release_gpu_memory()).

Complete TTS Example

Minimal TTS backend (simplified from macaw/workers/tts/chatterbox.py):

macaw/workers/tts/my_engine.py
"""TTS backend for MyTTS engine.

my-tts-lib is an optional dependency -- the import is guarded.
"""

from __future__ import annotations

import asyncio
from typing import TYPE_CHECKING

import numpy as np

from macaw._audio_constants import TTS_DEFAULT_SAMPLE_RATE
from macaw._types import TTSEngineCapabilities, VoiceInfo
from macaw.exceptions import ModelLoadError, TTSEngineError, TTSSynthesisError
from macaw.logging import get_logger
from macaw.workers.torch_utils import release_gpu_memory, resolve_device
from macaw.workers.tts.audio_utils import CHUNK_SIZE_BYTES, float32_to_pcm16_bytes
from macaw.workers.tts.interface import TTSBackend

if TYPE_CHECKING:
    from collections.abc import AsyncIterator

# --- Import guard: fail at load() time, not at import time ---
try:
    from my_tts_lib import MyTTSModel as _MyTTSModel
except ImportError:
    _MyTTSModel = None

logger = get_logger("worker.tts.my_engine")


class MyTTSBackend(TTSBackend):
    """TTS backend using MyTTS."""

    def __init__(self) -> None:
        self._model: object | None = None
        self._model_path: str = ""
        self._sample_rate: int = TTS_DEFAULT_SAMPLE_RATE

    async def capabilities(self) -> TTSEngineCapabilities:
        return TTSEngineCapabilities(
            supports_streaming=False,
            supports_voice_cloning=False,
        )

    async def load(self, model_path: str, config: dict[str, object]) -> None:
        # Check import guard FIRST
        if _MyTTSModel is None:
            msg = (
                "my-tts-lib is not installed. "
                "Install with: pip install macaw-openvoice[my-tts]"
            )
            raise ModelLoadError(model_path, msg)

        device = resolve_device(str(config.get("device", "auto")))

        # Blocking model load -> run in executor to avoid blocking the event loop
        loop = asyncio.get_running_loop()
        try:
            self._model = await loop.run_in_executor(
                None,
                lambda: _MyTTSModel.from_pretrained(model_path, device=device),
            )
        except Exception as exc:
            raise ModelLoadError(model_path, str(exc)) from exc

        self._model_path = model_path
        logger.info("model_loaded", model_path=model_path, device=device)

    # AsyncGenerator is a subtype of AsyncIterator but mypy doesn't recognize
    # yield-based overrides. The type: ignore is correct and expected.
    async def synthesize(  # type: ignore[override, misc]
        self,
        text: str,
        voice: str = "default",
        *,
        sample_rate: int = TTS_DEFAULT_SAMPLE_RATE,
        speed: float = 1.0,
        options: dict[str, object] | None = None,
    ) -> AsyncIterator[bytes]:
        if self._model is None:
            msg = "Model not loaded. Call load() first."
            raise ModelLoadError("unknown", msg)

        if not text.strip():
            raise TTSSynthesisError(self._model_path, "Empty text")

        # Blocking inference -> run in executor
        loop = asyncio.get_running_loop()
        try:
            audio_bytes = await loop.run_in_executor(
                None,
                lambda: _synthesize_blocking(self._model, text, voice),
            )
        except (TTSSynthesisError, TTSEngineError):
            raise
        except Exception as exc:
            raise TTSEngineError(self._model_path, str(exc)) from exc

        if len(audio_bytes) == 0:
            raise TTSEngineError(self._model_path, "Synthesis returned empty audio")

        # Yield in chunks for gRPC streaming
        for i in range(0, len(audio_bytes), CHUNK_SIZE_BYTES):
            yield audio_bytes[i : i + CHUNK_SIZE_BYTES]

    async def voices(self) -> list[VoiceInfo]:
        return [
            VoiceInfo(
                voice_id="default",
                name="default",
                language="en",
                gender="neutral",
            ),
        ]

    async def unload(self) -> None:
        if self._model is not None:
            del self._model
            self._model = None
            release_gpu_memory()
        self._model_path = ""
        logger.info("model_unloaded")

    async def health(self) -> dict[str, str]:
        if self._model is not None:
            return {"status": "ok"}
        return {"status": "not_loaded"}


# --- Pure helper (runs in executor thread) ---

def _synthesize_blocking(model: object, text: str, voice: str) -> bytes:
    """Run synthesis in a blocking context (called via run_in_executor)."""
    from macaw.workers.torch_utils import get_inference_context

    with get_inference_context():
        wav = model.generate(text, voice=voice)  # type: ignore[attr-defined]

    audio_array = np.asarray(wav, dtype=np.float32).squeeze()
    return float32_to_pcm16_bytes(audio_array)

Streaming TTS

The synthesize() method returns an AsyncIterator[bytes] — not a single bytes object. This enables streaming with low TTFB (Time to First Byte). Yield audio chunks as they become available rather than waiting for the full synthesis to complete.
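The re-chunking side of this pattern can be sketched in isolation. The toy below is self-contained: CHUNK stands in for CHUNK_SIZE_BYTES, and the frame source stands in for a hypothetical engine streaming API; neither is a Macaw name.

```python
from collections.abc import AsyncIterator

CHUNK = 8  # stand-in for CHUNK_SIZE_BYTES


async def stream_chunks(frames: AsyncIterator[bytes]) -> AsyncIterator[bytes]:
    """Re-chunk engine output into fixed-size pieces, yielding as audio arrives."""
    pending = b""
    async for frame in frames:
        pending += frame
        # Emit every full chunk immediately for low TTFB,
        # instead of buffering the whole utterance first.
        while len(pending) >= CHUNK:
            yield pending[:CHUNK]
            pending = pending[CHUNK:]
    if pending:  # flush the final partial chunk
        yield pending
```

The same shape drops into synthesize() directly: replace the frame source with your engine's streaming call (run in an executor if it blocks).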

Complete STT Example

Minimal STT backend (simplified from macaw/workers/stt/qwen3_asr.py):

macaw/workers/stt/my_asr.py
"""STT backend for MyASR engine.

my-asr-lib is an optional dependency -- the import is guarded.
"""

from __future__ import annotations

import asyncio
from typing import TYPE_CHECKING

import numpy as np

from macaw._audio_constants import STT_SAMPLE_RATE
from macaw._types import (
    BatchResult,
    EngineCapabilities,
    SegmentDetail,
    STTArchitecture,
    TranscriptSegment,
)
from macaw.exceptions import AudioFormatError, ModelLoadError
from macaw.logging import get_logger
from macaw.workers.audio_utils import pcm_bytes_to_float32
from macaw.workers.stt.interface import STTBackend
from macaw.workers.torch_utils import release_gpu_memory, resolve_device

if TYPE_CHECKING:
    from collections.abc import AsyncIterator

# --- Import guard ---
try:
    from my_asr_lib import MyASRModel as _MyASRModel
except ImportError:
    _MyASRModel = None

logger = get_logger("worker.stt.my_asr")


class MyASRBackend(STTBackend):
    """STT backend using MyASR (encoder-decoder)."""

    def __init__(self) -> None:
        self._model: object | None = None
        self._model_path: str = ""

    @property
    def architecture(self) -> STTArchitecture:
        return STTArchitecture.ENCODER_DECODER

    async def load(self, model_path: str, config: dict[str, object]) -> None:
        if _MyASRModel is None:
            msg = (
                "my-asr-lib is not installed. "
                "Install with: pip install macaw-openvoice[my-asr]"
            )
            raise ModelLoadError(model_path, msg)

        device = resolve_device(str(config.get("device", "auto")))

        loop = asyncio.get_running_loop()
        try:
            self._model = await loop.run_in_executor(
                None,
                lambda: _MyASRModel.from_pretrained(model_path, device=device),
            )
        except Exception as exc:
            raise ModelLoadError(model_path, str(exc)) from exc

        self._model_path = model_path
        logger.info("model_loaded", model_path=model_path, device=device)

    async def capabilities(self) -> EngineCapabilities:
        return EngineCapabilities(
            supports_hot_words=False,
            supports_initial_prompt=False,
            supports_batch=True,
            supports_word_timestamps=False,
            max_concurrent_sessions=1,
        )

    async def transcribe_file(
        self,
        audio_data: bytes,
        language: str | None = None,
        initial_prompt: str | None = None,
        hot_words: list[str] | None = None,
        temperature: float = 0.0,
        word_timestamps: bool = False,
    ) -> BatchResult:
        if self._model is None:
            msg = "Model not loaded. Call load() first."
            raise ModelLoadError("unknown", msg)

        if not audio_data:
            raise AudioFormatError("Empty audio")

        audio_array = pcm_bytes_to_float32(audio_data)

        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(
            None,
            lambda: _transcribe_blocking(self._model, audio_array, language),
        )

        duration = float(len(audio_array)) / STT_SAMPLE_RATE
        return BatchResult(
            text=result["text"],
            language=result["language"],
            duration=duration,
            segments=(
                SegmentDetail(id=0, start=0.0, end=duration, text=result["text"]),
            ),
        )

    # AsyncGenerator is a subtype of AsyncIterator but mypy doesn't recognize
    # yield-based overrides. The type: ignore is correct and expected.
    async def transcribe_stream(  # type: ignore[override, misc]
        self,
        audio_chunks: AsyncIterator[bytes],
        language: str | None = None,
        initial_prompt: str | None = None,
        hot_words: list[str] | None = None,
    ) -> AsyncIterator[TranscriptSegment]:
        if self._model is None:
            msg = "Model not loaded. Call load() first."
            raise ModelLoadError("unknown", msg)

        # Accumulate-and-transcribe pattern for encoder-decoder
        buffer: list[np.ndarray] = []
        threshold_samples = int(5.0 * STT_SAMPLE_RATE)
        total_samples = 0
        segment_id = 0

        async for chunk in audio_chunks:
            if not chunk:
                break
            audio = pcm_bytes_to_float32(chunk)
            buffer.append(audio)
            total_samples += len(audio)

            if total_samples >= threshold_samples:
                accumulated = np.concatenate(buffer)
                loop = asyncio.get_running_loop()
                result = await loop.run_in_executor(
                    None,
                    lambda: _transcribe_blocking(
                        self._model, accumulated, language
                    ),
                )
                yield TranscriptSegment(
                    text=result["text"],
                    is_final=True,
                    segment_id=segment_id,
                    start_ms=0,
                    end_ms=int(float(total_samples) / STT_SAMPLE_RATE * 1000),
                    language=result["language"],
                )
                segment_id += 1
                buffer = []
                total_samples = 0

        # Flush remaining audio
        if buffer:
            accumulated = np.concatenate(buffer)
            if len(accumulated) > 0:
                loop = asyncio.get_running_loop()
                result = await loop.run_in_executor(
                    None,
                    lambda: _transcribe_blocking(
                        self._model, accumulated, language
                    ),
                )
                yield TranscriptSegment(
                    text=result["text"],
                    is_final=True,
                    segment_id=segment_id,
                    start_ms=0,
                    end_ms=int(float(total_samples) / STT_SAMPLE_RATE * 1000),
                    language=result["language"],
                )

    async def unload(self) -> None:
        if self._model is not None:
            del self._model
            self._model = None
            release_gpu_memory()
        self._model_path = ""
        logger.info("model_unloaded")

    async def health(self) -> dict[str, str]:
        if self._model is not None:
            return {"status": "ok"}
        return {"status": "not_loaded"}


# --- Pure helper (runs in executor thread) ---

def _transcribe_blocking(
    model: object, audio: np.ndarray, language: str | None
) -> dict[str, str]:
    from macaw.workers.torch_utils import get_inference_context

    with get_inference_context():
        result = model.transcribe(  # type: ignore[attr-defined]
            audio=(audio, STT_SAMPLE_RATE), language=language,
        )

    return {"text": str(result.text).strip(), "language": str(result.language)}

Architecture Property (STT)

STT backends must declare their architecture via a read-only property. The runtime uses this to adapt the streaming pipeline automatically:

| Architecture | LocalAgreement | Cross-segment Context | Native Partials |
| --- | --- | --- | --- |
| encoder-decoder | Yes (confirms tokens across passes) | Yes (224 tokens from previous segment) | No |
| ctc | No (not needed) | No (initial_prompt not supported) | Yes |
| streaming-native | No (not needed) | No | Yes |

from macaw._types import STTArchitecture

@property
def architecture(self) -> STTArchitecture:
    return STTArchitecture.ENCODER_DECODER

Set the right architecture

Choosing the wrong architecture causes incorrect streaming behavior. If your engine produces native partial transcripts, use ctc or streaming-native. If it needs multiple inference passes to produce stable output, use encoder-decoder.
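For intuition, this is roughly how a runtime could branch on the property. The enum values mirror the table above; everything else here is a local stand-in, not Macaw's actual pipeline code.

```python
from enum import Enum


class STTArchitecture(str, Enum):
    """Local stand-in mirroring macaw._types.STTArchitecture values."""

    ENCODER_DECODER = "encoder-decoder"
    CTC = "ctc"
    STREAMING_NATIVE = "streaming-native"


def needs_local_agreement(arch: STTArchitecture) -> bool:
    """Only encoder-decoder output is unstable across passes and needs
    token confirmation; ctc and streaming-native emit native partials."""
    return arch is STTArchitecture.ENCODER_DECODER
```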

Required Patterns

Import Guard

Engine dependencies are optional. Guard the import at module level and check in load():

Import guard pattern
try:
    from some_engine import Model as _Model
except ImportError:
    _Model = None

class MyBackend(TTSBackend):
    async def load(self, model_path: str, config: dict[str, object]) -> None:
        if _Model is None:
            msg = (
                "some-engine is not installed. "
                "Install with: pip install macaw-openvoice[some-engine]"
            )
            raise ModelLoadError(model_path, msg)
        # ... proceed with loading

Why this matters

Workers run as subprocesses. If an import fails at module level, the subprocess crashes with a confusing traceback. The import guard gives a clear ModelLoadError with install instructions.

Executor Pattern

All blocking calls (model loading, inference) must run in an executor to avoid blocking the asyncio event loop:

Executor pattern
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(
    None,  # default ThreadPoolExecutor
    lambda: blocking_function(args),
)

Use get_inference_context() inside the blocking helper to wrap inference in torch.inference_mode() (or nullcontext() if torch is unavailable):

Blocking helper with inference context
def _my_blocking_inference(model: object, data: object) -> object:
    from macaw.workers.torch_utils import get_inference_context

    with get_inference_context():
        return model.predict(data)  # type: ignore[attr-defined]

Why run_in_executor?

The gRPC server shares the event loop with health checks, graceful shutdown, and concurrent requests. A blocking call on the event loop freezes everything. The executor runs your blocking code in a separate thread.

Error Handling

Use the correct exception type depending on the error source:

| Exception | When | gRPC Status |
| --- | --- | --- |
| ModelLoadError | Model cannot be loaded (missing file, OOM, dependency) | Worker fails to start |
| TTSSynthesisError | Client input error (empty text, bad params) | INVALID_ARGUMENT |
| TTSEngineError | Server-side failure (GPU crash, empty output) | INTERNAL |
| AudioFormatError | Invalid audio input (STT) | INVALID_ARGUMENT |

Error handling pattern for synthesis/transcription
try:
    result = await loop.run_in_executor(None, lambda: blocking_fn())
except (TTSSynthesisError, TTSEngineError):
    raise  # re-raise known exceptions as-is
except Exception as exc:
    raise TTSEngineError(self._model_path, str(exc)) from exc  # wrap unknown

Never let raw exceptions escape -- they become unhelpful gRPC UNKNOWN errors.

AsyncGenerator Typing

When your method uses yield, Python creates an AsyncGenerator, which is a subtype of AsyncIterator at runtime. mypy, however, does not accept a yield-based method as a valid override of a method declared to return AsyncIterator.

Add # type: ignore[override, misc] to the method signature:

async def synthesize(  # type: ignore[override, misc]
    self,
    text: str,
    voice: str = "default",
    *,
    sample_rate: int = TTS_DEFAULT_SAMPLE_RATE,
    speed: float = 1.0,
    options: dict[str, object] | None = None,
) -> AsyncIterator[bytes]:
    yield some_bytes

This applies to both TTSBackend.synthesize() and STTBackend.transcribe_stream(). Do not try to "fix" this by changing the ABC signature or wrapping the generator.

Shared Utilities

Reuse these instead of reimplementing:

| Module | Function / Constant | Purpose |
| --- | --- | --- |
| macaw.workers.torch_utils | resolve_device(device_str) | Resolves "auto" to "cuda:0" or "cpu" |
| macaw.workers.torch_utils | get_inference_context() | torch.inference_mode() or nullcontext() |
| macaw.workers.torch_utils | release_gpu_memory() | torch.cuda.empty_cache() (safe if no torch) |
| macaw.workers.audio_utils | pcm_bytes_to_float32(data) | 16-bit PCM bytes to float32 numpy [-1, 1] |
| macaw.workers.tts.audio_utils | float32_to_pcm16_bytes(arr) | Float32 numpy to 16-bit PCM bytes |
| macaw.workers.tts.audio_utils | CHUNK_SIZE_BYTES | TTS chunk size (lazy-resolved from settings) |
| macaw._audio_constants | STT_SAMPLE_RATE (16000) | Standard STT input sample rate |
| macaw._audio_constants | TTS_DEFAULT_SAMPLE_RATE (24000) | Standard TTS output sample rate |
| macaw._types | TTSEngineCapabilities | TTS capabilities dataclass |
| macaw._types | EngineCapabilities | STT capabilities dataclass |
| macaw._types | VoiceInfo | Voice metadata (voice_id, name, language, gender) |
| macaw.logging | get_logger(name) | Structured logger (structlog) |
| macaw.exceptions | ModelLoadError, TTSSynthesisError, etc. | Typed domain exceptions |
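For intuition, the two PCM conversion helpers behave roughly like the local sketch below. This is not Macaw's actual implementation; clipping and rounding details may differ.

```python
import numpy as np


def pcm_bytes_to_float32(data: bytes) -> np.ndarray:
    """16-bit little-endian PCM bytes -> float32 samples in [-1, 1]."""
    return np.frombuffer(data, dtype="<i2").astype(np.float32) / 32768.0


def float32_to_pcm16_bytes(arr: np.ndarray) -> bytes:
    """Float32 samples in [-1, 1] -> 16-bit little-endian PCM bytes."""
    clipped = np.clip(arr, -1.0, 1.0)  # guard against out-of-range samples
    return (clipped * 32767.0).astype("<i2").tobytes()
```

Always prefer the shared utilities over local copies like this, so all engines agree on byte order and scaling.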

Path A: External Engine

External engines live in their own Python package and require zero changes to the Macaw runtime. This is the recommended approach for community engines.

Steps

  1. Create your Python package with the backend class implementing STTBackend or TTSBackend (see examples above).

  2. Create a macaw.yaml manifest alongside your model files:

models/my-model/macaw.yaml
name: my-custom-tts
version: "1.0.0"
engine: my-tts-engine
type: tts
python_package: my_company.engines.custom_tts
resources:
  memory_mb: 2048
  gpu_recommended: true
engine_config:
  device: auto
  3. Install your package in the same environment as Macaw.

  4. Place the manifest in a directory that the Macaw registry can scan. Each model directory should contain one macaw.yaml at its root.

  5. Write tests (unit + integration).

How external loading works

When python_package is set in the manifest, the runtime:

  1. Imports the module via importlib.import_module(python_package)
  2. Scans for exactly one concrete subclass of the target ABC (STTBackend or TTSBackend)
  3. Instantiates and returns it

If zero or multiple subclasses are found, a ModelLoadError is raised. The module path must be a valid Python dotted path.
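The loader's scan can be approximated with the stdlib alone. The sketch below is a hypothetical re-implementation, not Macaw's code: it raises RuntimeError where the real loader raises ModelLoadError.

```python
import importlib
import inspect


def load_external_backend(module_path: str, base_cls: type) -> object:
    """Import module_path and instantiate its single concrete base_cls subclass."""
    module = importlib.import_module(module_path)
    candidates = [
        obj
        for _, obj in inspect.getmembers(module, inspect.isclass)
        if issubclass(obj, base_cls)
        and obj is not base_cls
        and not inspect.isabstract(obj)  # skip intermediate abstract classes
    ]
    if len(candidates) != 1:
        msg = (
            f"expected exactly one concrete {base_cls.__name__} subclass in "
            f"{module_path}, found {len(candidates)}"
        )
        raise RuntimeError(msg)
    return candidates[0]()
```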

Path B: Built-In Engine

Built-in engines are part of the Macaw package and require changes to the factory, engine registry, and dependency declarations.

Step 1: Implement the Backend

Create a new file in macaw/workers/tts/ or macaw/workers/stt/. Follow the complete examples above and all patterns in the Required Patterns section.

Step 2: Create a Model Manifest

models/my-model/macaw.yaml
name: my-model-large
version: "1.0.0"
engine: my-engine
type: stt
resources:
  memory_mb: 4096
  gpu_recommended: true
capabilities:
  architecture: encoder-decoder
  streaming: true
  hot_words: false
  initial_prompt: true
  batch_inference: true
  word_timestamps: true
engine_config:
  beam_size: 5
  vad_filter: false
  compute_type: float16
  device: auto

vad_filter must be false

Always set vad_filter: false in the manifest. The runtime handles VAD independently. Enabling the engine's internal VAD duplicates work and causes incorrect behavior.

Step 3: Register in the Factory

Add one if block with a lazy import in macaw/workers/{stt,tts}/main.py:

macaw/workers/tts/main.py — _create_backend()
def _create_backend(engine: str, *, python_package: str | None = None) -> TTSBackend:
    if python_package:
        from macaw.workers._engine_loader import load_external_backend
        from macaw.workers.tts.interface import TTSBackend as _TTSBackend

        return load_external_backend(python_package, _TTSBackend)

    if engine == "kokoro":
        from macaw.workers.tts.kokoro import KokoroBackend
        return KokoroBackend()

    if engine == "qwen3-tts":
        from macaw.workers.tts.qwen3 import Qwen3TTSBackend
        return Qwen3TTSBackend()

    if engine == "chatterbox":
        from macaw.workers.tts.chatterbox import ChatterboxTurboBackend
        return ChatterboxTurboBackend()

    if engine == "my-tts":  # <-- add your engine here
        from macaw.workers.tts.my_engine import MyTTSBackend
        return MyTTSBackend()

    msg = f"Unsupported TTS engine: {engine}"
    raise ValueError(msg)

Lazy imports

Use lazy imports inside each if branch. This way, engine dependencies are only loaded when that specific engine is requested. Users who don't use your engine don't need to install its dependencies.

Step 4: Add to ENGINE_PACKAGE

Map the engine name to its Python package in macaw/engines.py:

macaw/engines.py
ENGINE_PACKAGE: dict[str, str] = {
    "faster-whisper": "faster_whisper",
    "kokoro": "kokoro",
    "qwen3-tts": "qwen_tts",
    "qwen3-asr": "qwen_asr",
    "chatterbox": "chatterbox",
    "my-tts": "my_tts_lib",        # <-- add this
}

This mapping is used by is_engine_available() to check whether the engine's dependency is installed before spawning a worker.
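Such a check can be done with importlib.util.find_spec, which probes importability without paying the full import cost. This is a sketch of the idea, not Macaw's exact code, and the mapping below is illustrative.

```python
import importlib.util

# Illustrative mapping; mirrors the shape of ENGINE_PACKAGE in macaw/engines.py
ENGINE_PACKAGE: dict[str, str] = {
    "faster-whisper": "faster_whisper",
    "my-tts": "my_tts_lib",
}


def is_engine_available(engine: str) -> bool:
    """True if the engine's Python package is importable in this environment."""
    package = ENGINE_PACKAGE.get(engine)
    if package is None:
        return False
    # find_spec returns None for a missing top-level package, without importing it
    return importlib.util.find_spec(package) is not None
```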

Step 5: Declare Dependencies

Add the engine dependency as an optional extra in pyproject.toml:

pyproject.toml
[project.optional-dependencies]
my-tts = ["my-tts-lib>=1.0"]

# Users install with:
# pip install macaw-openvoice[my-tts]

If the library lacks type stubs, add a mypy override:

pyproject.toml
[[tool.mypy.overrides]]
module = "my_tts_lib.*"
ignore_missing_imports = true

Step 6: Write Tests

Test your backend in isolation with mocked inference:

tests/unit/test_my_engine_backend.py
import pytest
from unittest.mock import MagicMock, patch

from macaw.exceptions import ModelLoadError, TTSSynthesisError
from macaw.workers.tts.my_engine import MyTTSBackend


class TestMyTTSBackend:
    async def test_capabilities(self):
        backend = MyTTSBackend()
        caps = await backend.capabilities()
        assert caps.supports_streaming is False
        assert caps.supports_voice_cloning is False

    async def test_load_raises_when_dependency_missing(self):
        backend = MyTTSBackend()
        with patch("macaw.workers.tts.my_engine._MyTTSModel", None):
            with pytest.raises(ModelLoadError, match="not installed"):
                await backend.load("/fake/path", {"device": "cpu"})

    async def test_synthesize_empty_text_raises(self):
        backend = MyTTSBackend()
        backend._model = MagicMock()
        backend._model_path = "/fake"
        with pytest.raises(TTSSynthesisError, match="Empty text"):
            async for _ in backend.synthesize("  "):
                pass

    async def test_health_not_loaded(self):
        backend = MyTTSBackend()
        status = await backend.health()
        assert status["status"] == "not_loaded"

    async def test_health_loaded(self):
        backend = MyTTSBackend()
        backend._model = MagicMock()
        status = await backend.health()
        assert status["status"] == "ok"

Post-Load Hook

Both STTBackend and TTSBackend provide an optional post_load_hook() method that runs after load() and before warmup. Use it to load auxiliary models that depend on the main model:

Post-load hook example
class MyTTSBackend(TTSBackend):
    async def load(self, model_path: str, config: dict[str, object]) -> None:
        self._model = load_main_model(model_path)

    async def post_load_hook(self) -> None:
        # Load vocoder that requires the main model's config
        self._vocoder = load_vocoder(self._model.config)

Common use cases: vocoder loading, speaker embedding extractors, language-specific adapters, punctuation models.

Checklist

Before submitting your engine:

  • Implements all abstract methods from STTBackend or TTSBackend
  • Import guard at module level (try/except ImportError)
  • load() raises ModelLoadError when dependency is missing
  • All blocking calls use run_in_executor
  • unload() calls release_gpu_memory()
  • No raw exceptions escape (all wrapped in typed domain exceptions)
  • # type: ignore[override, misc] on generator methods
  • architecture property returns the correct type (STT only)
  • capabilities() accurately reflects engine features
  • vad_filter: false in the manifest
  • macaw.yaml includes required fields: name, version, engine, type, resources
  • Optional dependency declared in pyproject.toml (built-in only)
  • Engine registered in factory and ENGINE_PACKAGE (built-in only)
  • Unit tests with mocked inference
  • Integration tests marked with @pytest.mark.integration
  • make check passes (ruff + mypy)

What You Don't Need to Touch

The engine-agnostic design means you do not modify:

| Component | Reason |
| --- | --- |
| API Server | Routes are engine-agnostic |
| Session Manager | Adapts automatically via architecture property |
| VAD Pipeline | Runs before audio reaches the engine |
| Preprocessing | Engines receive normalized PCM 16kHz |
| Postprocessing (ITN) | Runs after transcription, independent of engine |
| Scheduler | Routes requests by model name, not engine type |
| CLI | Commands work with any registered model |