Understanding the architecture and capabilities of PyTrickle
PyTrickle is built with a modular architecture that separates concerns and enables flexible audio and video processing pipelines.
+---------------------------------------------------------------+
|                      Client Application                       |
|                      (Your Python Code)                       |
+-------------------------------+-------------------------------+
                                |
                                v
+---------------------------------------------------------------+
|                        StreamProcessor                        |
|   +-------------------------------------------------------+   |
|   |   Decorators (@video_handler, @audio_handler, ...)    |   |
|   +-------------------------------------------------------+   |
+-------------------------------+-------------------------------+
                                |
              +-----------------+-----------------+
              v                 v                 v
      +--------------+    +-----------+   +------------------+
      | HTTP Server  |    |   Frame   |   | TrickleProtocol  |
      |  (aiohttp)   |    | Processor |   |  (coordination)  |
      +--------------+    +-----+-----+   +--------+---------+
                                |                  |
                          +-----+-----+            |
                          v           v            |
                     +---------+ +---------+       |
                     |  Video  | |  Audio  |       |
                     | Handler | | Handler |       |
                     +---------+ +---------+       |
                                |                  |
            +-------------------+------------------+
            v                   v                  v
      +---------+          +---------+   +---------------------+
      | Decoder |          | Encoder |   |    TrickleServer    |
      |  (PyAV) |          |  (PyAV) |   | (subscribe/publish) |
      +---------+          +---------+   +---------------------+
           ^                    |                   ^
           |                    v                   |
    +--------------+    +--------------+            |
    |  TrickleSub  |    |  TricklePub  |------------+
    |  (HTTP GET)  |    |  (HTTP POST) |
    +--------------+    +--------------+
Input Stream (Trickle subscribe URL)
        ↓
TrickleSubscriber (HTTP GET)
        ↓
Decoder (PyAV)
        ↓
Frame Objects (VideoFrame/AudioFrame)
        ↓
Your Handlers (@video_handler, @audio_handler)
        ↓
Processed Frame Objects
        ↓
Encoder (PyAV)
        ↓
TricklePublisher (HTTP POST)
        ↓
Output Stream (Trickle publish URL)
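The flow above can be sketched as a simple async loop. This is a conceptual illustration, not PyTrickle's actual internals; `source`, `handler`, and `sink` are stand-ins for the decoder, your handler, and the encoder/publisher.

```python
import asyncio

async def pipeline(source, handler, sink):
    """Conceptual sketch: decode -> handle -> encode/publish."""
    async for frame in source:      # frames arriving from the decoder
        out = await handler(frame)  # your @video_handler / @audio_handler
        if out is not None:         # a handler may drop a frame
            await sink(out)         # hand off toward the encoder/publisher

async def demo():
    async def frames():             # stand-in frame source
        for i in range(3):
            yield i

    results = []

    async def handler(f):
        return f * 2                # stand-in processing step

    async def sink(f):
        results.append(f)

    await pipeline(frames(), handler, sink)
    return results
```

In the real library this loop is split across the protocol's ingress/egress tasks, but the shape of the data path is the same.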
Location: pytrickle/stream_processor.py
Purpose: High-level API that orchestrates the entire pipeline
Key Features:
# Create processor from handlers
processor = StreamProcessor.from_handlers(
    handlers,
    name="my-processor",
    port=8000
)
Location: pytrickle/frame_processor.py
Purpose: Executes user-defined processing logic on frames
Key Features:
Location: pytrickle/client.py
Purpose: Handles communication with trickle servers
Key Features:
Location: pytrickle/decoder.py, pytrickle/encoder.py
Purpose: Convert between raw media and frame objects
Key Features:
Command-line interface for scaffolding new applications. Provides pytrickle init and pytrickle list commands.
Decorator definitions for handler methods: @video_handler, @audio_handler, @model_loader, etc.
Frame data structures: VideoFrame and AudioFrame with tensor/sample data and metadata.
Pydantic request/response models used by the HTTP server (e.g., StreamStartRequest, StreamParamsUpdateRequest).
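The /stream/start payload (shown in the curl example later) has roughly the shape below. This sketch uses a plain dataclass for illustration only; PyTrickle itself defines these as Pydantic models, and the exact field set may differ.

```python
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class StreamStartRequest:
    """Illustrative stand-in for the Pydantic StreamStartRequest."""
    subscribe_url: str                          # trickle subscribe endpoint
    publish_url: str                            # trickle publish endpoint
    gateway_request_id: Optional[str] = None    # caller-supplied request id
    params: dict = field(default_factory=dict)  # e.g. width/height/max_framerate

req = StreamStartRequest(
    subscribe_url="http://trickle:3389/subscribe",
    publish_url="http://trickle:3389/publish",
    gateway_request_id="req-123",
    params={"width": 512, "height": 512},
)
```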
Trickle protocol coordination: manages queues, ingress/egress loops, and monitoring events.
TrickleSubscriber: receives segments via HTTP GET with background preconnect and retry.
TricklePublisher: streams encoded bytes via HTTP POST using an async queue-backed generator.
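The queue-backed generator pattern can be sketched as follows (a simplified stand-in, not TricklePublisher's actual code): encoded chunks are put on an asyncio.Queue, and an async generator drains the queue to feed a streaming HTTP POST body.

```python
import asyncio

class QueuePublisher:
    """Sketch of a queue-backed byte generator (simplified TricklePublisher)."""
    _EOS = object()  # end-of-stream sentinel

    def __init__(self):
        self.queue: asyncio.Queue = asyncio.Queue()

    async def write(self, chunk: bytes):
        """Called by the encoder side with encoded bytes."""
        await self.queue.put(chunk)

    async def close(self):
        """Signal that no more chunks will arrive."""
        await self.queue.put(self._EOS)

    async def body(self):
        """Async generator an HTTP client could use as a streaming POST body."""
        while True:
            item = await self.queue.get()
            if item is self._EOS:
                return
            yield item

async def demo():
    pub = QueuePublisher()
    await pub.write(b"seg-1")
    await pub.write(b"seg-2")
    await pub.close()
    return [chunk async for chunk in pub.body()]
```

Decoupling the producer (encoder) from the consumer (HTTP request) through a queue is what lets the publisher keep streaming without blocking the encode loop.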
HTTP server (aiohttp) exposing routes: /stream/start, /stream/stop, /stream/params, /stream/status, /health.
Stream session management and lifecycle coordination.
Frame rate measurement and throttling for performance optimization.
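A minimal frame-rate throttle can be sketched like this (illustrative only; the class name and logic here are assumptions, not PyTrickle's implementation):

```python
import time

class FrameRateThrottle:
    """Drop frames that arrive faster than max_framerate allows."""

    def __init__(self, max_framerate: float):
        self.min_interval = 1.0 / max_framerate  # seconds between emitted frames
        self.last_emit = float("-inf")

    def allow(self, now: float = None) -> bool:
        """Return True if a frame arriving at `now` should be processed."""
        now = time.monotonic() if now is None else now
        if now - self.last_emit >= self.min_interval:
            self.last_emit = now
            return True
        return False
```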
Frame caching for efficient memory management.
Stream state management and status tracking.
Utility functions for media processing, tensor operations, and logging.
Handler registry for managing decorated processing functions.
Represents a single video frame with associated metadata.
# Access video frame data
tensor = frame.tensor  # Shape: (H, W, 3)

# Process on GPU
if torch.cuda.is_available():
    tensor = tensor.cuda()
processed = my_model(tensor)

# Return processed frame
return frame.replace_tensor(processed)
Represents audio samples with channel configuration.
# Access audio frame data
samples = np.asarray(frame.samples)
# Process audio (e.g., apply filter)
processed = my_audio_filter(samples)
# Return processed frame
return [frame.replace_samples(processed)]
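Both frame types follow the same pattern: the replace_* methods return a new frame carrying the processed data while keeping timing metadata, so handlers never mutate frames in place. A minimal sketch of the idea (an illustrative stand-in, not PyTrickle's actual frame classes):

```python
from dataclasses import dataclass, replace

@dataclass(frozen=True)
class Frame:
    """Illustrative stand-in for VideoFrame/AudioFrame."""
    data: object      # tensor or samples
    timestamp: float  # preserved across replacement

    def replace_data(self, new_data):
        # Return a new frame; metadata such as the timestamp is carried over.
        return replace(self, data=new_data)

original = Frame(data=[1, 2, 3], timestamp=42.0)
processed = original.replace_data([2, 4, 6])
```

Keeping frames immutable means a handler can fail or be retried without corrupting the frame that is still queued elsewhere in the pipeline.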
PyTrickle uses decorators to define pipeline behavior in a clean, declarative way.
Called once when stream starts. Use for:
@model_loader
async def load(self, **kwargs):
    self.model = load_my_model()
    self.model.eval()
Called for each video frame. Use for:
@video_handler
async def handle_video(self, frame):
    tensor = frame.tensor
    processed = self.model(tensor)
    return frame.replace_tensor(processed)
Called for each audio frame. Use for:
@audio_handler
async def handle_audio(self, frame):
    samples = frame.samples
    # Process audio
    return [frame]
Called when parameters are updated via API. Use for:
@param_updater
async def update_params(self, params):
    if "intensity" in params:
        self.intensity = params["intensity"]
Called when stream stops. Use for:
@on_stream_stop
async def on_stop(self):
    del self.model
    torch.cuda.empty_cache()
Called when stream starts. Use for:
@on_stream_start
async def on_start(self):
    await self._initialise_background_tasks()
    logger.info("Start request received")
Start with a template that matches your use case:
# List available templates
pytrickle list
# Create new app from template
pytrickle init my_pipeline --template passthrough
Implement your custom handlers:
class MyHandlers:
    def __init__(self):
        self.model = None

    @model_loader
    async def load(self, **kwargs):
        # Load your AI model
        self.model = YourModel()
        self.model.eval()

    @video_handler
    async def handle_video(self, frame: VideoFrame):
        # Process video frame
        tensor = frame.tensor
        output = self.model(tensor)
        return frame.replace_tensor(output)

    @audio_handler
    async def handle_audio(self, frame: AudioFrame):
        # Process audio frame (or pass through)
        return [frame]
Set up the processor and start the server:
async def main():
    handlers = MyHandlers()
    processor = StreamProcessor.from_handlers(
        handlers,
        name="my-pipeline",
        port=8000
    )
    await processor.run_forever()

if __name__ == "__main__":
    import asyncio
    asyncio.run(main())
Start processing and manage your pipeline. (These examples assume a trickle server with a video source is running at http://trickle:3389.)
# Start processing a stream
curl -X POST http://localhost:8000/stream/start \
  -H "Content-Type: application/json" \
  -d '{
        "subscribe_url": "http://trickle:3389/subscribe",
        "publish_url": "http://trickle:3389/publish",
        "gateway_request_id": "req-123",
        "params": {"width": 512, "height": 512, "max_framerate": 30}
      }'

# Update parameters in real-time (runtime-safe fields only)
curl -X POST http://localhost:8000/stream/params \
  -H "Content-Type: application/json" \
  -d '{"enabled": true}'

# Get current status
curl http://localhost:8000/stream/status

# Health check
curl http://localhost:8000/health

# Stop processing
curl -X POST http://localhost:8000/stream/stop
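The same control endpoints can be driven from Python. The sketch below only builds the requests with the standard library (the endpoint paths and payload come from the curl examples above; the host/port and helper name are assumptions):

```python
import json
import urllib.request
from typing import Optional

BASE = "http://localhost:8000"  # assumed server address

def build_request(path: str, payload: Optional[dict] = None) -> urllib.request.Request:
    """Build a control request: POST with a JSON body when a payload is given,
    plain GET otherwise."""
    if payload is None:
        return urllib.request.Request(BASE + path, method="GET")
    return urllib.request.Request(
        BASE + path,
        data=json.dumps(payload).encode(),
        headers={"Content-Type": "application/json"},
        method="POST",
    )

start = build_request("/stream/start", {
    "subscribe_url": "http://trickle:3389/subscribe",
    "publish_url": "http://trickle:3389/publish",
    "gateway_request_id": "req-123",
    "params": {"width": 512, "height": 512, "max_framerate": 30},
})
status = build_request("/stream/status")
# urllib.request.urlopen(start) would send it against a running server
```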
Apply ML models for super-resolution, denoising, or style transfer
@video_handler
async def handle_video(self, frame):
    tensor = frame.tensor.cuda()
    enhanced = self.super_resolution_model(tensor)
    return frame.replace_tensor(enhanced.cpu())
Detect objects and draw bounding boxes on video
@video_handler
async def handle_video(self, frame):
    detections = self.detector(frame.tensor)
    annotated = draw_boxes(frame.tensor, detections)
    return frame.replace_tensor(annotated)
Apply color grading, effects, or artistic filters
@video_handler
async def handle_video(self, frame):
    filtered = apply_color_grade(
        frame.tensor,
        intensity=self.config.intensity
    )
    return frame.replace_tensor(filtered)
Noise reduction, echo cancellation, or voice enhancement
@audio_handler
async def handle_audio(self, frame):
    clean_samples = noise_reduction(
        frame.samples,
        noise_profile=self.noise_profile
    )
    return [frame.replace_samples(clean_samples)]