
Video API Reference

Signals

VideoEnsemble


Bases: BaseSignal

Ensemble of video deepfake detection signals. Combines RPPG, I3D, and LipSync using weighted fusion.

Example

>>> from veridex.video import VideoEnsemble
>>> ensemble = VideoEnsemble()
>>> result = ensemble.run("video.mp4")
>>> print(f"Combined score: {result.score:.2f}")

Source code in veridex/video/ensemble.py
class VideoEnsemble(BaseSignal):
    """
    Ensemble of video deepfake detection signals.
    Combines RPPG, I3D, and LipSync using weighted fusion.

    Example:
        >>> from veridex.video import VideoEnsemble
        >>> ensemble = VideoEnsemble()
        >>> result = ensemble.run("video.mp4")
        >>> print(f"Combined score: {result.score:.2f}")
    """

    @property
    def name(self) -> str:
        return "video_ensemble"

    @property
    def dtype(self) -> str:
        return "video"

    def __init__(self, signals: Optional[List[BaseSignal]] = None):
        """
        Args:
            signals: List of signals to ensemble. Defaults to all three video signals.
        """
        if signals is None:
            # Import here to avoid circular dependency
            from veridex.video.rppg import RPPGSignal
            from veridex.video.i3d import I3DSignal
            from veridex.video.lipsync import LipSyncSignal

            self.signals = [
                RPPGSignal(),
                I3DSignal(),
                LipSyncSignal()
            ]
        else:
            self.signals = signals

    def check_dependencies(self) -> None:
        """Check dependencies for all signals."""
        for signal in self.signals:
            signal.check_dependencies()

    def run(self, input_data: str) -> DetectionResult:
        """
        Run all signals and fuse results using weighted average.

        Args:
            input_data: Path to video file

        Returns:
            DetectionResult with fused score and metadata from all signals
        """
        results = []

        for signal in self.signals:
            try:
                result = signal.run(input_data)
                # Only include successful results (no error and non-zero confidence)
                if result.error is None and result.confidence > 0:
                    results.append((signal.name, result))
            except Exception as e:
                # Log but continue with other signals
                warnings.warn(
                    f"{signal.name} failed: {e}. Continuing with other signals.",
                    UserWarning
                )

        if not results:
            return DetectionResult(
                score=0.5,
                confidence=0.0,
                error="All signals failed to produce valid results",
                metadata={"num_successful": 0, "num_total": len(self.signals)}
            )

        # Weighted average (confidence as weight)
        total_weight = sum(r.confidence for _, r in results)

        if total_weight == 0:
            # All zero confidence, use simple average
            avg_score = float(np.mean([r.score for _, r in results]))
            avg_conf = 0.0
        else:
            avg_score = sum(r.score * r.confidence for _, r in results) / total_weight
            avg_conf = total_weight / len(self.signals)  # Normalize by total signals

        # Build metadata
        individual_results = {}
        for sig_name, result in results:
            individual_results[sig_name] = {
                "score": float(result.score),
                "confidence": float(result.confidence),
                "metadata": result.metadata
            }

        return DetectionResult(
            score=float(avg_score),
            confidence=float(avg_conf),
            metadata={
                "individual_results": individual_results,
                "num_successful": len(results),
                "num_total": len(self.signals),
                "fusion_method": "weighted_average"
            }
        )

__init__(signals=None)

Parameters:

signals (Optional[List[BaseSignal]], default: None): List of signals to ensemble. Defaults to all three video signals.

Source code in veridex/video/ensemble.py
def __init__(self, signals: Optional[List[BaseSignal]] = None):
    """
    Args:
        signals: List of signals to ensemble. Defaults to all three video signals.
    """
    if signals is None:
        # Import here to avoid circular dependency
        from veridex.video.rppg import RPPGSignal
        from veridex.video.i3d import I3DSignal
        from veridex.video.lipsync import LipSyncSignal

        self.signals = [
            RPPGSignal(),
            I3DSignal(),
            LipSyncSignal()
        ]
    else:
        self.signals = signals
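
The default constructor wires up all three detectors. A custom subset can be passed instead; a minimal sketch, assuming the module paths shown in the source above (the video path is a placeholder):

from veridex.video import VideoEnsemble
from veridex.video.rppg import RPPGSignal
from veridex.video.i3d import I3DSignal

# Ensemble over only the rPPG and I3D signals (LipSync omitted).
ensemble = VideoEnsemble(signals=[RPPGSignal(), I3DSignal()])
result = ensemble.run("interview_clip.mp4")  # placeholder path
print(result.score, result.confidence, result.metadata["num_successful"])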

check_dependencies()

Check dependencies for all signals.

Source code in veridex/video/ensemble.py
def check_dependencies(self) -> None:
    """Check dependencies for all signals."""
    for signal in self.signals:
        signal.check_dependencies()

run(input_data)

Run all signals and fuse results using weighted average.

Parameters:

input_data (str, required): Path to video file

Returns:

DetectionResult: fused score and metadata from all signals

Source code in veridex/video/ensemble.py
def run(self, input_data: str) -> DetectionResult:
    """
    Run all signals and fuse results using weighted average.

    Args:
        input_data: Path to video file

    Returns:
        DetectionResult with fused score and metadata from all signals
    """
    results = []

    for signal in self.signals:
        try:
            result = signal.run(input_data)
            # Only include successful results (no error and non-zero confidence)
            if result.error is None and result.confidence > 0:
                results.append((signal.name, result))
        except Exception as e:
            # Log but continue with other signals
            warnings.warn(
                f"{signal.name} failed: {e}. Continuing with other signals.",
                UserWarning
            )

    if not results:
        return DetectionResult(
            score=0.5,
            confidence=0.0,
            error="All signals failed to produce valid results",
            metadata={"num_successful": 0, "num_total": len(self.signals)}
        )

    # Weighted average (confidence as weight)
    total_weight = sum(r.confidence for _, r in results)

    if total_weight == 0:
        # All zero confidence, use simple average
        avg_score = float(np.mean([r.score for _, r in results]))
        avg_conf = 0.0
    else:
        avg_score = sum(r.score * r.confidence for _, r in results) / total_weight
        avg_conf = total_weight / len(self.signals)  # Normalize by total signals

    # Build metadata
    individual_results = {}
    for sig_name, result in results:
        individual_results[sig_name] = {
            "score": float(result.score),
            "confidence": float(result.confidence),
            "metadata": result.metadata
        }

    return DetectionResult(
        score=float(avg_score),
        confidence=float(avg_conf),
        metadata={
            "individual_results": individual_results,
            "num_successful": len(results),
            "num_total": len(self.signals),
            "fusion_method": "weighted_average"
        }
    )
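
For intuition, the weighted-average fusion above works out as follows on two hypothetical signal results (the numbers are illustrative only; the third signal is assumed to have failed and been skipped):

results = {
    "rppg_physnet": (0.8, 0.6),        # (score, confidence) -- hypothetical
    "spatiotemporal_i3d": (0.4, 0.9),  # hypothetical
}
num_total = 3  # lipsync_wav2lip assumed to have failed and been excluded

total_weight = sum(conf for _, conf in results.values())              # 1.5
fused_score = sum(s * c for s, c in results.values()) / total_weight  # 0.56
fused_conf = total_weight / num_total                                 # 0.5
print(round(fused_score, 2), fused_conf)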

RPPGSignal


Bases: BaseSignal

Detects Deepfakes by analyzing the rPPG (Remote Photoplethysmography) signal. Real humans have a heartbeat (0.7-4Hz). Deepfakes often lack this or have noise.

Source code in veridex/video/rppg.py
class RPPGSignal(BaseSignal):
    """
    Detects Deepfakes by analyzing the rPPG (Remote Photoplethysmography) signal.
    Real humans have a heartbeat (0.7-4Hz). Deepfakes often lack this or have noise.
    """

    @property
    def name(self) -> str:
        return "rppg_physnet"

    @property
    def dtype(self) -> str:
        return "video"

    def check_dependencies(self) -> None:
        try:
            import torch
            import cv2
            import scipy.signal
        except ImportError:
            raise ImportError("RPPGSignal requires 'torch', 'opencv-python-headless', and 'scipy'. Install veridex[video].")

    def run(self, input_data: str) -> DetectionResult:
        """
        Args:
            input_data: Path to video file.
        """
        self.check_dependencies()

        try:
            frames = self._load_video_frames(input_data, max_frames=300) # Analyze ~10 sec
            if frames is None or len(frames) < 30:
                return DetectionResult(
                    score=0.5,
                    confidence=0.0,
                    metadata={"error": "Video too short or unreadable"},
                    error="Video read error"
                )

            # Detect and Crop Face (Track first face found)
            faces = self._detect_faces(frames)

            # Check if faces is empty or malformed
            if faces is None or len(faces) == 0 or (isinstance(faces, np.ndarray) and faces.size == 0):
                 return DetectionResult(
                    score=0.5,
                    confidence=0.0,
                    metadata={"error": "No face detected"},
                    error="No face detected"
                )

            # Extract BVP Signal - track if using trained weights
            bvp_signal, weights_loaded = self._extract_signal(faces)

            # Analyze PSD
            fake_prob, meta = self._analyze_psd(bvp_signal)

            # Calculate confidence based on signal quality and model training status
            peak_ratio = meta.get("peak_ratio", 0.0)
            snr = meta.get("snr", 0.0)

            # Base confidence from signal quality
            if peak_ratio > 4.0:
                signal_confidence = 0.85  # Very strong periodic signal
            elif peak_ratio > 3.0:
                signal_confidence = 0.75
            elif peak_ratio > 2.0:
                signal_confidence = 0.65
            elif peak_ratio > 1.5:
                signal_confidence = 0.50
            else:
                signal_confidence = 0.35  # Weak/noisy signal

            # Adjust by model training status
            if weights_loaded:
                # Trained model: use higher portion of signal confidence
                confidence = signal_confidence
            else:
                # Untrained model: significantly reduce confidence
                confidence = min(signal_confidence * 0.3, 0.4)  # Cap at 0.4 for untrained

            return DetectionResult(
                score=fake_prob,
                confidence=confidence,
                metadata={**meta, "model_trained": weights_loaded}
            )

        except Exception as e:
            return DetectionResult(score=0.0, confidence=0.0, error=str(e))

    def _load_video_frames(self, path: str, max_frames: int = 300) -> np.ndarray:
        import cv2
        cap = cv2.VideoCapture(path)
        frames = []
        count = 0
        while cap.isOpened() and count < max_frames:
            ret, frame = cap.read()
            if not ret:
                break
            # Convert BGR to RGB
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frames.append(frame)
            count += 1
        cap.release()
        return np.array(frames)

    def _detect_faces(self, frames: np.ndarray) -> List[np.ndarray]:
        from veridex.video.processing import FaceDetector
        detector = FaceDetector()

        # Use the built-in tracking method for temporal consistency
        # Convert np.ndarray frames (T, H, W, C) to list for the detector
        frame_list = [f for f in frames]
        roi_frames = detector.track_faces(frame_list, size=(128, 128))

        return roi_frames # (T, 128, 128, 3)

    def _extract_signal(self, face_frames: np.ndarray) -> tuple[np.ndarray, bool]:
        """Extract BV signal and return whether trained weights were loaded."""
        import torch
        from veridex.video.models.physnet import PhysNet

        # Prepare for model
        tensor = torch.from_numpy(face_frames).float() / 255.0
        tensor = tensor.permute(3, 0, 1, 2) # (C, T, H, W)
        tensor = tensor.unsqueeze(0) # (1, C, T, H, W)

        model = PhysNet()
        model.eval()

        # Load weights from centralized config
        from veridex.video.weights import get_weight_config

        weight_config = get_weight_config('physnet')
        weights_url = weight_config['url']
        weights_path = os.path.join(get_cache_dir(), weight_config['filename'])
        sha256 = weight_config.get('sha256')

        weights_loaded = False
        if not os.path.exists(weights_path):
            try:
                download_file(weights_url, weights_path)
            except Exception:
                pass  # Silently fail, final warning below will inform user

        if os.path.exists(weights_path):
             try:
                model.load_state_dict(torch.load(weights_path, map_location='cpu'))
                logger.info(f"✓ Loaded PhysNet weights from {weights_path}")
                weights_loaded = True
             except Exception:
                pass  # Silently fail, final warning below will inform user

        if not weights_loaded:
            warnings.warn(
                "⚠ RPPGSignal is using untrained weights. Predictions are random.\n"
                "For production use, download real PhysNet weights.",
                UserWarning,
                stacklevel=2
            )

        with torch.no_grad():
             # PhysNet is fully convolutional in time, but T=300 frames may exceed
             # memory, so crop to the first 128 frames (~4 s at 30 fps).
             T = tensor.shape[2]
             if T > 128:
                 tensor = tensor[:, :, :128, :, :]

             signal = model(tensor) # (1, T)

        return signal.squeeze().numpy(), weights_loaded

    def _analyze_psd(self, signal: np.ndarray) -> Tuple[float, Dict[str, Any]]:
        from scipy import signal as scipy_signal

        # Detrend
        signal = scipy_signal.detrend(signal)

        # PSD
        fs = 30.0 # Assumed FPS
        freqs, psd = scipy_signal.periodogram(signal, fs)

        # ROI: 0.7 Hz (42 BPM) to 4.0 Hz (240 BPM)
        mask = (freqs >= 0.7) & (freqs <= 4.0)
        roi_power = np.trapz(psd[mask], freqs[mask])
        total_power = np.trapz(psd, freqs)

        snr = roi_power / (total_power + 1e-6)

        # Peak analysis
        # If real, there should be a dominant peak in ROI.
        roi_psd = psd[mask]
        if len(roi_psd) > 0:
            peak_power = np.max(roi_psd)
            peak_ratio = peak_power / (np.mean(roi_psd) + 1e-6)
        else:
            peak_ratio = 0.0

        # Scoring:
        # High SNR + High Peak Ratio -> Human (Score 0)
        # Low SNR or Flat -> Fake (Score 1)

        # Rationale: deepfakes typically show a flat or noisy rPPG signal
        # (temporal smoothing suppresses the pulse), while real faces show a
        # dominant periodic peak in the physiological band.

        # Simplified logic:
        # If peak_ratio is high (> 3.0), likely a periodic heartbeat -> Human.
        # Map peak_ratio 3.0 -> Score 0.0, peak_ratio 1.0 -> Score 1.0.

        # Fake probability is the inverted soft threshold:
        # score = 1 - sigmoid(peak_ratio - 2.5) = 1 / (1 + exp(peak_ratio - 2.5))

        score = 1.0 / (1.0 + np.exp(peak_ratio - 2.5)) # Soft threshold around 2.5

        metadata = {
            "snr": float(snr),
            "peak_ratio": float(peak_ratio),
            "dominant_freq": float(freqs[mask][np.argmax(roi_psd)]) if len(roi_psd) > 0 else 0.0
        }

        return float(score), metadata
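
A minimal sketch of the peak-ratio test in _analyze_psd, run on a synthetic 1.2 Hz pulse (72 BPM) sampled at 30 fps rather than a real PhysNet output:

import numpy as np
from scipy import signal as scipy_signal

fs = 30.0                                     # assumed frame rate, as in _analyze_psd
t = np.arange(0, 10, 1 / fs)
bvp = np.sin(2 * np.pi * 1.2 * t) + 0.1 * np.random.randn(t.size)  # synthetic pulse + noise

bvp = scipy_signal.detrend(bvp)
freqs, psd = scipy_signal.periodogram(bvp, fs)
mask = (freqs >= 0.7) & (freqs <= 4.0)        # physiological band
peak_ratio = psd[mask].max() / (psd[mask].mean() + 1e-6)

# A strong periodic pulse gives a high peak_ratio and hence a low fake probability.
fake_prob = 1.0 / (1.0 + np.exp(peak_ratio - 2.5))
print(round(float(peak_ratio), 1), round(float(fake_prob), 4))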

run(input_data)

Parameters:

input_data (str, required): Path to video file.

Source code in veridex/video/rppg.py
def run(self, input_data: str) -> DetectionResult:
    """
    Args:
        input_data: Path to video file.
    """
    self.check_dependencies()

    try:
        frames = self._load_video_frames(input_data, max_frames=300) # Analyze ~10 sec
        if frames is None or len(frames) < 30:
            return DetectionResult(
                score=0.5,
                confidence=0.0,
                metadata={"error": "Video too short or unreadable"},
                error="Video read error"
            )

        # Detect and Crop Face (Track first face found)
        faces = self._detect_faces(frames)

        # Check if faces is empty or malformed
        if faces is None or len(faces) == 0 or (isinstance(faces, np.ndarray) and faces.size == 0):
             return DetectionResult(
                score=0.5,
                confidence=0.0,
                metadata={"error": "No face detected"},
                error="No face detected"
            )

        # Extract BVP Signal - track if using trained weights
        bvp_signal, weights_loaded = self._extract_signal(faces)

        # Analyze PSD
        fake_prob, meta = self._analyze_psd(bvp_signal)

        # Calculate confidence based on signal quality and model training status
        peak_ratio = meta.get("peak_ratio", 0.0)
        snr = meta.get("snr", 0.0)

        # Base confidence from signal quality
        if peak_ratio > 4.0:
            signal_confidence = 0.85  # Very strong periodic signal
        elif peak_ratio > 3.0:
            signal_confidence = 0.75
        elif peak_ratio > 2.0:
            signal_confidence = 0.65
        elif peak_ratio > 1.5:
            signal_confidence = 0.50
        else:
            signal_confidence = 0.35  # Weak/noisy signal

        # Adjust by model training status
        if weights_loaded:
            # Trained model: use higher portion of signal confidence
            confidence = signal_confidence
        else:
            # Untrained model: significantly reduce confidence
            confidence = min(signal_confidence * 0.3, 0.4)  # Cap at 0.4 for untrained

        return DetectionResult(
            score=fake_prob,
            confidence=confidence,
            metadata={**meta, "model_trained": weights_loaded}
        )

    except Exception as e:
        return DetectionResult(score=0.0, confidence=0.0, error=str(e))

I3DSignal


Bases: BaseSignal

Detects Deepfakes using Spatiotemporal features (I3D).

Source code in veridex/video/i3d.py
class I3DSignal(BaseSignal):
    """
    Detects Deepfakes using Spatiotemporal features (I3D).
    """

    @property
    def name(self) -> str:
        return "spatiotemporal_i3d"

    @property
    def dtype(self) -> str:
        return "video"

    def check_dependencies(self) -> None:
        try:
            import torch
            import cv2
        except ImportError:
            raise ImportError("I3DSignal requires 'torch' and 'opencv-python-headless'. Install veridex[video].")

    def run(self, input_data: str) -> DetectionResult:
        self.check_dependencies()
        try:
            # 1. Load Video Clip (Fixed size for I3D, e.g., 64 frames)
            clip = self._load_clip(input_data, frames_needed=64)
            if clip is None:
                return DetectionResult(score=0.5, confidence=0.0, error="Video too short")

            # 2. Run Inference
            score, weights_loaded = self._run_inference(clip)

            # 3. Calculate confidence based on model certainty and training status
            # Distance from 0.5 (uncertainty point) indicates model confidence
            distance_from_uncertain = abs(score - 0.5)

            # Map distance to model confidence
            # Distance 0.5 (max, score at 0 or 1) -> very confident
            # Distance 0.0 (score at 0.5) -> very uncertain
            model_confidence = min(distance_from_uncertain * 2, 1.0)  # Scale to [0, 1]

            # Boost base confidence for I3D (sophisticated spatiotemporal model)
            if model_confidence > 0.7:
                base_confidence = 0.90
            elif model_confidence > 0.5:
                base_confidence = 0.85
            elif model_confidence > 0.3:
                base_confidence = 0.75
            else:
                base_confidence = 0.65

            # Adjust by training status
            if weights_loaded:
                confidence = base_confidence
            else:
                # Untrained weights: very low confidence
                confidence = min(base_confidence * 0.15, 0.25)

            return DetectionResult(
                score=score,
                confidence=confidence,
                metadata={
                    "frames": 64,
                    "model_confidence": model_confidence,
                    "model_trained": weights_loaded
                }
            )

        except Exception as e:
            return DetectionResult(score=0.0, confidence=0.0, error=str(e))

    def _load_clip(self, path: str, frames_needed: int) -> Optional[np.ndarray]:
        import cv2
        cap = cv2.VideoCapture(path)
        frames = []
        while cap.isOpened() and len(frames) < frames_needed:
            ret, frame = cap.read()
            if not ret:
                break
            frame = cv2.resize(frame, (224, 224))
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frames.append(frame)
        cap.release()

        if len(frames) < frames_needed:
            # Pad or fail
            return None

        return np.array(frames) # (T, H, W, C)

    def _run_inference(self, clip: np.ndarray) -> tuple[float, bool]:
        """Run I3D inference and return score and whether trained weights were loaded."""
        import torch
        from veridex.video.models.i3d import InceptionI3D

        # Preprocess
        tensor = torch.from_numpy(clip).float() / 255.0 * 2 - 1 # [-1, 1]
        tensor = tensor.permute(3, 0, 1, 2) # (C, T, H, W)
        tensor = tensor.unsqueeze(0) # (1, C, T, H, W)

        model = InceptionI3D(num_classes=1)
        model.eval()

        # Load weights from centralized config
        from veridex.utils.downloads import get_cache_dir, download_file
        from veridex.video.weights import get_weight_config

        weight_config = get_weight_config('i3d')
        weights_url = weight_config['url']
        weights_path = os.path.join(get_cache_dir(), weight_config['filename'])
        sha256 = weight_config.get('sha256')

        weights_loaded = False
        if not os.path.exists(weights_path):
            try:
                download_file(weights_url, weights_path)
            except Exception:
                pass  # Silently fail, final warning below will inform user

        if os.path.exists(weights_path):
             try:
                model.load_state_dict(torch.load(weights_path, map_location='cpu'))
                logger.info(f"✓ Loaded I3D weights from {weights_path}")
                weights_loaded = True
             except Exception:
                pass  # Silently fail, final warning below will inform user

        if not weights_loaded:
            warnings.warn(
                "⚠ I3DSignal is using untrained weights. Predictions are random.\\n"
                "For production use, download real I3D weights trained on Kinetics-400.",
                UserWarning,
                stacklevel=2
            )

        with torch.no_grad():
            logits = model(tensor) # (1, 1, T_out)
            # Average over time dimension
            logit = logits.mean()
            prob = torch.sigmoid(logit).item()

        return prob, weights_loaded
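
To make the confidence mapping above concrete, the same thresholds can be written as a standalone function; the inputs below are illustrative, not real model outputs:

def i3d_confidence(score: float, weights_loaded: bool) -> float:
    # Distance from the 0.5 uncertainty point, scaled to [0, 1].
    model_confidence = min(abs(score - 0.5) * 2, 1.0)
    if model_confidence > 0.7:
        base = 0.90
    elif model_confidence > 0.5:
        base = 0.85
    elif model_confidence > 0.3:
        base = 0.75
    else:
        base = 0.65
    # Untrained weights collapse the confidence.
    return base if weights_loaded else min(base * 0.15, 0.25)

print(i3d_confidence(0.92, True))    # confident and trained -> 0.90
print(i3d_confidence(0.55, False))   # near-uncertain and untrained -> ~0.10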

LipSyncSignal


Bases: BaseSignal

Detects Deepfakes by checking Audio-Visual Synchronization (Lip-Sync). Uses SyncNet logic.

Source code in veridex/video/lipsync.py
class LipSyncSignal(BaseSignal):
    """
    Detects Deepfakes by checking Audio-Visual Synchronization (Lip-Sync).
    Uses SyncNet logic.
    """

    @property
    def name(self) -> str:
        return "lipsync_wav2lip"

    @property
    def dtype(self) -> str:
        return "video"

    def check_dependencies(self) -> None:
        try:
            import torch
            import cv2
            import librosa
        except ImportError:
            raise ImportError("LipSyncSignal requires 'torch', 'opencv', and 'librosa'. Install veridex[video].")

    def run(self, input_data: str) -> DetectionResult:
        self.check_dependencies()
        try:
            # 1. Load Audio and Video segments
            # For robustness, we check the AV offset on multiple random 0.2s clips

            offsets = []
            weights_loaded_flags = []
            for _ in range(3): # Check 3 segments
                offset, weights_loaded = self._calculate_av_offset(input_data)
                if offset is not None:
                    offsets.append(offset)
                    weights_loaded_flags.append(weights_loaded)

            if not offsets:
                 return DetectionResult(score=0.5, confidence=0.0, error="Could not extract AV segments")

            avg_offset = sum(offsets) / len(offsets)
            offset_variance = np.var(offsets) if len(offsets) > 1 else 0.0
            any_weights_loaded = any(weights_loaded_flags)

            # Metric:
            # Offset is Euclidean distance between Audio and Video embeddings.
            # Small distance -> Sync -> Real.
            # Large distance -> Out of Sync -> Fake.
            # Real < 0.8 (heuristic threshold).

            score = 0.0
            threshold = 0.8
            if avg_offset > threshold:
                # Map distance to probability.
                score = min((avg_offset - threshold) / 1.0, 1.0)

            # Calculate confidence from measurement consistency and model status
            # Low variance in offsets = consistent measurement = high confidence
            if offset_variance < 0.05:
                measurement_confidence = 0.85  # Very consistent
            elif offset_variance < 0.1:
                measurement_confidence = 0.75
            elif offset_variance < 0.2:
                measurement_confidence = 0.65
            else:
                measurement_confidence = 0.45  # High variance, less reliable

            # Adjust by model training status
            if any_weights_loaded:
                confidence = measurement_confidence
            else:
                # Untrained model: reduce confidence significantly
                confidence = min(measurement_confidence * 0.35, 0.4)

            return DetectionResult(
                score=score,
                confidence=confidence,
                metadata={
                    "av_distance": avg_offset,
                    "offset_variance": offset_variance,
                    "num_segments": len(offsets),
                    "model_trained": any_weights_loaded
                }
            )

        except Exception as e:
            return DetectionResult(score=0.0, confidence=0.0, error=str(e))

    def _calculate_av_offset(self, path: str) -> tuple[Optional[float], bool]:
        """Calculate AV offset and return whether trained weights were loaded."""
        import torch
        import librosa
        import cv2
        from veridex.video.models.syncnet import SyncNet
        from veridex.video.processing import FaceDetector
        from veridex.utils.downloads import download_file, get_cache_dir

        # 1. Load Audio (0.2s segment)
        try:
            y, sr = librosa.load(path, sr=16000)
        except Exception:
            return None, False

        if len(y) < 16000: # Need at least 1 sec to find a good chunk
            return None, False

        # Pick a random start point
        import random
        start_sec = random.uniform(0, len(y)/sr - 0.3)
        start_sample = int(start_sec * sr)
        # 0.2s duration for SyncNet
        duration_samples = int(0.2 * sr)
        audio_chunk = y[start_sample : start_sample + duration_samples]

        # MFCC: 13 coeffs, window 25ms, hop 10ms
        # SyncNet expects specific MFCC shape.
        # (1, 1, 13, 20) -> 13 MFCCs over 20 timesteps (20*10ms = 200ms)
        mfcc = librosa.feature.mfcc(y=audio_chunk, sr=sr, n_mfcc=13, n_fft=400, hop_length=160)
        if mfcc.shape[1] < 20:
             mfcc = np.pad(mfcc, ((0,0), (0, 20-mfcc.shape[1])))
        mfcc = mfcc[:, :20]

        # 2. Load Video (5 frames corresponding to that 0.2s)
        # 0.2s at 25fps = 5 frames.
        cap = cv2.VideoCapture(path)
        fps = cap.get(cv2.CAP_PROP_FPS)
        if fps <= 0: fps = 25

        start_frame = int(start_sec * fps)
        cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)

        frames = []
        for _ in range(5):
            ret, frame = cap.read()
            if not ret: break
            frames.append(frame)
        cap.release()

        if len(frames) < 5:
            return None, False

        # 3. Detect and Crop Mouth
        # Simplified: Detect face, take lower half.
        detector = FaceDetector()
        face_crops = []
        for frame in frames:
            dets = detector.detect(frame)
            if not dets:
                # Fallback: center crop? Or just fail this segment
                return None, False

            # Largest face
            face = max(dets, key=lambda b: b[2] * b[3])
            x, y, w, h = face

            # Mouth region approximation (lower half of face)
            mouth_y = y + h // 2
            mouth_h = h // 2

            mouth_crop = detector.extract_face(frame, (x, mouth_y, w, mouth_h), size=(112, 112))
            face_crops.append(mouth_crop)

        # Stack frames
        # Input: (B, 15, 112, 112). 15 channels = 5 frames * 3 colors.
        # face_crops: 5 * (112, 112, 3)
        video_tensor = np.concatenate(face_crops, axis=2) # (112, 112, 15)

        # To Torch
        audio_t = torch.from_numpy(mfcc).float().unsqueeze(0).unsqueeze(0) # (1, 1, 13, 20)
        video_t = torch.from_numpy(video_tensor).float().permute(2, 0, 1).unsqueeze(0) # (1, 15, 112, 112)

        # 4. Inference
        model = SyncNet()
        model.eval()

        # Load weights from centralized config
        from veridex.video.weights import get_weight_config

        weight_config = get_weight_config('syncnet')
        weights_url = weight_config['url']
        weights_path = os.path.join(get_cache_dir(), weight_config['filename'])
        sha256 = weight_config.get('sha256')

        weights_loaded = False
        if not os.path.exists(weights_path):
            try:
                download_file(weights_url, weights_path)
            except Exception:
                pass  # Silently fail, final warning below will inform user

        if os.path.exists(weights_path):
             try:
                # Note: Official weights might be LuaTorch or different format.
                # This assumes a PyTorch converted version or compatible dict.
                model.load_state_dict(torch.load(weights_path, map_location='cpu'))
                logger.info(f"✓ Loaded SyncNet weights from {weights_path}")
                weights_loaded = True
             except Exception:
                pass  # Silently fail, final warning below will inform user

        if not weights_loaded:
            warnings.warn(
                "⚠ LipSyncSignal is using untrained weights. Predictions are random.\n"
                "For production use, download real SyncNet weights from VGG.",
                UserWarning,
                stacklevel=2
            )

        with torch.no_grad():
            a_emb, v_emb = model(audio_t, video_t)
            dist = torch.norm(a_emb - v_emb, p=2, dim=1).item()

        return dist, weights_loaded
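
The distance-to-score mapping used in run() can likewise be summarised as a standalone function (threshold 0.8 as in the source; the distances below are illustrative):

def lipsync_score(avg_offset: float, threshold: float = 0.8) -> float:
    # Audio-visual embedding distances at or below the threshold count as in sync (real).
    if avg_offset <= threshold:
        return 0.0
    # Beyond the threshold, map the excess distance linearly onto [0, 1].
    return min((avg_offset - threshold) / 1.0, 1.0)

print(lipsync_score(0.6))   # 0.0  (in sync)
print(lipsync_score(1.3))   # 0.5  (moderately out of sync)
print(lipsync_score(2.5))   # 1.0  (clamped)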

Utilities

FaceDetector


Multi-backend face detector with automatic fallback.

Backends (in order of accuracy):

1. MediaPipe (best, requires the mediapipe package)
2. Haar Cascades (fast, less accurate)

Parameters:

backend (FaceBackend, default: 'auto'): 'auto' (try MediaPipe then Haar), 'mediapipe', or 'haar'

Source code in veridex/video/processing.py
class FaceDetector:
    """
    Multi-backend face detector with automatic fallback.

    Backends (in order of accuracy):
    1. MediaPipe (best, requires mediapipe package)
    2. Haar Cascades (fast, less accurate)

    Args:
        backend: 'auto' (try MediaPipe then Haar), 'mediapipe', or 'haar'
    """
    def __init__(self, backend: FaceBackend = 'auto'):
        try:
            import cv2
            self.cv2 = cv2
        except ImportError:
            raise ImportError("FaceDetector requires 'opencv-python-headless'. Please install veridex[video].")

        self.backend = backend

        if backend == 'auto':
            # Try MediaPipe first, fallback to Haar
            try:
                import mediapipe as mp
                self.backend = 'mediapipe'
                self._init_mediapipe()
            except (ImportError, AttributeError):
                warnings.warn(
                    "MediaPipe not installed or broken. Using Haar Cascades (lower accuracy).\n"
                    "For better face detection: pip install mediapipe",
                    UserWarning
                )
                self.backend = 'haar'
                self._init_haar()
        elif backend == 'mediapipe':
            self._init_mediapipe()
        elif backend == 'haar':
            self._init_haar()
        else:
            raise ValueError(f"Unknown backend: {backend}. Use 'auto', 'mediapipe', or 'haar'")

    def _init_mediapipe(self):
        """Initialize MediaPipe Face Detection."""
        import mediapipe as mp
        self.mp_face_detection = mp.solutions.face_detection
        self.detector = self.mp_face_detection.FaceDetection(
            model_selection=1,  # 0=short range (<2m), 1=full range
            min_detection_confidence=0.5
        )

    def _init_haar(self):
        """Initialize Haar Cascade Face Detection."""
        cascade_path = self.cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'
        self.detector = self.cv2.CascadeClassifier(cascade_path)

    def detect(self, frame: np.ndarray) -> List[Tuple[int, int, int, int]]:
        """
        Detect faces in a frame.

        Args:
            frame: RGB or BGR numpy array (OpenCV uses BGR).

        Returns:
            List of (x, y, w, h) tuples.
        """
        if self.backend == 'mediapipe':
            return self._detect_mediapipe(frame)
        else:
            return self._detect_haar(frame)

    def _detect_mediapipe(self, frame: np.ndarray) -> List[Tuple[int, int, int, int]]:
        """Detect faces using MediaPipe."""
        # MediaPipe expects RGB
        if len(frame.shape) == 3 and frame.shape[2] == 3:
            # Assume BGR from OpenCV, convert to RGB
            frame_rgb = self.cv2.cvtColor(frame, self.cv2.COLOR_BGR2RGB)
        else:
            frame_rgb = frame

        results = self.detector.process(frame_rgb)

        if not results.detections:
            return []

        h, w = frame.shape[:2]
        bboxes = []
        for detection in results.detections:
            bbox = detection.location_data.relative_bounding_box
            x = int(bbox.xmin * w)
            y = int(bbox.ymin * h)
            box_w = int(bbox.width * w)
            box_h = int(bbox.height * h)
            bboxes.append((x, y, box_w, box_h))

        return bboxes

    def _detect_haar(self, frame: np.ndarray) -> List[Tuple[int, int, int, int]]:
        """Detect faces using Haar Cascades."""
        gray = self.cv2.cvtColor(frame, self.cv2.COLOR_BGR2GRAY)
        faces = self.detector.detectMultiScale(
            gray,
            scaleFactor=1.1,
            minNeighbors=5,
            minSize=(30, 30),
            flags=self.cv2.CASCADE_SCALE_IMAGE
        )
        return [tuple(f) for f in faces]

    def extract_face(self, frame: np.ndarray, bbox: Tuple[int, int, int, int], size: Tuple[int, int] = (128, 128)) -> np.ndarray:
        """
        Extract and resize the face ROI.
        """
        x, y, w, h = bbox
        # Ensure bounds
        h_img, w_img = frame.shape[:2]
        x = max(0, x)
        y = max(0, y)
        w = min(w, w_img - x)
        h = min(h, h_img - y)

        face = frame[y:y+h, x:x+w]
        if face.size == 0 or w == 0 or h == 0:
            return np.zeros((size[1], size[0], 3), dtype=frame.dtype)
        return self.cv2.resize(face, size)

    def track_faces(self, frames: List[np.ndarray], size: Tuple[int, int] = (128, 128)) -> np.ndarray:
        """
        Track and extract a single face across a sequence of frames.
        Uses simple IoU tracking and 'largest face' initialization.
        """
        roi_frames = []
        if not frames:
            return np.array(roi_frames)

        # 1. Init on first frame
        current_bbox = None

        # Try finding face in first few frames if missing in 0
        for i, frame in enumerate(frames):
            dets = self.detect(frame)
            if dets:
                # Pick largest
                current_bbox = max(dets, key=lambda b: b[2] * b[3])
                break

        if current_bbox is None:
            # No face found in entire video (or start)
            # Return zeros
            return np.zeros((len(frames), size[1], size[0], 3), dtype=np.uint8)

        # Backfill missing start
        for _ in range(i):
             roi_frames.append(self.extract_face(frames[0], current_bbox, size))

        # 2. Track
        for frame in frames[i:]:
            dets = self.detect(frame)
            if not dets:
                # Lost detection, keep previous bbox
                pass 
            else:
                # Find bbox with best overlap (IoU) with current_bbox
                best_iou = -1.0
                best_box = None

                cx, cy, cw, ch = current_bbox
                c_area = cw * ch

                for box in dets:
                    bx, by, bw, bh = box
                    # IoU calc
                    ix = max(cx, bx)
                    iy = max(cy, by)
                    iw = min(cx+cw, bx+bw) - ix
                    ih = min(cy+ch, by+bh) - iy

                    if iw > 0 and ih > 0:
                        inter = iw * ih
                        union = c_area + (bw * bh) - inter
                        iou = inter / union
                        if iou > best_iou:
                            best_iou = iou
                            best_box = box

                if best_box is not None and best_iou > 0.1: # Threshold for tracking drift
                    current_bbox = best_box
                else:
                    # If all detections are far away, it might be a new face or false positive.
                    # For RPPG we usually want to STICK to the subject.
                    # Keep previous bbox.
                    pass

            # Extract
            roi_frames.append(self.extract_face(frame, current_bbox, size))

        return np.array(roi_frames)
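
A short usage sketch; the blank frame is a stand-in for a real BGR video frame, and the Haar backend is forced so the example does not depend on mediapipe being installed:

import numpy as np
from veridex.video.processing import FaceDetector

detector = FaceDetector(backend="haar")          # skip the MediaPipe auto-detection
frame = np.zeros((480, 640, 3), dtype=np.uint8)  # stand-in; use a real BGR frame in practice
boxes = detector.detect(frame)                   # [] here; (x, y, w, h) tuples for a real face

if boxes:
    # Crop and resize the first detected face to 128x128, as the rPPG pipeline does.
    face = detector.extract_face(frame, boxes[0], size=(128, 128))
    print(face.shape)                            # (128, 128, 3)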

detect(frame)

Detect faces in a frame.

Parameters:

frame (ndarray, required): RGB or BGR numpy array (OpenCV uses BGR).

Returns:

List[Tuple[int, int, int, int]]: List of (x, y, w, h) tuples.

Source code in veridex/video/processing.py
def detect(self, frame: np.ndarray) -> List[Tuple[int, int, int, int]]:
    """
    Detect faces in a frame.

    Args:
        frame: RGB or BGR numpy array (OpenCV uses BGR).

    Returns:
        List of (x, y, w, h) tuples.
    """
    if self.backend == 'mediapipe':
        return self._detect_mediapipe(frame)
    else:
        return self._detect_haar(frame)

extract_face(frame, bbox, size=(128, 128))

Extract and resize the face ROI.

Source code in veridex/video/processing.py
def extract_face(self, frame: np.ndarray, bbox: Tuple[int, int, int, int], size: Tuple[int, int] = (128, 128)) -> np.ndarray:
    """
    Extract and resize the face ROI.
    """
    x, y, w, h = bbox
    # Ensure bounds
    h_img, w_img = frame.shape[:2]
    x = max(0, x)
    y = max(0, y)
    w = min(w, w_img - x)
    h = min(h, h_img - y)

    face = frame[y:y+h, x:x+w]
    if face.size == 0 or w == 0 or h == 0:
        return np.zeros((size[1], size[0], 3), dtype=frame.dtype)
    return self.cv2.resize(face, size)

track_faces(frames, size=(128, 128))

Track and extract a single face across a sequence of frames. Uses simple IoU tracking and 'largest face' initialization.

Source code in veridex/video/processing.py
def track_faces(self, frames: List[np.ndarray], size: Tuple[int, int] = (128, 128)) -> np.ndarray:
    """
    Track and extract a single face across a sequence of frames.
    Uses simple IoU tracking and 'largest face' initialization.
    """
    roi_frames = []
    if not frames:
        return np.array(roi_frames)

    # 1. Init on first frame
    current_bbox = None

    # Try finding face in first few frames if missing in 0
    for i, frame in enumerate(frames):
        dets = self.detect(frame)
        if dets:
            # Pick largest
            current_bbox = max(dets, key=lambda b: b[2] * b[3])
            break

    if current_bbox is None:
        # No face found in entire video (or start)
        # Return zeros
        return np.zeros((len(frames), size[1], size[0], 3), dtype=np.uint8)

    # Backfill missing start
    for _ in range(i):
         roi_frames.append(self.extract_face(frames[0], current_bbox, size))

    # 2. Track
    for frame in frames[i:]:
        dets = self.detect(frame)
        if not dets:
            # Lost detection, keep previous bbox
            pass 
        else:
            # Find bbox with best overlap (IoU) with current_bbox
            best_iou = -1.0
            best_box = None

            cx, cy, cw, ch = current_bbox
            c_area = cw * ch

            for box in dets:
                bx, by, bw, bh = box
                # IoU calc
                ix = max(cx, bx)
                iy = max(cy, by)
                iw = min(cx+cw, bx+bw) - ix
                ih = min(cy+ch, by+bh) - iy

                if iw > 0 and ih > 0:
                    inter = iw * ih
                    union = c_area + (bw * bh) - inter
                    iou = inter / union
                    if iou > best_iou:
                        best_iou = iou
                        best_box = box

            if best_box is not None and best_iou > 0.1: # Threshold for tracking drift
                current_bbox = best_box
            else:
                # If all detections are far away, it might be a new face or false positive.
                # For RPPG we usually want to STICK to the subject.
                # Keep previous bbox.
                pass

        # Extract
        roi_frames.append(self.extract_face(frame, current_bbox, size))

    return np.array(roi_frames)
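
For reference, the IoU test used for tracking, extracted into a standalone function over (x, y, w, h) boxes; the boxes below are illustrative:

def iou(a, b):
    ax, ay, aw, ah = a
    bx, by, bw, bh = b
    ix, iy = max(ax, bx), max(ay, by)
    iw = min(ax + aw, bx + bw) - ix
    ih = min(ay + ah, by + bh) - iy
    if iw <= 0 or ih <= 0:
        return 0.0
    inter = iw * ih
    return inter / (aw * ah + bw * bh - inter)

# Well above the 0.1 drift threshold, so the second box would be accepted as the same face.
print(round(iou((10, 10, 100, 100), (20, 20, 100, 100)), 2))  # 0.68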

Weight Configuration

get_weight_config(model_name)

Get weight configuration for a model.

Checks environment variables first, then falls back to defaults.

Parameters:

model_name (str, required): One of 'physnet', 'i3d', 'syncnet'

Returns:

dict: Dict with 'url', 'filename', 'sha256'

Example

>>> config = get_weight_config('physnet')
>>> print(config['url'])

Source code in veridex/video/weights.py
def get_weight_config(model_name: str) -> dict:
    """
    Get weight configuration for a model.

    Checks environment variables first, then falls back to defaults.

    Args:
        model_name: One of 'physnet', 'i3d', 'syncnet'

    Returns:
        Dict with 'url', 'filename', 'sha256'

    Example:
        >>> config = get_weight_config('physnet')
        >>> print(config['url'])
    """
    if model_name not in DEFAULT_WEIGHTS:
        raise ValueError(f"Unknown model: {model_name}. Use one of {list(DEFAULT_WEIGHTS.keys())}")

    config = DEFAULT_WEIGHTS[model_name].copy()

    # Check for environment variable override
    env_var = f"VERIDEX_{model_name.upper()}_URL"
    if env_var in os.environ:
        config['url'] = os.environ[env_var]

    return config
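
A sketch of pointing a model at a mirror via the VERIDEX_<MODEL>_URL environment variable checked above; the mirror URL is a placeholder:

import os
from veridex.video.weights import get_weight_config

os.environ["VERIDEX_PHYSNET_URL"] = "https://mirror.example.com/physnet.pth"  # placeholder
config = get_weight_config("physnet")
print(config["url"])       # reflects the override
print(config["filename"])  # default filename is unchanged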

set_weight_url(model_name, url, sha256=None)

Programmatically override weight URL.

Parameters:

model_name (str, required): One of 'physnet', 'i3d', 'syncnet'
url (str, required): New URL to use
sha256 (Optional[str], default: None): Optional SHA256 checksum

Example

>>> from veridex.video.weights import set_weight_url
>>> set_weight_url('physnet', 'https://my-server.com/physnet.pth')

Source code in veridex/video/weights.py
def set_weight_url(model_name: str, url: str, sha256: Optional[str] = None):
    """
    Programmatically override weight URL.

    Args:
        model_name: One of 'physnet', 'i3d', 'syncnet'
        url: New URL to use
        sha256: Optional SHA256 checksum

    Example:
        >>> from veridex.video.weights import set_weight_url
        >>> set_weight_url('physnet', 'https://my-server.com/physnet.pth')
    """
    if model_name not in DEFAULT_WEIGHTS:
        raise ValueError(f"Unknown model: {model_name}")

    DEFAULT_WEIGHTS[model_name]['url'] = url
    if sha256 is not None:
        DEFAULT_WEIGHTS[model_name]['sha256'] = sha256

Video Processing Utilities

chunk_video_frames(frames, chunk_size, overlap=0)

Yield chunks of video frames for processing long videos.

Parameters:

frames (ndarray, required): (T, H, W, C) array
chunk_size (int, required): Frames per chunk
overlap (int, default: 0): Overlapping frames between chunks

Yields:

Tuple[int, ndarray]: (start_idx, chunk_frames)

Example

>>> frames = np.zeros((1000, 224, 224, 3))
>>> for start, chunk in chunk_video_frames(frames, 300, overlap=30):
...     # Process chunk of 300 frames
...     result = process(chunk)

Source code in veridex/video/utils.py
def chunk_video_frames(
    frames: np.ndarray, 
    chunk_size: int, 
    overlap: int = 0
) -> Iterator[Tuple[int, np.ndarray]]:
    """
    Yield chunks of video frames for processing long videos.

    Args:
        frames: (T, H, W, C) array
        chunk_size: Frames per chunk
        overlap: Overlapping frames between chunks

    Yields:
        (start_idx, chunk_frames)

    Example:
        >>> frames = np.zeros((1000, 224, 224, 3))
        >>> for start, chunk in chunk_video_frames(frames, 300, overlap=30):
        ...     # Process chunk of 300 frames
        ...     result = process(chunk)
    """
    total_frames = len(frames)
    stride = chunk_size - overlap

    for start in range(0, total_frames, stride):
        end = min(start + chunk_size, total_frames)
        yield start, frames[start:end]

        if end >= total_frames:
            break

smart_sample_frames(total_frames, target_frames, strategy='uniform')

Sample frame indices intelligently.

Parameters:

total_frames (int, required): Total available frames
target_frames (int, required): Desired number of frames
strategy (Literal['uniform', 'random'], default: 'uniform'): 'uniform' (evenly spaced) or 'random'

Returns:

List[int]: List of frame indices to sample

Example

>>> indices = smart_sample_frames(300, 64, 'uniform')
>>> len(indices)
64

Source code in veridex/video/utils.py
def smart_sample_frames(
    total_frames: int, 
    target_frames: int, 
    strategy: Literal['uniform', 'random'] = 'uniform'
) -> List[int]:
    """
    Sample frame indices intelligently.

    Args:
        total_frames: Total available frames
        target_frames: Desired number of frames
        strategy: 'uniform' (evenly spaced) or 'random'

    Returns:
        List of frame indices to sample

    Example:
        >>> indices = smart_sample_frames(300, 64, 'uniform')
        >>> len(indices)
        64
    """
    if target_frames >= total_frames:
        return list(range(total_frames))

    if strategy == 'uniform':
        return np.linspace(0, total_frames - 1, target_frames, dtype=int).tolist()
    elif strategy == 'random':
        return sorted(np.random.choice(total_frames, target_frames, replace=False))
    else:
        raise ValueError(f"Unknown strategy: {strategy}. Use 'uniform' or 'random'")

validate_video_file(file_path)

Validate video file and extract metadata.

Parameters:

file_path (str, required): Path to video file

Returns:

Tuple[bool, str, dict]: (is_valid, error_message, metadata_dict)

Example

>>> valid, error, meta = validate_video_file('video.mp4')
>>> if valid:
...     print(f"FPS: {meta['fps']}, Frames: {meta['total_frames']}")

Source code in veridex/video/utils.py
def validate_video_file(file_path: str) -> Tuple[bool, str, dict]:
    """
    Validate video file and extract metadata.

    Args:
        file_path: Path to video file

    Returns:
        (is_valid, error_message, metadata_dict)

    Example:
        >>> valid, error, meta = validate_video_file('video.mp4')
        >>> if valid:
        ...     print(f"FPS: {meta['fps']}, Frames: {meta['total_frames']}")
    """
    import os
    import cv2

    if not os.path.exists(file_path):
        return False, f"File not found: {file_path}", {}

    cap = cv2.VideoCapture(file_path)
    if not cap.isOpened():
        return False, "Unable to open video file. Format may be unsupported.", {}

    fps = cap.get(cv2.CAP_PROP_FPS)
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    cap.release()

    metadata = {
        'fps': fps,
        'total_frames': total_frames,
        'width': width,
        'height': height,
        'duration_seconds': total_frames / fps if fps > 0 else 0
    }

    if total_frames < 30:
        return False, "Video too short (< 1 second)", metadata

    return True, "", metadata