import subprocess
import asyncio
import os
import json
from typing import List, Optional, Tuple

from app.config import settings  # noqa: F401  -- kept: may be used by other parts of the package


def _ensure_parent_dir(path: str) -> None:
    """Create the parent directory of *path* if it has one.

    Guards against os.makedirs("") raising when *path* is a bare filename.
    """
    parent = os.path.dirname(path)
    if parent:
        os.makedirs(parent, exist_ok=True)


# Candidate CJK-capable fonts for the intro overlay, probed in order.
_INTRO_FONT_CANDIDATES = [
    "/System/Library/Fonts/Supplemental/AppleGothic.ttf",  # macOS Korean
    "/System/Library/Fonts/AppleSDGothicNeo.ttc",  # macOS Korean
    "/usr/share/fonts/truetype/nanum/NanumGothicBold.ttf",  # Linux Korean
    "/usr/share/fonts/opentype/noto/NotoSansCJK-Bold.ttc",  # Linux CJK
]


def _find_intro_font() -> Optional[str]:
    """Return the first existing font path, colon-escaped for ffmpeg, or None."""
    for font in _INTRO_FONT_CANDIDATES:
        if os.path.exists(font):
            return font.replace(":", "\\:")
    return None


def _split_intro_text(intro_text: str) -> List[str]:
    """Split intro text into at most two lines.

    Text of 10 characters or fewer stays on one line. Longer text is split
    near the middle, preferring a space/comma separator within 5 characters
    of the midpoint.
    """
    text_len = len(intro_text)
    if text_len <= 10:
        return [intro_text]

    mid = text_len // 2
    split_pos = mid
    # Search backwards, then forwards, for a separator near the middle.
    # NOTE: a forward match overwrites a backward one (kept as-is).
    for i in range(mid, max(0, mid - 5), -1):
        if intro_text[i] in ' ,、,':
            split_pos = i + 1
            break
    for i in range(mid, min(text_len, mid + 5)):
        if intro_text[i] in ' ,、,':
            split_pos = i + 1
            break

    line1 = intro_text[:split_pos].strip()
    line2 = intro_text[split_pos:].strip()
    return [line1, line2] if line2 else [intro_text]


def _build_intro_filters(
    intro_text: str,
    intro_duration: float,
    intro_font_size: int,
) -> List[str]:
    """Build drawtext filter strings that overlay intro text with a fade-out.

    The text is shown from t=0 until *intro_duration*, centered, split into
    at most two lines, with the font shrunk so long lines fit a 1080px-wide
    Shorts frame (~10-12 chars at 100px), clamped to [50, intro_font_size].
    """
    font_file = _find_intro_font()
    lines = _split_intro_text(intro_text)

    adjusted_font_size = intro_font_size
    max_line_len = max(len(line) for line in lines)
    if max_line_len > 12:
        adjusted_font_size = int(intro_font_size * 10 / max_line_len)
        adjusted_font_size = max(50, min(adjusted_font_size, intro_font_size))

    # Fade out over the final 0.3s of the intro window.
    fade_out_start = max(0.1, intro_duration - 0.3)
    alpha_expr = f"if(gt(t,{fade_out_start}),(({intro_duration}-t)/0.3),1)"

    line_height = adjusted_font_size + 20
    filters: List[str] = []
    for i, line in enumerate(lines):
        # BUGFIX: escape backslashes FIRST; the previous order re-escaped the
        # backslashes inserted for quotes/colons, corrupting the filter string.
        escaped_text = (
            line.replace("\\", "\\\\").replace("'", "\\'").replace(":", "\\:")
        )

        if len(lines) == 1:
            y_expr = "(h-text_h)/2"
        else:
            # Center the block of lines; offset each line from the middle.
            y_offset = int((i - (len(lines) - 1) / 2) * line_height)
            y_expr = f"(h-text_h)/2+{y_offset}"

        drawtext_parts = [
            f"text='{escaped_text}'",
            f"fontsize={adjusted_font_size}",
            "fontcolor=white",
            "x=(w-text_w)/2",  # center horizontally
            f"y={y_expr}",
            f"enable='lt(t,{intro_duration})'",
            "borderw=3",
            "bordercolor=black",
            "box=1",
            "boxcolor=black@0.6",
            "boxborderw=15",
            f"alpha='{alpha_expr}'",
        ]
        if font_file:
            drawtext_parts.insert(1, f"fontfile='{font_file}'")
        filters.append(f"drawtext={':'.join(drawtext_parts)}")
    return filters


async def process_video(
    input_path: str,
    output_path: str,
    subtitle_path: Optional[str] = None,
    bgm_path: Optional[str] = None,
    bgm_volume: float = 0.3,
    keep_original_audio: bool = False,
    intro_text: Optional[str] = None,
    intro_duration: float = 0.7,
    intro_font_size: int = 100,
) -> Tuple[bool, str]:
    """
    Process video: remove audio, add subtitles, add BGM, add intro text.

    Args:
        input_path: Path to input video
        output_path: Path for output video
        subtitle_path: Path to ASS/SRT subtitle file
        bgm_path: Path to BGM audio file
        bgm_volume: Volume level for BGM (0.0 - 1.0)
        keep_original_audio: Whether to keep original audio
        intro_text: Text to display at the beginning of video (YouTube Shorts thumbnail)
        intro_duration: How long to display intro text (seconds)
        intro_font_size: Font size for intro text (100-120 recommended)

    Returns:
        Tuple of (success, message)
    """
    if not os.path.exists(input_path):
        return False, f"Input video not found: {input_path}"

    _ensure_parent_dir(output_path)

    has_bgm = bool(bgm_path and os.path.exists(bgm_path))

    cmd = ["ffmpeg", "-y"]  # -y to overwrite
    cmd.extend(["-i", input_path])
    if has_bgm:
        # -stream_loop must come BEFORE the -i it applies to.
        cmd.extend(["-stream_loop", "-1"])  # loop BGM indefinitely
        cmd.extend(["-i", bgm_path])

    # --- Audio graph ---------------------------------------------------
    # filter_parts holds the audio portion of -filter_complex; audio_output
    # is the label (or stream specifier) to -map, None when audio is dropped.
    filter_parts: List[str] = []
    if keep_original_audio and has_bgm:
        # Mix original audio with BGM.
        filter_parts.append("[0:a]volume=1.0[original]")
        filter_parts.append(f"[1:a]volume={bgm_volume}[bgm]")
        filter_parts.append("[original][bgm]amix=inputs=2:duration=shortest[audio]")
        audio_output = "[audio]"
    elif has_bgm:
        # BGM only (original audio dropped).
        filter_parts.append(f"[1:a]volume={bgm_volume}[audio]")
        audio_output = "[audio]"
    elif keep_original_audio:
        # Original audio only, no filtering needed.
        audio_output = "0:a"
    else:
        audio_output = None  # no audio at all

    # --- Video filter chain --------------------------------------------
    # Note: tpad (frozen intro frames) is intentionally not used, as it
    # extends the video duration; intro text is overlaid on existing frames.
    video_filters: List[str] = []
    if subtitle_path and os.path.exists(subtitle_path):
        escaped_path = (
            subtitle_path.replace("\\", "/").replace(":", "\\:").replace("'", "\\'")
        )
        video_filters.append(f"ass='{escaped_path}'")
    if intro_text:
        video_filters.extend(
            _build_intro_filters(intro_text, intro_duration, intro_font_size)
        )

    video_filter_str = ",".join(video_filters) if video_filters else None

    # --- Assemble mapping ----------------------------------------------
    if filter_parts and video_filter_str:
        # Audio graph + video filters: both must go through one -filter_complex.
        full_filter = ";".join(filter_parts) + f";[0:v]{video_filter_str}[vout]"
        cmd.extend(["-filter_complex", full_filter])
        cmd.extend(["-map", "[vout]"])
        if audio_output:
            cmd.extend(["-map", audio_output])
    elif video_filter_str:
        # Video filters only. (BGM implies filter_parts, so no BGM here;
        # the old -vf + -filter_complex combination was unreachable and
        # invalid ffmpeg usage, and has been removed.)
        cmd.extend(["-vf", video_filter_str])
        if not keep_original_audio:
            cmd.extend(["-an"])  # drop audio
    elif filter_parts:
        # Audio graph only.
        cmd.extend(["-filter_complex", ";".join(filter_parts)])
        cmd.extend(["-map", "0:v"])
        if audio_output and audio_output.startswith("["):
            cmd.extend(["-map", audio_output])
    elif not keep_original_audio:
        cmd.extend(["-an"])  # no filters, no audio

    # Output settings.
    cmd.extend([
        "-c:v", "libx264",
        "-preset", "medium",
        "-crf", "23",
        "-c:a", "aac",
        "-b:a", "128k",
        "-shortest",
        output_path,
    ])

    try:
        # Run FFmpeg in a thread pool to avoid blocking the event loop.
        result = await asyncio.to_thread(
            subprocess.run,
            cmd,
            capture_output=True,
            text=True,
            timeout=600,  # 10 minute timeout
        )
        if result.returncode != 0:
            error_msg = result.stderr[-500:] if result.stderr else "Unknown error"
            return False, f"FFmpeg error: {error_msg}"
        if os.path.exists(output_path):
            return True, "Video processing complete"
        return False, "Output file not created"
    except subprocess.TimeoutExpired:
        return False, "Processing timed out"
    except Exception as e:
        return False, f"Processing error: {str(e)}"


async def get_video_duration(video_path: str) -> Optional[float]:
    """Get media duration in seconds via ffprobe, or None on any failure."""
    cmd = [
        "ffprobe", "-v", "error",
        "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1",
        video_path,
    ]
    try:
        # BUGFIX: run in a thread so this coroutine does not block the loop.
        result = await asyncio.to_thread(
            subprocess.run, cmd, capture_output=True, text=True, timeout=30,
        )
        if result.returncode == 0:
            return float(result.stdout.strip())
    except Exception:
        pass
    return None


async def get_video_info(video_path: str) -> Optional[dict]:
    """Get video information (duration, resolution, etc.).

    Returns a dict with "duration", "width", "height" keys (any may be
    missing if ffprobe did not report them), or None on failure.
    """
    cmd = [
        "ffprobe", "-v", "error",
        "-select_streams", "v:0",
        "-show_entries", "stream=width,height,duration:format=duration",
        "-of", "json",
        video_path,
    ]
    try:
        result = await asyncio.to_thread(
            subprocess.run, cmd, capture_output=True, text=True, timeout=30,
        )
        if result.returncode == 0:
            data = json.loads(result.stdout)
            info = {}
            # Container-level duration is more reliable than per-stream.
            if "format" in data and "duration" in data["format"]:
                info["duration"] = float(data["format"]["duration"])
            if "streams" in data and len(data["streams"]) > 0:
                stream = data["streams"][0]
                info["width"] = stream.get("width")
                info["height"] = stream.get("height")
            return info if info else None
    except Exception:
        pass
    return None


async def trim_video(
    input_path: str,
    output_path: str,
    start_time: float,
    end_time: float,
) -> Tuple[bool, str]:
    """
    Trim video to specified time range.

    Args:
        input_path: Path to input video
        output_path: Path for output video
        start_time: Start time in seconds (clamped to 0)
        end_time: End time in seconds (clamped to source duration)

    Returns:
        Tuple of (success, message)
    """
    if not os.path.exists(input_path):
        return False, f"Input video not found: {input_path}"

    # Validate time range against the real duration.
    duration = await get_video_duration(input_path)
    if duration is None:
        return False, "Could not get video duration"
    if start_time < 0:
        start_time = 0
    if end_time > duration:
        end_time = duration
    if start_time >= end_time:
        return False, f"Invalid time range: start ({start_time}) >= end ({end_time})"

    _ensure_parent_dir(output_path)
    trim_duration = end_time - start_time

    # Log trim parameters for debugging.
    print(f"[Trim] Input: {input_path}")
    print(f"[Trim] Original duration: {duration:.3f}s")
    print(f"[Trim] Requested: start={start_time:.3f}s, end={end_time:.3f}s")
    print(f"[Trim] Output duration should be: {trim_duration:.3f}s")

    # -ss BEFORE -i does input seeking (fast), -t controls output duration,
    # -accurate_seek keeps the cut frame-accurate; re-encode for exact cuts.
    cmd = [
        "ffmpeg", "-y",
        "-accurate_seek",
        "-ss", str(start_time),
        "-i", input_path,
        "-t", str(trim_duration),
        "-c:v", "libx264",
        "-preset", "fast",
        "-crf", "18",  # high quality (lower = better)
        "-c:a", "aac",
        "-b:a", "128k",
        "-avoid_negative_ts", "make_zero",  # fix timestamp issues
        output_path,
    ]
    print(f"[Trim] Command: {' '.join(cmd)}")

    try:
        result = await asyncio.to_thread(
            subprocess.run, cmd, capture_output=True, text=True, timeout=120,
        )
        if result.returncode != 0:
            error_msg = result.stderr[-300:] if result.stderr else "Unknown error"
            print(f"[Trim] FFmpeg error: {error_msg}")
            return False, f"Trim failed: {error_msg}"
        if not os.path.exists(output_path):
            print("[Trim] Error: Output file not created")
            return False, "Output file not created"

        new_duration = await get_video_duration(output_path)
        # BUGFIX: probing can fail and return None; the old code then
        # crashed formatting None with ":.3f".
        if new_duration is None:
            print("[Trim] Success, but could not probe output duration")
            return True, "Video trimmed successfully"
        print(f"[Trim] Success! New duration: {new_duration:.3f}s (expected: {trim_duration:.3f}s)")
        print(f"[Trim] Difference from expected: {abs(new_duration - trim_duration):.3f}s")
        return True, f"Video trimmed successfully ({new_duration:.1f}s)"
    except subprocess.TimeoutExpired:
        print("[Trim] Error: Timeout")
        return False, "Trim operation timed out"
    except Exception as e:
        print(f"[Trim] Error: {str(e)}")
        return False, f"Trim error: {str(e)}"


async def extract_frame(
    video_path: str,
    output_path: str,
    timestamp: float,
) -> Tuple[bool, str]:
    """
    Extract a single frame from video at specified timestamp.

    Args:
        video_path: Path to input video
        output_path: Path for output image (jpg/png)
        timestamp: Time in seconds

    Returns:
        Tuple of (success, message)
    """
    if not os.path.exists(video_path):
        return False, f"Video not found: {video_path}"

    _ensure_parent_dir(output_path)

    cmd = [
        "ffmpeg", "-y",
        "-ss", str(timestamp),
        "-i", video_path,
        "-frames:v", "1",
        "-q:v", "2",  # high JPEG quality
        output_path,
    ]
    try:
        result = await asyncio.to_thread(
            subprocess.run, cmd, capture_output=True, text=True, timeout=30,
        )
        if result.returncode == 0 and os.path.exists(output_path):
            return True, "Frame extracted"
        return False, result.stderr[-200:] if result.stderr else "Unknown error"
    except Exception as e:
        return False, str(e)


async def get_audio_duration(audio_path: str) -> Optional[float]:
    """Get audio duration in seconds."""
    return await get_video_duration(audio_path)  # same ffprobe command works


async def extract_audio(video_path: str, output_path: str) -> Tuple[bool, str]:
    """Extract audio from video as 16kHz mono PCM WAV."""
    cmd = [
        "ffmpeg", "-y",
        "-i", video_path,
        "-vn",
        "-acodec", "pcm_s16le",
        "-ar", "16000",
        "-ac", "1",
        output_path,
    ]
    try:
        # BUGFIX: run in a thread so this coroutine does not block the loop.
        result = await asyncio.to_thread(
            subprocess.run, cmd, capture_output=True, text=True, timeout=120,
        )
        if result.returncode == 0:
            return True, "Audio extracted"
        return False, result.stderr
    except Exception as e:
        return False, str(e)


async def extract_audio_with_noise_reduction(
    video_path: str,
    output_path: str,
    noise_reduction_level: str = "medium",
) -> Tuple[bool, str]:
    """
    Extract audio from video with noise reduction for better STT accuracy.

    Args:
        video_path: Path to input video
        output_path: Path for output audio (WAV format recommended)
        noise_reduction_level: "light", "medium", or "heavy"

    Returns:
        Tuple of (success, message)
    """
    if not os.path.exists(video_path):
        return False, f"Video file not found: {video_path}"

    # Build audio filter chain based on noise reduction level.
    filters = []
    # 1. High-pass filter: remove low frequency rumble (< 80Hz).
    filters.append("highpass=f=80")
    # 2. Low-pass filter: remove high frequency hiss (> 8000Hz for speech).
    filters.append("lowpass=f=8000")

    if noise_reduction_level == "light":
        pass  # light: just the basic frequency filtering above
    elif noise_reduction_level == "medium":
        # afftdn FFT denoiser: nr=reduction amount, nf=noise floor.
        filters.append("afftdn=nf=-25:nr=10:nt=w")
    elif noise_reduction_level == "heavy":
        # More aggressive noise reduction.
        filters.append("afftdn=nf=-20:nr=20:nt=w")

    # Dynamic range compression to normalize volume.
    # NOTE(review): applied for every level here; confirm it was not meant
    # to be exclusive to the "heavy" branch.
    filters.append("acompressor=threshold=-20dB:ratio=4:attack=5:release=50")
    # 3. Normalize audio levels.
    filters.append("loudnorm=I=-16:TP=-1.5:LRA=11")

    filter_chain = ",".join(filters)
    cmd = [
        "ffmpeg", "-y",
        "-i", video_path,
        "-vn",  # no video
        "-af", filter_chain,
        "-acodec", "pcm_s16le",  # PCM format for Whisper
        "-ar", "16000",  # 16kHz sample rate (Whisper optimal)
        "-ac", "1",  # mono
        output_path,
    ]
    try:
        # Run FFmpeg in a thread pool to avoid blocking the event loop.
        result = await asyncio.to_thread(
            subprocess.run, cmd, capture_output=True, text=True, timeout=120,
        )
        if result.returncode != 0:
            error_msg = result.stderr[-300:] if result.stderr else "Unknown error"
            return False, f"Audio extraction failed: {error_msg}"
        if os.path.exists(output_path):
            return True, f"Audio extracted with {noise_reduction_level} noise reduction"
        return False, "Output file not created"
    except subprocess.TimeoutExpired:
        return False, "Audio extraction timed out"
    except Exception as e:
        return False, f"Audio extraction error: {str(e)}"


def _parse_volumedetect(stderr: str) -> dict:
    """Parse mean_volume/max_volume (dB) out of ffmpeg volumedetect stderr."""
    info = {}
    for line in stderr.split('\n'):
        if 'mean_volume' in line:
            info['mean_volume'] = float(line.split(':')[1].strip().replace(' dB', ''))
        elif 'max_volume' in line:
            info['max_volume'] = float(line.split(':')[1].strip().replace(' dB', ''))
    return info


async def analyze_audio_noise_level(audio_path: str) -> Optional[dict]:
    """
    Analyze audio to detect noise level.

    Returns dict with mean_volume, max_volume estimates, or None on failure.
    """
    cmd = [
        "ffmpeg",
        "-i", audio_path,
        "-af", "volumedetect",
        "-f", "null", "-",
    ]
    try:
        # BUGFIX: run in a thread so this coroutine does not block the loop.
        result = await asyncio.to_thread(
            subprocess.run, cmd, capture_output=True, text=True, timeout=60,
        )
        info = _parse_volumedetect(result.stderr)
        return info if info else None
    except Exception:
        return None


async def has_audio_stream(video_path: str) -> bool:
    """
    Check if video file has an audio stream.

    Returns:
        True if video has audio, False otherwise
    """
    cmd = [
        "ffprobe", "-v", "error",
        "-select_streams", "a",  # select only audio streams
        "-show_entries", "stream=codec_type",
        "-of", "csv=p=0",
        video_path,
    ]
    try:
        result = await asyncio.to_thread(
            subprocess.run, cmd, capture_output=True, text=True, timeout=30,
        )
        # If there's audio, ffprobe outputs "audio".
        return "audio" in result.stdout.lower()
    except Exception:
        return False


async def get_audio_volume_info(video_path: str) -> Optional[dict]:
    """
    Get audio volume information to detect silent audio.

    Returns:
        dict with mean_volume/max_volume, or None if no audio or error
    """
    # First check whether an audio stream exists at all.
    if not await has_audio_stream(video_path):
        return None

    cmd = [
        "ffmpeg",
        "-i", video_path,
        "-af", "volumedetect",
        "-f", "null", "-",
    ]
    try:
        result = await asyncio.to_thread(
            subprocess.run, cmd, capture_output=True, text=True, timeout=60,
        )
        info = _parse_volumedetect(result.stderr)
        return info if info else None
    except Exception:
        return None


def is_audio_silent(volume_info: Optional[dict], threshold_db: float = -50.0) -> bool:
    """
    Check if audio is effectively silent (below threshold).

    Args:
        volume_info: dict from get_audio_volume_info
        threshold_db: Volume below this is considered silent (default -50dB)

    Returns:
        True if silent or no audio, False otherwise
    """
    if not volume_info:
        return True
    mean_volume = volume_info.get('mean_volume', -100)
    return mean_volume < threshold_db