PyTrickle Code Map

Understanding the architecture and capabilities of PyTrickle

πŸ—οΈ System Architecture

PyTrickle is built with a modular architecture that separates concerns and enables flexible video processing pipelines.

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                      Client Application                     β”‚
β”‚                    (Your Python Code)                       β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                       β”‚
                       β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                    StreamProcessor                          β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”‚
β”‚  β”‚   Decorators (@video_handler, @audio_handler, ...)  β”‚    β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                       β”‚
        β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
        β–Ό              β–Ό              β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ HTTP Server  β”‚ β”‚  Frame   β”‚ β”‚ TrickleProtocolβ”‚
β”‚   (aiohttp)  β”‚ β”‚Processor β”‚ β”‚  (coordination)β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
        β”‚              β”‚              β”‚
        β”‚         β”Œβ”€β”€β”€β”€β”΄β”€β”€β”€β”€β”         β”‚
        β”‚         β–Ό         β–Ό         β”‚
        β”‚   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”‚
        β”‚   β”‚ Video   β”‚ β”‚ Audio   β”‚   β”‚
        β”‚   β”‚Handler  β”‚ β”‚Handler  β”‚   β”‚
        β”‚   β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚
        β”‚                             β”‚
        └────────────┬────────────────┘
                     β”‚
         β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
         β–Ό           β–Ό                                               β–Ό
    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”                                   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
    β”‚Decoder β”‚  β”‚Encoder β”‚                                   β”‚ TrickleServer      β”‚
    β”‚ (PyAV) β”‚  β”‚ (PyAV) β”‚                                   β”‚(subscribe/publish) β”‚
    β””β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”˜                                   └─────────────-----β”€β”€β”˜
         β–²             β”‚                                              β–²
         β”‚             β”‚                                              β”‚
         β”‚      β”Œβ”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”                                β”Œβ”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”
         β”‚      β”‚ TrickleSub  β”‚                                β”‚ TricklePub  β”‚
         β”‚      β”‚  (HTTP GET) β”‚                                β”‚ (HTTP POST) β”‚
         β”‚      β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                                β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                

Data Flow

Input Stream (Trickle subscribe URL)
    ↓
TrickleSubscriber (HTTP GET)
    ↓
Decoder (PyAV)
    ↓
Frame Objects (VideoFrame/AudioFrame)
    ↓
Your Handlers (@video_handler, @audio_handler)
    ↓
Processed Frame Objects
    ↓
Encoder (PyAV)
    ↓
TricklePublisher (HTTP POST)
    ↓
Output Stream (Trickle publish URL)

Core Components

1. StreamProcessor

Location: pytrickle/stream_processor.py

Purpose: High-level API that orchestrates the entire pipeline

Key Features:

  • Manages HTTP server for external control
  • Coordinates frame processing lifecycle
  • Handles stream start/stop operations
  • Provides decorator-based handler registration
# Create processor from handlers
processor = StreamProcessor.from_handlers(
    handlers,
    name="my-processor",
    port=8000
)

2. Frame Processor

Location: pytrickle/frame_processor.py

Purpose: Executes user-defined processing logic on frames

Key Features:

  • Async processing of video and audio frames
  • Model loading and initialization
  • Parameter updates during runtime
  • Resource cleanup on stream stop

3. Trickle Client

Location: pytrickle/client.py

Purpose: Handles communication with trickle servers

Key Features:

  • Subscribe to input streams
  • Publish to output streams
  • Automatic reconnection on failure
  • Frame queueing and flow control

4. Decoder/Encoder

Location: pytrickle/decoder.py, pytrickle/encoder.py

Purpose: Convert between raw media and frame objects

Key Features:

  • PyAV-based video/audio decoding and encoding
  • Multiple codec support
  • Automatic format detection
  • Hardware-accelerated encoding when available

Module Reference

cli.py

Command-line interface for scaffolding new applications. Provides pytrickle init and pytrickle list commands.

decorators.py

Decorator definitions for handler methods: @video_handler, @audio_handler, @model_loader, etc.

frames.py

Frame data structures: VideoFrame and AudioFrame with tensor/sample data and metadata.

api.py

Pydantic request/response models used by the HTTP server (e.g., StreamStartRequest, StreamParamsUpdateRequest).

protocol.py

Trickle protocol coordination: manages queues, ingress/egress loops, and monitoring events.

subscriber.py

TrickleSubscriber: receives segments via HTTP GET with background preconnect and retry.
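
The retry behavior can be pictured as a standard async retry-with-backoff loop. A minimal sketch of the pattern (illustrative only; `fetch_with_retry` and `flaky_fetch` are hypothetical names, not the actual TrickleSubscriber API):

```python
import asyncio

async def fetch_with_retry(fetch, max_attempts=3, base_delay=0.01):
    """Retry an async fetch with exponential backoff, as a subscriber might."""
    for attempt in range(max_attempts):
        try:
            return await fetch()
        except ConnectionError:
            if attempt == max_attempts - 1:
                raise
            # Back off: base_delay, 2*base_delay, 4*base_delay, ...
            await asyncio.sleep(base_delay * (2 ** attempt))

# Simulate a flaky source that fails twice, then succeeds
calls = {"n": 0}

async def flaky_fetch():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("segment not ready")
    return b"segment-bytes"

segment = asyncio.run(fetch_with_retry(flaky_fetch))
print(segment)  # b'segment-bytes'
```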

publisher.py

TricklePublisher: streams encoded bytes via HTTP POST using an async queue-backed generator.
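
The queue-backed generator idea can be sketched with a plain asyncio.Queue feeding an async generator; an HTTP client can then consume such a generator as a streaming POST body. The names below are illustrative, not the actual TricklePublisher internals:

```python
import asyncio

async def queue_generator(queue):
    """Yield byte chunks from an asyncio.Queue until a None sentinel arrives."""
    while True:
        chunk = await queue.get()
        if chunk is None:  # sentinel: stream finished
            return
        yield chunk

async def demo():
    q = asyncio.Queue()
    # Producer side: enqueue encoded segments, then signal completion
    for part in (b"seg1", b"seg2", None):
        await q.put(part)
    # Consumer side: drain the generator (an HTTP POST body would do this)
    return [chunk async for chunk in queue_generator(q)]

chunks = asyncio.run(demo())
print(chunks)  # [b'seg1', b'seg2']
```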

server.py

HTTP server (aiohttp) exposing routes: /stream/start, /stream/stop, /stream/params, /stream/status, /health.

manager.py

Stream session management and lifecycle coordination.

fps_meter.py

Frame rate measurement and throttling for performance optimization.
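
A sliding-window frame-rate estimate can be computed from recent frame timestamps with a bounded deque. This is a sketch of the concept, not the actual fps_meter.py implementation:

```python
from collections import deque

class SlidingFPS:
    """Estimate FPS over the last `window` frame timestamps."""

    def __init__(self, window=30):
        self.times = deque(maxlen=window)

    def tick(self, now):
        """Record a frame arrival time (seconds)."""
        self.times.append(now)

    def fps(self):
        if len(self.times) < 2:
            return 0.0
        span = self.times[-1] - self.times[0]
        # N timestamps cover N-1 inter-frame intervals
        return (len(self.times) - 1) / span if span > 0 else 0.0

meter = SlidingFPS()
for i in range(10):
    meter.tick(i / 30)  # simulate timestamps arriving at 30 fps
print(round(meter.fps(), 1))  # 30.0
```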

cache.py

Frame caching for efficient memory management.

state.py

Stream state management and status tracking.

utils/

Utility functions for media processing, tensor operations, and logging.

registry.py

Handler registry for managing decorated processing functions.

🎞️ Frame Objects

VideoFrame

Represents a single video frame with associated metadata.

Properties:

  • tensor - PyTorch tensor in (H, W, C) format
  • timestamp - Presentation timestamp (PTS, int)
  • time_base - Fraction giving the time scale for PTS

Methods:

  • replace_tensor(new_tensor) - Create new frame with different tensor
  • to_av_frame(tensor) - Convert a tensor to av.VideoFrame
# Access video frame data
tensor = frame.tensor  # Shape: (H, W, 3)

# Process on GPU
if torch.cuda.is_available():
    tensor = tensor.cuda()
    processed = my_model(tensor)
    
# Return processed frame
return frame.replace_tensor(processed)
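
The timestamp/time_base pair converts to wall-clock seconds by simple multiplication. A minimal illustration, using fractions.Fraction (which is the usual type for time_base in PyAV-based code) and an assumed 90 kHz video clock:

```python
from fractions import Fraction

# Assume a 90 kHz clock (common for video) with PTS 180000
time_base = Fraction(1, 90000)
timestamp = 180000

# PTS in seconds = timestamp * time_base
seconds = float(timestamp * time_base)
print(seconds)  # 2.0
```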

AudioFrame

Represents audio samples with channel configuration.

Properties:

  • samples - NumPy array of audio samples
  • nb_samples - Number of samples in the frame
  • format - Sample format (e.g., s16, fltp)
  • rate - Sample rate (e.g., 48000)
  • layout - Channel layout (e.g., mono, stereo)
  • timestamp and time_base

Methods:

  • replace_samples(new_samples) - Create new frame with different samples
  • to_av_frame() - Convert samples back to av.AudioFrame
# Access audio frame data
samples = np.asarray(frame.samples)

# Process audio (e.g., apply filter)
processed = my_audio_filter(samples)

# Return processed frame
return [frame.replace_samples(processed)]
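
The nb_samples and rate properties together give the frame's duration. As a quick worked example (1024 samples per frame is an assumed value, not a PyTrickle default):

```python
# Duration of an audio frame in seconds, from sample count and sample rate
nb_samples = 1024
rate = 48000  # 48 kHz

duration_s = nb_samples / rate
print(round(duration_s * 1000, 2))  # 21.33 (milliseconds)
```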

🎨 Decorator API

PyTrickle uses decorators to define pipeline behavior in a clean, declarative way.

@model_loader

Called once when stream starts. Use for:

  • Loading AI models
  • Initializing resources
  • Warming up GPU
@model_loader
async def load(self, **kwargs):
    self.model = load_my_model()
    self.model.eval()

@video_handler

Called for each video frame. Use for:

  • Applying filters
  • Running AI inference
  • Frame transformations
@video_handler
async def handle_video(self, frame):
    tensor = frame.tensor
    processed = self.model(tensor)
    return frame.replace_tensor(processed)

@audio_handler

Called for each audio frame. Use for:

  • Audio effects
  • Noise reduction
  • Speech processing
@audio_handler
async def handle_audio(self, frame):
    samples = frame.samples
    # Process audio
    return [frame]

@param_updater

Called when parameters are updated via API. Use for:

  • Real-time config changes
  • Tuning processing intensity
  • Enabling/disabling features
@param_updater
async def update_params(self, params):
    if "intensity" in params:
        self.intensity = params["intensity"]

@on_stream_stop

Called when stream stops. Use for:

  • Cleaning up resources
  • Saving state
  • Releasing GPU memory
@on_stream_stop
async def on_stop(self):
    del self.model
    torch.cuda.empty_cache()

@on_stream_start

Called when stream starts. Use for:

  • Initialisation of resource when the stream starts.
  • Initiating backfround tasks
@on_stream_start
async def on_start(self):
    await self._initialise_background_tasks()
    logger.info("Start request received")

Pipeline Creation Workflow

1. Scaffold Application

Start with a template that matches your use case:

# List available templates
pytrickle list

# Create new app from template
pytrickle init my_pipeline --template passthrough

2. Define Processing Logic

Implement your custom handlers:

class MyHandlers:
    def __init__(self):
        self.model = None
    
    @model_loader
    async def load(self, **kwargs):
        # Load your AI model
        self.model = YourModel()
        self.model.eval()
    
    @video_handler
    async def handle_video(self, frame: VideoFrame):
        # Process video frame
        tensor = frame.tensor
        output = self.model(tensor)
        return frame.replace_tensor(output)
    
    @audio_handler
    async def handle_audio(self, frame: AudioFrame):
        # Process audio frame (or pass through)
        return [frame]

3. Configure and Run

Set up the processor and start the server:

async def main():
    handlers = MyHandlers()
    processor = StreamProcessor.from_handlers(
        handlers,
        name="my-pipeline",
        port=8000
    )
    await processor.run_forever()

if __name__ == "__main__":
    import asyncio
    asyncio.run(main())

4. Control via HTTP API

Start processing and manage your pipeline (these examples assume a trickle server with a video source is running at http://trickle:3389):

# Start processing a stream
curl -X POST http://localhost:8000/stream/start \
  -H "Content-Type: application/json" \
  -d '{
    "subscribe_url": "http://trickle:3389/subscribe",
    "publish_url": "http://trickle:3389/publish",
    "gateway_request_id": "req-123",
    "params": {"width": 512, "height": 512, "max_framerate": 30}
  }'

# Update parameters in real-time (runtime-safe fields only)
curl -X POST http://localhost:8000/stream/params \
  -H "Content-Type: application/json" \
  -d '{"enabled": true}'

# Get current status
curl http://localhost:8000/stream/status

# Health check
curl http://localhost:8000/health

# Stop processing
curl -X POST http://localhost:8000/stream/stop
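
The same start call can be made from Python with only the standard library. A minimal sketch mirroring the curl example above (`build_start_request` is a hypothetical helper, and actually sending the request requires the server to be running):

```python
import json
import urllib.request

def build_start_request(base_url, subscribe_url, publish_url, request_id, params):
    """Build a POST request for /stream/start with a JSON body."""
    body = json.dumps({
        "subscribe_url": subscribe_url,
        "publish_url": publish_url,
        "gateway_request_id": request_id,
        "params": params,
    }).encode()
    return urllib.request.Request(
        f"{base_url}/stream/start",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )

req = build_start_request(
    "http://localhost:8000",
    "http://trickle:3389/subscribe",
    "http://trickle:3389/publish",
    "req-123",
    {"width": 512, "height": 512, "max_framerate": 30},
)
# To actually send it (server must be running):
# with urllib.request.urlopen(req) as resp:
#     print(resp.status)
print(req.full_url)  # http://localhost:8000/stream/start
```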

Common Use Cases

1. AI Video Enhancement

Apply ML models for super-resolution, denoising, or style transfer

@video_handler
async def handle_video(self, frame):
    tensor = frame.tensor.cuda()
    enhanced = self.super_resolution_model(tensor)
    return frame.replace_tensor(enhanced.cpu())

2. Object Detection Overlay

Detect objects and draw bounding boxes on video

@video_handler
async def handle_video(self, frame):
    detections = self.detector(frame.tensor)
    annotated = draw_boxes(frame.tensor, detections)
    return frame.replace_tensor(annotated)

3. Real-time Video Filters

Apply color grading, effects, or artistic filters

@video_handler
async def handle_video(self, frame):
    filtered = apply_color_grade(
        frame.tensor,
        intensity=self.config.intensity
    )
    return frame.replace_tensor(filtered)

4. Audio Processing

Noise reduction, echo cancellation, or voice enhancement

@audio_handler
async def handle_audio(self, frame):
    clean_samples = noise_reduction(
        frame.samples,
        noise_profile=self.noise_profile
    )
    return [frame.replace_samples(clean_samples)]

Next Steps