Flowcat
A native-Rust runtime for real-time voice agents — built to run on your own infrastructure. Flowcat carries a phone or WebRTC call through a composable media pipeline — transport in → VAD / turn-taking → STT · LLM · TTS (or a single speech-to-speech model) → transport out — as one self-contained binary you deploy in your own VPC (or fully air-gapped). No hosted control plane, no phone-home, no Python or FreeSWITCH sidecar to operate. You bring your own provider credentials; a call's audio and data never leave infrastructure you control.
It is a clean-room, native-Rust counterpart to the design of
pipecat: the same FrameProcessor
pipeline model and the same provider breadth, packaged for teams that need to
own the stack — self-hosted, auditable, and dense enough to run serious call
volume per box.
License: Apache-2.0 · Status: pre-1.0, building in the open.
New here? The Quickstart takes you from git clone to a running pipeline, a real WebSocket audio round-trip, and a Python-driven brain in about five minutes, with no credentials.
Where to go next
Building on Flowcat? Follow the path in order:
- Quickstart — clone → build → watch real audio move.
- Build an embedder — the host binary that carries a call.
- Configuration — runtime knobs and credentials.
- Providers & features — the STT / TTS / LLM / transport surface.
- Deployment — ship a release binary in your own VPC.
Contributing to Flowcat? Start with Contributing (build, test, add a provider) and the architecture docs beside it.
This site is generated from the Markdown in the Flowcat repository with mdBook.
Flowcat quickstart
From git clone to "I've watched the runtime move real audio and drive a
conversation" in about five minutes — no credentials, no accounts, no cloud.
What this gets you: a built Flowcat, the FrameProcessor pipeline moving audio
end-to-end, a real WebSocket media round-trip, and your conversation policy driven
from Python over the RemoteBrain seam.
What it does not get you (yet): a live PSTN phone call. That needs a small embedder you wire to your carrier and control plane — step 5 shows exactly what that piece is and where the live-verified path starts. We'd rather you hit a working demo in five minutes than a phone call that needs an hour of setup to fail.
Everything in steps 1–4 is exercised in CI, so it runs on the first try.
Prerequisites
- A recent stable Rust toolchain (rustup), with cargo on your PATH.
- Python 3 (standard library only) for step 4; no pip install needed.
- git.
1. Clone & build
git clone https://github.com/AreevAI/flowcat.git
cd flowcat
cargo build -p flowcat-cli # default features → no provider/network deps
The default build pulls no provider client dependencies — every STT/TTS/LLM, transport, and exporter is an opt-in Cargo feature. The first build compiles the workspace (a minute or two); after that, runs are instant.
2. Watch the pipeline move audio
cargo run -p flowcat-cli -- pipeline
A synthetic 440 Hz sine wave is pumped through a composable FrameProcessor graph
(Source → Echo → Tap → Sink) while a FrameObserver counts frames:
flowcat pipeline demo
source : 440 Hz sine, 16000 Hz mono, 320-sample frames
audio : 50 frames (~1.00 s)
chain : Source -> Echo -> Tap -> Sink
frames in : 50 (InputAudio observed)
frames out : 50 (OutputAudio observed)
echoed : 50 (counted in Echo)
wall time : 2.071 ms
result : OK (in == out == sourced)
This is Flowcat's core: each stage is its own tokio task behind a bounded channel
(natural backpressure), and the hot audio frame is an Arc — each hop moves a
pointer, not a buffer. in == out == sourced means nothing was dropped.
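To make "bounded channel" and "moves a pointer, not a buffer" concrete, here is a minimal std-only sketch of the same Source → Echo → Tap → Sink chain. It is not Flowcat's code: OS threads and std::sync::mpsc::sync_channel stand in for tokio tasks and bounded tokio channels, and Frame, stage and run_pipeline are hypothetical names. A full channel blocks its sender (the backpressure), and each hop moves an Arc rather than copying the sample buffer.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::sync::Arc;
use std::thread;

/// Hypothetical audio frame: samples live behind an Arc, so handing a frame
/// to the next stage moves a pointer, never the sample buffer.
struct Frame {
    samples: Arc<Vec<i16>>,
}

/// Spawn one stage: read frames, run `f`, forward them downstream.
/// `sync_channel(cap)` is bounded: when the queue is full, `send` blocks,
/// which is the natural backpressure described above.
fn stage<F>(input: Receiver<Frame>, cap: usize, f: F) -> Receiver<Frame>
where
    F: Fn(&Frame) + Send + 'static,
{
    let (tx, rx): (SyncSender<Frame>, Receiver<Frame>) = sync_channel(cap);
    thread::spawn(move || {
        for frame in input {
            f(&frame);
            if tx.send(frame).is_err() {
                break; // downstream hung up
            }
        }
    });
    rx
}

/// Returns (sourced, echoed, received-at-sink).
fn run_pipeline(n_frames: usize, frame_len: usize) -> (usize, usize, usize) {
    let echoed = Arc::new(AtomicUsize::new(0));
    let (src_tx, src_rx) = sync_channel::<Frame>(4);

    // Source: a 440 Hz sine at 16 kHz, cut into fixed-size frames.
    let source = thread::spawn(move || {
        for i in 0..n_frames {
            let samples: Vec<i16> = (0..frame_len)
                .map(|j| {
                    let t = (i * frame_len + j) as f64 / 16_000.0;
                    ((2.0 * std::f64::consts::PI * 440.0 * t).sin() * 8_000.0) as i16
                })
                .collect();
            src_tx.send(Frame { samples: Arc::new(samples) }).unwrap();
        }
        n_frames // src_tx is dropped here, which closes the whole chain
    });

    let echo_count = Arc::clone(&echoed);
    let after_echo = stage(src_rx, 4, move |_| {
        echo_count.fetch_add(1, Ordering::Relaxed);
    });
    let after_tap = stage(after_echo, 4, |f| {
        debug_assert!(!f.samples.is_empty()); // a tap only observes
    });

    let received = after_tap.iter().count(); // Sink: drain until closed
    let sourced = source.join().unwrap();
    (sourced, echoed.load(Ordering::Relaxed), received)
}

fn main() {
    let (sourced, echoed, out) = run_pipeline(50, 320);
    println!("sourced={sourced} echoed={echoed} out={out}");
}
```

With 50 frames of 320 samples all three counts come out equal, the same in == out == sourced check the demo prints.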
3. Real audio over the WebSocket transport
cargo run -p flowcat-cli -- ws-echo
This stands up the actual generic WebSocket media transport — the same one a WS-media carrier connects to — streams PCM frames through it, and echoes them back, asserting they return byte-for-byte:
ws-echo: loopback server listening on ws://127.0.0.1:<port>
ws-echo: stream started (call_id=loopback)
ws-echo: echoed frame 1 (7 samples)
ws-echo: echoed frame 2 (6 samples)
ws-echo: echoed frame 3 (64 samples)
ws-echo: stream stopped after 3 echoed frame(s)
ws-echo: loopback OK — 3 frame(s) round-tripped byte-for-byte (3 echoed server-side)
Pass --connect ws://<host>:<port> to point the echo at a live peer instead of the
in-process loopback.
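What "round-tripped byte-for-byte" checks can be sketched in a few lines. This sketch assumes the frames travel as 16-bit little-endian PCM, which is an assumption for illustration rather than the documented wire format of Flowcat's WebSocket transport; encode_pcm16le, decode_pcm16le and echo are hypothetical helpers.

```rust
/// Serialize i16 PCM samples to little-endian bytes (assumed wire encoding).
fn encode_pcm16le(samples: &[i16]) -> Vec<u8> {
    samples.iter().flat_map(|s| s.to_le_bytes()).collect()
}

/// Parse little-endian bytes back into samples; rejects odd-length payloads.
fn decode_pcm16le(bytes: &[u8]) -> Option<Vec<i16>> {
    if bytes.len() % 2 != 0 {
        return None;
    }
    Some(
        bytes
            .chunks_exact(2)
            .map(|b| i16::from_le_bytes([b[0], b[1]]))
            .collect(),
    )
}

/// A loopback "echo": the peer sends the payload straight back.
fn echo(payload: &[u8]) -> Vec<u8> {
    payload.to_vec()
}

fn main() {
    // Same frame sizes as the sample output above: 7, 6 and 64 samples.
    let frames: Vec<Vec<i16>> = [7usize, 6, 64]
        .iter()
        .map(|&n| (0..n as i16).map(|i| i * 257 - 1000).collect())
        .collect();
    for frame in &frames {
        let sent = encode_pcm16le(frame);
        let back = echo(&sent);
        assert_eq!(sent, back, "bytes must match exactly");
        assert_eq!(decode_pcm16le(&back).as_deref(), Some(frame.as_slice()));
    }
    println!("{} frame(s) round-tripped byte-for-byte", frames.len());
}
```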
4. Drive the conversation from Python
You don't have to write Rust to control the agent. Flowcat consults a "brain" at turn granularity (between turns) — your Python never touches the per-audio-frame path, so the runtime's latency profile is unaffected. Start the pure-stdlib reference server:
python3 examples/python-remote-brain/brain_server.py # http://127.0.0.1:8080
In another terminal, play the role Flowcat plays on a call — start a session, then interpret a model tool call:
curl -s -X POST http://127.0.0.1:8080/session \
-H 'Content-Type: application/json' \
-d '{"brain_config":{"graph":"demo"},"provider":"gemini"}'
{ "system_prompt": "You are a friendly receptionist. Greet the caller and ask how you can help.",
"tools": [ { "name": "book_appointment", "...": "..." }, { "name": "end_call", "...": "..." } ],
"node_id": "greeting", "collected_vars": {} }
curl -s -X POST http://127.0.0.1:8080/tool-call \
-H 'Content-Type: application/json' \
-d '{"node_id":"greeting","tool":{"name":"book_appointment","args":{"day":"Tuesday"}},"collected_vars":{}}'
{ "action": "transition",
"system_prompt": "Confirm the appointment day with the caller, then ask them to say 'confirm'.",
"tools": [ { "name": "confirm", "...": "..." }, { "name": "end_call", "...": "..." } ],
"say": "Sure — booking you for Tuesday. Shall I confirm?",
"node_id": "confirm", "collected_vars": { "requested_day": "Tuesday" }, "finished": false }
That's the whole RemoteBrain wire contract: /session seeds state, and
/tool-call returns one of transition / stay / end. Replace the decide()
function in brain_server.py with your own logic — an LLM call, a DB lookup, a
state machine. A Rust embedder wires this in with RemoteBrain::connect(...); see
examples/python-remote-brain. To expose Python
functions as model tools instead, see
examples/python-mcp-tools.
5. From here to a live phone call
The steps above run the runtime in isolation. A real inbound/outbound PSTN call adds the one piece Flowcat deliberately leaves to you — the embedder: a small host binary that
- terminates the call: the native in-process SipTransport (SIP/RTP, no softswitch required), or a carrier WebSocket transport if you already run one;
- resolves and finalizes the call: your SessionSource, talking to your control plane (routing, auth, recording/transcript upload);
- supplies the brain: your own AgentBrain, or the RemoteBrain from step 4.
Flowcat owns the media loop; you own the contract, routing, and credentials — which
is what keeps the whole call on infrastructure you control. The combination
verified end-to-end today is Gemini Live (speech-to-speech) + Plivo telephony,
so start there. The trait seams and full call lifecycle are specified in
DESIGN.md; the provider/transport surface and the "use it from
Python" model are in the README.
Fully on-prem / air-gapped? Swap the cloud providers for the local connectors (Whisper STT; Kokoro / Piper / XTTS TTS; Ollama LLM) and no call audio ever leaves your infrastructure.
Troubleshooting
- cargo: command not found: install Rust via rustup and reopen your shell.
- First build is slow: that's the one-time dependency compile; re-runs are instant.
- Port 8080 already in use: change PORT near the top of brain_server.py.
- Run the full offline test suite: cargo test (no network, no credentials).
Next: ready to carry a real call? → Build an embedder.
Build an embedder
The Quickstart runs the runtime in isolation. To carry a real call you write one small host binary — the embedder — that Flowcat deliberately leaves to you. This is the piece that keeps the whole call on infrastructure you control: Flowcat owns the media loop; you own the call contract, routing, and credentials.
The repo ships no embedder, only the credential-free flowcat demos. The wiring shown here lives in flowcat-core's own tests (pipeline/s2s.rs). The fastest on-ramp without writing Rust brain logic is the Python RemoteBrain.
The four seams
An embedder supplies four things, then calls one builder. Two have ready-made implementations you can use as-is; two you implement over your own systems.
| Seam | Trait | Use the built-in… | or implement for… |
|---|---|---|---|
| Media in/out | MediaTransport | SipAgent / SipTransport (native SIP/RTP), or a WS carrier | a custom transport |
| The model | RealtimeLlm + RealtimeKickoff | GeminiLive (speech-to-speech) | another realtime model |
| The conversation | AgentBrain | RemoteBrain (HTTP, step 4) | your own engine, in-process |
| Call resolution + finalize | SessionSource | — | your control plane (always yours) |
AgentBrain — what the conversation does
Synchronous on purpose: pure decision logic, no I/O inside.
```rust
pub trait AgentBrain: Send {
    fn system_prompt(&self) -> String;
    fn tools(&self) -> Vec<ToolDecl>;
    fn current_node_id(&self) -> String;
    fn on_tool_call(&mut self, name: &str, args: &serde_json::Value) -> BrainAction;
    fn is_finished(&self) -> bool;
    fn collected_vars(&self) -> serde_json::Value;
}
```
Implement it over your graph/state machine, or use RemoteBrain to drive
decisions from an HTTP service (your Python, your DB, your LLM) — same trait,
over the wire.
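The following std-only sketch shows what implementing it over a state machine can look like, using the greeting → confirm flow from the Quickstart's /tool-call example. To stay self-contained it does not implement the real trait: Brain, Action, Receptionist and the string-keyed argument map are hypothetical, simplified stand-ins for AgentBrain, BrainAction and serde_json::Value, and the prompts are abbreviated. What happens after "confirm" (the call ends) is an assumption; the Quickstart output stops at the transition.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for BrainAction: the three outcomes of a tool call.
#[derive(Debug, PartialEq)]
enum Action {
    Transition { to: String, say: Option<String> },
    Stay,
    End,
}

/// Hypothetical, simplified mirror of the AgentBrain method set.
trait Brain {
    fn system_prompt(&self) -> String;
    fn tools(&self) -> Vec<&'static str>;
    fn current_node_id(&self) -> String;
    fn on_tool_call(&mut self, name: &str, args: &BTreeMap<String, String>) -> Action;
    fn is_finished(&self) -> bool;
    fn collected_vars(&self) -> BTreeMap<String, String>;
}

#[derive(Default)]
struct Receptionist {
    node: &'static str, // "greeting" | "confirm" | "done"
    vars: BTreeMap<String, String>,
}

impl Brain for Receptionist {
    fn system_prompt(&self) -> String {
        match self.node {
            "confirm" => "Confirm the appointment day with the caller.".into(),
            _ => "You are a friendly receptionist. Greet the caller.".into(),
        }
    }
    fn tools(&self) -> Vec<&'static str> {
        match self.node {
            "confirm" => vec!["confirm", "end_call"],
            _ => vec!["book_appointment", "end_call"],
        }
    }
    fn current_node_id(&self) -> String {
        self.node.to_string()
    }
    // Pure decision logic, no I/O: synchronous by design.
    fn on_tool_call(&mut self, name: &str, args: &BTreeMap<String, String>) -> Action {
        match (self.node, name) {
            (_, "end_call") | ("confirm", "confirm") => {
                self.node = "done";
                Action::End
            }
            ("greeting", "book_appointment") => match args.get("day") {
                Some(day) => {
                    self.vars.insert("requested_day".into(), day.clone());
                    self.node = "confirm";
                    Action::Transition {
                        to: "confirm".into(),
                        say: Some(format!("Sure, booking you for {day}. Shall I confirm?")),
                    }
                }
                None => Action::Stay, // missing argument: let the model ask again
            },
            _ => Action::Stay, // tool not valid at this node
        }
    }
    fn is_finished(&self) -> bool {
        self.node == "done"
    }
    fn collected_vars(&self) -> BTreeMap<String, String> {
        self.vars.clone()
    }
}

fn main() {
    let mut brain = Receptionist { node: "greeting", ..Default::default() };
    println!("{} {:?}", brain.system_prompt(), brain.tools());
    let args = BTreeMap::from([("day".to_string(), "Tuesday".to_string())]);
    println!("{:?}", brain.on_tool_call("book_appointment", &args));
    println!("{:?}", brain.on_tool_call("confirm", &BTreeMap::new()));
    println!("finished: {}", brain.is_finished());
}
```

Keeping the decision logic free of I/O is what lets the same state machine sit behind the trait in-process or behind RemoteBrain over HTTP.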
SessionSource — your control plane
How Flowcat resolves a call to a config and reports the result back. Always yours; Flowcat never sees your API contract.
```rust
#[async_trait]
pub trait SessionSource: Send + Sync {
    async fn resolve(&self, run_id: i64, token: &str) -> Result<ResolvedCall, FlowcatError>;
    async fn complete(&self, run_id: i64, token: &str, fin: Finalize) -> Result<(), FlowcatError>;
    async fn artifact_upload_url(/* run_id, token, kind */) -> Result<UploadTarget, FlowcatError>;
    async fn put_bytes(/* url, bytes, content_type */) -> Result<(), FlowcatError>;
    async fn node_tools(/* run_id, token, node_id */) -> Result<Vec<ToolDecl>, FlowcatError>;
    async fn tool_call(/* run_id, token, node_id, tool, args */) -> Result<String, FlowcatError>;
}
```
Exact signatures and the ResolvedCall / Finalize / UploadTarget shapes: flowcat-core/src/session.rs.
The entry point
One builder wires the four seams into the running pipeline and returns a task you drive to completion:
```rust
pub async fn build_s2s_task<T, R, B, S>(
    transport: T, // MediaTransport, e.g. SipTransport
    realtime: R,  // RealtimeLlm + RealtimeKickoff, e.g. GeminiLive
    brain: B,     // AgentBrain, e.g. RemoteBrain
    session: S,   // SessionSource: yours
    run_id: i64,
    token: String,
    model: String,
) -> Result<S2sTask>;
```
A minimal SIP embedder
Illustrative — accept inbound SIP calls and drive each with Gemini Live + a remote brain. Constructor argument lists are abbreviated; take the exact ones from the linked source / API reference.
```rust
// Imports omitted: take the exact paths from the linked source / API reference.

// RemoteBrain needs a multi-threaded runtime.
#[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 1. Bring up the native SIP user-agent and register with your trunk.
    let agent = SipAgent::start(SipConfig {
        server: "sip:sip.example.com".into(),
        login: std::env::var("SIP_LOGIN")?,
        password: std::env::var("SIP_PASSWORD")?,
        caller_id: "+15551230000".into(),
        public_ip: None,     // set on a NAT'd host
        sip_port: None,      // → 5060
        rtp_port_base: 16000,
        rtp_port_tries: 200, // caps concurrent calls
    })
    .await?;
    agent.register().await?;

    // 2. One call per inbound INVITE.
    // Note: each `?` below aborts the whole accept loop on a single failed
    // call; production code should handle per-call errors instead.
    while let Some(invite) = agent.next_inbound().await {
        let transport = invite.answer().await?; // → SipTransport (MediaTransport)
        let realtime = GeminiLive::new(/* api_key, voice/model from your config */);
        let brain = RemoteBrain::connect(
            "http://127.0.0.1:8080",
            serde_json::json!({ "graph": "receptionist" }),
            "gemini",
            None,
        )
        .await?;
        let session = MyControlPlane::new(/* … */); // your SessionSource impl
        let task = build_s2s_task(
            transport,
            realtime,
            brain,
            session,
            /* run_id */ 1,
            /* token */ "…".into(),
            "models/gemini-live".into(),
        )
        .await?;
        tokio::spawn(async move {
            let _ = task.run().await;
        });
    }
    Ok(())
}
```
Cargo.toml (Flowcat is not yet published on crates.io, so point the deps at a git or path checkout):
[dependencies]
flowcat-core = { git = "https://github.com/AreevAI/flowcat" }
flowcat-services = { git = "https://github.com/AreevAI/flowcat", features = ["brain-http"] }
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
serde_json = "1"
Choosing S2S vs cascaded
Two pipeline shapes, same four seams:
- Speech-to-speech (S2S): one realtime model both listens and speaks (build_s2s_task + a RealtimeLlm such as Gemini Live). Fewest moving parts and a single hop on the hot path.
- Cascaded: separate STT → LLM → TTS stages (build_cascaded_task). Mix and match a provider per stage and swap any one independently; more control, more to wire.
Rule of thumb: start with S2S; reach for cascaded when you need a specific STT/LLM/TTS combination a single realtime model can't give you.
What's verified vs. coming
- Live-verified today: speech-to-speech via build_s2s_task with Gemini Live + Plivo / native SIP. Start there.
- Cascaded STT → LLM → TTS has a parallel builder, build_cascaded_task, with the same seam shape; the realtime path is the one verified end-to-end.
Where to read the exact API
- Trait seams: brain.rs, session.rs, realtime/mod.rs, transport
- Builders: pipeline/s2s.rs
- SIP: sip/agent.rs · SIP design
- The full contract & call lifecycle: Design overview
- Browse it all as rustdoc: API reference
Next: Configuration to tune turn-taking and voice, then Deployment to ship it.
Configuration
Flowcat has no central config file and no settings framework (no figment,
envy, or config crate, no flowcat.toml). That is deliberate: Flowcat is a
library you embed, so configuration lives in two clearly separated places.
| Layer | Who reads it | How it's set |
|---|---|---|
| Runtime knobs | the Flowcat runtime, at call time | FLOWCAT_* environment variables |
| Credentials & call settings | your embedder, passed into constructors | Rust code — SipConfig, each service's constructor |
The important consequence: the runtime does not read provider API keys from the
environment in production. Setting OPENAI_API_KEY does not make a running
Flowcat pick it up — your embedder reads its own credentials however it likes and
passes them to the provider constructor.
Contributing to Flowcat and running the live integration tests? Those use their own *_API_KEY environment variables; see Building and running tests.
1. Runtime environment variables
These are the variables the runtime itself reads. All are optional — each falls back to a built-in default. They tune the realtime (speech-to-speech) turn-taking and voice; they have no effect on the cascaded path.
| Variable | Applies to | Values | Default |
|---|---|---|---|
| FLOWCAT_VOICE | Gemini Live, OpenAI Realtime | provider voice name | Fenrir (Gemini), alloy (OpenAI) |
| FLOWCAT_VAD_START_SENSITIVITY | Gemini Live | START_SENSITIVITY_UNSPECIFIED · _LOW · _HIGH | START_SENSITIVITY_LOW |
| FLOWCAT_VAD_END_SENSITIVITY | Gemini Live | END_SENSITIVITY_UNSPECIFIED · _LOW · _HIGH | END_SENSITIVITY_HIGH |
| FLOWCAT_VAD_PREFIX_PADDING_MS | Gemini Live | u32 milliseconds | 500 |
| FLOWCAT_VAD_SILENCE_DURATION_MS | Gemini Live | u32 milliseconds | 350 |
Turn-taking, in plain terms. START_SENSITIVITY_LOW + a 500 ms
PREFIX_PADDING means a brief caller sound — a backchannel ("uh-huh"), a cough,
line noise — is not committed as a turn, so the agent stops cutting off its own
speech. The end side stays eager (END_SENSITIVITY_HIGH + 350 ms trailing
silence) so the agent still replies promptly once the caller actually finishes.
Invalid values fall back to the default rather than erroring.
Defined in flowcat-core/src/realtime/gemini_live.rs (VadConfig::from_env) and flowcat-services/src/realtime/openai.rs.
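The "invalid values fall back to the default" rule can be sketched as follows. This is a hypothetical illustration, not the real VadConfig::from_env: VadKnobs, pick, num and vad_from are invented names. It reads through a lookup closure rather than calling std::env::var directly, so the parsing can be exercised without mutating the process environment.

```rust
use std::env;

/// Hypothetical mirror of the Gemini Live VAD knobs and their documented defaults.
#[derive(Debug, PartialEq)]
struct VadKnobs {
    start_sensitivity: &'static str,
    end_sensitivity: &'static str,
    prefix_padding_ms: u32,
    silence_duration_ms: u32,
}

/// Accept a value only if it is one of the allowed spellings; otherwise default.
fn pick(value: Option<String>, allowed: &[&'static str], default: &'static str) -> &'static str {
    value
        .and_then(|v| allowed.iter().copied().find(|a| *a == v.trim()))
        .unwrap_or(default)
}

/// Parse a u32; anything missing or unparsable falls back instead of erroring.
fn num(value: Option<String>, default: u32) -> u32 {
    value.and_then(|v| v.trim().parse::<u32>().ok()).unwrap_or(default)
}

fn vad_from(lookup: impl Fn(&str) -> Option<String>) -> VadKnobs {
    VadKnobs {
        start_sensitivity: pick(
            lookup("FLOWCAT_VAD_START_SENSITIVITY"),
            &["START_SENSITIVITY_UNSPECIFIED", "START_SENSITIVITY_LOW", "START_SENSITIVITY_HIGH"],
            "START_SENSITIVITY_LOW",
        ),
        end_sensitivity: pick(
            lookup("FLOWCAT_VAD_END_SENSITIVITY"),
            &["END_SENSITIVITY_UNSPECIFIED", "END_SENSITIVITY_LOW", "END_SENSITIVITY_HIGH"],
            "END_SENSITIVITY_HIGH",
        ),
        prefix_padding_ms: num(lookup("FLOWCAT_VAD_PREFIX_PADDING_MS"), 500),
        silence_duration_ms: num(lookup("FLOWCAT_VAD_SILENCE_DURATION_MS"), 350),
    }
}

fn main() {
    // Read from the real process environment.
    println!("{:?}", vad_from(|k| env::var(k).ok()));
}
```

Falling back silently keeps a typo in a unit file from taking calls down; the trade-off is that a misspelled value is easy to miss, so it is worth logging the effective configuration at startup.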
2. Credentials & call settings (programmatic)
Everything else is configured in Rust, by your embedder, and passed into the relevant constructor. This is what keeps credentials on infrastructure you control — they never transit a Flowcat-owned config surface.
SIP / telephony — SipConfig
Passed to SipAgent::start(cfg). Telephony trunk credentials live here and reach
nothing else.
| Field | Type | Notes |
|---|---|---|
| server | String | Registrar / proxy URI, e.g. sip:sip.example.com |
| login | String | SIP auth username (trunk login) |
| password | String | SIP auth password |
| caller_id | String | E.164 / trunk number used as the From user on outbound |
| public_ip | Option<Ipv4Addr> | Advertise in Via/Contact/SDP for NAT; None → bound local address |
| sip_port | Option<u16> | Local SIP signaling port; None → 5060 |
| rtp_port_base | u16 | First (even) RTP port to probe; default 16000 |
| rtp_port_tries | u16 | Even ports to probe from the base; default 200. Caps concurrent call media to this number. |
Defined in
flowcat-core/src/sip/agent.rs. See the Deployment guide for the firewall implications of the RTP range.
Provider credentials
Each STT / TTS / LLM / realtime service takes its key (and any voice / model settings) through its own constructor. Your embedder decides where those come from — its own env vars, a secrets manager, a vault. A common, simple choice is to read an env var named like the provider expects and pass it in:
```rust
// illustrative: your embedder owns this
let api_key = std::env::var("OPENAI_API_KEY")?; // your choice, not the runtime's
let tts = OpenAiTts::new(&api_key, /* voice, model, … */);
```
The full set of provider constructors and their arguments is in
flowcat-services;
see the API reference for how to browse it as rustdoc.
Next: Deployment — build a release binary and ship it in your own VPC.
Flowcat feature-flag matrix
Every provider, transport, and exporter is a single Cargo feature, dep:-gated
so the default build pulls none of their client dependencies. This file
enumerates every feature across the five crates below; it is derived from the
Cargo.tomls and is the exhaustive companion to the README's summary table and
PROVIDERS.md.
(D) = distinct client (own wire protocol). (W) = thin wrapper over a (D)
family client (base_url + auth + default-model change). See
CONTRIBUTING.md.
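The (W) idea, a wrapper that changes only base_url, auth and model over a shared family client, can be sketched like this. ChatClientConfig and its constructors are hypothetical, not flowcat-services' actual types; the base URLs are the providers' publicly documented OpenAI-compatible endpoints, and the model names are placeholders.

```rust
/// Hypothetical config for an OpenAI-compatible "family" client (D).
#[derive(Debug, Clone)]
struct ChatClientConfig {
    base_url: String,
    api_key: Option<String>, // sent as "Authorization: Bearer <key>" when present
    model: String,
}

impl ChatClientConfig {
    /// The distinct client: OpenAI's own endpoint.
    fn openai(api_key: &str, model: &str) -> Self {
        Self {
            base_url: "https://api.openai.com/v1".into(),
            api_key: Some(api_key.into()),
            model: model.into(),
        }
    }

    /// A thin wrapper (W): same wire protocol, different base URL.
    fn groq(api_key: &str, model: &str) -> Self {
        Self { base_url: "https://api.groq.com/openai/v1".into(), ..Self::openai(api_key, model) }
    }

    /// A local wrapper (W): Ollama's OpenAI-compatible API needs no key.
    fn ollama(model: &str) -> Self {
        Self {
            base_url: "http://localhost:11434/v1".into(),
            api_key: None,
            model: model.into(),
        }
    }

    /// Every variant shares one request builder; only the config differs.
    fn chat_completions_url(&self) -> String {
        format!("{}/chat/completions", self.base_url.trim_end_matches('/'))
    }

    fn auth_header(&self) -> Option<(String, String)> {
        self.api_key
            .as_ref()
            .map(|k| ("Authorization".to_string(), format!("Bearer {k}")))
    }
}

fn main() {
    for cfg in [
        ChatClientConfig::openai("sk-test", "model-a"),
        ChatClientConfig::groq("gsk-test", "model-b"),
        ChatClientConfig::ollama("model-c"),
    ] {
        println!(
            "{} model={} auth={}",
            cfg.chat_completions_url(),
            cfg.model,
            cfg.auth_header().is_some()
        );
    }
}
```

This is why a (W) feature pulls no new dependencies: all the protocol code lives in the (D) client it wraps.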
flowcat-core — default = ["sip", "recorder"]
| Feature | Default | Pulls | What it gates |
|---|---|---|---|
| sip | ✅ | — | native SIP user-agent (REGISTER/INVITE/ACK/BYE) |
| recorder | ✅ | — | call recorder-as-processor (WAV via hound) |
| vad-ort | — | ort, ndarray | Silero VAD + Smart-Turn ONNX impls |
| filter-rnnoise | — | nnnoiseless | pure-Rust RNNoise noise-suppression filter |
The trait seams (Transport/Stt/Tts/Llm/RealtimeLlm/Vad/Turn/FrameSerializer/Brain/SessionSource), the Gemini Live client, codec/resample, and the SIP/RTP/SDP stack are always present; only the heavy ONNX/RNNoise bodies are gated.
flowcat-services — default = []
Nothing is on by default. Umbrellas: stt-all, tts-all, llm-all,
realtime-all, obs-all.
Realtime / speech-to-speech (7 incl. core Gemini)
| Feature | Tag | Pulls |
|---|---|---|
| realtime-openai | (D) | tokio-tungstenite, tokio, base64 |
| realtime-azure | (W) over openai | — |
| realtime-grok | (W) over openai | — |
| realtime-inworld | (W) over openai | — |
| realtime-ultravox | (D) | tokio-tungstenite, tokio |
| realtime-novasonic | (D) | tokio, base64 |
Gemini Live is the 7th realtime impl and lives in flowcat-core (re-exported as flowcat_core::GeminiLive); it has no flowcat-services feature.
STT (20)
| Feature | Tag | Transport |
|---|---|---|
| stt-deepgram | (D) | WS (ref impl) |
| stt-assemblyai | (D) | WS |
| stt-gladia | (D) | WS |
| stt-soniox | (D) | WS |
| stt-speechmatics | (D) | WS |
| stt-cartesia | (D) | WS |
| stt-azure | (D) | WS |
| stt-gradium | (D) | WS |
| stt-elevenlabs | (D) | segmented HTTP |
| stt-sarvam | (D) | REST |
| stt-mistral | (D) | REST |
| stt-openai | (D) | Whisper-HTTP base |
| stt-groq / stt-fal / stt-speaches / stt-xai | (W) over openai | Whisper-HTTP |
| stt-google | (D) | gRPC (tonic) |
| stt-nvidia | (D) | gRPC / Riva (tonic) |
| stt-aws-transcribe | (D) | SigV4 WS (hmac/sha2) |
| stt-whisper-local | (D) | local (whisper-rs; C build, needs cmake) |
TTS (29)
| Feature | Tag | Transport |
|---|---|---|
| tts-cartesia | (D) | WS (ref impl) |
| tts-elevenlabs / tts-deepgram / tts-rime / tts-asyncai / tts-gradium / tts-soniox / tts-resemble | (D) | WS |
| tts-openai | (D) | HTTP base |
| tts-groq / tts-xai | (W) over openai | OpenAI-TTS-HTTP |
| tts-azure | (D) | SSML WS/HTTP |
| tts-sarvam / tts-mistral / tts-hume / tts-inworld / tts-minimax / tts-camb / tts-speechmatics | (D) | HTTP |
| tts-fish / tts-lmnt / tts-neuphonic / tts-smallest | (D) | interruptible HTTP |
| tts-kokoro / tts-piper / tts-xtts | (D) | local/model HTTP |
| tts-google / tts-nvidia | (D) | gRPC (tonic) |
| tts-aws-polly | (D) | SigV4 (hmac/sha2) |
LLM (23)
| Feature | Tag |
|---|---|
| llm-openai | (D): chat-completions SSE, ref impl |
| llm-openai-responses | (D): Responses API |
| llm-anthropic | (D): Messages API |
| llm-google | (D): Gemini generateContent |
| llm-aws-bedrock | (D): SigV4 event-stream (hmac/sha2) |
| llm-groq, llm-together, llm-fireworks, llm-openrouter, llm-perplexity, llm-deepseek, llm-cerebras, llm-sambanova, llm-nebius, llm-novita, llm-qwen, llm-grok, llm-nvidia-nim, llm-ollama, llm-sarvam, llm-mistral, llm-azure, llm-speaches | (W) over llm-openai (base_url + auth); 18 wrappers |
Observability + MCP
| Feature | Pulls |
|---|---|
| obs-otel | opentelemetry |
| obs-sentry | reqwest |
| obs-langfuse | reqwest |
| mcp | reqwest: MCP-as-processor client |
Brain adapters
| Feature | Pulls |
|---|---|
| brain-http | reqwest, tokio/rt-multi-thread. RemoteBrain: drives conversation policy from an HTTP service (e.g. a Python webhook). See examples/python-remote-brain. |
flowcat-transports — default = []
| Feature | Pulls |
|---|---|
| webrtc-str0m | str0m, audiopus, tokio, tokio-util |
| ws | tokio-tungstenite, tokio, tokio-util |
| daily | reqwest |
| livekit | — (stub) |
| local | audiopus, tokio (local mic/speaker) |
flowcat-telephony — default = ["plivo"]
Serializers are dependency-free flags (pure framing).
| Feature | Default |
|---|---|
| plivo | ✅ |
| twilio, telnyx, exotel, vonage, genesys, asterisk, cloudonix, vobiz | — |
| dtmf-inband | — (in-band Goertzel DTMF; RFC2833 is always available) |
flowcat-agent — default = ["brain"]
A declarative, graph-based agent: define an agent as a node/edge graph_spec
(JSON/YAML) instead of hand-writing an AgentBrain. DeclarativeBrain is a
ready-to-use flowcat_core::AgentBrain over the graph engine.
| Feature | Pulls |
|---|---|
| brain | flowcat-core: the DeclarativeBrain AgentBrain adapter (default on) |
With default-features = false the pure graph engine (parse / validate /
{{var}} interpolation) builds with no flowcat-core dependency.
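The {{var}} interpolation step can be pictured with a small std-only function. It is a hypothetical sketch, not flowcat-agent's implementation; in particular, leaving unknown placeholders untouched (and trimming spaces inside the braces) is an assumed policy, and the real engine may reject or blank them instead.

```rust
use std::collections::HashMap;

/// Replace each `{{name}}` with vars[name]; unknown names are left verbatim.
fn interpolate(template: &str, vars: &HashMap<&str, String>) -> String {
    let mut out = String::with_capacity(template.len());
    let mut rest = template;
    while let Some(start) = rest.find("{{") {
        out.push_str(&rest[..start]);
        let after = &rest[start + 2..];
        match after.find("}}") {
            Some(end) => {
                let name = after[..end].trim();
                match vars.get(name) {
                    Some(value) => out.push_str(value),
                    // Unknown variable: keep the whole `{{...}}` token as written.
                    None => out.push_str(&rest[start..start + 2 + end + 2]),
                }
                rest = &after[end + 2..];
            }
            None => {
                // Unclosed "{{": keep the remainder as literal text.
                out.push_str(&rest[start..]);
                rest = "";
            }
        }
    }
    out.push_str(rest);
    out
}

fn main() {
    let vars = HashMap::from([("requested_day", "Tuesday".to_string())]);
    let prompt = "Confirm {{ requested_day }} with {{caller_name}}.";
    println!("{}", interpolate(prompt, &vars));
}
```

Here the known variable is substituted and {{caller_name}} passes through unchanged, which makes a missing variable visible in the rendered prompt rather than silently empty.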
Toolchain caveats
- gRPC (stt-google / stt-nvidia / tts-google / tts-nvidia) compiles .protos via tonic-build, so it needs protoc on PATH.
- stt-whisper-local bundles whisper.cpp, so it needs cmake + a C/C++ toolchain. The Rust stub compiles; the dependency's C build is the gate.
Everything else is rustls-only reqwest / tokio-tungstenite — no system
OpenSSL, no AWS SDK (the AWS providers hand-roll SigV4 over hmac/sha2).
Deployment
This guide is for operators self-hosting Flowcat in their own VPC (or fully air-gapped). Flowcat is one self-contained binary with no hosted control plane and no phone-home — deployment is "ship a binary, open the right ports, give it your provider credentials."
What you actually deploy is your embedder: the small host binary that terminates calls and supplies the brain. The bundled flowcat CLI (pipeline, ws-echo) is for credential-free demos, not a production server. Everything below applies to the release binary you build from your embedder crate; the build mechanics are identical.
1. Build a release binary
Flowcat is a Cargo workspace. The default build pulls no provider or network dependencies — every STT/TTS/LLM, transport, carrier, and exporter is an opt-in feature, so you compile only what you ship.
# Demo binary, default features (native SIP + recorder, no providers):
cargo build --release -p flowcat-cli
# A cloud build — Gemini Live S2S over a WebSocket carrier, with telemetry:
cargo build --release -p flowcat-cli \
--features "flowcat-services/realtime-all,flowcat-services/llm-all,flowcat-transports/ws,flowcat-services/obs-otel"
# Fully on-prem / air-gapped — local connectors only, no cloud egress:
cargo build --release -p flowcat-cli \
--features "flowcat-services/stt-whisper-local,flowcat-services/tts-kokoro,flowcat-services/llm-ollama"
Feature groups (umbrellas in parentheses): stt-* (stt-all), tts-* (tts-all),
llm-* (llm-all), realtime-* (realtime-all), obs-* (obs-all, covering obs-otel,
obs-sentry, obs-langfuse); transports ws / webrtc-str0m; carriers plivo
(default) / twilio / telnyx / …. The authoritative,
always-current list is the [features] table in each crate's Cargo.toml
(core ·
services ·
transports ·
telephony).
Some local connectors pull native build deps: e.g. stt-whisper-local needs a C toolchain (cmake), and vad-ort pulls ONNX Runtime. Install those in your build image.
Fully static binary (optional)
For a dependency-free artifact you can drop into a scratch container or an air-gapped host, target musl:
rustup target add x86_64-unknown-linux-musl
cargo build --release --target x86_64-unknown-linux-musl -p flowcat-cli
(Pure-cloud feature sets build cleanly on musl; feature sets that pull native C libraries — Whisper, ONNX — may need the matching musl system libraries.)
2. Containerize
There is no official image yet — the only Dockerfile in the repo is the
benchmark harness under bench/. A production multi-stage build is small:
# ---- build ----
FROM rust:1-bookworm AS build
WORKDIR /src
COPY . .
# swap in your embedder package + the features you need
RUN cargo build --release -p flowcat-cli --features "flowcat-services/realtime-all"
# ---- runtime ----
FROM debian:bookworm-slim
RUN apt-get update && apt-get install -y --no-install-recommends ca-certificates \
&& rm -rf /var/lib/apt/lists/*
COPY --from=build /src/target/release/flowcat /usr/local/bin/flowcat
# UDP for SIP signaling + the RTP media range (see §Networking)
EXPOSE 5060/udp 16000-16398/udp
ENTRYPOINT ["flowcat"]
A musl static build collapses the runtime stage to FROM scratch plus the binary and ca-certificates.
3. Run under systemd
# /etc/systemd/system/flowcat.service
[Unit]
Description=Flowcat voice runtime
After=network-online.target
Wants=network-online.target
[Service]
ExecStart=/usr/local/bin/your-embedder
Restart=on-failure
RestartSec=2
# Runtime knobs (see Configuration). Credentials belong in an EnvironmentFile
# your embedder reads, not in the unit:
Environment=FLOWCAT_VAD_SILENCE_DURATION_MS=350
EnvironmentFile=/etc/flowcat/secrets.env
# Hardening
DynamicUser=yes
NoNewPrivileges=yes
ProtectSystem=strict
ProtectHome=yes
[Install]
WantedBy=multi-user.target
4. Networking
The native SIP transport binds:
| Purpose | Default | Configured by |
|---|---|---|
| SIP signaling | UDP 5060 | SipConfig.sip_port |
| RTP media | UDP 16000, even ports, up to rtp_port_tries (default 200 → 16000–16398) | SipConfig.rtp_port_base / rtp_port_tries |
| Outbound to providers | TCP 443 (HTTPS/WSS) | per provider |
Operational notes:
- The RTP range caps concurrent calls. rtp_port_tries is the hard ceiling on simultaneous media streams: size it to your expected concurrency, and open exactly that even-port range in your firewall / security group.
- Behind NAT (most cloud VMs), set SipConfig.public_ip so Flowcat advertises the reachable address in Via/Contact/SDP; otherwise media is offered on an unroutable internal IP.
- A WebSocket carrier (e.g. Plivo/Twilio media streams) needs only outbound TCP 443 instead of the SIP/RTP ports; your embedder chooses the transport.
- Flowcat exposes no HTTP control port of its own. Health checks, metrics endpoints, and admin APIs belong to your embedder.
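The port arithmetic behind "open exactly that even-port range" is simple enough to script when generating firewall rules. A minimal sketch, assuming (as the networking table above describes) that rtp_port_tries even ports are probed upward from rtp_port_base, one media stream per port; rtp_ports and firewall_range are hypothetical helpers, and rounding an odd base up to even is an assumed behavior.

```rust
/// The even RTP ports probed from `base`, `tries` of them. Computed in u32 so
/// a base near 65535 cannot overflow; ports past u16::MAX are dropped.
fn rtp_ports(base: u16, tries: u16) -> impl Iterator<Item = u16> {
    let first = u32::from(base) + u32::from(base) % 2; // round an odd base up to even
    (0..u32::from(tries))
        .map(move |i| first + 2 * i)
        .take_while(|&p| p <= u32::from(u16::MAX))
        .map(|p| p as u16)
}

/// Inclusive UDP range to open in the firewall, or None if no ports are probed.
fn firewall_range(base: u16, tries: u16) -> Option<(u16, u16)> {
    let mut ports = rtp_ports(base, tries);
    let lo = ports.next()?;
    Some((lo, ports.last().unwrap_or(lo)))
}

fn main() {
    // Defaults: base 16000, 200 tries.
    let (lo, hi) = firewall_range(16000, 200).unwrap();
    println!("open UDP {lo}-{hi} ({} media streams max)", rtp_ports(16000, 200).count());
}
```

With the defaults this gives 16000 to 16398 and 200 streams, matching the EXPOSE line in the Dockerfile above.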
5. Scale & capacity
The media loop is Rust — no GC, no GIL — so one process uses every core. There is no worker-fleet to size: scale vertically first, then add processes.
From the published benchmark (Azure 16-vCPU VM, WebSocket + μ-law load, 50 fps/call):
- flat p99 ≤ 0.61 ms from 10 to 2,000 concurrent calls in a single process;
- ~19.6 KB RAM per idle session, 7 tokio tasks per call.
Practical sizing:
- Bound concurrency with rtp_port_tries (SIP) and your carrier's limits.
- One process per box is usually right; scale out to more hosts behind your SIP proxy / carrier load balancer only when you exceed a single host.
- Reproduce the benchmark on your own hardware: docker compose -f bench/compose.yml up --build (see bench/README.md).
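A back-of-the-envelope estimate from the benchmark figures quoted above (about 19.6 KB per idle session, 7 tokio tasks per call, 50 frames/s per call) helps size a box before you measure. This sketch only multiplies those published numbers out; memory per active call (audio buffers, provider connections) is likely higher, so treat the memory figure as a floor rather than a forecast. The names are hypothetical.

```rust
/// Published per-call figures from the benchmark (idle session).
const IDLE_KB_PER_SESSION: f64 = 19.6;
const TASKS_PER_CALL: u64 = 7;
const FRAMES_PER_SEC_PER_CALL: u64 = 50;

struct Estimate {
    idle_memory_mb: f64,
    tokio_tasks: u64,
    frames_per_sec: u64,
}

fn estimate(calls: u64) -> Estimate {
    Estimate {
        idle_memory_mb: calls as f64 * IDLE_KB_PER_SESSION / 1000.0, // KB -> MB (decimal)
        tokio_tasks: calls * TASKS_PER_CALL,
        frames_per_sec: calls * FRAMES_PER_SEC_PER_CALL,
    }
}

fn main() {
    for calls in [10, 200, 2_000] {
        let e = estimate(calls);
        println!(
            "{calls:>5} calls: ~{:.1} MB idle session state, {} tasks, {} frames/s",
            e.idle_memory_mb, e.tokio_tasks, e.frames_per_sec
        );
    }
}
```

At the benchmark's top end of 2,000 calls this works out to roughly 39 MB of idle session state, 14,000 tasks and 100,000 frames per second through one process.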
6. Observability
Build with an exporter feature and wire it in your embedder:
- obs-otel: OpenTelemetry traces/metrics
- obs-sentry: error reporting
- obs-langfuse: LLM call tracing
All are zero-cost when the feature is off. Per-call metrics and transcripts flow
through the pipeline's FrameObserver seam; finalized recordings/transcripts are
uploaded via your SessionSource (your storage, your URLs).
7. Production checklist
- Release binary built with only the features you use.
- public_ip set if the host is behind NAT.
- Firewall opens UDP 5060 + your RTP even-port range (or just outbound 443 for a WS carrier).
- rtp_port_tries sized to target concurrency.
- Provider credentials supplied via your embedder (secrets manager / EnvironmentFile), not baked into the image.
- FLOWCAT_VAD_* / FLOWCAT_VOICE tuned for your use case (see Configuration).
- An exporter feature enabled and pointed at your collector.
- Restart policy + health checks owned by the embedder.
API reference
Flowcat is a set of Rust crates; the canonical, type-level API reference is rustdoc.
Not yet on crates.io / docs.rs. Flowcat is pre-1.0 and not yet published, so there is no docs.rs page yet. Until then, generate the docs locally — it takes seconds. (When published, the hosted link will live here.)
Generate the docs locally
git clone https://github.com/AreevAI/flowcat.git
cd flowcat
# Core seams (no provider features needed):
cargo doc --no-deps -p flowcat-core --open
# Include the provider/transport surface (enable the features you care about):
cargo doc --no-deps \
-p flowcat-core -p flowcat-services -p flowcat-transports -p flowcat-telephony \
--features "flowcat-services/realtime-all,flowcat-transports/ws"
Feature-gated items only appear in rustdoc when their feature is enabled — pass the same features you'd build with (see Deployment).
Crate map
| Crate | What it exposes |
|---|---|
| flowcat-core | The seams and runtime: AgentBrain, SessionSource, MediaTransport, RealtimeLlm / RealtimeKickoff, the FrameProcessor graph + Frame taxonomy, build_s2s_task / build_cascaded_task, native SipAgent / SipTransport, and the GeminiLive backend |
| flowcat-services | STT / TTS / LLM / realtime provider adapters (feature-gated), observability exporters, the RemoteBrain HTTP adapter (brain-http) |
| flowcat-transports | WebRTC (webrtc-str0m), WebSocket (ws), and other media transports |
| flowcat-telephony | Carrier serializers (Plivo, Twilio, Telnyx, …) and DTMF |
| flowcat-cli | The credential-free flowcat demo binary (pipeline, ws-echo) |
Starting points
- Writing an embedder? → Build an embedder, then the seam sources linked there.
- Architecture & call lifecycle? → Design overview and Frame processor model.
- Which providers exist? → Providers & connectors.
Contributing to Flowcat
Flowcat is Apache-2.0. By contributing you agree your contribution is licensed
under those terms. Every source file carries the SPDX header
// SPDX-License-Identifier: Apache-2.0 as its first line — keep that on any new
file.
By participating you also agree to abide by our
CODE_OF_CONDUCT.md. Security issues should be reported
privately — see SECURITY.md.
This guide covers the two things a contributor most often does: add a provider
and write a processor. Read PROCESSOR-DESIGN.md
(the frozen pipeline API) and DESIGN.md (the runtime + trait seams)
first.
Building and running tests
Prerequisites: a stable Rust toolchain (rustup); Python 3
(standard library only) for the examples/. The default build needs nothing else.
```sh
git clone https://github.com/AreevAI/flowcat.git && cd flowcat
cargo build -p flowcat-cli   # default features: no provider/network deps
cargo test                   # the full OFFLINE suite: no network, no credentials
```

`cargo test` is the green bar described below: pure encode/decode fixtures, fully hermetic.
Live integration tests exercise a real provider over the network. They live in
#[cfg(test)] blocks (most #[ignore]d, so they're skipped by default) and read
their credentials from the environment. These are test credentials, not
deployment configuration — in production the runtime never reads provider keys
from the environment; an embedder passes them to each service constructor.
Each provider follows a PROVIDER_API_KEY (+ optional PROVIDER_VOICE_ID /
PROVIDER_MODEL) convention; representative variables:
| Variable | Used by |
|---|---|
| `OPENAI_API_KEY` | OpenAI STT / TTS / LLM / Realtime |
| `ANTHROPIC_API_KEY` | Anthropic LLM |
| `GEMINI_API_KEY` | Gemini Live realtime |
| `GEMINI_LIVE_MODEL` | Override the Gemini Live model id in tests |
| `DEEPGRAM_API_KEY` | Deepgram STT / TTS |
| `CARTESIA_API_KEY`, `CARTESIA_VOICE_ID` | Cartesia STT / TTS |
| `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `AWS_REGION` | Bedrock / Transcribe / Polly / Nova Sonic |
| `XTTS_BASE_URL`, `KOKORO_BASE_URL`, `PIPER_BASE_URL`, `WHISPER_MODEL_PATH` | Local connectors |
The authoritative list lives in the `#[cfg(test)]` blocks of each provider module under
`flowcat-services/src`. Run one live test by name with its
credentials set:

```sh
GEMINI_API_KEY=… cargo test -p flowcat-core -- --ignored gemini_live
```
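A live test following this convention might look like the sketch below. The helper `live_key` and the test name are hypothetical; only the `#[ignore]` attribute and the `PROVIDER_API_KEY` environment convention come from this guide. The test is skipped by plain `cargo test`, and when forced with `--ignored` it returns early if the key is unset instead of failing.

```rust
/// Reads a provider credential for a live test (hypothetical helper).
/// Returns `None` when the variable is unset or blank.
pub fn live_key(var: &str) -> Option<String> {
    std::env::var(var).ok().filter(|v| !v.trim().is_empty())
}

#[cfg(test)]
mod tests {
    use super::live_key;

    // Skipped by `cargo test`; run with `cargo test -- --ignored gemini_live`.
    #[test]
    #[ignore]
    fn gemini_live_smoke() {
        let Some(key) = live_key("GEMINI_API_KEY") else {
            eprintln!("GEMINI_API_KEY unset; skipping live test");
            return;
        };
        // Hypothetical: open a real session with `key` here and assert on the reply.
        assert!(!key.is_empty());
    }
}
```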
The bar (every PR)
- `cargo build` (default features) compiles and pulls no new networked dependency into the default build.
- `cargo test` is green, offline. Tests must not hit the network; provider tests are pure encode/decode fixtures (see below).
- `cargo clippy --all-targets` is clean. The only sanctioned escape is `#[allow(dead_code)]` on a held-but-not-yet-wired config field of a stub.
- New crypto / auth / signing paths (e.g. a new SigV4 or signature provider) get an independent known-answer test and an extra reviewer pass.
Adding a provider (STT / TTS / LLM / realtime)
Providers live in flowcat-services (one per file), each behind its own Cargo
feature. The catalogue is organised by protocol family, and a family's real
client is implemented once (PROVIDERS.md §0).
Triage your provider first:
- (D) Distinct client — its own wire protocol (own WS/HTTP framing, own auth, own message schema). Write a real client: the network transport + a pure encode/decode seam + unit tests. Templates: Deepgram (STT-WS), Cartesia (TTS-WS), OpenAI (LLM/Whisper-HTTP).
- (W) Thin wrapper — protocol-compatible with an existing (D) family member (OpenAI-compatible, Whisper-HTTP, OpenAI-Realtime, …), differing only in `base_url` + auth header + default model. Write a ~30-line struct that constructs the family's (D) client with that config and delegates the trait. Template: any of the `llm-*` OpenAI wrappers.
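A minimal sketch of the (W) shape, assuming a simplified synchronous `Llm` trait (the real seam is async and richer) and a hypothetical `OpenAiCompatClient` standing in for the family's (D) client. The provider name, URL, and model (`AcmeLlm`, `https://api.acme.example/v1`, `acme-default`) are placeholders. The point is that the wrapper supplies only configuration and delegates every trait call.

```rust
/// Simplified, synchronous stand-in for the `Llm` seam: it only renders the
/// request it would send, so the delegation shape is visible.
pub trait Llm {
    fn describe_request(&self, prompt: &str) -> String;
}

/// (D) family client: the one real implementation of an OpenAI-compatible
/// protocol, parameterised by endpoint, auth header, and model.
pub struct OpenAiCompatClient {
    base_url: String,
    auth_header: (String, String), // (header name, header value)
    model: String,
}

impl OpenAiCompatClient {
    pub fn new(base_url: &str, auth_header: (&str, String), model: &str) -> Self {
        OpenAiCompatClient {
            base_url: base_url.to_string(),
            auth_header: (auth_header.0.to_string(), auth_header.1),
            model: model.to_string(),
        }
    }
}

impl Llm for OpenAiCompatClient {
    fn describe_request(&self, prompt: &str) -> String {
        // A real client would send an HTTP request here; the header value
        // (the secret) is deliberately left out of the rendered string.
        format!(
            "POST {}/chat/completions [{}] model={} prompt={}",
            self.base_url, self.auth_header.0, self.model, prompt
        )
    }
}

/// (W) thin wrapper for a hypothetical "Acme" provider: same protocol; only
/// the base URL, auth header, and default model differ.
pub struct AcmeLlm(OpenAiCompatClient);

impl AcmeLlm {
    pub fn new(api_key: &str) -> Self {
        AcmeLlm(OpenAiCompatClient::new(
            "https://api.acme.example/v1",
            ("Authorization", format!("Bearer {api_key}")),
            "acme-default",
        ))
    }
}

impl Llm for AcmeLlm {
    fn describe_request(&self, prompt: &str) -> String {
        self.0.describe_request(prompt) // pure delegation to the (D) client
    }
}
```

Because the wrapper holds no protocol logic, a wire-format fix in the (D) client reaches every (W) member at once.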
The mechanics:
- Implement the trait seam for your category (`Stt` / `Tts` / `Llm` / `RealtimeLlm`, defined in `flowcat-core`) in `flowcat-services/src/<cat>/<name>.rs`.
- Add a `dep:`-gated Cargo feature in `flowcat-services/Cargo.toml`. A (W) just enables its (D) family feature (e.g. `llm-groq = ["llm-openai"]`). A (D) pulls only its client dep (`reqwest`, `tokio-tungstenite`, `tonic`, …). No new dep may land in the default build (`default = []` in `flowcat-services`).
- Register the `mod` + `pub use` in the category's `mod.rs`, and add your feature to the relevant `*-all` umbrella so the CLI/CI fat build covers it.
- Write the fixture test. This is the coverage bar: a pure function that builds your provider's outbound request frame(s) and parses a recorded inbound response, asserting on the exact bytes/JSON, with no live socket. For a SigV4 provider (AWS Bedrock/Transcribe/Polly), pin a known-answer signing test against the AWS-published vectors. Live verification needs the vendor's credentials and is out of scope for CI.
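A fixture test in this spirit might look like the following. Everything provider-specific is hypothetical: the `acme` JSON schema, `encode_speak`, and `parse_audio_b64` are illustrations, not a real vendor protocol. The shape is what matters: pure functions for the outbound frame and the inbound parse, asserted against exact strings, with no socket.

```rust
/// Escapes a string for a JSON string literal (quotes, backslashes, controls).
fn json_escape(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out
}

/// Outbound frame for a hypothetical "acme" TTS socket (invented schema).
pub fn encode_speak(text: &str, voice_id: &str) -> String {
    format!(
        r#"{{"type":"speak","voice_id":"{}","text":"{}"}}"#,
        json_escape(voice_id),
        json_escape(text)
    )
}

/// Pulls the base64 audio out of a recorded inbound frame, if present.
pub fn parse_audio_b64(msg: &str) -> Option<&str> {
    let key = "\"audio\":\"";
    let start = msg.find(key)? + key.len();
    let end = start + msg[start..].find('"')?;
    Some(&msg[start..end])
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn speak_frame_matches_fixture() {
        assert_eq!(
            encode_speak("Hi \"there\"", "v1"),
            r#"{"type":"speak","voice_id":"v1","text":"Hi \"there\""}"#
        );
    }

    #[test]
    fn parses_recorded_chunk() {
        let recorded = r#"{"type":"chunk","audio":"AAAB"}"#;
        assert_eq!(parse_audio_b64(recorded), Some("AAAB"));
        assert_eq!(parse_audio_b64(r#"{"type":"done"}"#), None);
    }
}
```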
A "not yet wired" stub returns `FlowcatError::Other("<provider>: not yet wired")`
and still compiles and passes. That is the floor; a real PR replaces it with the
client + fixtures.
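A stub at that floor could look like this sketch. `FlowcatError` here is a local stand-in for the real type, and `AcmeTts` / `synthesize` are hypothetical simplifications of a `Tts` implementation. It also shows the one sanctioned `#[allow(dead_code)]`: the config is held for the future client but not yet read.

```rust
/// Stand-in for `FlowcatError`; only the `Other` variant is modelled.
#[derive(Debug, PartialEq)]
pub enum FlowcatError {
    Other(String),
}

/// Config for a hypothetical "acme" TTS provider.
pub struct AcmeTtsConfig {
    pub api_key: String,
}

pub struct AcmeTts {
    // Held for the future client but not read yet: the one sanctioned
    // clippy escape from "The bar".
    #[allow(dead_code)]
    config: AcmeTtsConfig,
}

impl AcmeTts {
    pub fn new(config: AcmeTtsConfig) -> Self {
        AcmeTts { config }
    }

    /// Simplified, synchronous stand-in for the real `Tts` method: it always
    /// reports that the provider is not wired yet.
    pub fn synthesize(&self, _text: &str) -> Result<Vec<i16>, FlowcatError> {
        Err(FlowcatError::Other("acme: not yet wired".to_string()))
    }
}
```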
Writing a processor (the contract every author must know)
Each processor is a FrameProcessor (flowcat-core/src/processor/). The
framework owns the per-processor tokio task, the bounded/priority channels, and
the lifecycle; you write process_frame and optionally start/stop.
The one contract that surprises people — lifecycle/system frames bypass
process_frame (PROCESSOR-DESIGN.md §2.1–§2.3):
- `Start`: the framework calls your `async fn start(&mut self, setup, params)` (open sockets, spawn provider reader tasks here), then forwards the frame. It does not reach `process_frame`.
- `End` / `Stop` / `Cancel`: the framework calls your `async fn stop(&mut self, reason)` (flush + close), then forwards. Also not via `process_frame`.
- `Interruption` and other System frames ride an unbounded priority channel and are drained ahead of data/control by a `biased` select; the task loop handles interruption (draining interruptible queued frames, keeping uninterruptible ones, cancelling an in-flight interruptible `process_frame`).
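The bypass can be modelled with a synchronous toy loop. This is a sketch under simplifying assumptions: the `Frame` enum, `Processor` trait, and `dispatch` function are hypothetical stand-ins (the real hooks are async, take setup/params/reason arguments, and run inside the per-processor task), and the priority channel for System frames is left out. It shows which hook each lifecycle frame reaches.

```rust
/// Toy frame set: just enough to show which frames reach `process_frame`.
#[derive(Debug, Clone, PartialEq)]
pub enum Frame {
    Start,
    End,
    Stop,
    Cancel,
    Text(String),
}

/// Simplified stand-in for the `FrameProcessor` hooks.
pub trait Processor {
    fn start(&mut self) {}
    fn stop(&mut self) {}
    /// Default: forward unchanged.
    fn process_frame(&mut self, frame: Frame) -> Vec<Frame> {
        vec![frame]
    }
}

/// What the framework's task loop does with each frame: lifecycle frames call
/// `start` / `stop` and are forwarded without ever touching `process_frame`.
pub fn dispatch<P: Processor>(p: &mut P, frame: Frame) -> Vec<Frame> {
    match frame {
        Frame::Start => {
            p.start();
            vec![Frame::Start]
        }
        f @ (Frame::End | Frame::Stop | Frame::Cancel) => {
            p.stop();
            vec![f]
        }
        other => p.process_frame(other),
    }
}

/// Example processor that records which hooks ran.
#[derive(Default)]
pub struct Recorder {
    pub log: Vec<String>,
}

impl Processor for Recorder {
    fn start(&mut self) {
        self.log.push("start".to_string());
    }
    fn stop(&mut self) {
        self.log.push("stop".to_string());
    }
    fn process_frame(&mut self, frame: Frame) -> Vec<Frame> {
        self.log.push(format!("process {frame:?}"));
        vec![frame]
    }
}
```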
So process_frame only ever sees Data/Control frames. Do not put socket
open/close in process_frame — it will never run for the lifecycle frames that
should trigger it. Other rules:
- `process_frame` must not block. Long work (a provider round-trip) is driven by an internally-spawned task that feeds results back as frames: the Gemini reader-task pattern (`flowcat-core/src/realtime/gemini_live.rs`).
- Push results via the `Link` (`push` / `push_down` / `push_error` / `broadcast`), never by calling another processor directly.
- The hot audio frame is `Arc<AudioFrame>`: clone the `Arc`, don't copy PCM.
- An `Err` returned from `process_frame` becomes an upstream non-fatal `Error` frame; return `Err` for recoverable faults, set `fatal` for terminal ones.
- A pure observer/no-op processor just keeps the default `process_frame` (forward unchanged); don't override what you don't need.
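The `Arc<AudioFrame>` and error rules can be sketched with stand-in types (`AudioFrame`, the two-variant `Frame`, and the helpers `fan_out` / `err_to_frame` are all hypothetical). Cloning a frame for a second consumer only bumps a reference count, so the PCM buffer is shared rather than copied.

```rust
use std::sync::Arc;

/// Stand-in for the real `AudioFrame` (PCM plus its sample rate).
#[derive(Debug)]
pub struct AudioFrame {
    pub pcm: Vec<i16>,
    pub sample_rate: u32,
}

/// Two-variant stand-in for `Frame`.
#[derive(Debug, Clone)]
pub enum Frame {
    OutputAudio(Arc<AudioFrame>),
    Error { message: String, fatal: bool },
}

/// Fans one frame out to `n` consumers (e.g. the output transport and a
/// recorder tap). Each copy shares the same PCM: cloning bumps the `Arc` count.
pub fn fan_out(frame: &Frame, n: usize) -> Vec<Frame> {
    (0..n).map(|_| frame.clone()).collect()
}

/// What the framework does with an `Err` from `process_frame` (sketch):
/// wrap it as a non-fatal `Error` frame to push upstream.
pub fn err_to_frame(message: impl Into<String>) -> Frame {
    Frame::Error { message: message.into(), fatal: false }
}
```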
Flowcat stays contract-agnostic: keep any embedder/control-plane knowledge
out of flowcat-core. Conversation decisions and call bootstrap/finalize are the
AgentBrain / SessionSource trait seams an embedder implements; the runtime
treats brain_config as opaque bytes (DESIGN.md).
Feature-flag discipline (no default-build cost)
- `flowcat-core`: `default = ["sip", "recorder"]`, no HTTP/gRPC/ONNX. The only optional core deps are `ort` (`vad-ort`) and `nnnoiseless` (`filter-rnnoise`).
- `flowcat-services` / `flowcat-transports`: `default = []`. Every provider and transport is `dep:`-gated; the default build links none of their clients.
- `flowcat-telephony`: `default = ["plivo"]` (serializers are dependency-free flags).
- Adding a provider must not move any dependency out of `optional` / `dep:` gating. If a reviewer sees `cargo tree` on the default build grow, the PR is wrong.
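On the Rust side, the gating reduces to `#[cfg(feature = "...")]` on everything that touches a provider client. The sketch below is hypothetical (the real registration is the `mod` + `pub use` in each category's `mod.rs`); it reuses the `llm-openai` / `llm-groq` feature names from the mechanics above, and with no features enabled it returns an empty list.

```rust
/// Provider names compiled into this build. Each push exists only when its
/// Cargo feature is on, so a default build (`default = []`) returns nothing and
/// links none of the provider clients.
pub fn enabled_providers() -> Vec<&'static str> {
    #[allow(unused_mut)] // nothing is pushed in a default build
    let mut providers = Vec::new();
    #[cfg(feature = "llm-openai")]
    providers.push("openai");
    // A (W) feature implies its (D) family feature: `llm-groq = ["llm-openai"]`.
    #[cfg(feature = "llm-groq")]
    providers.push("groq");
    providers
}
```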
The exhaustive matrix is FEATURES.md — keep it in sync when you
add a feature.
Review
Substantive PRs get a code review; anything touching auth, signing, or signature verification also gets a security review. Be honest in the PR about what is fixture-tested vs live-verified — overclaiming a live-working provider is the thing reviewers push back on hardest.
Flowcat — design
Flowcat is a native-Rust real-time voice-agent runtime built to run on
infrastructure you own: it carries a phone/WebRTC call's audio, runs a
speech-to-speech model, and lets a pluggable "brain" drive the conversation — all
as one self-contained binary you deploy in your own VPC (or air-gapped), with
no hosted control plane and no Python or FreeSWITCH sidecar. Because the media
loop is one tokio process (no GIL, no FFI), that single binary also holds
serious call volume per box. It is an isolated, Apache-2.0 cargo workspace
designed to be embedded by a host application.
Motivation + benchmark:
`bench/RESULTS.md`: one Flowcat process holds a flat p99 from 10 to 2,000 concurrent calls, where a Python deployment grows a multi-second tail and needs a worker fleet. Read this as runtime capacity and reliability headroom, not conversational latency (which your STT/LLM/TTS providers dominate). Native SIP is the current approach (softswitch optional), superseding the earlier FreeSWITCH gateway design. Native SIP details: `SIP-DESIGN.md`.
Scope of this document. This is the architecture-and-trait-seam spec, written at the first milestone (Plivo WS-media + native SIP, with Gemini Live as the first speech-to-speech brain). The seams below are unchanged, but the runtime has broadened since:
- Two pipeline shapes, not just S2S: a single speech-to-speech model (`RealtimeLlm`, e.g. Gemini Live) or a cascaded STT → LLM → TTS pipeline. Both are built from `flowcat-core::pipeline` (`build_s2s_task` / `build_cascaded_task`).
- ~80 STT/TTS/LLM + realtime providers, 5 transports, and 9 telephony serializers, now split into sibling crates (`flowcat-services`, `flowcat-transports`, `flowcat-telephony`), with each provider, transport, and serializer behind its own Cargo feature. See the `README` crate map + connector table and `FEATURES.md`.
- Fully local / air-gapped by swapping in the local connectors (Whisper STT; Kokoro / Piper / XTTS TTS; Ollama LLM): no call audio leaves your infrastructure.
- Python without Rust: the `RemoteBrain` HTTP adapter drives the `AgentBrain` seam from a Python service (see `QUICKSTART.md` and `examples/`).

Treat the `README`, `FEATURES.md`, and `PROCESSOR-DESIGN.md` as authoritative for the current surface; this doc remains the reference for the trait seams and call lifecycle, which are stable. Concrete provider/crate names below (e.g. "the Gemini Live client") are the milestone's first implementation, not the limit of what ships.
Goal of the first milestone
A telephony call works end to end through Flowcat for two carrier styles:
- WebSocket-media carriers (e.g. Plivo) — audio already arrives over a WS. Pure-Rust path, live-testable without extra infra. This is the easy path and the integration baseline.
- SIP/RTP-only carriers — no provider WS-media. Flowcat speaks SIP/RTP natively (no softswitch): a `SipAgent` in `flowcat-core` REGISTERs the carrier's trunk and terminates INVITE/RTP in-process, and a `SipTransport` presents the call to the pipeline through the same `MediaTransport` seam the WS path uses. This is the native-SIP decision: a single Rust binary, no FreeSWITCH/`mod_audio_stream` gateway. (The earlier FreeSWITCH gateway approach is superseded; see `SIP-DESIGN.md`.)
The first milestone's brain is native Rust Gemini Live (the speech-to-speech path
live-verified end to end); a cascaded STT → LLM → TTS pipeline is the other supported
shape. Either way the conversation logic lives behind the AgentBrain trait — the embedder's
own engine linked as an rlib (no PyO3 FFI), or the ready-made RemoteBrain HTTP adapter
driving it from a Python service.
Two-plane fit + where Flowcat sits
```text
PSTN ─SIP/RTP ───────────────────────> SipAgent (in flowcat-core, runs in the embedder) ─┐
PSTN ─Plivo <Stream> WS ─> host WS ─> WsCarrierTransport ────────────────────────────────┤
▼
┌──────────── the embedder (binary, host workspace) ──────────┐
│ HTTP: /telephony/ws/{provider}/{run} · answer-XML · health │
│ runs flowcat SipAgent (SIP trunk REGISTER) + control plane │
│ adapts its inbound WS → MediaSocket → WsCarrierTransport │
│ impl AgentBrain (→ the host's engine rlib, NO PyO3) │
│ impl SessionSource (→ the host's control-plane API) │
└───────────────────────────┬─────────────────────────────────┘
│ uses
┌──────────── flowcat-core (lib, OSS workspace) ──────────────┐
│ MediaTransport seam: SipTransport (SIP/RTP) · WsCarrier+ │
│ MediaSerializer(plivo) · RealtimeLlm(GeminiLive) │
│ Call pipeline · codec · recorder · sip/ (SipAgent, RTP/SDP) │
│ traits: AgentBrain·SessionSource·MediaTransport·RealtimeLlm │
└──────────────────────────────────────────────────────────────┘
shared database / object store (via the embedder's control plane only)
```
flowcat-core knows nothing about the embedder, web routing, SQL, or the wire contract.
The embedder-specific glue (engine adapter, control-plane client, auth/routing) lives in the
consumer crate.
Crate layout
The tree below is the milestone's core-centric view (providers, transports, and serializers shown inside
`flowcat-core`). They have since been split into the sibling crates `flowcat-services` / `flowcat-transports` / `flowcat-telephony`; the `README` crate map is the current authority. The `flowcat-core` module seams shown here are still accurate.
```text
flowcat/                      # ← own cargo workspace (Apache-2.0)
  Cargo.toml                  # [workspace] members = ["flowcat-core", "flowcat-cli", ...]
  DESIGN.md  LICENSE
  flowcat-core/               # the runtime library
    src/
      lib.rs
      frame.rs                # AudioFrame, ControlEvent, etc.
      error.rs                # FlowcatError
      codec.rs                # g711 ↔ pcm16, resample (rubato)
      audio.rs                # AudioRecorder (mono mix → WAV bytes)
      transport/{mod.rs, media.rs, carrier.rs, socket.rs, ws_media.rs}  # MediaTransport seam + WsCarrierTransport
      serializer/{mod.rs, plivo.rs}
      sip/{mod.rs, agent.rs, transport.rs, rtp.rs, sdp.rs}  # native SIP UA + SipTransport (rsipstack + RTP/SDP/jitter)
      realtime/{mod.rs, gemini_live.rs}
      brain.rs                # trait AgentBrain + ToolDecl + BrainAction
      session.rs              # trait SessionSource + ResolvedCall + Usage
      pipeline.rs             # Call::run(...) — the orchestration loop
      transcript.rs           # transcript collector
  flowcat-cli/                # example: local-mic / ws demo (DX), embedder-agnostic
  bench/  bench-rs/           # (existing benchmark kit)
# the embedder (lives in the host's own workspace):
#   glue binary that runs the SipAgent + the control-plane originate endpoint,
#   a telephony provider, and the carrier routes / inbound-resolve / originate.
```
The embedder's `Cargo.toml` depends on `flowcat-core` (by path, git, or crates.io) and links its
own engine. Cross-workspace dependencies are fine: `flowcat-core` builds inside each consumer's
dependency graph, and standalone via its own lockfile.
Trait contracts (the seams everything plugs into)
All async traits use async_trait. Audio is 16-bit little-endian mono PCM internally;
sample rate is explicit on every buffer.
```rust,ignore
use async_trait::async_trait;

// frame.rs
pub struct AudioChunk { pub pcm: Vec<i16>, pub sample_rate: u32 } // mono

// transport/media.rs — THE pipeline seam. The pipeline never cares whether audio
// arrived as carrier WS frames or as RTP. SipTransport and WsCarrierTransport both impl it.
#[async_trait]
pub trait MediaTransport: Send {
    async fn recv(&mut self) -> Option<MediaIn>; // StreamStart{call_id} | Audio(@carrier_rate) | Stop
    async fn send_audio(&mut self, chunk: AudioChunk) -> Result<(), FlowcatError>; // bot audio out
    async fn send_clear(&mut self) -> Result<(), FlowcatError>; // barge-in flush (no-op for RTP)
    fn carrier_rate(&self) -> u32; // 8000 for telephony G.711
}

pub enum MediaIn { StreamStart { call_id: String }, Audio(AudioChunk), Stop }

// transport/socket.rs — WS building block; the host provides the raw socket. Used (with a
// serializer) by `WsCarrierTransport: MediaTransport` for the Plivo path. Native SIP bypasses this.
#[async_trait]
pub trait MediaSocket: Send {
    async fn recv(&mut self) -> Option<WsIn>; // Text(String) | Binary(Vec<u8>) | Close
    async fn send_text(&mut self, s: String) -> Result<(), FlowcatError>;
    async fn send_binary(&mut self, b: Vec<u8>) -> Result<(), FlowcatError>;
}

// serializer/mod.rs — per-carrier WS framing for WsCarrierTransport. Pure (no I/O). plivo only.
pub trait MediaSerializer: Send {
    fn on_message(&mut self, msg: &WsIn) -> SerIn; // StreamStart{call_id,..} | Audio(AudioChunk) | Stop | Ignore
    fn encode_audio(&self, chunk: &AudioChunk) -> WsOut; // text/binary to send back
    fn encode_clear(&self) -> Option<WsOut>; // barge-in / interruption
    fn carrier_rate(&self) -> u32; // 8000 for telephony μ-law
}

// realtime/mod.rs — the speech-to-speech model abstraction (GeminiLive first).
#[async_trait]
pub trait RealtimeLlm: Send {
    async fn connect(&mut self, setup: RealtimeSetup) -> Result<(), FlowcatError>; // system prompt + tools
    async fn send_audio(&mut self, chunk: AudioChunk) -> Result<(), FlowcatError>; // 16k PCM in
    async fn update_system(&mut self, prompt: String, tools: Vec<ToolDecl>) -> Result<(), FlowcatError>;
    async fn send_tool_result(&mut self, id: String, result: serde_json::Value) -> Result<(), FlowcatError>;
    async fn next_event(&mut self) -> Option<RealtimeEvent>; // AudioOut(24k) | UserText | BotText | ToolCall | Interrupted | Usage | Closed
}

// brain.rs — the conversation decision-maker. The embedder impls this over its own engine.
pub trait AgentBrain: Send {
    fn system_prompt(&self) -> String;
    fn tools(&self) -> Vec<ToolDecl>; // transitions + endCall (+ later: node tools)
    fn on_tool_call(&mut self, name: &str, args: &serde_json::Value) -> BrainAction;
    fn is_finished(&self) -> bool;
    fn collected_vars(&self) -> serde_json::Value;
}

pub enum BrainAction {
    Transition { system_prompt: String, tools: Vec<ToolDecl>, say: Option<String> },
    Stay,
    End { disposition: Option<String> },
}

pub struct ToolDecl { pub name: String, pub description: String, pub params: serde_json::Value } // JSON-schema params

// session.rs — call bootstrap + finalize. The embedder impls this over its control-plane HTTP.
#[async_trait]
pub trait SessionSource: Send + Sync {
    async fn resolve(&self, run_id: i64, token: &str) -> Result<ResolvedCall, FlowcatError>;
    async fn complete(&self, run_id: i64, token: &str, fin: Finalize) -> Result<(), FlowcatError>;
    async fn artifact_upload_url(&self, run_id: i64, token: &str, kind: &str) -> Result<UploadTarget, FlowcatError>;
    async fn put_bytes(&self, url: &str, bytes: Vec<u8>, content_type: &str) -> Result<(), FlowcatError>;
}

pub struct ResolvedCall {
    pub provider: String,
    pub brain_config: serde_json::Value, // graph_spec + runtime + seed
    pub is_completed: bool,
}

pub struct Finalize {
    pub usage: serde_json::Value,
    pub collected_vars: serde_json::Value,
    pub recording_url: Option<String>,
    pub transcript_url: Option<String>,
}
```
brain_config is opaque to flowcat-core (it's the embedder's graph/spec + runtime options +
seed vars); the embedder builds its brain from it. Flowcat never sees the contract.
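To make the `AgentBrain` contract concrete, here is a hypothetical two-node brain ("greet", then "collect", then end). It implements a simplified mirror of the trait: tool arguments are a raw `&str` instead of `serde_json::Value`, `ToolDecl` drops `params`, and `collected_vars` is omitted, so the block needs only `std`. A real brain would be built from `brain_config`.

```rust
#[derive(Debug, Clone, PartialEq)]
pub struct ToolDecl {
    pub name: String,
    pub description: String,
}

#[derive(Debug, PartialEq)]
pub enum BrainAction {
    Transition { system_prompt: String, tools: Vec<ToolDecl>, say: Option<String> },
    Stay,
    End { disposition: Option<String> },
}

/// Simplified mirror of `AgentBrain` (see the lead-in for what differs).
pub trait Brain {
    fn system_prompt(&self) -> String;
    fn tools(&self) -> Vec<ToolDecl>;
    fn on_tool_call(&mut self, name: &str, args: &str) -> BrainAction;
    fn is_finished(&self) -> bool;
}

fn tool(name: &str, description: &str) -> ToolDecl {
    ToolDecl { name: name.to_string(), description: description.to_string() }
}

/// Hypothetical brain: "greet" --to_collect--> "collect" --endCall--> done.
pub struct TwoNodeBrain {
    node: &'static str,
    finished: bool,
}

impl TwoNodeBrain {
    pub fn new() -> Self {
        TwoNodeBrain { node: "greet", finished: false }
    }
}

impl Brain for TwoNodeBrain {
    fn system_prompt(&self) -> String {
        format!("Current node: {}", self.node)
    }

    fn tools(&self) -> Vec<ToolDecl> {
        // endCall is always available; the transition only from "greet".
        let mut tools = vec![tool("endCall", "Hang up")];
        if self.node == "greet" {
            tools.push(tool("to_collect", "Move on to collecting details"));
        }
        tools
    }

    fn on_tool_call(&mut self, name: &str, _args: &str) -> BrainAction {
        match (self.node, name) {
            ("greet", "to_collect") => {
                self.node = "collect";
                BrainAction::Transition {
                    system_prompt: self.system_prompt(),
                    tools: self.tools(),
                    say: None,
                }
            }
            (_, "endCall") => {
                self.finished = true;
                BrainAction::End { disposition: Some("completed".to_string()) }
            }
            _ => BrainAction::Stay, // unknown or out-of-place tool call
        }
    }

    fn is_finished(&self) -> bool {
        self.finished
    }
}
```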
The embedder assembles these seams into a runnable call with one of the two builders in
flowcat-core::pipeline: build_s2s_task (a single RealtimeLlm such as Gemini Live) or
build_cascaded_task (a MediaTransport + STT + LLM + TTS chain). Both accept any
AgentBrain and drive the same lifecycle below.
Call lifecycle
The embedder owns the control plane; the shapes below are the typical wiring it provides.
WS-media inbound (e.g. Plivo): carrier → the embedder's answer proxy →
inbound-run endpoint (verify sig, route, create run, token) → answer XML with <Stream> →
the carrier opens a WS to the embedder's /telephony/ws/{provider}/{run}?token= →
SessionSource.resolve → the call pipeline runs.
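For the WS-media path, the embedder has to pull `provider`, `run`, and `token` out of the request target before it can call `SessionSource::resolve(run_id, token)`. A minimal parser might look like this; `MediaRoute` and `parse_media_route` are hypothetical helpers, and the sketch assumes the token needs no percent-decoding. Parsing is not authorization: the embedder must still verify the token before `resolve`.

```rust
/// Parsed `/telephony/ws/{provider}/{run}?token=...` request target.
#[derive(Debug, PartialEq)]
pub struct MediaRoute<'a> {
    pub provider: &'a str,
    pub run_id: i64,
    pub token: &'a str,
}

/// Splits the media-WS target the carrier dials into its parts.
/// Returns `None` on any malformed input.
pub fn parse_media_route(target: &str) -> Option<MediaRoute<'_>> {
    let (path, query) = target.split_once('?')?;
    let rest = path.strip_prefix("/telephony/ws/")?;
    let (provider, run) = rest.split_once('/')?;
    if provider.is_empty() || run.contains('/') {
        return None;
    }
    let run_id: i64 = run.parse().ok()?;
    // First `token=` pair in the query string; no percent-decoding here.
    let token = query.split('&').find_map(|kv| kv.strip_prefix("token="))?;
    if token.is_empty() {
        return None;
    }
    Some(MediaRoute { provider, run_id, token })
}
```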
WS-media outbound: initiate-call endpoint → run+token → carrier originate (answer_url) →
the carrier GETs answer XML → <Stream> WS → same pipeline.
SIP inbound (native SIP): PSTN → SIP → the SipAgent running inside the embedder
(the trunk is REGISTERed at startup) accepts the INVITE → the embedder resolves the call over its
control plane (DID → workflow → create run+token) → builds a SipTransport for the dialog and
runs the pipeline. Carrier CDR/recording webhooks are side-effect-only, never the media trigger.
SIP outbound (native SIP): initiate-call endpoint → run+token → the control plane POSTs to
the embedder's originate endpoint ({run_id, token, to_number}, no ESL) → the SipAgent originates
the INVITE to the E.164 → on answer builds a SipTransport and runs the pipeline. CallerID is
configured on the trunk by the embedder's SIP trunk configuration.
Audio path
Telephony is G.711 μ-law 8 kHz. Gemini Live wants 16 kHz PCM in, emits 24 kHz PCM out.
```text
carrier → μ-law decode → 8k→16k upsample (rubato) → Gemini
Gemini  → 24k→8k downsample (rubato) → μ-law encode → carrier
recorder taps both legs → mono mix → WAV → object store
```
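For reference, the μ-law leg of this path can be sketched with the classic G.711 bias-and-segment algorithm (bias 0x84, clip at 32635). This is an illustration only: Flowcat uses the `audio-codec-algorithms` crate for G.711, and `ulaw_decode` / `ulaw_encode` are hypothetical names. Every byte except 0x7F (negative zero) survives a decode/encode round trip, the kind of property a G.711 round-trip unit test can check.

```rust
/// Decodes one G.711 μ-law byte to a 16-bit linear PCM sample.
pub fn ulaw_decode(byte: u8) -> i16 {
    let u = !byte; // μ-law bytes are transmitted bit-inverted
    let sign = u & 0x80;
    let exponent = ((u >> 4) & 0x07) as u32;
    let mantissa = (u & 0x0F) as i16;
    // Rebuild the biased magnitude, then remove the 0x84 (132) bias.
    let magnitude = (((mantissa << 3) + 0x84) << exponent) - 0x84;
    if sign != 0 { -magnitude } else { magnitude }
}

/// Encodes a 16-bit linear PCM sample to one G.711 μ-law byte.
pub fn ulaw_encode(sample: i16) -> u8 {
    const BIAS: i32 = 0x84;
    const CLIP: i32 = 32635;
    let mut s = sample as i32;
    let sign: i32 = if s < 0 {
        s = -s;
        0x80
    } else {
        0
    };
    // Clip, then bias so every value has a set bit at position 7 or above.
    s = s.min(CLIP) + BIAS;
    // Segment (exponent) = position of the highest set bit, minus 7.
    let mut exponent: i32 = 7;
    let mut mask: i32 = 0x4000;
    while exponent > 0 && (s & mask) == 0 {
        exponent -= 1;
        mask >>= 1;
    }
    let mantissa = (s >> (exponent + 3)) & 0x0F;
    !((sign | (exponent << 4) | mantissa) as u8)
}
```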
On the SIP path the SipTransport decodes inbound RTP (G.711 PCMU/PCMA per the negotiated
codec) straight to 8 kHz PCM and re-encodes the bot leg to RTP — no WS hop, no intermediate L16
framing. On the Plivo path the PlivoSerializer handles the μ-law WS framing. Both feed the
same resample/recorder. Crates: audio-codec-algorithms (G.711), rubato (resample), hand mix
for the recorder.
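The recorder's hand mix and WAV packaging can be sketched as below. Assumptions: both legs are already at the same rate (8 kHz), they are mixed by saturating addition (the actual mix rule is not specified here), and the output is a canonical 44-byte RIFF/WAVE header followed by little-endian PCM16. `mix_mono` and `to_wav` are hypothetical names.

```rust
/// Mixes the caller and bot legs into one mono track by saturating addition;
/// the shorter leg is padded with silence.
pub fn mix_mono(caller: &[i16], bot: &[i16]) -> Vec<i16> {
    let n = caller.len().max(bot.len());
    (0..n)
        .map(|i| {
            let a = caller.get(i).copied().unwrap_or(0);
            let b = bot.get(i).copied().unwrap_or(0);
            a.saturating_add(b)
        })
        .collect()
}

/// Wraps mono 16-bit PCM in a canonical 44-byte RIFF/WAVE header.
pub fn to_wav(samples: &[i16], sample_rate: u32) -> Vec<u8> {
    let data_len = (samples.len() * 2) as u32;
    let mut out = Vec::with_capacity(44 + data_len as usize);
    out.extend_from_slice(b"RIFF");
    out.extend_from_slice(&(36 + data_len).to_le_bytes()); // file size - 8
    out.extend_from_slice(b"WAVE");
    out.extend_from_slice(b"fmt ");
    out.extend_from_slice(&16u32.to_le_bytes()); // fmt chunk size
    out.extend_from_slice(&1u16.to_le_bytes()); // format: PCM
    out.extend_from_slice(&1u16.to_le_bytes()); // channels: mono
    out.extend_from_slice(&sample_rate.to_le_bytes());
    out.extend_from_slice(&(sample_rate * 2).to_le_bytes()); // byte rate
    out.extend_from_slice(&2u16.to_le_bytes()); // block align
    out.extend_from_slice(&16u16.to_le_bytes()); // bits per sample
    out.extend_from_slice(b"data");
    out.extend_from_slice(&data_len.to_le_bytes());
    for s in samples {
        out.extend_from_slice(&s.to_le_bytes());
    }
    out
}
```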
The rates above are Gemini Live's (16 kHz in / 24 kHz out). A cascaded pipeline instead
resamples the same 8 kHz carrier audio to whatever each STT/TTS provider expects; the carrier
codec, rubato resampling, and the dual-leg recorder are identical either way.
Gemini Live protocol (native client)
WSS wss://generativelanguage.googleapis.com/ws/google.ai.generativelanguage.v1alpha.GenerativeService.BidiGenerateContent?key=<API_KEY> — JSON frames (not protobuf).
- client→server: `setup` (`model`, `systemInstruction`, `tools`, `responseModalities: ["AUDIO"]`, input/output transcription on) · `realtimeInput.mediaChunks[{mimeType: "audio/pcm;rate=16000", data: b64}]` · `toolResponse.functionResponses` · `clientContent` (kickoff turn).
- server→client: `setupComplete` · `serverContent.modelTurn.parts[].inlineData` (24 kHz PCM b64, the bot audio) · `serverContent.inputTranscription` / `outputTranscription` · `serverContent.interrupted` (barge-in) · `toolCall.functionCalls[{name, args, id}]` · `usageMetadata` · `goAway` / `sessionResumptionUpdate`.
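As a sketch of the client→server audio message, the function below builds one `realtimeInput.mediaChunks` frame from 16 kHz PCM16, following the field names listed above. `encode_media_chunk` and the hand-rolled `base64` (standard alphabet with `=` padding) are hypothetical helpers; a real client would use JSON and base64 libraries and should be checked against Google's current Live API reference.

```rust
const B64: &[u8; 64] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

/// Standard base64 (RFC 4648 alphabet) with `=` padding.
fn base64(bytes: &[u8]) -> String {
    let mut out = String::with_capacity((bytes.len() + 2) / 3 * 4);
    for chunk in bytes.chunks(3) {
        let b = [chunk[0], *chunk.get(1).unwrap_or(&0), *chunk.get(2).unwrap_or(&0)];
        let n = (u32::from(b[0]) << 16) | (u32::from(b[1]) << 8) | u32::from(b[2]);
        for i in 0..4usize {
            // A chunk of k input bytes yields k + 1 real characters.
            if i <= chunk.len() {
                out.push(B64[((n >> (18 - 6 * i)) & 0x3F) as usize] as char);
            } else {
                out.push('=');
            }
        }
    }
    out
}

/// Builds one `realtimeInput` message carrying 16 kHz PCM16 (little-endian).
pub fn encode_media_chunk(pcm16k: &[i16]) -> String {
    let bytes: Vec<u8> = pcm16k.iter().flat_map(|s| s.to_le_bytes()).collect();
    format!(
        r#"{{"realtimeInput":{{"mediaChunks":[{{"mimeType":"audio/pcm;rate=16000","data":"{}"}}]}}}}"#,
        base64(&bytes)
    )
}
```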
Tools = the brain's transitions as no-arg functions + endCall. On toolCall: call
AgentBrain.on_tool_call; for Transition, RealtimeLlm.update_system(new prompt, new tools) +
send_tool_result; for End, drain + finalize. v1 handles interrupted (clear carrier audio);
goAway/reconnect is a documented follow-up (see ROADMAP.md).
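The tool-call handling above can be sketched as a pure mapping from the brain's decision to the steps the call loop performs. `Step`, `steps_for`, and the trimmed `BrainAction` are hypothetical; the `Stay` arm (acknowledge with a tool result only) is an assumption, since this document does not specify it.

```rust
/// Stand-in for `BrainAction` with only the fields this sketch reads.
#[derive(Debug)]
pub enum BrainAction {
    Transition { system_prompt: String },
    Stay,
    End { disposition: Option<String> },
}

/// One side effect the call loop performs in response to a `toolCall`.
#[derive(Debug, PartialEq)]
pub enum Step {
    /// `RealtimeLlm::update_system(new prompt, new tools)`.
    UpdateSystem(String),
    /// `RealtimeLlm::send_tool_result(id, result)` for the given call id.
    SendToolResult(String),
    /// Drain queued bot audio, then finalize the call.
    DrainAndFinalize(Option<String>),
}

/// Maps the brain's decision for tool call `call_id` to ordered steps.
pub fn steps_for(call_id: &str, action: BrainAction) -> Vec<Step> {
    match action {
        BrainAction::Transition { system_prompt } => vec![
            Step::UpdateSystem(system_prompt),
            Step::SendToolResult(call_id.to_string()),
        ],
        // Assumption: acknowledge a `Stay` so the model is not left waiting.
        BrainAction::Stay => vec![Step::SendToolResult(call_id.to_string())],
        BrainAction::End { disposition } => vec![Step::DrainAndFinalize(disposition)],
    }
}
```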
What the embedder's control plane provides
Flowcat itself does not implement the control plane. To wire a SIP/RTP carrier, the embedder supplies (in its own, separately-reviewed code):
- A telephony provider for the carrier: `sample_rate() = 8000`; per-event webhook signature verification (e.g. HMAC over the carrier's event string); inbound parsing (DID/caller/call-id) and status → lifecycle mapping; plus any REST auth signer the carrier's API needs.
- Routes the carrier talks to: CDR/event webhooks (side-effect-only), a service-authed `sip/inbound-resolve` (DID → workflow → run+token), and an `initiate-call` branch whose originate path POSTs to the embedder's media-binary originate endpoint (`{run_id, token, to_number}`), with no ESL.
- A credential shape for the carrier (API key/secret, SIP login/password/server, caller-id), sealed at rest. The SIP trunk that actually REGISTERs is configured by the embedder's SIP trunk configuration (server / login / password / caller-id) passed to `SipConfig` / `SipAgent`.
OSS boundary & license
- Flowcat (Apache-2.0): `flowcat-core` (runtime + the four traits `MediaTransport` · `RealtimeLlm` · `AgentBrain` · `SessionSource` + native SIP UA `sip/` + `WsCarrierTransport` + codec/recorder + a demo brain) and the sibling crates `flowcat-services` (~80 STT/TTS/LLM/realtime providers + obs exporters + MCP), `flowcat-transports`, and `flowcat-telephony` (carrier serializers + DTMF), plus the `flowcat-cli` demo binary. Every provider/transport is one opt-in Cargo feature, so a fully local/air-gapped build is just a feature selection. No embedder contract.
- The embedder (its own code): the glue binary (engine adapter + control-plane client + auth/routing), plus whatever editor, campaigns, billing, and multi-tenant control plane it provides. The host's brain implementation plugs in via the `AgentBrain` trait.
- The `Memory` trait is OSS; a concrete backend is the embedder's choice (not wired here).
Security
- A per-call token authorizes the media WS and every control-plane call. The embedder checks the WS query token before `resolve`.
- Carrier signature verification (CDR webhooks) stays in the embedder's control plane (constant-time compare, replay window). The model never receives carrier/embedder credentials.
- SIP trunk credentials live in the embedder's config and are never logged. Inbound INVITE trust flows through the embedder's `sip/inbound-resolve` (service-authed; identity comes from the DID route, never the INVITE body). The outbound originate endpoint authenticates (run token / service auth) and validates `to_number` (E.164). RTP from an unexpected source address is dropped (symmetric RTP). No new secret crosses the wire to the model/caller.
- The embedder seals carrier config at rest. Both review gates (code + security) apply to all control-plane changes.
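Two of these checks are small enough to sketch. `ct_eq` compares tokens or signatures without exiting early at the first mismatching byte, and `is_e164` applies the structural E.164 rule (a `+`, then 1 to 15 digits, not starting with 0) to `to_number`. Both names are hypothetical; in production, prefer a vetted constant-time primitive such as the `subtle` crate's `ConstantTimeEq`, since an optimizer is not obliged to preserve a hand-written loop's timing.

```rust
/// Compares two byte strings in time that depends only on their lengths,
/// not on where they first differ.
pub fn ct_eq(a: &[u8], b: &[u8]) -> bool {
    if a.len() != b.len() {
        return false; // lengths of tokens/MACs are not secret
    }
    // OR together every byte difference; zero only if all bytes match.
    a.iter().zip(b).fold(0u8, |acc, (x, y)| acc | (x ^ y)) == 0
}

/// Structural E.164 check: '+', then 1 to 15 digits, first digit non-zero.
pub fn is_e164(n: &str) -> bool {
    let Some(digits) = n.strip_prefix('+') else {
        return false;
    };
    (1..=15).contains(&digits.len())
        && digits.bytes().all(|c| c.is_ascii_digit())
        && !digits.starts_with('0')
}
```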
Testing strategy
- Unit (CI, no infra): G.711 round-trip; resample ratios; Plivo serializer parse/encode; SIP: SDP offer/answer build+parse (PCMU/PCMA pick, ptime), RTP packetize/depacketize (seq/ts/PT), the `SipTransport` → `MediaTransport` mapping (fed RTP → `Audio`; synthesized BYE → `Stop`); carrier signature accept/reject/tamper/replay + REST signing known-answer vector; the brain adapter (transition → tool → re-prompt → end); Gemini Live JSON message encode/decode against captured fixtures.
- Integration (CI): a mock `MediaTransport` + mock `RealtimeLlm` driving the call pipeline to a clean finalize (no network).
- Live (gated; needs infra + user OK): a carrier dev number (NEVER a production number) for the WS-media path; a SIP trunk registered by the native `SipAgent` for the SIP path. Inform the user before any live call (account guardrail).
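A unit test for RTP packetize/depacketize (seq/ts/PT) needs the fixed 12-byte header from RFC 3550. The sketch below handles only the common case (version 2, no padding, extension, or CSRCs); `RtpHeader`, `packetize`, and `depacketize` are hypothetical names. Static payload type 0 is PCMU and 8 is PCMA (RFC 3551); for 20 ms of 8 kHz G.711 the timestamp advances by 160 per packet.

```rust
/// Fixed RTP header fields (RFC 3550 §5.1), without CSRCs or extensions.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct RtpHeader {
    pub payload_type: u8,
    pub marker: bool,
    pub seq: u16,
    pub timestamp: u32,
    pub ssrc: u32,
}

pub fn packetize(h: &RtpHeader, payload: &[u8]) -> Vec<u8> {
    let mut p = Vec::with_capacity(12 + payload.len());
    p.push(2 << 6); // V=2, P=0, X=0, CC=0
    p.push((u8::from(h.marker) << 7) | (h.payload_type & 0x7F));
    p.extend_from_slice(&h.seq.to_be_bytes()); // network byte order
    p.extend_from_slice(&h.timestamp.to_be_bytes());
    p.extend_from_slice(&h.ssrc.to_be_bytes());
    p.extend_from_slice(payload);
    p
}

/// Returns `None` for short packets, non-v2 packets, or packets using
/// padding/extension/CSRCs (which this sketch does not handle).
pub fn depacketize(p: &[u8]) -> Option<(RtpHeader, &[u8])> {
    if p.len() < 12 || (p[0] >> 6) != 2 || (p[0] & 0x3F) != 0 {
        return None;
    }
    let h = RtpHeader {
        payload_type: p[1] & 0x7F,
        marker: (p[1] & 0x80) != 0,
        seq: u16::from_be_bytes([p[2], p[3]]),
        timestamp: u32::from_be_bytes([p[4], p[5], p[6], p[7]]),
        ssrc: u32::from_be_bytes([p[8], p[9], p[10], p[11]]),
    };
    Some((h, &p[12..]))
}
```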
Flowcat — the composable FrameProcessor pipeline (FROZEN API)
Status: FROZEN. This is the keystone API for the whole pipecat-parity program (see
`ROADMAP.md`). Every later component (cascaded STT/TTS/LLM, VAD/turn, WebRTC/serializer transports, observability, transfer/DTMF) is implemented against the traits and types frozen here. Companion runtime doc: `DESIGN.md` (today's `Call::run`, the four seams, the audio path). Mirror source: pipecat `pipecat/src/pipecat/{frames/frames.py, processors/frame_processor.py, pipeline/{pipeline,parallel_pipeline,task,runner}.py, observers/base_observer.py, metrics/metrics.py}`.

Scope: the framework only, namely `Frame`, `FrameProcessor`, `Pipeline`, `ParallelPipeline`, `PipelineTask`, `PipelineRunner`, `Observer`, metrics frames, the service-processor trait signatures, and the seam→processor mapping. Provider impls, audio models, transports, and serializers are later work, and only their trait signatures are frozen here (so the fan-out can start against a stable surface).

Non-negotiable: the live Gemini-Live S2S prod path keeps working on the current `Call::run` until the processor pipeline is proven equivalent (§7). It lands alongside `Call::run`, never as a rewrite-in-place.
0. Design goals & the constraints they come from
- Literal pipecat parity in shape, so the ~80-provider fan-out is a mechanical port: a `Frame` taxonomy, a `FrameProcessor` with `process_frame(frame, direction)` + `push_frame`, prev/next linking, `Pipeline` / `ParallelPipeline`, `PipelineTask` + `PipelineRunner`, `Observer`. (pipecat `frame_processor.py:175`, `pipeline.py:91`, `task.py:142`.)
- Protect the p99 moat. Today's `Call::run` is one `tokio::select!` loop (`pipeline.rs:195`) holding p99 ≤ 0.61 ms to 2,000 concurrent calls (`bench/RESULTS.md`). The channel-per-processor model adds per-frame hops; we must show the added cost stays in the sub-millisecond noise (§2.4).
- OSS-clean + compile-fast. `flowcat-core` stays embedder-agnostic (`lib.rs`) and must build without pulling every provider; providers/transports live in sibling crates behind one cargo feature each (§8).
- Extensible frame set. OSS users (and later components) must be able to add frame types without editing `flowcat-core`. This drives the enum-core + `Frame::Custom(Arc<dyn CustomFrame>)` escape hatch decision (§1.1).
- Zero ABI churn for the parallel fan-out. Trait method shapes are frozen now; adding a provider must never require touching the framework.
1. Frame taxonomy
1.1 Enum core with a trait escape hatch — and why
pipecat models frames as a Python class tree with isinstance dispatch
(frames.py:54 Frame → SystemFrame/DataFrame/ControlFrame). The direct Rust
analogues are (a) a closed enum Frame or (b) trait Frame: Any + downcast.
| | closed `enum Frame` | `trait Frame: Any` + downcast |
|---|---|---|
| Dispatch | `match` (no vtable, no alloc, branch-predicted) | `Any::downcast_ref` (type-id compare) per handler |
| Exhaustiveness | compiler-checked; adding a variant flags every `match` | none; missed types silently fall through |
| Per-frame cost | a stack enum move; the hot audio variant is `Arc<AudioFrame>` | `Box<dyn>` / `Arc<dyn>` heap alloc per frame |
| OSS extensibility | closed: users cannot add a variant | open: any type implementing the trait |
| Category (System/Data/Control) | one method `fn class()` | per-impl |
The moat (constraint 2) wants the cheap, branch-predicted match and the alloc-free
hot path; literal parity (constraint 1) wants exhaustiveness so the port is mechanical
and a new audio/turn frame can't be silently dropped. But constraint 4 (OSS users add
frame types) rules out a purely closed enum.
Decision: a closed enum Frame for every pipecat frame, plus one Custom variant
carrying Arc<dyn CustomFrame> as the extension point. Core processors match on
named variants with full exhaustiveness; OSS extensions ride in Custom and are
downcast only by the processors that care. This is the standard "enum + escape hatch"
pattern: 99% of frames are first-class and alloc-free; extensibility costs one
Arc<dyn> only on the frames that use it.
```rust,ignore
// flowcat-core/src/processor/frame.rs (NOTE: distinct from today's data-shape
// `frame.rs`, which is renamed to `types.rs` — see §8.4 migration step M0.)
use std::any::Any;
use std::sync::Arc;

/// Direction of frame flow. Mirrors pipecat `FrameDirection`
/// (frame_processor.py:56). `Downstream` = source→sink; `Upstream` = sink→source
/// (errors, end-of-task requests, RTVI acks).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Direction {
    Downstream,
    Upstream,
}

/// Frame scheduling class — mirrors pipecat's three base classes
/// (frames.py:95/106/118). Drives queue priority and interruptibility:
/// `System` jumps the queue and survives interruption; `Data` is dropped on
/// interruption; `Control` is ordered like Data but also survives interruption
/// when `uninterruptible()` is set (e.g. `End`, `Stop`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FrameClass {
    System,
    Data,
    Control,
}

/// Per-frame metadata, present on every frame (mirrors the `Frame` base fields:
/// `id`, `name`, `pts`, `metadata` — frames.py:73-79). Cheap to clone (Arc the map).
#[derive(Debug, Clone)]
pub struct FrameMeta {
    /// Process-unique monotonic id (an `AtomicU64` bump; see `next_frame_id()`).
    pub id: u64,
    /// Human name for tracing/observers, e.g. "OutputAudio#42". Built lazily.
    pub name: &'static str,
    /// Presentation timestamp in **nanoseconds** on the pipeline clock, if set.
    pub pts: Option<i64>,
    /// Paired id when a frame was broadcast both directions (frames.py:76).
    pub broadcast_sibling_id: Option<u64>,
    /// Arbitrary sideband metadata (Arc so cloning a frame is cheap).
    pub extra: Option<Arc<serde_json::Map<String, serde_json::Value>>>,
    /// Transport source/destination track names (frames.py:78-79).
    pub transport_source: Option<Arc<str>>,
    pub transport_destination: Option<Arc<str>>,
}

/// OSS extension point: a frame type defined outside flowcat-core. Carried in
/// `Frame::Custom`. Processors that understand it `downcast_ref`; everyone else
/// forwards it unchanged in the direction it arrived.
pub trait CustomFrame: Any + Send + Sync + std::fmt::Debug {
    fn frame_class(&self) -> FrameClass;
    /// True if this frame must survive interruption (pipecat `UninterruptibleFrame`).
    fn uninterruptible(&self) -> bool { false }
    fn as_any(&self) -> &dyn Any;
}

/// The frame that flows through every processor. One closed enum mirroring the
/// pipecat frame tree (frames.py), plus `Custom` for OSS extensions.
///
/// The hot audio variants box their payload in `Arc<AudioFrame>` so cloning a
/// frame for the broadcast/observer paths never copies PCM.
#[derive(Debug, Clone)]
pub enum Frame {
    // ---- System frames (priority; survive interruption) — frames.py:846+ ----
    /// Pipeline init: carries sample rates + metric/trace toggles (StartFrame, :847).
    Start(StartParams),
    /// Immediate stop; flush nothing (CancelFrame, :873).
    Cancel { reason: Option<String> },
    /// Error notification pushed upstream (ErrorFrame, :890). `fatal` ⇒ task cancels.
    Error { message: String, fatal: bool, processor: Option<Arc<str>> },
    /// Barge-in (InterruptionFrame, :959). Broadcast both directions by the
    /// turn/VAD start strategy or any processor.
    Interruption,
    /// Raw caller audio from a transport input (InputAudioRawFrame, :1250).
    InputAudio(Arc<AudioFrame>),
    /// User-associated audio (UserAudioRawFrame, :1296) — carries `user_id`.
    UserAudio { audio: Arc<AudioFrame>, user_id: Arc<str> },
    /// Raw text input from a transport (InputTextRawFrame, :1282) — text-chat path.
    InputText(String),
    /// Inbound DTMF keypress (InputDTMFFrame, :1353).
    InputDtmf(KeypadEntry),
    /// VAD/turn lifecycle (frames.py:971-1104). One variant per pipecat frame.
    UserStartedSpeaking,
    UserStoppedSpeaking,
    UserSpeaking,
    BotStartedSpeaking,
    BotStoppedSpeaking,
    BotSpeaking,
    /// Definitive VAD edges with the deciding secs (VAD*SpeakingFrame, :1043/1058).
    VadUserStartedSpeaking { start_secs: f32 },
    VadUserStoppedSpeaking { stop_secs: f32 },
    /// Mute/unmute the STT service (STTMuteFrame, :1182).
    SttMute(bool),
    /// Performance metrics (MetricsFrame, :1108) — TTFB/processing/usage/turn.
    Metrics(Vec<MetricsData>),
    /// Transport-level message in/out urgent (Input/OutputTransportMessage*, :1193/1207).
    TransportMessage { payload: serde_json::Value, urgent: bool },
    /// SFU/transport lifecycle (BotConnected/ClientConnected, :1621/1633).
    ClientConnected,
    BotConnected,
    /// Function-call signalling (FunctionCallsStarted/InProgress/Cancel, :1155/1804/1169).
    FunctionCallsStarted(Vec<FunctionCall>),
    FunctionCallInProgress { call: FunctionCall, cancel_on_interruption: bool },
    FunctionCallCancel { function_name: String, tool_call_id: String },

    // ---- Data frames (ordered; dropped on interruption) — frames.py:190+ ----
    /// Output audio to a transport (OutputAudioRawFrame, :191).
    OutputAudio(Arc<AudioFrame>),
    /// TTS-generated audio, tagged with its context id (TTSAudioRawFrame, :231).
    TtsAudio { audio: Arc<AudioFrame>, context_id: Option<Arc<str>> },
    /// Generic text (TextFrame, :293) — flows LLM→aggregator→TTS.
    Text(String),
    /// LLM-generated text chunk (LLMTextFrame, :333).
    LlmText(String),
    /// Final transcription (TranscriptionFrame, :419).
    Transcription { text: String, user_id: Arc<str>, language: Option<Language>, final_: bool },
    /// Interim/partial transcription (InterimTranscriptionFrame, :445).
    InterimTranscription { text: String, user_id: Arc<str>, language: Option<Language> },
    /// Text the TTS should speak (TTSSpeakFrame, :744).
    TtsSpeak { text: String, append_to_context: Option<bool> },
    /// Word/segment text emitted by TTS with its context (TTSTextFrame, :400).
    TtsText { text: String, context_id: Option<Arc<str>> },
    /// Function-call result, fed back to the LLM (FunctionCallResultFrame, :719).
    /// Uninterruptible — once produced, context must always be updated.
    FunctionCallResult(FunctionCallResult),
    /// Trigger an LLM run over the current context (LLMRunFrame, :585).
    LlmRun,
    /// The universal LLM context to run (LLMContextFrame, :502).
    LlmContext(Arc<LlmContext>),
    /// Outbound DTMF (OutputDTMFFrame, :790).
    OutputDtmf(Vec<KeypadEntry>),

    // ---- Control frames (ordered; `End`/`Stop` survive interruption) — :1580+ ----
    /// Graceful shutdown after flush (EndFrame, :1581). Uninterruptible.
    End { reason: Option<String> },
    /// Stop but keep processors connected (StopFrame, :1605). Uninterruptible.
    Stop,
    /// LLM response framing (LLMFullResponseStart/End, :1699/1714).
    LlmResponseStart,
    LlmResponseEnd,
    /// TTS response framing (TTSStarted/Stopped, :1850/1867).
    TtsStarted { context_id: Option<Arc<str>> },
    TtsStopped { context_id: Option<Arc<str>> },
    /// Update a service's settings live (ServiceUpdateSettingsFrame, :1878).
    /// Uninterruptible. `target` = STT/TTS/LLM/Filter/Mixer/All.
    UpdateSettings { target: ServiceKind, settings: serde_json::Value },
    /// Speech-control params broadcast (SpeechControlParamsFrame, :1419) + VAD
    /// param updates (VADParamsUpdateFrame, :1939).
    SpeechControlParams { vad: Option<VadParams>, turn: Option<TurnParams> },
    /// Liveness probe (HeartbeatFrame, :1654).
    Heartbeat { timestamp_ns: i64 },
    /// Output transport ready (OutputTransportReadyFrame, :1644).
    OutputTransportReady,

    // ---- OSS extension point ----
    Custom(Arc<dyn CustomFrame>),
}
```
`Frame` does not embed its `FrameMeta`; the metadata travels out of band, which keeps the enum small and cheap to match. The channel item is `Envelope { meta: FrameMeta, frame: Frame, direction: Direction }` (§2.1). (pipecat stores id/name/pts on the frame object itself; separating them keeps the hot variant, `OutputAudio(Arc<AudioFrame>)`, a thin pointer move while the meta travels alongside.)
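As a minimal sketch of that split, the std-only code below uses a cut-down `Frame` with two variants and an `AudioFrame` with only `pcm` and `sample_rate`. The body of `next_frame_id()` (a single `AtomicU64` bump, which the `FrameMeta::id` comment describes) and the helper `envelope()` are assumptions made for illustration. The sketch shows that cloning an envelope clones the `Arc`, never the PCM.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Reduced stand-in for `AudioFrame` (assumption: only two fields).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AudioFrame {
    pub pcm: Vec<i16>,
    pub sample_rate: u32,
}

/// Reduced stand-in for the real `Frame` enum.
#[derive(Debug, Clone)]
pub enum Frame {
    OutputAudio(Arc<AudioFrame>),
    Text(String),
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Direction {
    Downstream,
    Upstream,
}

/// Only the `FrameMeta` fields this sketch needs.
#[derive(Debug, Clone)]
pub struct FrameMeta {
    pub id: u64,
    pub name: &'static str,
}

/// The channel item: metadata travels beside the frame, not inside it.
#[derive(Debug, Clone)]
pub struct Envelope {
    pub meta: FrameMeta,
    pub frame: Frame,
    pub direction: Direction,
}

static NEXT_FRAME_ID: AtomicU64 = AtomicU64::new(1);

/// Process-unique, increasing frame id: one atomic fetch-and-add per frame.
pub fn next_frame_id() -> u64 {
    NEXT_FRAME_ID.fetch_add(1, Ordering::Relaxed)
}

/// Hypothetical helper: wrap a frame in a fresh downstream envelope.
pub fn envelope(frame: Frame, name: &'static str) -> Envelope {
    Envelope {
        meta: FrameMeta { id: next_frame_id(), name },
        frame,
        direction: Direction::Downstream,
    }
}
```

Cloning `Envelope` here copies a `u64`, a `&'static str`, and bumps one reference count. That is the "thin pointer move" the paragraph above relies on.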
```rust
impl Frame {
    /// Scheduling class — drives queue priority + interruptibility (§2.3).
    pub fn class(&self) -> FrameClass {
        // match: System for Start/Cancel/Error/Interruption/Input*/Vad*/Metrics/
        // FunctionCallsStarted/FunctionCallInProgress/FunctionCallCancel/…;
        // Control for End/Stop/LlmResponseStart/LlmResponseEnd/TtsStarted/
        // TtsStopped/UpdateSettings/Heartbeat/…; Data for the rest;
        // Custom delegates to CustomFrame::frame_class().
        todo!()
    }

    /// True ⇒ kept in the queue and not cancelled on interruption (pipecat
    /// `UninterruptibleFrame`: End, Stop, FunctionCallResult, UpdateSettings —
    /// :1581/1605/719/1878).
    pub fn uninterruptible(&self) -> bool {
        // match those four variants; Custom delegates to CustomFrame::uninterruptible().
        todo!()
    }
}
```
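To make the two `match` bodies concrete, here is a runnable reduction. It assumes a cut-down `Frame` with a few representative variants per class, and a `String` payload standing in for the real `FunctionCallResult` struct. The class assignments follow the comments above; the variant selection itself is illustrative only.

```rust
use std::any::Any;
use std::fmt::Debug;
use std::sync::Arc;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FrameClass {
    System,
    Data,
    Control,
}

pub trait CustomFrame: Any + Send + Sync + Debug {
    fn frame_class(&self) -> FrameClass;
    fn uninterruptible(&self) -> bool {
        false
    }
    fn as_any(&self) -> &dyn Any;
}

/// Cut-down frame enum: a few variants per class plus the extension point.
#[derive(Debug, Clone)]
pub enum Frame {
    Interruption,               // System
    InputText(String),          // System
    Text(String),               // Data
    FunctionCallResult(String), // Data, but uninterruptible (payload simplified)
    End { reason: Option<String> },
    Stop,
    Heartbeat { timestamp_ns: i64 },
    UpdateSettings(String),     // Control, uninterruptible (payload simplified)
    Custom(Arc<dyn CustomFrame>),
}

impl Frame {
    pub fn class(&self) -> FrameClass {
        match self {
            Frame::Interruption | Frame::InputText(_) => FrameClass::System,
            Frame::Text(_) | Frame::FunctionCallResult(_) => FrameClass::Data,
            Frame::End { .. } | Frame::Stop | Frame::Heartbeat { .. } | Frame::UpdateSettings(_) => {
                FrameClass::Control
            }
            Frame::Custom(c) => c.frame_class(),
        }
    }

    pub fn uninterruptible(&self) -> bool {
        match self {
            Frame::End { .. } | Frame::Stop | Frame::FunctionCallResult(_) | Frame::UpdateSettings(_) => true,
            Frame::Custom(c) => c.uninterruptible(),
            _ => false,
        }
    }
}
```

Note that `uninterruptible()` is only consulted for frames in the normal (Data/Control) queue. System frames survive interruption by travelling on their own channel, so they do not need the flag.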
```rust
/// Mono 16-bit LE PCM with an explicit sample rate. **Unchanged** from today's
/// `frame.rs:14` AudioChunk — renamed `AudioFrame` and Arc-wrapped in the enum so
/// the hot path never copies PCM. (Keep an `AudioChunk` type alias for one release.)
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AudioFrame {
    pub pcm: Vec<i16>,
    pub sample_rate: u32,
    pub num_channels: u16,
}
```
`StartParams`, `FunctionCall`, `FunctionCallResult`, `LlmContext`, `KeypadEntry`, `Language`, `VadParams`, `TurnParams`, `ServiceKind`, and `MetricsData` are plain structs/enums in `flowcat-core/src/processor/{frame,metrics}.rs` mirroring the pipecat fields cited above. Because `Frame` derives `Debug` and `Clone`, each of them must derive both as well. `StartParams` mirrors `StartFrame` (frames.py:847): `audio_in_sample_rate`, `audio_out_sample_rate`, `enable_metrics`, `enable_usage_metrics`, `enable_tracing`, `report_only_initial_ttfb`.
1.2 What we deliberately omit from the v1 enum
To keep the enum reviewable, the long tail of pipecat's 100+ frames that no current component needs (vision/image frames :171/209/1267, sprite :274, summarization :1735/1751/1782, prompt-caching :671, pause/resume :928/1668, idle-timeout-update :1925, filter/mixer enable :1971/2000, service-switcher :2011) maps to `Custom` until a component needs one first-class. That component then promotes it to a named variant. The promotion is additive: only `match` sites without a wildcard `_` arm must add an arm, and the compiler points at each one. (For crates outside the workspace, adding a variant is only semver-compatible if `Frame` is declared `#[non_exhaustive]`.) The checklist (§9) names exactly which variants each early component promotes.
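The sketch below shows how an out-of-tree frame rides `Custom` until it is promoted. `SpriteFrame` and the `handle` function are hypothetical: a processor that understands the frame downcasts it through `as_any()`, and every other frame is handed back unchanged so the caller can forward it. The `#[non_exhaustive]` attribute only changes behaviour for crates outside `flowcat-core`, forcing them to keep a `_` arm.

```rust
use std::any::Any;
use std::fmt::Debug;
use std::sync::Arc;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FrameClass {
    System,
    Data,
    Control,
}

pub trait CustomFrame: Any + Send + Sync + Debug {
    fn frame_class(&self) -> FrameClass;
    fn uninterruptible(&self) -> bool {
        false
    }
    fn as_any(&self) -> &dyn Any;
}

/// Downstream crates must keep a `_` arm, so a later promotion of a `Custom`
/// frame to a named variant does not break their builds.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub enum Frame {
    Text(String),
    Custom(Arc<dyn CustomFrame>),
}

/// Hypothetical OSS frame defined outside flowcat-core.
#[derive(Debug)]
pub struct SpriteFrame {
    pub frame_index: u32,
}

impl CustomFrame for SpriteFrame {
    fn frame_class(&self) -> FrameClass {
        FrameClass::Data
    }
    fn as_any(&self) -> &dyn Any {
        self
    }
}

/// A processor that understands `SpriteFrame` consumes it (`Ok`); anything else
/// is returned untouched (`Err`) so the caller forwards it in its direction.
pub fn handle(frame: Frame) -> Result<u32, Frame> {
    if let Frame::Custom(c) = &frame {
        if let Some(sprite) = c.as_any().downcast_ref::<SpriteFrame>() {
            return Ok(sprite.frame_index);
        }
    }
    Err(frame)
}
```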
2. FrameProcessor trait + the channel runtime
2.1 The trait
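```rust
// flowcat-core/src/processor/mod.rs
use async_trait::async_trait;

/// The frame envelope that travels a processor's input channel.
pub struct Envelope {
    pub meta: FrameMeta,
    pub frame: Frame,
    pub direction: Direction,
}

/// One-time per-task wiring handed to every processor at startup (mirrors pipecat
/// `FrameProcessorSetup`, frame_processor.py:71): the pipeline clock, the (optional)
/// observer fan-out, and the task's shared cancellation token.
#[derive(Clone)]
pub struct ProcessorSetup {
    pub clock: Clock,               // monotonic ns; `clock.now_ns()`
    pub observer: Option<Observer>, // Arc fan-out (§5)
    pub cancel: tokio_util::sync::CancellationToken,
    pub enable_metrics: bool,
    pub enable_usage_metrics: bool,
}

/// A processor's view of "downstream" / "upstream" — a `Sender` to each neighbour,
/// wired by `Pipeline::link`. Cloned into the processor's run loop.
#[derive(Clone)]
pub struct Link {
    next: Option<EnvelopeSender>, // downstream neighbour input
    prev: Option<EnvelopeSender>, // upstream neighbour input
    name: Arc<str>,               // this processor's name (for observer events)
    clock: Clock,
    observer: Option<Observer>,
}

impl Link {
    /// Push a frame to the adjacent processor in `direction`. Mirrors pipecat
    /// `push_frame` (frame_processor.py:688): fires the observer `on_push_frame`
    /// hook, then enqueues onto the neighbour's input channel. Backpressure:
    /// `await`s if the neighbour's bounded channel is full (§2.2).
    pub async fn push(&self, meta: FrameMeta, frame: Frame, direction: Direction);
    /// Convenience: push a fresh frame downstream with new meta.
    pub async fn push_down(&self, frame: Frame);
    /// Push an `Error` frame upstream (pipecat `push_error`, :630).
    pub async fn push_error(&self, message: impl Into<String>, fatal: bool);
    /// Broadcast a frame both directions with paired sibling ids (pipecat
    /// `broadcast_frame`, :731) — used for `Interruption`.
    pub async fn broadcast(&self, frame: Frame);
}

/// The building block. Each processor runs in **its own tokio task** fed by a
/// bounded normal channel plus an unbounded system channel (§2.2). The framework
/// owns the task loop; an impl only writes `process_frame` (and optional
/// `start`/`stop` hooks).
///
/// Mirrors pipecat `FrameProcessor` (frame_processor.py:175): `process_frame`,
/// prev/next links, system-frame priority, interruption handling — but the per-
/// processor task + queues are a *framework* concern here, not re-implemented per
/// processor as in Python.
#[async_trait]
pub trait FrameProcessor: Send + 'static {
    /// Stable, human-readable name (observer events, error attribution, tracing).
    fn name(&self) -> &str;

    /// Called once when the `Start` frame reaches this processor, before any data
    /// frame. Open sockets / spawn provider readers here. Default: no-op.
    async fn start(&mut self, _setup: &ProcessorSetup, _params: &StartParams) -> Result<()> {
        Ok(())
    }

    /// Handle one frame. Push results via `link`. **Must not block**: long work
    /// (a provider round-trip) is driven by an internally-spawned task that feeds
    /// results back as frames (the Gemini reader-task pattern, gemini_live.rs:265).
    /// The default impl forwards the frame unchanged in its direction — so a
    /// pure observer/no-op processor is `process_frame` = default.
    async fn process_frame(&mut self, env: Envelope, link: &Link) -> Result<()> {
        link.push(env.meta, env.frame, env.direction).await;
        Ok(())
    }

    /// Called on `End`/`Stop`/`Cancel` before the terminal frame is forwarded, so
    /// anything flushed here travels ahead of it. Flush + close. Default: no-op.
    async fn stop(&mut self, _reason: StopReason) -> Result<()> {
        Ok(())
    }

    /// Whether this processor produces metrics (pipecat `can_generate_metrics`,
    /// :395). Services override to `true`.
    fn can_generate_metrics(&self) -> bool {
        false
    }
}
```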
Each processor runs in **its own tokio task** fed by a /// bounded mpsc channel (§2.2). The framework owns the task loop; an impl only /// writes `process_frame` (and optional `start`/`stop` hooks). /// /// Mirrors pipecat `FrameProcessor` (frame_processor.py:175): `process_frame`, /// prev/next links, system-frame priority, interruption handling — but the per- /// processor task + queues are a *framework* concern here, not re-implemented per /// processor as in Python. #[async_trait] pub trait FrameProcessor: Send + 'static { /// Stable, human-readable name (observer events, error attribution, tracing). fn name(&self) -> &str; /// Called once when the `Start` frame reaches this processor, before any data /// frame. Open sockets / spawn provider readers here. Default: no-op. async fn start(&mut self, _setup: &ProcessorSetup, _params: &StartParams) -> Result<()> { Ok(()) } /// Handle one frame. Push results via `link`. **Must not block**: long work /// (a provider round-trip) is driven by an internally-spawned task that feeds /// results back as frames (the Gemini reader-task pattern, gemini_live.rs:265). /// The default impl forwards the frame unchanged in its direction — so a /// pure observer/no-op processor is `process_frame` = default. async fn process_frame(&mut self, env: Envelope, link: &Link) -> Result<()> { link.push(env.meta, env.frame, env.direction).await; Ok(()) } /// Called on `End`/`Stop`/`Cancel` after the terminal frame is forwarded. /// Flush + close. Default: no-op. async fn stop(&mut self, _reason: StopReason) -> Result<()> { Ok(()) } /// Whether this processor produces metrics (pipecat `can_generate_metrics`, /// :395). Services override to `true`. fn can_generate_metrics(&self) -> bool { false } } }
`Result<()>` is `flowcat_core::Result` (error.rs, `FlowcatError`); an `Err` returned from `process_frame` is converted by the task loop into an upstream `Frame::Error { fatal: false }` (pipecat `__process_frame` catch → `push_error`, :979).
2.2 The channel runtime — bounded mpsc, one task per processor
Each linked processor becomes a spawned task:
```rust
// Pseudocode of the framework-owned per-processor loop (processor/runtime.rs).
// Two channels per processor so System frames jump ahead of Data/Control —
// pipecat does this with a PriorityQueue (frame_processor.py:119); a second
// channel is the cheaper, branch-free Rust equivalent.
use tokio::sync::mpsc;

async fn run_processor(
    mut p: Box<dyn FrameProcessor>,
    mut rx: ProcessorRx,
    link: Link,
    setup: ProcessorSetup,
) {
    loop {
        let env = tokio::select! {
            biased; // system channel first
            Some(e) = rx.system.recv() => e, // Start/Cancel/Interruption/…
            Some(e) = rx.normal.recv() => e, // Data + Control
            else => break,
        };

        // observer on_process_frame hook (§5); skipped entirely when no observer
        if let Some(o) = &setup.observer {
            o.on_process(&link.name, &env, setup.clock.now_ns());
        }

        match &env.frame {
            Frame::Start(p0) => {
                p.start(&setup, p0).await.ok();
                link.push(env.meta, env.frame, env.direction).await;
            }
            Frame::Interruption => {
                /* §2.5: drain `normal` of interruptible frames, keep uninterruptible; forward */
            }
            Frame::Cancel { .. } | Frame::End { .. } | Frame::Stop => {
                // End/Cancel end this loop; Stop keeps the processor connected
                // (StopFrame, :1605). Decide before `env.frame` is moved.
                let terminal = !matches!(env.frame, Frame::Stop);
                // Flush first, so flushed output is forwarded ahead of the terminal frame.
                let _ = p.stop(reason(&env.frame)).await;
                link.push(env.meta, env.frame, env.direction).await;
                if terminal {
                    break;
                }
            }
            _ => {
                // An `Err` becomes a non-fatal upstream `Frame::Error` (§2.1).
                if let Err(e) = p.process_frame(env, &link).await {
                    link.push_error(e.to_string(), false).await;
                }
            }
        }
    }
}

struct ProcessorRx {
    system: mpsc::UnboundedReceiver<Envelope>, // System frames: unbounded (§2.2)
    normal: mpsc::Receiver<Envelope>,          // Data + Control: bounded (cap 64)
}

// Holds both system + normal Senders; `Link::push` routes by `frame.class()`
// (System → system channel).
type EnvelopeSender = ProcessorTx;
```
Bounded vs unbounded: the decision. Pipecat uses unbounded asyncio queues. Flowcat uses a bounded `tokio::mpsc` on the Data/Control (normal) channel and an unbounded channel on the System path:

- Bounded normal channel (default capacity 64, the `bench-rs` value). Real-time audio is rate-limited by the wall clock (~50 audio frames/s/leg), so a bounded channel gives natural backpressure: if a slow processor (e.g. a stalling TTS provider) can't keep up, the producer `await`s instead of growing an unbounded queue and ballooning latency and RAM. This is the right behaviour for media: never buffer seconds of audio. Capacity 64 is ~1.3 s of audio headroom (64 frames ÷ 50 frames/s ≈ 1.28 s); a producer blocking on a full channel is the signal to interrupt or drop, not to buffer.
- Unbounded system channel. `Cancel`/`Interruption`/`Error`/`Start` must never block on a full queue (an interruption that can't be delivered defeats barge-in). The system path is low-volume (events, not a stream), so unbounded is safe, and it removes the one place where backpressure would be a correctness bug. (This is the Rust equivalent of pipecat's `HIGH_PRIORITY` jumping the queue, frame_processor.py:128.)
Backpressure on the bounded channel is per-hop and bounded by capacity, so end-to-end
queueing is O(processors × 64) frames worst case — predictable, unlike an unbounded
chain. A producer that hits a full channel during an active interruption is unblocked
immediately because the interruption drains the consumer's queue (§2.5).
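The capacity trade-off can be shown with std channels standing in for tokio's. This is an assumption made so the sketch runs without dependencies. The real runtime would use tokio's `mpsc::channel(64)` and `mpsc::unbounded_channel()`, and would `await` on a full channel rather than fail. `Inbox`, `inbox`, and `try_push_normal` are hypothetical names.

```rust
use std::sync::mpsc::{channel, sync_channel, Receiver, Sender, SyncSender, TrySendError};

/// Default normal-channel capacity from the design (the bench-rs value).
pub const NORMAL_CAPACITY: usize = 64;
/// Nominal audio frame rate per leg (20 ms frames).
pub const FRAMES_PER_SEC: f64 = 50.0;

/// Seconds of audio a full normal channel holds at the nominal frame rate.
pub fn headroom_secs(capacity: usize, frames_per_sec: f64) -> f64 {
    capacity as f64 / frames_per_sec
}

/// One processor's inbox: `sync_channel` is bounded (a full buffer makes `send`
/// block and `try_send` report `Full`); `channel` is unbounded.
pub struct Inbox<T> {
    pub system_tx: Sender<T>,
    pub system_rx: Receiver<T>,
    pub normal_tx: SyncSender<T>,
    pub normal_rx: Receiver<T>,
}

pub fn inbox<T>(normal_capacity: usize) -> Inbox<T> {
    let (system_tx, system_rx) = channel();
    let (normal_tx, normal_rx) = sync_channel(normal_capacity);
    Inbox { system_tx, system_rx, normal_tx, normal_rx }
}

/// Non-blocking push onto the bounded channel; `false` means the consumer is
/// behind (or gone), which is the backpressure signal.
pub fn try_push_normal<T>(tx: &SyncSender<T>, item: T) -> bool {
    match tx.try_send(item) {
        Ok(()) => true,
        Err(TrySendError::Full(_)) | Err(TrySendError::Disconnected(_)) => false,
    }
}
```

A full normal channel refuses the third frame while the system channel keeps accepting events, which is the asymmetry the two bullets above describe.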
Scope of the backpressure guarantee. The inbound audio leg (`InputAudio`/`UserAudio`) is classified `System` (`processor/frame.rs::class`), so it rides the unbounded channel, matching pipecat (input audio is a `SystemFrame`). That is deliberate: caller/transport capture is wall-clock-rate-limited (~50 fps) and must never be blocked. So the "bounded media backpressure / never buffer seconds of audio" property above applies to the output/Data leg (`OutputAudio`/`TtsAudio`, where a stalling TTS/realtime stage is the thing to backpressure), not the input leg. This also qualifies the "low-volume" description of the system path above: it carries the inbound audio stream as well as events. The moat argument is unaffected (input is producer-rate-limited), but don't overclaim bounded backpressure on the input path. A follow-up may add an explicit input-side drop/age policy if a downstream stall is ever observed to grow the inbound queue.
2.3 System-frame priority & interruptibility (parity with pipecat)
- Priority: `Link::push` routes a frame by `frame.class()`: `System` → the consumer's system channel; `Data`/`Control` → the normal channel. The `biased` select drains system first (pipecat `FrameProcessorQueue`, frame_processor.py:119-167).
- Interruptibility: on `Frame::Interruption`, the task loop drains the normal channel, keeping any frame whose `uninterruptible()` is true (`End`/`Stop`/`FunctionCallResult`/`UpdateSettings`), and cancels the in-flight `process_frame` only if the current frame is interruptible, exactly as pipecat's `_start_interruption` does (frame_processor.py:828). The in-flight cancel is a `select!` between `process_frame(...)` and an interruption signal on the system channel.
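Both rules can be modelled with two `VecDeque`s in place of the two channels. This is a simplifying assumption with no async and no in-flight cancellation. `next()` plays the role of the `biased` select, and `drain_on_interruption()` keeps only uninterruptible frames, in their original order. The `Frame` here is a reduced, hypothetical enum.

```rust
use std::collections::VecDeque;

#[derive(Debug, Clone, PartialEq)]
pub enum Frame {
    Interruption,       // System
    Start,              // System
    TtsAudio(u32),      // Data, interruptible
    Text(String),       // Data, interruptible
    FunctionCallResult, // Data, but uninterruptible
    End,                // Control, uninterruptible
}

impl Frame {
    pub fn is_system(&self) -> bool {
        matches!(self, Frame::Interruption | Frame::Start)
    }
    pub fn uninterruptible(&self) -> bool {
        matches!(self, Frame::FunctionCallResult | Frame::End)
    }
}

/// Two queues per processor, as in the design.
#[derive(Default)]
pub struct Inbox {
    system: VecDeque<Frame>,
    normal: VecDeque<Frame>,
}

impl Inbox {
    /// Route by class, like `Link::push`.
    pub fn push(&mut self, f: Frame) {
        if f.is_system() {
            self.system.push_back(f)
        } else {
            self.normal.push_back(f)
        }
    }

    /// Biased selection: always take from `system` before `normal`.
    pub fn next(&mut self) -> Option<Frame> {
        self.system.pop_front().or_else(|| self.normal.pop_front())
    }

    /// On interruption: drop interruptible frames, keep the rest in order.
    pub fn drain_on_interruption(&mut self) {
        self.normal.retain(|f| f.uninterruptible());
    }
}
```

Even though the `Interruption` arrives last, it is handled first. The queued audio and text are discarded, while the function-call result and `End` still reach the processor.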
2.4 The latency argument — the channel model stays inside the moat
The moat is p99 ≤ 0.61 ms round-trip to 2,000 calls on one process
(bench/RESULTS.md, Azure 16-vCPU). Two facts bound the cost of moving from one
select! loop to a channel-per-processor graph:
- `bench-rs` already measured exactly this model. `bench-rs/src/main.rs` builds a 7-stage pipeline of tasks connected by bounded mpsc channels (`CHAN_CAP=64`), the literal analogue of the FrameProcessor graph designed here. The authoritative bench numbers in RESULTS.md come from that channel pipeline, not from the monolithic loop. It reports 0.20 µs/frame and 0.029 µs/processor-hop on one core, and the end-to-end real-I/O sweep (real WS + μ-law) holds p99 ≤ 3 ms to 2,000 calls. So the channel model's per-hop cost is already inside the published moat: adopting it is not a regression from the bench, it is the bench.
- Per-frame hop budget. A live call is ~50–100 frames/s/leg. A cascaded pipeline is ≤ ~12 processors (transport-in, VAD, turn, STT, user-agg, LLM, assistant-agg, TTS, transport-out, + 2-3 filters/observers); S2S is ~5. At 0.029 µs/hop (RESULTS.md), 12 hops = 0.35 µs/frame of pure framework routing: three orders of magnitude below the 0.61 ms p99 and four below the ~10–20 ms audio frame period. As RESULTS.md §"DOES NOT PROVE" states, the dominant per-frame cost is the shared μ-law/resample/WS-syscall work, which is identical in both the loop and the graph model.
Verdict: bounded-channel-per-processor adds ≤ ~0.4 µs/frame of routing, ~10⁻³ of the
p99 budget; the moat is preserved. A later step extends bench-rs to run the real
Pipeline (not the standalone 7-stage mock) for both topologies and asserts p99 stays
≤ the current numbers — the gate that holds this claim honest (§9 step 11).
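The budget arithmetic above can be checked mechanically. This sketch only restates the quoted RESULTS.md figures (0.029 µs/hop, 0.61 ms p99) and the 10 ms lower end of the frame period; the function names are illustrative.

```rust
/// Figures quoted from RESULTS.md in the text above.
pub const HOP_US: f64 = 0.029; // µs per processor hop (bench-rs, one core)
pub const P99_US: f64 = 610.0; // 0.61 ms p99 round-trip
pub const FRAME_PERIOD_US: f64 = 10_000.0; // shortest audio frame period (10 ms)

/// Pure framework routing cost for one frame crossing `hops` processors.
pub fn routing_us(hops: u32) -> f64 {
    hops as f64 * HOP_US
}

/// Orders of magnitude by which `budget_us` exceeds `cost_us`.
pub fn orders_of_magnitude(budget_us: f64, cost_us: f64) -> f64 {
    (budget_us / cost_us).log10()
}
```

For 12 hops this gives 0.348 µs, about 10^3.2 below the p99 and 10^4.5 below a 10 ms frame period. That is a fraction of roughly 6 × 10⁻⁴ of the p99 budget, consistent with the "~10⁻³" verdict.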
One nuance the design bakes in to keep this true: the hot audio frame is
Arc<AudioFrame>, so each hop moves a pointer (the bench moved Bytes, similarly
cheap); only Vec<i16> PCM produced by a codec/resample stage allocates, and that's
shared I/O cost, not framework cost.
2.5 Interruption end-to-end
Barge-in (Frame::Interruption) is produced by the turn/VAD start strategy or any
processor via link.broadcast(Frame::Interruption). It travels the system channel
both directions; each processor drains its normal queue of interruptible frames and
cancels in-flight interruptible work, then forwards. The transport-output processor
additionally clears the carrier's playback buffer (today's transport.send_clear(),
pipeline.rs:370 → a process_frame arm on TransportOutput). This is the literal port
of pipecat broadcast_interruption (frame_processor.py:704).
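The sketch below shows how `link.broadcast` could stamp the two copies it pushes, using the `broadcast_sibling_id` field from `FrameMeta`. The reduced `Meta` struct, the id counter, and `broadcast_pair()` are assumptions. The point is only that each copy points at the other, so observers can pair the downstream and upstream halves of one barge-in.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

static NEXT_ID: AtomicU64 = AtomicU64::new(1);

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Direction {
    Downstream,
    Upstream,
}

/// Reduced frame metadata: just the id and the sibling link.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Meta {
    pub id: u64,
    pub broadcast_sibling_id: Option<u64>,
}

/// Metadata for the two envelopes a broadcast pushes: one per direction,
/// each referencing the other through `broadcast_sibling_id`.
pub fn broadcast_pair() -> [(Meta, Direction); 2] {
    let down = NEXT_ID.fetch_add(1, Ordering::Relaxed);
    let up = NEXT_ID.fetch_add(1, Ordering::Relaxed);
    [
        (Meta { id: down, broadcast_sibling_id: Some(up) }, Direction::Downstream),
        (Meta { id: up, broadcast_sibling_id: Some(down) }, Direction::Upstream),
    ]
}
```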
3. Pipeline + ParallelPipeline
3.1 Pipeline — a linear chain of linked tasks
```rust
// flowcat-core/src/pipeline/mod.rs
pub struct Pipeline {
    processors: Vec<Box<dyn FrameProcessor>>,
}

impl Pipeline {
    pub fn new(processors: Vec<Box<dyn FrameProcessor>>) -> Self {
        Self { processors }
    }
}
```
`Pipeline` is itself a `FrameProcessor`, so pipelines nest (pipecat `Pipeline(BasePipeline)`, pipeline.py:91). It wraps the user processors with an internal `Source` and `Sink` processor (pipecat `PipelineSource`/`PipelineSink`, pipeline.py:21/55). Through the `Source` at the head, the `PipelineTask` injects downstream frames and observes upstream frames; through the `Sink` at the tail, it observes downstream frames. `link()` is the framework wiring step run by `PipelineTask::setup`: it allocates the per-processor channels, builds each `Link` (prev/next senders), and `tokio::spawn`s one `run_processor` task per element. The chain order is `[Source, ...user, Sink]` (pipeline.py:119).
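As a rough, dependency-free picture of that wiring, the sketch below spawns one std thread per stage in place of one tokio task per processor (an assumption) and connects neighbours with channels. The returned sender plays the `Source` role (inject at the head) and the returned receiver plays the `Sink` role (observe at the tail). `Stage` and `link` are hypothetical names, and only the downstream direction is modelled.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

/// Hypothetical stage: a name plus a transform applied to each text frame.
pub type Stage = (&'static str, fn(String) -> String);

/// Wire the stages as a linear chain, one thread per stage, each connected to
/// the next by a channel. Returns (head sender, tail receiver).
pub fn link(stages: Vec<Stage>) -> (Sender<String>, Receiver<String>) {
    let (head_tx, mut rx) = channel::<String>();
    for (name, f) in stages {
        let (tx, next_rx) = channel::<String>();
        let input = rx;
        thread::Builder::new()
            .name(name.to_string())
            .spawn(move || {
                // Forward until the upstream sender is dropped (a stand-in for `End`).
                for frame in input {
                    if tx.send(f(frame)).is_err() {
                        break;
                    }
                }
            })
            .expect("failed to spawn stage thread");
        rx = next_rx;
    }
    (head_tx, rx)
}
```

Dropping the head sender shuts the chain down stage by stage, loosely mirroring how an `End` drains from `Source` to `Sink`.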
3.2 ParallelPipeline — fan-out / fan-in with lifecycle sync
```rust
pub struct ParallelPipeline {
    branches: Vec<Pipeline>,
}

impl ParallelPipeline {
    pub fn new(branches: Vec<Pipeline>) -> Self {
        /* ... */
    }
}
```
Also a `FrameProcessor`. Mirrors pipecat `ParallelPipeline` (parallel_pipeline.py:24):
- A frame entering the parallel block is queued into every branch's source.
- Each branch has its own `Source`/`Sink`; the Sink's downstream output funnels to the parallel block's single downstream, de-duplicated by `meta.id` (pipecat `_parallel_push_frame` + `_seen_ids`, parallel_pipeline.py:168), so a frame fanned to N branches is emitted once.
- Lifecycle frames (`Start`/`End`/`Cancel`) are synchronized: the block holds a per-frame counter equal to the branch count, buffers non-lifecycle output while synchronizing, and only forwards the lifecycle frame (and flushes the buffer) once all branches have passed it (parallel_pipeline.py:158/182). This prevents a fast branch's `End` from shutting the transport down while a slow branch still has audio to flush, a correctness invariant we port verbatim.
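The fan-in rules (dedup by id, count lifecycle frames down to zero, buffer everything else meanwhile) fit in a small state machine. `FanIn` is a hypothetical helper rather than the real type, and the choice to forward the lifecycle frame before the flushed buffer is this sketch's reading of the bullet above.

```rust
use std::collections::{HashMap, HashSet};

#[derive(Debug, Clone, PartialEq)]
pub enum Frame {
    Start,
    End,
    Cancel,
    Audio(u32),
}

impl Frame {
    fn is_lifecycle(&self) -> bool {
        matches!(self, Frame::Start | Frame::End | Frame::Cancel)
    }
}

/// Fan-in side of a parallel block.
pub struct FanIn {
    branches: usize,
    seen_ids: HashSet<u64>,
    pending: HashMap<u64, usize>, // lifecycle frame id -> branches still to report
    buffer: Vec<(u64, Frame)>,    // output held while a lifecycle frame syncs
}

impl FanIn {
    pub fn new(branches: usize) -> Self {
        Self { branches, seen_ids: HashSet::new(), pending: HashMap::new(), buffer: Vec::new() }
    }

    /// Called when frame `id` leaves one branch's Sink; returns what the block
    /// forwards downstream right now, in order.
    pub fn on_branch_output(&mut self, id: u64, frame: Frame) -> Vec<Frame> {
        if frame.is_lifecycle() {
            let left = self.pending.entry(id).or_insert(self.branches);
            *left -= 1;
            if *left > 0 {
                return Vec::new(); // still waiting for slower branches
            }
            self.pending.remove(&id);
            let mut out = vec![frame];
            for (bid, f) in std::mem::take(&mut self.buffer) {
                out.extend(self.emit_once(bid, f));
            }
            return out;
        }
        if !self.pending.is_empty() {
            self.buffer.push((id, frame));
            return Vec::new();
        }
        self.emit_once(id, frame)
    }

    /// Forward a frame only the first time its id is seen.
    fn emit_once(&mut self, id: u64, frame: Frame) -> Vec<Frame> {
        if self.seen_ids.insert(id) { vec![frame] } else { Vec::new() }
    }
}
```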
ParallelPipeline is needed for the cascaded path's service-switcher / parallel STT and for tee'd observers; it is not on the v1 Gemini S2S critical path, so it lands with unit tests but is first exercised by the cascaded path.
4. PipelineTask + PipelineRunner
4.1 PipelineTask — one running pipeline's lifecycle
Mirrors pipecat PipelineTask (task.py:142). Owns: the wrapped pipeline (Source +
user + Sink), the push queue, the clock, the observer fan-out, idle detection,
heartbeat/watchdog, and the start/end/finished signalling.
```rust
// flowcat-core/src/pipeline/task.rs
pub struct PipelineTaskParams {
    pub audio_in_sample_rate: u32,           // default 16000
    pub audio_out_sample_rate: u32,          // default 24000 (S2S) / per-TTS (cascaded)
    pub enable_metrics: bool,
    pub enable_usage_metrics: bool,
    pub enable_tracing: bool,
    pub enable_heartbeats: bool,
    pub heartbeat_period: Duration,          // default 1s (task.py:59)
    pub heartbeat_monitor: Duration,         // default 10s (task.py:60)
    pub idle_timeout: Option<Duration>,      // default 300s (task.py:62)
    pub cancel_on_idle: bool,                // default true
    pub idle_timeout_frames: Vec<FrameKind>, // default [BotSpeaking, UserSpeaking]
}

pub struct PipelineTask { /* pipeline, clock, observers, channels, flags */ }

impl PipelineTask {
    pub fn new(pipeline: Pipeline, params: PipelineTaskParams, observers: Vec<Observer>) -> Self;
    /// Queue a downstream frame into the head of the pipeline.
    pub async fn queue_frame(&self, frame: Frame);
    pub async fn queue_frames(&self, frames: impl IntoIterator<Item = Frame>);
    /// Graceful: queue an `End` so the pipeline drains then shuts down (task.py:568).
    pub async fn stop_when_done(&self);
    /// Immediate: queue a `Cancel` (task.py:577).
    pub async fn cancel(&self, reason: Option<String>);
    pub fn has_finished(&self) -> bool;
    /// Run to completion: setup() spawns all processor tasks, inject `Start`, wait
    /// for it to reach the Sink (pipeline ready), pump queued frames, and exit when
    /// a terminal frame (`End`/`Stop`/`Cancel`) reaches the Sink. (task.py:586/818.)
    pub async fn run(self) -> Result<()>;

    /// Event hooks (pipecat task.py event handlers), each a registered async closure:
    pub fn on_started(&mut self, f: impl Fn() + ...);
    pub fn on_finished(&mut self, f: impl Fn(StopReason) + ...);
    pub fn on_error(&mut self, f: impl Fn(&str, bool) + ...);
    pub fn on_idle_timeout(&mut self, f: impl Fn() + ...);
    pub fn on_frame_reached_downstream(&mut self, types: &[FrameKind], f: ...);
    pub fn on_frame_reached_upstream(&mut self, types: &[FrameKind], f: ...);
}
```
Lifecycle (task.py:818 `_process_push_queue` + :898 `_sink_push_frame`):
- `setup` builds channels, `spawn`s every processor task, and starts the clock + idle task.
- Inject `Frame::Start(params)` at the head; block until it reaches the Sink (every processor has run `start()`), then signal ready.
- Pump `queue_frame`'d frames into the head.
- The Source processor watches upstream frames: an upstream `End`/`Stop`/`Cancel` request (a processor wanting to end the call, today's `BrainAction::End`) is converted to the corresponding downstream lifecycle frame (task.py:859 `_source_push_frame`). Because `Error` frames travel upstream, this is also where `Error{fatal}` is seen and triggers `Cancel`.
- The Sink watches downstream frames: when `End`/`Stop`/`Cancel` reaches it, the task signals "ended" and exits the run loop; `Heartbeat` is timestamped for the monitor.
Idle detection (task.py:970): a watcher resets on each idle_timeout_frames frame
seen by the observer; on timeout it fires on_idle_timeout and, if cancel_on_idle,
cancels. Heartbeat/watchdog (task.py:941/950): a task pushes Heartbeat every
heartbeat_period; the monitor warns if none returns within heartbeat_monitor — and
(Rust addition) doubles as a per-task watchdog since a wedged processor would block
the heartbeat from traversing. Graceful shutdown: End flushes; Cancel does not
(bounded by cancel_timeout, after which the task force-aborts the processor tasks via
the shared CancellationToken).
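Idle detection can be sketched as a clock-injected watcher so the timing logic is testable. `IdleWatcher` is a hypothetical name, and this reduced `FrameKind` holds only the kinds the sketch needs. The 300 s timeout and the `[BotSpeaking, UserSpeaking]` reset set are the defaults listed in `PipelineTaskParams`. Frames outside that set, such as `Heartbeat`, do not reset the timer.

```rust
use std::time::{Duration, Instant};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FrameKind {
    BotSpeaking,
    UserSpeaking,
    Heartbeat,
    OutputAudio,
}

/// Reset by any frame kind in `idle_timeout_frames`; reports idle once
/// `idle_timeout` has passed without one. Time is passed in explicitly.
pub struct IdleWatcher {
    idle_timeout: Duration,
    idle_timeout_frames: Vec<FrameKind>,
    last_activity: Instant,
}

impl IdleWatcher {
    pub fn new(idle_timeout: Duration, idle_timeout_frames: Vec<FrameKind>, now: Instant) -> Self {
        Self { idle_timeout, idle_timeout_frames, last_activity: now }
    }

    /// Called from the observer for every frame it sees.
    pub fn on_frame(&mut self, kind: FrameKind, now: Instant) {
        if self.idle_timeout_frames.contains(&kind) {
            self.last_activity = now;
        }
    }

    /// True once the pipeline has been idle for at least `idle_timeout`.
    pub fn timed_out(&self, now: Instant) -> bool {
        now.saturating_duration_since(self.last_activity) >= self.idle_timeout
    }
}
```

When `timed_out` turns true, the task would fire `on_idle_timeout` and, with `cancel_on_idle`, queue a `Cancel`.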
4.2 PipelineRunner — supervise tasks + signals
Mirrors pipecat PipelineRunner (runner.py:25). Runs one or many PipelineTasks, installs
SIGINT/SIGTERM handlers that cancel() all tasks for graceful drain (a deploy-cutover
drain story relies on SIGTERM grace), and joins them.
```rust
pub struct PipelineRunner { /* tasks, signal guard */ }

impl PipelineRunner {
    pub fn new(handle_sigint: bool, handle_sigterm: bool) -> Self;
    pub async fn run(&self, task: PipelineTask) -> Result<()>;
    pub async fn cancel_all(&self);
}
```
In the embedder, each inbound/outbound call constructs one PipelineTask and hands it to a
process-wide PipelineRunner — replacing today's "spawn Call::run per call".
5. Observer trait + metrics frames
5.1 Observer
Non-intrusive monitoring, mirroring pipecat BaseObserver (base_observer.py:70). An
Observer sees every processed/pushed frame without sitting in the chain — this is the
seam the observability layer (OpenTelemetry/Sentry/Langfuse/RTVI) plugs into.
```rust
// flowcat-core/src/observer.rs
pub struct FrameEvent<'a> {
    pub processor: &'a str,
    pub frame: &'a Frame,
    pub meta: &'a FrameMeta,
    pub direction: Direction,
    pub timestamp_ns: i64, // pipeline clock
}

pub struct FramePushEvent<'a> {
    pub source: &'a str,
    pub destination: &'a str,
    pub frame: &'a Frame,
    pub meta: &'a FrameMeta,
    pub direction: Direction,
    pub timestamp_ns: i64,
}

#[async_trait]
pub trait FrameObserver: Send + Sync {
    /// A processor is about to handle a frame (base_observer.py:79).
    async fn on_process(&self, _e: &FrameEvent<'_>) {}
    /// A frame was pushed source→destination (base_observer.py:91).
    async fn on_push(&self, _e: &FramePushEvent<'_>) {}
    /// The pipeline finished starting (base_observer.py:103).
    async fn on_pipeline_started(&self) {}
}

/// Cheap clonable fan-out over many observers (pipecat `TaskObserver` proxy,
/// task.py:401). Hooks are invoked **synchronously on the hot path only when
/// enabled** — when no observer is registered the loop skips the call entirely
/// (the `if let Some(o)` in run_processor), so observation is zero-cost-when-off.
#[derive(Clone, Default)]
pub struct Observer(Arc<[Arc<dyn FrameObserver>]>);
```
Built-in observers shipped here (ported from pipecat): TurnTrackingObserver
(user/bot speaking edges → turn boundaries), UserBotLatencyObserver (TTFB of the bot's
first audio after the user stops), and the IdleFrameObserver that drives idle detection
(task.py:70). The observability layer adds the exporters.
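A synchronous reduction of the fan-out shows the "zero-cost-when-off" shape. The hooks here are plain methods rather than the design's async ones, and `observe` and `Counter` are hypothetical. With `None`, the call site does nothing beyond one branch. With `Some`, every registered observer is called in turn.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Synchronous stand-in for `FrameObserver`.
pub trait FrameObserver: Send + Sync {
    fn on_process(&self, processor: &str, frame_name: &str);
}

/// Cheap clonable fan-out, same shape as the design's `Observer`.
#[derive(Clone)]
pub struct Observer(Arc<[Arc<dyn FrameObserver>]>);

impl Observer {
    pub fn new(observers: Vec<Arc<dyn FrameObserver>>) -> Self {
        Observer(observers.into())
    }

    pub fn on_process(&self, processor: &str, frame_name: &str) {
        for o in self.0.iter() {
            o.on_process(processor, frame_name);
        }
    }
}

/// Hot-path call site: with `None`, no observer code runs at all.
pub fn observe(observer: &Option<Observer>, processor: &str, frame_name: &str) {
    if let Some(o) = observer {
        o.on_process(processor, frame_name);
    }
}

/// Example observer that counts the frames it sees.
#[derive(Default)]
pub struct Counter(pub AtomicUsize);

impl FrameObserver for Counter {
    fn on_process(&self, _processor: &str, _frame_name: &str) {
        self.0.fetch_add(1, Ordering::Relaxed);
    }
}
```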
5.2 Metrics frames
Frame::Metrics(Vec<MetricsData>) carries the same data as pipecat MetricsFrame
(frames.py:1108) + metrics/metrics.py:
```rust
// flowcat-core/src/processor/metrics.rs (mirrors metrics.py)
// Debug + Clone are required because `Frame::Metrics(Vec<MetricsData>)` lives in
// an enum that derives both.
#[derive(Debug, Clone)]
pub enum MetricsData {
    Ttfb { processor: String, model: Option<String>, seconds: f64 },       // metrics.py:29
    Processing { processor: String, model: Option<String>, seconds: f64 }, // metrics.py:39
    LlmUsage { processor: String, model: Option<String>, tokens: LlmTokenUsage }, // :68
    TtsUsage { processor: String, characters: u64 },                        // :78
    TurnPrediction {
        processor: String,
        is_complete: bool,
        probability: f32,
        e2e_processing_ms: f64,
    }, // :101
}

#[derive(Debug, Clone)]
pub struct LlmTokenUsage {
    // metrics.py:49
    pub prompt_tokens: u64,
    pub completion_tokens: u64,
    pub total_tokens: u64,
    pub cache_read_input_tokens: Option<u64>,
    pub cache_creation_input_tokens: Option<u64>,
    pub reasoning_tokens: Option<u64>,
}
```
A FrameProcessor produces metrics via helper methods on Link that gate on
setup.enable_metrics and emit a Frame::Metrics downstream: start_ttfb/stop_ttfb,
start_processing/stop_processing, report_llm_usage, report_tts_usage — the literal
port of frame_processor.py:411-489. report_only_initial_ttfb (StartParams) is honored.
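The TTFB pair and the `report_only_initial_ttfb` flag can be sketched as a small meter with the time injected. `TtfbMeter` is a hypothetical stand-in for the `Link` helpers, and this reduced `MetricsData::Ttfb` omits `model`. A disabled meter never starts a measurement. With `report_only_initial_ttfb`, only the first start/stop pair is reported.

```rust
use std::time::{Duration, Instant};

#[derive(Debug, Clone, PartialEq)]
pub enum MetricsData {
    Ttfb { processor: String, seconds: f64 },
}

pub struct TtfbMeter {
    processor: String,
    enable_metrics: bool,
    report_only_initial_ttfb: bool,
    started: Option<Instant>,
    reported_once: bool,
}

impl TtfbMeter {
    pub fn new(processor: &str, enable_metrics: bool, report_only_initial_ttfb: bool) -> Self {
        Self {
            processor: processor.to_string(),
            enable_metrics,
            report_only_initial_ttfb,
            started: None,
            reported_once: false,
        }
    }

    /// Start timing, unless metrics are off or the one allowed report is spent.
    pub fn start_ttfb(&mut self, now: Instant) {
        if self.enable_metrics && !(self.report_only_initial_ttfb && self.reported_once) {
            self.started = Some(now);
        }
    }

    /// The metric to emit (wrapped in `Frame::Metrics(vec![..])`), if any.
    pub fn stop_ttfb(&mut self, now: Instant) -> Option<MetricsData> {
        let started = self.started.take()?;
        self.reported_once = true;
        let elapsed: Duration = now.saturating_duration_since(started);
        Some(MetricsData::Ttfb { processor: self.processor.clone(), seconds: elapsed.as_secs_f64() })
    }
}
```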
6. Seam → processor mapping
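Today's five trait seams (`MediaTransport`, `RealtimeLlm`, `AgentBrain`, `SessionSource`, `MediaSerializer`) and `Call::run`'s inline logic map onto the processor model as follows. Two of the seams (`SessionSource` and `MediaSerializer`) deliberately stay services rather than becoming processors: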
| Today (seam / inline logic) | becomes | crate | notes |
|---|---|---|---|
| `MediaTransport::recv` (media.rs:49) | `TransportInput` processor, a source: reads the transport, emits `Frame::InputAudio`/`UserStartedSpeaking`/lifecycle downstream | `flowcat-transports` (trait stays in core) | pipecat `BaseInputTransport` |
| `MediaTransport::send_audio`/`send_clear` (media.rs:53/57) | `TransportOutput` processor, a sink: consumes `OutputAudio`/`TtsAudio`, plays to carrier; on `Interruption` clears playback (was pipeline.rs:370) | `flowcat-transports` | pipecat `BaseOutputTransport`; emits `BotStarted`/`StoppedSpeaking` |
| `RealtimeLlm` (realtime/mod.rs:22) | `RealtimeLlmService` processor: consumes `InputAudio`, emits `TtsAudio` (bot)/`Transcription`/`FunctionCallsStarted`/`Interruption`/`Metrics`; the reader-task→mpsc bridge (gemini_live.rs:265) becomes the processor's internal task feeding `link` | `flowcat-services` (`realtime-gemini` feature) | the trait below; Gemini is one impl |
| `AgentBrain` (brain.rs:22) | `BrainProcessor`: consumes `FunctionCallsStarted`/tool-call frames, emits `UpdateSettings` (new prompt+tools) on transition / `End` on terminal; holds the graph state | the embedder (its glue) | pipecat has no peer; this is the embedder's engine adapter |
| `SessionSource` (session.rs:21) | stays embedder glue, NOT a processor: a service the `BrainProcessor` + a `FinalizeProcessor` call; bootstrap/finalize/artifact-upload is control-plane I/O, not a media frame stage | the embedder | see §6.2: it leaves `flowcat-core` for OSS cleanliness |
| `MediaSerializer` (serializer/mod.rs) | stays a pure `FrameSerializer` with no change of shape; `TransportInput`/`Output` for a WS carrier compose a `FrameSerializer` exactly as `WsCarrierTransport` does today | `flowcat-telephony` | pipecat `FrameSerializer` |
| `Call::run` orchestration (pipeline.rs:130) | the `Pipeline` graph + `PipelineTask`: the `select!` loop's arms become each processor's `process_frame`; `LiveState`/finalize become `FinalizeProcessor` + `SessionSource` | core + the embedder | the whole point of this framework |
| `LiveState` recorder/transcript (pipeline.rs:451) | `RecorderProcessor` + `TranscriptProcessor`: observers/sinks that tap audio + text frames | `flowcat-core` (recorder), the embedder (finalize) | pipecat recorder-as-processor |
6.1 New service-processor traits the cascaded path needs (signatures only)
These are frozen here so the provider implementations (the fan-out) build against them. Each is a
FrameProcessor plus a service-specific async contract; the framework's
process_frame arm calls the contract and emits the result frames. Impls are later
work — this framework ships only the trait + a no-op/mock impl for the integration test.
```rust
// flowcat-core/src/service/mod.rs
use async_trait::async_trait;
use futures::stream::BoxStream;
use std::sync::Arc;

/// Streaming speech→text. Consumes `InputAudio`, emits `InterimTranscription` then
/// final `Transcription`. Mirrors pipecat `STTService`. (22 providers.)
#[async_trait]
pub trait SttService: Send {
    fn name(&self) -> &str;
    async fn start(&mut self, params: &StartParams) -> Result<()>;
    /// Feed one audio chunk; returns the transcript frames (interim and/or final)
    /// ready so far, which the processor forwards downstream.
    async fn run_stt(&mut self, audio: Arc<AudioFrame>) -> Result<Vec<Frame>>;
    async fn set_muted(&mut self, muted: bool);
}

/// Streaming text→speech. Consumes `TtsSpeak`/`Text`, emits `TtsStarted`,
/// `TtsAudio`*, `TtsStopped`. Mirrors pipecat `TTSService`. (31 providers.)
#[async_trait]
pub trait TtsService: Send {
    fn name(&self) -> &str;
    fn sample_rate(&self) -> u32;
    async fn start(&mut self, params: &StartParams) -> Result<()>;
    async fn run_tts(&mut self, text: &str) -> Result<Vec<Frame>>;
}

/// Context-driven LLM. Consumes `LlmContext`/`LlmRun`, emits `LlmResponseStart`,
/// `LlmText`*, optional `FunctionCallsStarted`, `LlmResponseEnd`. (26 providers.)
#[async_trait]
pub trait LlmService: Send {
    fn name(&self) -> &str;
    async fn start(&mut self, params: &StartParams) -> Result<()>;
    async fn run_llm(&mut self, ctx: &LlmContext) -> Result<BoxStream<'_, Frame>>;
    fn set_tools(&mut self, tools: Vec<ToolDecl>);
}

/// The realtime S2S contract (today's RealtimeLlm, realtime/mod.rs:22 — UNCHANGED
/// shape, restated here as the canonical service trait the processor wraps).
#[async_trait]
pub trait RealtimeLlmService: Send {
    async fn connect(&mut self, setup: RealtimeSetup) -> Result<()>;
    async fn send_audio(&mut self, chunk: Arc<AudioFrame>) -> Result<()>;
    async fn update_system(&mut self, prompt: String, tools: Vec<ToolDecl>) -> Result<()>;
    async fn send_tool_result(&mut self, id: String, result: serde_json::Value) -> Result<()>;
    async fn next_event(&mut self) -> Option<RealtimeEvent>;
}

// ---- audio-intelligence traits — signatures only ----

/// Voice-activity detector. Mirrors pipecat `VADAnalyzer`. Silero (ONNX/`ort`)
/// is the reference impl.
pub trait VadAnalyzer: Send {
    fn sample_rate(&self) -> u32;
    /// Classify a frame of audio: Quiet / Starting / Speaking / Stopping.
    fn analyze(&mut self, audio: &AudioFrame) -> VadState;
    fn set_params(&mut self, params: VadParams);
}

/// End-of-turn / semantic-completion analyzer. Mirrors pipecat `BaseTurnAnalyzer`
/// (Smart-Turn v2/v3 is the reference impl).
pub trait TurnAnalyzer: Send {
    /// Given accumulated speech + VAD edges, predict whether the turn is complete.
    fn analyze_turn(&mut self, audio: &AudioFrame, vad: VadState) -> TurnPrediction;
    fn set_params(&mut self, params: TurnParams);
}
```
RealtimeSetup, RealtimeEvent, ToolDecl, BrainAction are unchanged from today's
frame.rs (renamed types.rs, §8.4) — the existing Gemini client and an embedder AgentBrain impl
satisfy them with thin wrapping into the processor shape.
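To make the VadAnalyzer contract concrete (and to show the kind of mock impl the checklist asks for), here is a minimal sketch. It swaps Silero's ONNX model for a plain RMS-energy threshold and uses simplified, hypothetical stand-ins for AudioFrame, VadState and VadParams; the field names and thresholds are assumptions, not flowcat's real types. The point is the Quiet → Starting → Speaking → Stopping state machine that analyze() reports.

```rust
/// Hypothetical stand-in for flowcat's AudioFrame: mono 16-bit PCM.
pub struct AudioFrame {
    pub samples: Vec<i16>,
    pub sample_rate: u32,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VadState {
    Quiet,
    Starting,
    Speaking,
    Stopping,
}

/// Hypothetical params: an RMS threshold plus how many consecutive frames
/// confirm a start or a stop.
#[derive(Debug, Clone, Copy)]
pub struct VadParams {
    pub rms_threshold: f64,
    pub start_frames: u32,
    pub stop_frames: u32,
}

pub trait VadAnalyzer: Send {
    fn sample_rate(&self) -> u32;
    fn analyze(&mut self, audio: &AudioFrame) -> VadState;
    fn set_params(&mut self, params: VadParams);
}

/// Energy-threshold mock (NOT Silero): counts consecutive loud/quiet frames.
pub struct EnergyVad {
    rate: u32,
    params: VadParams,
    state: VadState,
    run: u32, // consecutive frames pushing toward a state change
}

impl EnergyVad {
    pub fn new(rate: u32, params: VadParams) -> Self {
        Self { rate, params, state: VadState::Quiet, run: 0 }
    }
}

fn rms(samples: &[i16]) -> f64 {
    if samples.is_empty() {
        return 0.0;
    }
    let sum: f64 = samples.iter().map(|&s| (s as f64) * (s as f64)).sum();
    (sum / samples.len() as f64).sqrt()
}

impl VadAnalyzer for EnergyVad {
    fn sample_rate(&self) -> u32 {
        self.rate
    }

    fn analyze(&mut self, audio: &AudioFrame) -> VadState {
        let loud = rms(&audio.samples) >= self.params.rms_threshold;
        self.state = match (self.state, loud) {
            (VadState::Quiet, true) | (VadState::Starting, true) => {
                self.run += 1;
                if self.run >= self.params.start_frames {
                    self.run = 0;
                    VadState::Speaking
                } else {
                    VadState::Starting
                }
            }
            (VadState::Starting, false) => {
                self.run = 0; // false start
                VadState::Quiet
            }
            (VadState::Speaking, false) | (VadState::Stopping, false) => {
                self.run += 1;
                if self.run >= self.params.stop_frames {
                    self.run = 0;
                    VadState::Quiet
                } else {
                    VadState::Stopping
                }
            }
            (VadState::Stopping, true) => {
                self.run = 0; // speech resumed
                VadState::Speaking
            }
            (s, _) => s, // Quiet + quiet, Speaking + loud: unchanged
        };
        self.state
    }

    fn set_params(&mut self, params: VadParams) {
        self.params = params;
    }
}
```

Requiring several consecutive frames before a transition is a simple hysteresis: one loud or quiet frame cannot flip the reported state on its own.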
6.2 Where SessionSource lives — and why it leaves flowcat-core
SessionSource (session.rs) is embedder-specific control-plane I/O (resolve a run+token,
upload artifacts, finalize over HTTP, node-tools/tool-call relay). It is not
a media-frame stage and it is the one seam that is inherently about the embedder's contract.
For OSS cleanliness it lives in the embedder as a plain service the
BrainProcessor and a FinalizeProcessor call — flowcat-core keeps only the
media-pipeline framework. The node_tools/tool_call relay (session.rs:53/66) becomes a
ToolRelay dependency injected into BrainProcessor. This is a clean cut: flowcat-core
has zero embedder knowledge (honoring lib.rs and the DESIGN.md OSS boundary), and the
embedder's glue owns bootstrap/finalize.
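A minimal sketch of that cut, assuming a synchronous, simplified shape (the real seam is async): the processor depends only on a ToolRelay trait, and the embedder supplies the implementation when it constructs BrainProcessor. The relay method, ToolCall, BrainOutcome and StaticRelay are hypothetical names for illustration.

```rust
use std::collections::HashMap;

/// Hypothetical tool-call shape: (node_id, tool_name, args as a JSON string).
#[derive(Debug, Clone, PartialEq)]
pub struct ToolCall {
    pub node_id: String,
    pub tool_name: String,
    pub args: String,
}

/// The seam: the processor sees this trait, never the embedder's HTTP client.
pub trait ToolRelay: Send {
    /// Some(result) if the node exposes this tool, None otherwise.
    fn relay(&mut self, call: &ToolCall) -> Option<String>;
}

#[derive(Debug, PartialEq)]
pub enum BrainOutcome {
    ToolResult(String), // answered straight back, no transition
    Transition(String), // handled by the workflow instead
}

/// Hypothetical processor holding an injected relay.
pub struct BrainProcessor<R: ToolRelay> {
    relay: R,
}

impl<R: ToolRelay> BrainProcessor<R> {
    pub fn new(relay: R) -> Self {
        Self { relay }
    }

    pub fn on_tool_call(&mut self, call: &ToolCall) -> BrainOutcome {
        match self.relay.relay(call) {
            Some(result) => BrainOutcome::ToolResult(result),
            None => BrainOutcome::Transition(call.tool_name.clone()),
        }
    }
}

/// Embedder-side stand-in: a fixed table instead of the control-plane relay.
pub struct StaticRelay {
    pub tools: HashMap<String, String>,
    pub seen: Vec<ToolCall>,
}

impl ToolRelay for StaticRelay {
    fn relay(&mut self, call: &ToolCall) -> Option<String> {
        self.seen.push(call.clone());
        self.tools.get(&call.tool_name).cloned()
    }
}
```

Because BrainProcessor is generic over the trait, flowcat-core compiles and tests it against a table like StaticRelay, while the embedder plugs in its control-plane client without core knowing it exists.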
7. Migration / no-regression strategy
The prod Gemini-Live S2S path must not regress (live-verified runs per the
voice-live memory notes). Strategy: build the processor pipeline beside Call::run,
prove equivalence, then cut over.
7.1 The S2S processor pipeline (what the migration assembles)
```text
TransportInput → RealtimeLlmService(Gemini) → BrainProcessor → TransportOutput
                                                    ▲                 │
                                       (tool calls) │                 ├─ RecorderProcessor (taps both legs)
                                          ToolRelay (embedder)        ├─ TranscriptProcessor
                                                                      └─ FinalizeProcessor (on End → SessionSource)
```
Every arm of today's select! (pipeline.rs:195-380) maps to a process_frame:
- carrier audio in (pipeline.rs:221) → TransportInput emits InputAudio → Gemini service send_audio.
- RealtimeEvent::AudioOut (pipeline.rs:249) → Gemini service emits TtsAudio → TransportOutput plays.
- RealtimeEvent::ToolCall MCP branch (pipeline.rs:276) → BrainProcessor recognizes the node's workflow tools (via ToolRelay) and emits a FunctionCallResult straight back to the Gemini service (no transition).
- ToolCall transition/end (pipeline.rs:307) → BrainProcessor emits UpdateSettings (new prompt+tools → Gemini update_system) or End.
- RealtimeEvent::Interrupted (pipeline.rs:367) → Frame::Interruption broadcast → TransportOutput clears.
- Usage (pipeline.rs:375) → Frame::Metrics → RecorderProcessor/finalize accumulate.
- loop break → terminal End → FinalizeProcessor runs the LiveState/finalize artifact-upload + SessionSource.complete logic (pipeline.rs:538).
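The arm-by-arm mapping can be pictured as one match from the realtime client's events onto pipeline frames. This is a sketch with simplified, hypothetical payloads: the real RealtimeEvent and Frame carry more data, and FunctionCall here is only a stand-in for the tool-call frame that BrainProcessor consumes.

```rust
/// Simplified stand-ins for the realtime client's events (payloads hypothetical).
#[derive(Debug, Clone, PartialEq)]
pub enum RealtimeEvent {
    AudioOut(Vec<i16>),
    ToolCall { id: String, name: String },
    Interrupted,
    Usage { total_tokens: u64 },
}

/// Simplified stand-ins for the frames named in the mapping above.
#[derive(Debug, Clone, PartialEq)]
pub enum Frame {
    TtsAudio(Vec<i16>),
    FunctionCall { id: String, name: String }, // handed to BrainProcessor
    Interruption,
    Metrics { total_tokens: u64 },
    End,
}

/// One select! arm becomes one match arm; `None` (the event stream ending)
/// becomes the terminal End that FinalizeProcessor reacts to.
pub fn to_frame(event: Option<RealtimeEvent>) -> Frame {
    match event {
        Some(RealtimeEvent::AudioOut(pcm)) => Frame::TtsAudio(pcm),
        Some(RealtimeEvent::ToolCall { id, name }) => Frame::FunctionCall { id, name },
        Some(RealtimeEvent::Interrupted) => Frame::Interruption,
        Some(RealtimeEvent::Usage { total_tokens }) => Frame::Metrics { total_tokens },
        None => Frame::End,
    }
}
```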
7.2 The equivalence test (the gate that authorizes cutover)
This migration is not done until this passes. Reuse the exact mocks already in
pipeline.rs:634 (MockSocket scripted Plivo frames, MockRealtime scripted event script,
MockBrain, MockSession capturing the finalize payload). Build two harnesses driven by
the same scripted inputs:
- Call::run (today) → capture: outbound WS frames, send_audio count, tool-result statuses+ids, the Finalize payload (recording/transcript keys, collected_vars incl. folded disposition, usage totals), and the relayed tool_calls.
- The processor PipelineTask (new) → capture the same set off the same mocks.
Assert byte-for-byte equality of the captured outputs (the same 6 assertion blocks
that call_run_bridges_audio_both_ways_and_finalizes and
mcp_tool_call_is_relayed_not_treated_as_transition already check, run against both
harnesses). Specifically equal: the count + order of playAudio frames sent to the
carrier; the (id, status) tool-result sequence; fin.recording_url/transcript_url
(stored keys, not presigned URLs); fin.collected_vars incl. disposition;
fin.usage.total_tokens; the relayed (node_id, tool_name, args) and verbatim MCP result.
Plus: a timing assertion that the processor pipeline's per-frame routing p99 ≤ the
bench-rs channel-pipeline number (§2.4) so cutover can't regress the moat.
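The shape of that gate, reduced to its essentials: both harnesses return the same capture struct, the test compares them with one equality, and a nearest-rank p99 over per-frame routing latencies is checked against the budget. Everything here (Captured and its fields, assert_equivalent, p99_nanos) is a hypothetical sketch; the real test drives MockSocket/MockRealtime/MockBrain/MockSession.

```rust
/// Hypothetical capture of what the equivalence test compares.
#[derive(Debug, Clone, PartialEq, Default)]
pub struct Captured {
    pub play_audio_frames: Vec<Vec<u8>>,     // carrier frames, in order
    pub tool_results: Vec<(String, String)>, // (id, status)
    pub recording_key: Option<String>,       // stored key, not a presigned URL
    pub total_tokens: u64,
}

/// Run both implementations on the same scripted input and demand equality.
pub fn assert_equivalent<I, A, B>(script: &I, old: A, new: B) -> Result<(), String>
where
    A: Fn(&I) -> Captured,
    B: Fn(&I) -> Captured,
{
    let (before, after) = (old(script), new(script));
    if before == after {
        Ok(())
    } else {
        Err(format!("diverged:\n old={before:?}\n new={after:?}"))
    }
}

/// Nearest-rank p99 of latency samples in nanoseconds; None for an empty set.
pub fn p99_nanos(samples: &[u64]) -> Option<u64> {
    if samples.is_empty() {
        return None;
    }
    let mut sorted = samples.to_vec();
    sorted.sort_unstable();
    // nearest rank = ceil(0.99 * n), 1-based
    let rank = (99 * sorted.len() + 99) / 100;
    Some(sorted[rank - 1])
}
```

Comparing one derived-PartialEq struct keeps the failure message useful: a mismatch prints both captures side by side instead of stopping at the first diverging assertion.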
Only when this differential test is green (CI, no network) does a later step rewire the embedder
to construct the PipelineTask instead of Call::run, and Call::run is deleted
(cleanup mandate — no parallel implementations). A live one-call smoke on a carrier dev
number (never prod) confirms the live path post-cutover.
8. Crate split & feature matrix
8.1 Crates (the target crate layout)
| Crate | Contents | Deps it may pull | License |
|---|---|---|---|
| flowcat-core | framework (frame, processor, pipeline, task, runner, observer, metrics) + audio (codec/resample/recorder) + native SIP UA + all trait seams (Transport/Stt/Tts/Llm/RealtimeLlm/Vad/Turn/FrameSerializer/Brain) | tokio, tokio-util, async-trait, serde(_json), bytes, thiserror, tracing, rubato, audio-codec-algorithms, hound, rsipstack, rand | Apache-2.0 |
| flowcat-services | every STT/TTS/LLM/realtime provider, one cargo feature each | per-feature: reqwest/tonic/tokio-tungstenite/ort/whisper-rs | Apache-2.0 |
| flowcat-transports | str0m WebRTC + Opus, WebSocket, Daily, LiveKit, local, avatars | str0m, opus/audiopus, per-feature SDKs | Apache-2.0 |
| flowcat-telephony | carrier FrameSerializers (Twilio/Telnyx/Plivo/…) + DTMF (RFC2833 + Goertzel) | base64, serde_json | Apache-2.0 |
| flowcat-cli | demos/examples (parity with pipecat examples/) | the above | Apache-2.0 |
| the embedder | the host's glue: BrainProcessor (its engine adapter), SessionSource, ToolRelay, FinalizeProcessor, routing | flowcat-* + the host's engine | the host's license |
flowcat-core must not depend on reqwest/tonic/ort/str0m — those live in the
sibling crates so core stays compile-fast and dependency-light (constraint 3). The Gemini
Live client moves from flowcat-core/src/realtime/ into flowcat-services behind the
realtime-gemini feature (the trait RealtimeLlmService stays in core).
8.2 Feature-flag matrix (pipecat "extras" parity)
- flowcat-services: stt-deepgram, stt-whisper-local, tts-cartesia, tts-elevenlabs, llm-openai, llm-anthropic, realtime-gemini, realtime-openai, … — one feature per provider, each pulling only its client dep. Umbrella features stt-all / tts-all / llm-all for the CLI/tests.
- flowcat-core: sip (native SIP UA; embedders that need telephony enable it), recorder, vad-ort (the ONNX runtime is heavy → opt-in even though the trait is always present).
- flowcat-transports: webrtc-str0m, ws, daily, livekit, local.
- flowcat-telephony: twilio, telnyx, plivo, dtmf-inband.
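On the Rust side, a per-provider feature typically gates both the module and any registry entry with #[cfg(feature = ...)], so a build without the feature never compiles that client (and Cargo never pulls its optional dependency). A minimal sketch: the feature names come from the matrix above, while the module contents and the enabled_providers helper are hypothetical.

```rust
#[cfg(feature = "stt-deepgram")]
mod deepgram {
    pub const NAME: &str = "deepgram";
}

#[cfg(feature = "tts-cartesia")]
mod cartesia {
    pub const NAME: &str = "cartesia";
}

/// Lists the providers compiled into this build (hypothetical helper).
pub fn enabled_providers() -> Vec<&'static str> {
    #[allow(unused_mut)] // stays unmutated when no provider feature is on
    let mut names = Vec::new();
    #[cfg(feature = "stt-deepgram")]
    names.push(deepgram::NAME);
    #[cfg(feature = "tts-cartesia")]
    names.push(cartesia::NAME);
    names
}
```

Built with no features, the function compiles to an empty list; enabling stt-deepgram adds exactly one module and one entry, which is what keeps each provider's footprint opt-in.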
8.3 Dependency choices (locked here so the fan-out doesn't relitigate)
| Need | Crate | Why |
|---|---|---|
| ONNX (Silero VAD, Smart-Turn) | ort (ONNX Runtime) | the mature Rust ONNX binding; pipecat ships ONNX models; behind vad-ort |
| WebRTC | str0m (sans-I/O) | the chosen WebRTC stack; sans-I/O fits the tokio task model |
| Opus | audiopus (libopus binding) | WebRTC codec; opus pure-Rust is immature |
| gRPC (Google STT/TTS) | tonic | the tokio-native gRPC stack |
| HTTP/WS providers | reqwest + tokio-tungstenite | already in the tree (tungstenite is the Gemini socket) |
| local Whisper | whisper-rs | the mature whisper.cpp binding (toolchain note in §5) |
8.4 Module-rename housekeeping (step M0, done before any new code)
Today's flowcat-core/src/frame.rs holds data shapes (AudioChunk, RealtimeEvent,
ToolDecl, …), not pipeline frames. To avoid a name collision with the new
processor/frame.rs (the Frame enum), rename the existing frame.rs → types.rs
(purely mechanical: update lib.rs re-exports, keep pub use aliases for one release).
AudioChunk gets a type alias to the new AudioFrame. This is the first checklist step.
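A sketch of what "keep pub use aliases for one release" can look like, assuming inline modules for brevity (in the crate these are separate files) and a hypothetical AudioFrame field layout. Marking the old path #[deprecated] is an optional suggestion, not something the plan mandates.

```rust
pub mod processor {
    pub mod frame {
        /// Stand-in for the new pipeline audio frame (fields are hypothetical).
        #[derive(Debug, Clone, PartialEq)]
        pub struct AudioFrame {
            pub pcm: Vec<i16>,
            pub sample_rate: u32,
        }
    }
}

/// Was `frame.rs`: data shapes, not pipeline frames.
pub mod types {
    /// Old name kept as an alias of the new frame type.
    pub type AudioChunk = super::processor::frame::AudioFrame;
}

/// Old path kept for one release so `frame::AudioChunk` still resolves.
#[deprecated(note = "renamed to `types`")]
pub mod frame {
    pub use super::types::*;
}
```

Existing callers keep compiling (with a deprecation warning pointing at the new path), and deleting the shim module later is a one-line change.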
9. Implementation checklist (execute in order — engineer-ready)
Each step says what to build, where, and the tests that gate it. Steps 1–8 are
flowcat-core (single PR each or a small stack); step 9 is the migration; 10–12 are
cross-cutting. The framework itself does not touch the security-sensitive surfaces
(those come with the later work on transfer/DTMF/WebRTC-signaling/serializer-sigs).
- M0 — rename frame.rs → types.rs (§8.4). No behavior change. Test: workspace builds + existing suite green.
- (1) processor/frame.rs — the Frame enum + FrameMeta + Direction + FrameClass + CustomFrame + AudioFrame (§1). Tests: class()/uninterruptible() table tests for every variant; a Custom frame round-trips through a no-op processor unchanged; a broadcast pairs sibling ids.
- (2) processor/metrics.rs — MetricsData + LlmTokenUsage (§5.2). Tests: serde round-trip; mirrors metrics.py field-for-field.
- (3) processor/mod.rs + processor/runtime.rs — FrameProcessor trait, Link, Envelope, ProcessorSetup, the bounded/unbounded dual-channel run_processor loop, system-frame priority, interruption drain (§2). Tests: a 3-processor hand-wired chain forwards frames in order; a System frame overtakes a backlog of Data frames; an Interruption drops interruptible queued frames but keeps an End; a process_frame Err becomes an upstream Error. Promotes to named variants only what these tests need.
- (4) observer.rs — FrameObserver, Observer fan-out, FrameEvent/FramePushEvent, TurnTrackingObserver, UserBotLatencyObserver, IdleFrameObserver (§5.1). Tests: an observer sees every push; zero-cost-when-none (no observer ⇒ hook not called); turn-tracking emits boundaries off scripted speaking edges.
- (5) pipeline/mod.rs — Pipeline (Source/Sink wrap, link() spawns tasks), nesting (§3.1). Tests: a Pipeline is a FrameProcessor and nests; downstream injected at head reaches the Sink; upstream observed at head.
- (6) pipeline/parallel.rs — ParallelPipeline (fan-out, id-dedup, lifecycle sync) (§3.2). Tests: a frame fans to N branches and emits once; a fast branch's End is held until the slow branch passes it (the sync invariant).
- (7) pipeline/task.rs — PipelineTask (Start→ready handshake, push pump, Source/Sink upstream/downstream handling, idle, heartbeat/watchdog, graceful vs cancel, event hooks) (§4.1). Tests: Start reaches Sink before any data frame; stop_when_done drains then ends; cancel skips flush; idle timeout fires + cancels; an upstream End request from a processor converts to a downstream End.
- (8) pipeline/runner.rs — PipelineRunner (SIGINT/SIGTERM → cancel_all, join) (§4.2). Tests: a simulated signal cancels a running task; multiple tasks join cleanly.
- (9) The migration — the S2S processor pipeline + the equivalence test (§7). Build TransportInput/TransportOutput (core trait + the WS-carrier composition reusing FrameSerializer), wrap the existing Gemini client as RealtimeLlmService behind a processor, and build BrainProcessor + ToolRelay + FinalizeProcessor in the embedder. The gate: the differential test (§7.2) asserting the processor PipelineTask produces byte-identical outputs to Call::run off the shared scripted mocks, plus the p99 timing assertion. Cutover (delete Call::run) only after this is green, in a separate PR.
- (10) Service-processor trait stubs (§6.1): land SttService/TtsService/LlmService/VadAnalyzer/TurnAnalyzer traits + a no-op mock impl of each in flowcat-core/src/service/ and flowcat-core/src/audio/. Test: a mock cascaded pipeline (mock-STT→mock-LLM→mock-TTS) runs a turn end-to-end through a real Pipeline — the fixture the provider implementations build against.
- (11) Extend bench-rs to drive the real Pipeline (not the standalone 7-stage mock) for both S2S and cascaded topologies and assert p99 ≤ the published moat. Wire that bench assertion into CI (the .github/workflows/ jobs that already run cargo test --workspace --locked).
- (12) Crate split (§8.1): add flowcat-services/-transports/-telephony skeletons + the feature matrix; move the Gemini client into flowcat-services behind realtime-gemini; Apache headers/NOTICE. (Can run in parallel with 9–11.)
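A sketch of the two runtime behaviours the processor-runtime tests pin down: system frames overtake a data backlog, and an Interruption drops queued interruptible frames but keeps an End. The real loop uses two tokio channels (bounded normal, unbounded system); this single-threaded model uses two VecDeques, and the Frame variants and the uninterruptible() rule are simplified assumptions.

```rust
use std::collections::VecDeque;

#[derive(Debug, Clone, PartialEq)]
pub enum Frame {
    // system class: jumps the queue
    Interruption,
    Error(String),
    // data/control class: ordered in the normal queue
    Audio(u32),
    Text(String),
    End,
}

impl Frame {
    fn is_system(&self) -> bool {
        matches!(self, Frame::Interruption | Frame::Error(_))
    }

    /// Frames that must survive an interruption drain (simplified: only End).
    fn uninterruptible(&self) -> bool {
        matches!(self, Frame::End)
    }
}

/// Single-threaded model of a processor's dual-channel inbox.
#[derive(Default)]
pub struct Inbox {
    system: VecDeque<Frame>,
    normal: VecDeque<Frame>,
}

impl Inbox {
    pub fn push(&mut self, f: Frame) {
        if f.is_system() {
            self.system.push_back(f);
        } else {
            self.normal.push_back(f);
        }
    }

    /// System frames are always served first. Serving an Interruption also
    /// drains every queued interruptible frame, keeping uninterruptible ones in order.
    pub fn next(&mut self) -> Option<Frame> {
        if let Some(f) = self.system.pop_front() {
            if f == Frame::Interruption {
                self.normal.retain(|q| q.uninterruptible());
            }
            return Some(f);
        }
        self.normal.pop_front()
    }
}
```

Keeping End through the drain is what lets a barge-in during shutdown still terminate the pipeline cleanly.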
Build order rationale: the enum (1) and the processor runtime (3) are the spine;
everything else composes them. The equivalence test (9) is the single gate that makes
the whole multi-quarter program safe — nothing in the later provider waves starts until 1–9
are frozen and green (see ROADMAP.md).
10. Open questions (need an architect/user call before/at cutover)
- Custom-frame downcast ergonomics for OSS users. The Frame::Custom(Arc<dyn CustomFrame>) escape hatch is the agreed extensibility model, but a provider that needs a first-class, hot-path frame (rare) must promote a variant in core — i.e. a PR to flowcat-core. Acceptable? (Recommendation: yes — the long tail rides Custom; only genuinely-hot frames get variants, which is also true of pipecat's own evolution.)
- BrainProcessor location for the OSS demo. flowcat-core ships a demo brain today (lib.rs); the embedder's engine adapter is the host's own code. Confirm the demo BrainProcessor stays in flowcat-core (so the OSS pipeline is runnable end-to-end) while the engine adapter lives in the embedder. (Recommendation: yes — matches the DESIGN.md OSS boundary.)
- Heartbeat-as-watchdog default. Pipecat ships heartbeats off by default. An embedder with a deploy-cutover drain story likely wants them on with the 1s/10s defaults; it can set enable_heartbeats to true in its own PipelineTaskParams without changing the core default.
- str0m vs a thin WebRTC sidecar for Daily. Not a decision for this framework, but the TransportInput/Output trait shape frozen here must not assume the transport owns its own event loop — confirmed sans-I/O-friendly above.
- Lifecycle frames bypass process_frame (RESOLVED). The framework loop (run_processor, §2.2) routes Start and downstream End/Stop/Cancel to the start/stop hooks and never to process_frame; an upstream terminal is a "request to end" that DOES reach process_frame so the Source can convert it (§4.1). This is the single most surprising property for processor authors and was the root of two bring-up hangs (the internal Sink and Source each needed an edge wired for it). Ruled correct and kept (the framework — not the author — owns the lifecycle; routing lifecycle through process_frame would force per-processor lifecycle boilerplate, the exact thing the design eliminates). The contract is now stated on the FrameProcessor trait doc itself (processor/mod.rs) so every later / OSS author hits it. Every later component depends on knowing it.
- Source-emit affordance for transports (RESOLVED). A source processor (a transport reading recv()) cannot self-emit from the frozen FrameProcessor::start hook (it gets &ProcessorSetup + &StartParams but no Link). The migration worked around this with a bespoke external pump feeding PipelineTask::queue_sender(). The open question was whether the frozen trait should grow a first-class source affordance before more transports proliferate. Decision: (a) codify the external-pump pattern — NO trait change. flowcat-core ships a small SourcePump/SourceHandle helper (pipeline/source_pump.rs) that wraps a transport's reader task and emits frames at the pipeline head via the task's queue_sender(); the helper owns the spawn + abort lifecycle so every transport author gets a one-liner instead of re-deriving the pump. Why (a) over a run_source trait hook: (i) the head-injection path is the only way to preserve the Start→ready ordering guarantee — PipelineTask::run blocks on the Start→Sink handshake before it drains the head queue, so a pumped InputAudio provably cannot reach any process_frame before that processor's start() ran (the invariant that matters for .expect()-in-process_frame processors). A runtime-spawned run_source(link, …) would emit before Start traverses downstream, re-introducing exactly the ordering hazard the bring-up fixed; making it safe would require threading a new start-barrier through the frozen runtime, its most delicate part. (ii) This is literally pipecat's own model — BaseInputTransport spawns a reader task and pushes; it has no synchronous source-emit method either. (iii) Backpressure stays correct: the head queue is unbounded (input must never block — §2.2) and the first bounded normal channel one hop in applies natural backpressure. (iv) Zero churn on a just-frozen + reviewed trait; the helper is additive and re-uses the proven handshake.
  Code landed alongside this ruling: SourcePump/SourceHandle + 2 unit tests (one asserts the Start handshake holds for pump-injected frames, one asserts abort-on-drop); s2s.rs's bespoke spawn_transport_pump refactored onto it (the §7.2 differential test stays green). 157 flowcat tests green (--locked), clippy-clean. Note: this adds to the frozen public surface (new pipeline::{SourcePump, SourceHandle} re-exports).
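A std-only model of that ruling, for readers who want to see why head injection preserves ordering. It uses threads and an mpsc channel instead of tokio, and spawn_pump, run_task and this SourceHandle are hypothetical stand-ins named after the real helper, not its actual API: the pump pushes into an unbounded head queue from its own thread, the task completes the Start handshake before draining that queue, and dropping the handle stops and joins the reader.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;
use std::thread::{self, JoinHandle};

/// Dropping the handle asks the reader thread to stop, then joins it.
pub struct SourceHandle {
    stop: Arc<AtomicBool>,
    thread: Option<JoinHandle<()>>,
}

impl Drop for SourceHandle {
    fn drop(&mut self) {
        self.stop.store(true, Ordering::SeqCst);
        if let Some(t) = self.thread.take() {
            let _ = t.join();
        }
    }
}

/// Forward `read()` results into the head queue until the transport closes
/// (`None`), the task goes away, or the handle is dropped.
pub fn spawn_pump<F>(mut read: F, head: Sender<String>) -> SourceHandle
where
    F: FnMut() -> Option<String> + Send + 'static,
{
    let stop = Arc::new(AtomicBool::new(false));
    let flag = Arc::clone(&stop);
    let thread = thread::spawn(move || {
        while !flag.load(Ordering::SeqCst) {
            match read() {
                Some(frame) => {
                    if head.send(frame).is_err() {
                        break; // task dropped its receiver
                    }
                }
                None => break, // transport closed
            }
        }
    });
    SourceHandle { stop, thread: Some(thread) }
}

/// Task side: finish the Start handshake first, only then drain the head queue.
pub fn run_task(head: Receiver<String>, log: &mut Vec<String>) {
    log.push("Start reached sink".to_string()); // stands in for Start→ready
    for frame in head {
        log.push(frame);
    }
}
```

However early the reader produces audio, those frames wait in the queue, so nothing can be processed before the handshake entry; that is the ordering property the ruling relies on.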
STATUS: implemented 2026-05-31
Flowcat — native SIP (supersedes the FreeSWITCH gateway)
Decision: SIP is native Rust inside flowcat; FreeSWITCH is removed. Flowcat speaks
SIP/RTP directly, so it's a single-binary native voice runtime — one project, no separate
softswitch. (Supersedes the earlier FreeSWITCH gateway approach described in DESIGN.md.)
1. The transport seam (do this first — everything depends on it)
Generalize the pipeline so it doesn't care whether audio arrived as carrier WS frames or RTP.
New in flowcat-core (transport/mod.rs):
```rust
use async_trait::async_trait;
// AudioChunk and FlowcatError are flowcat-core types (import paths omitted here).

#[async_trait]
pub trait MediaTransport: Send {
    /// Next inbound event: the call started, a chunk of caller audio (at
    /// `carrier_rate`), or the call ended. `None` = transport closed.
    async fn recv(&mut self) -> Option<MediaIn>;
    /// Play bot audio (at `carrier_rate`) to the caller.
    async fn send_audio(&mut self, chunk: AudioChunk) -> Result<(), FlowcatError>;
    /// Barge-in: flush any buffered playback. No-op where unsupported.
    async fn send_clear(&mut self) -> Result<(), FlowcatError>;
    fn carrier_rate(&self) -> u32; // 8000 for telephony G.711
}

pub enum MediaIn {
    StreamStart { call_id: String },
    Audio(AudioChunk), // at carrier_rate
    Stop,
}
```
- Call becomes Call<Tr: MediaTransport, R: RealtimeLlm, B: AgentBrain, S: SessionSource> (4 params, was 5). The loop: tr.recv() → MediaIn::{StreamStart, Audio, Stop}; realtime AudioOut → resample → tr.send_audio; Interrupted → tr.send_clear. The resampler, recorder, transcript, brain/tool handling, and finalize are UNCHANGED.
- WS path preserved via an adapter: WsCarrierTransport<So: MediaSocket, Se: MediaSerializer> impls MediaTransport. recv() loops socket.recv() → serializer.on_message until a non-Ignore SerIn, then maps it to MediaIn; send_audio/send_clear go through serializer.encode_audio/encode_clear → socket.send_*; carrier_rate = the serializer's. The embedder builds WsCarrierTransport::new(AxumWsSocket, PlivoSerializer) for Plivo.
- Keep MediaSocket/MediaSerializer/PlivoSerializer (used by the adapter). The gateway serializer + GatewayEncoding are removed (no more gateway).
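A synchronous sketch of the adapter's recv() loop under simplified assumptions: the socket is modeled as an iterator of text messages, and SerIn, the on_message signature, and the toy "start:"/"media:"/"stop" wire format are hypothetical (the real Plivo serializer parses JSON events and base64 audio).

```rust
#[derive(Debug, PartialEq)]
pub enum MediaIn {
    StreamStart { call_id: String },
    Audio(Vec<u8>),
    Stop,
}

/// What a serializer makes of one carrier message.
#[derive(Debug, PartialEq)]
pub enum SerIn {
    Start(String),
    Audio(Vec<u8>),
    Stop,
    Ignore,
}

pub trait MediaSerializer {
    fn on_message(&mut self, msg: &str) -> SerIn;
}

/// Toy wire format: "start:<id>", "media:<bytes>", "stop"; anything else is ignored.
pub struct ToySerializer;

impl MediaSerializer for ToySerializer {
    fn on_message(&mut self, msg: &str) -> SerIn {
        if let Some(id) = msg.strip_prefix("start:") {
            SerIn::Start(id.to_string())
        } else if let Some(bytes) = msg.strip_prefix("media:") {
            SerIn::Audio(bytes.as_bytes().to_vec())
        } else if msg == "stop" {
            SerIn::Stop
        } else {
            SerIn::Ignore // keepalives, marks, unknown events
        }
    }
}

pub struct WsCarrierTransport<So, Se> {
    socket: So,
    serializer: Se,
}

impl<So: Iterator<Item = String>, Se: MediaSerializer> WsCarrierTransport<So, Se> {
    pub fn new(socket: So, serializer: Se) -> Self {
        Self { socket, serializer }
    }

    /// Loop socket → serializer until a non-Ignore result, then map it to MediaIn.
    /// `None` means the socket closed.
    pub fn recv(&mut self) -> Option<MediaIn> {
        loop {
            let msg = self.socket.next()?;
            match self.serializer.on_message(&msg) {
                SerIn::Ignore => continue,
                SerIn::Start(call_id) => return Some(MediaIn::StreamStart { call_id }),
                SerIn::Audio(a) => return Some(MediaIn::Audio(a)),
                SerIn::Stop => return Some(MediaIn::Stop),
            }
        }
    }
}
```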
2. The SIP/RTP module (flowcat-core/src/sip/)
Add deps to flowcat-core/Cargo.toml: a SIP UA stack — rsipstack (primary; REGISTER +
INVITE/ACK/BYE transactions + dialogs; backs the rustpbx softswitch) — and an RTP layer
(rsipstack's RTP if sufficient, else webrtc-util/rtp). G.711 + resample already present
(audio-codec-algorithms, rubato). If rsipstack's API proves too thin, fall back to
ezk-sip-ua (cleaner, SDP/RTP integrated) and report the switch.
- SipAgent (process-level, one per trunk): REGISTER to {server} with {login}/{password} + periodic re-REGISTER/keepalive; NAT-friendly (rport/symmetric RTP); accept inbound INVITE → yield an inbound call (Call-ID, From, To/DID) to the host; originate outbound INVITE to an E.164 with a configured CallerID. G.711 only (disallow all; allow PCMU,PCMA), 8 kHz, ptime 20 ms, a=sendrecv.
- SipTransport (per dialog) impls MediaTransport: on answer/established emit StreamStart{call_id = Call-ID}; decode inbound RTP (G.711 per negotiated codec) → 20 ms MediaIn::Audio @ 8000; on BYE/timeout → Stop. send_audio → G.711-encode → RTP packets (monotonic seq + timestamp, 20 ms cadence, correct PT/SSRC). send_clear → no-op (RTP has no flush; barge-in = stop sending). A small fixed playout jitter buffer (reorder by seq, bounded depth, drop late): telephony's fixed rate makes this simple; document the depth.
- Unit-test (no live socket): SDP offer/answer build + parse (PCMU/PCMA pick, ptime), RTP packetize/depacketize (seq/ts/PT), the MediaTransport mapping (a fed RTP stream → Audio; a synthesized BYE → Stop). Live registration/INVITE against the trunk is gated on the user.
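The RTP side of send_audio is small enough to sketch in full. This follows the fixed 12-byte header of RFC 3550 (version 2, no padding, extension or CSRCs) and the static payload types of RFC 3551 (PCMU = 0, PCMA = 8). At 8 kHz a 20 ms packet carries 160 one-byte G.711 samples, so the timestamp advances by 160 per packet. RtpPacketizer, RtpView and depacketize are hypothetical names, not rsipstack's API.

```rust
pub const PT_PCMU: u8 = 0;
pub const PT_PCMA: u8 = 8;
/// 20 ms at 8000 Hz, one byte per G.711 sample.
pub const SAMPLES_PER_PACKET: u32 = 160;

pub struct RtpPacketizer {
    pub pt: u8,
    pub ssrc: u32,
    seq: u16,
    ts: u32,
}

impl RtpPacketizer {
    pub fn new(pt: u8, ssrc: u32, initial_seq: u16, initial_ts: u32) -> Self {
        Self { pt, ssrc, seq: initial_seq, ts: initial_ts }
    }

    /// Prefix one G.711 payload with a 12-byte RTP header, then advance seq by 1
    /// and the timestamp by the payload's sample count (both wrapping).
    pub fn packetize(&mut self, payload: &[u8]) -> Vec<u8> {
        let mut pkt = Vec::with_capacity(12 + payload.len());
        pkt.push(0x80); // V=2, P=0, X=0, CC=0
        pkt.push(self.pt & 0x7f); // M=0, 7-bit payload type
        pkt.extend_from_slice(&self.seq.to_be_bytes());
        pkt.extend_from_slice(&self.ts.to_be_bytes());
        pkt.extend_from_slice(&self.ssrc.to_be_bytes());
        pkt.extend_from_slice(payload);
        self.seq = self.seq.wrapping_add(1);
        self.ts = self.ts.wrapping_add(payload.len() as u32);
        pkt
    }
}

#[derive(Debug, PartialEq)]
pub struct RtpView<'a> {
    pub pt: u8,
    pub seq: u16,
    pub ts: u32,
    pub ssrc: u32,
    pub payload: &'a [u8],
}

/// Parse a plain RTP packet; None if it is too short, not version 2, or uses
/// padding/extension/CSRCs (out of scope for this sketch).
pub fn depacketize(pkt: &[u8]) -> Option<RtpView<'_>> {
    if pkt.len() < 12 || (pkt[0] >> 6) != 2 || (pkt[0] & 0x3f) != 0 {
        return None;
    }
    Some(RtpView {
        pt: pkt[1] & 0x7f,
        seq: u16::from_be_bytes([pkt[2], pkt[3]]),
        ts: u32::from_be_bytes([pkt[4], pkt[5], pkt[6], pkt[7]]),
        ssrc: u32::from_be_bytes([pkt[8], pkt[9], pkt[10], pkt[11]]),
        payload: &pkt[12..],
    })
}
```

The wrapping arithmetic matters in practice: a 16-bit sequence number rolls over after about 22 minutes of 20 ms packets, so a long call will wrap it, and a jitter buffer that reorders by seq has to compare sequence numbers modulo 2^16.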
3. Host + control-plane rewire (embedder side)
- The embedder runs a SipAgent at startup, registering the SIP trunk from its own SIP trunk configuration (server / login / password / caller-id, plus public IP/ports as needed) passed to SipConfig/SipAgent. On inbound INVITE → the embedder's service-authed sip/inbound-resolve (body {dialed_did, caller, call_id}) → {run_id, token} → build Call::new(SipTransport, gemini, <brain>, <session>, run_id, token) → run. Add an internal outbound endpoint (authed by the run's token, or service auth) {run_id, token, to_number} → SipAgent originates the INVITE, builds the Call, runs it. The WS-media path is unchanged.
- Control plane: the SIP originate path no longer uses ESL; it POSTs to the embedder's media binary's originate endpoint (internal base from settings). Remove any FreeSWITCH ESL routes + settings. Keep sip/inbound-resolve, the carrier provider (CDR webhook verify + status still useful), and the sealed credentials. The REST signer may go unused → keep (tested, small) or remove if truly dead.
4. Removals (after native SIP compiles)
- flowcat/deploy/freeswitch/ (whole dir) · flowcat-core/src/serializer/gateway.rs + GatewayEncoding + its lib.rs export · the embedder's FreeSWITCH ESL routes + their module registration · ESL config fields · FreeSWITCH env in any .env.example.
- Update DESIGN.md: SIP media = native SIP (no FreeSWITCH); bring-up = register the trunk in the embedder + a sofia-free register check.
Security (re-review the new surface)
Trunk SIP credentials live in the embedder's config — never logged; the inbound INVITE trust
flows through sip/inbound-resolve (service-authed, identity from the DID route, never the
INVITE body); the outbound originate endpoint must authenticate (run token / service auth) and
validate to_number (E.164). RTP from an unexpected source addr should be dropped (symmetric
RTP). No new secret crosses the wire to the model/caller.
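Two of those checks are mechanical enough to sketch. The E.164 rule below is the common `+[1-9]` followed by 1 to 14 more digits shape (ITU-T E.164 caps numbers at 15 digits); the minimum length, the function names, and the absence of per-country rules are assumptions of this sketch. The source check simply compares the datagram's sender against the pinned remote media address.

```rust
use std::net::SocketAddr;

/// Accepts `+` followed by 2..=15 ASCII digits whose first digit is not 0.
/// No per-country length or numbering-plan rules.
pub fn is_e164(number: &str) -> bool {
    let Some(digits) = number.strip_prefix('+') else { return false };
    (2..=15).contains(&digits.len())
        && digits.bytes().all(|b| b.is_ascii_digit())
        && !digits.starts_with('0')
}

/// Symmetric RTP: once the remote media address is pinned, drop datagrams
/// arriving from any other source.
pub fn accept_rtp(expected: SocketAddr, from: SocketAddr) -> bool {
    expected == from
}
```

Validating to_number before originating keeps a compromised or buggy caller of the outbound endpoint from dialing arbitrary SIP URIs or malformed strings through the trunk.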
Contributor reference. This is the provider-implementation taxonomy — how connectors are triaged and built. If you just want to use providers, see Providers & features and Configuration.
Flowcat provider protocol-family map
Mirrors the per-provider feature matrix + shared-file pattern used elsewhere; covers the STT / TTS / LLM provider breadth (see ROADMAP.md).
Purpose. This map covers the ~70 remaining providers, each landing in an isolated worktree. The merge hazard is contention on the two shared files: flowcat-services/Cargo.toml ([features]/[dependencies]) and each category's src/{stt,tts,llm}/mod.rs (the mod-decl + pub use list). Pre-creating every provider's module home + feature + dep + a compiling stub means each provider fills in one body and never adds a mod decl or a Cargo line → conflict-free merge. It also classifies every provider as (D) distinct client or (W) thin wrapper so a protocol family's real client is implemented once and the wrappers reuse it.
0. The two-letter triage
| Tag | Meaning | What the fan-out agent does |
|---|---|---|
| (D) | Distinct client — its own wire protocol (own WS/HTTP framing, own auth, own message schema). Needs a real, from-scratch impl + wire-fixture tests. | Implement the client + pure encode/decode seam + unit tests (the Deepgram/Cartesia/OpenAI template). |
| (W) | Thin wrapper — OpenAI-/Whisper-/an-existing-client-compatible: same wire protocol as a (D) family member, differing only in base_url + auth header + default model. | A small struct that constructs the family's (D) client with the provider's base URL/auth and delegates the trait (the GrokRealtime-over-OpenAiRealtime template). |
The discipline: implement each family's (D) client once; every (W) in that family is ~30 lines of config + delegation. A category agent should own a whole family (the (D) client + all its (W)s) so the wrapper contract is set by the same hand.
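A minimal sketch of the (W) pattern for the OpenAI-compatible LLM family. OpenAiLlm here is a hypothetical, pared-down stand-in that only records configuration; the LLM section below says the real client has .base_url() and .model() builders, but its fields, the LlmService stand-in trait, and the placeholder default model are assumptions.

```rust
/// Hypothetical, pared-down stand-in for the family's (D) client.
#[derive(Debug, Clone)]
pub struct OpenAiLlm {
    base_url: String,
    model: String,
    api_key: String,
}

impl OpenAiLlm {
    pub fn new(api_key: impl Into<String>) -> Self {
        Self {
            base_url: "https://api.openai.com/v1".into(),
            model: "example-default-model".into(), // placeholder, not a real default
            api_key: api_key.into(),
        }
    }
    pub fn base_url(mut self, url: impl Into<String>) -> Self {
        self.base_url = url.into();
        self
    }
    pub fn model(mut self, model: impl Into<String>) -> Self {
        self.model = model.into();
        self
    }
    pub fn model_name(&self) -> &str {
        &self.model
    }
    pub fn auth_header(&self) -> String {
        format!("Bearer {}", self.api_key)
    }
}

/// Stand-in for the service trait the (D) client and its wrappers share.
pub trait LlmService {
    fn name(&self) -> &str;
    fn endpoint(&self) -> String;
}

impl LlmService for OpenAiLlm {
    fn name(&self) -> &str {
        "openai"
    }
    fn endpoint(&self) -> String {
        format!("{}/chat/completions", self.base_url.trim_end_matches('/'))
    }
}

/// A (W): construct the (D) client with this provider's base URL + model, then delegate.
pub struct GroqLlm(OpenAiLlm);

impl GroqLlm {
    pub fn new(api_key: impl Into<String>, model: impl Into<String>) -> Self {
        GroqLlm(OpenAiLlm::new(api_key).base_url("https://api.groq.com/openai/v1").model(model))
    }
}

impl LlmService for GroqLlm {
    fn name(&self) -> &str {
        "groq"
    }
    fn endpoint(&self) -> String {
        self.0.endpoint() // pure delegation: same wire protocol, different host
    }
}
```

Everything protocol-specific (request encoding, SSE decoding, retries) lives once in the (D) client; each wrapper is configuration plus a name, which is why the whole cohort fits in one PR.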
1. LLM (26 providers; openai✓ done, anthropic = stub-home here)
The headline finding (confirmed against pipecat): the LLM list is overwhelmingly
OpenAI-compatible. In pipecat 15 of the LLM services literally class XLLMService(OpenAILLMService) —
they are a base_url + default-model change. They are all (W) over the existing
OpenAiLlm (which already has .base_url() + .model() builders and was written for
exactly this — its doc already names OpenRouter). Only 3 are (D).
| Provider | Tag | Family / how | base_url (verified in pipecat) |
|---|---|---|---|
| openai | (D) ✓done | OpenAiLlm (chat-completions SSE) | https://api.openai.com/v1 |
| openai_responses | (D) | OpenAI Responses API (/responses, different request+event schema from chat-completions) — its own decode | https://api.openai.com/v1 |
| anthropic | (D) | Messages API (/v1/messages, anthropic-version header, content-block-delta SSE) — own client | https://api.anthropic.com |
| google (gemini) | (D) | generativelanguage streamGenerateContent (Gemini text API; distinct from the realtime client already in core) | https://generativelanguage.googleapis.com |
| aws_bedrock | (D) | Bedrock InvokeModelWithResponseStream (SigV4 + AWS event-stream framing) — own SigV4 path | (region host) |
| groq | (W) | OpenAiLlm | https://api.groq.com/openai/v1 |
| together | (W) | OpenAiLlm | https://api.together.xyz/v1 |
| fireworks | (W) | OpenAiLlm | https://api.fireworks.ai/inference/v1 |
| openrouter | (W) | OpenAiLlm | https://openrouter.ai/api/v1 |
| perplexity | (W) | OpenAiLlm | https://api.perplexity.ai |
| deepseek | (W) | OpenAiLlm | https://api.deepseek.com/v1 |
| cerebras | (W) | OpenAiLlm | https://api.cerebras.ai/v1 |
| sambanova | (W) | OpenAiLlm | https://api.sambanova.ai/v1 |
| nebius | (W) | OpenAiLlm | https://api.tokenfactory.nebius.com/v1/ |
| novita | (W) | OpenAiLlm | https://api.novita.ai/openai |
| qwen | (W) | OpenAiLlm | https://dashscope-intl.aliyuncs.com/compatible-mode/v1 |
| grok (xai) | (W) | OpenAiLlm | https://api.x.ai/v1 |
| nvidia_nim | (W) | OpenAiLlm | https://integrate.api.nvidia.com/v1 |
| ollama | (W) | OpenAiLlm | http://localhost:11434/v1 |
| sarvam | (W) | OpenAiLlm | https://api.sarvam.ai/v1 |
| mistral | (W)¹ | OpenAiLlm | https://api.mistral.ai/v1 |
| azure | (W)² | OpenAiLlm | Azure endpoint + api-version, api-key auth |
| speaches | (W) | OpenAiLlm | http://localhost:11434/v1 (self-hosted, OpenAI-compatible) |
¹ mistral: the brief flags it as arguably (D), but pipecat's MistralLLMService(OpenAILLMService)
is OpenAI-compatible → (W). (Mistral STT/TTS are (D) — see below; only the LLM is a wrapper.)
² azure: OpenAI-compatible wire but a different auth (api-key header) + URL shape (?api-version=).
It needs OpenAiLlm to accept an api-key-style header — a tiny (D-ish) seam on the OpenAI client
(an auth-mode toggle), then the rest is (W). Grouped with the (W) cohort but flagged for the auth seam.
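The auth-mode toggle could be as small as an enum the OpenAI client consults when building a request. A sketch under assumptions: AuthMode and its two helpers are hypothetical; the header names (Authorization: Bearer for OpenAI, api-key for Azure) and the ?api-version= query follow the footnote above, and the Azure URL and version string in the usage are illustrative examples, not verified defaults.

```rust
/// How the OpenAI-compatible client authenticates.
#[derive(Debug, Clone, PartialEq)]
pub enum AuthMode {
    /// `Authorization: Bearer <key>` (OpenAI and the (W) cohort).
    Bearer(String),
    /// `api-key: <key>` plus a required `?api-version=` query (Azure).
    AzureApiKey { key: String, api_version: String },
}

impl AuthMode {
    /// The single auth header to attach, as (name, value).
    pub fn header(&self) -> (&'static str, String) {
        match self {
            AuthMode::Bearer(key) => ("Authorization", format!("Bearer {key}")),
            AuthMode::AzureApiKey { key, .. } => ("api-key", key.clone()),
        }
    }

    /// Join base + path, appending the api-version query when the mode needs one.
    pub fn url(&self, base: &str, path: &str) -> String {
        let url = format!("{}/{}", base.trim_end_matches('/'), path.trim_start_matches('/'));
        match self {
            AuthMode::Bearer(_) => url,
            AuthMode::AzureApiKey { api_version, .. } => format!("{url}?api-version={api_version}"),
        }
    }
}
```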
LLM (D) count: 5 (openai✓, openai_responses, anthropic, google/gemini, aws_bedrock).
LLM (W) count: 18. → One agent implements the 4 new (D) clients; one agent fans out
all 18 (W)s (each a base_url+model struct over OpenAiLlm). The anthropic stub-home is
set up here because its llm-anthropic feature already exists from earlier
but the impl was never landed (the earlier "anthropic done" is the feature, not the
module) — this map fills it.
2. STT (~21 remaining; deepgram✓ done)
Protocol families (cross-checked against pipecat base classes):
- Streaming-WebSocket (each its own JSON schema) — WebsocketSTTService subclasses. Mostly (D) (distinct message schemas), except where they ride a sibling's protocol.
- Whisper-HTTP segmented — BaseWhisperSTTService/SegmentedSTTService: POST a finished audio segment to an OpenAI /audio/transcriptions-shaped endpoint. One (D) Whisper-HTTP client, the rest (W) over it.
- gRPC — Google + NVIDIA Riva use tonic. (D) each (distinct protos).
- Local — whisper.cpp via whisper-rs. (D), C-toolchain (see §5).
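"Segmented" means the client never streams: it buffers caller audio while the VAD reports speech and sends one finished segment when speech ends. A minimal sketch of that buffering step, with a hypothetical Segmenter and a simplified VadState; the returned segment is what a Whisper-HTTP client would POST (container encoding and the HTTP call are omitted).

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VadState {
    Quiet,
    Starting,
    Speaking,
    Stopping,
}

/// Buffers audio during speech; hands back one finished segment when the VAD
/// returns to Quiet.
#[derive(Default)]
pub struct Segmenter {
    buf: Vec<i16>,
    in_speech: bool,
}

impl Segmenter {
    pub fn push(&mut self, samples: &[i16], vad: VadState) -> Option<Vec<i16>> {
        match vad {
            VadState::Starting | VadState::Speaking | VadState::Stopping => {
                self.in_speech = true;
                self.buf.extend_from_slice(samples);
                None
            }
            VadState::Quiet if self.in_speech => {
                self.in_speech = false;
                Some(std::mem::take(&mut self.buf)) // segment ready to transcribe
            }
            VadState::Quiet => None,
        }
    }
}
```

As written, a false start (Starting straight back to Quiet) also yields a short segment; a real implementation might drop segments below a minimum duration.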
| Provider | Tag | Family / protocol | Notes |
|---|---|---|---|
| deepgram | (D) ✓done | streaming WS /v1/listen | reference impl |
| assemblyai | (D) | streaming WS v3 wss://streaming.assemblyai.com/v3/ws | own JSON schema |
| gladia | (D) | streaming WS (init-then-stream session) | own schema |
| soniox | (D) | streaming WS wss://stt-rt.soniox.com/transcribe-websocket | own schema |
| speechmatics | (D) | streaming WS wss://*.rt.speechmatics.com/v2 | own schema |
| cartesia | (D) | streaming WS (/stt/websocket) | own schema (sibling of the TTS WS) |
| aws_transcribe | (D) | AWS Transcribe streaming WS (SigV4-signed) + event framing | SigV4 (see §5) |
| azure | (D) | Azure Speech SDK / WS | own protocol |
| elevenlabs | (D) | segmented HTTP /v1/speech-to-text | own (not Whisper-shaped) |
| openai | (D) Whisper-HTTP base | POST /audio/transcriptions (Whisper) | the Whisper-HTTP (D) family client |
| groq | (W) | Whisper-HTTP | https://api.groq.com/openai/v1 |
| fal | (W) | Whisper-HTTP | fal endpoint |
| speaches | (W) | Whisper-HTTP | self-hosted OpenAI-compatible |
| xai | (W)³ | Whisper-HTTP / OpenAI-STT WS | https://api.x.ai/v1 |
| sarvam | (D) | sarvam REST STT | own schema |
| mistral | (D) | mistral STT REST | own schema |
| google | (D) gRPC | Cloud Speech tonic streaming | gRPC (see §5) |
| nvidia (riva) | (D) gRPC | Riva ASR tonic | gRPC (see §5) |
| gradium | (D) | gradium WS STT | own schema |
| whisper_local | (D) | whisper-rs (whisper.cpp) | C-toolchain (see §5) |
| speaches¹ | see groq row | — | listed once above |
³ xai STT: pipecat's is WebsocketSTTService; xAI exposes an OpenAI-compatible STT — treat as
(W) over the Whisper-HTTP/OpenAI-STT family unless a live test shows a distinct WS schema (flag for
the implementer). Conservative grouping: (W).
STT (D) count: ~16 (deepgram✓ + assemblyai, gladia, soniox, speechmatics, cartesia,
aws_transcribe, azure, elevenlabs, openai-Whisper-base, sarvam, mistral, google-gRPC,
nvidia/riva-gRPC, gradium, whisper_local). STT (W) count: ~4 (groq, fal, speaches, xai —
all Whisper-HTTP over the openai client). The brief also names nvidia, riva, speaches —
riva is nvidia's STT (one provider, one feature stt-nvidia); speaches is the (W).
3. TTS (~30 remaining; cartesia✓ done)
Two big families + a long tail of HTTP-POST-audio providers:
- Streaming-WebSocket TTS — WebsocketTTSService subclasses: connect a WS, send a synthesis request, read base64/binary PCM chunks. The Cartesia ref impl is the template. Each has its own request/response schema → (D).
- HTTP-POST-audio (one-shot / segmented) — TTSService subclasses that POST text and read an audio body (mp3/pcm/opus). The wire shape is similar (POST → audio bytes) but the request schemas + audio containers differ enough that there is no single reusable client; most are (D), a few are (W) where they share an OpenAI-TTS shape.
| Provider | Tag | Family / protocol |
|---|---|---|
| cartesia | (D) ✓done | WS streaming (reference) |
| elevenlabs | (D) | WS streaming /v1/text-to-speech/{voice}/stream-input |
| deepgram | (D) | WS streaming /v1/speak |
| rime | (D) | WS streaming |
| asyncai | (D) | WS streaming |
| gradium | (D) | WS streaming |
| soniox | (D) | WS streaming |
| resemble | (D) | WS streaming |
| openai | (D) HTTP base | POST /audio/speech → audio body (the OpenAI-TTS-HTTP family client) |
| groq | (W) | OpenAI-TTS-HTTP (/audio/speech) over the openai client |
| xai | (W) | OpenAI-TTS-HTTP shape (XAIHttpTTSService) |
| aws_polly | (D) | AWS Polly (SigV4 + SynthesizeSpeech) — §5 |
| azure | (D) | Azure Speech SSML over WS/HTTP |
| google | (D) | Google Cloud TTS (HTTP/gRPC) — §5 for the gRPC variant |
| sarvam | (D) | sarvam HTTP TTS |
| mistral | (D) | mistral HTTP TTS |
| nvidia | (D) | NVIDIA Riva TTS (gRPC) — §5 |
| hume | (D) | Hume HTTP |
| inworld | (D) | Inworld HTTP |
| minimax | (D) | MiniMax HTTP |
| camb | (D) | Camb HTTP |
| fish | (D) | Fish Audio (interruptible) |
| lmnt | (D) | LMNT (interruptible) |
| neuphonic | (D) | Neuphonic (interruptible) |
| smallest | (D) | Smallest (interruptible) |
| speechmatics | (D) | Speechmatics TTS |
| kokoro | (D) | Kokoro (local model) — C/ONNX-ish, may need a model file |
| piper | (D) | Piper (local model, subprocess/ONNX) |
| xtts | (D) | XTTS (local model) |
| asyncai¹ | see WS row | — |
TTS (D) count: ~28 (cartesia✓ + 27). TTS (W) count: ~2 (groq, xai — OpenAI-TTS-HTTP).
TTS is the least wrapper-friendly category — almost every vendor has a bespoke request body.
The leverage here is two shared client helpers (a WsTtsClient connect/send/read-base64
skeleton mirroring Cartesia, and an HttpTtsClient POST-and-read-audio skeleton) that the (D)
impls reuse for the transport plumbing while each supplies its own request encode + container
decode. (Those helpers are a convenience for the implementations and are not part of this
map.)
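A minimal sketch of that split for the HTTP half, assuming a synchronous shape: the shared helper owns the POST-and-read flow, and each provider supplies only an encode hook and a decode hook. HttpTtsCodec, RawPcmCodec, the injected post closure, and every signature here are hypothetical (the real helpers are left to the implementations). The transport is a closure so the sketch runs offline; a real helper would issue the POST with reqwest.

```rust
/// Hypothetical per-provider hooks: a (D) impl supplies only these two.
pub trait HttpTtsCodec {
    /// Builds the provider-specific request body for `text`.
    fn encode_request(&self, text: &str) -> Vec<u8>;
    /// Decodes the provider's response body into 16-bit PCM samples.
    fn decode_audio(&self, body: &[u8]) -> Result<Vec<i16>, String>;
}

/// Hypothetical shared helper owning the POST-and-read plumbing.
/// The transport is injected as a closure so this sketch runs offline.
pub struct HttpTtsClient<P> {
    url: String,
    post: P,
}

impl<P> HttpTtsClient<P>
where
    P: Fn(&str, Vec<u8>) -> Result<Vec<u8>, String>,
{
    pub fn new(url: impl Into<String>, post: P) -> Self {
        Self { url: url.into(), post }
    }

    /// Shared flow: encode (provider) → POST (shared) → decode (provider).
    pub fn synthesize<C: HttpTtsCodec>(&self, codec: &C, text: &str) -> Result<Vec<i16>, String> {
        let body = codec.encode_request(text);
        let response = (self.post)(self.url.as_str(), body)?;
        codec.decode_audio(&response)
    }
}

/// Example codec: a small JSON request and a raw little-endian PCM16 response.
pub struct RawPcmCodec {
    pub voice: String,
}

impl HttpTtsCodec for RawPcmCodec {
    fn encode_request(&self, text: &str) -> Vec<u8> {
        // Naive formatting for brevity; a real codec would use serde_json so
        // that quotes inside `text` are escaped.
        format!(r#"{{"voice":"{}","text":"{}"}}"#, self.voice, text).into_bytes()
    }

    fn decode_audio(&self, body: &[u8]) -> Result<Vec<i16>, String> {
        if body.len() % 2 != 0 {
            return Err("PCM16 body has an odd number of bytes".to_string());
        }
        Ok(body
            .chunks_exact(2)
            .map(|pair| i16::from_le_bytes([pair[0], pair[1]]))
            .collect())
    }
}
```

Under this shape, adding a provider means writing one codec, while the POST plumbing lives in a single helper.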
4. Recommended fan-out grouping (which agent owns what)
To keep each protocol family's (D) client authored once and its (W)s consistent, group by family, not by alphabetical slice:
| Agent | Owns | Why grouped |
|---|---|---|
| A — LLM/OpenAI-compatible | the 18 LLM **(W)**s over OpenAiLlm | identical pattern; one PR, 18 ~30-line structs + the base_url table |
| B — LLM/distinct | anthropic, google-gemini, openai_responses, aws_bedrock (the 4 new LLM (D)) | 4 distinct clients incl. the SigV4 Bedrock path |
| C — STT/Whisper-HTTP | openai-STT (D) client + groq/fal/speaches/xai (W) | one Whisper-HTTP client, 4 wrappers |
| D — STT/streaming-WS | assemblyai, gladia, soniox, speechmatics, cartesia-STT, elevenlabs-STT, azure-STT, gradium-STT, sarvam, mistral-STT | all WS/REST distinct schemas; the Deepgram template |
| E — STT/gRPC+AWS+local | google-gRPC, nvidia/riva-gRPC, aws_transcribe (SigV4), whisper_local | the toolchain-heavy STT (§5) — keep together so the tonic/SigV4/whisper-rs plumbing is solved once |
| F — TTS/streaming-WS | elevenlabs, deepgram, rime, asyncai, gradium, soniox, resemble TTS | share the WsTtsClient helper (Cartesia template) |
| G — TTS/HTTP cloud | openai-TTS (D) + groq/xai (W), hume, inworld, minimax, camb, sarvam, mistral, speechmatics, azure-TTS | share the HttpTtsClient helper |
| H — TTS/interruptible+local+AWS+gRPC | fish, lmnt, neuphonic, smallest (interruptible); kokoro, piper, xtts (local models); aws_polly (SigV4); google-TTS / nvidia-TTS (gRPC) | the bespoke + toolchain-heavy TTS tail (§5) |
8 groups, family-coherent, no two touching the same provider. The new auth/SigV4 paths (aws_bedrock, aws_transcribe, aws_polly, the azure api-key seam) are the security-sensitive ones.
5. Toolchain / dependency notes for the fan-out
This crate declares every dep optional + dep:-gated so a default build pulls
nothing and --features <provider> compiles the stub. Three families need a toolchain/dep beyond
the existing reqwest/tokio-tungstenite/tokio/base64:
| Family | Dep | Gate(s) | Toolchain note |
|---|---|---|---|
| Google / NVIDIA-Riva STT+TTS | tonic 0.14 (already declared) | stt-google, stt-nvidia, tts-google, tts-nvidia | gRPC; needs the .protos compiled — tonic-build at build time (a build.rs, added by the impl, not here). protoc must be on PATH for codegen. |
| whisper_local | whisper-rs 0.16 (already declared) | stt-whisper-local | bundles whisper.cpp → needs cmake + a C/C++ toolchain to build. If cmake is not installed, building --features stt-whisper-local may fail at the C build step. The Rust stub itself compiles; the dep's C build is the gate. |
| AWS (Bedrock LLM, Transcribe STT, Polly TTS) | hand-rolled SigV4 — no AWS SDK | llm-aws-bedrock, stt-aws-transcribe, tts-aws-polly | no AWS SDK crate; requests are SigV4-signed over reqwest using HMAC-SHA-256 from hmac + sha2 (both declared optional). |
opentelemetry, gRPC (tonic), and whisper-rs are the only deps outside the plain networking stack. Everything else is
reqwest (rustls) or tokio-tungstenite (rustls), so the rustls-only + zero-default-cost discipline
holds. The only new deps are hmac + sha2 (both optional), for the AWS SigV4 family.
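On the Rust side, each optional, dep:-gated dependency pairs with a #[cfg(feature = ...)] on the module that uses it, which is what lets a default build compile nothing provider-specific. A minimal sketch, assuming one module per gate: the feature names are three of the §5 gates, while the module names, the NAME constants, and compiled_provider_gates are hypothetical. Built with no features, none of the modules exist and the list is empty.

```rust
// Each provider module exists only when its Cargo feature is enabled.
#[cfg(feature = "stt-google")]
pub mod stt_google {
    pub const NAME: &str = "stt-google";
}

#[cfg(feature = "stt-whisper-local")]
pub mod stt_whisper_local {
    pub const NAME: &str = "stt-whisper-local";
}

#[cfg(feature = "tts-aws-polly")]
pub mod tts_aws_polly {
    pub const NAME: &str = "tts-aws-polly";
}

/// Lists the provider gates compiled into this build.
pub fn compiled_provider_gates() -> Vec<&'static str> {
    // `mut` is unused in a default build, where every push is compiled out.
    #[allow(unused_mut)]
    let mut gates = Vec::new();
    #[cfg(feature = "stt-google")]
    gates.push(stt_google::NAME);
    #[cfg(feature = "stt-whisper-local")]
    gates.push(stt_whisper_local::NAME);
    #[cfg(feature = "tts-aws-polly")]
    gates.push(tts_aws_polly::NAME);
    gates
}
```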
6. Acceptance
- cargo build -p flowcat-services (default) → pulls nothing; compiles only the trait re-exports + stubs.
- cargo build -p flowcat-services --features stt-all,tts-all,llm-all → compiles every stub (modulo stt-whisper-local's C build, which needs cmake; see §5).
- cargo test -p flowcat-services (existing tests) → still green.
- Every stub: an Apache SPDX header, a //! WS-n: <provider> — TODO module doc line, and a frozen-trait impl whose bodies return FlowcatError::Other("<provider>: not yet wired (WS-n)") (or todo!() for the &str name() accessor) so the crate compiles. #[allow(dead_code)] on the held config fields is the sanctioned clippy escape on a stub.
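A minimal sketch of a stub that meets the acceptance list, for a hypothetical provider "examplevoice". FlowcatError and TtsService below are simplified, synchronous stand-ins, not the frozen trait or the crate's real error type; only the error message format, the todo!() accessor, and the #[allow(dead_code)] placement follow the list.

```rust
// SPDX-License-Identifier: Apache-2.0
// A real provider module would open with the acceptance list's module doc line
// (//! WS-n: <provider> ...); it is a plain comment here so the sketch can sit
// anywhere in a file.

/// Hypothetical stand-in for the crate's error type (only the variant stubs use).
#[derive(Debug, PartialEq)]
pub enum FlowcatError {
    Other(String),
}

/// Hypothetical, synchronous stand-in for the frozen TTS trait.
pub trait TtsService {
    fn name(&self) -> &str;
    fn synthesize(&mut self, text: &str) -> Result<Vec<i16>, FlowcatError>;
}

/// Stub for a hypothetical provider called "examplevoice".
pub struct ExampleVoiceTts {
    // Held config: stored now, read only once the provider is wired.
    #[allow(dead_code)]
    api_key: String,
    #[allow(dead_code)]
    voice: String,
}

impl ExampleVoiceTts {
    pub fn new(api_key: impl Into<String>, voice: impl Into<String>) -> Self {
        Self {
            api_key: api_key.into(),
            voice: voice.into(),
        }
    }
}

impl TtsService for ExampleVoiceTts {
    fn name(&self) -> &str {
        // The acceptance list permits todo!() for the &str accessor.
        todo!()
    }

    fn synthesize(&mut self, _text: &str) -> Result<Vec<i16>, FlowcatError> {
        Err(FlowcatError::Other(
            "examplevoice: not yet wired (WS-n)".to_string(),
        ))
    }
}
```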
7. What this map deliberately does NOT do
- It does not implement any provider body (that is the fan-out).
- It does not add the WS/HTTP TTS helper clients (WsTtsClient/HttpTtsClient) or the gRPC build.rs/.protos; those are owned by the impls.
- It does not touch flowcat-core, the Deepgram/Cartesia/OpenAI reference impls' logic, other crates, or the embedder.
- It does not write a per-(W) client where the (W) is literally the (D) client + a base_url (the LLM wrappers). Those still get a thin module stub (so the mod/feature exists for the fan-out to fill), but the stub body just notes "wrapper over OpenAiLlm — set base_url".
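To illustrate what those LLM wrapper stubs point at (an OpenAiLlm with a different base_url), here is a minimal sketch, assuming the (D) client is configured by a base URL, an API key, and a model. The OpenAiLlm fields, ExampleCompatLlm, and its URL are hypothetical; the /chat/completions path is the standard OpenAI-compatible endpoint.

```rust
/// Hypothetical, simplified config for the OpenAI-compatible (D) client.
#[derive(Debug, Clone, PartialEq)]
pub struct OpenAiLlm {
    pub base_url: String,
    pub api_key: String,
    pub model: String,
}

impl OpenAiLlm {
    pub fn new(base_url: &str, api_key: &str, model: &str) -> Self {
        Self {
            base_url: base_url.to_string(),
            api_key: api_key.to_string(),
            model: model.to_string(),
        }
    }

    /// Endpoint the shared client would POST chat requests to.
    pub fn chat_completions_url(&self) -> String {
        format!("{}/chat/completions", self.base_url.trim_end_matches('/'))
    }
}

/// A (W) for a hypothetical OpenAI-compatible vendor: the whole
/// provider-specific part is its base URL.
pub struct ExampleCompatLlm(pub OpenAiLlm);

impl ExampleCompatLlm {
    pub const BASE_URL: &'static str = "https://api.example.com/v1/";

    pub fn new(api_key: &str, model: &str) -> Self {
        Self(OpenAiLlm::new(Self::BASE_URL, api_key, model))
    }
}
```

This is why one PR can carry all the LLM (W)s: each is a short struct plus a row in the base_url table.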
Flowcat roadmap
This is a forward-looking, best-effort roadmap — directional, not a commitment.
Shipped capabilities are described in the README and
FEATURES.md; this file tracks what is not yet done.
Recently landed
- Composable FrameProcessor pipeline: typed frame taxonomy, system-frame priority/interruption model, linear + parallel pipelines. (Frozen API in PROCESSOR-DESIGN.md.)
- In-process native SIP/RTP/SDP user-agent for G.711 telephony.
- ~80 STT/TTS/LLM + realtime connectors, each a dep:-gated Cargo feature (FEATURES.md).
- Runnable demos in flowcat-cli: an in-process pipeline demo and a WebSocket PCM echo-bot (pipeline, ws-echo).
- Use it from Python (out-of-process): the RemoteBrain HTTP adapter (brain-http) drives conversation policy from a Python service, and the mcp client exposes Python functions as tools. Reference servers in examples/. This is the supported Python path today.
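The system-frame priority and interruption model can be pictured as a two-lane inbox per processor. This is a minimal sketch, assuming a three-variant frame type and a synchronous queue; Frame, Inbox, and the drop-on-interruption rule as written here are illustrative, not the frozen API in PROCESSOR-DESIGN.md.

```rust
use std::collections::VecDeque;

/// Hypothetical, heavily simplified frame taxonomy.
#[derive(Debug, Clone, PartialEq)]
pub enum Frame {
    /// Data frames: delivered in arrival order.
    Audio(Vec<i16>),
    Text(String),
    /// System frame: jumps ahead of queued data.
    Interruption,
}

/// Two-lane inbox for one processor.
#[derive(Debug, Default)]
pub struct Inbox {
    system: VecDeque<Frame>,
    data: VecDeque<Frame>,
}

impl Inbox {
    pub fn push(&mut self, frame: Frame) {
        match frame {
            Frame::Interruption => {
                // Data queued before the interruption is stale (e.g. the rest
                // of an unplayed reply), so it is dropped.
                self.data.clear();
                self.system.push_back(Frame::Interruption);
            }
            data => self.data.push_back(data),
        }
    }

    /// System frames are always delivered before any queued data frame.
    pub fn pop(&mut self) -> Option<Frame> {
        self.system.pop_front().or_else(|| self.data.pop_front())
    }
}
```

Keeping system frames in their own lane means an interruption never waits behind buffered audio.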
Planned
Python bindings (PyO3)
The out-of-process path above already lets Python drive Flowcat without writing
Rust. The next step is in-process bindings: import flowcat, build a pipeline
in Python, and pass Python callables/objects as the brain — single process, no
service to host. The binding will keep Python at turn granularity (releasing
the GIL during the Rust media loop) so the tail-latency guarantees hold. This is a
developer-ergonomics step, not a capability gap. Not yet started.
WebRTC browser transport
Complete the str0m-based WebRTC transport (+ Opus) so a browser client can
connect directly, alongside the existing carrier/WebSocket paths.
Local device backend
The local mic/speaker transport currently ships as an interface-only stub (no device
backend, to keep the default build free of a heavy platform audio dependency).
Add an optional, feature-gated device backend so the local demo can capture and
play real audio.
Broader live-verified provider coverage
Connectors are currently fixture/wire-tested — each provider's message framing is pinned by unit tests, but end-to-end live calls require that vendor's credentials and are not exercised in CI. Expand the set of provider paths that have been verified against the live service.
LiveKit transport
The livekit transport is currently a stub; wire up real signaling.
How to influence the roadmap
Open an issue or start a discussion. Adding a new connector is usually a small,
self-contained contribution — see CONTRIBUTING.md and the
provider-family taxonomy in PROVIDERS.md.
Changelog
All notable changes to this project are documented here. The format is based on
Keep a Changelog, and this project aims to
follow Semantic Versioning from 1.0.0
onward. Until then (pre-1.0), minor versions may include breaking changes.
[Unreleased]
Added
- flowcat-cli ships two runnable, credential-free demos (the OSS examples surface): pipeline (an in-process FrameProcessor pipeline over a synthetic sine-wave source) and ws-echo (PCM echo over the generic WebSocket transport, with a self-contained --loopback round-trip or --connect <ws://url>).
- RemoteBrain HTTP adapter (flowcat-services, feature brain-http): drive a call's conversation policy from an out-of-process HTTP service (e.g. a Python webhook) via the AgentBrain seam, at turn granularity. Includes a documented JSON wire contract, a request timeout, a fail-safe (Stay on transient error or timeout), and fixture tests.
- examples/ for using Flowcat from Python without writing Rust: a pure-stdlib reference python-remote-brain server and a python-mcp-tools MCP server.
- ROADMAP.md describing planned work (in-process PyO3 bindings, WebRTC browser transport, a local audio device backend, broader live-verified coverage).
- Standard project docs: CODE_OF_CONDUCT.md, SECURITY.md, this changelog, and GitHub issue / pull-request templates.
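The RemoteBrain fail-safe can be sketched as "call with a deadline, degrade to Stay". This minimal version assumes a blocking call run on a worker thread; BrainDecision::Say, decide_with_failsafe, and the String error type are hypothetical, and a real async adapter would more likely rely on its HTTP client's request timeout than on a thread per turn.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Hypothetical turn-level decision. Only `Stay` is named in the changelog;
/// `Say` is an illustrative placeholder.
#[derive(Debug, Clone, PartialEq)]
pub enum BrainDecision {
    Stay,
    Say(String),
}

/// Runs `call` (a stand-in for the HTTP round-trip to the remote brain) with a
/// deadline. A transport error and a timeout both degrade to `Stay`, so one
/// slow or failing webhook cannot stall the call.
pub fn decide_with_failsafe<F>(call: F, timeout: Duration) -> BrainDecision
where
    F: FnOnce() -> Result<BrainDecision, String> + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // After a timeout the receiver is gone; ignore the failed send.
        let _ = tx.send(call());
    });
    match rx.recv_timeout(timeout) {
        Ok(Ok(decision)) => decision,
        // Transient error reported by the brain service.
        Ok(Err(_err)) => BrainDecision::Stay,
        // Deadline passed (or the worker died) before a reply arrived.
        Err(_timeout) => BrainDecision::Stay,
    }
}
```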
Changed
- Documentation scrubbed for the public release: design docs are now embedder-agnostic, and SPINOUT.md is repurposed as a workspace-independence note for contributors.
Removed
- Internal-only planning and operations documents that are not relevant to the public project.
Breaking
- Renamed the runtime's environment-variable keys from the previous prefix to the FLOWCAT_* family: FLOWCAT_VOICE, FLOWCAT_VAD_START_SENSITIVITY, FLOWCAT_VAD_END_SENSITIVITY, FLOWCAT_VAD_PREFIX_PADDING_MS, FLOWCAT_VAD_SILENCE_DURATION_MS, and FLOWCAT_MINIMAX_GROUP_ID. Embedders setting these must update the key names.
Code of Conduct
Our pledge
The Flowcat project is committed to providing a friendly, safe, and welcoming environment for all contributors and participants, regardless of background or identity. We expect everyone who takes part in the project — in issues, pull requests, discussions, and any other project space — to help keep it that way.
Our standard
This project adopts the Contributor Covenant, version 2.1, as its code of conduct. The full text is available at https://www.contributor-covenant.org/version/2/1/code_of_conduct/.
In short: be respectful and constructive, assume good faith, welcome newcomers, and accept feedback gracefully. Behaviour that harasses, demeans, or excludes others is not tolerated. Maintainers are responsible for clarifying standards and may remove, edit, or reject contributions, comments, and other interactions that are not aligned with this Code of Conduct.
Scope
This Code of Conduct applies within all project spaces, and also when an individual is officially representing the project in public spaces.
Reporting
If you experience or witness unacceptable behaviour, report it confidentially to the maintainers at security@areev.ai. All reports will be reviewed and investigated promptly and fairly, and the privacy and security of the reporter will be respected.
Enforcement
Maintainers will follow the Contributor Covenant's Enforcement Guidelines when determining the consequences for any action they deem in violation of this Code of Conduct.
Attribution: this Code of Conduct adopts the Contributor Covenant, version 2.1, available at https://www.contributor-covenant.org/version/2/1/code_of_conduct/.
Security policy
Supported versions
Flowcat is pre-1.0 and under active development. Security fixes are applied to
the latest main. Until a 1.0 release, only the most recent published version
(and main) is supported.
Reporting a vulnerability
Please do not open a public issue for security vulnerabilities.
Report privately through either channel:
- GitHub private vulnerability reporting (preferred) — use the repository's Security → Report a vulnerability tab to open a private advisory.
- Email — security@areev.ai, with a description and reproduction steps.
Please include, where possible:
- the affected crate(s) and version / commit,
- the feature flags and provider/transport involved,
- a minimal reproduction or proof of concept, and
- the impact you foresee.
What to expect
- We aim to acknowledge a report within 3 business days.
- We will work with you on an assessment and a fix, and keep you updated on progress.
- We support coordinated disclosure: we ask that you give us a reasonable window to release a fix before any public disclosure, and we will credit you for the report unless you prefer to remain anonymous.
Scope
This project is a media-pipeline runtime that speaks third-party provider protocols using credentials you supply. Vulnerabilities in third-party services or SDKs should be reported to those vendors. Issues in Flowcat's own code — for example in its SIP/RTP/SDP parsing, the SigV4 signing paths, WebSocket framing, or audio codec handling — are in scope.