- TimelineEditor, VideoStudio 컴포넌트 신규 추가 - 백엔드 transcriber, video_processor 서비스 개선 - 프론트엔드 HomePage 리팩토링 및 스타일 업데이트 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
481 lines
18 KiB
Python
481 lines
18 KiB
Python
import whisper
|
||
import asyncio
|
||
import os
|
||
from typing import List, Optional, Tuple
|
||
from app.models.schemas import TranscriptSegment
|
||
from app.config import settings
|
||
|
||
# Global model cache
|
||
_model = None
|
||
|
||
|
||
def get_whisper_model():
    """Return the process-wide Whisper model, loading it on first access.

    The model is memoized in the module-level ``_model`` so repeated calls
    are cheap; the expensive load happens at most once per process.
    """
    global _model
    if _model is not None:
        return _model
    print(f"Loading Whisper model: {settings.WHISPER_MODEL}")
    _model = whisper.load_model(settings.WHISPER_MODEL)
    return _model
|
||
|
||
|
||
async def check_audio_availability(video_path: str) -> Tuple[bool, str]:
    """
    Check whether the video's audio is usable for transcription.

    Returns:
        Tuple of (has_audio, message) where message is one of
        "no_audio_stream", "audio_silent", or "audio_ok".
    """
    from app.services.video_processor import has_audio_stream, get_audio_volume_info, is_audio_silent

    # A missing audio stream rules out transcription immediately.
    if not await has_audio_stream(video_path):
        return False, "no_audio_stream"

    # A present-but-silent track is equally unusable.
    volume_stats = await get_audio_volume_info(video_path)
    if is_audio_silent(volume_stats):
        return False, "audio_silent"

    return True, "audio_ok"
|
||
|
||
|
||
async def transcribe_video(
    video_path: str,
    use_noise_reduction: bool = True,
    noise_reduction_level: str = "medium",
    use_vocal_separation: bool = False,
    progress_callback: Optional[callable] = None,
) -> Tuple[bool, str, Optional[List[TranscriptSegment]], Optional[str]]:
    """
    Transcribe video audio using Whisper.

    Args:
        video_path: Path to video file
        use_noise_reduction: Whether to apply noise reduction before transcription
        noise_reduction_level: "light", "medium", or "heavy"
        use_vocal_separation: Whether to separate vocals from background music first
        progress_callback: Optional async callback function(step: str, progress: int) for progress updates

    Returns:
        Tuple of (success, message, segments, detected_language)
        - success=False with message="NO_AUDIO" means video has no audio
        - success=False with message="SILENT_AUDIO" means audio is too quiet
        - success=False with message="SINGING_ONLY" means only singing detected (no speech)
    """
    # Helper to call progress callback if provided
    async def report_progress(step: str, progress: int):
        print(f"[Transcriber] report_progress: {step} ({progress}%), has_callback: {progress_callback is not None}")
        if progress_callback:
            await progress_callback(step, progress)

    if not os.path.exists(video_path):
        return False, f"Video file not found: {video_path}", None, None

    # Check audio availability before doing any heavy work
    has_audio, audio_status = await check_audio_availability(video_path)
    if not has_audio:
        if audio_status == "no_audio_stream":
            return False, "NO_AUDIO", None, None
        elif audio_status == "audio_silent":
            return False, "SILENT_AUDIO", None, None

    audio_path = video_path  # Default to video path (Whisper can handle it)
    temp_files = []  # Track temp files for cleanup

    try:
        video_dir = os.path.dirname(video_path)

        # Step 1: Vocal separation (if enabled)
        if use_vocal_separation:
            from app.services.audio_separator import separate_vocals, analyze_vocal_type

            await report_progress("vocal_separation", 15)
            print("Separating vocals from background music...")
            separation_dir = os.path.join(video_dir, "separated")

            success, message, vocals_path, _ = await separate_vocals(
                video_path,
                separation_dir
            )

            if success and vocals_path:
                print(f"Vocal separation complete: {vocals_path}")
                temp_files.append(separation_dir)

                # Analyze if vocals are speech or singing
                print("Analyzing vocal type (speech vs singing)...")
                vocal_type, confidence = await analyze_vocal_type(vocals_path)
                print(f"Vocal analysis: {vocal_type} (confidence: {confidence:.2f})")

                # Treat as singing if:
                # 1. Explicitly detected as singing
                # 2. Mixed with low confidence (< 0.6) - likely music, not clear speech
                if vocal_type == "singing" or (vocal_type == "mixed" and confidence < 0.6):
                    # Only singing/music detected - no clear speech to transcribe
                    _cleanup_temp_files(temp_files)
                    reason = "SINGING_ONLY" if vocal_type == "singing" else "MUSIC_DOMINANT"
                    print(f"No clear speech detected ({reason}), awaiting manual subtitle")
                    # NOTE: callers only check for "SINGING_ONLY"; the
                    # finer-grained reason is logged but not returned.
                    return False, "SINGING_ONLY", None, None

                # Use vocals for transcription
                audio_path = vocals_path
            else:
                print(f"Vocal separation failed: {message}, continuing with original audio")

        # Step 2: Apply noise reduction (if enabled and not using separated vocals)
        if use_noise_reduction and audio_path == video_path:
            from app.services.video_processor import extract_audio_with_noise_reduction

            await report_progress("extracting_audio", 20)
            cleaned_path = os.path.join(video_dir, "audio_cleaned.wav")

            await report_progress("noise_reduction", 25)
            print(f"Applying {noise_reduction_level} noise reduction...")
            success, message = await extract_audio_with_noise_reduction(
                video_path,
                cleaned_path,
                noise_reduction_level
            )

            if success:
                print(f"Noise reduction complete: {message}")
                audio_path = cleaned_path
                temp_files.append(cleaned_path)
            else:
                print(f"Noise reduction failed: {message}, falling back to original audio")

        # Step 3: Transcribe with Whisper
        await report_progress("transcribing", 35)
        model = get_whisper_model()

        print(f"Transcribing audio: {audio_path}")
        # Run Whisper in thread pool to avoid blocking the event loop
        result = await asyncio.to_thread(
            model.transcribe,
            audio_path,
            task="transcribe",
            language=None,  # Auto-detect
            verbose=False,
            word_timestamps=True,
        )

        # Split long segments using word-level timestamps
        segments = _split_segments_by_words(
            result.get("segments", []),
            max_duration=2.0,  # Maximum segment duration in seconds (shorter for better sync)
            min_words=1,  # Minimum words per segment
        )

        # Clean up temp files
        _cleanup_temp_files(temp_files)

        detected_lang = result.get("language", "unknown")
        print(f"Detected language: {detected_lang}")
        extras = []
        if use_vocal_separation:
            extras.append("vocal separation")
        if use_noise_reduction:
            extras.append(f"noise reduction: {noise_reduction_level}")
        extra_info = f" ({', '.join(extras)})" if extras else ""

        # Return tuple with 4 elements: success, message, segments, detected_language
        return True, f"Transcription complete (detected: {detected_lang}){extra_info}", segments, detected_lang

    except Exception as e:
        _cleanup_temp_files(temp_files)
        return False, f"Transcription error: {str(e)}", None, None
|
||
|
||
|
||
def _split_segments_by_words(
    raw_segments: list,
    max_duration: float = 4.0,
    min_words: int = 2,
) -> List[TranscriptSegment]:
    """
    Split long Whisper segments into shorter ones using word-level timestamps.

    Each raw segment that exceeds ``max_duration`` and carries word-level
    timing is re-chunked word by word; shorter segments (or those without
    word timings) pass through unchanged.

    Args:
        raw_segments: Raw segment dicts from Whisper output (keys: "start",
            "end", "text", and optionally "words" with per-word timings)
        max_duration: Maximum duration for each segment in seconds
        min_words: Minimum words per segment (to avoid single-word segments)

    Returns:
        List of TranscriptSegment with shorter durations
    """
    segments = []

    for seg in raw_segments:
        words = seg.get("words", [])
        seg_text = seg.get("text", "").strip()
        seg_start = seg.get("start", 0)
        seg_end = seg.get("end", 0)
        seg_duration = seg_end - seg_start

        # If no word timestamps or segment is short enough, use as-is
        if not words or seg_duration <= max_duration:
            segments.append(TranscriptSegment(
                start=seg_start,
                end=seg_end,
                text=seg_text,
            ))
            continue

        # Split segment using word timestamps; accumulate words until a
        # split condition fires, then flush the accumulator as one segment.
        current_words = []
        current_start = None

        for i, word in enumerate(words):
            # Missing per-word timings fall back to the segment boundaries.
            word_start = word.get("start", seg_start)
            word_end = word.get("end", seg_end)
            word_text = word.get("word", "").strip()

            # Skip empty/whitespace-only word entries entirely.
            if not word_text:
                continue

            # Start a new segment at the first accumulated word.
            if current_start is None:
                current_start = word_start

            current_words.append(word_text)
            current_duration = word_end - current_start

            # Check if we should split here
            is_last_word = (i == len(words) - 1)
            should_split = False

            if is_last_word:
                should_split = True
            elif current_duration >= max_duration and len(current_words) >= min_words:
                should_split = True
            elif current_duration >= max_duration * 0.5:
                # Split at natural break points (punctuation) more aggressively
                if word_text.endswith((',', '.', '!', '?', '。', ',', '!', '?', '、', ';', ';')):
                    should_split = True
            elif current_duration >= 1.0 and word_text.endswith(('。', '!', '?', '.', '!', '?')):
                # Always split at sentence endings if we have at least 1 second of content
                # NOTE(review): because this is an elif chain, this branch only
                # fires when current_duration < max_duration * 0.5 — confirm
                # that shadowing is intended.
                should_split = True

            if should_split and current_words:
                # Create segment from the accumulated words.
                text = " ".join(current_words)
                # For Chinese/Japanese, remove spaces between words
                # (CJK Unified Ideographs range U+4E00–U+9FFF).
                if any('\u4e00' <= c <= '\u9fff' for c in text):
                    text = text.replace(" ", "")

                segments.append(TranscriptSegment(
                    start=current_start,
                    end=word_end,
                    text=text,
                ))

                # Reset for next segment
                current_words = []
                current_start = None

    return segments
|
||
|
||
|
||
def _cleanup_temp_files(paths: list):
|
||
"""Clean up temporary files and directories."""
|
||
import shutil
|
||
for path in paths:
|
||
try:
|
||
if os.path.isdir(path):
|
||
shutil.rmtree(path, ignore_errors=True)
|
||
elif os.path.exists(path):
|
||
os.remove(path)
|
||
except Exception:
|
||
pass
|
||
|
||
|
||
def segments_to_srt(segments: List[TranscriptSegment], use_translated: bool = True) -> str:
    """Render transcript segments as an SRT subtitle document.

    Args:
        segments: Segments to render.
        use_translated: Prefer each segment's translation when available.

    Returns:
        SRT-formatted string (empty when there are no segments).
    """
    entries = []
    for index, seg in enumerate(segments, 1):
        content = seg.translated if use_translated and seg.translated else seg.text
        window = f"{format_srt_time(seg.start)} --> {format_srt_time(seg.end)}"
        entries.append(f"{index}\n{window}\n{content}")

    # Entries are separated by a blank line; the document ends with a newline.
    return ("\n\n".join(entries) + "\n") if entries else ""
|
||
|
||
|
||
def format_srt_time(seconds: float) -> str:
    """Format seconds to SRT timestamp format (HH:MM:SS,mmm).

    Assumes a non-negative input; milliseconds are truncated, not rounded.
    """
    whole = int(seconds)
    millis = int((seconds % 1) * 1000)
    hours, remainder = divmod(whole, 3600)
    minutes, secs = divmod(remainder, 60)
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}"
|
||
|
||
|
||
def auto_wrap_text(text: str, max_chars: int) -> str:
    """
    Automatically split an over-long subtitle into two lines.

    Args:
        text: Original subtitle text.
        max_chars: Maximum characters per line (0 disables wrapping).

    Returns:
        Text with an ASS line break (\\N) inserted near the middle, or the
        original text when wrapping is unnecessary or already done manually.
    """
    # Nothing to do when wrapping is disabled or the text already fits.
    if max_chars <= 0 or len(text) <= max_chars:
        return text

    # Respect existing manual breaks in any accepted spelling (\N, \n, /N, /n).
    if any(marker in text for marker in ("\\N", "\\n", "/N", "/n")):
        return text

    # Prefer breaking at whitespace or punctuation (including full-width forms).
    break_chars = (' ', ',', '.', '!', '?', '。', ',', '!', '?', '、')
    midpoint = len(text) // 2
    split_at = midpoint

    # Scan outward from the midpoint (up to 10 characters each way) for the
    # nearest break character; the split lands just after it. When none is
    # found, fall back to splitting exactly at the midpoint.
    for distance in range(min(10, midpoint)):
        if midpoint + distance < len(text) and text[midpoint + distance] in break_chars:
            split_at = midpoint + distance + 1
            break
        if midpoint - distance >= 0 and text[midpoint - distance] in break_chars:
            split_at = midpoint - distance + 1
            break

    first = text[:split_at].strip()
    second = text[split_at:].strip()

    return f"{first}\\N{second}" if second else first
|
||
|
||
|
||
def segments_to_ass(
    segments: List[TranscriptSegment],
    use_translated: bool = True,
    font_size: int = 70,
    font_color: str = "FFFFFF",
    outline_color: str = "000000",
    font_name: str = "Pretendard",
    position: str = "center",  # top, center, bottom
    margin_v: int = 50,  # vertical position (0=edge, 100=toward screen center)
    outline_width: int = 4,  # outline thickness (for readability)
    bold: bool = True,
    shadow: int = 2,  # shadow depth
    background_box: bool = False,  # False = outline style (cleaner look)
    background_opacity: str = "80",  # 00=transparent, FF=opaque
    animation: str = "fade",  # none, fade, pop
    time_offset: float = 0.0,  # Delay all subtitles by this amount (for intro text)
    max_chars_per_line: int = 0,  # max characters per line (0=disabled, 15-20 recommended)
) -> str:
    """
    Convert segments to ASS format with styling.

    Args:
        segments: List of transcript segments
        use_translated: Use translated text if available
        font_size: Font size in pixels
        font_color: Font color in hex (without #)
        outline_color: Outline color in hex (without #)
        font_name: Font family name
        position: Subtitle position - "top", "center", or "bottom"
            (NOTE(review): currently unused — alignment is fixed to
            Top-Center and margin_v alone controls vertical placement)
        margin_v: Vertical margin (0=edge, 100=toward center) - percentage of screen height
        outline_width: Outline thickness
        bold: Use bold text
        shadow: Shadow depth (0-4)
        background_box: Show semi-transparent background box
        background_opacity: Background box alpha as two hex digits (00=transparent, FF=opaque)
        animation: Animation type - "none", "fade", or "pop"
        time_offset: Delay all subtitle timings by this amount in seconds (useful when intro text is shown)
        max_chars_per_line: Auto-wrap lines longer than this many characters (0 disables)

    Returns:
        ASS formatted subtitle string
    """
    # ASS Alignment values:
    # 1=Bottom-Left, 2=Bottom-Center, 3=Bottom-Right
    # 4=Middle-Left, 5=Middle-Center, 6=Middle-Right
    # 7=Top-Left, 8=Top-Center, 9=Top-Right
    #
    # Alignment is pinned to 'top' and margin_v is used directly as a
    # percentage of screen height:
    # margin_v=5 -> top 5%, margin_v=95 -> bottom 95%
    alignment = 8  # Top-Center (margin_v is applied relative to the top edge)

    # Convert margin_v to pixels as a percentage of the 1920px play height:
    # margin_v=5 -> 96px, margin_v=50 -> 960px, margin_v=95 -> 1824px
    ass_margin_v = int((margin_v / 100) * 1920)

    # Bold: -1 = bold, 0 = normal (ASS convention)
    bold_value = -1 if bold else 0

    # BorderStyle: 1 = outline + shadow, 3 = opaque box (background)
    border_style = 3 if background_box else 1

    # BackColour alpha: use provided opacity or default
    back_alpha = background_opacity if background_box else "80"

    # ASS header: script metadata, the single Default style, and the events
    # format line. PlayResX/PlayResY fix the 1080x1920 (vertical) canvas.
    ass_content = f"""[Script Info]
Title: Shorts Maker Subtitle
ScriptType: v4.00+
PlayDepth: 0
PlayResX: 1080
PlayResY: 1920

[V4+ Styles]
Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding
Style: Default,{font_name},{font_size},&H00{font_color},&H00FFFFFF,&H00{outline_color},&H{back_alpha}000000,{bold_value},0,0,0,100,100,0,0,{border_style},{outline_width},{shadow},{alignment},30,30,{ass_margin_v},1

[Events]
Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text
"""

    for seg in segments:
        # Apply time offset (for intro text overlay)
        start_time = format_ass_time(seg.start + time_offset)
        end_time = format_ass_time(seg.end + time_offset)
        text = seg.translated if use_translated and seg.translated else seg.text

        # 1. Apply automatic line wrapping (when max_chars_per_line is set)
        if max_chars_per_line > 0:
            text = auto_wrap_text(text, max_chars_per_line)

        # 2. Handle manual line breaks: \N, \n, /N and /n are all supported
        #    so user-typed /N (forward slash) also works. Break markers are
        #    stashed behind a placeholder so step 3's backslash escaping
        #    cannot corrupt them.
        text = text.replace("/N", "<<LINEBREAK>>").replace("/n", "<<LINEBREAK>>")
        text = text.replace("\\N", "<<LINEBREAK>>").replace("\\n", "<<LINEBREAK>>")

        # 3. Escape special characters (backslash, braces) so stray override
        #    syntax in the text cannot break the ASS markup.
        text = text.replace("\\", "\\\\").replace("{", "\\{").replace("}", "\\}")

        # 4. Restore placeholders as real ASS line breaks
        text = text.replace("<<LINEBREAK>>", "\\N")

        # 5. Add animation effects
        if animation == "fade":
            # Fade in/out effect (250ms)
            text = f"{{\\fad(250,250)}}{text}"
        elif animation == "pop":
            # Pop-in effect with scale animation
            text = f"{{\\t(0,150,\\fscx110\\fscy110)\\t(150,300,\\fscx100\\fscy100)}}{text}"

        ass_content += f"Dialogue: 0,{start_time},{end_time},Default,,0,0,0,,{text}\n"

    return ass_content
|
||
|
||
|
||
def format_ass_time(seconds: float) -> str:
    """Format seconds to ASS timestamp format (H:MM:SS.cc).

    Assumes a non-negative input; centiseconds are truncated, not rounded.
    """
    whole = int(seconds)
    centis = int((seconds % 1) * 100)
    hours, remainder = divmod(whole, 3600)
    minutes, secs = divmod(remainder, 60)
    return f"{hours}:{minutes:02d}:{secs:02d}.{centis:02d}"
|