Files
bini-shorts-maker/backend/app/services/transcriber.py
kihong.kim 5c57f33903 feat: 타임라인 에디터 및 비디오 스튜디오 컴포넌트 추가
- TimelineEditor, VideoStudio 컴포넌트 신규 추가
- 백엔드 transcriber, video_processor 서비스 개선
- 프론트엔드 HomePage 리팩토링 및 스타일 업데이트

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-06 21:21:58 +09:00

481 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import whisper
import asyncio
import os
from typing import List, Optional, Tuple
from app.models.schemas import TranscriptSegment
from app.config import settings
# Module-level cache: the Whisper model is loaded at most once per process.
_model = None


def get_whisper_model():
    """Return the shared Whisper model instance, loading it on first use."""
    global _model
    if _model is not None:
        return _model
    print(f"Loading Whisper model: {settings.WHISPER_MODEL}")
    _model = whisper.load_model(settings.WHISPER_MODEL)
    return _model
async def check_audio_availability(video_path: str) -> Tuple[bool, str]:
    """
    Determine whether a video's audio track is usable for transcription.

    Returns:
        Tuple of (has_audio, message) where message is one of
        "no_audio_stream", "audio_silent", or "audio_ok".
    """
    from app.services.video_processor import has_audio_stream, get_audio_volume_info, is_audio_silent

    # No audio track at all?
    if not await has_audio_stream(video_path):
        return False, "no_audio_stream"

    # Track exists but is effectively silent?
    if is_audio_silent(await get_audio_volume_info(video_path)):
        return False, "audio_silent"

    return True, "audio_ok"
async def transcribe_video(
    video_path: str,
    use_noise_reduction: bool = True,
    noise_reduction_level: str = "medium",
    use_vocal_separation: bool = False,
    progress_callback: Optional[callable] = None,
) -> "Tuple[bool, str, Optional[List[TranscriptSegment]], Optional[str]]":
    """
    Transcribe video audio using Whisper.

    Args:
        video_path: Path to video file
        use_noise_reduction: Whether to apply noise reduction before transcription
        noise_reduction_level: "light", "medium", or "heavy"
        use_vocal_separation: Whether to separate vocals from background music first
        progress_callback: Optional async callback function(step: str, progress: int)
            for progress updates

    Returns:
        Tuple of (success, message, segments, detected_language).
        (The annotation previously declared a 3-tuple; every return path
        actually yields 4 elements.)
        - success=False with message="NO_AUDIO" means video has no audio
        - success=False with message="SILENT_AUDIO" means audio is too quiet
        - success=False with message="SINGING_ONLY" means only singing detected (no speech)
    """

    async def report_progress(step: str, progress: int):
        # Forward progress to the caller's callback, when one was supplied.
        print(f"[Transcriber] report_progress: {step} ({progress}%), has_callback: {progress_callback is not None}")
        if progress_callback:
            await progress_callback(step, progress)

    if not os.path.exists(video_path):
        return False, f"Video file not found: {video_path}", None, None

    # Bail out early when there is no usable audio to transcribe.
    has_audio, audio_status = await check_audio_availability(video_path)
    if not has_audio:
        if audio_status == "no_audio_stream":
            return False, "NO_AUDIO", None, None
        elif audio_status == "audio_silent":
            return False, "SILENT_AUDIO", None, None

    audio_path = video_path  # Default to video path (Whisper can handle it)
    temp_files = []  # Track temp files for cleanup
    try:
        video_dir = os.path.dirname(video_path)

        # Step 1: Vocal separation (if enabled)
        if use_vocal_separation:
            from app.services.audio_separator import separate_vocals, analyze_vocal_type
            await report_progress("vocal_separation", 15)
            print("Separating vocals from background music...")
            separation_dir = os.path.join(video_dir, "separated")
            success, message, vocals_path, _ = await separate_vocals(
                video_path,
                separation_dir
            )
            if success and vocals_path:
                print(f"Vocal separation complete: {vocals_path}")
                temp_files.append(separation_dir)
                # Analyze if vocals are speech or singing
                print("Analyzing vocal type (speech vs singing)...")
                vocal_type, confidence = await analyze_vocal_type(vocals_path)
                print(f"Vocal analysis: {vocal_type} (confidence: {confidence:.2f})")
                # Treat as singing if:
                # 1. Explicitly detected as singing
                # 2. Mixed with low confidence (< 0.6) - likely music, not clear speech
                if vocal_type == "singing" or (vocal_type == "mixed" and confidence < 0.6):
                    # Only singing/music detected - no clear speech to transcribe.
                    _cleanup_temp_files(temp_files)
                    # NOTE: callers only check for "SINGING_ONLY"; `reason` is log-only.
                    reason = "SINGING_ONLY" if vocal_type == "singing" else "MUSIC_DOMINANT"
                    print(f"No clear speech detected ({reason}), awaiting manual subtitle")
                    return False, "SINGING_ONLY", None, None
                # Use vocals for transcription
                audio_path = vocals_path
            else:
                print(f"Vocal separation failed: {message}, continuing with original audio")

        # Step 2: Apply noise reduction (if enabled and not using separated vocals)
        if use_noise_reduction and audio_path == video_path:
            from app.services.video_processor import extract_audio_with_noise_reduction
            await report_progress("extracting_audio", 20)
            cleaned_path = os.path.join(video_dir, "audio_cleaned.wav")
            await report_progress("noise_reduction", 25)
            print(f"Applying {noise_reduction_level} noise reduction...")
            success, message = await extract_audio_with_noise_reduction(
                video_path,
                cleaned_path,
                noise_reduction_level
            )
            if success:
                print(f"Noise reduction complete: {message}")
                audio_path = cleaned_path
                temp_files.append(cleaned_path)
            else:
                print(f"Noise reduction failed: {message}, falling back to original audio")

        # Step 3: Transcribe with Whisper
        await report_progress("transcribing", 35)
        model = get_whisper_model()
        print(f"Transcribing audio: {audio_path}")
        # Run Whisper in thread pool to avoid blocking the event loop
        result = await asyncio.to_thread(
            model.transcribe,
            audio_path,
            task="transcribe",
            language=None,  # Auto-detect
            verbose=False,
            word_timestamps=True,
        )

        # Split long segments using word-level timestamps
        segments = _split_segments_by_words(
            result.get("segments", []),
            max_duration=2.0,  # Maximum segment duration in seconds (shorter for better sync)
            min_words=1,  # Minimum words per segment
        )

        # Clean up temp files
        _cleanup_temp_files(temp_files)

        detected_lang = result.get("language", "unknown")
        print(f"Detected language: {detected_lang}")
        extras = []
        if use_vocal_separation:
            extras.append("vocal separation")
        if use_noise_reduction:
            extras.append(f"noise reduction: {noise_reduction_level}")
        extra_info = f" ({', '.join(extras)})" if extras else ""
        return True, f"Transcription complete (detected: {detected_lang}){extra_info}", segments, detected_lang
    except Exception as e:
        # Best-effort cleanup, then surface the error as a failed result.
        _cleanup_temp_files(temp_files)
        return False, f"Transcription error: {str(e)}", None, None
def _split_segments_by_words(
    raw_segments: list,
    max_duration: float = 4.0,
    min_words: int = 2,
) -> List[TranscriptSegment]:
    """
    Split long Whisper segments into shorter ones using word-level timestamps.

    Args:
        raw_segments: Raw segments from Whisper output
        max_duration: Maximum duration for each segment in seconds
        min_words: Minimum words per segment (to avoid single-word segments)

    Returns:
        List of TranscriptSegment with shorter durations
    """
    segments = []
    for seg in raw_segments:
        words = seg.get("words", [])
        seg_text = seg.get("text", "").strip()
        seg_start = seg.get("start", 0)
        seg_end = seg.get("end", 0)
        seg_duration = seg_end - seg_start

        # If no word timestamps or segment is short enough, use as-is
        if not words or seg_duration <= max_duration:
            segments.append(TranscriptSegment(
                start=seg_start,
                end=seg_end,
                text=seg_text,
            ))
            continue

        # Split segment using word timestamps
        current_words = []
        current_start = None
        for i, word in enumerate(words):
            word_start = word.get("start", seg_start)
            word_end = word.get("end", seg_end)
            word_text = word.get("word", "").strip()
            if not word_text:
                continue

            # Start a new segment
            if current_start is None:
                current_start = word_start
            current_words.append(word_text)
            current_duration = word_end - current_start

            # Check if we should split here
            is_last_word = (i == len(words) - 1)
            should_split = False
            if is_last_word:
                should_split = True
            elif current_duration >= max_duration and len(current_words) >= min_words:
                should_split = True
            elif current_duration >= max_duration * 0.5:
                # Split at natural break points (punctuation) more aggressively.
                # NOTE(review): the fullwidth CJK punctuation here rendered as
                # invisible/empty strings in some viewers; a literal '' entry
                # would make endswith() always True. Restored as CJK marks —
                # confirm against the original file's intended characters.
                if word_text.endswith((',', '.', '!', '?', '，', '。', '！', '？', '、', '…', ';')):
                    should_split = True
            elif current_duration >= 1.0 and word_text.endswith(('。', '！', '？', '.', '!', '?')):
                # Always split at sentence endings if we have at least 1 second of content
                should_split = True

            if should_split and current_words:
                # Create segment
                text = " ".join(current_words)
                # For Chinese/Japanese, remove spaces between words
                if any('\u4e00' <= c <= '\u9fff' for c in text):
                    text = text.replace(" ", "")
                segments.append(TranscriptSegment(
                    start=current_start,
                    end=word_end,
                    text=text,
                ))
                # Reset for next segment
                current_words = []
                current_start = None

        # Flush any leftover words: if the final word entries were blank
        # (skipped above), is_last_word never fired and the trailing chunk
        # would otherwise be silently dropped.
        if current_words and current_start is not None:
            text = " ".join(current_words)
            if any('\u4e00' <= c <= '\u9fff' for c in text):
                text = text.replace(" ", "")
            segments.append(TranscriptSegment(
                start=current_start,
                end=seg_end,
                text=text,
            ))
    return segments
def _cleanup_temp_files(paths: list):
    """Delete the given temporary files/directories, ignoring all failures."""
    import shutil

    for candidate in paths:
        try:
            if os.path.isdir(candidate):
                # Directories (e.g. vocal-separation output) go recursively.
                shutil.rmtree(candidate, ignore_errors=True)
            elif os.path.exists(candidate):
                os.remove(candidate)
        except Exception:
            # Cleanup is best-effort; never let a failed delete propagate.
            pass
def segments_to_srt(segments: List[TranscriptSegment], use_translated: bool = True) -> str:
    """Render transcript segments as an SRT-formatted subtitle document.

    Args:
        segments: Transcript segments to render.
        use_translated: Prefer each segment's translated text when present.

    Returns:
        The full SRT document as a single string.
    """
    blocks = []
    for index, segment in enumerate(segments, 1):
        caption = segment.translated if use_translated and segment.translated else segment.text
        window = f"{format_srt_time(segment.start)} --> {format_srt_time(segment.end)}"
        # Each SRT cue: counter line, timing line, text, blank separator.
        blocks.extend([str(index), window, caption, ""])
    return "\n".join(blocks)
def format_srt_time(seconds: float) -> str:
    """Format seconds to SRT timestamp format (HH:MM:SS,mmm).

    Computes a single rounded millisecond total first, then splits it with
    divmod. The previous `int((seconds % 1) * 1000)` truncated the float
    remainder, so representation error could yield a timestamp one
    millisecond short; rounding once also keeps the ms field consistent
    with the seconds field. Negative inputs clamp to zero.
    """
    total_millis = max(0, round(seconds * 1000))
    secs, millis = divmod(total_millis, 1000)
    minutes, secs = divmod(secs, 60)
    hours, minutes = divmod(minutes, 60)
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}"
def auto_wrap_text(text: str, max_chars: int) -> str:
    """
    Automatically wrap long text onto two lines.

    Args:
        text: Original text
        max_chars: Maximum characters per line (0 disables wrapping)

    Returns:
        Text with an ASS line break (\\N) inserted near the middle
    """
    if max_chars <= 0 or len(text) <= max_chars:
        return text
    # Respect existing manual breaks (\N, \n, /N, /n are all honored)
    if "\\N" in text or "\\n" in text or "/N" in text or "/n" in text:
        return text

    # Look for a good break point near the midpoint.
    mid = len(text) // 2
    best_break = mid
    # Prefer breaking right after whitespace or punctuation.
    # NOTE(review): the fullwidth CJK punctuation here rendered as empty
    # strings in some viewers ('' is inert in an `in`-list check, but CJK
    # break support was lost). Restored — confirm against the original file.
    break_chars = [' ', ',', '.', '!', '?', '，', '。', '！', '？', '、']
    # Scan outward from the midpoint, up to 10 characters each way.
    for offset in range(min(10, mid)):
        # Check after the midpoint first...
        if mid + offset < len(text) and text[mid + offset] in break_chars:
            best_break = mid + offset + 1
            break
        # ...then before it.
        if mid - offset >= 0 and text[mid - offset] in break_chars:
            best_break = mid - offset + 1
            break
    # No break character found: hard-split at the midpoint.
    line1 = text[:best_break].strip()
    line2 = text[best_break:].strip()
    if line2:
        return f"{line1}\\N{line2}"
    return line1
def segments_to_ass(
    segments: List[TranscriptSegment],
    use_translated: bool = True,
    font_size: int = 70,
    font_color: str = "FFFFFF",
    outline_color: str = "000000",
    font_name: str = "Pretendard",
    position: str = "center",  # top, center, bottom
    margin_v: int = 50,  # vertical position (0=edge, 100=toward screen center)
    outline_width: int = 4,  # outline thickness (readability)
    bold: bool = True,
    shadow: int = 2,  # shadow depth
    background_box: bool = False,  # False=outline style (cleaner look)
    background_opacity: str = "80",  # 00=transparent, FF=opaque
    animation: str = "fade",  # none, fade, pop
    time_offset: float = 0.0,  # Delay all subtitles by this amount (for intro text)
    max_chars_per_line: int = 0,  # max characters per line (0=disabled, 15~20 recommended)
) -> str:
    """
    Convert segments to ASS format with styling.

    Args:
        segments: List of transcript segments
        use_translated: Use translated text if available
        font_size: Font size in pixels
        font_color: Font color in hex (without #)
        outline_color: Outline color in hex (without #)
        font_name: Font family name
        position: Subtitle position - "top", "center", or "bottom".
            NOTE: currently unused — alignment is pinned to Top-Center and
            margin_v alone controls vertical placement (see below).
        margin_v: Vertical margin (0=edge, 100=toward center) - percentage of screen height
        outline_width: Outline thickness
        bold: Use bold text
        shadow: Shadow depth (0-4)
        background_box: Show semi-transparent background box
        background_opacity: Background box alpha as two hex digits
            ("00"=transparent, "FF"=opaque); only applied when background_box is True
        animation: Animation type - "none", "fade", or "pop"
        time_offset: Delay all subtitle timings by this amount in seconds (useful when intro text is shown)
        max_chars_per_line: Auto-wrap text longer than this many characters per
            line via auto_wrap_text (0 disables)

    Returns:
        ASS formatted subtitle string
    """
    # ASS Alignment values:
    # 1=Bottom-Left, 2=Bottom-Center, 3=Bottom-Right
    # 4=Middle-Left, 5=Middle-Center, 6=Middle-Right
    # 7=Top-Left, 8=Top-Center, 9=Top-Right
    #
    # Alignment is fixed to 'top' and margin_v is used directly as a percent
    # of the screen height: margin_v=5 -> 5% from the top, margin_v=95 -> 95%.
    alignment = 8  # Top-Center (margin_v is applied relative to the top edge)
    # Convert margin_v to pixels as a percentage of the 1920px play height:
    # margin_v=5 -> 96px, margin_v=50 -> 960px, margin_v=95 -> 1824px
    ass_margin_v = int((margin_v / 100) * 1920)
    # Bold: -1 = bold, 0 = normal
    bold_value = -1 if bold else 0
    # BorderStyle: 1 = outline + shadow, 3 = opaque box (background)
    border_style = 3 if background_box else 1
    # BackColour alpha: use provided opacity or default
    back_alpha = background_opacity if background_box else "80"
    # ASS header (PlayRes matches 1080x1920 vertical shorts)
    ass_content = f"""[Script Info]
Title: Shorts Maker Subtitle
ScriptType: v4.00+
PlayDepth: 0
PlayResX: 1080
PlayResY: 1920
[V4+ Styles]
Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding
Style: Default,{font_name},{font_size},&H00{font_color},&H00FFFFFF,&H00{outline_color},&H{back_alpha}000000,{bold_value},0,0,0,100,100,0,0,{border_style},{outline_width},{shadow},{alignment},30,30,{ass_margin_v},1
[Events]
Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text
"""
    for seg in segments:
        # Apply time offset (for intro text overlay)
        start_time = format_ass_time(seg.start + time_offset)
        end_time = format_ass_time(seg.end + time_offset)
        text = seg.translated if use_translated and seg.translated else seg.text
        # 1. Apply auto line-wrapping (when max_chars_per_line is set)
        if max_chars_per_line > 0:
            text = auto_wrap_text(text, max_chars_per_line)
        # 2. Manual line breaks: accept \N, \n, /N and /n — users typing
        #    /N with a forward slash should also work.
        text = text.replace("/N", "<<LINEBREAK>>").replace("/n", "<<LINEBREAK>>")
        text = text.replace("\\N", "<<LINEBREAK>>").replace("\\n", "<<LINEBREAK>>")
        # 3. Escape special characters (backslash, braces) so ASS override
        #    syntax cannot be injected via subtitle text
        text = text.replace("\\", "\\\\").replace("{", "\\{").replace("}", "\\}")
        # 4. Restore the placeholders as real ASS line breaks
        text = text.replace("<<LINEBREAK>>", "\\N")
        # 5. Add animation effects
        if animation == "fade":
            # Fade in/out effect (250ms)
            text = f"{{\\fad(250,250)}}{text}"
        elif animation == "pop":
            # Pop-in effect with scale animation
            text = f"{{\\t(0,150,\\fscx110\\fscy110)\\t(150,300,\\fscx100\\fscy100)}}{text}"
        ass_content += f"Dialogue: 0,{start_time},{end_time},Default,,0,0,0,,{text}\n"
    return ass_content
def format_ass_time(seconds: float) -> str:
    """Format seconds to ASS timestamp format (H:MM:SS.cc).

    Computes a single rounded centisecond total first, then splits it with
    divmod. The previous `int((seconds % 1) * 100)` truncated the float
    remainder, so representation error could produce a timestamp one
    centisecond short. Hours are unpadded, per ASS convention. Negative
    inputs clamp to zero.
    """
    total_centis = max(0, round(seconds * 100))
    secs, centis = divmod(total_centis, 100)
    minutes, secs = divmod(secs, 60)
    hours, minutes = divmod(minutes, 60)
    return f"{hours}:{minutes:02d}:{secs:02d}.{centis:02d}"