Flowcat

A native-Rust runtime for real-time voice agents — built to run on your own infrastructure. Flowcat carries a phone or WebRTC call through a composable media pipeline — transport in → VAD / turn-taking → STT · LLM · TTS (or a single speech-to-speech model) → transport out — as one self-contained binary you deploy in your own VPC (or fully air-gapped). No hosted control plane, no phone-home, no Python or FreeSWITCH sidecar to operate. You bring your own provider credentials; a call's audio and data never leave infrastructure you control.

It is a clean-room, native-Rust counterpart to the design of pipecat: the same FrameProcessor pipeline model and the same provider breadth, packaged for teams that need to own the stack — self-hosted, auditable, and dense enough to run serious call volume per box.

License: Apache-2.0 · Status: pre-1.0, building in the open.

New here? The Quickstart takes you from git clone to a running pipeline, a real WebSocket audio round-trip, and a Python-driven brain in about five minutes — no credentials.

Where to go next

Building on Flowcat? Follow the path in order:

  1. Quickstart — clone → build → watch real audio move.
  2. Build an embedder — the host binary that carries a call.
  3. Configuration — runtime knobs and credentials.
  4. Providers & features — the STT / TTS / LLM / transport surface.
  5. Deployment — ship a release binary in your own VPC.

Contributing to Flowcat? Start with Contributing (build, test, add a provider) and the architecture docs beside it.

This site is generated from the Markdown in the Flowcat repository with mdBook.

Flowcat quickstart

From git clone to "I've watched the runtime move real audio and drive a conversation" in about five minutes — no credentials, no accounts, no cloud.

What this gets you: a built Flowcat, the FrameProcessor pipeline moving audio end-to-end, a real WebSocket media round-trip, and your conversation policy driven from Python over the RemoteBrain seam.

What it does not get you (yet): a live PSTN phone call. That needs a small embedder you wire to your carrier and control plane — step 5 shows exactly what that piece is and where the live-verified path starts. We'd rather you hit a working demo in five minutes than a phone call that needs an hour of setup to fail.

Everything in steps 1–4 is exercised in CI, so it runs on the first try.

Prerequisites

  • A recent stable Rust toolchain (rustup) — cargo on your PATH.
  • Python 3 (standard library only) for step 4 — no pip install.
  • git.

1. Clone & build

git clone https://github.com/AreevAI/flowcat.git
cd flowcat
cargo build -p flowcat-cli      # default features → no provider/network deps

The default build pulls no provider client dependencies — every STT/TTS/LLM, transport, and exporter is an opt-in Cargo feature. The first build compiles the workspace (a minute or two); after that, runs are instant.

2. Watch the pipeline move audio

cargo run -p flowcat-cli -- pipeline

A synthetic 440 Hz sine wave is pumped through a composable FrameProcessor graph (Source → Echo → Tap → Sink) while a FrameObserver counts frames:

flowcat pipeline demo
  source        : 440 Hz sine, 16000 Hz mono, 320-sample frames
  audio         : 50 frames (~1.00 s)
  chain         : Source -> Echo -> Tap -> Sink
  frames in     : 50 (InputAudio observed)
  frames out    : 50 (OutputAudio observed)
  echoed        : 50 (counted in Echo)
  wall time     : 2.071 ms
  result        : OK (in == out == sourced)

This is Flowcat's core: each stage is its own tokio task behind a bounded channel (natural backpressure), and the hot audio frame is an Arc — each hop moves a pointer, not a buffer. in == out == sourced means nothing was dropped.
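The stage-per-task, bounded-channel design described above can be sketched with the standard library alone. This is a hedged illustration, not Flowcat's code. OS threads and std::sync::mpsc::sync_channel stand in for tokio tasks and channels, and Frame, stage, run_pipeline, and CAPACITY are hypothetical names. Forwarding an Arc<[i16]> moves a pointer, and a full channel blocks its sender, which produces the backpressure the demo relies on.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::sync::Arc;
use std::thread;

// Hypothetical stand-in for a hot audio frame: a shared, immutable PCM buffer.
// Forwarding it bumps a refcount; the samples themselves are never copied.
type Frame = Arc<[i16]>;

// Bounded capacity per hop. When a downstream stage falls behind, its input
// channel fills and `send` blocks the upstream stage (backpressure).
const CAPACITY: usize = 8;

/// One pass-through stage on its own thread: observe each frame, forward it.
fn stage<F>(rx: Receiver<Frame>, tx: SyncSender<Frame>, mut on_frame: F) -> thread::JoinHandle<()>
where
    F: FnMut(&Frame) + Send + 'static,
{
    thread::spawn(move || {
        for frame in rx {
            on_frame(&frame);
            if tx.send(frame).is_err() {
                break; // downstream hung up
            }
        }
        // Returning drops `tx`, which ends the next stage's loop.
    })
}

/// Source -> Echo -> Tap -> Sink. Returns (echoed, tapped, sunk) frame counts.
fn run_pipeline(n_frames: usize, samples_per_frame: usize) -> (usize, usize, usize) {
    let (src_tx, echo_rx) = sync_channel::<Frame>(CAPACITY);
    let (echo_tx, tap_rx) = sync_channel::<Frame>(CAPACITY);
    let (tap_tx, sink_rx) = sync_channel::<Frame>(CAPACITY);

    let echoed = Arc::new(AtomicUsize::new(0));
    let tapped = Arc::new(AtomicUsize::new(0));
    let (e, t) = (Arc::clone(&echoed), Arc::clone(&tapped));
    let echo = stage(echo_rx, echo_tx, move |_| {
        e.fetch_add(1, Ordering::Relaxed);
    });
    let tap = stage(tap_rx, tap_tx, move |_| {
        t.fetch_add(1, Ordering::Relaxed);
    });
    let sink = thread::spawn(move || sink_rx.iter().count());

    // Source: a 440 Hz sine at 16 kHz mono, as in the demo.
    for i in 0..n_frames {
        let frame: Frame = (0..samples_per_frame)
            .map(|j| {
                let secs = (i * samples_per_frame + j) as f32 / 16_000.0;
                ((2.0 * std::f32::consts::PI * 440.0 * secs).sin() * 8_000.0) as i16
            })
            .collect();
        src_tx.send(frame).expect("pipeline closed early");
    }
    drop(src_tx); // end of stream: each stage exits once its input closes

    echo.join().unwrap();
    tap.join().unwrap();
    let sunk = sink.join().unwrap();
    (echoed.load(Ordering::Relaxed), tapped.load(Ordering::Relaxed), sunk)
}

fn main() {
    let (echoed, tapped, sunk) = run_pipeline(50, 320);
    println!("sourced 50, echoed {echoed}, tapped {tapped}, sunk {sunk}");
}
```

With 50 frames of 320 samples, every counter should equal the sourced count. This is the same in == out == sourced check the demo prints.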

3. Real audio over the WebSocket transport

cargo run -p flowcat-cli -- ws-echo

This stands up the actual generic WebSocket media transport — the same one a WS-media carrier connects to — streams PCM frames through it, and echoes them back, asserting they return byte-for-byte:

ws-echo: loopback server listening on ws://127.0.0.1:<port>
ws-echo: stream started (call_id=loopback)
ws-echo: echoed frame 1 (7 samples)
ws-echo: echoed frame 2 (6 samples)
ws-echo: echoed frame 3 (64 samples)
ws-echo: stream stopped after 3 echoed frame(s)
ws-echo: loopback OK — 3 frame(s) round-tripped byte-for-byte (3 echoed server-side)

Pass --connect ws://<host>:<port> to point the echo at a live peer instead of the in-process loopback.
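A "byte-for-byte" round trip can be checked with a pure encode/decode pair. The sketch below assumes frames travel as signed 16-bit little-endian PCM. That wire format is an assumption for illustration only: telephony carriers commonly use μ-law, and the real framing lives in the ws transport and the carrier serializers. encode_pcm16le, decode_pcm16le, and round_trips are hypothetical helpers.

```rust
/// Encode signed 16-bit PCM samples as little-endian bytes.
fn encode_pcm16le(samples: &[i16]) -> Vec<u8> {
    samples.iter().flat_map(|s| s.to_le_bytes()).collect()
}

/// Decode little-endian bytes back into samples; `None` if the payload
/// is not a whole number of 2-byte samples.
fn decode_pcm16le(bytes: &[u8]) -> Option<Vec<i16>> {
    if bytes.len() % 2 != 0 {
        return None;
    }
    Some(
        bytes
            .chunks_exact(2)
            .map(|pair| i16::from_le_bytes([pair[0], pair[1]]))
            .collect(),
    )
}

/// The echoed payload must equal what was sent and decode to the original samples.
fn round_trips(samples: &[i16]) -> bool {
    let sent = encode_pcm16le(samples);
    let echoed = sent.clone(); // stand-in for the server-side echo
    echoed == sent && decode_pcm16le(&echoed).as_deref() == Some(samples)
}

fn main() {
    // Frame sizes taken from the ws-echo output above: 7, 6, and 64 samples.
    let frames: Vec<Vec<i16>> = [7usize, 6, 64]
        .iter()
        .map(|&n| (0..n as i16).map(|i| i * 257 - 1000).collect())
        .collect();
    for (k, f) in frames.iter().enumerate() {
        assert!(round_trips(f), "frame {} did not round-trip", k + 1);
    }
    println!("{} frame(s) round-tripped byte-for-byte", frames.len());
}
```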

4. Drive the conversation from Python

You don't have to write Rust to control the agent. Flowcat consults a "brain" at turn granularity (between turns) — your Python never touches the per-audio-frame path, so the runtime's latency profile is unaffected. Start the pure-stdlib reference server:

python3 examples/python-remote-brain/brain_server.py   # http://127.0.0.1:8080

In another terminal, play the role Flowcat plays on a call — start a session, then interpret a model tool call:

curl -s -X POST http://127.0.0.1:8080/session \
  -H 'Content-Type: application/json' \
  -d '{"brain_config":{"graph":"demo"},"provider":"gemini"}'
{ "system_prompt": "You are a friendly receptionist. Greet the caller and ask how you can help.",
  "tools": [ { "name": "book_appointment", "...": "..." }, { "name": "end_call", "...": "..." } ],
  "node_id": "greeting", "collected_vars": {} }
curl -s -X POST http://127.0.0.1:8080/tool-call \
  -H 'Content-Type: application/json' \
  -d '{"node_id":"greeting","tool":{"name":"book_appointment","args":{"day":"Tuesday"}},"collected_vars":{}}'
{ "action": "transition",
  "system_prompt": "Confirm the appointment day with the caller, then ask them to say 'confirm'.",
  "tools": [ { "name": "confirm", "...": "..." }, { "name": "end_call", "...": "..." } ],
  "say": "Sure — booking you for Tuesday. Shall I confirm?",
  "node_id": "confirm", "collected_vars": { "requested_day": "Tuesday" }, "finished": false }

That's the whole RemoteBrain wire contract: /session seeds state, and /tool-call returns one of transition / stay / end. Replace the decide() function in brain_server.py with your own logic — an LLM call, a DB lookup, a state machine. A Rust embedder wires this in with RemoteBrain::connect(...); see examples/python-remote-brain. To expose Python functions as model tools instead, see examples/python-mcp-tools.

5. From here to a live phone call

The steps above run the runtime in isolation. A real inbound/outbound PSTN call adds the one piece Flowcat deliberately leaves to you — the embedder: a small host binary that

  • terminates the call — the native in-process SipTransport (SIP/RTP, no softswitch required), or a carrier WebSocket transport if you already run one;
  • resolves & finalizes the call — your SessionSource, talking to your control plane (routing, auth, recording/transcript upload);
  • supplies the brain — your own AgentBrain, or the RemoteBrain from step 4.

Flowcat owns the media loop; you own the contract, routing, and credentials — which is what keeps the whole call on infrastructure you control. The combination verified end-to-end today is Gemini Live (speech-to-speech) + Plivo telephony, so start there. The trait seams and full call lifecycle are specified in DESIGN.md; the provider/transport surface and the "use it from Python" model are in the README.

Fully on-prem / air-gapped? Swap the cloud providers for the local connectors (Whisper STT; Kokoro / Piper / XTTS TTS; Ollama LLM) and no call audio ever leaves your infrastructure.

Troubleshooting

  • cargo: command not found — install Rust via rustup and reopen your shell.
  • First build is slow — that's the one-time dependency compile; re-runs are instant.
  • Port 8080 already in use — change PORT near the top of brain_server.py.
  • Run the full offline test suite: cargo test (no network, no credentials).

Next: ready to carry a real call? → Build an embedder.

Build an embedder

The Quickstart runs the runtime in isolation. To carry a real call you write one small host binary — the embedder — that Flowcat deliberately leaves to you. This is the piece that keeps the whole call on infrastructure you control: Flowcat owns the media loop; you own the call contract, routing, and credentials.

The repo ships no embedder — only the credential-free flowcat demos. The wiring shown here lives in flowcat-core's own tests (pipeline/s2s.rs). The fastest on-ramp without writing Rust brain logic is the Python RemoteBrain.

The four seams

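An embedder supplies four things, then calls one builder. The transport and the model have ready-made implementations you can use as-is. The brain and the session source are yours to implement over your own systems (RemoteBrain lets you write the brain outside Rust, behind HTTP).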

Seam | Trait | Use the built-in… | …or implement for…
Media in/out | MediaTransport | SipAgent / SipTransport (native SIP/RTP), or a WS carrier | a custom transport
The model | RealtimeLlm + RealtimeKickoff | GeminiLive (speech-to-speech) | another realtime model
The conversation | AgentBrain | RemoteBrain (HTTP, step 4) | your own engine, in-process
Call resolution + finalize | SessionSource | (none) | your control plane (always yours)

AgentBrain — what the conversation does

Synchronous on purpose: pure decision logic, no I/O inside.

pub trait AgentBrain: Send {
    fn system_prompt(&self) -> String;
    fn tools(&self) -> Vec<ToolDecl>;
    fn current_node_id(&self) -> String;
    fn on_tool_call(&mut self, name: &str, args: &serde_json::Value) -> BrainAction;
    fn is_finished(&self) -> bool;
    fn collected_vars(&self) -> serde_json::Value;
}

Implement it over your graph/state machine, or use RemoteBrain to drive decisions from an HTTP service (your Python, your DB, your LLM) — same trait, over the wire.
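Because AgentBrain is synchronous and I/O-free, a brain can be a plain state machine that is unit-tested with no network. The sketch below mirrors the quickstart's demo graph (greeting, then confirm) and the AgentBrain method names. It is hypothetical and does not implement the real trait. It uses simplified stand-ins: string tool names instead of ToolDecl, a local Action enum modeled on the transition / stay / end outcomes instead of BrainAction, a BTreeMap instead of serde_json::Value, and a single day argument instead of JSON args.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for BrainAction, modeled on the wire outcomes.
#[derive(Debug, PartialEq)]
enum Action {
    Transition { to: &'static str, say: String },
    Stay,
    End,
}

/// Demo graph: greeting -> confirm -> done.
struct ReceptionistBrain {
    node: &'static str,
    vars: BTreeMap<String, String>,
}

impl ReceptionistBrain {
    fn new() -> Self {
        Self { node: "greeting", vars: BTreeMap::new() }
    }

    fn system_prompt(&self) -> String {
        match self.node {
            "greeting" => "You are a friendly receptionist. Greet the caller and ask how you can help.".into(),
            "confirm" => "Confirm the appointment day with the caller, then ask them to say 'confirm'.".into(),
            _ => String::new(),
        }
    }

    fn tools(&self) -> Vec<&'static str> {
        match self.node {
            "greeting" => vec!["book_appointment", "end_call"],
            "confirm" => vec!["confirm", "end_call"],
            _ => vec![],
        }
    }

    fn current_node_id(&self) -> String {
        self.node.to_string()
    }

    /// Pure decision logic: no I/O, so it is trivially unit-testable.
    fn on_tool_call(&mut self, name: &str, day: Option<&str>) -> Action {
        match (self.node, name) {
            (_, "end_call") | ("confirm", "confirm") => {
                self.node = "done";
                Action::End
            }
            ("greeting", "book_appointment") => {
                let day = day.unwrap_or("an open day").to_string();
                self.vars.insert("requested_day".into(), day.clone());
                self.node = "confirm";
                Action::Transition { to: "confirm", say: format!("Sure, booking you for {day}. Shall I confirm?") }
            }
            _ => Action::Stay, // tool not valid at this node: stay put
        }
    }

    fn is_finished(&self) -> bool {
        self.node == "done"
    }

    fn collected_vars(&self) -> &BTreeMap<String, String> {
        &self.vars
    }
}

fn main() {
    let mut brain = ReceptionistBrain::new();
    println!("[{}] {} tools={:?}", brain.current_node_id(), brain.system_prompt(), brain.tools());
    let action = brain.on_tool_call("book_appointment", Some("Tuesday"));
    println!("{action:?}; vars = {:?}", brain.collected_vars());
    brain.on_tool_call("confirm", None);
    println!("finished: {}", brain.is_finished());
}
```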

SessionSource — your control plane

How Flowcat resolves a call to a config and reports the result back. Always yours; Flowcat never sees your API contract.

#[async_trait]
pub trait SessionSource: Send + Sync {
    async fn resolve(&self, run_id: i64, token: &str) -> Result<ResolvedCall, FlowcatError>;
    async fn complete(&self, run_id: i64, token: &str, fin: Finalize) -> Result<(), FlowcatError>;
    async fn artifact_upload_url(/* run_id, token, kind */) -> Result<UploadTarget, FlowcatError>;
    async fn put_bytes(/* url, bytes, content_type */) -> Result<(), FlowcatError>;
    async fn node_tools(/* run_id, token, node_id */) -> Result<Vec<ToolDecl>, FlowcatError>;
    async fn tool_call(/* run_id, token, node_id, tool, args */) -> Result<String, FlowcatError>;
}

Exact signatures and the ResolvedCall / Finalize / UploadTarget shapes: flowcat-core/src/session.rs.

The entry point

One builder wires the four seams into the running pipeline and returns a task you drive to completion:

// Signature only: the body and the trait bounds on T, R, B, S are omitted here.
pub async fn build_s2s_task<T, R, B, S>(
    transport: T,        // MediaTransport:       e.g. SipTransport
    realtime: R,         // RealtimeLlm+Kickoff:  e.g. GeminiLive
    brain: B,            // AgentBrain:           e.g. RemoteBrain
    session: S,          // SessionSource:        yours
    run_id: i64,
    token: String,
    model: String,
) -> Result<S2sTask>;

A minimal SIP embedder

Illustrative — accept inbound SIP calls and drive each with Gemini Live + a remote brain. Constructor argument lists are abbreviated; take the exact ones from the linked source / API reference.

// RemoteBrain needs a multi-threaded runtime.
// Imports omitted: SipAgent, SipConfig, GeminiLive, and build_s2s_task come from
// flowcat-core; RemoteBrain from flowcat-services (feature "brain-http");
// MyControlPlane is your own SessionSource. Take exact paths from the API reference.
#[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 1. Bring up the native SIP user-agent and register with your trunk.
    let agent = SipAgent::start(SipConfig {
        server:    "sip:sip.example.com".into(),
        login:     std::env::var("SIP_LOGIN")?,
        password:  std::env::var("SIP_PASSWORD")?,
        caller_id: "+15551230000".into(),
        public_ip: None,                  // set on a NAT'd host
        sip_port:  None,                  // → 5060
        rtp_port_base: 16000,
        rtp_port_tries: 200,              // caps concurrent calls
    }).await?;
    agent.register().await?;

    // 2. One call per inbound INVITE.
    // Note: each `?` inside this loop returns from main, so one failed call
    // stops the whole server. A production embedder should log and continue.
    while let Some(invite) = agent.next_inbound().await {
        let transport = invite.answer().await?;      // → SipTransport (MediaTransport)

        let realtime = GeminiLive::new(/* api_key, voice/model from your config */);
        let brain = RemoteBrain::connect(
            "http://127.0.0.1:8080",
            serde_json::json!({ "graph": "receptionist" }),
            "gemini",
            None,
        ).await?;
        let session = MyControlPlane::new(/* … */);   // your SessionSource impl

        // run_id and token identify this call to your control plane; a real
        // embedder gets them from its routing rather than using constants.
        let task = build_s2s_task(
            transport, realtime, brain, session,
            /* run_id */ 1, /* token */ "…".into(),
            "models/gemini-live".into(),
        ).await?;

        // Drive each call on its own task so the loop can accept the next INVITE.
        tokio::spawn(async move { let _ = task.run().await; });
    }
    Ok(())
}

Cargo.toml (Flowcat is not yet published on crates.io, so point the deps at a git checkout as below, or at a local path):

[dependencies]
flowcat-core = { git = "https://github.com/AreevAI/flowcat" }
flowcat-services = { git = "https://github.com/AreevAI/flowcat", features = ["brain-http"] }
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
serde_json = "1"

Choosing S2S vs cascaded

Two pipeline shapes, same four seams:

  • Speech-to-speech (S2S) — one realtime model both listens and speaks (build_s2s_task + a RealtimeLlm such as Gemini Live): fewest moving parts and a single hop on the hot path.
  • Cascaded — separate STT → LLM → TTS stages (build_cascaded_task): mix and match a provider per stage and swap any one independently — more control, more to wire.

Rule of thumb: start with S2S; reach for cascaded when you need a specific STT/LLM/TTS combination a single realtime model can't give you.

What's verified vs. coming

  • Live-verified today: speech-to-speech via build_s2s_task with Gemini Live + Plivo / native SIP. Start there.
  • Cascaded STT → LLM → TTS has a parallel builder, build_cascaded_task, with the same seam shape; the realtime path is the one verified end-to-end.

Where to read the exact API

Next: Configuration to tune turn-taking and voice, then Deployment to ship it.

Configuration

Flowcat has no central config file and no settings framework (no figment, envy, or config crate, no flowcat.toml). That is deliberate: Flowcat is a library you embed, so configuration lives in two clearly separated places.

Layer | Who reads it | How it's set
Runtime knobs | the Flowcat runtime, at call time | FLOWCAT_* environment variables
Credentials & call settings | your embedder, passed into constructors | Rust code: SipConfig and each service's constructor

The important consequence: the runtime does not read provider API keys from the environment in production. Setting OPENAI_API_KEY does not make a running Flowcat pick it up — your embedder reads its own credentials however it likes and passes them to the provider constructor.

Contributing to Flowcat and running the live integration tests? Those use their own *_API_KEY environment variables — see Building and running tests.


1. Runtime environment variables

These are the variables the runtime itself reads. All are optional — each falls back to a built-in default. They tune the realtime (speech-to-speech) turn-taking and voice; they have no effect on the cascaded path.

Variable | Applies to | Values | Default
FLOWCAT_VOICE | Gemini Live, OpenAI Realtime | provider voice name | Fenrir (Gemini), alloy (OpenAI)
FLOWCAT_VAD_START_SENSITIVITY | Gemini Live | START_SENSITIVITY_UNSPECIFIED · _LOW · _HIGH | START_SENSITIVITY_LOW
FLOWCAT_VAD_END_SENSITIVITY | Gemini Live | END_SENSITIVITY_UNSPECIFIED · _LOW · _HIGH | END_SENSITIVITY_HIGH
FLOWCAT_VAD_PREFIX_PADDING_MS | Gemini Live | u32 milliseconds | 500
FLOWCAT_VAD_SILENCE_DURATION_MS | Gemini Live | u32 milliseconds | 350

Turn-taking, in plain terms. START_SENSITIVITY_LOW + a 500 ms PREFIX_PADDING means a brief caller sound — a backchannel ("uh-huh"), a cough, line noise — is not committed as a turn, so the agent stops cutting off its own speech. The end side stays eager (END_SENSITIVITY_HIGH + 350 ms trailing silence) so the agent still replies promptly once the caller actually finishes. Invalid values fall back to the default rather than erroring.

Defined in flowcat-core/src/realtime/gemini_live.rs (VadConfig::from_env) and flowcat-services/src/realtime/openai.rs.
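The fall-back-on-invalid behaviour can be sketched as a small parser. This is not VadConfig::from_env. VadKnobs, parse_choice, parse_ms, and from_lookup are hypothetical names, and the defaults and accepted values are copied from the table above. Injecting the lookup function lets tests exercise invalid values without mutating the process environment.

```rust
use std::env;

/// Hypothetical mirror of the Gemini Live turn-taking knobs in the table above.
#[derive(Debug, PartialEq)]
struct VadKnobs {
    start_sensitivity: String,
    end_sensitivity: String,
    prefix_padding_ms: u32,
    silence_duration_ms: u32,
}

/// Accept only the documented enum values; anything else falls back.
fn parse_choice(raw: Option<&str>, allowed: &[&str], default: &str) -> String {
    match raw {
        Some(v) if allowed.contains(&v) => v.to_string(),
        _ => default.to_string(),
    }
}

/// Parse a u32 millisecond value; missing or invalid input falls back.
fn parse_ms(raw: Option<&str>, default: u32) -> u32 {
    raw.and_then(|v| v.trim().parse().ok()).unwrap_or(default)
}

impl VadKnobs {
    /// Build from any key -> value lookup (the environment, a map in tests, ...).
    fn from_lookup(get: impl Fn(&str) -> Option<String>) -> Self {
        let start = ["START_SENSITIVITY_UNSPECIFIED", "START_SENSITIVITY_LOW", "START_SENSITIVITY_HIGH"];
        let end = ["END_SENSITIVITY_UNSPECIFIED", "END_SENSITIVITY_LOW", "END_SENSITIVITY_HIGH"];
        VadKnobs {
            start_sensitivity: parse_choice(get("FLOWCAT_VAD_START_SENSITIVITY").as_deref(), &start, "START_SENSITIVITY_LOW"),
            end_sensitivity: parse_choice(get("FLOWCAT_VAD_END_SENSITIVITY").as_deref(), &end, "END_SENSITIVITY_HIGH"),
            prefix_padding_ms: parse_ms(get("FLOWCAT_VAD_PREFIX_PADDING_MS").as_deref(), 500),
            silence_duration_ms: parse_ms(get("FLOWCAT_VAD_SILENCE_DURATION_MS").as_deref(), 350),
        }
    }

    fn from_env() -> Self {
        Self::from_lookup(|k| env::var(k).ok())
    }
}

fn main() {
    println!("{:?}", VadKnobs::from_env());
}
```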


2. Credentials & call settings (programmatic)

Everything else is configured in Rust, by your embedder, and passed into the relevant constructor. This is what keeps credentials on infrastructure you control — they never transit a Flowcat-owned config surface.

SIP / telephony — SipConfig

Passed to SipAgent::start(cfg). Telephony trunk credentials live here and reach nothing else.

Field | Type | Notes
server | String | Registrar / proxy URI, e.g. sip:sip.example.com
login | String | SIP auth username (trunk login)
password | String | SIP auth password
caller_id | String | E.164 / trunk number used as the From user on outbound calls
public_ip | Option<Ipv4Addr> | Address advertised in Via/Contact/SDP for NAT; None → bound local address
sip_port | Option<u16> | Local SIP signaling port; None → 5060
rtp_port_base | u16 | First (even) RTP port to probe; default 16000
rtp_port_tries | u16 | Number of even ports to probe from the base; default 200. Caps concurrent call media at this number.

Defined in flowcat-core/src/sip/agent.rs. See the Deployment guide for the firewall implications of the RTP range.

Provider credentials

Each STT / TTS / LLM / realtime service takes its key (and any voice / model settings) through its own constructor. Your embedder decides where those come from — its own env vars, a secrets manager, a vault. A common, simple choice is to read an env var named like the provider expects and pass it in:

// illustrative — your embedder owns this
let api_key = std::env::var("OPENAI_API_KEY")?;   // your choice, not the runtime's
let tts = OpenAiTts::new(&api_key, /* voice, model, … */);

The full set of provider constructors and their arguments is in flowcat-services; see the API reference for how to browse it as rustdoc.


Next: Deployment — build a release binary and ship it in your own VPC.

Flowcat feature-flag matrix

Every provider, transport, and exporter is a single Cargo feature, dep:-gated so the default build pulls none of their client dependencies. This page enumerates every feature across the five crates below; it is derived from the Cargo.tomls and is the exhaustive companion to the README's summary table and PROVIDERS.md.

(D) = distinct client (own wire protocol). (W) = thin wrapper over a (D) family client (base_url + auth + default-model change). See CONTRIBUTING.md.


flowcat-core (default = ["sip", "recorder"])

Feature | Default | Pulls | What it gates
sip | yes | | native SIP user-agent (REGISTER/INVITE/ACK/BYE)
recorder | yes | | call recorder-as-processor (WAV via hound)
vad-ort | no | ort, ndarray | Silero VAD + Smart-Turn ONNX impls
filter-rnnoise | no | nnnoiseless | pure-Rust RNNoise noise-suppression filter

The trait seams (Transport/Stt/Tts/Llm/RealtimeLlm/Vad/Turn/FrameSerializer/Brain/SessionSource), the Gemini Live client, codec/resample, and the SIP/RTP/SDP stack are always present; only the heavy ONNX/RNNoise bodies are gated.
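The "seams always present, bodies gated" layout can be sketched as follows. It is a hypothetical illustration, not flowcat-core's source. The trait compiles in every build, while the type behind #[cfg(feature = "vad-ort")] (and, in a real crate, its ort/ndarray imports) exists only when the feature is enabled. The Cargo.toml shape in the leading comment is an assumption about how a dep:-gated feature is typically declared.

```rust
// Hypothetical sketch. The matching Cargo.toml side would look something like:
//   [features]
//   vad-ort = ["dep:ort", "dep:ndarray"]
// with `ort` and `ndarray` declared as optional dependencies.

/// The seam: compiled into every build, with no heavy dependencies.
pub trait Vad {
    fn name(&self) -> &'static str;
}

/// The heavy implementation exists only when the feature is enabled; in a
/// default build neither this type nor its (would-be) ONNX imports compile.
#[cfg(feature = "vad-ort")]
pub struct SileroVad;

#[cfg(feature = "vad-ort")]
impl Vad for SileroVad {
    fn name(&self) -> &'static str {
        "silero"
    }
}

/// Every VAD implementation compiled into this binary.
pub fn enabled_vads() -> Vec<Box<dyn Vad>> {
    #[allow(unused_mut)]
    let mut vads: Vec<Box<dyn Vad>> = Vec::new();
    #[cfg(feature = "vad-ort")]
    vads.push(Box::new(SileroVad));
    vads
}

fn main() {
    let names: Vec<&str> = enabled_vads().iter().map(|v| v.name()).collect();
    // A build without the feature prints an empty list.
    println!("VADs compiled in: {names:?}");
}
```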

flowcat-services (default = [])

Nothing is on by default. Umbrellas: stt-all, tts-all, llm-all, realtime-all, obs-all.

Realtime / speech-to-speech (7 incl. core Gemini)

Feature | Tag | Pulls
realtime-openai | (D) | tokio-tungstenite, tokio, base64
realtime-azure | (W) over openai |
realtime-grok | (W) over openai |
realtime-inworld | (W) over openai |
realtime-ultravox | (D) | tokio-tungstenite, tokio
realtime-novasonic | (D) | tokio, base64

Gemini Live is the 7th realtime impl and lives in flowcat-core (re-exported as flowcat_core::GeminiLive); it has no flowcat-services feature.

STT (20)

Feature | Tag | Transport
stt-deepgram | (D) | WS (ref impl)
stt-assemblyai | (D) | WS
stt-gladia | (D) | WS
stt-soniox | (D) | WS
stt-speechmatics | (D) | WS
stt-cartesia | (D) | WS
stt-azure | (D) | WS
stt-gradium | (D) | WS
stt-elevenlabs | (D) | segmented HTTP
stt-sarvam | (D) | REST
stt-mistral | (D) | REST
stt-openai | (D) | Whisper-HTTP base
stt-groq / stt-fal / stt-speaches / stt-xai | (W) over openai | Whisper-HTTP
stt-google | (D) | gRPC (tonic)
stt-nvidia | (D) | gRPC / Riva (tonic)
stt-aws-transcribe | (D) | SigV4 WS (hmac/sha2)
stt-whisper-local | (D) | local (whisper-rs; C build, needs cmake)

TTS (29)

Feature | Tag | Transport
tts-cartesia | (D) | WS (ref impl)
tts-elevenlabs / tts-deepgram / tts-rime / tts-asyncai / tts-gradium / tts-soniox / tts-resemble | (D) | WS
tts-openai | (D) | HTTP base
tts-groq / tts-xai | (W) over openai | OpenAI-TTS-HTTP
tts-azure | (D) | SSML WS/HTTP
tts-sarvam / tts-mistral / tts-hume / tts-inworld / tts-minimax / tts-camb / tts-speechmatics | (D) | HTTP
tts-fish / tts-lmnt / tts-neuphonic / tts-smallest | (D) | interruptible HTTP
tts-kokoro / tts-piper / tts-xtts | (D) | local/model HTTP
tts-google / tts-nvidia | (D) | gRPC (tonic)
tts-aws-polly | (D) | SigV4 (hmac/sha2)

LLM (23)

Feature | Tag
llm-openai | (D): chat-completions SSE, ref impl
llm-openai-responses | (D): Responses API
llm-anthropic | (D): Messages API
llm-google | (D): Gemini generateContent
llm-aws-bedrock | (D): SigV4 event-stream (hmac/sha2)
llm-groq, llm-together, llm-fireworks, llm-openrouter, llm-perplexity, llm-deepseek, llm-cerebras, llm-sambanova, llm-nebius, llm-novita, llm-qwen, llm-grok, llm-nvidia-nim, llm-ollama, llm-sarvam, llm-mistral, llm-azure, llm-speaches | (W) over llm-openai (base_url + auth); 18 wrappers

Observability + MCP

Feature | Pulls
obs-otel | opentelemetry
obs-sentry | reqwest
obs-langfuse | reqwest
mcp | reqwest (MCP-as-processor client)

Brain adapters

Feature | Pulls | What it gates
brain-http | reqwest, tokio/rt-multi-thread | RemoteBrain: drives conversation policy from an HTTP service (e.g. a Python webhook). See examples/python-remote-brain.

flowcat-transports (default = [])

Feature | Pulls
webrtc-str0m | str0m, audiopus, tokio, tokio-util
ws | tokio-tungstenite, tokio, tokio-util
daily | reqwest
livekit | none (stub)
local | audiopus, tokio (local mic/speaker)

flowcat-telephony (default = ["plivo"])

Serializers are dependency-free flags (pure framing).

Feature | Default
plivo | yes
twilio, telnyx, exotel, vonage, genesys, asterisk, cloudonix, vobiz | no
dtmf-inband | no (in-band Goertzel DTMF; RFC 2833 is always available)

flowcat-agent (default = ["brain"])

A declarative, graph-based agent: define an agent as a node/edge graph_spec (JSON/YAML) instead of hand-writing an AgentBrain. DeclarativeBrain is a ready-to-use flowcat_core::AgentBrain over the graph engine.

Feature | Pulls | What it gates
brain | flowcat-core | the DeclarativeBrain AgentBrain adapter (default on)

With default-features = false the pure graph engine (parse / validate / {{var}} interpolation) builds with no flowcat-core dependency.


Toolchain caveats

  • gRPC (stt-google/stt-nvidia/tts-google/tts-nvidia) compiles .protos via tonic-build → needs protoc on PATH.
  • stt-whisper-local bundles whisper.cpp → needs cmake + a C/C++ toolchain. The Rust stub compiles; the dep's C build is the gate.

Everything else is rustls-only reqwest / tokio-tungstenite — no system OpenSSL, no AWS SDK (the AWS providers hand-roll SigV4 over hmac/sha2).

Deployment

This guide is for operators self-hosting Flowcat in their own VPC (or fully air-gapped). Flowcat is one self-contained binary with no hosted control plane and no phone-home — deployment is "ship a binary, open the right ports, give it your provider credentials."

What you actually deploy is your embedder — the small host binary that terminates calls and supplies the brain. The bundled flowcat CLI (pipeline, ws-echo) is for credential-free demos, not a production server. Everything below applies to the release binary you build from your embedder crate; the build mechanics are identical.


1. Build a release binary

Flowcat is a Cargo workspace. The default build pulls no provider or network dependencies — every STT/TTS/LLM, transport, carrier, and exporter is an opt-in feature, so you compile only what you ship.

# Demo binary, default features (native SIP + recorder, no providers):
cargo build --release -p flowcat-cli

# A cloud build — Gemini Live S2S over a WebSocket carrier, with telemetry:
cargo build --release -p flowcat-cli \
  --features "flowcat-services/realtime-all,flowcat-services/llm-all,flowcat-transports/ws,flowcat-services/obs-otel"

# Fully on-prem / air-gapped — local connectors only, no cloud egress:
cargo build --release -p flowcat-cli \
  --features "flowcat-services/stt-whisper-local,flowcat-services/tts-kokoro,flowcat-services/llm-ollama"

Feature groups (umbrellas in parentheses): stt-* (stt-all), tts-* (tts-all), llm-* (llm-all), realtime-* (realtime-all), obs-* (obs-all, covering obs-otel, obs-sentry, obs-langfuse); transports ws / webrtc-str0m; carriers plivo (default) / twilio / telnyx / …. The authoritative, always-current list is the [features] table in each crate's Cargo.toml (core · services · transports · telephony · agent).

Some features pull native build deps: stt-whisper-local needs cmake and a C/C++ toolchain, vad-ort pulls ONNX Runtime, and the gRPC features (stt-google, stt-nvidia, tts-google, tts-nvidia) need protoc on PATH. Install those in your build image.

Fully static binary (optional)

For a dependency-free artifact you can drop into a scratch container or an air-gapped host, target musl:

rustup target add x86_64-unknown-linux-musl
cargo build --release --target x86_64-unknown-linux-musl -p flowcat-cli

(Pure-cloud feature sets build cleanly on musl; feature sets that pull native C libraries — Whisper, ONNX — may need the matching musl system libraries.)


2. Containerize

There is no official image yet — the only Dockerfile in the repo is the benchmark harness under bench/. A production multi-stage build is small:

# ---- build ----
FROM rust:1-bookworm AS build
WORKDIR /src
COPY . .
# swap in your embedder package + the features you need
RUN cargo build --release -p flowcat-cli --features "flowcat-services/realtime-all"

# ---- runtime ----
FROM debian:bookworm-slim
RUN apt-get update && apt-get install -y --no-install-recommends ca-certificates \
    && rm -rf /var/lib/apt/lists/*
COPY --from=build /src/target/release/flowcat /usr/local/bin/flowcat
# UDP for SIP signaling + the RTP media range (see §Networking)
EXPOSE 5060/udp 16000-16398/udp
ENTRYPOINT ["flowcat"]

A musl static build collapses the runtime stage to FROM scratch plus the binary and ca-certificates.

3. Run under systemd

# /etc/systemd/system/flowcat.service
[Unit]
Description=Flowcat voice runtime
After=network-online.target
Wants=network-online.target

[Service]
ExecStart=/usr/local/bin/your-embedder
Restart=on-failure
RestartSec=2
# Runtime knobs (see Configuration). Credentials belong in an EnvironmentFile,
# which systemd loads into the environment your embedder reads, not inline in the unit:
Environment=FLOWCAT_VAD_SILENCE_DURATION_MS=350
EnvironmentFile=/etc/flowcat/secrets.env
# Hardening
DynamicUser=yes
NoNewPrivileges=yes
ProtectSystem=strict
ProtectHome=yes

[Install]
WantedBy=multi-user.target

4. Networking

The native SIP transport binds:

Purpose | Default | Configured by
SIP signaling | UDP 5060 | SipConfig.sip_port
RTP media | UDP from 16000, even ports, up to rtp_port_tries of them (default 200 → 16000–16398) | SipConfig.rtp_port_base / rtp_port_tries
Outbound to providers | TCP 443 (HTTPS/WSS) | per provider

Operational notes:

  • The RTP range caps concurrent calls. rtp_port_tries is the hard ceiling on simultaneous media streams — size it to your expected concurrency, and open exactly that even-port range in your firewall / security group.
  • Behind NAT (most cloud VMs), set SipConfig.public_ip so Flowcat advertises the reachable address in Via/Contact/SDP — otherwise media is offered on an unroutable internal IP.
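  • A WebSocket carrier (e.g. Plivo/Twilio media streams) needs no SIP/RTP ports: the carrier connects to a WebSocket endpoint your embedder serves (typically behind TLS on TCP 443), and provider traffic stays outbound on TCP 443. Your embedder chooses the transport.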
  • Flowcat exposes no HTTP control port of its own. Health checks, metrics endpoints, and admin APIs belong to your embedder.
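The port arithmetic behind the defaults is easy to check. This hypothetical helper (not a Flowcat API) assumes the rule in the table above: tries even ports starting at an even base, so the highest port is base + 2 × (tries − 1). With the defaults, 16000 + 2 × 199 = 16398, which matches the 16000-16398/udp range the Dockerfile exposes.

```rust
/// Lowest and highest even RTP port for a given base and try count.
/// This sketch rejects an odd base or zero tries, and any range that would
/// run past port 65535.
fn rtp_port_range(base: u16, tries: u16) -> Option<(u16, u16)> {
    if tries == 0 || base % 2 != 0 {
        return None;
    }
    let last = base as u32 + 2 * (tries as u32 - 1);
    if last > u16::MAX as u32 {
        return None;
    }
    Some((base, last as u16))
}

/// Render the range in the `lo-hi/udp` form used by Docker's EXPOSE.
fn expose_spec(base: u16, tries: u16) -> Option<String> {
    rtp_port_range(base, tries).map(|(lo, hi)| format!("{lo}-{hi}/udp"))
}

fn main() {
    // Defaults from SipConfig: base 16000, 200 tries.
    let spec = expose_spec(16000, 200).expect("valid range");
    println!("open UDP 5060 and {spec}"); // open UDP 5060 and 16000-16398/udp
}
```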

5. Scale & capacity

The media loop is Rust — no GC, no GIL — so one process uses every core. There is no worker-fleet to size: scale vertically first, then add processes.

From the published benchmark (Azure 16-vCPU VM, WebSocket + μ-law load, 50 fps/call):

  • flat p99 ≤ 0.61 ms from 10 to 2,000 concurrent calls in a single process;
  • ~19.6 KB RAM per idle session, 7 tokio tasks per call.
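The per-call figures above also support a quick back-of-envelope estimate. At 50 fps, each call carries one frame every 20 ms. The sketch multiplies out the published idle figures. Treat the result as a floor rather than a prediction, because active calls also hold provider connections and audio buffers. estimate and its constants are hypothetical names, and KB/MB are treated as decimal units.

```rust
// Published per-call figures from the benchmark above (idle sessions).
const IDLE_KB_PER_CALL: f64 = 19.6;
const TASKS_PER_CALL: u64 = 7;
const FRAMES_PER_SEC_PER_CALL: u64 = 50; // one 20 ms frame per call

#[derive(Debug)]
struct Estimate {
    idle_mb: f64,        // decimal MB of idle session state
    tasks: u64,          // tokio tasks across all calls
    frames_per_sec: u64, // frames through the process per second
}

fn estimate(calls: u64) -> Estimate {
    Estimate {
        idle_mb: calls as f64 * IDLE_KB_PER_CALL / 1000.0,
        tasks: calls * TASKS_PER_CALL,
        frames_per_sec: calls * FRAMES_PER_SEC_PER_CALL,
    }
}

fn main() {
    let e = estimate(2_000);
    // prints "39.2 MB idle, 14000 tasks, 100000 frames/s"
    println!("{:.1} MB idle, {} tasks, {} frames/s", e.idle_mb, e.tasks, e.frames_per_sec);
}
```

For the benchmark's top end of 2,000 calls, this gives about 39.2 MB of idle session state, 14,000 tasks, and 100,000 frames per second through one process.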

Practical sizing:

  • Bound concurrency with rtp_port_tries (SIP) and your carrier's limits.
  • One process per box is usually right; run several behind your SIP proxy / carrier load balancer only when you exceed a single host.
  • Reproduce the benchmark on your own hardware: docker compose -f bench/compose.yml up --build (see bench/README.md).

6. Observability

Build with an exporter feature and wire it in your embedder:

  • obs-otel — OpenTelemetry traces/metrics
  • obs-sentry — error reporting
  • obs-langfuse — LLM call tracing

All are zero-cost when the feature is off. Per-call metrics and transcripts flow through the pipeline's FrameObserver seam; finalized recordings/transcripts are uploaded via your SessionSource (your storage, your URLs).


7. Production checklist

  • Release binary built with only the features you use.
  • public_ip set if the host is behind NAT.
  • Firewall opens UDP 5060 + your RTP even-port range (or, for a WS carrier, the inbound WebSocket port plus outbound 443).
  • rtp_port_tries sized to target concurrency.
  • Provider credentials supplied via your embedder (secrets manager / EnvironmentFile), not baked into the image.
  • FLOWCAT_VAD_* / FLOWCAT_VOICE tuned for your use case (see Configuration).
  • An exporter feature enabled and pointed at your collector.
  • Restart policy + health checks owned by the embedder.

API reference

Flowcat is a set of Rust crates; the canonical, type-level API reference is rustdoc.

Not yet on crates.io / docs.rs. Flowcat is pre-1.0 and not yet published, so there is no docs.rs page yet. Until then, generate the docs locally — it takes seconds. (When published, the hosted link will live here.)

Generate the docs locally

git clone https://github.com/AreevAI/flowcat.git
cd flowcat

# Core seams (no provider features needed):
cargo doc --no-deps -p flowcat-core --open

# Include the provider/transport surface (enable the features you care about):
cargo doc --no-deps \
  -p flowcat-core -p flowcat-services -p flowcat-transports -p flowcat-telephony \
  --features "flowcat-services/realtime-all,flowcat-transports/ws"

Feature-gated items only appear in rustdoc when their feature is enabled — pass the same features you'd build with (see Deployment).

Crate map

Crate | What it exposes
flowcat-core | The seams and runtime: AgentBrain, SessionSource, MediaTransport, RealtimeLlm / RealtimeKickoff, the FrameProcessor graph + Frame taxonomy, build_s2s_task / build_cascaded_task, native SipAgent / SipTransport, and the GeminiLive backend
flowcat-services | STT / TTS / LLM / realtime provider adapters (feature-gated), observability exporters, the RemoteBrain HTTP adapter (brain-http)
flowcat-transports | WebRTC (webrtc-str0m), WebSocket (ws), and other media transports
flowcat-telephony | Carrier serializers (Plivo, Twilio, Telnyx, …) and DTMF
flowcat-cli | The credential-free flowcat demo binary (pipeline, ws-echo)

Starting points

Contributing to Flowcat

Flowcat is Apache-2.0. By contributing you agree your contribution is licensed under those terms. Every source file carries the SPDX header // SPDX-License-Identifier: Apache-2.0 as its first line — keep that on any new file.

By participating you also agree to abide by our CODE_OF_CONDUCT.md. Security issues should be reported privately — see SECURITY.md.

This guide covers the two things a contributor most often does: add a provider and write a processor. Read PROCESSOR-DESIGN.md (the frozen pipeline API) and DESIGN.md (the runtime + trait seams) first.


Building and running tests

Prerequisites: a stable Rust toolchain (rustup); Python 3 (standard library only) for the examples/. The default build needs nothing else.

git clone https://github.com/AreevAI/flowcat.git && cd flowcat
cargo build -p flowcat-cli     # default features — no provider/network deps
cargo test                     # the full OFFLINE suite: no network, no credentials

cargo test is the green bar below — pure encode/decode fixtures, hermetic.

Live integration tests exercise a real provider over the network. They live in #[cfg(test)] blocks (most #[ignore]d, so they're skipped by default) and read their credentials from the environment. These are test credentials, not deployment configuration — in production the runtime never reads provider keys from the environment; an embedder passes them to each service constructor.

Each provider follows a PROVIDER_API_KEY (+ optional PROVIDER_VOICE_ID / PROVIDER_MODEL) convention; representative variables:

| Variable | Used by |
|---|---|
| OPENAI_API_KEY | OpenAI STT / TTS / LLM / Realtime |
| ANTHROPIC_API_KEY | Anthropic LLM |
| GEMINI_API_KEY | Gemini Live realtime |
| GEMINI_LIVE_MODEL | override the Gemini Live model id in tests |
| DEEPGRAM_API_KEY | Deepgram STT / TTS |
| CARTESIA_API_KEY, CARTESIA_VOICE_ID | Cartesia STT / TTS |
| AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION | Bedrock / Transcribe / Polly / Nova Sonic |
| XTTS_BASE_URL, KOKORO_BASE_URL, PIPER_BASE_URL, WHISPER_MODEL_PATH | local connectors |

The authoritative list is the #[cfg(test)] blocks in each provider module under flowcat-services/src. Run one live test by name with its credentials set:

GEMINI_API_KEY=… cargo test -p flowcat-core -- --ignored gemini_live
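The PROVIDER_API_KEY convention is easy to centralize in a test helper. A minimal sketch, assuming a hypothetical LiveCreds type (not part of Flowcat's API): it reads the key plus the optional voice id and model through a lookup function, so the parsing itself stays testable offline, and from_env is the only call that touches the real environment.

```rust
// SPDX-License-Identifier: Apache-2.0
// Hypothetical test helper for the PROVIDER_API_KEY
// (+ optional PROVIDER_VOICE_ID / PROVIDER_MODEL) convention.

#[derive(Debug, PartialEq)]
pub struct LiveCreds {
    pub api_key: String,
    pub voice_id: Option<String>,
    pub model: Option<String>,
}

impl LiveCreds {
    /// Build from any lookup function, so the logic is testable without
    /// touching the real process environment. Returns None if the key is missing.
    pub fn from_lookup(prefix: &str, lookup: impl Fn(&str) -> Option<String>) -> Option<Self> {
        // An empty value is treated the same as an unset one.
        let get = |suffix: &str| {
            lookup(format!("{prefix}_{suffix}").as_str()).filter(|v| !v.is_empty())
        };
        Some(LiveCreds {
            api_key: get("API_KEY")?,
            voice_id: get("VOICE_ID"),
            model: get("MODEL"),
        })
    }

    /// Read from the real environment (only ever called from #[ignore]d live tests).
    pub fn from_env(prefix: &str) -> Option<Self> {
        Self::from_lookup(prefix, |name| std::env::var(name).ok())
    }
}
```

An #[ignore]d live test would call LiveCreds::from_env("CARTESIA") and return early when it yields None.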

The bar (every PR)

  • cargo build (default features) compiles and pulls no new networked dependency into the default build.
  • cargo test is green — offline. Tests must not hit the network; provider tests are pure encode/decode fixtures (see below).
  • cargo clippy --all-targets is clean. The only sanctioned escape is #[allow(dead_code)] on a held-but-not-yet-wired config field of a stub.
  • New crypto / auth / signing paths (e.g. a new SigV4 or signature provider) get an independent known-answer test and an extra reviewer pass.

Adding a provider (STT / TTS / LLM / realtime)

Providers live in flowcat-services (one per file), each behind its own Cargo feature. The catalogue is organised by protocol family, and a family's real client is implemented once (PROVIDERS.md §0).

Triage your provider first:

  • (D) Distinct client — its own wire protocol (own WS/HTTP framing, own auth, own message schema). Write a real client: the network transport + a pure encode/decode seam + unit tests. Templates: Deepgram (STT-WS), Cartesia (TTS-WS), OpenAI (LLM/Whisper-HTTP).
  • (W) Thin wrapper — protocol-compatible with an existing (D) family member (OpenAI-compatible, Whisper-HTTP, OpenAI-Realtime, …), differing only in base_url + auth header + default model. Write a ~30-line struct that constructs the family's (D) client with that config and delegates the trait. Template: any of the llm-* OpenAI wrappers.
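The (W) pattern reduces to delegation. A sketch with simplified, synchronous stand-ins for the real async trait and client (Llm, OpenAiCompatClient, and AcmeLlm are hypothetical, as are the Acme URL and model id): the (D) client owns the protocol, and the wrapper only supplies base_url, the auth header, and a default model.

```rust
// SPDX-License-Identifier: Apache-2.0
// Simplified, synchronous stand-ins (no network). Names, URL, and model id
// are hypothetical illustrations of the (D)/(W) split.

/// Stand-in for the category trait seam defined in flowcat-core.
pub trait Llm {
    fn chat_url(&self) -> String;
    fn auth_header(&self) -> (String, String);
    fn model(&self) -> &str;
}

/// The family's (D) client: the wire protocol is implemented once, here.
pub struct OpenAiCompatClient {
    base_url: String,
    auth: (String, String),
    model: String,
}

impl OpenAiCompatClient {
    pub fn new(base_url: &str, auth: (String, String), model: &str) -> Self {
        OpenAiCompatClient { base_url: base_url.to_string(), auth, model: model.to_string() }
    }
}

impl Llm for OpenAiCompatClient {
    fn chat_url(&self) -> String {
        format!("{}/chat/completions", self.base_url.trim_end_matches('/'))
    }
    fn auth_header(&self) -> (String, String) {
        self.auth.clone()
    }
    fn model(&self) -> &str {
        &self.model
    }
}

/// The (W) wrapper: only base_url, auth header, and default model differ.
pub struct AcmeLlm {
    inner: OpenAiCompatClient,
}

impl AcmeLlm {
    const BASE_URL: &'static str = "https://api.acme.example/v1";
    const DEFAULT_MODEL: &'static str = "acme-chat-1";

    pub fn new(api_key: &str) -> Self {
        let auth = ("Authorization".to_string(), format!("Bearer {api_key}"));
        AcmeLlm { inner: OpenAiCompatClient::new(Self::BASE_URL, auth, Self::DEFAULT_MODEL) }
    }
}

impl Llm for AcmeLlm {
    // Pure delegation: no protocol code lives in the wrapper.
    fn chat_url(&self) -> String { self.inner.chat_url() }
    fn auth_header(&self) -> (String, String) { self.inner.auth_header() }
    fn model(&self) -> &str { self.inner.model() }
}
```

Keeping the wrapper free of protocol code is what lets a fix in the (D) client reach every family member at once.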

The mechanics:

  1. Implement the trait seam for your category (Stt / Tts / Llm / RealtimeLlm, defined in flowcat-core) in flowcat-services/src/<cat>/<name>.rs.
  2. Add a dep:-gated Cargo feature in flowcat-services/Cargo.toml. A (W) just enables its (D) family feature (e.g. llm-groq = ["llm-openai"]). A (D) pulls only its client dep (reqwest, tokio-tungstenite, tonic, …). No new dep may land in the default build (default = [] in flowcat-services).
  3. Register the mod + pub use in the category's mod.rs, and add your feature to the relevant *-all umbrella so the CLI/CI fat build covers it.
  4. Write the fixture test. This is the coverage bar: a pure function that builds your provider's outbound request frame(s) and parses a recorded inbound response, asserting on the exact bytes/JSON — no live socket. For a SigV4 provider (AWS Bedrock/Transcribe/Polly), pin a known-answer signing test against the AWS-published vectors. Live verification needs the vendor's credentials and is out of scope for CI.
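A fixture test in this style needs no socket at all. This sketch assumes a hypothetical "Acme" TTS provider whose outbound frame is a JSON synthesize message and whose inbound audio is raw little-endian PCM16; the wire shapes are invented for illustration, but the structure (pure encode, pure decode, exact-byte assertions against a recorded frame) is the one step 4 asks for.

```rust
// SPDX-License-Identifier: Apache-2.0
// Pure encode/decode seam for a hypothetical "Acme" TTS provider: no I/O.

/// Minimal JSON string escaping (quotes, backslashes, control characters).
fn json_escape(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out
}

/// Build the outbound "synthesize" frame exactly as it would go on the wire.
pub fn encode_synthesize(text: &str, voice_id: &str, sample_rate: u32) -> String {
    format!(
        r#"{{"type":"synthesize","text":"{}","voice_id":"{}","sample_rate":{}}}"#,
        json_escape(text),
        json_escape(voice_id),
        sample_rate
    )
}

/// Parse an inbound binary audio frame: PCM16 little-endian mono.
pub fn decode_audio(frame: &[u8]) -> Result<Vec<i16>, String> {
    if frame.len() % 2 != 0 {
        return Err(format!("acme: odd audio frame length {}", frame.len()));
    }
    Ok(frame.chunks_exact(2).map(|b| i16::from_le_bytes([b[0], b[1]])).collect())
}
```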

A "not yet wired" stub returns FlowcatError::Other("<provider>: not yet wired") and still compiles and passes. That is the floor; a real PR replaces it with the client and its fixtures.
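The stub floor looks roughly like this. A sketch with a trimmed-down, hypothetical FlowcatError and Tts trait; only the Other variant and the "<provider>: not yet wired" message format come from the text. The held API key shows the one sanctioned #[allow(dead_code)].

```rust
// SPDX-License-Identifier: Apache-2.0
// Hypothetical, trimmed-down error type and trait; the real Tts seam is async.

#[derive(Debug, PartialEq)]
pub enum FlowcatError {
    Other(String),
}

pub trait Tts {
    fn synthesize(&mut self, text: &str) -> Result<Vec<i16>, FlowcatError>;
}

/// Stub provider: compiles, passes clippy, and fails loudly at runtime.
pub struct AcmeTts {
    #[allow(dead_code)] // held-but-not-yet-wired config field
    api_key: String,
}

impl Tts for AcmeTts {
    fn synthesize(&mut self, _text: &str) -> Result<Vec<i16>, FlowcatError> {
        Err(FlowcatError::Other("acme: not yet wired".to_string()))
    }
}
```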


Writing a processor (the contract every author must know)

Each processor is a FrameProcessor (flowcat-core/src/processor/). The framework owns the per-processor tokio task, the bounded/priority channels, and the lifecycle; you write process_frame and optionally start/stop.

The one contract that surprises people — lifecycle/system frames bypass process_frame (PROCESSOR-DESIGN.md §2.1–§2.3):

  • Start → the framework calls your async fn start(&mut self, setup, params) (open sockets, spawn provider reader tasks here), then forwards the frame. It does not reach process_frame.
  • End / Stop / Cancel → the framework calls your async fn stop(&mut self, reason) (flush + close), then forwards. Also not via process_frame.
  • Interruption and other System frames ride an unbounded priority channel and are drained ahead of data/control by a biased select; the task loop handles interruption (draining interruptible queued frames, keeping uninterruptible ones, cancelling an in-flight interruptible process_frame).
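The routing rules above can be modelled synchronously. A sketch under simplifying assumptions: a cut-down Frame enum, a hypothetical Route result, and two VecDeques standing in for the priority and data channels (the real task is async and uses a biased tokio select). Cancelling an in-flight process_frame is not modelled.

```rust
// SPDX-License-Identifier: Apache-2.0
// Simplified model of the framework's per-processor dispatch.
use std::collections::VecDeque;

#[derive(Debug, Clone, PartialEq)]
pub enum Frame {
    Start,        // System: reaches start(), never process_frame
    End,          // Control, uninterruptible: reaches stop()
    Stop,         // Control, uninterruptible: reaches stop()
    Cancel,       // System: reaches stop()
    Interruption, // System: priority channel
    Audio(u32),   // Data (payload is just a sequence number here)
    Text(String), // Data
}

#[derive(Debug, PartialEq)]
pub enum Route {
    CallStart,    // framework calls start(), then forwards
    CallStop,     // framework calls stop(), then forwards
    Priority,     // handled by the task loop itself
    ProcessFrame, // the only path into process_frame
}

pub fn route(frame: &Frame) -> Route {
    match frame {
        Frame::Start => Route::CallStart,
        Frame::End | Frame::Stop | Frame::Cancel => Route::CallStop,
        Frame::Interruption => Route::Priority,
        Frame::Audio(_) | Frame::Text(_) => Route::ProcessFrame,
    }
}

impl Frame {
    /// Data frames are dropped on interruption; lifecycle frames survive.
    pub fn uninterruptible(&self) -> bool {
        !matches!(self, Frame::Audio(_) | Frame::Text(_))
    }
}

/// On Interruption: drop queued interruptible frames, keep the rest in order.
pub fn on_interruption(queue: &mut VecDeque<Frame>) {
    queue.retain(|f| f.uninterruptible());
}

/// Next frame to handle: the System queue always wins, like a biased select.
pub fn next_frame(system: &mut VecDeque<Frame>, data: &mut VecDeque<Frame>) -> Option<Frame> {
    system.pop_front().or_else(|| data.pop_front())
}
```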

So process_frame only ever sees Data/Control frames. Do not put socket open/close in process_frame — it will never run for the lifecycle frames that should trigger it. Other rules:

  • process_frame must not block. Long work (a provider round-trip) is driven by an internally-spawned task that feeds results back as frames — the Gemini reader-task pattern (flowcat-core/src/realtime/gemini_live.rs).
  • Push results via the Link (push / push_down / push_error / broadcast), never by calling another processor directly.
  • The hot audio frame is Arc<AudioFrame> — clone the Arc, don't copy PCM.
  • An Err returned from process_frame becomes a non-fatal Error frame pushed upstream, so return Err for recoverable faults. For a terminal fault, push an Error frame with fatal set instead, which cancels the task.
  • A pure observer/no-op processor is the default process_frame (forward unchanged) — don't override what you don't need.
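These rules fit in a few lines. A sketch with hypothetical synchronous stand-ins for FrameProcessor and Link (the real ones are async and live in flowcat-core/src/processor/): an observer that counts samples and forwards the same Arc, next to a no-op processor that simply inherits the default pass-through.

```rust
// SPDX-License-Identifier: Apache-2.0
// Simplified stand-ins; the real FrameProcessor/Link are async.
use std::sync::Arc;

#[derive(Debug)]
pub struct AudioFrame {
    pub pcm: Vec<i16>,
    pub sample_rate: u32,
}

#[derive(Debug, Clone)]
pub enum Frame {
    InputAudio(Arc<AudioFrame>),
    Text(String),
}

/// Stand-in for the Link: collects what a processor pushes downstream.
#[derive(Default)]
pub struct Link {
    pub pushed: Vec<Frame>,
}

impl Link {
    pub fn push(&mut self, frame: Frame) {
        self.pushed.push(frame);
    }
}

pub trait FrameProcessor {
    /// Default: a pure pass-through (forward unchanged).
    fn process_frame(&mut self, frame: Frame, link: &mut Link) -> Result<(), String> {
        link.push(frame);
        Ok(())
    }
}

/// Observer: counts audio samples, then forwards the frame it was given.
#[derive(Default)]
pub struct SampleCounter {
    pub samples: usize,
}

impl FrameProcessor for SampleCounter {
    fn process_frame(&mut self, frame: Frame, link: &mut Link) -> Result<(), String> {
        if let Frame::InputAudio(audio) = &frame {
            self.samples += audio.pcm.len();
        }
        link.push(frame); // moves the enum; the PCM behind the Arc is never copied
        Ok(())
    }
}

/// A no-op processor just takes the default.
pub struct Passthrough;
impl FrameProcessor for Passthrough {}
```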

Flowcat stays contract-agnostic: keep any embedder/control-plane knowledge out of flowcat-core. Conversation decisions and call bootstrap/finalize are the AgentBrain / SessionSource trait seams an embedder implements; the runtime treats brain_config as an opaque value it never interprets (DESIGN.md).


Feature-flag discipline (no default-build cost)

  • flowcat-core default = ["sip", "recorder"] — no HTTP/gRPC/ONNX. The only optional core deps are ort (vad-ort) and nnnoiseless (filter-rnnoise).
  • flowcat-services / flowcat-transports default = []. Every provider and transport is dep:-gated; the default build links none of their clients.
  • flowcat-telephony default = ["plivo"] (serializers are deps-free flags).
  • Adding a provider must not move any dependency out of optional/dep: gating. If a reviewer sees cargo tree on the default build grow, the PR is wrong.
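Feature gating at the source level can be sketched as a category mod.rs. The feature name stt-acme and module acme are hypothetical; in a default = [] build the gated module and its registration compile out entirely, so its client dependency is never linked.

```rust
// SPDX-License-Identifier: Apache-2.0
// Sketch of a category mod.rs with a dep:-gated provider (names hypothetical).

#[cfg(feature = "stt-acme")]
mod acme {
    pub struct AcmeStt;
}
#[cfg(feature = "stt-acme")]
pub use acme::AcmeStt;

/// Names of the STT providers compiled into this build.
pub fn compiled_stt_providers() -> Vec<&'static str> {
    #[allow(unused_mut)] // stays empty in the default build
    let mut names = Vec::new();
    #[cfg(feature = "stt-acme")]
    names.push("acme");
    names
}
```

A quick cargo tree comparison on the default build is the reviewer-side check for the same property.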

The exhaustive matrix is FEATURES.md — keep it in sync when you add a feature.


Review

Substantive PRs get a code review; anything touching auth, signing, or signature verification also gets a security review. Be honest in the PR about what is fixture-tested vs live-verified — overclaiming a live-working provider is the thing reviewers push back on hardest.

Flowcat — design

Flowcat is a native-Rust real-time voice-agent runtime built to run on infrastructure you own: it carries a phone/WebRTC call's audio, runs a speech-to-speech model, and lets a pluggable "brain" drive the conversation — all as one self-contained binary you deploy in your own VPC (or air-gapped), with no hosted control plane and no Python or FreeSWITCH sidecar. Because the media loop is one tokio process (no GIL, no FFI), that single binary also holds serious call volume per box. It is an isolated, Apache-2.0 cargo workspace designed to be embedded by a host application.

Motivation + benchmark: bench/RESULTS.md — one Flowcat process holds a flat p99 from 10 to 2,000 concurrent calls where a Python deployment grows a multi-second tail and needs a worker fleet. Read this as runtime capacity and reliability headroom, not conversational latency (which your STT/LLM/TTS providers dominate). Native SIP is the current approach — softswitch optional, superseding the earlier FreeSWITCH gateway design. Native SIP details: SIP-DESIGN.md.

Scope of this document. This is the architecture-and-trait-seam spec, written at the first milestone (Plivo WS-media + native SIP, with Gemini Live as the first speech-to-speech brain). The seams below are unchanged, but the runtime has broadened since:

  • Two pipeline shapes, not just S2S — a single speech-to-speech model (RealtimeLlm, e.g. Gemini Live) or a cascaded STT → LLM → TTS pipeline. Both are built from flowcat-core::pipeline (build_s2s_task / build_cascaded_task).
  • ~80 STT/TTS/LLM + realtime providers, 5 transports, and 9 telephony serializers, now split into sibling crates (flowcat-services, flowcat-transports, flowcat-telephony), each behind one Cargo feature — see the README crate map + connector table and FEATURES.md.
  • Fully local / air-gapped by swapping in the local connectors (Whisper STT; Kokoro / Piper / XTTS TTS; Ollama LLM) — no call audio leaves your infrastructure.
  • Python without Rust — the RemoteBrain HTTP adapter drives the AgentBrain seam from a Python service (see QUICKSTART.md and examples/).

Treat the README, FEATURES.md, and PROCESSOR-DESIGN.md as authoritative for the current surface; this doc remains the reference for the trait seams and call lifecycle, which are stable. Concrete provider/crate names below (e.g. "the Gemini Live client") are the milestone's first implementation, not the limit of what ships.

Goal of the first milestone

A telephony call works end to end through Flowcat for two carrier styles:

  • WebSocket-media carriers (e.g. Plivo) — audio already arrives over a WS. Pure-Rust path, live-testable without extra infra. This is the easy path and the integration baseline.
  • SIP/RTP-only carriers — no provider WS-media. Flowcat speaks SIP/RTP natively (no softswitch): a SipAgent in flowcat-core REGISTERs the carrier's trunk and terminates INVITE/RTP in-process, and a SipTransport presents the call to the pipeline through the same MediaTransport seam the WS path uses. This is the native-SIP decision — one single Rust binary, no FreeSWITCH/mod_audio_stream gateway. (The earlier FreeSWITCH gateway approach is superseded — see SIP-DESIGN.md.)

The first milestone's brain is native Rust Gemini Live (the speech-to-speech path live-verified end to end); a cascaded STT → LLM → TTS pipeline is the other supported shape. Either way the conversation logic lives behind the AgentBrain trait — the embedder's own engine linked as an rlib (no PyO3 FFI), or the ready-made RemoteBrain HTTP adapter driving it from a Python service.

Two-plane fit + where Flowcat sits

 PSTN ─SIP/RTP ───────────────────────> SipAgent (in flowcat-core, runs in the embedder) ─┐
 PSTN ─Plivo <Stream> WS ─> host WS ─> WsCarrierTransport ────────────────────────────────┤
                                                                                          ▼
                          ┌──────────── the embedder (binary, host workspace) ──────────┐
                          │  HTTP: /telephony/ws/{provider}/{run} · answer-XML · health  │
                          │  runs flowcat SipAgent (SIP trunk REGISTER) + control plane  │
                          │  adapts its inbound WS → MediaSocket → WsCarrierTransport     │
                          │  impl AgentBrain  (→ the host's engine rlib, NO PyO3)        │
                          │  impl SessionSource (→ the host's control-plane API)         │
                          └───────────────────────────┬─────────────────────────────────┘
                                                       │ uses
                          ┌──────────── flowcat-core (lib, OSS workspace) ──────────────┐
                          │  MediaTransport seam: SipTransport (SIP/RTP) · WsCarrier+    │
                          │    MediaSerializer(plivo) · RealtimeLlm(GeminiLive)          │
                          │  Call pipeline · codec · recorder · sip/ (SipAgent, RTP/SDP) │
                          │  traits: AgentBrain·SessionSource·MediaTransport·RealtimeLlm │
                          └──────────────────────────────────────────────────────────────┘
                          shared database / object store (via the embedder's control plane only)

flowcat-core knows nothing about the embedder, web routing, SQL, or the wire contract. The embedder-specific glue (engine adapter, control-plane client, auth/routing) lives in the consumer crate.

Crate layout

The tree below is the milestone's core-centric view (providers, transports, and serializers shown inside flowcat-core). They have since been split into the sibling crates flowcat-services / flowcat-transports / flowcat-telephony; the README crate map is the current authority. The flowcat-core module seams shown here are still accurate.

flowcat/                         # ← own cargo workspace (Apache-2.0)
  Cargo.toml                     # [workspace] members = ["flowcat-core", "flowcat-cli", ...]
  DESIGN.md  LICENSE
  flowcat-core/                  # the runtime library
    src/
      lib.rs
      frame.rs                   # AudioFrame, ControlEvent, etc.
      error.rs                   # FlowcatError
      codec.rs                   # g711 ↔ pcm16, resample (rubato)
      audio.rs                   # AudioRecorder (mono mix → WAV bytes)
      transport/{mod.rs, media.rs, carrier.rs, socket.rs, ws_media.rs}  # MediaTransport seam + WsCarrierTransport
      serializer/{mod.rs, plivo.rs}
      sip/{mod.rs, agent.rs, transport.rs, rtp.rs, sdp.rs}  # native SIP UA + SipTransport (rsipstack + RTP/SDP/jitter)
      realtime/{mod.rs, gemini_live.rs}
      brain.rs                   # trait AgentBrain + ToolDecl + BrainAction
      session.rs                 # trait SessionSource + ResolvedCall + Usage
      pipeline.rs                # Call::run(...) — the orchestration loop
      transcript.rs              # transcript collector
  flowcat-cli/                   # example: local-mic / ws demo (DX), embedder-agnostic
  bench/  bench-rs/              # (existing benchmark kit)

# the embedder (lives in the host's own workspace):
#   glue binary that runs the SipAgent + the control-plane originate endpoint,
#   a telephony provider, and the carrier routes / inbound-resolve / originate.

The embedder's Cargo.toml depends on flowcat-core (as a path, git, or crates.io dependency) and links its own engine. Cross-workspace deps are fine: flowcat-core builds in each consumer's graph, and standalone via its own lockfile.

Trait contracts (the seams everything plugs into)

All async traits use async_trait. Audio is 16-bit little-endian mono PCM internally; sample rate is explicit on every buffer.

// Imports these signatures assume. WsIn, SerIn, WsOut, RealtimeSetup, RealtimeEvent,
// UploadTarget, and FlowcatError are defined elsewhere in flowcat-core (not shown here).
use async_trait::async_trait;

// frame.rs
pub struct AudioChunk { pub pcm: Vec<i16>, pub sample_rate: u32 } // mono

// transport/media.rs — THE pipeline seam. The pipeline never cares whether audio
// arrived as carrier WS frames or as RTP. SipTransport and WsCarrierTransport both impl it.
#[async_trait] pub trait MediaTransport: Send {
    async fn recv(&mut self) -> Option<MediaIn>;       // StreamStart{call_id} | Audio(@carrier_rate) | Stop
    async fn send_audio(&mut self, chunk: AudioChunk) -> Result<(), FlowcatError>; // bot audio out
    async fn send_clear(&mut self) -> Result<(), FlowcatError>;                    // barge-in flush (no-op for RTP)
    fn carrier_rate(&self) -> u32;                                                 // 8000 for telephony G.711
}
pub enum MediaIn { StreamStart { call_id: String }, Audio(AudioChunk), Stop }

// transport/socket.rs — WS building block; the host provides the raw socket. Used (with a
// serializer) by `WsCarrierTransport: MediaTransport` for the Plivo path. Native SIP bypasses this.
#[async_trait] pub trait MediaSocket: Send {
    async fn recv(&mut self) -> Option<WsIn>;        // Text(String) | Binary(Vec<u8>) | Close
    async fn send_text(&mut self, s: String) -> Result<(), FlowcatError>;
    async fn send_binary(&mut self, b: Vec<u8>) -> Result<(), FlowcatError>;
}

// serializer/mod.rs — per-carrier WS framing for WsCarrierTransport. Pure (no I/O). plivo only.
pub trait MediaSerializer: Send {
    fn on_message(&mut self, msg: &WsIn) -> SerIn;   // StreamStart{call_id,..} | Audio(AudioChunk) | Stop | Ignore
    fn encode_audio(&self, chunk: &AudioChunk) -> WsOut;   // text/binary to send back
    fn encode_clear(&self) -> Option<WsOut>;               // barge-in / interruption
    fn carrier_rate(&self) -> u32;                          // 8000 for telephony μ-law
}

// realtime/mod.rs — the speech-to-speech model abstraction (GeminiLive first).
#[async_trait] pub trait RealtimeLlm: Send {
    async fn connect(&mut self, setup: RealtimeSetup) -> Result<(), FlowcatError>; // system prompt+tools
    async fn send_audio(&mut self, chunk: AudioChunk) -> Result<(), FlowcatError>; // 16k PCM in
    async fn update_system(&mut self, prompt: String, tools: Vec<ToolDecl>) -> Result<(), FlowcatError>;
    async fn send_tool_result(&mut self, id: String, result: serde_json::Value) -> Result<(), FlowcatError>;
    async fn next_event(&mut self) -> Option<RealtimeEvent>; // AudioOut(24k) | UserText | BotText | ToolCall | Interrupted | Usage | Closed
}

// brain.rs — the conversation decision-maker. The embedder impls this over its own engine.
pub trait AgentBrain: Send {
    fn system_prompt(&self) -> String;
    fn tools(&self) -> Vec<ToolDecl>;           // transitions + endCall (+ later: node tools)
    fn on_tool_call(&mut self, name: &str, args: &serde_json::Value) -> BrainAction;
    fn is_finished(&self) -> bool;
    fn collected_vars(&self) -> serde_json::Value;
}
pub enum BrainAction { Transition { system_prompt: String, tools: Vec<ToolDecl>, say: Option<String> }, Stay, End { disposition: Option<String> } }
pub struct ToolDecl { pub name: String, pub description: String, pub params: serde_json::Value } // JSON-schema params

// session.rs — call bootstrap + finalize. The embedder impls this over its control-plane HTTP.
#[async_trait] pub trait SessionSource: Send + Sync {
    async fn resolve(&self, run_id: i64, token: &str) -> Result<ResolvedCall, FlowcatError>;
    async fn complete(&self, run_id: i64, token: &str, fin: Finalize) -> Result<(), FlowcatError>;
    async fn artifact_upload_url(&self, run_id: i64, token: &str, kind: &str) -> Result<UploadTarget, FlowcatError>;
    async fn put_bytes(&self, url: &str, bytes: Vec<u8>, content_type: &str) -> Result<(), FlowcatError>;
}
pub struct ResolvedCall { pub provider: String, pub brain_config: serde_json::Value, /* graph_spec+runtime+seed */ pub is_completed: bool }
pub struct Finalize { pub usage: serde_json::Value, pub collected_vars: serde_json::Value, pub recording_url: Option<String>, pub transcript_url: Option<String> }

brain_config is opaque to flowcat-core (it's the embedder's graph/spec + runtime options + seed vars); the embedder builds its brain from it. Flowcat never sees the contract.

The embedder assembles these seams into a runnable call with one of the two builders in flowcat-core::pipeline: build_s2s_task (a single RealtimeLlm such as Gemini Live) or build_cascaded_task (a MediaTransport + STT + LLM + TTS chain). Both accept any AgentBrain and drive the same lifecycle below.
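For orientation, a toy brain against a simplified copy of the trait. Assumptions: tool arguments are a &str rather than serde_json::Value, ToolDecl drops its JSON-schema params, collected_vars is omitted, and the node names and prompts are hypothetical. A real embedder would build this from brain_config.

```rust
// SPDX-License-Identifier: Apache-2.0
// Toy two-node AgentBrain against a simplified, std-only copy of the trait.

#[derive(Debug, Clone, PartialEq)]
pub struct ToolDecl {
    pub name: String,
    pub description: String,
}

#[derive(Debug, PartialEq)]
pub enum BrainAction {
    Transition { system_prompt: String, tools: Vec<ToolDecl>, say: Option<String> },
    Stay,
    End { disposition: Option<String> },
}

pub trait AgentBrain {
    fn system_prompt(&self) -> String;
    fn tools(&self) -> Vec<ToolDecl>;
    fn on_tool_call(&mut self, name: &str, args: &str) -> BrainAction;
    fn is_finished(&self) -> bool;
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum Node { Greeting, Booking, Done }

pub struct TwoNodeBrain { node: Node }

impl TwoNodeBrain {
    pub fn new() -> Self { TwoNodeBrain { node: Node::Greeting } }
    fn tool(name: &str, description: &str) -> ToolDecl {
        ToolDecl { name: name.to_string(), description: description.to_string() }
    }
}

impl AgentBrain for TwoNodeBrain {
    fn system_prompt(&self) -> String {
        match self.node {
            Node::Greeting => "Greet the caller and ask what they need.".into(),
            Node::Booking => "Collect a date for the booking.".into(),
            Node::Done => String::new(),
        }
    }
    fn tools(&self) -> Vec<ToolDecl> {
        // Transitions are no-arg functions; endCall is always offered.
        let mut tools = vec![Self::tool("endCall", "Hang up")];
        if self.node == Node::Greeting {
            tools.push(Self::tool("toBooking", "Caller wants to book"));
        }
        tools
    }
    fn on_tool_call(&mut self, name: &str, _args: &str) -> BrainAction {
        match (self.node, name) {
            (Node::Greeting, "toBooking") => {
                self.node = Node::Booking;
                BrainAction::Transition { system_prompt: self.system_prompt(), tools: self.tools(), say: None }
            }
            (_, "endCall") => {
                self.node = Node::Done;
                BrainAction::End { disposition: Some("completed".into()) }
            }
            _ => BrainAction::Stay, // unknown or out-of-node tool: no change
        }
    }
    fn is_finished(&self) -> bool { self.node == Node::Done }
}
```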

Call lifecycle

The embedder owns the control plane; the shapes below are the typical wiring it provides.

WS-media inbound (e.g. Plivo): carrier → the embedder's answer proxy → inbound-run endpoint (verify sig, route, create run, token) → answer XML with <Stream> → the carrier opens a WS to the embedder's /telephony/ws/{provider}/{run}?token={token} → SessionSource.resolve → the call pipeline runs.

WS-media outbound: initiate-call endpoint → run+token → carrier originate (answer_url) → the carrier GETs answer XML → <Stream> WS → same pipeline.
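Both WS flows end at the same media route. A hypothetical parser for /telephony/ws/{provider}/{run}?token=... that yields the run_id: i64 and token that SessionSource.resolve takes; it does no percent-decoding, and the embedder must still verify the token before calling resolve.

```rust
// SPDX-License-Identifier: Apache-2.0
// Hypothetical parser for the embedder's media-WS route.

#[derive(Debug, PartialEq)]
pub struct MediaRoute {
    pub provider: String,
    pub run_id: i64,
    pub token: String,
}

pub fn parse_media_route(path_and_query: &str) -> Option<MediaRoute> {
    let (path, query) = path_and_query.split_once('?')?;
    let rest = path.strip_prefix("/telephony/ws/")?;
    let (provider, run) = rest.split_once('/')?;
    if provider.is_empty() || run.contains('/') {
        return None;
    }
    let run_id: i64 = run.parse().ok()?;
    // Take the first token= parameter; reject it if missing or empty.
    let token = query
        .split('&')
        .find_map(|kv| kv.strip_prefix("token="))
        .filter(|t| !t.is_empty())?;
    Some(MediaRoute { provider: provider.to_string(), run_id, token: token.to_string() })
}
```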

SIP inbound (native SIP): PSTN → SIP → the SipAgent running inside the embedder (the trunk is REGISTERed at startup) accepts the INVITE → the embedder resolves the call over its control plane (DID → workflow → create run+token) → builds a SipTransport for the dialog and runs the pipeline. Carrier CDR/recording webhooks are side-effect-only, never the media trigger.

SIP outbound (native SIP): initiate-call endpoint → run+token → the control plane POSTs to the embedder's originate endpoint ({run_id, token, to_number}, no ESL) → the SipAgent originates the INVITE to the E.164 → on answer builds a SipTransport and runs the pipeline. CallerID is configured on the trunk by the embedder's SIP trunk configuration.

Audio path

Telephony is G.711 μ-law 8 kHz. Gemini Live wants 16 kHz PCM in, emits 24 kHz PCM out.

carrier → μ-law decode → 8k→16k upsample (rubato) → Gemini
Gemini → 24k→8k downsample (rubato) → μ-law encode → carrier
recorder taps both legs → mono mix → WAV → object store

On the SIP path the SipTransport decodes inbound RTP (G.711 PCMU/PCMA per the negotiated codec) straight to 8 kHz PCM and re-encodes the bot leg to RTP — no WS hop, no intermediate L16 framing. On the Plivo path the PlivoSerializer handles the μ-law WS framing. Both feed the same resample/recorder. Crates: audio-codec-algorithms (G.711), rubato (resample), hand mix for the recorder.

The rates above are Gemini Live's (16 kHz in / 24 kHz out). A cascaded pipeline instead resamples the same 8 kHz carrier audio to whatever each STT/TTS provider expects; the carrier codec, rubato resampling, and the dual-leg recorder are identical either way.
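The μ-law step is small enough to show in full. A sketch of the classic bias-and-segment G.711 μ-law algorithm used by common reference implementations; Flowcat itself delegates this to audio-codec-algorithms, and the function names here are hypothetical. Resampling (rubato) is not shown.

```rust
// SPDX-License-Identifier: Apache-2.0
// G.711 μ-law <-> 16-bit PCM. Illustrative only.

const BIAS: i32 = 0x84; // 132
const CLIP: i32 = 32635;

pub fn ulaw_decode(byte: u8) -> i16 {
    let u = !byte; // μ-law bytes are stored bit-inverted
    let sign = u & 0x80;
    let exponent = (u >> 4) & 0x07;
    let mantissa = u & 0x0F;
    let magnitude = ((((mantissa as i32) << 3) + BIAS) << exponent) - BIAS;
    (if sign != 0 { -magnitude } else { magnitude }) as i16
}

pub fn ulaw_encode(sample: i16) -> u8 {
    let mut s = sample as i32;
    let sign: i32 = if s < 0 { s = -s; 0x80 } else { 0 };
    s = s.min(CLIP) + BIAS;
    // Segment (exponent) = position of the highest set bit above bit 7.
    let mut exponent = 7;
    while exponent > 0 && (s & (0x80 << exponent)) == 0 {
        exponent -= 1;
    }
    let mantissa = (s >> (exponent + 3)) & 0x0F;
    !((sign | (exponent << 4) | mantissa) as u8)
}

/// Decode one carrier frame (e.g. 160 bytes = 20 ms at 8 kHz) to PCM.
pub fn decode_frame(payload: &[u8]) -> Vec<i16> {
    payload.iter().map(|&b| ulaw_decode(b)).collect()
}
```

Every μ-law byte except 0x7F (negative zero, which re-encodes as 0xFF) survives a decode/encode round trip, which makes a cheap exhaustive unit test.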

Gemini Live protocol (native client)

Endpoint: wss://generativelanguage.googleapis.com/ws/google.ai.generativelanguage.v1alpha.GenerativeService.BidiGenerateContent?key=<API_KEY>. Frames are JSON (not protobuf).

  • client→server: setup (model, systemInstruction, tools, responseModalities:["AUDIO"], input/output transcription on) · realtimeInput.mediaChunks[{mimeType:"audio/pcm;rate=16000", data:b64}] · toolResponse.functionResponses · clientContent (kickoff turn).
  • server→client: setupComplete · serverContent.modelTurn.parts[].inlineData (24 kHz PCM b64, the bot audio) · serverContent.inputTranscription/outputTranscription · serverContent.interrupted (barge-in) · toolCall.functionCalls[{name,args,id}] · usageMetadata · goAway/sessionResumptionUpdate.
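A sketch of the outbound audio frame, assuming the realtimeInput.mediaChunks shape listed above (check the current Gemini Live reference before relying on it, since the schema may differ across API versions). The tiny standard-alphabet base64 encoder keeps the example std-only; a real client would use a crate.

```rust
// SPDX-License-Identifier: Apache-2.0
// Build a realtimeInput frame from 16 kHz mono PCM16 (little-endian bytes).

const B64: &[u8; 64] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

/// Standard base64 with '=' padding.
fn base64(bytes: &[u8]) -> String {
    let mut out = String::with_capacity((bytes.len() + 2) / 3 * 4);
    for chunk in bytes.chunks(3) {
        let b = [chunk[0], *chunk.get(1).unwrap_or(&0), *chunk.get(2).unwrap_or(&0)];
        let n = (u32::from(b[0]) << 16) | (u32::from(b[1]) << 8) | u32::from(b[2]);
        for i in 0..4 {
            // A chunk of k bytes yields k + 1 data characters, then padding.
            if i <= chunk.len() {
                out.push(B64[((n >> (18 - 6 * i)) & 0x3F) as usize] as char);
            } else {
                out.push('=');
            }
        }
    }
    out
}

/// One realtimeInput.mediaChunks frame for a chunk of 16 kHz PCM.
pub fn realtime_input_frame(pcm16k: &[i16]) -> String {
    let bytes: Vec<u8> = pcm16k.iter().flat_map(|s| s.to_le_bytes()).collect();
    format!(
        r#"{{"realtimeInput":{{"mediaChunks":[{{"mimeType":"audio/pcm;rate=16000","data":"{}"}}]}}}}"#,
        base64(&bytes)
    )
}
```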

Tools = the brain's transitions as no-arg functions + endCall. On toolCall: call AgentBrain.on_tool_call; for Transition, RealtimeLlm.update_system(new prompt, new tools) + send_tool_result; for End, drain + finalize. v1 handles interrupted (clear carrier audio); goAway/reconnect is a documented follow-up (see ROADMAP.md).
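The toolCall handling reduces to a small mapping from BrainAction to model calls. A sketch with a trimmed BrainAction and a hypothetical Effect enum recording what would be sent; acknowledging a Stay with a tool result is an assumption, not something the text specifies.

```rust
// SPDX-License-Identifier: Apache-2.0
// Toy dispatch of a toolCall's BrainAction into RealtimeLlm-side effects.

#[derive(Debug, PartialEq)]
pub enum BrainAction {
    Transition { system_prompt: String },
    Stay,
    End,
}

#[derive(Debug, PartialEq)]
pub enum Effect {
    UpdateSystem(String), // RealtimeLlm::update_system(new prompt, new tools)
    ToolResult(String),   // RealtimeLlm::send_tool_result(call id, ..)
    DrainAndFinalize,     // flush bot audio, then finalize the call
}

pub fn dispatch(call_id: &str, action: BrainAction) -> Vec<Effect> {
    match action {
        // Re-prompt first, then answer the function call, per the order above.
        BrainAction::Transition { system_prompt } => vec![
            Effect::UpdateSystem(system_prompt),
            Effect::ToolResult(call_id.to_string()),
        ],
        BrainAction::Stay => vec![Effect::ToolResult(call_id.to_string())],
        BrainAction::End => vec![Effect::DrainAndFinalize],
    }
}
```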

What the embedder's control plane provides

Flowcat itself does not implement the control plane. To wire a SIP/RTP carrier, the embedder supplies (in its own, separately-reviewed code):

  1. A telephony provider for the carrier: sample_rate() = 8000; per-event webhook signature verification (e.g. HMAC over the carrier's event string); inbound parsing (DID/caller/call-id) and status → lifecycle mapping; plus any REST auth signer the carrier's API needs.
  2. Routes the carrier talks to: CDR/event webhooks (side-effect-only), a service-authed sip/inbound-resolve (DID → workflow → run+token), and an initiate-call branch whose originate path POSTs to the embedder's media-binary originate endpoint ({run_id, token, to_number}) — no ESL.
  3. A credential shape for the carrier (API key/secret, SIP login/password/server, caller-id), sealed at rest. The SIP trunk that actually REGISTERs is configured by the embedder's SIP trunk configuration (server / login / password / caller-id) passed to SipConfig/SipAgent.

OSS boundary & license

  • Flowcat (Apache-2.0): flowcat-core (runtime + the four traits MediaTransport·RealtimeLlm·AgentBrain·SessionSource + native SIP UA sip/ + WsCarrierTransport + codec/recorder + a demo brain) and the sibling crates flowcat-services (~80 STT/TTS/LLM/realtime providers + obs exporters + MCP), flowcat-transports, and flowcat-telephony (carrier serializers + DTMF), plus the flowcat-cli demo binary. Every provider/transport is one opt-in Cargo feature, so a fully local/air-gapped build is just a feature selection. No embedder contract.
  • The embedder (its own code): the glue binary (engine adapter + control-plane client + auth/routing), plus whatever editor, campaigns, billing, and multi-tenant control plane it provides. The host's brain implementation plugs in via the AgentBrain trait.
  • The Memory trait is OSS; a concrete backend is the embedder's choice (not wired here).

Security

  • Per-call token authorizes the media WS + every control-plane call. The WS query token is checked by the embedder before resolve.
  • Carrier signature verification (CDR webhooks) stays in the embedder's control plane (constant-time compare, replay window). The model never receives carrier/embedder credentials.
  • SIP trunk credentials live in the embedder's config — never logged. Inbound INVITE trust flows through the embedder's sip/inbound-resolve (service-authed; identity from the DID route, never the INVITE body). The outbound originate endpoint authenticates (run token / service auth) and validates to_number (E.164). RTP from an unexpected source addr is dropped (symmetric RTP). No new secret crosses the wire to the model/caller.
  • The embedder seals carrier config at rest. Both review gates (code + security) apply to all control-plane changes.
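Two of the checks above, sketched with hypothetical helper names: E.164 validation for to_number, and a signature comparison that does not exit early on the first differing byte. A hand-rolled loop is illustrative only; production code should prefer a vetted constant-time crate such as subtle.

```rust
// SPDX-License-Identifier: Apache-2.0
// Illustrative input/signature checks for the embedder's control plane.

/// E.164: '+', then 1 to 15 digits, the first of which is non-zero.
pub fn is_e164(number: &str) -> bool {
    let Some(digits) = number.strip_prefix('+') else { return false };
    (1..=15).contains(&digits.len())
        && digits.bytes().all(|b| b.is_ascii_digit())
        && !digits.starts_with('0')
}

/// Compare two signatures without an early exit on the first differing byte.
/// Length is not treated as secret.
pub fn ct_eq(a: &[u8], b: &[u8]) -> bool {
    if a.len() != b.len() {
        return false;
    }
    a.iter().zip(b).fold(0u8, |acc, (x, y)| acc | (x ^ y)) == 0
}
```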

Testing strategy

  • Unit (CI, no infra): G.711 round-trip; resample ratios; Plivo serializer parse/encode; SIP: SDP offer/answer build+parse (PCMU/PCMA pick, ptime), RTP packetize/depacketize (seq/ts/PT), the SipTransport → MediaTransport mapping (fed RTP → Audio; synthesized BYE → Stop); carrier signature accept/reject/tamper/replay + REST signing known-answer vector; the brain adapter (transition→tool→re-prompt→end); Gemini Live JSON message encode/decode against captured fixtures.
  • Integration (CI): a mock MediaTransport + mock RealtimeLlm driving the call pipeline to a clean finalize (no network).
  • Live (gated — needs infra + user OK): a carrier dev number (NEVER a production number) for the WS-media path; a SIP trunk registered by the native SipAgent for the SIP path. Inform the user before any live call (account guardrail).
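The integration test's shape can be sketched with synchronous stand-ins for MediaTransport and a mock "model" that just echoes caller audio. MockTransport, run_call, and CallSummary are hypothetical names; the point is that a scripted StreamStart → Audio → Stop sequence drives the loop to a clean finalize with no network.

```rust
// SPDX-License-Identifier: Apache-2.0
// Synchronous stand-ins for the async MediaTransport seam.
use std::collections::VecDeque;

#[derive(Debug, Clone, PartialEq)]
pub struct AudioChunk {
    pub pcm: Vec<i16>,
    pub sample_rate: u32,
}

#[derive(Debug)]
pub enum MediaIn {
    StreamStart { call_id: String },
    Audio(AudioChunk),
    Stop,
}

pub trait MediaTransport {
    fn recv(&mut self) -> Option<MediaIn>;
    fn send_audio(&mut self, chunk: AudioChunk) -> Result<(), String>;
}

/// Replays a scripted inbound sequence and records everything sent out.
pub struct MockTransport {
    pub script: VecDeque<MediaIn>,
    pub sent: Vec<AudioChunk>,
}

impl MediaTransport for MockTransport {
    fn recv(&mut self) -> Option<MediaIn> { self.script.pop_front() }
    fn send_audio(&mut self, chunk: AudioChunk) -> Result<(), String> {
        self.sent.push(chunk);
        Ok(())
    }
}

#[derive(Debug, PartialEq)]
pub struct CallSummary {
    pub call_id: Option<String>,
    pub frames_in: usize,
    pub clean_stop: bool,
}

/// Drive the call until Stop (clean) or the transport closes (not clean).
pub fn run_call(t: &mut impl MediaTransport) -> Result<CallSummary, String> {
    let mut summary = CallSummary { call_id: None, frames_in: 0, clean_stop: false };
    while let Some(event) = t.recv() {
        match event {
            MediaIn::StreamStart { call_id } => summary.call_id = Some(call_id),
            MediaIn::Audio(chunk) => {
                summary.frames_in += 1;
                t.send_audio(chunk)?; // mock model: echo the caller's audio
            }
            MediaIn::Stop => {
                summary.clean_stop = true;
                break;
            }
        }
    }
    Ok(summary)
}
```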

Flowcat — the composable FrameProcessor pipeline (FROZEN API)

Status: FROZEN. This is the keystone API for the whole pipecat-parity program (see ROADMAP.md). Every later component (cascaded STT/TTS/LLM, VAD/turn, WebRTC/serializer transports, observability, transfer/DTMF) is implemented against the traits and types frozen here. Companion runtime doc: DESIGN.md (today's Call::run, the four seams, the audio path). Mirror source: pipecat pipecat/src/pipecat/{frames/frames.py, processors/frame_processor.py, pipeline/{pipeline,parallel_pipeline,task,runner}.py, observers/base_observer.py, metrics/metrics.py}.

Scope: the framework only — Frame, FrameProcessor, Pipeline, ParallelPipeline, PipelineTask, PipelineRunner, Observer, metrics frames, the service-processor trait signatures, and the seam→processor mapping. Provider impls, audio models, transports, and serializers are later work and only their trait signatures are frozen here (so the fan-out can start against a stable surface).

Non-negotiable: the live Gemini-Live S2S prod path keeps working on the current Call::run until the processor pipeline is proven equivalent (§7). It lands alongside Call::run, never as a rewrite-in-place.


0. Design goals & the constraints they come from

  1. Literal pipecat parity in shape, so the ~80-provider fan-out is a mechanical port: a Frame taxonomy, a FrameProcessor with process_frame(frame, direction) + push_frame, prev/next linking, Pipeline/ParallelPipeline, PipelineTask + PipelineRunner, Observer. (pipecat frame_processor.py:175, pipeline.py:91, task.py:142.)
  2. Protect the p99 moat. Today's Call::run is one tokio::select! loop (pipeline.rs:195) holding p99 ≤ 0.61 ms to 2,000 concurrent calls (bench/RESULTS.md). The channel-per-processor model adds per-frame hops; we must show the added cost stays in the sub-millisecond noise (§2.4).
  3. OSS-clean + compile-fast. flowcat-core stays embedder-agnostic (lib.rs) and must build without pulling every provider; providers/transports live in sibling crates behind one cargo feature each (§8).
  4. Extensible frame set. OSS users (and later components) must be able to add frame types without editing flowcat-core. This drives the enum-core + Frame::Custom(Arc<dyn CustomFrame>) escape hatch decision (§1.1).
  5. Zero ABI churn for the parallel fan-out. Trait method shapes frozen now; adding a provider must never require touching the framework.

1. Frame taxonomy

1.1 Enum core with a trait escape hatch — and why

pipecat models frames as a Python class tree with isinstance dispatch (frames.py:54: Frame → SystemFrame / DataFrame / ControlFrame). The direct Rust analogues are (a) a closed enum Frame or (b) trait Frame: Any + downcast.

| | closed enum | trait Frame: Any + downcast |
|---|---|---|
| Dispatch | match (no vtable, no alloc, branch-predicted) | Any::downcast_ref (type-id compare) per handler |
| Exhaustiveness | compiler-checked; adding a variant flags every match | none; missed types silently fall through |
| Per-frame cost | a stack enum move; the hot audio variant is Arc<AudioFrame> | Box<dyn> / Arc<dyn> heap alloc per frame |
| OSS extensibility | closed: users cannot add a variant | open: any type implementing the trait |
| Category (System/Data/Control) | one method fn class() | per-impl |

The moat (constraint 2) wants the cheap, branch-predicted match and the alloc-free hot path; literal parity (constraint 1) wants exhaustiveness so the port is mechanical and a new audio/turn frame can't be silently dropped. But constraint 4 (OSS users add frame types) rules out a purely closed enum.

Decision: a closed enum Frame for every pipecat frame, plus one Custom variant carrying Arc<dyn CustomFrame> as the extension point. Core processors match on named variants with full exhaustiveness; OSS extensions ride in Custom and are downcast only by the processors that care. This is the standard "enum + escape hatch" pattern: 99% of frames are first-class and alloc-free; extensibility costs one Arc<dyn> only on the frames that use it.
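The pattern in miniature, with simplified types rather than the frozen definitions that follow: core variants are matched directly, and a Custom frame is downcast only by the processor that recognizes it. SentimentScore is a hypothetical extension frame.

```rust
// SPDX-License-Identifier: Apache-2.0
// "Enum + escape hatch" with simplified types (not the frozen API).
use std::any::Any;
use std::sync::Arc;

pub trait CustomFrame: Any + Send + Sync + std::fmt::Debug {
    fn as_any(&self) -> &dyn Any;
}

#[derive(Debug, Clone)]
pub enum Frame {
    Interruption,
    InputText(String),
    Custom(Arc<dyn CustomFrame>),
}

/// A frame type defined outside the core crate (hypothetical).
#[derive(Debug)]
pub struct SentimentScore(pub f32);

impl CustomFrame for SentimentScore {
    fn as_any(&self) -> &dyn Any { self }
}

/// Describes how this processor handled the frame.
pub fn handle(frame: &Frame) -> String {
    match frame {
        Frame::Interruption => "interrupt".to_string(),
        Frame::InputText(t) => format!("text:{t}"),
        Frame::Custom(c) => match c.as_any().downcast_ref::<SentimentScore>() {
            Some(SentimentScore(s)) => format!("sentiment:{s}"),
            None => "forward".to_string(), // unknown extension: pass through unchanged
        },
    }
}
```

Only the Custom arm pays for a type-id comparison; every core variant stays a plain, exhaustive match.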

#![allow(unused)]
fn main() {
// flowcat-core/src/processor/frame.rs   (NOTE: distinct from today's data-shape
// `frame.rs`, which is renamed to `types.rs` — see §8.4 migration step M0.)

use std::any::Any;
use std::sync::Arc;

/// Direction of frame flow. Mirrors pipecat `FrameDirection`
/// (frame_processor.py:56). `Downstream` = source→sink; `Upstream` = sink→source
/// (errors, end-of-task requests, RTVI acks).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Direction {
    Downstream,
    Upstream,
}

/// Frame scheduling class — mirrors pipecat's three base classes
/// (frames.py:95/106/118). Drives queue priority and interruptibility:
/// `System` jumps the queue and survives interruption; `Data` is dropped on
/// interruption; `Control` is ordered like Data but also survives interruption
/// when `uninterruptible()` is set (e.g. `End`, `Stop`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FrameClass {
    System,
    Data,
    Control,
}

/// Per-frame metadata, present on every frame (mirrors the `Frame` base fields:
/// `id`, `name`, `pts`, `metadata` — frames.py:73-79). Cheap to clone (Arc the map).
#[derive(Debug, Clone)]
pub struct FrameMeta {
    /// Process-unique monotonic id (an `AtomicU64` bump; see `next_frame_id()`).
    pub id: u64,
    /// Human name for tracing/observers, e.g. "OutputAudio#42". Built lazily.
    pub name: &'static str,
    /// Presentation timestamp in **nanoseconds** on the pipeline clock, if set.
    pub pts: Option<i64>,
    /// Paired id when a frame was broadcast both directions (frames.py:76).
    pub broadcast_sibling_id: Option<u64>,
    /// Arbitrary sideband metadata (Arc so cloning a frame is cheap).
    pub extra: Option<Arc<serde_json::Map<String, serde_json::Value>>>,
    /// Transport source/destination track names (frames.py:78-79).
    pub transport_source: Option<Arc<str>>,
    pub transport_destination: Option<Arc<str>>,
}

/// OSS extension point: a frame type defined outside flowcat-core. Carried in
/// `Frame::Custom`. Processors that understand it `downcast_ref`; everyone else
/// forwards it unchanged in the direction it arrived.
pub trait CustomFrame: Any + Send + Sync + std::fmt::Debug {
    fn frame_class(&self) -> FrameClass;
    /// True if this frame must survive interruption (pipecat `UninterruptibleFrame`).
    fn uninterruptible(&self) -> bool { false }
    fn as_any(&self) -> &dyn Any;
}

/// The frame that flows through every processor. One closed enum mirroring the
/// pipecat frame tree (frames.py), plus `Custom` for OSS extensions.
///
/// The hot audio variants box their payload in `Arc<AudioFrame>` so cloning a
/// frame for the broadcast/observer paths never copies PCM.
#[derive(Debug, Clone)]
pub enum Frame {
    // ---- System frames (priority; survive interruption) — frames.py:846+ ----
    /// Pipeline init: carries sample rates + metric/trace toggles (StartFrame, :847).
    Start(StartParams),
    /// Immediate stop; flush nothing (CancelFrame, :873).
    Cancel { reason: Option<String> },
    /// Error notification pushed upstream (ErrorFrame, :890). `fatal` ⇒ task cancels.
    Error { message: String, fatal: bool, processor: Option<Arc<str>> },
    /// Barge-in (InterruptionFrame, :959). Broadcast both directions by the
    /// turn/VAD start strategy or any processor.
    Interruption,
    /// Raw caller audio from a transport input (InputAudioRawFrame, :1250).
    InputAudio(Arc<AudioFrame>),
    /// User-associated audio (UserAudioRawFrame, :1296) — carries `user_id`.
    UserAudio { audio: Arc<AudioFrame>, user_id: Arc<str> },
    /// Raw text input from a transport (InputTextRawFrame, :1282) — text-chat path.
    InputText(String),
    /// Inbound DTMF keypress (InputDTMFFrame, :1353).
    InputDtmf(KeypadEntry),
    /// VAD/turn lifecycle (frames.py:971-1104). One variant per pipecat frame.
    UserStartedSpeaking,
    UserStoppedSpeaking,
    UserSpeaking,
    BotStartedSpeaking,
    BotStoppedSpeaking,
    BotSpeaking,
    /// Definitive VAD edges with the deciding secs (VAD*SpeakingFrame, :1043/1058).
    VadUserStartedSpeaking { start_secs: f32 },
    VadUserStoppedSpeaking { stop_secs: f32 },
    /// Mute/unmute the STT service (STTMuteFrame, :1182).
    SttMute(bool),
    /// Performance metrics (MetricsFrame, :1108) — TTFB/processing/usage/turn.
    Metrics(Vec<MetricsData>),
    /// Transport-level message, inbound or outbound, with an `urgent` flag (Input/OutputTransportMessage*, :1193/1207).
    TransportMessage { payload: serde_json::Value, urgent: bool },
    /// SFU/transport lifecycle (BotConnected/ClientConnected, :1621/1633).
    ClientConnected,
    BotConnected,
    /// Function-call signalling (FunctionCallsStarted/InProgress/Cancel, :1155/1804/1169).
    FunctionCallsStarted(Vec<FunctionCall>),
    FunctionCallInProgress { call: FunctionCall, cancel_on_interruption: bool },
    FunctionCallCancel { function_name: String, tool_call_id: String },

    // ---- Data frames (ordered; dropped on interruption) — frames.py:190+ ----
    /// Output audio to a transport (OutputAudioRawFrame, :191).
    OutputAudio(Arc<AudioFrame>),
    /// TTS-generated audio, tagged with its context id (TTSAudioRawFrame, :231).
    TtsAudio { audio: Arc<AudioFrame>, context_id: Option<Arc<str>> },
    /// Generic text (TextFrame, :293) — flows LLM→aggregator→TTS.
    Text(String),
    /// LLM-generated text chunk (LLMTextFrame, :333).
    LlmText(String),
    /// Final transcription (TranscriptionFrame, :419).
    Transcription { text: String, user_id: Arc<str>, language: Option<Language>, final_: bool },
    /// Interim/partial transcription (InterimTranscriptionFrame, :445).
    InterimTranscription { text: String, user_id: Arc<str>, language: Option<Language> },
    /// Text the TTS should speak (TTSSpeakFrame, :744).
    TtsSpeak { text: String, append_to_context: Option<bool> },
    /// Word/segment text emitted by TTS with its context (TTSTextFrame, :400).
    TtsText { text: String, context_id: Option<Arc<str>> },
    /// Function-call result, fed back to the LLM (FunctionCallResultFrame, :719).
    /// Uninterruptible — once produced, context must always be updated.
    FunctionCallResult(FunctionCallResult),
    /// Trigger an LLM run over the current context (LLMRunFrame, :585).
    LlmRun,
    /// The universal LLM context to run (LLMContextFrame, :502).
    LlmContext(Arc<LlmContext>),
    /// Outbound DTMF (OutputDTMFFrame, :790).
    OutputDtmf(Vec<KeypadEntry>),

    // ---- Control frames (ordered; `End`/`Stop` survive interruption) — :1580+ ----
    /// Graceful shutdown after flush (EndFrame, :1581). Uninterruptible.
    End { reason: Option<String> },
    /// Stop but keep processors connected (StopFrame, :1605). Uninterruptible.
    Stop,
    /// LLM response framing (LLMFullResponseStart/End, :1699/1714).
    LlmResponseStart,
    LlmResponseEnd,
    /// TTS response framing (TTSStarted/Stopped, :1850/1867).
    TtsStarted { context_id: Option<Arc<str>> },
    TtsStopped { context_id: Option<Arc<str>> },
    /// Update a service's settings live (ServiceUpdateSettingsFrame, :1878).
    /// Uninterruptible. `target` = STT/TTS/LLM/Filter/Mixer/All.
    UpdateSettings { target: ServiceKind, settings: serde_json::Value },
    /// Speech-control params broadcast (SpeechControlParamsFrame, :1419) + VAD
    /// param updates (VADParamsUpdateFrame, :1939).
    SpeechControlParams { vad: Option<VadParams>, turn: Option<TurnParams> },
    /// Liveness probe (HeartbeatFrame, :1654).
    Heartbeat { timestamp_ns: i64 },
    /// Output transport ready (OutputTransportReadyFrame, :1644).
    OutputTransportReady,

    // ---- OSS extension point ----
    Custom(Arc<dyn CustomFrame>),
}
}

Frame does not embed its FrameMeta: to keep the enum small and cheap to match, the meta travels out of band in the channel item, Envelope { meta: FrameMeta, frame: Frame, direction: Direction } (§2.1). (pipecat stores id/name/pts on the frame object; Flowcat separates them so the hot variant, OutputAudio(Arc<AudioFrame>), stays a thin pointer move while the meta travels alongside.)
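A minimal sketch of the id counter and the out-of-band envelope described above. The `FrameMeta` docs name `next_frame_id()` but do not show its body, so the `AtomicU64::fetch_add` implementation here is an assumption; `FrameMeta`, `Frame`, and `AudioFrame` are trimmed stand-ins, and `Envelope::fresh` is a hypothetical helper.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Process-wide counter behind `FrameMeta::id` (assumed implementation).
static NEXT_FRAME_ID: AtomicU64 = AtomicU64::new(1);

pub fn next_frame_id() -> u64 {
    // Relaxed suffices: ids only need to be unique and increasing in the
    // counter's own modification order, not ordered against other memory.
    NEXT_FRAME_ID.fetch_add(1, Ordering::Relaxed)
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Direction { Downstream, Upstream }

/// Trimmed stand-ins: only the fields this sketch needs.
#[derive(Debug, Clone)]
pub struct FrameMeta { pub id: u64, pub name: &'static str, pub pts: Option<i64> }

#[derive(Debug, Clone)]
pub struct AudioFrame { pub pcm: Vec<i16>, pub sample_rate: u32 }

#[derive(Debug, Clone)]
pub enum Frame { OutputAudio(Arc<AudioFrame>), Text(String) }

/// The channel item: meta travels beside the frame, not inside the enum.
pub struct Envelope { pub meta: FrameMeta, pub frame: Frame, pub direction: Direction }

impl Envelope {
    /// Hypothetical constructor: fresh meta for a frame created at this hop.
    pub fn fresh(name: &'static str, frame: Frame, direction: Direction) -> Self {
        Envelope { meta: FrameMeta { id: next_frame_id(), name, pts: None }, frame, direction }
    }
}
```

Moving an `Envelope` to the next hop moves the small meta struct plus one `Arc` pointer for audio; the PCM buffer itself never moves.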

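impl Frame {
    /// Scheduling class: drives queue priority + interruptibility (§2.3). Follows the
    /// enum's section grouping; `Custom` delegates to `CustomFrame::frame_class()`.
    pub fn class(&self) -> FrameClass {
        use Frame::*;
        match self {
            Start(_) | Cancel { .. } | Error { .. } | Interruption | InputAudio(_) | UserAudio { .. }
            | InputText(_) | InputDtmf(_) | UserStartedSpeaking | UserStoppedSpeaking | UserSpeaking
            | BotStartedSpeaking | BotStoppedSpeaking | BotSpeaking | VadUserStartedSpeaking { .. }
            | VadUserStoppedSpeaking { .. } | SttMute(_) | Metrics(_) | TransportMessage { .. }
            | ClientConnected | BotConnected | FunctionCallsStarted(_)
            | FunctionCallInProgress { .. } | FunctionCallCancel { .. } => FrameClass::System,
            End { .. } | Stop | LlmResponseStart | LlmResponseEnd | TtsStarted { .. }
            | TtsStopped { .. } | UpdateSettings { .. } | SpeechControlParams { .. }
            | Heartbeat { .. } | OutputTransportReady => FrameClass::Control,
            Custom(c) => c.frame_class(),
            _ => FrameClass::Data, // OutputAudio, TtsAudio, Text, LlmText, Transcription, …
        }
    }
    /// True ⇒ kept in the queue and not cancelled on interruption (pipecat
    /// `UninterruptibleFrame`: End, Stop, FunctionCallResult, UpdateSettings; :1581/1605/719/1878).
    pub fn uninterruptible(&self) -> bool {
        match self {
            Frame::Custom(c) => c.uninterruptible(),
            f => matches!(f, Frame::End { .. } | Frame::Stop | Frame::FunctionCallResult(_) | Frame::UpdateSettings { .. }),
        }
    }
}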
/// Mono 16-bit LE PCM with an explicit sample rate. **Unchanged** from today's
/// `frame.rs:14` AudioChunk — renamed `AudioFrame` and Arc-wrapped in the enum so
/// the hot path never copies PCM. (Keep an `AudioChunk` type alias for one release.)
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AudioFrame { pub pcm: Vec<i16>, pub sample_rate: u32, pub num_channels: u16 }

StartParams, FunctionCall, FunctionCallResult, LlmContext, KeypadEntry, Language, VadParams, TurnParams, ServiceKind, MetricsData are plain structs/enums in flowcat-core/src/processor/{frame,metrics}.rs mirroring the pipecat fields cited above. StartParams mirrors StartFrame (frames.py:847): audio_in_sample_rate, audio_out_sample_rate, enable_metrics, enable_usage_metrics, enable_tracing, report_only_initial_ttfb.

1.2 What we deliberately omit from the v1 enum

To keep the enum reviewable, the long tail of pipecat's 100+ frames that no current component needs (vision/image frames :171/209/1267, sprite :274, summarization :1735/1751/1782, prompt-caching :671, pause/resume :928/1668, idle-timeout-update :1925, filter/mixer enable :1971/2000, service-switcher :2011) maps to Custom until a component needs one first-class; that component then promotes it to a named variant. Promotion is an additive change: adding a variant only forces match sites that have no wildcard `_` arm to add an arm, and the compiler points at each one. The checklist (§9) names exactly which variants each early component promotes.
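Until a frame is promoted, it rides `Frame::Custom`. The sketch below shows the round trip: a component defines its own frame type, implements `CustomFrame` (copied from above so the block compiles alone), and a processor that understands it recovers it with `downcast_ref`. `ImageFrame` and `image_dims` are hypothetical names standing in for one of the omitted vision frames.

```rust
use std::any::Any;
use std::sync::Arc;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FrameClass { System, Data, Control }

/// The extension trait as declared in flowcat-core.
pub trait CustomFrame: Any + Send + Sync + std::fmt::Debug {
    fn frame_class(&self) -> FrameClass;
    fn uninterruptible(&self) -> bool { false }
    fn as_any(&self) -> &dyn Any;
}

/// Hypothetical stand-in for an omitted pipecat image frame.
#[derive(Debug)]
pub struct ImageFrame { pub width: u32, pub height: u32 }

impl CustomFrame for ImageFrame {
    fn frame_class(&self) -> FrameClass { FrameClass::Data } // dropped on interruption
    fn as_any(&self) -> &dyn Any { self }
}

/// Trimmed `Frame`: one ordinary variant plus the extension point.
#[derive(Debug, Clone)]
pub enum Frame { Text(String), Custom(Arc<dyn CustomFrame>) }

/// A processor that understands `ImageFrame` downcasts it; for any other frame it
/// returns `None`, and the caller forwards the frame unchanged.
pub fn image_dims(frame: &Frame) -> Option<(u32, u32)> {
    match frame {
        Frame::Custom(c) => c.as_any().downcast_ref::<ImageFrame>().map(|img| (img.width, img.height)),
        _ => None,
    }
}
```

Because the payload sits behind an `Arc<dyn CustomFrame>`, cloning a custom frame for observers is as cheap as cloning a built-in audio frame.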


2. FrameProcessor trait + the channel runtime

2.1 The trait

#![allow(unused)]
fn main() {
// flowcat-core/src/processor/mod.rs
use async_trait::async_trait;

/// The frame envelope that travels a processor's input channel.
pub struct Envelope { pub meta: FrameMeta, pub frame: Frame, pub direction: Direction }

/// One-time per-task wiring handed to every processor at startup (mirrors pipecat
/// `FrameProcessorSetup`, frame_processor.py:71): the pipeline clock, the (optional)
/// observer fan-out, and the task's shared cancellation token.
#[derive(Clone)]
pub struct ProcessorSetup {
    pub clock: Clock,                       // monotonic ns; `clock.now_ns()`
    pub observer: Option<Observer>,         // Arc fan-out (§5)
    pub cancel: tokio_util::sync::CancellationToken,
    pub enable_metrics: bool,
    pub enable_usage_metrics: bool,
}

/// A processor's view of "downstream" / "upstream" — a `Sender` to each neighbour,
/// wired by `Pipeline::link`. Cloned into the processor's run loop.
#[derive(Clone)]
pub struct Link {
    next: Option<EnvelopeSender>,      // downstream neighbour input
    prev: Option<EnvelopeSender>,      // upstream neighbour input
    name: Arc<str>,                    // this processor's name (for observer events)
    clock: Clock,
    observer: Option<Observer>,
}

impl Link {
    /// Push a frame to the adjacent processor in `direction`. Mirrors pipecat
    /// `push_frame` (frame_processor.py:688): fires the observer `on_push_frame`
    /// hook, then enqueues onto the neighbour's input channel. Backpressure:
    /// `await`s if the neighbour's bounded channel is full (§2.2).
    pub async fn push(&self, meta: FrameMeta, frame: Frame, direction: Direction);
    /// Convenience: push a fresh frame downstream with new meta.
    pub async fn push_down(&self, frame: Frame);
    /// Push an `Error` frame upstream (pipecat `push_error`, :630).
    pub async fn push_error(&self, message: impl Into<String>, fatal: bool);
    /// Broadcast a frame both directions with paired sibling ids (pipecat
    /// `broadcast_frame`, :731) — used for `Interruption`.
    pub async fn broadcast(&self, frame: Frame);
}

/// The building block. Each processor runs in **its own tokio task** fed by a
/// bounded mpsc channel (§2.2). The framework owns the task loop; an impl only
/// writes `process_frame` (and optional `start`/`stop` hooks).
///
/// Mirrors pipecat `FrameProcessor` (frame_processor.py:175): `process_frame`,
/// prev/next links, system-frame priority, interruption handling — but the per-
/// processor task + queues are a *framework* concern here, not re-implemented per
/// processor as in Python.
#[async_trait]
pub trait FrameProcessor: Send + 'static {
    /// Stable, human-readable name (observer events, error attribution, tracing).
    fn name(&self) -> &str;

    /// Called once when the `Start` frame reaches this processor, before any data
    /// frame. Open sockets / spawn provider readers here. Default: no-op.
    async fn start(&mut self, _setup: &ProcessorSetup, _params: &StartParams) -> Result<()> { Ok(()) }

    /// Handle one frame. Push results via `link`. **Must not block**: long work
    /// (a provider round-trip) is driven by an internally-spawned task that feeds
    /// results back as frames (the Gemini reader-task pattern, gemini_live.rs:265).
    /// The default impl forwards the frame unchanged in its direction, so a pure
    /// observer/no-op processor can simply keep the default `process_frame`.
    async fn process_frame(&mut self, env: Envelope, link: &Link) -> Result<()> {
        link.push(env.meta, env.frame, env.direction).await; Ok(())
    }

    /// Called on `End`/`Stop`/`Cancel` after the terminal frame is forwarded.
    /// Flush + close. Default: no-op.
    async fn stop(&mut self, _reason: StopReason) -> Result<()> { Ok(()) }

    /// Whether this processor produces metrics (pipecat `can_generate_metrics`,
    /// :395). Services override to `true`.
    fn can_generate_metrics(&self) -> bool { false }
}
}

Result<()> is flowcat_core::Result (error.rs, FlowcatError); an Err returned from process_frame is converted to an upstream Frame::Error{fatal:false} by the task loop (pipecat __process_frame catch → push_error, :979).
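The "must not block" rule for `process_frame` is easiest to see in miniature. This sketch assumes `std::thread` and `std::sync::mpsc` in place of the tokio task and channel Flowcat uses, and a synchronous handler in place of the async trait method; `SlowProvider` is a hypothetical name. The handler hands the request to a worker (the reader-task pattern) and returns at once, collecting any finished results on later calls.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread;
use std::time::Duration;

/// Trimmed frame type for this sketch.
#[derive(Debug, Clone, PartialEq)]
pub enum Frame { Text(String), LlmText(String) }

/// Hypothetical slow provider whose round-trip runs on an internal worker.
pub struct SlowProvider { requests: Sender<String>, results: Receiver<Frame> }

impl SlowProvider {
    pub fn spawn(latency: Duration) -> Self {
        let (req_tx, req_rx) = mpsc::channel::<String>();
        let (res_tx, res_rx) = mpsc::channel::<Frame>();
        thread::spawn(move || {
            // The provider round-trip happens here, off the processor's loop.
            // The loop ends when the SlowProvider (and its request sender) is dropped.
            for text in req_rx {
                thread::sleep(latency);
                if res_tx.send(Frame::LlmText(format!("echo: {text}"))).is_err() { break; }
            }
        });
        SlowProvider { requests: req_tx, results: res_rx }
    }

    /// The `process_frame` analogue: hand work off, never wait for it.
    pub fn process_frame(&self, frame: Frame) -> Vec<Frame> {
        let mut out = Vec::new();
        match frame {
            Frame::Text(t) => { let _ = self.requests.send(t); }
            other => out.push(other), // frames we don't handle are forwarded
        }
        // Pick up whatever results are already back, without blocking.
        out.extend(self.results.try_iter());
        out
    }

    /// Wait for one result (for shutdown paths and tests only).
    pub fn next_result(&self, timeout: Duration) -> Option<Frame> {
        self.results.recv_timeout(timeout).ok()
    }
}
```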

2.2 The channel runtime — bounded mpsc, one task per processor

Each linked processor becomes a spawned task:

// Pseudocode of the framework-owned per-processor loop (processor/runtime.rs).
// Two channels per processor so System frames jump ahead of Data/Control:
// pipecat does this with a PriorityQueue (frame_processor.py:119); a second
// channel is the cheaper, branch-free Rust equivalent.
use tokio::sync::mpsc;

async fn run_processor(mut p: Box<dyn FrameProcessor>, mut rx: ProcessorRx, link: Link,
                       setup: ProcessorSetup) {
    loop {
        let env = tokio::select! {
            biased;                                   // poll the system channel first
            Some(e) = rx.system.recv() => e,          // Start/Cancel/Interruption/…
            Some(e) = rx.normal.recv() => e,          // Data + Control
            else => break,                            // both channels closed
        };
        // observer on_process_frame hook (§5); skipped entirely when no observer is set
        if let Some(o) = &setup.observer { o.on_process(&link.name, &env, setup.clock.now_ns()); }
        match &env.frame {
            Frame::Start(params) => {
                // A failed start() is reported upstream like a process_frame error.
                if let Err(e) = p.start(&setup, params).await { link.push_error(e.to_string(), false).await; }
                link.push(env.meta, env.frame, env.direction).await;
            }
            Frame::Interruption => { /* §2.5: drain `normal` of interruptible frames, keep uninterruptible; forward */ }
            Frame::Cancel { .. } | Frame::End { .. } | Frame::Stop => {
                // `Stop` keeps processors connected (StopFrame), so only End/Cancel end this loop.
                let terminal = !matches!(env.frame, Frame::Stop);
                let _ = p.stop(reason(&env.frame)).await;   // reason(): terminal frame → StopReason
                link.push(env.meta, env.frame, env.direction).await;
                if terminal { break; }
            }
            // An Err from process_frame becomes a non-fatal upstream Error (§2.1).
            _ => { if let Err(e) = p.process_frame(env, &link).await { link.push_error(e.to_string(), false).await; } }
        }
    }
}

// The System path is unbounded and the Data/Control path is bounded (§2.2).
struct ProcessorRx { system: mpsc::UnboundedReceiver<Envelope>, normal: mpsc::Receiver<Envelope> }
type EnvelopeSender = ProcessorTx;   // holds both system+normal Senders; `Link::push`
                                     // routes by `frame.class()` (System→system chan)

Bounded vs unbounded — the decision. Pipecat uses unbounded asyncio queues. Flowcat uses bounded tokio::mpsc on the Data/Control (normal) channel and an unbounded channel on the System path:

  • Bounded normal channel (default capacity 64, the bench-rs value). Real-time audio is rate-limited by the wall clock (~50 audio frames/s/leg); a bounded channel gives natural backpressure — if a slow processor (e.g. a TTS provider stalling) can't keep up, the producer awaits instead of growing an unbounded queue and ballooning latency/RAM. This is the right behaviour for media: never buffer seconds of audio. Capacity 64 = ~1.3 s of audio headroom; a producer blocking on a full channel is the signal to interrupt/drop, not to buffer.
  • Unbounded system channel. Cancel/Interruption/Error/Start must never block on a full queue (an interruption that can't be delivered defeats barge-in). The system path is low-volume (events, not a stream), so unbounded is safe and removes the one place backpressure would be a correctness bug. (This is the Rust equivalent of pipecat's HIGH_PRIORITY jumping the queue, frame_processor.py:128.)

Backpressure on the bounded channel is per-hop and bounded by capacity, so end-to-end queueing is O(processors × 64) frames worst case — predictable, unlike an unbounded chain. A producer that hits a full channel during an active interruption is unblocked immediately because the interruption drains the consumer's queue (§2.5).
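The bounded/unbounded split can be checked with std's channels as stand-ins (an assumption: `std::sync::mpsc::sync_channel` models `tokio::sync::mpsc::channel(64)`, and `std::sync::mpsc::channel` models the unbounded system channel). The constants restate the numbers above: capacity 64 at ~50 frames/s gives 64 / 50 = 1.28 s of headroom. The function names are hypothetical.

```rust
use std::sync::mpsc::{channel, sync_channel, TrySendError};

/// Default capacity of the bounded Data/Control ("normal") channel.
pub const NORMAL_CAPACITY: usize = 64;
/// Audio frames per second per leg (20 ms frames).
pub const FRAMES_PER_SEC: f64 = 50.0;

/// Seconds of audio a full normal channel holds.
pub fn headroom_secs(capacity: usize) -> f64 { capacity as f64 / FRAMES_PER_SEC }

/// Feed a bounded channel whose consumer has stalled; return how many frames
/// were accepted before the producer would have had to wait.
pub fn frames_accepted_by_stalled_consumer(capacity: usize, produced: usize) -> usize {
    let (tx, _rx) = sync_channel::<u64>(capacity); // `_rx` stays alive but is never read
    let mut accepted = 0;
    for id in 0..produced as u64 {
        match tx.try_send(id) {
            Ok(()) => accepted += 1,
            // A tokio producer would `.await` here (backpressure); `try_send`
            // lets the sketch observe the limit instead of blocking.
            Err(TrySendError::Full(_)) => break,
            Err(TrySendError::Disconnected(_)) => unreachable!("receiver is alive"),
        }
    }
    accepted
}

/// The system path accepts every event, however many queue up.
pub fn system_sends_never_block(events: usize) -> bool {
    let (tx, rx) = channel::<u64>();
    let all_sent = (0..events as u64).all(|id| tx.send(id).is_ok());
    all_sent && rx.try_iter().count() == events
}
```

A stalled stage therefore caps queued output audio at 64 frames (about 1.3 s), while Cancel/Interruption sends on the system path always go through.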

Scope of the backpressure guarantee. The inbound audio leg — InputAudio/UserAudio — is classified System (processor/frame.rs::class), so it rides the unbounded channel, matching pipecat (input audio is a SystemFrame). That is deliberate: caller/transport capture is wall-clock-rate-limited (~50 fps) and must never be blocked. So the "bounded media backpressure / never buffer seconds of audio" property above applies to the output/Data leg (OutputAudio/TtsAudio, where a stalling TTS/realtime stage is the thing to backpressure), not the input leg. The moat argument is unaffected (input is producer-rate-limited), but don't overclaim bounded backpressure on the input path. A follow-up may add an explicit input-side drop/age policy if a downstream stall is ever observed to grow the inbound queue.

2.3 System-frame priority & interruptibility (parity with pipecat)

  • Priority: Link::push routes a frame by frame.class(): System → the consumer's system channel; Data/Control → the normal channel. The biased select drains system first (pipecat FrameProcessorQueue, frame_processor.py:119-167).
  • Interruptibility: on Frame::Interruption, the task loop drains the normal channel, keeping any frame whose uninterruptible() is true (End/Stop/FunctionCallResult/UpdateSettings), and cancels the in-flight process_frame only if the current frame is interruptible, mirroring pipecat's _start_interruption (frame_processor.py:828). The in-flight cancel is a select! between process_frame(...) and an interruption signal on the system channel.
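The drain step can be sketched on a plain queue. This models the normal channel's backlog as a `VecDeque`; against a real `tokio::mpsc::Receiver` the loop would pull the backlog with `try_recv` and re-queue the survivors locally (an assumption about mechanics the text does not spell out). `drain_on_interruption` is a hypothetical name, and `Frame` is trimmed to a few variants.

```rust
use std::collections::VecDeque;

#[derive(Debug, Clone, PartialEq)]
pub enum Frame { OutputAudio(u32), Text(String), End, Stop, FunctionCallResult(String), UpdateSettings }

impl Frame {
    /// Mirrors `Frame::uninterruptible()` for the variants in this trimmed enum.
    pub fn uninterruptible(&self) -> bool {
        matches!(self, Frame::End | Frame::Stop | Frame::FunctionCallResult(_) | Frame::UpdateSettings)
    }
}

/// On `Interruption`: drop every queued interruptible frame, keep the
/// uninterruptible ones in their original order, and return how many were dropped.
pub fn drain_on_interruption(normal: &mut VecDeque<Frame>) -> usize {
    let before = normal.len();
    normal.retain(|f| f.uninterruptible());
    before - normal.len()
}
```

Queued bot audio and text vanish, but a pending function-call result still reaches the LLM context and a queued `End` still shuts the pipeline down.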

2.4 The latency argument — the channel model stays inside the moat

The moat is p99 ≤ 0.61 ms round-trip to 2,000 calls on one process (bench/RESULTS.md, Azure 16-vCPU). Two facts bound the cost of moving from one select! loop to a channel-per-processor graph:

  1. bench-rs already measured exactly this model. bench-rs/src/main.rs builds a 7-stage pipeline of tasks connected by bounded mpsc channels (CHAN_CAP=64) — the literal analogue of the FrameProcessor graph designed here — and the authoritative bench numbers in RESULTS.md are that channel pipeline, not the monolithic loop. It reports 0.20 µs/frame, 0.029 µs/processor-hop on one core, and the end-to-end real-I/O sweep (real WS + μ-law) holds p99 ≤ 3 ms to 2,000 calls. So the channel model's per-hop cost is already inside the published moat — adopting it is not a regression from the bench, it is the bench.
  2. Per-frame hop budget. A live call is ~50–100 frames/s/leg. A cascaded pipeline is ≤ ~12 processors (transport-in, VAD, turn, STT, user-agg, LLM, assistant-agg, TTS, transport-out, + 2-3 filters/observers); S2S is ~5. At 0.029 µs/hop (RESULTS.md), 12 hops = 0.35 µs/frame of pure framework routing — three orders of magnitude below the 0.61 ms p99 and four below the ~10–20 ms audio frame period. The dominant per-frame cost is, as RESULTS.md §"DOES NOT PROVE" states, the shared μ-law/resample/ WS-syscall work — identical in both the loop and the graph model.

Verdict: bounded-channel-per-processor adds ≤ ~0.4 µs/frame of routing, ~10⁻³ of the p99 budget; the moat is preserved. A later step extends bench-rs to run the real Pipeline (not the standalone 7-stage mock) for both topologies and asserts p99 stays ≤ the current numbers — the gate that holds this claim honest (§9 step 11).
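The budget arithmetic above, written out so it can be re-run when the bench numbers change. The constants are the figures this section quotes from RESULTS.md; the helper names are illustrative.

```rust
/// Figures quoted in §2.4.
pub const HOP_COST_US: f64 = 0.029;        // µs per processor hop, one core
pub const MAX_PROCESSORS: u32 = 12;        // cascaded pipeline upper bound
pub const P99_BUDGET_US: f64 = 610.0;      // 0.61 ms round-trip p99
pub const FRAME_PERIOD_US: f64 = 10_000.0; // shortest audio frame period (~10 ms)

/// Pure framework routing cost per frame, in µs.
pub fn routing_cost_us(hops: u32) -> f64 { hops as f64 * HOP_COST_US }

/// How many orders of magnitude `budget` exceeds `cost`.
pub fn orders_below(cost: f64, budget: f64) -> f64 { (budget / cost).log10() }
```

With 12 hops the routing cost is 0.348 µs: about 3.2 orders of magnitude under the p99 budget, about 4.5 under a 10 ms frame period, and roughly 6 × 10⁻⁴ of the p99.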

One nuance the design bakes in to keep this true: the hot audio frame is Arc<AudioFrame>, so each hop moves a pointer (the bench moved Bytes, similarly cheap); only Vec<i16> PCM produced by a codec/resample stage allocates, and that's shared I/O cost, not framework cost.
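A quick way to confirm the "pointer move, not PCM copy" property: clone an `Arc`-backed audio frame once per hop and check that every copy still points at the same sample buffer. `Frame` is trimmed to the hot variant and `clone_per_hop`/`pcm_ptr` are illustrative helpers; 320 samples is 20 ms at 16 kHz.

```rust
use std::sync::Arc;

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AudioFrame { pub pcm: Vec<i16>, pub sample_rate: u32, pub num_channels: u16 }

/// Trimmed to the hot variant: the payload is an `Arc`, so `Frame::clone`
/// copies one pointer and bumps a refcount instead of copying PCM.
#[derive(Debug, Clone)]
pub enum Frame { OutputAudio(Arc<AudioFrame>) }

/// Clone `frame` once per hop, as the broadcast/observer paths would.
pub fn clone_per_hop(frame: &Frame, hops: usize) -> Vec<Frame> {
    (0..hops).map(|_| frame.clone()).collect()
}

/// Address of the PCM buffer a frame points at.
pub fn pcm_ptr(frame: &Frame) -> *const i16 {
    match frame { Frame::OutputAudio(a) => a.pcm.as_ptr() }
}
```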

2.5 Interruption end-to-end

Barge-in (Frame::Interruption) is produced by the turn/VAD start strategy or any processor via link.broadcast(Frame::Interruption). It travels the system channel both directions; each processor drains its normal queue of interruptible frames and cancels in-flight interruptible work, then forwards. The transport-output processor additionally clears the carrier's playback buffer (today's transport.send_clear(), pipeline.rs:370 → a process_frame arm on TransportOutput). This is the literal port of pipecat broadcast_interruption (frame_processor.py:704).
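`link.broadcast(Frame::Interruption)` sends two copies, one per direction, linked through `FrameMeta::broadcast_sibling_id`. The sketch below assumes each copy gets its own fresh id and names the other as its sibling; `broadcast_metas` is a hypothetical helper and `FrameMeta` is trimmed to the two relevant fields.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

static NEXT_ID: AtomicU64 = AtomicU64::new(1);
fn next_frame_id() -> u64 { NEXT_ID.fetch_add(1, Ordering::Relaxed) }

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Direction { Downstream, Upstream }

#[derive(Debug, Clone, PartialEq)]
pub struct FrameMeta { pub id: u64, pub broadcast_sibling_id: Option<u64> }

/// The (meta, direction) pairs a broadcast would push for one frame.
pub fn broadcast_metas() -> [(FrameMeta, Direction); 2] {
    let (down, up) = (next_frame_id(), next_frame_id());
    [
        (FrameMeta { id: down, broadcast_sibling_id: Some(up) }, Direction::Downstream),
        (FrameMeta { id: up, broadcast_sibling_id: Some(down) }, Direction::Upstream),
    ]
}
```

An observer that sees both halves can use the sibling id to count the interruption once.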


3. Pipeline + ParallelPipeline

3.1 Pipeline — a linear chain of linked tasks

// flowcat-core/src/pipeline/mod.rs
pub struct Pipeline { processors: Vec<Box<dyn FrameProcessor>> }

impl Pipeline {
    pub fn new(processors: Vec<Box<dyn FrameProcessor>>) -> Self { Self { processors } }
}

Pipeline is itself a FrameProcessor, so pipelines nest (pipecat Pipeline(BasePipeline), pipeline.py:91). It wraps the user processors with an internal Source and Sink processor (pipecat PipelineSource/PipelineSink, pipeline.py:21/55), so the PipelineTask can inject downstream frames and observe upstream frames at the head, and observe downstream frames at the tail. Pipeline::link() is the framework wiring step run by PipelineTask::setup: it allocates the per-processor channels, builds each Link (prev/next senders), and tokio::spawns one run_processor task per element. The chain order is [Source, ...user, Sink] (pipeline.py:119).
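The wiring step, reduced to its shape: one worker per element, each fed by a bounded channel and holding a sender to its downstream neighbour, with the head sender playing Source and the tail receiver playing Sink. This sketch uses `std::thread` and `std::sync::mpsc::sync_channel` in place of `tokio::spawn` and bounded tokio channels, models only the downstream direction, and uses hypothetical names (`link`, `Stage`).

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::thread::{self, JoinHandle};

/// A stage transforms one item and forwards it (stand-in for `process_frame`).
pub type Stage = Box<dyn Fn(String) -> String + Send>;

/// Link `[Source, ...stages, Sink]` with bounded channels of capacity `cap`.
/// Returns the head sender (Source side), the tail receiver (Sink side), and
/// the worker handles.
pub fn link(stages: Vec<Stage>, cap: usize) -> (SyncSender<String>, Receiver<String>, Vec<JoinHandle<()>>) {
    let (head_tx, mut prev_rx) = sync_channel::<String>(cap);
    let mut handles = Vec::new();
    for stage in stages {
        let (tx, rx) = sync_channel::<String>(cap);
        let input = prev_rx;
        handles.push(thread::spawn(move || {
            // Runs until the upstream sender is dropped (the analogue of End arriving).
            for item in input {
                if tx.send(stage(item)).is_err() { break; }
            }
        }));
        prev_rx = rx;
    }
    (head_tx, prev_rx, handles)
}
```

Dropping the head sender shuts the chain down stage by stage, which is why frames already in flight still reach the Sink in order.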

3.2 ParallelPipeline — fan-out / fan-in with lifecycle sync

pub struct ParallelPipeline { branches: Vec<Pipeline> }
impl ParallelPipeline { pub fn new(branches: Vec<Pipeline>) -> Self { /* ... */ } }

Also a FrameProcessor. Mirrors pipecat ParallelPipeline (parallel_pipeline.py:24):

  • A frame entering the parallel block is queued into every branch's source.
  • Each branch has its own Source/Sink; the Sink's downstream output funnels to the parallel block's single downstream, de-duplicating by meta.id (pipecat _parallel_push_frame + _seen_ids, parallel_pipeline.py:168) so a frame fanned to N branches is emitted once.
  • Lifecycle frames (Start/End/Cancel) are synchronized: the block holds a per-frame counter = branch count, buffers non-lifecycle output while synchronizing, and only forwards the lifecycle frame (and flushes the buffer) once all branches have passed it (parallel_pipeline.py:158/182). This prevents a fast branch's End from shutting the transport down while a slow branch still has audio to flush — a correctness invariant we port verbatim.
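The fan-in rules above can be sketched as a small state machine: data frames are de-duplicated by `meta.id`, and a lifecycle frame is held until its copy has come out of every branch. This sketch deliberately leaves out the output buffering (it forwards data immediately) and uses hypothetical names (`FanIn`, `Out`, `Kind`).

```rust
use std::collections::{HashMap, HashSet};

#[derive(Debug, Clone, PartialEq)]
pub enum Kind { Data, Lifecycle } // Lifecycle = Start/End/Cancel

#[derive(Debug, Clone, PartialEq)]
pub struct Out { pub id: u64, pub kind: Kind }

pub struct FanIn {
    branches: usize,
    seen: HashSet<u64>,           // data ids already emitted
    pending: HashMap<u64, usize>, // lifecycle id -> branches still to report
}

impl FanIn {
    pub fn new(branches: usize) -> Self {
        assert!(branches > 0, "a parallel block needs at least one branch");
        FanIn { branches, seen: HashSet::new(), pending: HashMap::new() }
    }

    /// A branch's Sink emitted `frame`; returns what the block forwards downstream.
    pub fn on_branch_output(&mut self, frame: Out) -> Option<Out> {
        match frame.kind {
            Kind::Lifecycle => {
                let left = self.pending.entry(frame.id).or_insert(self.branches);
                *left -= 1;
                if *left > 0 { return None; } // a slower branch hasn't passed it yet
                self.pending.remove(&frame.id);
                Some(frame) // the last branch passed it: forward exactly once
            }
            // First copy wins; copies from the other branches are dropped.
            Kind::Data => self.seen.insert(frame.id).then_some(frame),
        }
    }
}
```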

ParallelPipeline is needed for the cascaded path's service-switcher / parallel STT and for tee'd observers; it is not on the v1 Gemini S2S critical path, so it lands with unit tests but is first exercised by the cascaded path.


4. PipelineTask + PipelineRunner

4.1 PipelineTask — one running pipeline's lifecycle

Mirrors pipecat PipelineTask (task.py:142). Owns: the wrapped pipeline (Source + user + Sink), the push queue, the clock, the observer fan-out, idle detection, heartbeat/watchdog, and the start/end/finished signalling.

#![allow(unused)]
fn main() {
// flowcat-core/src/pipeline/task.rs
pub struct PipelineTaskParams {
    pub audio_in_sample_rate: u32,    // default 16000
    pub audio_out_sample_rate: u32,   // default 24000 (S2S) / per-TTS (cascaded)
    pub enable_metrics: bool,
    pub enable_usage_metrics: bool,
    pub enable_tracing: bool,
    pub enable_heartbeats: bool,
    pub heartbeat_period: Duration,         // default 1s   (task.py:59)
    pub heartbeat_monitor: Duration,        // default 10s  (task.py:60)
    pub idle_timeout: Option<Duration>,     // default 300s (task.py:62)
    pub cancel_on_idle: bool,               // default true
    pub idle_timeout_frames: Vec<FrameKind>,// default [BotSpeaking, UserSpeaking]
}

pub struct PipelineTask { /* pipeline, clock, observers, channels, flags */ }

impl PipelineTask {
    pub fn new(pipeline: Pipeline, params: PipelineTaskParams,
               observers: Vec<Observer>) -> Self;

    /// Queue a downstream frame into the head of the pipeline.
    pub async fn queue_frame(&self, frame: Frame);
    pub async fn queue_frames(&self, frames: impl IntoIterator<Item = Frame>);

    /// Graceful: queue an `End` so the pipeline drains then shuts down (task.py:568).
    pub async fn stop_when_done(&self);
    /// Immediate: queue a `Cancel` (task.py:577).
    pub async fn cancel(&self, reason: Option<String>);

    pub fn has_finished(&self) -> bool;

    /// Run to completion: setup() spawns all processor tasks, inject `Start`, wait
    /// for it to reach the Sink (pipeline ready), pump queued frames, and exit when
    /// a terminal frame (`End`/`Stop`/`Cancel`) reaches the Sink. (task.py:586/818.)
    pub async fn run(self) -> Result<()>;

    /// Event hooks (pipecat task.py event handlers), each a registered async closure:
    pub fn on_started(&mut self, f: impl Fn() + ...);
    pub fn on_finished(&mut self, f: impl Fn(StopReason) + ...);
    pub fn on_error(&mut self, f: impl Fn(&str, bool) + ...);
    pub fn on_idle_timeout(&mut self, f: impl Fn() + ...);
    pub fn on_frame_reached_downstream(&mut self, types: &[FrameKind], f: ...);
    pub fn on_frame_reached_upstream(&mut self, types: &[FrameKind], f: ...);
}
}

Lifecycle (task.py:818 _process_push_queue + :898 _sink_push_frame):

  1. setup builds channels, spawns every processor task, starts the clock + idle task.
  2. Inject Frame::Start(params) at the head; block until it reaches the Sink (every processor has run start()), then signal ready.
  3. Pump queue_frame'd frames into the head.
  4. The Source processor watches upstream frames: an upstream End/Stop/Cancel request (a processor wanting to end the call, today's BrainAction::End) is converted to the corresponding downstream lifecycle frame (task.py:859 _source_push_frame).
  5. The Sink watches downstream frames: when End/Stop/Cancel reaches it, the task signals "ended" and exits the run loop; Heartbeat is timestamped for the monitor; Error{fatal} triggers Cancel.

Idle detection (task.py:970): a watcher resets on each idle_timeout_frames frame seen by the observer; on timeout it fires on_idle_timeout and, if cancel_on_idle, cancels. Heartbeat/watchdog (task.py:941/950): a task pushes Heartbeat every heartbeat_period; the monitor warns if none returns within heartbeat_monitor — and (Rust addition) doubles as a per-task watchdog since a wedged processor would block the heartbeat from traversing. Graceful shutdown: End flushes; Cancel does not (bounded by cancel_timeout, after which the task force-aborts the processor tasks via the shared CancellationToken).
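A minimal sketch of the idle watcher, with time passed in explicitly so the logic is testable without sleeping. The defaults in the test (300 s, cancel_on_idle, BotSpeaking/UserSpeaking) come from PipelineTaskParams. The "fire once until activity resumes" behaviour is an assumption, and `IdleWatcher`/`IdleAction` are hypothetical names.

```rust
use std::time::{Duration, Instant};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FrameKind { BotSpeaking, UserSpeaking, OutputAudio }

#[derive(Debug, PartialEq, Eq)]
pub enum IdleAction { None, FireHook, FireHookAndCancel }

pub struct IdleWatcher {
    timeout: Duration,
    cancel_on_idle: bool,
    reset_on: Vec<FrameKind>, // idle_timeout_frames
    last_activity: Instant,
    fired: bool,
}

impl IdleWatcher {
    pub fn new(timeout: Duration, cancel_on_idle: bool, reset_on: Vec<FrameKind>, now: Instant) -> Self {
        IdleWatcher { timeout, cancel_on_idle, reset_on, last_activity: now, fired: false }
    }

    /// Observer hook: a frame of `kind` was seen at `now`.
    pub fn on_frame(&mut self, kind: FrameKind, now: Instant) {
        if self.reset_on.contains(&kind) { self.last_activity = now; self.fired = false; }
    }

    /// Periodic check. Assumption: fires once per idle period, then waits for activity.
    pub fn poll(&mut self, now: Instant) -> IdleAction {
        if self.fired || now.duration_since(self.last_activity) < self.timeout { return IdleAction::None; }
        self.fired = true;
        if self.cancel_on_idle { IdleAction::FireHookAndCancel } else { IdleAction::FireHook }
    }
}
```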

4.2 PipelineRunner — supervise tasks + signals

Mirrors pipecat PipelineRunner (runner.py:25). Runs one or many PipelineTasks, installs SIGINT/SIGTERM handlers that cancel() all tasks for graceful drain (a deploy-cutover drain story relies on SIGTERM grace), and joins them.

pub struct PipelineRunner { /* tasks, signal guard */ }
impl PipelineRunner {
    pub fn new(handle_sigint: bool, handle_sigterm: bool) -> Self;
    pub async fn run(&self, task: PipelineTask) -> Result<()>;
    pub async fn cancel_all(&self);
}

In the embedder, each inbound/outbound call constructs one PipelineTask and hands it to a process-wide PipelineRunner — replacing today's "spawn Call::run per call".


5. Observer trait + metrics frames

5.1 Observer

Non-intrusive monitoring, mirroring pipecat BaseObserver (base_observer.py:70). An Observer sees every processed/pushed frame without sitting in the chain — this is the seam the observability layer (OpenTelemetry/Sentry/Langfuse/RTVI) plugs into.

#![allow(unused)]
fn main() {
// flowcat-core/src/observer.rs
pub struct FrameEvent<'a> {
    pub processor: &'a str,
    pub frame: &'a Frame,
    pub meta: &'a FrameMeta,
    pub direction: Direction,
    pub timestamp_ns: i64,    // pipeline clock
}
pub struct FramePushEvent<'a> {
    pub source: &'a str, pub destination: &'a str,
    pub frame: &'a Frame, pub meta: &'a FrameMeta,
    pub direction: Direction, pub timestamp_ns: i64,
}

#[async_trait]
pub trait FrameObserver: Send + Sync {
    /// A processor is about to handle a frame (base_observer.py:79).
    async fn on_process(&self, _e: &FrameEvent<'_>) {}
    /// A frame was pushed source→destination (base_observer.py:91).
    async fn on_push(&self, _e: &FramePushEvent<'_>) {}
    /// The pipeline finished starting (base_observer.py:103).
    async fn on_pipeline_started(&self) {}
}

/// Cheap clonable fan-out over many observers (pipecat `TaskObserver` proxy,
/// task.py:401). Hooks are invoked **synchronously on the hot path only when
/// enabled** — when no observer is registered the loop skips the call entirely
/// (the `if let Some(o)` in run_processor), so observation is zero-cost-when-off.
#[derive(Clone, Default)]
pub struct Observer(Arc<[Arc<dyn FrameObserver>]>);
}

Built-in observers shipped here (ported from pipecat): TurnTrackingObserver (user/bot speaking edges → turn boundaries), UserBotLatencyObserver (TTFB of the bot's first audio after the user stops), and the IdleFrameObserver that drives idle detection (task.py:70). The observability layer adds the exporters.
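To show how a built-in observer sits on the `on_process` hook, here is a sketch of a user-to-bot latency observer. It simplifies the async `FrameObserver` trait to a synchronous one and passes the frame id directly instead of a `FrameEvent`. Because an observer sees each frame at every processor it crosses, it counts only the first sighting of each id. Its internal logic is an assumption, not a port of pipecat's observer.

```rust
use std::collections::HashSet;
use std::sync::Mutex;

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Frame { UserStoppedSpeaking, BotStartedSpeaking, OutputAudio }

/// Synchronous stand-in for the `FrameObserver` hook used here.
pub trait FrameObserver: Send + Sync {
    fn on_process(&self, _processor: &str, _frame_id: u64, _frame: &Frame, _timestamp_ns: i64) {}
}

#[derive(Default)]
struct LatencyState { seen: HashSet<u64>, user_stopped_at: Option<i64>, latencies_ms: Vec<f64> }

/// Time from the user stopping to the bot starting to speak, per turn.
#[derive(Default)]
pub struct UserBotLatencyObserver { state: Mutex<LatencyState> }

impl UserBotLatencyObserver {
    pub fn latencies_ms(&self) -> Vec<f64> { self.state.lock().unwrap().latencies_ms.clone() }
}

impl FrameObserver for UserBotLatencyObserver {
    fn on_process(&self, _processor: &str, frame_id: u64, frame: &Frame, timestamp_ns: i64) {
        let mut s = self.state.lock().unwrap(); // `&self` hook, so state lives behind a Mutex
        if !s.seen.insert(frame_id) { return; } // count each frame's first sighting only
        match frame {
            Frame::UserStoppedSpeaking => s.user_stopped_at = Some(timestamp_ns),
            Frame::BotStartedSpeaking => {
                // `take()` so only the first bot start after a user stop is measured.
                let stopped = s.user_stopped_at.take();
                if let Some(t0) = stopped {
                    s.latencies_ms.push((timestamp_ns - t0) as f64 / 1e6);
                }
            }
            _ => {}
        }
    }
}
```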

5.2 Metrics frames

Frame::Metrics(Vec<MetricsData>) carries the same data as pipecat MetricsFrame (frames.py:1108) + metrics/metrics.py:

// flowcat-core/src/processor/metrics.rs   (mirrors metrics.py)
pub enum MetricsData {
    Ttfb        { processor: String, model: Option<String>, seconds: f64 }, // metrics.py:29
    Processing  { processor: String, model: Option<String>, seconds: f64 }, // metrics.py:39
    LlmUsage    { processor: String, model: Option<String>, tokens: LlmTokenUsage }, // :68
    TtsUsage    { processor: String, characters: u64 },                     // :78
    TurnPrediction { processor: String, is_complete: bool, probability: f32,
                     e2e_processing_ms: f64 },                              // :101
}
pub struct LlmTokenUsage {                                                  // metrics.py:49
    pub prompt_tokens: u64, pub completion_tokens: u64, pub total_tokens: u64,
    pub cache_read_input_tokens: Option<u64>, pub cache_creation_input_tokens: Option<u64>,
    pub reasoning_tokens: Option<u64>,
}

A FrameProcessor produces metrics via helper methods on Link that gate on setup.enable_metrics and emit a Frame::Metrics downstream: start_ttfb/stop_ttfb, start_processing/stop_processing, report_llm_usage, report_tts_usage — the literal port of frame_processor.py:411-489. report_only_initial_ttfb (StartParams) is honored.
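A standalone sketch of the TTFB pair. The text places `start_ttfb`/`stop_ttfb` on `Link`; here they live on a hypothetical `TtfbMeter` that takes explicit `Instant`s, and `stop_ttfb` returns the `MetricsData` payload a processor would wrap in `Frame::Metrics`. Reading `report_only_initial_ttfb` as "report only the first measurement" is an assumption.

```rust
use std::time::Instant;

#[derive(Debug, Clone, PartialEq)]
pub enum MetricsData { Ttfb { processor: String, model: Option<String>, seconds: f64 } }

pub struct TtfbMeter {
    processor: String,
    enable_metrics: bool,
    report_only_initial_ttfb: bool,
    started: Option<Instant>,
    reported_once: bool,
}

impl TtfbMeter {
    pub fn new(processor: &str, enable_metrics: bool, report_only_initial_ttfb: bool) -> Self {
        TtfbMeter {
            processor: processor.to_string(), enable_metrics, report_only_initial_ttfb,
            started: None, reported_once: false,
        }
    }

    /// Request sent: start the clock, unless metrics are off or the one allowed report is spent.
    pub fn start_ttfb(&mut self, now: Instant) {
        if !self.enable_metrics || (self.report_only_initial_ttfb && self.reported_once) { return; }
        self.started = Some(now);
    }

    /// First byte received: `Some(payload)` to emit downstream, or `None` if not timing.
    pub fn stop_ttfb(&mut self, now: Instant) -> Option<MetricsData> {
        let start = self.started.take()?;
        self.reported_once = true;
        Some(MetricsData::Ttfb {
            processor: self.processor.clone(),
            model: None,
            seconds: now.duration_since(start).as_secs_f64(),
        })
    }
}
```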


6. Seam → processor mapping

Today's five trait seams (MediaTransport, RealtimeLlm, AgentBrain, SessionSource, MediaSerializer) and Call::run's inline logic become processors. The mapping:

| Today (seam / inline logic) | Becomes | Crate | Notes |
|---|---|---|---|
| MediaTransport::recv (media.rs:49) | TransportInput processor, a source: reads the transport, emits Frame::InputAudio/UserStartedSpeaking/lifecycle downstream | flowcat-transports (trait stays in core) | pipecat BaseInputTransport |
| MediaTransport::send_audio/send_clear (media.rs:53/57) | TransportOutput processor, a sink: consumes OutputAudio/TtsAudio, plays to carrier; on Interruption clears playback (was pipeline.rs:370) | flowcat-transports | pipecat BaseOutputTransport; emits BotStarted/StoppedSpeaking |
| RealtimeLlm (realtime/mod.rs:22) | RealtimeLlmService processor: consumes InputAudio, emits TtsAudio(bot)/Transcription/FunctionCallsStarted/Interruption/Metrics; the reader-task→mpsc bridge (gemini_live.rs:265) becomes the processor's internal task feeding link | flowcat-services (realtime-gemini feature) | the trait below; Gemini is one impl |
| AgentBrain (brain.rs:22) | BrainProcessor: consumes FunctionCallsStarted/tool-call frames, emits UpdateSettings(new prompt+tools) on transition / End on terminal; holds the graph state | the embedder (its glue) | pipecat has no peer; this is the embedder's engine adapter |
| SessionSource (session.rs:21) | stays embedder glue, NOT a processor: a service the BrainProcessor + a FinalizeProcessor call; bootstrap/finalize/artifact-upload is control-plane I/O, not a media frame stage | the embedder | see §6.2; it leaves flowcat-core for OSS cleanliness |
| MediaSerializer (serializer/mod.rs) | stays a pure FrameSerializer with no change of shape; TransportInput/Output for a WS carrier compose a FrameSerializer exactly as WsCarrierTransport does today | flowcat-telephony | pipecat FrameSerializer |
| Call::run orchestration (pipeline.rs:130) | the Pipeline graph + PipelineTask: the select! loop's arms become each processor's process_frame; LiveState/finalize become FinalizeProcessor+SessionSource | core + the embedder | the whole point of this framework |
| LiveState recorder/transcript (pipeline.rs:451) | RecorderProcessor + TranscriptProcessor: observers/sinks that tap audio + text frames | flowcat-core (recorder), the embedder (finalize) | pipecat recorder-as-processor |

6.1 New service-processor traits the cascaded path needs (signatures only)

These are frozen here so the provider implementations (the fan-out) build against them. Each is a FrameProcessor plus a service-specific async contract; the framework's process_frame arm calls the contract and emits the result frames. Impls are later work — this framework ships only the trait + a no-op/mock impl for the integration test.

// flowcat-core/src/service/mod.rs
use std::sync::Arc;

use async_trait::async_trait;
// Frame, AudioFrame, StartParams, LlmContext, ToolDecl, RealtimeSetup,
// RealtimeEvent, VadState/VadParams, TurnPrediction/TurnParams, BoxStream and
// the crate's `Result` alias are assumed to be in scope.

/// Streaming speech→text. Consumes `InputAudio`, emits `InterimTranscription` then
/// final `Transcription`. Mirrors pipecat `STTService`. (22 providers.)
#[async_trait]
pub trait SttService: Send {
    fn name(&self) -> &str;
    async fn start(&mut self, params: &StartParams) -> Result<()>;
    /// Feed one audio chunk. Transcripts arrive asynchronously: whatever is ready
    /// is returned as `Frame`s, which the processor forwards downstream.
    async fn run_stt(&mut self, audio: Arc<AudioFrame>) -> Result<Vec<Frame>>;
    async fn set_muted(&mut self, muted: bool);
}

/// Streaming text→speech. Consumes `TtsSpeak`/`Text`, emits `TtsStarted`,
/// `TtsAudio`*, `TtsStopped`. Mirrors pipecat `TTSService`. (31 providers.)
#[async_trait]
pub trait TtsService: Send {
    fn name(&self) -> &str;
    fn sample_rate(&self) -> u32;
    async fn start(&mut self, params: &StartParams) -> Result<()>;
    async fn run_tts(&mut self, text: &str) -> Result<Vec<Frame>>;
}

/// Context-driven LLM. Consumes `LlmContext`/`LlmRun`, emits `LlmResponseStart`,
/// `LlmText`*, optional `FunctionCallsStarted`, `LlmResponseEnd`. (26 providers.)
#[async_trait]
pub trait LlmService: Send {
    fn name(&self) -> &str;
    async fn start(&mut self, params: &StartParams) -> Result<()>;
    async fn run_llm(&mut self, ctx: &LlmContext) -> Result<BoxStream<'_, Frame>>;
    fn set_tools(&mut self, tools: Vec<ToolDecl>);
}

/// The realtime S2S contract (today's RealtimeLlm, realtime/mod.rs:22 — UNCHANGED
/// shape, restated here as the canonical service trait the processor wraps).
#[async_trait]
pub trait RealtimeLlmService: Send {
    async fn connect(&mut self, setup: RealtimeSetup) -> Result<()>;
    async fn send_audio(&mut self, chunk: Arc<AudioFrame>) -> Result<()>;
    async fn update_system(&mut self, prompt: String, tools: Vec<ToolDecl>) -> Result<()>;
    async fn send_tool_result(&mut self, id: String, result: serde_json::Value) -> Result<()>;
    async fn next_event(&mut self) -> Option<RealtimeEvent>;
}

// ---- audio-intelligence traits — signatures only ----

/// Voice-activity detector. Mirrors pipecat `VADAnalyzer`. Silero (ONNX/`ort`)
/// is the reference impl.
pub trait VadAnalyzer: Send {
    fn sample_rate(&self) -> u32;
    /// Classify a frame of audio: Quiet / Starting / Speaking / Stopping.
    fn analyze(&mut self, audio: &AudioFrame) -> VadState;
    fn set_params(&mut self, params: VadParams);
}

/// End-of-turn / semantic-completion analyzer. Mirrors pipecat `BaseTurnAnalyzer`
/// (Smart-Turn v2/v3 is the reference impl).
pub trait TurnAnalyzer: Send {
    /// Given accumulated speech + VAD edges, predict whether the turn is complete.
    fn analyze_turn(&mut self, audio: &AudioFrame, vad: VadState) -> TurnPrediction;
    fn set_params(&mut self, params: TurnParams);
}

RealtimeSetup, RealtimeEvent, ToolDecl, BrainAction are unchanged from today's frame.rs (renamed types.rs, §8.4) — the existing Gemini client and an embedder AgentBrain impl satisfy them with thin wrapping into the processor shape.
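Because `VadAnalyzer` is synchronous, it can be exercised without a runtime. The sketch below is an RMS-energy analyzer that walks the Quiet → Starting → Speaking → Stopping states. It assumes simplified, hypothetical local stand-ins for `AudioFrame`, `VadState`, and `VadParams`; the real flowcat-core types may differ. Energy thresholding is a swapped-in technique for illustration only. The reference implementation is Silero over ONNX.

```rust
/// Hypothetical stand-in for AudioFrame: mono i16 PCM at `sample_rate`.
pub struct AudioFrame {
    pub sample_rate: u32,
    pub samples: Vec<i16>,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VadState { Quiet, Starting, Speaking, Stopping }

/// Hypothetical params: an RMS threshold plus how many consecutive frames
/// confirm a start or a stop (hysteresis).
#[derive(Debug, Clone, Copy)]
pub struct VadParams {
    pub rms_threshold: f64,
    pub start_frames: u32,
    pub stop_frames: u32,
}

pub trait VadAnalyzer: Send {
    fn sample_rate(&self) -> u32;
    fn analyze(&mut self, audio: &AudioFrame) -> VadState;
    fn set_params(&mut self, params: VadParams);
}

/// Energy-threshold VAD (illustration, not Silero).
pub struct EnergyVad {
    sample_rate: u32,
    params: VadParams,
    state: VadState,
    run: u32, // consecutive frames supporting a pending transition
}

impl EnergyVad {
    pub fn new(sample_rate: u32, params: VadParams) -> Self {
        Self { sample_rate, params, state: VadState::Quiet, run: 0 }
    }

    fn rms(samples: &[i16]) -> f64 {
        if samples.is_empty() {
            return 0.0;
        }
        let sum: f64 = samples.iter().map(|&s| f64::from(s) * f64::from(s)).sum();
        (sum / samples.len() as f64).sqrt()
    }
}

impl VadAnalyzer for EnergyVad {
    fn sample_rate(&self) -> u32 {
        self.sample_rate
    }

    fn analyze(&mut self, audio: &AudioFrame) -> VadState {
        let loud = Self::rms(&audio.samples) >= self.params.rms_threshold;
        self.state = match (self.state, loud) {
            // Not yet speaking: count loud frames until `start_frames` confirms speech.
            (VadState::Quiet | VadState::Starting, true) => {
                self.run += 1;
                if self.run >= self.params.start_frames {
                    self.run = 0;
                    VadState::Speaking
                } else {
                    VadState::Starting
                }
            }
            (VadState::Quiet | VadState::Starting, false) => {
                self.run = 0;
                VadState::Quiet
            }
            // Speaking: any loud frame cancels a pending stop.
            (VadState::Speaking | VadState::Stopping, true) => {
                self.run = 0;
                VadState::Speaking
            }
            // Count quiet frames until `stop_frames` confirms the end of speech.
            (VadState::Speaking | VadState::Stopping, false) => {
                self.run += 1;
                if self.run >= self.params.stop_frames {
                    self.run = 0;
                    VadState::Quiet
                } else {
                    VadState::Stopping
                }
            }
        };
        self.state
    }

    fn set_params(&mut self, params: VadParams) {
        self.params = params;
    }
}
```

The intermediate Starting/Stopping states provide the hysteresis. A single loud click never reaches Speaking, and a short pause never ends a turn.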

6.2 Where SessionSource lives — and why it leaves flowcat-core

SessionSource (session.rs) is embedder-specific control-plane I/O (resolve a run+token, upload artifacts, finalize over HTTP, node-tools/tool-call relay). It is not a media-frame stage and it is the one seam that is inherently about the embedder's contract. For OSS cleanliness it lives in the embedder as a plain service the BrainProcessor and a FinalizeProcessor call — flowcat-core keeps only the media-pipeline framework. The node_tools/tool_call relay (session.rs:53/66) becomes a ToolRelay dependency injected into BrainProcessor. This is a clean cut: flowcat-core has zero embedder knowledge (honoring lib.rs and the DESIGN.md OSS boundary), and the embedder's glue owns bootstrap/finalize.
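The injection can be sketched with a pared-down `BrainProcessor` generic over a hypothetical, synchronous `ToolRelay` trait. The real relay is async and carries the embedder's run context. The processor makes only the graph decision. Any call that is not a workflow tool goes to whatever relay the embedder supplied, so flowcat-core never sees the control plane. The `BrainDecision` shape and the "target equals tool name" rule are illustrative assumptions.

```rust
use std::collections::HashSet;

/// Hypothetical relay seam supplied by the embedder (e.g. a client for its
/// control plane). Returns the verbatim tool result, as a string here.
pub trait ToolRelay {
    fn relay(&mut self, node_id: &str, tool_name: &str, args: &str) -> String;
}

/// Hypothetical outcome of one tool call.
#[derive(Debug, PartialEq)]
pub enum BrainDecision {
    /// Workflow tool: move the graph to another node.
    Transition { to_node: String },
    /// Any other tool: the relayed result goes straight back to the model.
    ToolResult { id: String, result: String },
}

pub struct BrainProcessor<R: ToolRelay> {
    node_id: String,
    workflow_tools: HashSet<String>,
    pub relay: R, // injected by the embedder; core never knows what is behind it
}

impl<R: ToolRelay> BrainProcessor<R> {
    pub fn new(node_id: &str, workflow_tools: &[&str], relay: R) -> Self {
        Self {
            node_id: node_id.to_string(),
            workflow_tools: workflow_tools.iter().map(|t| t.to_string()).collect(),
            relay,
        }
    }

    pub fn on_tool_call(&mut self, id: &str, tool_name: &str, args: &str) -> BrainDecision {
        if self.workflow_tools.contains(tool_name) {
            // Illustrative only: the transition target is the tool name itself.
            BrainDecision::Transition { to_node: tool_name.to_string() }
        } else {
            let result = self.relay.relay(&self.node_id, tool_name, args);
            BrainDecision::ToolResult { id: id.to_string(), result }
        }
    }
}

/// Test double that records every relayed call.
pub struct RecordingRelay {
    pub calls: Vec<(String, String, String)>,
}

impl ToolRelay for RecordingRelay {
    fn relay(&mut self, node_id: &str, tool_name: &str, args: &str) -> String {
        self.calls.push((node_id.into(), tool_name.into(), args.into()));
        format!("{{\"ok\":true,\"tool\":\"{tool_name}\"}}")
    }
}
```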


7. Migration / no-regression strategy

The prod Gemini-Live S2S path must not regress (live-verified runs per the voice-live memory notes). Strategy: build the processor pipeline beside Call::run, prove equivalence, then cut over.

7.1 The S2S processor pipeline (what the migration assembles)

TransportInput → RealtimeLlmService(Gemini) → BrainProcessor → TransportOutput
                                ▲                    │
                          (tool calls)      RecorderProcessor (taps both legs)
                          ToolRelay (embedder)       TranscriptProcessor
                                                     FinalizeProcessor (on End → SessionSource)

Every arm of today's select! (pipeline.rs:195-380) maps to a process_frame:

  • carrier audio in (pipeline.rs:221) → TransportInput emits InputAudio → Gemini service send_audio.
  • RealtimeEvent::AudioOut (pipeline.rs:249) → Gemini service emits TtsAudio → TransportOutput plays.
  • RealtimeEvent::ToolCall MCP branch (pipeline.rs:276) → BrainProcessor recognizes the node's workflow tools (via ToolRelay) and emits a FunctionCallResult straight back to the Gemini service (no transition).
  • ToolCall transition/end (pipeline.rs:307) → BrainProcessor emits UpdateSettings (new prompt+tools → Gemini update_system) or End.
  • RealtimeEvent::Interrupted (pipeline.rs:367) → Frame::Interruption broadcast → TransportOutput clears.
  • Usage (pipeline.rs:375) → Frame::Metrics → RecorderProcessor/finalize accumulate.
  • loop break → terminal End → FinalizeProcessor runs the LiveState/finalize artifact-upload + SessionSource.complete logic (pipeline.rs:538).
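On the output side, the arms above become a single `match` in `process_frame`. The sketch assumes a hypothetical, heavily reduced `Frame` enum and an `OutputSink` standing in for TransportOutput plus the metrics tap. The real `Frame` has many more variants, and real processors push onto links rather than mutating local state. It shows only that each former `select!` arm turns into one match arm with a defined effect.

```rust
/// Hypothetical reduced frame set covering the arms listed above.
#[derive(Debug, Clone, PartialEq)]
pub enum Frame {
    InputAudio(Vec<i16>),
    TtsAudio(Vec<i16>),
    Interruption,
    Metrics { total_tokens: u64 },
    End,
}

/// Hypothetical sink standing in for TransportOutput + the metrics tap.
#[derive(Default)]
pub struct OutputSink {
    pub played: Vec<Vec<i16>>,  // chunks the carrier actually played, in order
    pub pending: Vec<Vec<i16>>, // buffered playback not yet drained
    pub clears: u32,
    pub total_tokens: u64,
    pub ended: bool,
}

impl OutputSink {
    pub fn process_frame(&mut self, frame: Frame) {
        match frame {
            // Was the AudioOut arm: bot audio is buffered for playback.
            Frame::TtsAudio(chunk) => self.pending.push(chunk),
            // Was the Interrupted arm: drop unplayed audio (send_clear).
            Frame::Interruption => {
                self.pending.clear();
                self.clears += 1;
            }
            // Was the Usage arm: accumulate for finalize.
            Frame::Metrics { total_tokens } => self.total_tokens += total_tokens,
            // Was the loop break: play out what is left, then stop.
            Frame::End => {
                self.flush();
                self.ended = true;
            }
            // Inbound audio is not this sink's concern.
            Frame::InputAudio(_) => {}
        }
    }

    /// Simulates the carrier draining its playback buffer.
    pub fn flush(&mut self) {
        self.played.extend(self.pending.drain(..));
    }
}
```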

7.2 The equivalence test (the gate that authorizes cutover)

This migration is not done until this passes. Reuse the exact mocks already in pipeline.rs:634 (MockSocket feeding scripted Plivo frames, MockRealtime replaying a scripted event sequence, MockBrain, MockSession capturing the finalize payload). Build two harnesses driven by the same scripted inputs:

  1. Call::run (today) → capture: outbound WS frames, send_audio count, tool-result statuses+ids, the Finalize payload (recording/transcript keys, collected_vars incl. folded disposition, usage totals), and the relayed tool_calls.
  2. The processor PipelineTask (new) → capture the same set off the same mocks.

Assert byte-for-byte equality of the captured outputs (the same 6 assertion blocks that call_run_bridges_audio_both_ways_and_finalizes and mcp_tool_call_is_relayed_not_treated_as_transition already check, run against both harnesses). Specifically equal: the count + order of playAudio frames sent to the carrier; the (id, status) tool-result sequence; fin.recording_url/transcript_url (stored keys, not presigned URLs); fin.collected_vars incl. disposition; fin.usage.total_tokens; the relayed (node_id, tool_name, args) and verbatim MCP result. Plus: a timing assertion that the processor pipeline's per-frame routing p99 ≤ the bench-rs channel-pipeline number (§2.4) so cutover can't regress the moat.
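The gate has two halves: output equality and the p99 bound. Both reduce to small helpers. The sketch assumes a hypothetical `Captured` record holding a few of the compared fields, and uses a nearest-rank p99. The percentile method bench-rs actually uses is not stated here.

```rust
use std::time::Duration;

/// Hypothetical capture of one harness run (a subset of the compared fields).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Captured {
    pub play_audio_frames: Vec<Vec<u8>>,     // carrier frames, in send order
    pub tool_results: Vec<(String, String)>, // (id, status) sequence
    pub total_tokens: u64,
}

/// Nearest-rank percentile (p in 1..=100) of per-frame routing latencies.
pub fn percentile(samples: &[Duration], p: u32) -> Option<Duration> {
    if samples.is_empty() || p == 0 || p > 100 {
        return None;
    }
    let mut sorted = samples.to_vec();
    sorted.sort();
    // 1-based rank = ceil(p * n / 100)
    let rank = (p as usize * sorted.len() + 99) / 100;
    Some(sorted[rank - 1])
}

/// Gate: identical captures AND the new pipeline's p99 within the budget.
pub fn equivalence_gate(
    today: &Captured,
    processor: &Captured,
    routing_latencies: &[Duration],
    p99_budget: Duration,
) -> Result<(), String> {
    if today != processor {
        return Err(format!("outputs differ:\n  today: {today:?}\n  new:   {processor:?}"));
    }
    match percentile(routing_latencies, 99) {
        Some(p99) if p99 <= p99_budget => Ok(()),
        Some(p99) => Err(format!("p99 {p99:?} exceeds budget {p99_budget:?}")),
        None => Err("no latency samples".to_string()),
    }
}
```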

Only when this differential test is green (CI, no network) does a later step rewire the embedder to construct the PipelineTask instead of Call::run, and Call::run is deleted (cleanup mandate — no parallel implementations). A live one-call smoke on a carrier dev number (never prod) confirms the live path post-cutover.


8. Crate split & feature matrix

8.1 Crates (the target crate layout)

| Crate | Contents | Deps it may pull | License |
|---|---|---|---|
| flowcat-core | framework (frame, processor, pipeline, task, runner, observer, metrics) + audio (codec/resample/recorder) + native SIP UA + all trait seams (Transport/Stt/Tts/Llm/RealtimeLlm/Vad/Turn/FrameSerializer/Brain) | tokio, tokio-util, async-trait, serde(_json), bytes, thiserror, tracing, rubato, audio-codec-algorithms, hound, rsipstack, rand | Apache-2.0 |
| flowcat-services | every STT/TTS/LLM/realtime provider, one cargo feature each | per-feature: reqwest/tonic/tokio-tungstenite/ort/whisper-rs | Apache-2.0 |
| flowcat-transports | str0m WebRTC + Opus, WebSocket, Daily, LiveKit, local, avatars | str0m, opus/audiopus, per-feature SDKs | Apache-2.0 |
| flowcat-telephony | carrier FrameSerializers (Twilio/Telnyx/Plivo/…) + DTMF (RFC 2833 + Goertzel) | base64, serde_json | Apache-2.0 |
| flowcat-cli | demos/examples (parity with pipecat examples/) | the above | Apache-2.0 |
| the embedder | the host's glue: BrainProcessor (its engine adapter), SessionSource, ToolRelay, FinalizeProcessor, routing | flowcat-* + the host's engine | the host's license |

flowcat-core must not depend on reqwest/tonic/ort/str0m — those live in the sibling crates so core stays compile-fast and dependency-light (constraint 3). The Gemini Live client moves from flowcat-core/src/realtime/ into flowcat-services behind the realtime-gemini feature (the trait RealtimeLlmService stays in core).

8.2 Feature-flag matrix (pipecat "extras" parity)

  • flowcat-services: stt-deepgram, stt-whisper-local, tts-cartesia, tts-elevenlabs, llm-openai, llm-anthropic, realtime-gemini, realtime-openai, … — one feature per provider, each pulling only its client dep. Umbrella features stt-all/tts-all/llm-all for the CLI/tests.
  • flowcat-core: sip (native SIP UA; embedders that need telephony enable it), recorder, vad-ort (the ONNX runtime is heavy → opt-in even though the trait is always present).
  • flowcat-transports: webrtc-str0m, ws, daily, livekit, local.
  • flowcat-telephony: twilio, telnyx, plivo, dtmf-inband.
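Each feature gates a module at compile time. An embedder that enables only `stt-deepgram`, for example, never compiles the other STT clients or their dependencies. The sketch shows what a category `mod.rs` might look like under this scheme. The module and struct names are hypothetical and the bodies are elided.

```rust
// Hypothetical flowcat-services/src/stt/mod.rs: one cfg-gated module per provider.
#[cfg(feature = "stt-deepgram")]
pub mod deepgram {
    pub struct DeepgramStt; // real client elided
}

#[cfg(feature = "stt-whisper-local")]
pub mod whisper_local {
    pub struct WhisperLocalStt; // real client elided
}

/// The STT providers compiled into this build (e.g. for a CLI listing).
/// `cfg!` evaluates to a plain bool at compile time.
pub fn compiled_stt_providers() -> Vec<&'static str> {
    let mut out = Vec::new();
    if cfg!(feature = "stt-deepgram") {
        out.push("deepgram");
    }
    if cfg!(feature = "stt-whisper-local") {
        out.push("whisper_local");
    }
    out
}
```

With no features enabled, the list is empty and neither provider module exists in the build.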

8.3 Dependency choices (locked here so the fan-out doesn't relitigate)

| Need | Crate | Why |
|---|---|---|
| ONNX (Silero VAD, Smart-Turn) | ort (ONNX Runtime) | the mature Rust ONNX binding; pipecat ships ONNX models; behind vad-ort |
| WebRTC | str0m (sans-I/O) | the chosen WebRTC stack; sans-I/O fits the tokio task model |
| Opus | audiopus (libopus binding) | WebRTC codec; pure-Rust Opus implementations are immature |
| gRPC (Google STT/TTS) | tonic | the tokio-native gRPC stack |
| HTTP/WS providers | reqwest + tokio-tungstenite | already in the tree (tungstenite is the Gemini socket) |
| local Whisper | whisper-rs | the mature whisper.cpp binding (toolchain note in §5) |

8.4 Module-rename housekeeping (step M0, done before any new code)

Today's flowcat-core/src/frame.rs holds data shapes (AudioChunk, RealtimeEvent, ToolDecl, …), not pipeline frames. To avoid a name collision with the new processor/frame.rs (the Frame enum), rename the existing frame.rs → types.rs (purely mechanical: update lib.rs re-exports and keep pub use aliases for one release). AudioChunk gets a type alias to the new AudioFrame. This is the first checklist step.
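The compatibility shims for this rename are small. The sketch assumes a hypothetical two-field `AudioFrame`; the real field set is defined in §1. Under this sketch, the old `frame` path keeps compiling through a deprecated glob re-export, and `AudioChunk` becomes a plain type alias.

```rust
pub mod processor {
    pub mod frame {
        /// Hypothetical layout of the new pipeline audio frame.
        #[derive(Debug, Clone, PartialEq)]
        pub struct AudioFrame {
            pub sample_rate: u32,
            pub samples: Vec<i16>,
        }
    }
}

/// Formerly frame.rs: pure data shapes, now types.rs.
pub mod types {
    /// Old name kept as an alias so existing call sites compile unchanged.
    pub type AudioChunk = super::processor::frame::AudioFrame;
}

/// Old module path, re-exported for one release; users get a deprecation warning.
#[deprecated(note = "renamed to `types`")]
pub mod frame {
    pub use super::types::*;
}
```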


9. Implementation checklist (execute in order — engineer-ready)

Each step says what to build, where, and the tests that gate it. Steps 1–8 are flowcat-core (single PR each or a small stack); step 9 is the migration; 10–12 are cross-cutting. The framework itself does not touch the security-sensitive surfaces (those come with the later work on transfer/DTMF/WebRTC-signaling/serializer-sigs).

  0. M0 — rename frame.rs → types.rs (§8.4). No behavior change. Test: workspace builds + existing suite green.
  1. processor/frame.rs: the Frame enum + FrameMeta + Direction + FrameClass + CustomFrame + AudioFrame (§1). Tests: class()/uninterruptible() table tests for every variant; a Custom frame round-trips through a no-op processor unchanged; a broadcast pairs sibling ids.
  2. processor/metrics.rs: MetricsData + LlmTokenUsage (§5.2). Tests: serde round-trip; mirrors metrics.py field-for-field.
  3. processor/mod.rs + processor/runtime.rs: FrameProcessor trait, Link, Envelope, ProcessorSetup, the bounded/unbounded dual-channel run_processor loop, system-frame priority, interruption drain (§2). Tests: a 3-processor hand-wired chain forwards frames in order; a System frame overtakes a backlog of Data frames; an Interruption drops interruptible queued frames but keeps an End; a process_frame Err becomes an upstream Error. Promotes to named variants only what these tests need.
  4. observer.rs: FrameObserver, Observer fan-out, FrameEvent/FramePushEvent, TurnTrackingObserver, UserBotLatencyObserver, IdleFrameObserver (§5.1). Tests: an observer sees every push; zero-cost-when-none (no observer ⇒ hook not called); turn-tracking emits boundaries off scripted speaking edges.
  5. pipeline/mod.rs: Pipeline (Source/Sink wrap, link() spawns tasks), nesting (§3.1). Tests: a Pipeline is a FrameProcessor and nests; downstream injected at head reaches the Sink; upstream observed at head.
  6. pipeline/parallel.rs: ParallelPipeline (fan-out, id-dedup, lifecycle sync) (§3.2). Tests: a frame fans to N branches and emits once; a fast branch's End is held until the slow branch passes it (the sync invariant).
  7. pipeline/task.rs: PipelineTask (Start→ready handshake, push pump, Source/Sink upstream/downstream handling, idle, heartbeat/watchdog, graceful vs cancel, event hooks) (§4.1). Tests: Start reaches Sink before any data frame; stop_when_done drains then ends; cancel skips flush; idle timeout fires + cancels; an upstream End request from a processor converts to a downstream End.
  8. pipeline/runner.rs: PipelineRunner (SIGINT/SIGTERM → cancel_all, join) (§4.2). Tests: a simulated signal cancels a running task; multiple tasks join cleanly.
  9. The migration — the S2S processor pipeline + the equivalence test (§7). Build TransportInput/TransportOutput (core trait + the WS-carrier composition reusing FrameSerializer), wrap the existing Gemini client as RealtimeLlmService behind a processor, BrainProcessor + ToolRelay + FinalizeProcessor in the embedder. The gate: the differential test (§7.2) asserting the processor PipelineTask produces byte-identical outputs to Call::run off the shared scripted mocks, plus the p99 timing assertion. Cutover (delete Call::run) only after this is green — a separate PR.
  10. Service-processor trait stubs (§6.1): land SttService/TtsService/LlmService/VadAnalyzer/TurnAnalyzer traits + a no-op mock impl of each in flowcat-core/src/service/ and flowcat-core/src/audio/. Test: a mock cascaded pipeline (mock-STT→mock-LLM→mock-TTS) runs a turn end-to-end through a real Pipeline — the fixture the provider implementations build against.
  11. Extend bench-rs to drive the real Pipeline (not the standalone 7-stage mock) for both S2S and cascaded topologies and assert p99 ≤ the published moat. Wire that bench assertion into CI (the .github/workflows/ jobs that already run cargo test --workspace --locked).
  12. Crate split (§8.1): add flowcat-services/-transports/-telephony skeletons + the feature matrix; move the Gemini client into flowcat-services behind realtime-gemini; Apache headers/NOTICE. (Can run in parallel with 9–11.)
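Two queue behaviours from the processor-runtime step can be modelled without tokio. First, system frames overtake queued data. Second, an `Interruption` discards interruptible backlog but keeps an `End`. The sketch below is single-threaded and uses a hypothetical `Frame` enum and a synchronous `Inbox` standing in for the bounded/unbounded channel pair. The real loop is async and classifies frames with `class()`/`uninterruptible()`.

```rust
use std::collections::VecDeque;

/// Hypothetical reduced frame set.
#[derive(Debug, Clone, PartialEq)]
pub enum Frame {
    Start,        // system
    Interruption, // system
    Text(String), // data, interruptible
    Audio(u32),   // data, interruptible (payload reduced to a sequence number)
    End,          // control, uninterruptible
}

impl Frame {
    fn is_system(&self) -> bool {
        matches!(self, Frame::Start | Frame::Interruption)
    }
    fn uninterruptible(&self) -> bool {
        matches!(self, Frame::End)
    }
}

/// Two queues: `system` (unbounded in the real runtime) always drains first.
#[derive(Default)]
pub struct Inbox {
    system: VecDeque<Frame>,
    normal: VecDeque<Frame>,
}

impl Inbox {
    pub fn push(&mut self, frame: Frame) {
        if frame.is_system() {
            self.system.push_back(frame);
        } else {
            self.normal.push_back(frame);
        }
    }

    /// Next frame to hand to `process_frame`.
    pub fn next_frame(&mut self) -> Option<Frame> {
        if let Some(f) = self.system.pop_front() {
            if f == Frame::Interruption {
                // Interruption drain: drop interruptible backlog, keep e.g. End.
                self.normal.retain(|q| q.uninterruptible());
            }
            return Some(f);
        }
        self.normal.pop_front()
    }
}
```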

Build order rationale: the enum (1) and the processor runtime (3) are the spine; everything else composes them. The equivalence test (9) is the single gate that makes the whole multi-quarter program safe — nothing in the later provider waves starts until 1–9 are frozen and green (see ROADMAP.md).


10. Open questions (need an architect/user call before/at cutover)

  1. Custom-frame downcast ergonomics for OSS users. The Frame::Custom(Arc<dyn CustomFrame>) escape hatch is the agreed extensibility model, but a provider that needs a first-class, hot-path frame (rare) must promote a variant in core — i.e. a PR to flowcat-core. Acceptable? (Recommendation: yes — the long tail rides Custom; only genuinely-hot frames get variants, which is also true of pipecat's own evolution.)
  2. BrainProcessor location for the OSS demo. flowcat-core ships a demo brain today (lib.rs); the embedder's engine adapter is the host's own code. Confirm the demo BrainProcessor stays in flowcat-core (so the OSS pipeline is runnable end-to-end) while the engine adapter lives in the embedder. (Recommendation: yes — matches the DESIGN.md OSS boundary.)
  3. Heartbeat-as-watchdog default. Pipecat ships heartbeats off by default. An embedder with a deploy-cutover drain story likely wants them on with the 1s/10s defaults — it can set enable_heartbeats to true in its own PipelineTaskParams without changing the core default.
  4. str0m vs a thin WebRTC sidecar for Daily. Not a decision for this framework, but the TransportInput/Output trait shape frozen here must not assume the transport owns its own event loop — confirmed sans-I/O-friendly above.
  5. Lifecycle frames bypass process_frame (RESOLVED). The framework loop (run_processor, §2.2) routes Start and downstream End/Stop/Cancel to the start/stop hooks and never to process_frame; an upstream terminal is a "request to end" that DOES reach process_frame so the Source can convert it (§4.1). This is the single most surprising property for processor authors and was the root of two bring-up hangs (the internal Sink and Source each needed an edge wired for it). Ruled correct and kept (the framework — not the author — owns the lifecycle; routing lifecycle through process_frame would force per-processor lifecycle boilerplate, the exact thing the design eliminates). The contract is now stated on the FrameProcessor trait doc itself (processor/mod.rs) so every later / OSS author hits it. Every later component depends on knowing it.
  6. Source-emit affordance for transports (RESOLVED). A source processor (a transport reading recv()) cannot self-emit from the frozen FrameProcessor::start hook (it gets &ProcessorSetup + &StartParams but no Link). The migration worked around this with a bespoke external pump feeding PipelineTask::queue_sender(). The open question was whether the frozen trait should grow a first-class source affordance before more transports proliferate. Decision: (a) codify the external-pump pattern — NO trait change. flowcat-core ships a small SourcePump/SourceHandle helper (pipeline/source_pump.rs) that wraps a transport's reader task and emits frames at the pipeline head via the task's queue_sender(); the helper owns the spawn + abort lifecycle so every transport author gets a one-liner instead of re-deriving the pump. Why (a) over a run_source trait hook: (i) the head-injection path is the only way to preserve the Start→ready ordering guarantee — PipelineTask::run blocks on the Start→Sink handshake before it drains the head queue, so a pumped InputAudio provably cannot reach any process_frame before that processor's start() ran (the invariant that matters for .expect()-in-process_frame processors). A runtime-spawned run_source(link, …) would emit before Start traverses downstream, re-introducing exactly the ordering hazard the bring-up fixed — making it safe would require threading a new start-barrier through the frozen runtime, the most delicate part of the runtime. (ii) This is literally pipecat's own model: BaseInputTransport spawns a reader task and pushes; it has no synchronous source-emit method either. (iii) Backpressure stays correct: the head queue is unbounded (input must never block — §2.2) and the first bounded normal channel one hop in applies natural backpressure. (iv) Zero churn on a just-frozen + reviewed trait; the helper is additive and re-uses the proven handshake.
Code landed alongside this ruling: SourcePump/SourceHandle + 2 unit tests (one asserts the Start handshake holds for pump-injected frames, one asserts abort-on-drop); s2s.rs's bespoke spawn_transport_pump refactored onto it (the §7.2 differential test stays green). 157 flowcat tests green (--locked), clippy-clean. Note: this adds to the frozen public surface (new pipeline::{SourcePump, SourceHandle} re-exports).
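Argument (i) is easiest to see in a toy model built on std threads and channels. This is not the shipped SourcePump. The names `head_queue`, `spawn_pump`, `run_task`, and `Processor` are hypothetical. The model shows that frames pumped into an unbounded head queue before the handshake completes are still processed after `start()`, because the task drains the head queue only once start has run.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread::{self, JoinHandle};

#[derive(Debug, Clone, PartialEq)]
pub enum Frame {
    InputAudio(u32),
    End,
}

/// Hypothetical unbounded head queue (input must never block).
pub fn head_queue() -> (Sender<Frame>, Receiver<Frame>) {
    channel()
}

/// Hypothetical pump: a reader thread pushes transport frames into the head
/// queue. It is allowed to run ahead of the Start handshake.
pub fn spawn_pump(head: Sender<Frame>, transport_frames: Vec<u32>) -> JoinHandle<()> {
    thread::spawn(move || {
        for n in transport_frames {
            if head.send(Frame::InputAudio(n)).is_err() {
                return; // task gone: stop pumping
            }
        }
        let _ = head.send(Frame::End);
    })
}

/// Hypothetical processor that, like real ones, assumes start() already ran.
#[derive(Default)]
pub struct Processor {
    started: bool,
    pub seen: Vec<u32>,
}

impl Processor {
    fn start(&mut self) {
        self.started = true;
    }
    fn process_frame(&mut self, frame: &Frame) {
        if let Frame::InputAudio(n) = frame {
            // The `.expect()`-style assumption the handshake protects.
            assert!(self.started, "InputAudio reached process_frame before start()");
            self.seen.push(*n);
        }
    }
}

/// Complete the start handshake first, then drain the head queue in order.
pub fn run_task(head: Receiver<Frame>, mut p: Processor) -> Processor {
    p.start();
    for frame in head {
        p.process_frame(&frame);
        if frame == Frame::End {
            break;
        }
    }
    p
}
```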

STATUS: implemented 2026-05-31

Flowcat — native SIP (supersedes the FreeSWITCH gateway)

Decision: SIP is native Rust inside flowcat; FreeSWITCH is removed. Flowcat speaks SIP/RTP directly, so it's a single-binary native voice runtime — one project, no separate softswitch. (Supersedes the earlier FreeSWITCH gateway approach described in DESIGN.md.)

1. The transport seam (do this first — everything depends on it)

Generalize the pipeline so it doesn't care whether audio arrived as carrier WS frames or RTP. New in flowcat-core (transport/mod.rs):

// flowcat-core/src/transport/mod.rs
use async_trait::async_trait;
// AudioChunk and FlowcatError are existing flowcat-core types.

#[async_trait]
pub trait MediaTransport: Send {
    /// Next inbound event: the call started, a chunk of caller audio (at
    /// `carrier_rate`), or the call ended. `None` = transport closed.
    async fn recv(&mut self) -> Option<MediaIn>;
    /// Play bot audio (at `carrier_rate`) to the caller.
    async fn send_audio(&mut self, chunk: AudioChunk) -> Result<(), FlowcatError>;
    /// Barge-in: flush any buffered playback. No-op where unsupported.
    async fn send_clear(&mut self) -> Result<(), FlowcatError>;
    fn carrier_rate(&self) -> u32; // 8000 for telephony G.711
}

pub enum MediaIn {
    StreamStart { call_id: String },
    Audio(AudioChunk),  // at carrier_rate
    Stop,
}
  • Call becomes Call<Tr: MediaTransport, R: RealtimeLlm, B: AgentBrain, S: SessionSource> (4 params, was 5). The loop: tr.recv() → MediaIn::{StreamStart, Audio, Stop}; realtime AudioOut → resample → tr.send_audio; Interrupted → tr.send_clear. The resampler, recorder, transcript, brain/tool handling, and finalize are UNCHANGED.
  • WS path preserved via an adapter: WsCarrierTransport<So: MediaSocket, Se: MediaSerializer> impls MediaTransport: recv() loops socket.recv() → serializer.on_message until a non-Ignore SerIn and maps it to MediaIn; send_audio/send_clear go through serializer.encode_audio/encode_clear → socket.send_*; carrier_rate = the serializer's. The embedder builds WsCarrierTransport::new(AxumWsSocket, PlivoSerializer) for Plivo.
  • Keep MediaSocket/MediaSerializer/PlivoSerializer (used by the adapter). The gateway serializer + GatewayEncoding are removed (no more gateway).
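The adapter's `recv()` loop can be sketched synchronously. The sketch assumes simplified, hypothetical stand-ins for `MediaSocket`, `MediaSerializer`, and `SerIn`. The real traits are async and carry carrier-specific payloads, and the scripted socket and toy serializer are test doubles only. The point is the skip-`Ignore` loop and the mapping onto `MediaIn`.

```rust
use std::collections::VecDeque;

#[derive(Debug, PartialEq)]
pub enum MediaIn {
    StreamStart { call_id: String },
    Audio(Vec<i16>),
    Stop,
}

/// Hypothetical serializer output for one carrier WS message.
pub enum SerIn {
    Start { call_id: String },
    Audio(Vec<i16>),
    Stop,
    Ignore, // keepalives, marks, events the pipeline doesn't need
}

pub trait MediaSocket {
    fn recv(&mut self) -> Option<String>; // None = socket closed
}

pub trait MediaSerializer {
    fn on_message(&mut self, msg: &str) -> SerIn;
}

pub struct WsCarrierTransport<So, Se> {
    socket: So,
    serializer: Se,
}

impl<So: MediaSocket, Se: MediaSerializer> WsCarrierTransport<So, Se> {
    pub fn new(socket: So, serializer: Se) -> Self {
        Self { socket, serializer }
    }

    /// Keep reading until the serializer yields something other than Ignore.
    pub fn recv(&mut self) -> Option<MediaIn> {
        loop {
            let msg = self.socket.recv()?; // closed socket → None
            match self.serializer.on_message(&msg) {
                SerIn::Ignore => continue,
                SerIn::Start { call_id } => return Some(MediaIn::StreamStart { call_id }),
                SerIn::Audio(a) => return Some(MediaIn::Audio(a)),
                SerIn::Stop => return Some(MediaIn::Stop),
            }
        }
    }
}

/// Test double: replays scripted messages.
pub struct ScriptedSocket(pub VecDeque<String>);

impl MediaSocket for ScriptedSocket {
    fn recv(&mut self) -> Option<String> {
        self.0.pop_front()
    }
}

/// Test double: a toy carrier protocol.
pub struct ToySerializer;

impl MediaSerializer for ToySerializer {
    fn on_message(&mut self, msg: &str) -> SerIn {
        match msg {
            "start" => SerIn::Start { call_id: "call-1".into() },
            "audio" => SerIn::Audio(vec![0; 160]),
            "stop" => SerIn::Stop,
            _ => SerIn::Ignore,
        }
    }
}
```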

2. The SIP/RTP module (flowcat-core/src/sip/)

Add deps to flowcat-core/Cargo.toml: a SIP UA stack — rsipstack (primary; REGISTER + INVITE/ACK/BYE transactions + dialogs; backs the rustpbx softswitch) — and an RTP layer (rsipstack's RTP if sufficient, else webrtc-util/rtp). G.711 + resample already present (audio-codec-algorithms, rubato). If rsipstack's API proves too thin, fall back to ezk-sip-ua (cleaner, SDP/RTP integrated) and report the switch.

  • SipAgent (process-level, one per trunk): REGISTER to {server} with {login}/{password} + periodic re-REGISTER/keepalive; NAT-friendly (rport/symmetric RTP); accept inbound INVITE → yield an inbound call (Call-ID, From, To/DID) to the host; originate outbound INVITE to an E.164 with a configured CallerID. G.711 only (disallow all; allow PCMU,PCMA), 8 kHz, ptime 20 ms, a=sendrecv.
  • SipTransport (per dialog) impls MediaTransport: on answer/established emit StreamStart{call_id = Call-ID}; decode inbound RTP (G.711 per negotiated codec) → 20 ms MediaIn::Audio @ 8000; on BYE/timeout → Stop. send_audio → G.711-encode → RTP packets (monotonic seq + timestamp, 20 ms cadence, correct PT/SSRC). send_clear → no-op (RTP has no flush; barge-in = stop sending). A small fixed playout jitter buffer (reorder by seq, bounded depth, drop late) — telephony fixed-rate makes this simple; document the depth.
  • Unit-test (no live socket): SDP offer/answer build + parse (PCMU/PCMA pick, ptime), RTP packetize/depacketize (seq/ts/PT), the MediaTransport mapping (a fed RTP stream → Audio; a synthesized BYE → Stop). Live registration/INVITE against the trunk is gated on the user.
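The RTP framing the SipTransport bullet relies on is fixed by RFC 3550: a 12-byte header carrying version 2, payload type, sequence number, timestamp, and SSRC. At 8 kHz G.711 with a 20 ms ptime, each packet carries 160 one-byte samples. The sequence number therefore advances by 1 and the timestamp by 160 per packet. The sketch below handles neither CSRCs, header extensions, nor padding, and the `RtpPacketizer` name is hypothetical. It covers the packetize/depacketize round trip the unit tests call for.

```rust
pub const PT_PCMU: u8 = 0; // RFC 3551 static payload types
pub const PT_PCMA: u8 = 8;
pub const SAMPLES_PER_PACKET: usize = 160; // 8000 Hz * 20 ms

pub struct RtpPacketizer {
    pub payload_type: u8,
    pub ssrc: u32,
    seq: u16,
    timestamp: u32,
}

impl RtpPacketizer {
    pub fn new(payload_type: u8, ssrc: u32, initial_seq: u16, initial_ts: u32) -> Self {
        Self { payload_type, ssrc, seq: initial_seq, timestamp: initial_ts }
    }

    /// Wrap one G.711 payload (160 bytes for 20 ms) in a 12-byte RTP header.
    pub fn packetize(&mut self, payload: &[u8]) -> Vec<u8> {
        let mut pkt = Vec::with_capacity(12 + payload.len());
        pkt.push(0x80); // V=2, P=0, X=0, CC=0
        pkt.push(self.payload_type & 0x7f); // marker bit clear
        pkt.extend_from_slice(&self.seq.to_be_bytes());
        pkt.extend_from_slice(&self.timestamp.to_be_bytes());
        pkt.extend_from_slice(&self.ssrc.to_be_bytes());
        pkt.extend_from_slice(payload);
        // Monotonic and wrapping: +1 per packet, +1 per G.711 sample of media time.
        self.seq = self.seq.wrapping_add(1);
        self.timestamp = self.timestamp.wrapping_add(payload.len() as u32);
        pkt
    }
}

#[derive(Debug, PartialEq)]
pub struct RtpHeader {
    pub payload_type: u8,
    pub seq: u16,
    pub timestamp: u32,
    pub ssrc: u32,
}

/// Parse the fixed header. Rejects non-v2 packets and any with padding,
/// extension, or CSRCs, which this sketch does not handle.
pub fn depacketize(pkt: &[u8]) -> Option<(RtpHeader, &[u8])> {
    if pkt.len() < 12 || (pkt[0] >> 6) != 2 || (pkt[0] & 0x3f) != 0 {
        return None;
    }
    let hdr = RtpHeader {
        payload_type: pkt[1] & 0x7f,
        seq: u16::from_be_bytes([pkt[2], pkt[3]]),
        timestamp: u32::from_be_bytes([pkt[4], pkt[5], pkt[6], pkt[7]]),
        ssrc: u32::from_be_bytes([pkt[8], pkt[9], pkt[10], pkt[11]]),
    };
    Some((hdr, &pkt[12..]))
}
```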

3. Host + control-plane rewire (embedder side)

  • The embedder runs a SipAgent at startup, registering the SIP trunk from its own SIP trunk configuration (server / login / password / caller-id, plus public-IP/ports as needed) passed to SipConfig/SipAgent. On inbound INVITE → the embedder's service-authed sip/inbound-resolve (body {dialed_did, caller, call_id}) → {run_id, token} → build Call::new(SipTransport, gemini, <brain>, <session>, run_id, token) → run. Add an internal outbound endpoint (authed by the run's token, or service auth) {run_id, token, to_number} → SipAgent originates the INVITE, builds the Call, runs it. The WS-media path is unchanged.
  • control plane: the SIP originate path no longer uses ESL — it POSTs to the embedder's media binary's originate endpoint (internal base from settings). Remove any FreeSWITCH ESL routes + settings. Keep sip/inbound-resolve, the carrier provider (CDR webhook verify + status still useful), and the sealed credentials. The REST signer may go unused → keep (tested, small) or remove if truly dead.

4. Removals (after native SIP compiles)

  • flowcat/deploy/freeswitch/ (whole dir) · flowcat-core/src/serializer/gateway.rs + GatewayEncoding + its lib.rs export · the embedder's FreeSWITCH ESL routes + their module registration · ESL config fields · FreeSWITCH env in any .env.example.
  • Update DESIGN.md: SIP media = native SIP (no FreeSWITCH); bring-up = register the trunk in the embedder + a sofia-free register check.

Security (re-review the new surface)

Trunk SIP credentials live in the embedder's config — never logged; the inbound INVITE trust flows through sip/inbound-resolve (service-authed, identity from the DID route, never the INVITE body); the outbound originate endpoint must authenticate (run token / service auth) and validate to_number (E.164). RTP from an unexpected source addr should be dropped (symmetric RTP). No new secret crosses the wire to the model/caller.
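For the `to_number` check: E.164 caps a number at 15 digits including the country code, and country codes never start with 0. The common dialable form is therefore a `+`, a non-zero first digit, and digits only. The minimal validator below assumes that form. It checks shape only, not whether the number is routable or permitted for the run.

```rust
/// True for "+<1..=15 digits>" whose first digit is not 0.
pub fn is_e164(to_number: &str) -> bool {
    let Some(digits) = to_number.strip_prefix('+') else {
        return false;
    };
    let bytes = digits.as_bytes();
    (1..=15).contains(&bytes.len())
        && bytes[0] != b'0'
        && bytes.iter().all(|b| b.is_ascii_digit())
}
```

Rejecting anything with spaces, dashes, or a missing `+` keeps normalization out of the security check. Any formatting cleanup belongs upstream of this validation.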

Contributor reference. This is the provider-implementation taxonomy — how connectors are triaged and built. If you just want to use providers, see Providers & features and Configuration.

Flowcat provider protocol-family map

Mirrors the per-provider feature matrix + shared-file pattern used elsewhere; covers the STT / TTS / LLM provider breadth (see ROADMAP.md).

Purpose. This map covers the ~70 remaining providers, each landing in an isolated worktree. The merge hazard is contention on the two shared files — flowcat-services/Cargo.toml ([features]/[dependencies]) and each category's src/{stt,tts,llm}/mod.rs (the mod-decl + pub use list). Pre-creating every provider's module home + feature + dep + a compiling stub means each provider fills one body — never adds a mod decl or a Cargo line → conflict-free merge. It also classifies every provider as (D) distinct client or (W) thin wrapper so a protocol family's real client is implemented once and the wrappers reuse it.


0. The two-letter triage

| Tag | Meaning | What the fan-out agent does |
|---|---|---|
| (D) | Distinct client — its own wire protocol (own WS/HTTP framing, own auth, own message schema). Needs a real, from-scratch impl + wire-fixture tests. | Implement the client + pure encode/decode seam + unit tests (the Deepgram/Cartesia/OpenAI template). |
| (W) | Thin wrapper — OpenAI-/Whisper-/an-existing-client-compatible: same wire protocol as a (D) family member, differing only in base_url + auth header + default model. | A small struct that constructs the family's (D) client with the provider's base URL/auth and delegates the trait (the GrokRealtime-over-OpenAiRealtime template). |

The discipline: implement each family's (D) client once; every (W) in that family is ~30 lines of config + delegation. A category agent should own a whole family (the (D) client + all its (W)s) so the wrapper contract is set by the same hand.
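A minimal sketch of the (W) pattern follows. The doc says the real `OpenAiLlm` has `.base_url()` and `.model()` builders, but their exact signatures are not shown here. The stand-in struct, its fields, and the synchronous `LlmService` trait are therefore hypothetical. The wrapper only picks defaults and delegates.

```rust
/// Hypothetical stand-in for the family's (D) client.
#[derive(Debug, Clone)]
pub struct OpenAiLlm {
    pub base_url: String,
    pub model: String, // set via .model(); no default assumed here
    pub api_key: String,
}

impl OpenAiLlm {
    pub fn new(api_key: &str) -> Self {
        Self {
            base_url: "https://api.openai.com/v1".into(),
            model: String::new(),
            api_key: api_key.into(),
        }
    }
    pub fn base_url(mut self, url: &str) -> Self {
        self.base_url = url.into();
        self
    }
    pub fn model(mut self, model: &str) -> Self {
        self.model = model.into();
        self
    }
    /// The endpoint the whole family shares; tolerates a trailing slash.
    pub fn chat_completions_url(&self) -> String {
        format!("{}/chat/completions", self.base_url.trim_end_matches('/'))
    }
}

/// Hypothetical, reduced service trait.
pub trait LlmService {
    fn name(&self) -> &str;
    fn endpoint(&self) -> String;
}

impl LlmService for OpenAiLlm {
    fn name(&self) -> &str {
        "openai"
    }
    fn endpoint(&self) -> String {
        self.chat_completions_url()
    }
}

/// (W): Groq speaks the same wire protocol at a different base URL.
pub struct GroqLlm(OpenAiLlm);

impl GroqLlm {
    pub fn new(api_key: &str, model: &str) -> Self {
        Self(OpenAiLlm::new(api_key).base_url("https://api.groq.com/openai/v1").model(model))
    }
}

impl LlmService for GroqLlm {
    fn name(&self) -> &str {
        "groq"
    }
    fn endpoint(&self) -> String {
        self.0.endpoint() // pure delegation
    }
}
```

Trimming the trailing slash matters for base URLs such as the nebius one in the table below, which ends in `/`.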


1. LLM (26 providers; openai✓ done, anthropic = stub-home here)

The headline finding (confirmed against pipecat): the LLM list is overwhelmingly OpenAI-compatible. In pipecat, 15 of the LLM services are literally declared as class XLLMService(OpenAILLMService): a base_url + default-model change. They are all (W) over the existing OpenAiLlm (which already has .base_url() + .model() builders and was written for exactly this — its doc already names OpenRouter). Besides OpenAI's own two APIs, only 3 are (D).

| Provider | Tag | Family / how | base_url (verified in pipecat) |
|---|---|---|---|
| openai | (D) ✓ done | OpenAiLlm (chat-completions SSE) | https://api.openai.com/v1 |
| openai_responses | (D) | OpenAI Responses API (/responses, different request+event schema from chat-completions) — its own decode | https://api.openai.com/v1 |
| anthropic | (D) | Messages API (/v1/messages, anthropic-version header, content-block-delta SSE) — own client | https://api.anthropic.com |
| google (gemini) | (D) | generativelanguage streamGenerateContent (Gemini text API; distinct from the realtime client already in core) | https://generativelanguage.googleapis.com |
| aws_bedrock | (D) | Bedrock InvokeModelWithResponseStream (SigV4 + AWS event-stream framing) — own SigV4 path | (region host) |
| groq | (W) | OpenAiLlm | https://api.groq.com/openai/v1 |
| together | (W) | OpenAiLlm | https://api.together.xyz/v1 |
| fireworks | (W) | OpenAiLlm | https://api.fireworks.ai/inference/v1 |
| openrouter | (W) | OpenAiLlm | https://openrouter.ai/api/v1 |
| perplexity | (W) | OpenAiLlm | https://api.perplexity.ai |
| deepseek | (W) | OpenAiLlm | https://api.deepseek.com/v1 |
| cerebras | (W) | OpenAiLlm | https://api.cerebras.ai/v1 |
| sambanova | (W) | OpenAiLlm | https://api.sambanova.ai/v1 |
| nebius | (W) | OpenAiLlm | https://api.tokenfactory.nebius.com/v1/ |
| novita | (W) | OpenAiLlm | https://api.novita.ai/openai |
| qwen | (W) | OpenAiLlm | https://dashscope-intl.aliyuncs.com/compatible-mode/v1 |
| grok (xai) | (W) | OpenAiLlm | https://api.x.ai/v1 |
| nvidia_nim | (W) | OpenAiLlm | https://integrate.api.nvidia.com/v1 |
| ollama | (W) | OpenAiLlm | http://localhost:11434/v1 |
| sarvam | (W) | OpenAiLlm | https://api.sarvam.ai/v1 |
| mistral | (W)¹ | OpenAiLlm | https://api.mistral.ai/v1 |
| azure | (W)² | OpenAiLlm | Azure endpoint + api-version, api-key auth |
| speaches | (W) | OpenAiLlm | http://localhost:11434/v1 (self-hosted, OpenAI-compatible) |

¹ mistral: the brief flags it as arguably (D), but pipecat's MistralLLMService(OpenAILLMService) is OpenAI-compatible → (W). (Mistral STT/TTS are (D) — see below; only the LLM is a wrapper.)

² azure: OpenAI-compatible wire but a different auth (api-key header) + URL shape (?api-version=). It needs OpenAiLlm to accept an api-key-style header — a tiny (D-ish) seam on the OpenAI client (an auth-mode toggle), then the rest is (W). Grouped with the (W) cohort but flagged for the auth seam.
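The azure auth seam could be as small as an enum on the OpenAI client. The sketch assumes a hypothetical `AuthMode` type, and assumes the api-version travels as a `?api-version=` query parameter as the footnote describes. The full Azure deployment URL layout is not modelled.

```rust
/// Hypothetical auth-mode toggle for the OpenAI-family client.
#[derive(Debug, Clone, PartialEq)]
pub enum AuthMode {
    /// `Authorization: Bearer <key>` (OpenAI and the (W) cohort).
    Bearer,
    /// `api-key: <key>` plus a required `api-version` query parameter (Azure).
    AzureApiKey { api_version: String },
}

impl AuthMode {
    /// (header name, header value) to attach to each request.
    pub fn header(&self, key: &str) -> (&'static str, String) {
        match self {
            AuthMode::Bearer => ("Authorization", format!("Bearer {key}")),
            AuthMode::AzureApiKey { .. } => ("api-key", key.to_string()),
        }
    }

    /// Request URL for `path` under `base`, normalising the joining slash.
    pub fn url(&self, base: &str, path: &str) -> String {
        let url = format!("{}/{}", base.trim_end_matches('/'), path.trim_start_matches('/'));
        match self {
            AuthMode::Bearer => url,
            AuthMode::AzureApiKey { api_version } => format!("{url}?api-version={api_version}"),
        }
    }
}
```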

LLM (D) count: 5 (openai✓, openai_responses, anthropic, google/gemini, aws_bedrock). LLM (W) count: 18. → One agent implements the 4 new (D) clients; one agent fans out all 18 (W)s (each a base_url+model struct over OpenAiLlm). The anthropic stub-home is set up here because its llm-anthropic feature already exists from earlier but the impl never landed (the earlier "anthropic done" is the feature, not the module) — this map fills it.


2. STT (~21 remaining; deepgram✓ done)

Protocol families (cross-checked against pipecat base classes):

  • Streaming-WebSocket (each its own JSON schema): WebsocketSTTService subclasses. Mostly (D) (distinct message schemas), except where they ride a sibling's protocol.
  • Whisper-HTTP segmented (BaseWhisperSTTService / SegmentedSTTService): POST a finished audio segment to an OpenAI /audio/transcriptions-shaped endpoint. One (D) Whisper-HTTP client, the rest (W) over it.
  • gRPC — Google + NVIDIA Riva use tonic. (D) each (distinct protos).
  • Local — whisper.cpp via whisper-rs. (D), C-toolchain (see §5).
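The segmented Whisper-HTTP family differs from streaming STT in one respect: nothing is sent until a segment is complete. The sketch below shows only that buffering. It assumes hypothetical `VadEdge` inputs. The returned segment is what a real client would encode and POST to an `/audio/transcriptions`-shaped endpoint, and the HTTP call and audio encoding are out of scope.

```rust
/// Hypothetical VAD edge observed alongside each chunk.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum VadEdge {
    SpeechStart,
    SpeechStop,
    NoChange,
}

/// Buffers audio between VAD edges; yields one finished segment per utterance.
pub struct Segmenter {
    buf: Vec<i16>,
    in_speech: bool,
    max_samples: usize, // force a cut on runaway utterances
}

impl Segmenter {
    pub fn new(max_samples: usize) -> Self {
        Self { buf: Vec::new(), in_speech: false, max_samples }
    }

    /// Feed one chunk plus its VAD edge. Returns Some(segment) when complete.
    pub fn push(&mut self, chunk: &[i16], edge: VadEdge) -> Option<Vec<i16>> {
        if edge == VadEdge::SpeechStart {
            self.in_speech = true;
            self.buf.clear();
        }
        if self.in_speech {
            self.buf.extend_from_slice(chunk);
        }
        let too_long = self.buf.len() >= self.max_samples;
        if self.in_speech && (edge == VadEdge::SpeechStop || too_long) {
            // After a forced cut the utterance continues into a new segment.
            self.in_speech = edge != VadEdge::SpeechStop;
            return Some(std::mem::take(&mut self.buf));
        }
        None
    }
}
```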
Provider | Tag | Family / protocol | Notes
deepgram | (D) ✓ done | streaming WS /v1/listen | reference impl
assemblyai | (D) | streaming WS v3, wss://streaming.assemblyai.com/v3/ws | own JSON schema
gladia | (D) | streaming WS (init-then-stream session) | own schema
soniox | (D) | streaming WS, wss://stt-rt.soniox.com/transcribe-websocket | own schema
speechmatics | (D) | streaming WS, wss://*.rt.speechmatics.com/v2 | own schema
cartesia | (D) | streaming WS (/stt/websocket) | own schema (sibling of the TTS WS)
aws_transcribe | (D) | AWS Transcribe streaming WS (SigV4-signed) + event framing | SigV4 (see §5)
azure | (D) | Azure Speech SDK / WS | own protocol
elevenlabs | (D) | segmented HTTP /v1/speech-to-text | own schema (not Whisper-shaped)
openai | (D) Whisper-HTTP base | POST /audio/transcriptions (Whisper) | the Whisper-HTTP (D) family client
groq | (W) | Whisper-HTTP | https://api.groq.com/openai/v1
fal | (W) | Whisper-HTTP | fal endpoint
speaches | (W) | Whisper-HTTP | self-hosted, OpenAI-compatible
xai | (W)³ | Whisper-HTTP / OpenAI-STT WS | https://api.x.ai/v1
sarvam | (D) | sarvam REST STT | own schema
mistral | (D) | mistral STT REST | own schema
google | (D) gRPC | Cloud Speech, tonic streaming | gRPC (see §5)
nvidia (riva) | (D) gRPC | Riva ASR, tonic | gRPC (see §5)
gradium | (D) | gradium WS STT | own schema
whisper_local | (D) | whisper-rs (whisper.cpp) | C toolchain (see §5)

³ xai STT: pipecat's implementation is a WebsocketSTTService, but xAI exposes an OpenAI-compatible STT API. Treat it as a (W) over the Whisper-HTTP/OpenAI-STT family unless a live test shows a distinct WS schema (flag this for the implementer). Conservative grouping: (W).

STT (D) count: ~16 (deepgram✓ + assemblyai, gladia, soniox, speechmatics, cartesia, aws_transcribe, azure, elevenlabs, openai-Whisper-base, sarvam, mistral, google-gRPC, nvidia/riva-gRPC, gradium, whisper_local). STT (W) count: ~4 (groq, fal, speaches, xai; all Whisper-HTTP over the openai client). The brief also names nvidia, riva, and speaches separately: riva is NVIDIA's STT (one provider, one feature: stt-nvidia), and speaches is the (W) listed above.


3. TTS (~30 remaining; cartesia✓ done)

Two big families + a long tail of HTTP-POST-audio providers:

  • Streaming-WebSocket TTS: WebsocketTTSService subclasses. Connect a WS, send a synthesis request, read base64/binary PCM chunks. The Cartesia reference impl is the template. Each has its own request/response schema, so each is (D).
  • HTTP-POST-audio (one-shot / segmented): TTSService subclasses that POST text and read an audio body (mp3/pcm/opus). The wire shape is similar (POST → audio bytes), but the request schemas and audio containers differ enough that no single reusable client fits; most are (D), and a few are (W) where they share the OpenAI-TTS shape.
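The streaming-WebSocket read step usually ends in "base64 string to PCM samples". A minimal std-only sketch, assuming the vendor sends standard-alphabet base64 (RFC 4648, '=' padding) carrying 16-bit little-endian PCM; which JSON field holds the string differs per vendor and is not shown. A real impl would call the base64 crate the workspace already declares instead of this hand-rolled decoder; b64_decode and pcm16_le are illustrative names.

```rust
/// Decode standard base64 (RFC 4648 alphabet, optional trailing '=').
/// Returns None if a character falls outside the alphabet.
fn b64_decode(input: &str) -> Option<Vec<u8>> {
    fn sextet(c: u8) -> Option<u32> {
        match c {
            b'A'..=b'Z' => Some(u32::from(c - b'A')),
            b'a'..=b'z' => Some(u32::from(c - b'a') + 26),
            b'0'..=b'9' => Some(u32::from(c - b'0') + 52),
            b'+' => Some(62),
            b'/' => Some(63),
            _ => None,
        }
    }
    let body = input.trim_end_matches('=').as_bytes();
    let mut out = Vec::with_capacity(body.len() * 3 / 4);
    let mut acc: u32 = 0; // bit accumulator; only the low 16 bits are kept
    let mut bits: u32 = 0; // number of not-yet-emitted bits in `acc`
    for &c in body {
        acc = ((acc << 6) | sextet(c)?) & 0xFFFF;
        bits += 6;
        if bits >= 8 {
            bits -= 8;
            out.push((acc >> bits) as u8); // emit the oldest 8 pending bits
        }
    }
    Some(out) // leftover (<8) bits are padding and are dropped
}

/// Reinterpret decoded bytes as 16-bit little-endian PCM samples.
fn pcm16_le(bytes: &[u8]) -> Vec<i16> {
    bytes
        .chunks_exact(2)
        .map(|pair| i16::from_le_bytes([pair[0], pair[1]]))
        .collect()
}

fn main() {
    // "AQD//w==" encodes the bytes [0x01, 0x00, 0xFF, 0xFF].
    let bytes = b64_decode("AQD//w==").expect("valid base64");
    println!("{:?}", pcm16_le(&bytes)); // [1, -1]
}
```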
Provider | Tag | Family / protocol
cartesia | (D) ✓ done | WS streaming (reference)
elevenlabs | (D) | WS streaming /v1/text-to-speech/{voice}/stream-input
deepgram | (D) | WS streaming /v1/speak
rime | (D) | WS streaming
asyncai | (D) | WS streaming
gradium | (D) | WS streaming
soniox | (D) | WS streaming
resemble | (D) | WS streaming
openai | (D) HTTP base | POST /audio/speech → audio body (the OpenAI-TTS-HTTP family client)
groq | (W) | OpenAI-TTS-HTTP (/audio/speech) over the openai client
xai | (W) | OpenAI-TTS-HTTP shape (XAIHttpTTSService)
aws_polly | (D) | AWS Polly (SigV4 + SynthesizeSpeech); see §5
azure | (D) | Azure Speech SSML over WS/HTTP
google | (D) | Google Cloud TTS (HTTP/gRPC); see §5 for the gRPC variant
sarvam | (D) | sarvam HTTP TTS
mistral | (D) | mistral HTTP TTS
nvidia | (D) | NVIDIA Riva TTS (gRPC); see §5
hume | (D) | Hume HTTP
inworld | (D) | Inworld HTTP
minimax | (D) | MiniMax HTTP
camb | (D) | Camb HTTP
fish | (D) | Fish Audio (interruptible)
lmnt | (D) | LMNT (interruptible)
neuphonic | (D) | Neuphonic (interruptible)
smallest | (D) | Smallest (interruptible)
speechmatics | (D) | Speechmatics TTS
kokoro | (D) | Kokoro (local model); C/ONNX-ish, may need a model file
piper | (D) | Piper (local model, subprocess/ONNX)
xtts | (D) | XTTS (local model)

TTS (D) count: ~28 (cartesia✓ + 27). TTS (W) count: ~2 (groq, xai; OpenAI-TTS-HTTP). TTS is the least wrapper-friendly category: almost every vendor has a bespoke request body. The leverage here is two shared client helpers, a WsTtsClient connect/send/read-base64 skeleton mirroring Cartesia and an HttpTtsClient POST-and-read-audio skeleton, which the (D) impls reuse for the transport plumbing while each supplies its own request encoding and container decoding. (Those helpers are a convenience for the implementers, not part of this map.)
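The split between shared transport plumbing and per-vendor encode/decode can be sketched as a trait plus one generic driver. This is a hypothetical shape, not the helpers' real API: TtsWire, synthesize_over, and ExampleVendor are illustrative names, the socket is faked with a send callback and an in-memory list of incoming frames so the example runs without a network, and ExampleVendor's frame format is invented for the demo.

```rust
/// Per-vendor half: each (D) supplies its own request body and frame decoding.
trait TtsWire {
    /// Encode one synthesis request in the vendor's schema.
    fn encode_request(&self, text: &str, voice: &str) -> String;
    /// Decode one incoming frame into audio bytes; None for control/metadata frames.
    fn decode_frame(&self, frame: &str) -> Option<Vec<u8>>;
    /// Whether this frame ends the utterance.
    fn is_final(&self, frame: &str) -> bool;
}

/// Shared half: send the request, read frames until the final one, collect audio.
/// The socket is replaced by a `send` callback and an iterator of incoming frames.
fn synthesize_over<W: TtsWire>(
    wire: &W,
    text: &str,
    voice: &str,
    mut send: impl FnMut(String),
    incoming: impl IntoIterator<Item = String>,
) -> Vec<u8> {
    send(wire.encode_request(text, voice));
    let mut audio = Vec::new();
    for frame in incoming {
        if let Some(chunk) = wire.decode_frame(&frame) {
            audio.extend_from_slice(&chunk);
        }
        if wire.is_final(&frame) {
            break; // frames after the final one belong to a later request
        }
    }
    audio
}

/// Toy vendor for the demo: frames are "audio:<hex bytes>" or "done".
struct ExampleVendor;

impl TtsWire for ExampleVendor {
    fn encode_request(&self, text: &str, voice: &str) -> String {
        // Toy JSON with no escaping; a real impl would use a JSON encoder.
        format!("{{\"voice\":\"{}\",\"text\":\"{}\"}}", voice, text)
    }

    fn decode_frame(&self, frame: &str) -> Option<Vec<u8>> {
        let hex = frame.strip_prefix("audio:")?;
        (0..hex.len())
            .step_by(2)
            .map(|i| u8::from_str_radix(hex.get(i..i + 2)?, 16).ok())
            .collect()
    }

    fn is_final(&self, frame: &str) -> bool {
        frame == "done"
    }
}

fn main() {
    let mut sent = Vec::new();
    let frames = ["meta", "audio:0102", "audio:ff", "done", "audio:99"]
        .iter()
        .map(|s| s.to_string());
    let audio = synthesize_over(&ExampleVendor, "hi", "v1", |req| sent.push(req), frames);
    println!("sent={:?} audio={:?}", sent, audio); // audio=[1, 2, 255]
}
```

The design point is that the driver never inspects vendor JSON; an HTTP variant would keep the same trait and swap the frame loop for a single response body.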


To keep each protocol family's (D) client authored once and its (W)s consistent, group by family, not by alphabetical slice:

Agent | Owns | Why grouped
A: LLM/OpenAI-compatible | the 18 LLM (W)s over OpenAiLlm | identical pattern; one PR, 18 ~30-line structs + the base_url table
B: LLM/distinct | anthropic, google-gemini, openai_responses, aws_bedrock (the 4 new LLM (D)) | 4 distinct clients, incl. the SigV4 Bedrock path
C: STT/Whisper-HTTP | openai-STT (D) client + groq/fal/speaches/xai (W) | one Whisper-HTTP client, 4 wrappers
D: STT/streaming-WS | assemblyai, gladia, soniox, speechmatics, cartesia-STT, elevenlabs-STT, azure-STT, gradium-STT, sarvam, mistral-STT | all distinct WS/REST schemas; Deepgram is the template
E: STT/gRPC+AWS+local | google-gRPC, nvidia/riva-gRPC, aws_transcribe (SigV4), whisper_local | the toolchain-heavy STT (§5); kept together so the tonic/SigV4/whisper-rs plumbing is solved once
F: TTS/streaming-WS | elevenlabs, deepgram, rime, asyncai, gradium, soniox, resemble TTS | share the WsTtsClient helper (Cartesia template)
G: TTS/HTTP cloud | openai-TTS (D) + groq/xai (W), hume, inworld, minimax, camb, sarvam, mistral, speechmatics, azure-TTS | share the HttpTtsClient helper
H: TTS/interruptible+local+AWS+gRPC | fish, lmnt, neuphonic, smallest (interruptible); kokoro, piper, xtts (local models); aws_polly (SigV4); google-TTS / nvidia-TTS (gRPC) | the bespoke, toolchain-heavy TTS tail (§5)

Eight groups, each family-coherent; no two groups touch the same provider. The new auth paths (SigV4 for aws_bedrock, aws_transcribe, and aws_polly, plus the azure api-key seam) are the security-sensitive ones.


5. Toolchain / dependency notes for the fan-out

This crate declares every dependency optional and dep:-gated, so a default build pulls in nothing and --features <provider> compiles that provider's stub. Three families need a toolchain or dependency beyond the existing reqwest / tokio-tungstenite / tokio / base64:

Family | Dep | Gate(s) | Toolchain note
Google / NVIDIA-Riva STT+TTS | tonic 0.14 (already declared) | stt-google, stt-nvidia, tts-google, tts-nvidia | gRPC; the .protos must be compiled with tonic-build at build time (a build.rs, added by the impl, not here). protoc must be on PATH for codegen.
whisper_local | whisper-rs 0.16 (already declared) | stt-whisper-local | bundles whisper.cpp, so it needs cmake and a C/C++ toolchain. Unless cmake is installed on this machine, building --features stt-whisper-local may fail at the C build step. The Rust stub itself compiles; the dep's C build is the gate.
AWS (Bedrock LLM, Transcribe STT, Polly TTS) | hand-rolled SigV4, no AWS SDK | llm-aws-bedrock, stt-aws-transcribe, tts-aws-polly | no AWS crate; SigV4 over reqwest plus SHA-256/HMAC from hmac/sha2, declared optional.

opentelemetry, gRPC (tonic), and whisper-rs are the only dependencies that are not pure-Rust network clients. Everything else is reqwest (rustls) or tokio-tungstenite (rustls), so the rustls-only, zero-default-cost discipline holds. This map adds hmac and sha2 (both optional) for the AWS SigV4 family and no other new dependency.
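The hand-rolled SigV4 path reduces to a fixed derivation chain: the signing key is HMAC-SHA256 applied successively to the date, region, service, and the literal "aws4_request", starting from "AWS4" + secret, and the signature is the hex HMAC of the string to sign under that key. A minimal sketch of that structure, with HMAC-SHA256 passed in as a closure so the example stays dependency-free; in the real path it would be built on the hmac and sha2 crates named above. Canonical-request construction (method, canonical URI and query, signed headers, payload hash) is not shown, and the function names are illustrative, not Flowcat APIs.

```rust
/// Lowercase hex encoding, as SigV4 uses for hashes and the final signature.
fn hex(bytes: &[u8]) -> String {
    bytes.iter().map(|b| format!("{:02x}", b)).collect()
}

/// Credential scope: "YYYYMMDD/region/service/aws4_request".
fn credential_scope(date: &str, region: &str, service: &str) -> String {
    format!("{}/{}/{}/aws4_request", date, region, service)
}

/// SigV4 signing-key chain: each step HMACs the next scope element under the previous key.
fn signing_key(
    hmac: &dyn Fn(&[u8], &[u8]) -> Vec<u8>,
    secret: &str,
    date: &str,
    region: &str,
    service: &str,
) -> Vec<u8> {
    let k_date = hmac(format!("AWS4{}", secret).as_bytes(), date.as_bytes());
    let k_region = hmac(&k_date[..], region.as_bytes());
    let k_service = hmac(&k_region[..], service.as_bytes());
    hmac(&k_service[..], &b"aws4_request"[..])
}

/// String to sign; the last line is hex(SHA-256(canonical request)).
fn string_to_sign(amz_date: &str, scope: &str, canonical_request_hash_hex: &str) -> String {
    format!("AWS4-HMAC-SHA256\n{}\n{}\n{}", amz_date, scope, canonical_request_hash_hex)
}

/// Signature = hex(HMAC(signing key, string to sign)).
fn signature(
    hmac: &dyn Fn(&[u8], &[u8]) -> Vec<u8>,
    secret: &str,
    amz_date: &str, // "YYYYMMDDTHHMMSSZ"
    region: &str,
    service: &str,
    canonical_request_hash_hex: &str,
) -> String {
    let date = &amz_date[..8]; // the YYYYMMDD prefix
    let scope = credential_scope(date, region, service);
    let key = signing_key(hmac, secret, date, region, service);
    let sts = string_to_sign(amz_date, &scope, canonical_request_hash_hex);
    hex(&hmac(&key[..], sts.as_bytes()))
}

fn main() {
    // Stand-in "HMAC" that records key|data so the chaining order is visible.
    // Not a MAC: swap in real HMAC-SHA256 (e.g. from the hmac + sha2 crates).
    let record = |k: &[u8], d: &[u8]| -> Vec<u8> {
        let mut v = k.to_vec();
        v.push(b'|');
        v.extend_from_slice(d);
        v
    };
    let key = signing_key(&record, "SECRET", "20240101", "us-east-1", "polly");
    println!("{}", String::from_utf8_lossy(&key));
    // prints AWS4SECRET|20240101|us-east-1|polly|aws4_request
    println!("{}", signature(&record, "SECRET", "20240101T000000Z", "us-east-1", "polly", "00"));
}
```

Injecting the MAC keeps the chain testable against a recording stub and leaves the choice of crypto crate to the dep:-gated feature.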


6. Acceptance

  • cargo build -p flowcat-services (default) → pulls nothing, compiles the trait re-exports + stubs only.
  • cargo build -p flowcat-services --features stt-all,tts-all,llm-all → compiles every stub (modulo stt-whisper-local's C build, which needs cmake — see §5).
  • cargo test -p flowcat-services (existing tests) → still green.
  • Every stub: Apache SPDX header + //! WS-n: <provider> — TODO + a frozen-trait impl whose bodies return FlowcatError::Other("<provider>: not yet wired (WS-n)") (or todo!() for the &str name() accessor) so the crate compiles. #[allow(dead_code)] on the held config fields is the sanctioned clippy escape on a stub.
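A minimal sketch of a conforming stub, assuming simplified stand-ins for the frozen trait and for FlowcatError (the real trait's name, methods, and async-ness live in flowcat-services and are not reproduced here; SttService, transcribe, SonioxConfig, and SonioxStt are hypothetical). It shows the required pieces: the SPDX header, the //! WS-n doc line, a trait method that returns the not-yet-wired error, and #[allow(dead_code)] on the held config. The module is inlined so the example is one file; in the crate it would be its own file with the SPDX line first.

```rust
/// Simplified stand-in for the crate's error type (only the variant stubs use).
#[derive(Debug, PartialEq)]
pub enum FlowcatError {
    Other(String),
}

/// Simplified stand-in for a frozen service trait; the real trait differs.
pub trait SttService {
    fn transcribe(&mut self, pcm: &[i16]) -> Result<String, FlowcatError>;
}

// SPDX-License-Identifier: Apache-2.0  (line 1 of the real stub file)
mod soniox {
    //! WS-n: soniox — TODO
    use super::{FlowcatError, SttService};

    /// Hypothetical config held by the stub until the fan-out wires it.
    pub struct SonioxConfig {
        pub api_key: String,
    }

    pub struct SonioxStt {
        #[allow(dead_code)] // held for the real impl; unread while stubbed
        config: SonioxConfig,
    }

    impl SonioxStt {
        pub fn new(config: SonioxConfig) -> Self {
            Self { config }
        }
    }

    impl SttService for SonioxStt {
        fn transcribe(&mut self, _pcm: &[i16]) -> Result<String, FlowcatError> {
            Err(FlowcatError::Other("soniox: not yet wired (WS-n)".to_string()))
        }
    }
}

use soniox::{SonioxConfig, SonioxStt};

fn main() {
    let mut stt = SonioxStt::new(SonioxConfig { api_key: "test-key".into() });
    println!("{:?}", stt.transcribe(&[0i16; 160]));
}
```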

7. What this map deliberately does NOT do

  • It does not implement any provider body (that is the fan-out).
  • It does not add the WS/HTTP TTS helper clients (WsTtsClient/HttpTtsClient) or the gRPC build.rs / .protos — those are owned by the impls.
  • It does not touch flowcat-core, the Deepgram/Cartesia/OpenAI reference impls' logic, other crates, or the embedder.
  • It does not create a full per-(W) stub where the (W) is literally the (D) client plus a base_url (the LLM wrappers). Those still get a thin module stub, so the mod and feature exist for the fan-out to fill, but the stub body just notes "wrapper over OpenAiLlm — set base_url".

Flowcat roadmap

This is a forward-looking, best-effort roadmap — directional, not a commitment. Shipped capabilities are described in the README and FEATURES.md; this file tracks what is not yet done.

Recently landed

  • Composable FrameProcessor pipeline — typed frame taxonomy, system-frame priority/interruption model, linear + parallel pipelines. (Frozen API in PROCESSOR-DESIGN.md.)
  • In-process native SIP/RTP/SDP user-agent for G.711 telephony.
  • ~80 STT/TTS/LLM + realtime connectors, each a dep:-gated Cargo feature (FEATURES.md).
  • Runnable demos in flowcat-cli: an in-process pipeline demo and a WebSocket PCM echo-bot (pipeline, ws-echo).
  • Use it from Python (out-of-process) — the RemoteBrain HTTP adapter (brain-http) drives conversation policy from a Python service, and the mcp client exposes Python functions as tools. Reference servers in examples/. This is the supported Python path today.

Planned

Python bindings (PyO3)

The out-of-process path above already lets Python drive Flowcat without writing Rust. The next step is in-process bindings: import flowcat, build a pipeline in Python, and pass Python callables/objects as the brain — single process, no service to host. The binding will keep Python at turn granularity (releasing the GIL during the Rust media loop) so the tail-latency guarantees hold. This is a developer-ergonomics step, not a capability gap. Not yet started.

WebRTC browser transport

Complete the str0m-based WebRTC transport (+ Opus) so a browser client can connect directly, alongside the existing carrier/WebSocket paths.

Local device backend

The local mic/speaker transport currently ships only as a stub with no device backend, which keeps the default build free of a heavy platform audio dependency. Add an optional, feature-gated device backend so the local demo can capture and play real audio.

Broader live-verified provider coverage

Connectors are currently fixture/wire-tested — each provider's message framing is pinned by unit tests, but end-to-end live calls require that vendor's credentials and are not exercised in CI. Expand the set of provider paths that have been verified against the live service.

LiveKit transport

The livekit transport is currently a stub; wire up real signaling.

How to influence the roadmap

Open an issue or start a discussion. Adding a new connector is usually a small, self-contained contribution — see CONTRIBUTING.md and the provider-family taxonomy in PROVIDERS.md.

Changelog

All notable changes to this project are documented here. The format is based on Keep a Changelog, and this project aims to follow Semantic Versioning from 1.0.0 onward. Until then (pre-1.0), minor versions may include breaking changes.

[Unreleased]

Added

  • flowcat-cli ships two runnable, credential-free demos (the OSS examples surface): pipeline (an in-process FrameProcessor pipeline over a synthetic sine-wave source) and ws-echo (PCM echo over the generic WebSocket transport, with a self-contained --loopback round-trip or --connect <ws://url>).
  • RemoteBrain HTTP adapter (flowcat-services, feature brain-http): drive a call's conversation policy from an out-of-process HTTP service (e.g. a Python webhook) via the AgentBrain seam, at turn granularity. Includes a documented JSON wire contract, a request timeout, a fail-safe (Stay on transient error or timeout), and fixture tests.
  • examples/ for using Flowcat from Python without writing Rust: a pure-stdlib reference python-remote-brain server and a python-mcp-tools MCP server.
  • ROADMAP.md describing planned work (in-process PyO3 bindings, WebRTC browser transport, a local audio device backend, broader live-verified coverage).
  • Standard project docs: CODE_OF_CONDUCT.md, SECURITY.md, this changelog, and GitHub issue / pull-request templates.
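The RemoteBrain fail-safe (Stay on a transient error or a timeout) can be illustrated as a small decision function. A minimal sketch, assuming hypothetical BrainDecision and BrainError types and a std mpsc channel standing in for the HTTP round-trip; the adapter's real types and JSON wire contract are documented with the brain-http feature, not here. Treating a dropped reply channel like a timeout is also an assumption of this sketch.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::thread;
use std::time::Duration;

/// Hypothetical per-turn decisions; only Stay is named in the changelog.
#[derive(Debug, PartialEq)]
enum BrainDecision {
    Stay,
    Say(String),
}

/// Hypothetical transient failure (e.g. a 5xx from the remote service).
#[derive(Debug)]
enum BrainError {
    Transient(String),
}

/// Wait at most `timeout` for the brain's reply; any error or timeout degrades to Stay.
fn decide_with_failsafe(
    reply: mpsc::Receiver<Result<BrainDecision, BrainError>>,
    timeout: Duration,
) -> BrainDecision {
    match reply.recv_timeout(timeout) {
        Ok(Ok(decision)) => decision,
        Ok(Err(_transient)) => BrainDecision::Stay,
        Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => BrainDecision::Stay,
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    // Simulated remote brain answering on another thread.
    thread::spawn(move || {
        let _ = tx.send(Ok(BrainDecision::Say("hello".into())));
    });
    println!("{:?}", decide_with_failsafe(rx, Duration::from_millis(500)));
}
```

Keeping the fail-safe as a total function means a slow or failing policy service can never stall the media loop; the worst case is one turn of Stay.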

Changed

  • Documentation scrubbed for the public release: design docs are now embedder-agnostic, and SPINOUT.md is repurposed as a workspace-independence note for contributors.

Removed

  • Internal-only planning and operations documents that are not relevant to the public project.

Breaking

  • Renamed the runtime's environment-variable keys from the previous prefix to the FLOWCAT_* family: FLOWCAT_VOICE, FLOWCAT_VAD_START_SENSITIVITY, FLOWCAT_VAD_END_SENSITIVITY, FLOWCAT_VAD_PREFIX_PADDING_MS, FLOWCAT_VAD_SILENCE_DURATION_MS, and FLOWCAT_MINIMAX_GROUP_ID. Embedders setting these must update the key names.

Code of Conduct

Our pledge

The Flowcat project is committed to providing a friendly, safe, and welcoming environment for all contributors and participants, regardless of background or identity. We expect everyone who takes part in the project — in issues, pull requests, discussions, and any other project space — to help keep it that way.

Our standard

This project adopts the Contributor Covenant, version 2.1, as its code of conduct. The full text is available at https://www.contributor-covenant.org/version/2/1/code_of_conduct/.

In short: be respectful and constructive, assume good faith, welcome newcomers, and accept feedback gracefully. Behaviour that harasses, demeans, or excludes others is not tolerated. Maintainers are responsible for clarifying standards and may remove, edit, or reject contributions, comments, and other interactions that are not aligned with this Code of Conduct.

Scope

This Code of Conduct applies within all project spaces, and also when an individual is officially representing the project in public spaces.

Reporting

If you experience or witness unacceptable behaviour, report it confidentially to the maintainers at security@areev.ai. All reports will be reviewed and investigated promptly and fairly, and the privacy and security of the reporter will be respected.

Enforcement

Maintainers will follow the Contributor Covenant's Enforcement Guidelines when determining the consequences for any action they deem in violation of this Code of Conduct.


Attribution: this Code of Conduct adopts the Contributor Covenant, version 2.1, available at https://www.contributor-covenant.org/version/2/1/code_of_conduct/.

Security policy

Supported versions

Flowcat is pre-1.0 and under active development. Security fixes are applied to the latest main. Until a 1.0 release, only the most recent published version (and main) is supported.

Reporting a vulnerability

Please do not open a public issue for security vulnerabilities.

Report privately through either channel:

  1. GitHub private vulnerability reporting (preferred) — use the repository's Security → Report a vulnerability tab to open a private advisory.
  2. Email: security@areev.ai, with a description and reproduction steps.

Please include, where possible:

  • the affected crate(s) and version / commit,
  • the feature flags and provider/transport involved,
  • a minimal reproduction or proof of concept, and
  • the impact you foresee.

What to expect

  • We aim to acknowledge a report within 3 business days.
  • We will work with you on an assessment and a fix, and keep you updated on progress.
  • We support coordinated disclosure: we ask that you give us a reasonable window to release a fix before any public disclosure, and we will credit you for the report unless you prefer to remain anonymous.

Scope

This project is a media-pipeline runtime that speaks third-party provider protocols using credentials you supply. Vulnerabilities in third-party services or SDKs should be reported to those vendors. Issues in Flowcat's own code — for example in its SIP/RTP/SDP parsing, the SigV4 signing paths, WebSocket framing, or audio codec handling — are in scope.