"""Whisper-based video transcription with optional audio pre-processing.

Pipeline: audio availability check -> (optional) vocal separation ->
(optional) noise reduction -> Whisper transcription -> word-level segment
splitting. Also provides SRT/ASS subtitle serialization helpers.
"""

import asyncio
import os
import shutil
from typing import Callable, List, Optional, Tuple

import whisper

from app.config import settings
from app.models.schemas import TranscriptSegment

# Global model cache: loading a Whisper model is expensive, so it is loaded
# once per process and reused.
_model = None


def get_whisper_model():
    """Return the Whisper model named by settings.WHISPER_MODEL (cached per process)."""
    global _model
    if _model is None:
        print(f"Loading Whisper model: {settings.WHISPER_MODEL}")
        _model = whisper.load_model(settings.WHISPER_MODEL)
    return _model


async def check_audio_availability(video_path: str) -> Tuple[bool, str]:
    """
    Check if video has usable audio for transcription.

    Returns:
        Tuple of (has_audio, message) where message is one of
        "no_audio_stream", "audio_silent", or "audio_ok".
    """
    # Imported lazily to avoid a circular import with video_processor.
    from app.services.video_processor import (
        has_audio_stream,
        get_audio_volume_info,
        is_audio_silent,
    )

    # Check if an audio stream exists at all.
    if not await has_audio_stream(video_path):
        return False, "no_audio_stream"

    # Check if the audio is effectively silent.
    volume_info = await get_audio_volume_info(video_path)
    if is_audio_silent(volume_info):
        return False, "audio_silent"

    return True, "audio_ok"


async def transcribe_video(
    video_path: str,
    use_noise_reduction: bool = True,
    noise_reduction_level: str = "medium",
    use_vocal_separation: bool = False,
    progress_callback: Optional[Callable] = None,
) -> Tuple[bool, str, Optional[List[TranscriptSegment]], Optional[str]]:
    """
    Transcribe video audio using Whisper.

    Args:
        video_path: Path to video file
        use_noise_reduction: Whether to apply noise reduction before transcription
        noise_reduction_level: "light", "medium", or "heavy"
        use_vocal_separation: Whether to separate vocals from background music first
        progress_callback: Optional async callback function(step: str, progress: int)
            for progress updates

    Returns:
        Tuple of (success, message, segments, detected_language)
        - success=False with message="NO_AUDIO" means video has no audio
        - success=False with message="SILENT_AUDIO" means audio is too quiet
        - success=False with message="SINGING_ONLY" means only singing detected
          (no speech)
    """

    # Helper to call the progress callback if one was provided.
    async def report_progress(step: str, progress: int):
        print(f"[Transcriber] report_progress: {step} ({progress}%), has_callback: {progress_callback is not None}")
        if progress_callback:
            await progress_callback(step, progress)

    if not os.path.exists(video_path):
        return False, f"Video file not found: {video_path}", None, None

    # Fail fast when there is nothing transcribable.
    has_audio, audio_status = await check_audio_availability(video_path)
    if not has_audio:
        if audio_status == "no_audio_stream":
            return False, "NO_AUDIO", None, None
        elif audio_status == "audio_silent":
            return False, "SILENT_AUDIO", None, None

    audio_path = video_path  # Default to video path (Whisper can handle it)
    temp_files = []  # Track temp files for cleanup

    try:
        video_dir = os.path.dirname(video_path)

        # Step 1: Vocal separation (if enabled)
        if use_vocal_separation:
            from app.services.audio_separator import separate_vocals, analyze_vocal_type

            await report_progress("vocal_separation", 15)
            print("Separating vocals from background music...")
            separation_dir = os.path.join(video_dir, "separated")
            success, message, vocals_path, _ = await separate_vocals(
                video_path, separation_dir
            )

            if success and vocals_path:
                print(f"Vocal separation complete: {vocals_path}")
                temp_files.append(separation_dir)

                # Analyze if vocals are speech or singing
                print("Analyzing vocal type (speech vs singing)...")
                vocal_type, confidence = await analyze_vocal_type(vocals_path)
                print(f"Vocal analysis: {vocal_type} (confidence: {confidence:.2f})")

                # Treat as singing if:
                # 1. Explicitly detected as singing
                # 2. Mixed with low confidence (< 0.6) - likely music, not clear speech
                if vocal_type == "singing" or (vocal_type == "mixed" and confidence < 0.6):
                    # Only singing/music detected - no clear speech to transcribe
                    _cleanup_temp_files(temp_files)
                    # NOTE(review): `reason` is log-only. The returned message is
                    # always "SINGING_ONLY" (the documented caller contract), even
                    # when the internal classification is MUSIC_DOMINANT.
                    reason = "SINGING_ONLY" if vocal_type == "singing" else "MUSIC_DOMINANT"
                    print(f"No clear speech detected ({reason}), awaiting manual subtitle")
                    return False, "SINGING_ONLY", None, None

                # Use the separated vocals track for transcription
                audio_path = vocals_path
            else:
                print(f"Vocal separation failed: {message}, continuing with original audio")

        # Step 2: Apply noise reduction (if enabled and not using separated vocals)
        if use_noise_reduction and audio_path == video_path:
            from app.services.video_processor import extract_audio_with_noise_reduction

            await report_progress("extracting_audio", 20)
            cleaned_path = os.path.join(video_dir, "audio_cleaned.wav")
            await report_progress("noise_reduction", 25)
            print(f"Applying {noise_reduction_level} noise reduction...")
            success, message = await extract_audio_with_noise_reduction(
                video_path, cleaned_path, noise_reduction_level
            )
            if success:
                print(f"Noise reduction complete: {message}")
                audio_path = cleaned_path
                temp_files.append(cleaned_path)
            else:
                print(f"Noise reduction failed: {message}, falling back to original audio")

        # Step 3: Transcribe with Whisper
        await report_progress("transcribing", 35)
        model = get_whisper_model()
        print(f"Transcribing audio: {audio_path}")

        # Run Whisper in a worker thread to avoid blocking the event loop.
        result = await asyncio.to_thread(
            model.transcribe,
            audio_path,
            task="transcribe",
            language=None,  # Auto-detect
            verbose=False,
            word_timestamps=True,
        )

        # Split long segments using word-level timestamps.
        segments = _split_segments_by_words(
            result.get("segments", []),
            max_duration=2.0,  # Maximum segment duration in seconds (shorter for better sync)
            min_words=1,  # Minimum words per segment
        )

        # Clean up temp files
        _cleanup_temp_files(temp_files)

        detected_lang = result.get("language", "unknown")
        print(f"Detected language: {detected_lang}")

        # Summarize which pre-processing steps were requested in the message.
        extras = []
        if use_vocal_separation:
            extras.append("vocal separation")
        if use_noise_reduction:
            extras.append(f"noise reduction: {noise_reduction_level}")
        extra_info = f" ({', '.join(extras)})" if extras else ""

        # Return tuple with 4 elements: success, message, segments, detected_language
        return True, f"Transcription complete (detected: {detected_lang}){extra_info}", segments, detected_lang

    except Exception as e:
        # Best-effort cleanup, then surface the error to the caller.
        _cleanup_temp_files(temp_files)
        return False, f"Transcription error: {str(e)}", None, None


def _make_segment(start: float, end: float, words: List[str]) -> TranscriptSegment:
    """Build a TranscriptSegment from accumulated word strings."""
    text = " ".join(words)
    # For Chinese/Japanese text (CJK ideographs), remove the spaces Whisper
    # inserts between word tokens.
    if any('\u4e00' <= c <= '\u9fff' for c in text):
        text = text.replace(" ", "")
    return TranscriptSegment(start=start, end=end, text=text)


def _split_segments_by_words(
    raw_segments: list,
    max_duration: float = 4.0,
    min_words: int = 2,
) -> List[TranscriptSegment]:
    """
    Split long Whisper segments into shorter ones using word-level timestamps.

    Args:
        raw_segments: Raw segments from Whisper output
        max_duration: Maximum duration for each segment in seconds
        min_words: Minimum words per segment (to avoid single-word segments)

    Returns:
        List of TranscriptSegment with shorter durations
    """
    segments = []

    for seg in raw_segments:
        words = seg.get("words", [])
        seg_text = seg.get("text", "").strip()
        seg_start = seg.get("start", 0)
        seg_end = seg.get("end", 0)
        seg_duration = seg_end - seg_start

        # If no word timestamps or the segment is short enough, use as-is.
        if not words or seg_duration <= max_duration:
            segments.append(TranscriptSegment(
                start=seg_start,
                end=seg_end,
                text=seg_text,
            ))
            continue

        # Split the segment using word timestamps.
        current_words = []
        current_start = None
        current_end = seg_end  # end time of the last accumulated word

        for i, word in enumerate(words):
            word_start = word.get("start", seg_start)
            word_end = word.get("end", seg_end)
            word_text = word.get("word", "").strip()

            if not word_text:
                continue

            # Start a new segment at the first non-empty word.
            if current_start is None:
                current_start = word_start

            current_words.append(word_text)
            current_end = word_end
            current_duration = word_end - current_start

            # Decide whether to split after this word.
            is_last_word = (i == len(words) - 1)
            should_split = False

            if is_last_word:
                should_split = True
            elif current_duration >= max_duration and len(current_words) >= min_words:
                should_split = True
            elif current_duration >= max_duration * 0.5:
                # Split at natural break points (punctuation) more aggressively
                if word_text.endswith((',', '.', '!', '?', '。', ',', '!', '?', '、', ';', ';')):
                    should_split = True
            elif current_duration >= 1.0 and word_text.endswith(('。', '!', '?', '.', '!', '?')):
                # Always split at sentence endings if we have at least 1 second of content
                should_split = True

            if should_split and current_words:
                segments.append(_make_segment(current_start, word_end, current_words))
                # Reset for the next segment
                current_words = []
                current_start = None

        # BUGFIX: if the final word token(s) had empty text they were skipped by
        # the `continue` above, so the `is_last_word` flush never fired and the
        # accumulated words were silently dropped. Flush any remainder here.
        if current_words and current_start is not None:
            segments.append(_make_segment(current_start, current_end, current_words))

    return segments


def _cleanup_temp_files(paths: list):
    """Best-effort removal of temporary files and directories (errors ignored)."""
    for path in paths:
        try:
            if os.path.isdir(path):
                shutil.rmtree(path, ignore_errors=True)
            elif os.path.exists(path):
                os.remove(path)
        except Exception:
            # Cleanup must never mask the real transcription result.
            pass


def segments_to_srt(segments: List[TranscriptSegment], use_translated: bool = True) -> str:
    """Convert segments to SRT format."""
    srt_lines = []
    for i, seg in enumerate(segments, 1):
        start_time = format_srt_time(seg.start)
        end_time = format_srt_time(seg.end)
        # Prefer the translated text when requested and available.
        text = seg.translated if use_translated and seg.translated else seg.text
        srt_lines.append(f"{i}")
        srt_lines.append(f"{start_time} --> {end_time}")
        srt_lines.append(text)
        srt_lines.append("")  # blank line separates SRT cues
    return "\n".join(srt_lines)


def format_srt_time(seconds: float) -> str:
    """Format seconds to SRT timestamp format (HH:MM:SS,mmm)."""
    hours = int(seconds // 3600)
    minutes = int((seconds % 3600) // 60)
    secs = int(seconds % 60)
    millis = int((seconds % 1) * 1000)
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}"


def segments_to_ass(
    segments: List[TranscriptSegment],
    use_translated: bool = True,
    font_size: int = 28,
    font_color: str = "FFFFFF",
    outline_color: str = "000000",
    font_name: str = "NanumGothic",
    position: str = "bottom",  # top, center, bottom
    outline_width: int = 3,
    bold: bool = True,
    shadow: int = 1,
    background_box: bool = True,
    background_opacity: str = "E0",  # 00=transparent, FF=opaque
    animation: str = "none",  # none, fade, pop
    time_offset: float = 0.0,  # Delay all subtitles by this amount (for intro text)
) -> str:
    """
    Convert segments to ASS format with styling.

    Args:
        segments: List of transcript segments
        use_translated: Use translated text if available
        font_size: Font size in pixels
        font_color: Font color in hex (without #)
        outline_color: Outline color in hex (without #)
        font_name: Font family name
        position: Subtitle position - "top", "center", or "bottom"
        outline_width: Outline thickness
        bold: Use bold text
        shadow: Shadow depth (0-4)
        background_box: Show semi-transparent background box
        animation: Animation type - "none", "fade", or "pop"
        time_offset: Delay all subtitle timings by this amount in seconds
            (useful when intro text is shown)

    Returns:
        ASS formatted subtitle string
    """
    # ASS Alignment values:
    # 1=Bottom-Left, 2=Bottom-Center, 3=Bottom-Right
    # 4=Middle-Left, 5=Middle-Center, 6=Middle-Right
    # 7=Top-Left, 8=Top-Center, 9=Top-Right
    alignment_map = {
        "top": 8,  # Top-Center
        "center": 5,  # Middle-Center (middle of the video)
        "bottom": 2,  # Bottom-Center (default)
    }
    alignment = alignment_map.get(position, 2)

    # Adjust margin based on position (lower value = closer to the screen edge).
    # The bottom margin is kept small so the subtitle covers any burned-in
    # original subtitles.
    margin_v = 30 if position == "bottom" else (100 if position == "top" else 10)

    # Bold: -1 = bold, 0 = normal
    bold_value = -1 if bold else 0

    # BorderStyle: 1 = outline + shadow, 3 = opaque box (background)
    border_style = 3 if background_box else 1

    # BackColour alpha: use the provided opacity or a semi-transparent default.
    back_alpha = background_opacity if background_box else "80"

    # ASS header
    ass_content = f"""[Script Info]
Title: Shorts Maker Subtitle
ScriptType: v4.00+
PlayDepth: 0
PlayResX: 1080
PlayResY: 1920

[V4+ Styles]
Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding
Style: Default,{font_name},{font_size},&H00{font_color},&H00FFFFFF,&H00{outline_color},&H{back_alpha}000000,{bold_value},0,0,0,100,100,0,0,{border_style},{outline_width},{shadow},{alignment},30,30,{margin_v},1

[Events]
Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text
"""

    for seg in segments:
        # Apply time offset (for intro text overlay)
        start_time = format_ass_time(seg.start + time_offset)
        end_time = format_ass_time(seg.end + time_offset)
        text = seg.translated if use_translated and seg.translated else seg.text

        # Escape special characters
        text = text.replace("\\", "\\\\").replace("{", "\\{").replace("}", "\\}")
        # BUGFIX: a literal newline would break the single-line Dialogue format;
        # ASS uses \N for line breaks within a cue.
        text = text.replace("\n", "\\N")

        # Add animation effects
        if animation == "fade":
            # Fade in/out effect (250ms)
            text = f"{{\\fad(250,250)}}{text}"
        elif animation == "pop":
            # Pop-in effect with scale animation
            text = f"{{\\t(0,150,\\fscx110\\fscy110)\\t(150,300,\\fscx100\\fscy100)}}{text}"

        ass_content += f"Dialogue: 0,{start_time},{end_time},Default,,0,0,0,,{text}\n"

    return ass_content


def format_ass_time(seconds: float) -> str:
    """Format seconds to ASS timestamp format (H:MM:SS.cc)."""
    hours = int(seconds // 3600)
    minutes = int((seconds % 3600) // 60)
    secs = int(seconds % 60)
    centis = int((seconds % 1) * 100)
    # Note: ASS uses a single (non-zero-padded) hour digit and centiseconds.
    return f"{hours}:{minutes:02d}:{secs:02d}.{centis:02d}"