fibersse - Awesome Go Library for Networking

Production-grade Server-Sent Events (SSE) for Fiber v3 with event coalescing, priority lanes, topic wildcards, adaptive throttling, and built-in auth
FiberSSE
React SDK available: npm install fibersse-react - hooks for TanStack Query / SWR cache invalidation.
GitHub | Blog: How We Eliminated 90% of API Calls by Replacing Polling with SSE
Stop polling. Start pushing. The only SSE library built natively for Fiber v3 - with built-in cache invalidation, event coalescing, and one-line domain event publishing.
Replace setInterval with one line of Go:
```go
// Before: the client polls every 30 seconds (wasteful)
// setInterval(() => fetch("/api/orders"), 30_000)

// After: the server pushes when data ACTUALLY changes
hub.Invalidate("orders", order.ID, "created") // → client refetches instantly
```
80-90% fewer API calls. Real-time UI. Zero polling.
Why FiberSSE?
1. The Only SSE Library That Works on Fiber
Every popular Go SSE library (r3labs/sse, tmaxmax/go-sse) is built on net/http and breaks on Fiber - fasthttp.RequestCtx.Done() only fires on server shutdown, not on per-client disconnect, so zombie subscribers leak forever. fibersse uses Fiber's native SendStreamWriter with w.Flush() error detection.
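The core of that flush-based detection can be sketched in plain Go (a simplified model, not fibersse's actual source): the write loop treats any Flush() error as a disconnect and exits, so the hub can unregister the subscriber instead of leaking it.

```go
package main

import (
	"bufio"
	"fmt"
	"net"
)

// writeLoop sends SSE frames and checks Flush() after every event.
// A flush error means the peer is gone, so the loop exits instead of
// leaking a zombie subscriber.
func writeLoop(w *bufio.Writer, events <-chan string) error {
	for ev := range events {
		if _, err := fmt.Fprintf(w, "data: %s\n\n", ev); err != nil {
			return err
		}
		if err := w.Flush(); err != nil {
			return err // client disconnected
		}
	}
	return nil
}

// demoDisconnect simulates a client that goes away: the write loop's
// Flush() fails on the closed pipe and the loop exits with an error.
func demoDisconnect() error {
	client, server := net.Pipe()
	client.Close() // client disconnects immediately

	events := make(chan string, 1)
	events <- "hello"
	close(events)

	return writeLoop(bufio.NewWriter(server), events)
}
```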
2. Built to Kill Polling
Most SSE libraries just push events. fibersse has built-in patterns for replacing polling:
| API | What It Does | Replaces |
|---|---|---|
| hub.Invalidate() | Signal clients to refetch a resource | setInterval polling |
| hub.InvalidateForTenant() | Tenant-scoped invalidation (multi-tenant SaaS) | Tenant polling |
| hub.InvalidateForTenantWithHint() | Tenant-scoped + data hints in one call | Polling + extra fetch |
| hub.DomainEvent() | Structured event from any handler/worker | Manual event wiring |
| hub.BatchDomainEvents() | Multiple resource changes in one SSE frame | Multiple polling loops |
| hub.Progress() | Coalesced progress (5%→8% sends only 8%) | 2s progress polling |
| hub.Complete() | Operation done signal (instant delivery) | Completion polling |
| hub.Signal() / SignalForTenant() | Generic "something changed" refresh | Dashboard polling |
3. Every Feature a SaaS Needs
| Feature | r3labs/sse | tmaxmax/go-sse | fibersse |
|---|---|---|---|
| Fiber v3 native | No | No | Yes |
| Disconnect detection | Broken on Fiber | Broken on Fiber | Works (flush-based) |
| Event coalescing | No | No | Yes (last-writer-wins) |
| Priority lanes | No | No | Yes (P0 instant / P1 batched / P2 coalesced) |
| Topic wildcards | No | No | Yes (NATS-style * and >) |
| Adaptive throttling | No | No | Yes (buffer-depth AIMD) |
| Connection groups | No | No | Yes (publish by metadata) |
| Backpressure | Blocks sender | Blocks sender | Drops + reconnect hint |
| Built-in auth | No | No | Yes (JWT + ticket helpers) |
| Prometheus metrics | No | No | Yes |
| Graceful drain | No | No | Yes (Kubernetes-style) |
| Event TTL | No | No | Yes |
| Last-Event-ID replay | Yes | Yes | Yes (pluggable) |
| Fan-out middleware | No | No | Yes (Redis/NATS bridge) |
fibersse vs Fiber SSE Recipe
Fiber's official SSE recipe is ~50 lines of raw SendStreamWriter code. It's a great starting point, but it's a recipe (copy-paste example), not a library. Here's what fibersse adds:
| Feature | Fiber Recipe | fibersse |
|---|---|---|
| Hub pattern (managed connections) | No | Yes |
| Topic routing | No | Yes |
| NATS-style wildcard topics (*, >) | No | Yes |
| Event coalescing (P0/P1/P2 priorities) | No | Yes |
| Authentication (JWT + ticket) | No | Yes |
| Last-Event-ID replay | No | Yes |
| Heartbeat management | No | Yes (adaptive) |
| Connection tracking + groups | No | Yes |
| Prometheus metrics | No | Yes |
| Graceful Kubernetes-style drain | No | Yes |
| Cache invalidation helpers | No | Yes |
| Multi-tenant support | No | Yes |
| Domain event publishing | No | Yes |
| Progress tracking (coalesced) | No | Yes |
| Auto fan-out from Redis/NATS | No | Yes |
| Visibility hints (paused tabs) | No | Yes |
| Adaptive per-connection throttling | No | Yes |
| React SDK (fibersse-react) | No | Yes |
The recipe is perfect if you need to push a single event to a single client. fibersse is for production apps that need topic routing, multi-tenancy, auth, coalescing, and monitoring.
Install
```shell
go get github.com/vinod-morya/fibersse@latest
```
Requirements: Go 1.23+ and Fiber v3.
Quick Start
```go
package main

import (
    "time"

    "github.com/gofiber/fiber/v3"
    "github.com/vinod-morya/fibersse"
)

func main() {
    app := fiber.New()

    // Create the SSE hub
    hub := fibersse.New(fibersse.HubConfig{
        FlushInterval:     2 * time.Second,
        HeartbeatInterval: 30 * time.Second,
        OnConnect: func(c fiber.Ctx, conn *fibersse.Connection) error {
            // Authenticate and set topics
            conn.Topics = []string{"notifications", "live"}
            conn.Metadata["user_id"] = "user_123"
            return nil
        },
    })

    // Mount the SSE endpoint
    app.Get("/events", hub.Handler())

    // Publish events from anywhere in your app
    go func() {
        for i := 0; ; i++ {
            hub.Publish(fibersse.Event{
                Type:   "heartbeat",
                Data:   map[string]int{"count": i},
                Topics: []string{"live"},
            })
            time.Sleep(5 * time.Second)
        }
    }()

    app.Listen(":3000")
}
```
Client (browser):
```javascript
const es = new EventSource('/events');

es.addEventListener('heartbeat', (e) => {
  console.log(JSON.parse(e.data)); // { count: 0 }
});

es.addEventListener('notification', (e) => {
  showToast(JSON.parse(e.data));
});
```
Kill Polling Guide
Step 1: Replace setInterval with Invalidation
Backend - publish when data changes:

```go
// In your order handler
func (h *OrderHandler) Create(c fiber.Ctx) error {
    order, err := h.svc.Create(...)
    if err != nil {
        return err
    }

    // One line - replaces 30s polling for ALL connected clients
    hub.InvalidateForTenant(tenantID, "orders", order.ID, "created")

    return c.JSON(order)
}
```
Frontend - listen and refetch:

```javascript
// With TanStack Query (React Query)
const es = new EventSource('/events?topics=orders');
es.addEventListener('invalidate', (e) => {
  const { resource } = JSON.parse(e.data);
  queryClient.invalidateQueries({ queryKey: [resource] });
});

// With SWR
es.addEventListener('invalidate', (e) => {
  const { resource } = JSON.parse(e.data);
  mutate(`/api/${resource}`);
});
```
Step 2: Track Progress Without Polling
```go
// Backend - in your import worker
for i, row := range rows {
    processRow(row)
    hub.Progress("import", importID, tenantID, i+1, len(rows))
    // Fires 1000 times but the client receives ~10 updates (coalesced!)
}
hub.Complete("import", importID, tenantID, true, nil)
```
```javascript
// Frontend
es.addEventListener('progress', (e) => {
  const { pct } = JSON.parse(e.data);
  setProgressBar(pct); // smooth updates, no polling
});

es.addEventListener('complete', (e) => {
  showToast("Import complete!");
  queryClient.invalidateQueries({ queryKey: ['products'] });
});
```
Step 3: Dashboard Signals (No Polling, Ever)
```go
// Backend - after ANY mutation that affects the dashboard
hub.SignalForTenant(tenantID, "dashboard") // coalesced, won't flood

// Or with hints:
hub.InvalidateWithHint("orders", orderID, "created", map[string]any{
    "total":    149.99,
    "customer": "John Doe",
})
```
Impact
| Metric | Before (Polling) | After (SSE) |
|---|---|---|
| API calls per user/minute | ~12 (6 pages × 30s) | ~0-2 (only when data changes) |
| Time to see new data | 0-30 seconds | < 200ms |
| Server load | Constant (even idle users poll) | Proportional to actual changes |
| Battery drain (mobile) | High (constant network) | Minimal (idle connection) |
Features
Event Priority & Coalescing
Three priority lanes control how events reach clients:
```go
// P0: INSTANT - bypasses all buffering, sent immediately
// Use for: notifications, errors, chat messages, auth revocations
hub.Publish(fibersse.Event{
    Type:     "notification",
    Data:     map[string]string{"title": "New order!"},
    Topics:   []string{"notifications"},
    Priority: fibersse.PriorityInstant,
})

// P1: BATCHED - collected in a time window, all sent together
// Use for: status updates, media processing
hub.Publish(fibersse.Event{
    Type:     "media_status",
    Data:     map[string]string{"id": "m_1", "status": "ready"},
    Topics:   []string{"media"},
    Priority: fibersse.PriorityBatched,
})

// P2: COALESCED - last-writer-wins per key
// If progress goes 5% → 6% → 7% → 8% in 2 seconds, the client receives only 8%
hub.Publish(fibersse.Event{
    Type:        "progress",
    Data:        map[string]int{"pct": 8},
    Topics:      []string{"tasks"},
    Priority:    fibersse.PriorityCoalesced,
    CoalesceKey: "task:abc123",
})
```
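The P2 lane's last-writer-wins behavior boils down to a keyed buffer. A minimal sketch (illustrative names, not fibersse's internal type): later writes to the same key overwrite earlier ones, and a flush drains everything once.

```go
package main

// coalescer is a last-writer-wins buffer keyed by CoalesceKey.
type coalescer struct {
    pending map[string]string // key -> latest payload
    order   []string          // preserves first-seen order for the flush
}

func newCoalescer() *coalescer {
    return &coalescer{pending: map[string]string{}}
}

// add records a payload for a key; later writes overwrite earlier ones.
func (c *coalescer) add(key, payload string) {
    if _, seen := c.pending[key]; !seen {
        c.order = append(c.order, key)
    }
    c.pending[key] = payload
}

// flush drains the buffer, returning only the latest payload per key.
func (c *coalescer) flush() []string {
    out := make([]string, 0, len(c.order))
    for _, k := range c.order {
        out = append(out, c.pending[k])
    }
    c.pending = map[string]string{}
    c.order = nil
    return out
}
```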
Topic Wildcards (NATS-style)
Subscribe to topic patterns using * (one segment) and > (one or more trailing segments):
```go
// Client subscribes to "analytics.*"
conn.Topics = []string{"analytics.*"}

// These events all match:
hub.Publish(fibersse.Event{Topics: []string{"analytics.live"}})    // matched by *
hub.Publish(fibersse.Event{Topics: []string{"analytics.revenue"}}) // matched by *

// Subscribe to everything under analytics:
conn.Topics = []string{"analytics.>"}

// Now these also match:
hub.Publish(fibersse.Event{Topics: []string{"analytics.live.visitors"}})   // matched by >
hub.Publish(fibersse.Event{Topics: []string{"analytics.funnel.checkout"}}) // matched by >
```
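Assuming standard NATS-style semantics ("."-separated segments, * matching exactly one segment, > matching one or more trailing segments), the matching rule can be sketched as a small function (illustrative, not fibersse's topic.go):

```go
package main

import "strings"

// matchTopic reports whether a "."-separated topic matches a pattern,
// where "*" matches exactly one segment and ">" matches one or more
// trailing segments (NATS-style).
func matchTopic(pattern, topic string) bool {
    p := strings.Split(pattern, ".")
    t := strings.Split(topic, ".")
    for i, seg := range p {
        if seg == ">" {
            return len(t) > i // at least one trailing segment remains
        }
        if i >= len(t) {
            return false // topic ran out of segments
        }
        if seg != "*" && seg != t[i] {
            return false // literal segment mismatch
        }
    }
    return len(p) == len(t) // no extra trailing segments allowed
}
```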
Connection Groups
Publish to connections by metadata instead of topics - perfect for multi-tenant SaaS:
```go
// During OnConnect, set metadata:
conn.Metadata["tenant_id"] = "t_123"
conn.Metadata["plan"] = "pro"

// Publish to ALL connections for a specific tenant:
hub.Publish(fibersse.Event{
    Type:  "tenant_update",
    Data:  map[string]string{"message": "Plan upgraded"},
    Group: map[string]string{"tenant_id": "t_123"},
})

// Publish to all pro-plan users:
hub.Publish(fibersse.Event{
    Type:  "feature_announcement",
    Data:  "New feature available!",
    Group: map[string]string{"plan": "pro"},
})
```
Adaptive Throttling
The hub automatically adjusts flush intervals per connection based on buffer saturation:
| Buffer Saturation | Effective Interval | Behavior |
|---|---|---|
| < 10% (healthy) | FlushInterval / 4 | Fast delivery |
| 10-50% (normal) | FlushInterval | Default cadence |
| 50-80% (warning) | FlushInterval × 2 | Slowing down |
| > 80% (critical) | FlushInterval × 4 | Backpressure relief |
Mobile users on slow connections automatically get fewer updates. Desktop users on fast connections get near-real-time delivery. Zero configuration needed.
Client Visibility Hints
Pause non-critical events for hidden browser tabs:
```go
// Server-side: pause/resume a connection
hub.SetPaused(connID, true)  // tab hidden → skip P1/P2 events
hub.SetPaused(connID, false) // tab visible → resume all events
```
P0 (instant) events are always delivered regardless of pause state - critical messages like errors and auth revocations never get dropped.
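That delivery rule is simple enough to state as a predicate (a sketch; the names are illustrative, not the library's):

```go
package main

type priority int

const (
    priorityInstant   priority = iota // P0
    priorityBatched                   // P1
    priorityCoalesced                 // P2
)

// shouldDeliver encodes the pause rule: hidden tabs skip P1/P2 events,
// but P0 (instant) events are always delivered.
func shouldDeliver(paused bool, p priority) bool {
    return !paused || p == priorityInstant
}
```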
Built-in Authentication
JWT Auth - validate Bearer tokens or query parameters:
```go
hub := fibersse.New(fibersse.HubConfig{
    OnConnect: fibersse.JWTAuth(func(token string) (map[string]string, error) {
        claims, err := myJWTValidator(token)
        if err != nil {
            return nil, err
        }
        return map[string]string{
            "tenant_id": claims.TenantID,
            "user_id":   claims.UserID,
        }, nil
    }),
})
```
Ticket Auth - one-time tickets for EventSource (which can't send headers):
```go
store := fibersse.NewMemoryTicketStore() // or implement TicketStore with Redis

// Issue a ticket (in your authenticated POST endpoint):
ticket, _ := fibersse.IssueTicket(store, `{"tenant":"t1","topics":"notifications,live"}`, 30*time.Second)

// Use ticket auth in the hub:
hub := fibersse.New(fibersse.HubConfig{
    OnConnect: fibersse.TicketAuth(store, func(value string) (map[string]string, []string, error) {
        var data struct{ Tenant, Topics string }
        json.Unmarshal([]byte(value), &data)
        return map[string]string{"tenant_id": data.Tenant},
            strings.Split(data.Topics, ","), nil
    }),
})
```
Auto Fan-Out (Redis/NATS Bridge)
Bridge external pub/sub to SSE with one line:
```go
// Redis pub/sub → SSE (implement the PubSubSubscriber interface)
cancel := hub.FanOut(fibersse.FanOutConfig{
    Subscriber: myRedisSubscriber,
    Channel:    "notifications:tenant_123",
    Topic:      "notifications",
    EventType:  "notification",
    Priority:   fibersse.PriorityInstant,
})
defer cancel()

// Multiple channels at once:
cancelAll := hub.FanOutMulti(
    fibersse.FanOutConfig{Subscriber: redis, Channel: "notifications:*", Topic: "notifications", EventType: "notification", Priority: fibersse.PriorityInstant},
    fibersse.FanOutConfig{Subscriber: redis, Channel: "media:*", Topic: "media", EventType: "media_status", Priority: fibersse.PriorityBatched},
    fibersse.FanOutConfig{Subscriber: redis, Channel: "import:*", Topic: "import", EventType: "progress", Priority: fibersse.PriorityCoalesced},
)
defer cancelAll()
```
Implement the PubSubSubscriber interface for your broker:
```go
type PubSubSubscriber interface {
    Subscribe(ctx context.Context, channel string, onMessage func(payload string)) error
}
```
Event TTL
Drop stale events instead of delivering outdated data:
```go
hub.Publish(fibersse.Event{
    Type:   "live_count",
    Data:   map[string]int{"visitors": 42},
    Topics: []string{"live"},
    TTL:    5 * time.Second, // useless after 5 seconds
})
```
Prometheus Metrics
Built-in monitoring endpoints:
```go
// JSON metrics (for dashboards)
app.Get("/admin/sse/metrics", hub.MetricsHandler())

// Prometheus format (for Grafana/Datadog)
app.Get("/metrics/sse", hub.PrometheusHandler())
```
Exposed metrics:
- `fibersse_connections_active` - current open connections
- `fibersse_connections_paused` - hidden-tab connections
- `fibersse_events_published_total` - lifetime events published
- `fibersse_events_dropped_total` - events dropped (backpressure/TTL)
- `fibersse_pending_events` - events buffered in coalescers
- `fibersse_buffer_saturation_avg` - average send buffer usage
- `fibersse_buffer_saturation_max` - worst-case buffer usage
- `fibersse_connections_by_topic{topic="..."}` - per-topic breakdown
- `fibersse_events_by_type_total{type="..."}` - per-event-type breakdown (invalidate, progress, signal, batch, etc.)
Last-Event-ID Replay
Pluggable replay for reconnecting clients:
```go
hub := fibersse.New(fibersse.HubConfig{
    Replayer: fibersse.NewMemoryReplayer(fibersse.MemoryReplayerConfig{
        MaxEvents: 1000,
        TTL:       5 * time.Minute,
    }),
})
```
Implement the Replayer interface for Redis Streams or any durable store:
```go
type Replayer interface {
    Store(event MarshaledEvent, topics []string) error
    Replay(lastEventID string, topics []string) ([]MarshaledEvent, error)
}
```
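A minimal in-memory take on that contract (the MarshaledEvent fields below are stand-ins, not fibersse's real type): Store appends into a bounded ring, Replay returns everything published after the last ID the client saw.

```go
package main

import "sync"

// MarshaledEvent is a stand-in for the library's wire-ready event type.
type MarshaledEvent struct {
    ID   string
    Data string
}

// ringReplayer keeps the most recent events in a bounded slice.
type ringReplayer struct {
    mu     sync.Mutex
    max    int
    events []MarshaledEvent
}

// Store appends an event, evicting the oldest when the ring is full.
func (r *ringReplayer) Store(ev MarshaledEvent, topics []string) error {
    r.mu.Lock()
    defer r.mu.Unlock()
    r.events = append(r.events, ev)
    if len(r.events) > r.max {
        r.events = r.events[1:] // drop the oldest
    }
    return nil
}

// Replay returns every event stored after lastEventID. An unknown ID
// (evicted or bogus) replays everything still buffered.
func (r *ringReplayer) Replay(lastEventID string, topics []string) ([]MarshaledEvent, error) {
    r.mu.Lock()
    defer r.mu.Unlock()
    for i, ev := range r.events {
        if ev.ID == lastEventID {
            return append([]MarshaledEvent{}, r.events[i+1:]...), nil
        }
    }
    return append([]MarshaledEvent{}, r.events...), nil
}
```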
Graceful Drain (Kubernetes-style)
On shutdown, the hub:
- Enters drain mode (rejects new connections with 503 + Retry-After: 5)
- Sends a server-shutdown event to all connected clients
- Waits for the context deadline to let clients reconnect elsewhere
- Closes all connections and stops the run loop
```go
// In your shutdown handler:
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
hub.Shutdown(ctx)
```
Backpressure
Each connection has a bounded send buffer (default: 256 events). If a client can't keep up:
- New events are dropped (not queued infinitely)
- The MessagesDropped counter increments
- Monitor via hub.Metrics() to identify slow clients
- The client's EventSource auto-reconnects and gets the current state
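The drop policy is the classic non-blocking channel send in Go (a sketch, assuming a string payload and a plain atomic counter):

```go
package main

import "sync/atomic"

// trySend performs a non-blocking send on a per-connection channel.
// On a full buffer the event is dropped and a counter incremented,
// so a slow client can never block the hub's run loop.
func trySend(ch chan string, ev string, dropped *int64) bool {
    select {
    case ch <- ev:
        return true
    default:
        atomic.AddInt64(dropped, 1) // buffer full: drop, don't block
        return false
    }
}
```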
Benchmarks
Run on Apple M4 Max, Go 1.25, -benchmem:
| Operation | ns/op | B/op | allocs/op |
|---|---|---|---|
| Publish (1 conn) | 477 | 72 | 2 |
| Publish (1,000 conns) | 81,976 | 101,572 | 22 |
| Coalesce same key | 21 | 0 | 0 |
| Topic match (exact) | 8 | 0 | 0 |
| Topic match (wildcard *) | 51 | 64 | 2 |
| Topic match (wildcard >) | 60 | 96 | 2 |
| Marshal event (string) | 3 | 0 | 0 |
| Marshal event (struct) | 89 | 96 | 2 |
| Connection send | 14 | 0 | 0 |
| Backpressure drop | 2 | 0 | 0 |
| Throttle decision | 19 | 0 | 0 |
| Group match (single key) | 27 | 0 | 0 |
| Replayer store | 140 | 687 | 4 |
Key takeaway: Publishing to 1,000 connections takes 82µs. Zero allocations on all hot paths (topic match, send, backpressure, throttle).
```shell
go test -bench=. -benchmem ./...
```
Configuration
```go
fibersse.HubConfig{
    FlushInterval:     2 * time.Second,  // P1/P2 coalescing window
    SendBufferSize:    256,              // per-connection buffer capacity
    HeartbeatInterval: 30 * time.Second, // keepalive for disconnect detection
    MaxLifetime:       30 * time.Minute, // max connection duration (0 = unlimited)
    RetryMS:           3000,             // client reconnection hint (ms)
    Replayer:          nil,              // Last-Event-ID replay (nil = disabled)
    Logger:            slog.Default(),   // structured logging (nil = disabled)
    OnConnect:         nil,              // auth + topic selection callback
    OnDisconnect:      nil,              // cleanup callback
    OnPause:           nil,              // called when the client tab goes hidden
    OnResume:          nil,              // called when the client tab becomes visible
}
```
Architecture
```
Publish()
    │
    ▼
┌──────────────────────────────────────────┐
│ Hub Run Loop (single goroutine)          │
│                                          │
│  register   ◄── new connections          │
│  unregister ◄── disconnects              │
│  events     ◄── published events         │
│                                          │
│  For each event:                         │
│    1. Match topics (exact + wildcard)    │
│    2. Match groups (metadata k-v)        │
│    3. Skip paused connections (P1/P2)    │
│    4. Route by priority:                 │
│       P0 → send channel (immediate)      │
│       P1 → batch buffer                  │
│       P2 → coalesce buffer (LWW)         │
│                                          │
│  Flush ticker (every FlushInterval):     │
│    Adaptive throttle per connection      │
│    Drain batch + coalesce → send chan    │
│                                          │
│  Heartbeat ticker:                       │
│    Send comment to idle connections      │
└──────────────────────────────────────────┘
    │
    ▼  (per-connection send channel)
┌──────────────────────────────────────────┐
│ Connection Writer (in SendStreamWriter)  │
│                                          │
│  for event := range sendChan:            │
│    write SSE format → bufio.Writer       │
│    w.Flush() → detect disconnect         │
└──────────────────────────────────────────┘
```
File Structure
```
fibersse/
├── hub.go               Core hub - New(), Publish(), Handler(), Shutdown()
├── invalidation.go      Kill polling - Invalidate(), Signal(), InvalidateForTenant()
├── domain_event.go      One-line publish - DomainEvent(), Progress(), Complete()
├── event.go             Event struct, Priority constants, SSE wire format
├── connection.go        Per-client connection, write loop, backpressure
├── coalescer.go         Batch + last-writer-wins buffers
├── topic.go             NATS-style wildcard topic matching (* and >)
├── throttle.go          Adaptive per-connection flush interval (AIMD)
├── auth.go              JWTAuth, TicketAuth, TicketStore helpers
├── fanout.go            PubSubSubscriber, FanOut(), FanOutMulti()
├── replayer.go          Last-Event-ID replay (pluggable MemoryReplayer)
├── metrics.go           PrometheusHandler, MetricsHandler
├── stats.go             HubStats struct
├── CLAUDE.md            Instructions for AI agents (Claude, Codex, Copilot)
├── hub_test.go          29 unit tests
├── integration_test.go  11 integration tests (real Fiber HTTP server)
└── benchmark_test.go    42 benchmarks (publish, coalesce, topic match, etc.)
```
Integration with TanStack Query / SWR
The canonical pattern for bridging fibersse events to your React data layer:
TanStack Query (React Query)
```typescript
import { useQueryClient } from '@tanstack/react-query';
import { useEffect } from 'react';

function useSSEInvalidation(topics: string[]) {
  const queryClient = useQueryClient();

  useEffect(() => {
    const es = new EventSource(`/events?topics=${topics.join(',')}`);

    // Single-resource invalidation
    es.addEventListener('invalidate', (e) => {
      const { resource, resource_id, action, hint } = JSON.parse(e.data);

      // Invalidate the collection
      queryClient.invalidateQueries({ queryKey: [resource] });

      // Invalidate the specific item
      if (resource_id) {
        queryClient.invalidateQueries({ queryKey: [resource, resource_id] });
      }

      // Optional: update the cache directly from the hint (skip the refetch)
      if (hint && resource_id) {
        queryClient.setQueryData([resource, resource_id], (old) =>
          old ? { ...old, ...hint } : old
        );
      }
    });

    // Batch invalidation (multiple resources in one event)
    es.addEventListener('batch', (e) => {
      const events = JSON.parse(e.data);
      const resources = new Set(events.map((ev) => ev.resource));
      resources.forEach((resource) => {
        queryClient.invalidateQueries({ queryKey: [resource] });
      });
    });

    // Progress tracking
    es.addEventListener('progress', (e) => {
      const { resource_id, pct } = JSON.parse(e.data);
      // Update local state for progress bars
    });

    // Completion
    es.addEventListener('complete', (e) => {
      const { resource_id, status } = JSON.parse(e.data);
      if (status === 'completed') {
        queryClient.invalidateQueries(); // refetch everything
      }
    });

    return () => es.close();
  }, [topics, queryClient]);
}

// Usage in any page:
function OrdersPage() {
  useSSEInvalidation(['orders', 'dashboard']);
  const { data } = useQuery({ queryKey: ['orders'], queryFn: fetchOrders });
  // Automatically refetches when the server publishes hub.Invalidate("orders", ...)
}
```
SWR
```typescript
import { useSWRConfig } from 'swr';
import { useEffect } from 'react';

function useSSEInvalidation(topics: string[]) {
  const { mutate } = useSWRConfig();

  useEffect(() => {
    const es = new EventSource(`/events?topics=${topics.join(',')}`);

    es.addEventListener('invalidate', (e) => {
      const { resource, resource_id } = JSON.parse(e.data);
      mutate(`/api/${resource}`);
      if (resource_id) mutate(`/api/${resource}/${resource_id}`);
    });

    return () => es.close();
  }, [topics, mutate]);
}
```
Versioning
This project follows Semantic Versioning:
- v0.x.y - Pre-1.0 development. The API may change between minor versions.
- v1.0.0 - Stable API. Breaking changes only in major versions.
Current: v0.5.0.
Roadmap
- Redis Streams Replayer (durable replay across server restarts)
- React SDK (fibersse-react) - useSSE() and useSSEInvalidation() hooks
- Admin Dashboard (web UI for live connection monitoring)
- WebSocket fallback transport
- Load testing CLI (fibersse-bench)
- OpenTelemetry tracing integration
- TanStack Query integration example
Examples
Runnable examples in the examples/ directory:
| Example | What it demonstrates | Run |
|---|---|---|
| basic | Minimal hub setup, periodic publisher, browser client | cd examples/basic && go run main.go |
| chat | Multi-room chat with topic wildcards and metadata | cd examples/chat && go run main.go |
| polling-replacement | Side-by-side polling vs SSE comparison | cd examples/polling-replacement && go run main.go |
Contributing
Contributions are welcome! See CONTRIBUTING.md for development workflow, code style, and PR process.
License
MIT - Vinod Morya
Author
Vinod Morya - @vinod-morya
Built at PersonaCart - the creator commerce platform. fibersse powers all real-time features in PersonaCart: notifications, live analytics, media processing, curriculum generation progress, and more.