Spaces:
Sleeping
Sleeping
| """ | |
| I/O utilities for video processing and data export. | |
| This module provides functions for reading/writing videos, | |
| exporting trajectory data to CSV, and handling file operations. | |
| """ | |
| import cv2 | |
| import csv | |
| import numpy as np | |
| from typing import List, Tuple, Optional, Generator | |
| from pathlib import Path | |
class VideoReader:
    """
    Context manager for reading video files frame by frame.

    The capture is opened in the constructor and released either by leaving
    the ``with`` block or by calling :meth:`release` explicitly.

    Attributes:
        video_path (str): Path to input video file
        cap (cv2.VideoCapture): OpenCV video capture object
    """

    def __init__(self, video_path: str):
        """
        Initialize video reader.

        Args:
            video_path: Path to the video file

        Raises:
            FileNotFoundError: If video file doesn't exist
            RuntimeError: If video cannot be opened
        """
        self.video_path = video_path
        if not Path(video_path).exists():
            raise FileNotFoundError(f"Video file not found: {video_path}")
        # str() keeps plain strings unchanged and lets pathlib.Path callers
        # through as well.
        self.cap = cv2.VideoCapture(str(video_path))
        if not self.cap.isOpened():
            # Nobody will ever reach __exit__ for this object, so give the
            # handle back before reporting the failure.
            self.cap.release()
            raise RuntimeError(f"Failed to open video: {video_path}")

    def __enter__(self):
        """Context manager entry."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit - release video capture."""
        # Returns None, so an exception raised inside the with-block
        # propagates to the caller.
        self.release()

    def release(self) -> None:
        """Release the underlying capture (for use outside a with-block)."""
        self.cap.release()

    def get_properties(self) -> dict:
        """
        Get video properties.

        Returns:
            Dictionary containing fps (as reported by OpenCV), and
            frame_count, width, height truncated to int
        """
        return {
            'fps': self.cap.get(cv2.CAP_PROP_FPS),
            'frame_count': int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT)),
            'width': int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)),
            'height': int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)),
        }

    def read_frames(self) -> Generator[Tuple[int, np.ndarray], None, None]:
        """
        Generator that yields frames from the video.

        Numbering starts at 0 for the first frame this generator reads; it
        counts reads made by this generator, not the capture's position.

        Yields:
            Tuple of (frame_number, frame_array)
        """
        frame_num = 0
        while True:
            ret, frame = self.cap.read()
            if not ret:
                # A failed read ends the iteration.
                break
            yield frame_num, frame
            frame_num += 1

    def read_frame(self) -> Tuple[bool, Optional[np.ndarray]]:
        """
        Read a single frame.

        Returns:
            Tuple of (success, frame) where success is a boolean
        """
        return self.cap.read()
class VideoWriter:
    """
    Context manager for writing video files.

    The writer is opened in the constructor and released either by leaving
    the ``with`` block or by calling :meth:`release` explicitly.

    Attributes:
        output_path (str): Path to output video file
        fps (float): Frame rate
        width (int): Frame width
        height (int): Frame height
        writer (cv2.VideoWriter): OpenCV video writer object
    """

    def __init__(
        self,
        output_path: str,
        fps: float,
        width: int,
        height: int,
        codec: str = 'mp4v'
    ):
        """
        Initialize video writer.

        Args:
            output_path: Path to save the video
            fps: Frame rate
            width: Frame width in pixels
            height: Frame height in pixels
            codec: Video codec fourcc code

        Raises:
            RuntimeError: If the video writer cannot be opened
        """
        self.output_path = output_path
        self.fps = fps
        self.width = width
        self.height = height

        # Create output directory if it doesn't exist
        Path(output_path).parent.mkdir(parents=True, exist_ok=True)

        # Initialize video writer; the codec string is unpacked into the
        # individual characters VideoWriter_fourcc expects.
        fourcc = cv2.VideoWriter_fourcc(*codec)
        # str() keeps plain strings unchanged and lets pathlib.Path callers
        # through as well.
        self.writer = cv2.VideoWriter(
            str(output_path),
            fourcc,
            fps,
            (width, height)
        )
        if not self.writer.isOpened():
            # Nobody will ever reach __exit__ for this object, so give the
            # handle back before reporting the failure.
            self.writer.release()
            raise RuntimeError(f"Failed to create video writer: {output_path}")

    def __enter__(self):
        """Context manager entry."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit - release video writer."""
        # Returns None, so an exception raised inside the with-block
        # propagates to the caller.
        self.release()

    def release(self) -> None:
        """Release the underlying writer (for use outside a with-block)."""
        self.writer.release()

    def write_frame(self, frame: np.ndarray):
        """
        Write a single frame to the video.

        Frames whose size differs from the configured width/height are
        resized to fit before being written.

        Args:
            frame: Frame array in BGR format
        """
        # Array shape is (rows, cols, ...) i.e. (height, width, ...).
        if frame.shape[1] != self.width or frame.shape[0] != self.height:
            frame = cv2.resize(frame, (self.width, self.height))
        self.writer.write(frame)
def export_trajectory_csv(
    trajectory: List[Tuple[float, float, float, float, int]],
    fps: float,
    output_path: str
) -> bool:
    """
    Export trajectory data to CSV file.

    Each row holds the frame number, its timestamp (frame_num / fps), the
    position, and the velocity components and speed scaled by fps so they
    are expressed per second.

    Args:
        trajectory: List of (x, y, vx, vy, frame_num) tuples; vx and vy are
            multiplied by fps on output, i.e. they are treated as
            per-frame values
        fps: Video frame rate; must be greater than zero
        output_path: Path to save CSV file

    Returns:
        True if successful, False otherwise (the error is printed). No file
        is created when fps is not positive.
    """
    try:
        # Reject a bad frame rate before touching the filesystem so a
        # failed export never leaves a header-only file behind.
        # `not fps > 0` also catches NaN.
        if not fps > 0:
            raise ValueError(f"fps must be positive, got {fps}")

        # Create output directory if needed
        Path(output_path).parent.mkdir(parents=True, exist_ok=True)

        # newline='' is what the csv module requires to control line endings.
        with open(output_path, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.writer(csvfile)

            # Write header
            writer.writerow([
                'frame',
                'timestamp_sec',
                'x_pixels',
                'y_pixels',
                'velocity_x_px_per_sec',
                'velocity_y_px_per_sec',
                'speed_px_per_sec',
            ])

            # Write data rows
            for x, y, vx, vy, frame_num in trajectory:
                timestamp = frame_num / fps
                # Per-frame -> per-second: multiply by the frame rate.
                vx_per_sec = vx * fps
                vy_per_sec = vy * fps
                speed = np.hypot(vx, vy) * fps
                writer.writerow([
                    frame_num,
                    f"{timestamp:.3f}",
                    f"{x:.2f}",
                    f"{y:.2f}",
                    f"{vx_per_sec:.2f}",
                    f"{vy_per_sec:.2f}",
                    f"{speed:.2f}",
                ])
        return True
    except Exception as e:
        # Deliberate best-effort boundary: callers rely on the boolean
        # result rather than on exceptions.
        print(f"Error exporting CSV: {str(e)}")
        return False
def get_video_info(video_path: str) -> Optional[dict]:
    """
    Look up the basic properties of a video file.

    Args:
        video_path: Location of the video to inspect

    Returns:
        The dictionary produced by VideoReader.get_properties(), or None
        when the video could not be opened or queried (the error is printed)
    """
    try:
        with VideoReader(video_path) as video:
            info = video.get_properties()
    except Exception as err:
        print(f"Error reading video info: {str(err)}")
        return None
    return info
def validate_video_file(video_path: str) -> Tuple[bool, str]:
    """
    Validate that a video file exists and can be opened.

    Checks run cheapest first: a non-empty path, an existing regular file,
    and only then an actual open through VideoReader to inspect the frame
    count and frame rate.

    Args:
        video_path: Path to video file

    Returns:
        Tuple of (is_valid, error_message); the message is
        "Valid video file" on success
    """
    if not video_path:
        return False, "No video path provided"

    path = Path(video_path)
    if not path.exists():
        return False, f"Video file not found: {video_path}"
    if not path.is_file():
        return False, f"Path is not a file: {video_path}"

    # Try to open the video
    try:
        with VideoReader(video_path) as reader:
            props = reader.get_properties()
            # <= 0 rather than == 0: a negative count is no more usable
            # than an empty one.
            if props['frame_count'] <= 0:
                return False, "Video has no frames"
            # `not fps > 0` rejects zero, negatives and NaN alike.
            if not props['fps'] > 0:
                return False, "Invalid video frame rate"
            return True, "Valid video file"
    except Exception as e:
        return False, f"Failed to open video: {str(e)}"
def create_output_directory(output_dir: str = "output") -> Path:
    """
    Make sure an output directory exists and hand back its path.

    Missing parent directories are created too; an already existing
    directory is left as it is.

    Args:
        output_dir: Directory name/path

    Returns:
        Path object for the output directory
    """
    directory = Path(output_dir)
    directory.mkdir(exist_ok=True, parents=True)
    return directory