Initial commit: YouTube Shorts maker application

Features:
- Video download from TikTok/Douyin using yt-dlp
- Audio transcription with OpenAI Whisper
- GPT-4 translation (direct/summarize/rewrite modes)
- Subtitle generation with ASS format
- Video trimming with frame-accurate preview
- BGM integration with volume control
- Intro text overlay support
- Thumbnail generation with text overlay

Tech stack:
- Backend: FastAPI, Python 3.11+
- Frontend: React, Vite, TailwindCSS
- Video processing: FFmpeg
- AI: OpenAI Whisper, GPT-4

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
kihong.kim
2026-01-03 21:38:34 +09:00
commit c3795138da
64 changed files with 13059 additions and 0 deletions

View File

@@ -0,0 +1,15 @@
from app.services.downloader import download_video, detect_platform, get_video_info
from app.services.transcriber import transcribe_video, segments_to_srt, segments_to_ass
from app.services.translator import (
translate_segments,
translate_single,
generate_shorts_script,
TranslationMode,
)
from app.services.video_processor import (
process_video,
get_video_duration,
extract_audio,
extract_audio_with_noise_reduction,
analyze_audio_noise_level,
)

View File

@@ -0,0 +1,317 @@
"""
Audio separation service using Demucs for vocal/music separation.
Also includes speech vs singing detection.
"""
import subprocess
import os
import shutil
from typing import Optional, Tuple
from pathlib import Path
# Demucs runs in a separate Python 3.11 environment due to compatibility issues.
# Resolves to <project_root>/venv_demucs — three directory levels above this module.
DEMUCS_VENV_PATH = os.path.join(
    os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),
    "venv_demucs"
)
# Interpreter inside the dedicated venv. NOTE(review): POSIX "bin/python" layout
# only — confirm this service never runs on Windows ("Scripts/python.exe").
DEMUCS_PYTHON = os.path.join(DEMUCS_VENV_PATH, "bin", "python")
async def separate_vocals(
    input_path: str,
    output_dir: str,
    model: str = "htdemucs"
) -> Tuple[bool, str, Optional[str], Optional[str]]:
    """
    Split an audio/video file into vocal and accompaniment stems with Demucs.

    Runs the Demucs CLI from its dedicated virtualenv in two-stem mode, then
    flattens Demucs' nested output layout into ``output_dir/vocals.wav`` and
    ``output_dir/no_vocals.wav``.

    Args:
        input_path: Path to input audio/video file.
        output_dir: Directory to save separated tracks.
        model: Demucs model to use (htdemucs, htdemucs_ft, mdx_extra).

    Returns:
        Tuple of (success, message, vocals_path, no_vocals_path).
    """
    # Guard clauses: missing input file, then missing Demucs environment.
    if not os.path.exists(input_path):
        return False, f"Input file not found: {input_path}", None, None
    os.makedirs(output_dir, exist_ok=True)
    if not os.path.exists(DEMUCS_PYTHON):
        return False, f"Demucs environment not found at {DEMUCS_VENV_PATH}. Run setup script.", None, None

    # Two-stem separation: everything that is not vocals lands in no_vocals.
    command = [
        DEMUCS_PYTHON, "-m", "demucs",
        "--two-stems=vocals",
        "-n", model,
        "-o", output_dir,
        input_path,
    ]
    try:
        print(f"Running Demucs separation: {' '.join(command)}")
        proc = subprocess.run(
            command,
            capture_output=True,
            text=True,
            timeout=600,  # 10 minute timeout
        )
        if proc.returncode != 0:
            # Only surface the tail of stderr so the message stays readable.
            tail = proc.stderr[-500:] if proc.stderr else "Unknown error"
            return False, f"Demucs error: {tail}", None, None

        # Demucs writes to: output_dir/<model>/<track_name>/{vocals,no_vocals}.wav
        track_dir = os.path.join(output_dir, model, Path(input_path).stem)
        raw_vocals = os.path.join(track_dir, "vocals.wav")
        raw_no_vocals = os.path.join(track_dir, "no_vocals.wav")
        if not os.path.exists(raw_vocals):
            return False, "Vocals file not created", None, None

        # Flatten into output_dir and drop the nested Demucs directory tree.
        final_vocals = os.path.join(output_dir, "vocals.wav")
        final_no_vocals = os.path.join(output_dir, "no_vocals.wav")
        shutil.move(raw_vocals, final_vocals)
        if os.path.exists(raw_no_vocals):
            shutil.move(raw_no_vocals, final_no_vocals)
        shutil.rmtree(os.path.join(output_dir, model), ignore_errors=True)
        return True, "Vocals separated successfully", final_vocals, final_no_vocals
    except subprocess.TimeoutExpired:
        return False, "Separation timed out", None, None
    except FileNotFoundError:
        return False, "Demucs not installed. Run: pip install demucs", None, None
    except Exception as e:
        return False, f"Separation error: {str(e)}", None, None
async def analyze_vocal_type(
    vocals_path: str,
    speech_threshold: float = 0.7
) -> Tuple[str, float]:
    """
    Classify a separated vocal track as speech, singing, or mixed.

    Heuristics used (no ML model):
      1. Speech has more silence gaps (pauses between words/sentences).
      2. Speech has larger level variance (proxy for zero-crossing rate).
      3. Speech has wider dynamics than sustained singing notes.

    Args:
        vocals_path: Path to vocals audio file.
        speech_threshold: Score at/above which the track counts as speech (0-1).

    Returns:
        Tuple of (vocal_type, confidence); vocal_type is "speech", "singing",
        or "mixed" — "unknown" when the file does not exist.
    """
    if not os.path.exists(vocals_path):
        return "unknown", 0.0

    # Collect the three heuristic measurements (order preserved).
    silence_ratio = await _get_silence_ratio(vocals_path)       # speech: ~30-50% silence
    zcr_variance = await _get_zcr_variance(vocals_path)         # speech: higher variance
    spectral_score = await _get_spectral_analysis(vocals_path)  # speech: flatter spectrum

    # Fold each measurement into one speech score via two-tier thresholds:
    # (value, high cutoff, low cutoff, points for high, points for low).
    scoring_tiers = (
        (silence_ratio, 0.25, 0.15, 0.4, 0.2),
        (spectral_score, 0.5, 0.3, 0.3, 0.15),
        (zcr_variance, 0.5, 0.3, 0.3, 0.15),
    )
    speech_score = 0.0
    for value, high, low, high_points, low_points in scoring_tiers:
        if value > high:
            speech_score += high_points
        elif value > low:
            speech_score += low_points

    # >= speech_threshold (default 0.7): confident speech.
    # <  0.4: likely singing (confidence reported as 1 - score).
    # otherwise: mixed/uncertain — leaning this way helps avoid transcribing
    # song lyrics as speech.
    if speech_score >= speech_threshold:
        return "speech", speech_score
    if speech_score < 0.4:
        return "singing", 1.0 - speech_score
    return "mixed", speech_score
async def _get_silence_ratio(audio_path: str, threshold_db: float = -35) -> float:
    """Return the fraction (0.0-1.0) of the file that is silent.

    Runs FFmpeg's silencedetect filter and sums the reported
    ``silence_duration`` entries, normalised by total duration.
    Returns 0.0 on any failure.
    """
    detect_cmd = [
        "ffmpeg", "-i", audio_path,
        "-af", f"silencedetect=noise={threshold_db}dB:d=0.3",
        "-f", "null", "-",
    ]
    try:
        proc = subprocess.run(detect_cmd, capture_output=True, text=True, timeout=60)
        log = proc.stderr
        # Count silence periods (kept for parity with the analysis output).
        silence_count = log.count("silence_end")
        duration = await _get_audio_duration(audio_path)
        if not duration or duration == 0:
            return 0.0
        # Accumulate every "silence_duration: <seconds>" entry from the log.
        total_silence = 0.0
        for line in log.split('\n'):
            if 'silence_duration' not in line:
                continue
            try:
                total_silence += float(line.split('silence_duration:')[1].strip().split()[0])
            except (IndexError, ValueError):
                pass
        return min(total_silence / duration, 1.0)
    except Exception:
        return 0.0
async def _get_zcr_variance(audio_path: str) -> float:
    """Estimate zero-crossing-rate variance, normalised to 0-1.

    True ZCR is not computed: per-frame RMS_level readings from FFmpeg's
    astats filter serve as a proxy, and their variance is scaled by 100.
    Returns a neutral 0.3 when stats are unavailable or on any error.
    """
    stats_cmd = [
        "ffmpeg", "-i", audio_path,
        "-af", "astats=metadata=1:reset=1",
        "-f", "null", "-",
    ]
    try:
        proc = subprocess.run(stats_cmd, capture_output=True, text=True, timeout=60)
        # Collect every finite RMS_level value from the stats log.
        levels = []
        for line in proc.stderr.split('\n'):
            if 'RMS_level' not in line:
                continue
            try:
                level = float(line.split(':')[1].strip().split()[0])
            except (IndexError, ValueError):
                continue
            if level != float('-inf'):
                levels.append(level)
        if len(levels) > 1:
            mean_level = sum(levels) / len(levels)
            variance = sum((x - mean_level) ** 2 for x in levels) / len(levels)
            return min(variance / 100, 1.0)  # normalise into 0-1
        return 0.3  # neutral default when too few samples
    except Exception:
        return 0.3
async def _get_spectral_analysis(audio_path: str) -> float:
    """Score dynamic character in 0-1 (higher => more speech-like).

    Uses FFmpeg volumedetect: the spread between max_volume and mean_volume
    is treated as dynamic range (speech typically 15-25 dB).
    Returns a neutral 0.3 when volumes cannot be read or on any error.
    """
    volume_cmd = [
        "ffmpeg", "-i", audio_path,
        "-af", "volumedetect",
        "-f", "null", "-",
    ]
    try:
        proc = subprocess.run(volume_cmd, capture_output=True, text=True, timeout=60)
        mean_volume = None
        peak_volume = None
        for line in proc.stderr.split('\n'):
            if 'mean_volume' in line:
                try:
                    mean_volume = float(line.split(':')[1].strip().replace(' dB', ''))
                except (IndexError, ValueError):
                    pass
            elif 'max_volume' in line:
                try:
                    peak_volume = float(line.split(':')[1].strip().replace(' dB', ''))
                except (IndexError, ValueError):
                    pass
        if mean_volume is None or peak_volume is None:
            return 0.3  # volumes not reported
        # Wide mean-to-peak spread indicates speech-like dynamics.
        spread = abs(peak_volume - mean_volume)
        if spread > 20:
            return 0.7
        if spread > 12:
            return 0.5
        return 0.2
    except Exception:
        return 0.3
async def _get_audio_duration(audio_path: str) -> Optional[float]:
    """Return the duration of the file in seconds via ffprobe, or None on failure."""
    probe_cmd = [
        "ffprobe",
        "-v", "error",
        "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1",
        audio_path,
    ]
    try:
        proc = subprocess.run(probe_cmd, capture_output=True, text=True, timeout=30)
        if proc.returncode == 0:
            # ffprobe prints a bare float on stdout with this output format.
            return float(proc.stdout.strip())
    except Exception:
        pass
    return None
async def check_demucs_available() -> bool:
    """Return True when the dedicated Demucs venv exists and `demucs --help` runs."""
    if not os.path.exists(DEMUCS_PYTHON):
        return False
    try:
        probe = subprocess.run(
            [DEMUCS_PYTHON, "-m", "demucs", "--help"],
            capture_output=True,
            timeout=10,
        )
    except Exception:
        return False
    return probe.returncode == 0

View File

@@ -0,0 +1,495 @@
"""
BGM Provider Service - Freesound & Pixabay Integration
Freesound API: https://freesound.org/docs/api/
- 500,000+ Creative Commons licensed sounds
- Free API with generous rate limits
- Various licenses (CC0, CC-BY, CC-BY-NC, etc.)
Pixabay: Manual download recommended (no public Music API)
"""
import os
import httpx
import aiofiles
from typing import Optional, List, Tuple
from pydantic import BaseModel
from app.config import settings
class FreesoundTrack(BaseModel):
    """Raw Freesound track as returned by the Freesound APIv2.

    NOTE(review): not referenced elsewhere in this module — appears to exist
    for API-shape documentation; confirm before removing.
    """
    id: int
    name: str
    duration: float  # seconds
    tags: List[str]
    license: str  # license URL string as delivered by the API
    username: str  # uploader, needed for CC-BY attribution
    preview_url: str  # HQ preview (128kbps mp3)
    download_url: str  # Original file (requires auth)
    description: str = ""
class BGMSearchResult(BaseModel):
    """Normalised BGM search hit returned to API consumers (Freesound or curated)."""
    id: str
    title: str
    duration: int  # whole seconds
    tags: List[str]
    preview_url: str  # streamable MP3 preview; empty string for curated entries
    download_url: str = ""
    license: str = ""  # human-readable license label
    source: str = "freesound"  # "freesound" or "curated"
# Freesound license names that permit commercial use.
# Fix: "Attribution Noncommercial" (CC-BY-NC) was previously listed here even
# though its own comment said to exclude it — it is NOT commercially usable.
COMMERCIAL_LICENSES = [
    "Creative Commons 0",  # CC0 - Public Domain
    "Attribution",         # CC-BY - Attribution required
]
# License filter string for commercial-only search
COMMERCIAL_LICENSE_FILTER = 'license:"Creative Commons 0" OR license:"Attribution"'
async def search_freesound(
    query: str,
    min_duration: int = 10,
    max_duration: int = 180,  # Shorts typically < 60s, allow some buffer
    page: int = 1,
    page_size: int = 15,
    filter_music: bool = True,
    commercial_only: bool = True,  # Default: only commercially usable
) -> Tuple[bool, str, List[BGMSearchResult]]:
    """
    Search for sounds on Freesound API.

    Args:
        query: Search keywords (e.g., "upbeat music", "chill background")
        min_duration: Minimum duration in seconds
        max_duration: Maximum duration in seconds
        page: Page number (1-indexed)
        page_size: Results per page (max 150)
        filter_music: Add "music" to query for better BGM results
        commercial_only: Only return commercially usable licenses (CC0, CC-BY)

    Returns:
        Tuple of (success, message, results)
    """
    api_key = settings.FREESOUND_API_KEY
    if not api_key:
        return False, "Freesound API key not configured. Get one at https://freesound.org/apiv2/apply", []
    # Bias the query toward music unless it already mentions it.
    search_query = f"{query} music" if filter_music and "music" not in query.lower() else query
    # Build filter string for duration and license
    filter_parts = [f"duration:[{min_duration} TO {max_duration}]"]
    if commercial_only:
        # Fix: request BOTH commercially usable licenses. Previously only CC0
        # was requested, contradicting the documented CC0 + CC-BY behavior.
        # CC-BY-NC (Noncommercial) is excluded simply by not being listed.
        filter_parts.append('license:("Creative Commons 0" OR "Attribution")')
    filter_str = " ".join(filter_parts)
    params = {
        "token": api_key,
        "query": search_query,
        "filter": filter_str,
        "page": page,
        "page_size": min(page_size, 150),  # Freesound caps page_size at 150
        "fields": "id,name,duration,tags,license,username,previews,description",
        "sort": "score",  # relevance
    }
    try:
        async with httpx.AsyncClient() as client:
            response = await client.get(
                "https://freesound.org/apiv2/search/text/",
                params=params,
                timeout=30,
            )
            if response.status_code == 401:
                return False, "Invalid Freesound API key", []
            if response.status_code != 200:
                return False, f"Freesound API error: HTTP {response.status_code}", []
            data = response.json()
            results = []
            for sound in data.get("results", []):
                # Get preview URLs (prefer high quality)
                previews = sound.get("previews", {})
                preview_url = (
                    previews.get("preview-hq-mp3") or
                    previews.get("preview-lq-mp3") or
                    ""
                )
                # Parse license for display
                license_url = sound.get("license", "")
                license_name = _parse_freesound_license(license_url)
                results.append(BGMSearchResult(
                    id=str(sound["id"]),
                    title=sound.get("name", "Unknown"),
                    duration=int(sound.get("duration", 0)),
                    tags=sound.get("tags", [])[:10],  # Limit tags
                    preview_url=preview_url,
                    download_url=f"https://freesound.org/apiv2/sounds/{sound['id']}/download/",
                    license=license_name,
                    source="freesound",
                ))
            total = data.get("count", 0)
            license_info = " (commercial use OK)" if commercial_only else ""
            message = f"Found {total} sounds on Freesound{license_info}"
            return True, message, results
    except httpx.TimeoutException:
        return False, "Freesound API timeout", []
    except Exception as e:
        return False, f"Freesound search error: {str(e)}", []
def _parse_freesound_license(license_url: str) -> str:
    """Translate a Creative Commons license URL from Freesound into a short label.

    Falls back to "See License" for anything unrecognised.
    """
    if "zero" in license_url or "cc0" in license_url.lower():
        return "CC0 (Public Domain)"
    # Check the more specific CC variants before the plain "by/" attribution URL.
    for marker, label in (
        ("by-nc", "CC BY-NC (Non-Commercial)"),
        ("by-sa", "CC BY-SA (Share Alike)"),
        ("by/", "CC BY (Attribution)"),
        ("sampling+", "Sampling+"),
    ):
        if marker in license_url:
            return label
    return "See License"
async def download_freesound(
    sound_id: str,
    output_dir: str,
    filename: str,
) -> Tuple[bool, str, Optional[str]]:
    """
    Download a sound from Freesound.

    Note: Freesound requires OAuth for original file downloads.
    This function downloads the HQ preview (128kbps MP3) which is sufficient for BGM.

    Args:
        sound_id: Freesound sound ID
        output_dir: Directory to save file
        filename: Output filename (without extension)

    Returns:
        Tuple of (success, message, file_path)
    """
    api_key = settings.FREESOUND_API_KEY
    if not api_key:
        return False, "Freesound API key not configured", None
    try:
        async with httpx.AsyncClient() as client:
            # First, get sound info to get preview URL
            info_response = await client.get(
                f"https://freesound.org/apiv2/sounds/{sound_id}/",
                params={
                    "token": api_key,
                    "fields": "id,name,previews,license,username",
                },
                timeout=30,
            )
            if info_response.status_code != 200:
                return False, f"Failed to get sound info: HTTP {info_response.status_code}", None
            sound_data = info_response.json()
            previews = sound_data.get("previews", {})
            # Prefer the high-quality preview, fall back to low quality
            preview_url = previews.get("preview-hq-mp3")
            if not preview_url:
                preview_url = previews.get("preview-lq-mp3")
            if not preview_url:
                return False, "No preview URL available", None
            # Download the preview
            audio_response = await client.get(preview_url, timeout=60, follow_redirects=True)
            if audio_response.status_code != 200:
                return False, f"Download failed: HTTP {audio_response.status_code}", None
            # Save file
            os.makedirs(output_dir, exist_ok=True)
            # Fix: honor the caller-supplied filename — it was previously
            # ignored and every download was saved under a fixed name.
            file_path = os.path.join(output_dir, f"{filename}.mp3")
            async with aiofiles.open(file_path, 'wb') as f:
                await f.write(audio_response.content)
            # Attribution info for CC-BY compliance
            username = sound_data.get("username", "Unknown")
            license_name = _parse_freesound_license(sound_data.get("license", ""))
            return True, f"Downloaded from Freesound (by {username}, {license_name})", file_path
    except httpx.TimeoutException:
        return False, "Download timeout", None
    except Exception as e:
        return False, f"Download error: {str(e)}", None
async def search_and_download_bgm(
    keywords: List[str],
    output_dir: str,
    max_duration: int = 120,
    commercial_only: bool = True,
) -> Tuple[bool, str, Optional[str], Optional[BGMSearchResult]]:
    """
    Search Freesound for BGM matching the keywords and download the top hit.

    Args:
        keywords: Search keywords from BGM recommendation.
        output_dir: Directory to save downloaded file.
        max_duration: Maximum duration in seconds.
        commercial_only: Only search commercially usable licenses (CC0).

    Returns:
        Tuple of (success, message, file_path, matched_result).
    """
    if not settings.FREESOUND_API_KEY:
        return False, "Freesound API key not configured", None, None

    # First attempt: up to three keywords combined into a single query.
    combined_query = " ".join(keywords[:3])
    success, message, results = await search_freesound(
        query=combined_query,
        min_duration=15,
        max_duration=max_duration,
        page_size=10,
        commercial_only=commercial_only,
    )
    # Fallback: retry each keyword on its own until something matches.
    if not success or not results:
        for single_keyword in keywords[:3]:
            success, message, results = await search_freesound(
                query=single_keyword,
                min_duration=15,
                max_duration=max_duration,
                page_size=5,
                commercial_only=commercial_only,
            )
            if success and results:
                break
    if not results:
        return False, "No matching BGM found on Freesound", None, None

    # Results arrive relevance-sorted; take the first.
    best_match = results[0]
    # Build a filesystem-safe name from the track title.
    sanitized = best_match.title.lower().replace(" ", "_")[:50]
    sanitized = "".join(ch for ch in sanitized if ch.isalnum() or ch == "_")
    success, download_msg, file_path = await download_freesound(
        sound_id=best_match.id,
        output_dir=output_dir,
        filename=sanitized,
    )
    if not success:
        return False, download_msg, None, best_match
    return True, download_msg, file_path, best_match
async def search_pixabay_music(
    query: str = "",
    category: str = "",
    min_duration: int = 0,
    max_duration: int = 120,
    page: int = 1,
    per_page: int = 20,
) -> Tuple[bool, str, List[BGMSearchResult]]:
    """
    Search for royalty-free music on Pixabay.

    Pixabay has no public Music API, so every call is answered from the
    curated recommendation list; all filter arguments other than *query*
    are currently ignored.
    """
    # Delegate straight to the curated fallback list.
    return await _get_curated_bgm_list(query)
async def _get_curated_bgm_list(query: str = "") -> Tuple[bool, str, List[BGMSearchResult]]:
    """
    Return a curated list of recommended free BGM categories.

    Since the Pixabay Music API requires special access, this serves fixed
    category suggestions instead of live results — entries describe moods,
    not actual downloadable files, so preview_url is left empty.
    """
    # Curated BGM recommendations (these are categories/suggestions, not actual files)
    curated_bgm = [
        {
            "id": "upbeat_energetic",
            "title": "Upbeat & Energetic",
            "duration": 60,
            "tags": ["upbeat", "energetic", "happy", "positive"],
            "description": "활기찬 쇼츠에 적합",
        },
        {
            "id": "chill_lofi",
            "title": "Chill Lo-Fi",
            "duration": 60,
            "tags": ["chill", "lofi", "relaxing", "calm"],
            "description": "편안한 분위기의 콘텐츠",
        },
        {
            "id": "epic_cinematic",
            "title": "Epic & Cinematic",
            "duration": 60,
            "tags": ["epic", "cinematic", "dramatic", "intense"],
            "description": "드라마틱한 순간",
        },
        {
            "id": "funny_quirky",
            "title": "Funny & Quirky",
            "duration": 30,
            "tags": ["funny", "quirky", "comedy", "playful"],
            "description": "유머러스한 콘텐츠",
        },
        {
            "id": "corporate_tech",
            "title": "Corporate & Tech",
            "duration": 60,
            "tags": ["corporate", "tech", "modern", "professional"],
            "description": "정보성 콘텐츠",
        },
    ]
    # Narrow by query when it matches a title or tag; otherwise keep everything.
    if query:
        needle = query.lower()
        matches = [
            entry for entry in curated_bgm
            if needle in entry["title"].lower()
            or any(needle in tag for tag in entry["tags"])
        ]
        if matches:
            curated_bgm = matches
    results = [
        BGMSearchResult(
            id=entry["id"],
            title=entry["title"],
            duration=entry["duration"],
            tags=entry["tags"],
            preview_url="",  # Would be filled with actual URL
            source="curated",
        )
        for entry in curated_bgm
    ]
    return True, "Curated BGM list", results
async def download_from_url(
    url: str,
    output_path: str,
    filename: str,
) -> Tuple[bool, str, Optional[str]]:
    """
    Download audio file from URL.

    Args:
        url: Audio file URL
        output_path: Directory to save file
        filename: Output filename (without extension)

    Returns:
        Tuple of (success, message, file_path)
    """
    try:
        os.makedirs(output_path, exist_ok=True)
        async with httpx.AsyncClient() as client:
            response = await client.get(url, timeout=60, follow_redirects=True)
            if response.status_code != 200:
                return False, f"Download failed: HTTP {response.status_code}", None
            # Determine file extension from content-type
            content_type = response.headers.get("content-type", "")
            if "mpeg" in content_type:
                ext = ".mp3"
            elif "wav" in content_type:
                ext = ".wav"
            elif "ogg" in content_type:
                ext = ".ogg"
            else:
                ext = ".mp3"  # Default to mp3
            # Fix: honor the caller-supplied filename — it was previously
            # ignored and every download was saved under a fixed name.
            file_path = os.path.join(output_path, f"{filename}{ext}")
            with open(file_path, "wb") as f:
                f.write(response.content)
            return True, "Download complete", file_path
    except Exception as e:
        return False, f"Download error: {str(e)}", None
# Popular free BGM download links — static registry of recommended providers,
# keyed by provider id. Only Freesound exposes a usable API.
FREE_BGM_SOURCES = {
    "freesound": {
        "name": "Freesound",
        "url": "https://freesound.org/",
        "license": "CC0/CC-BY/CC-BY-NC (Various)",
        "description": "500,000+ CC licensed sounds, API available",
        "api_available": True,
        "api_url": "https://freesound.org/apiv2/apply",
    },
    "pixabay": {
        "name": "Pixabay Music",
        "url": "https://pixabay.com/music/",
        "license": "Pixabay License (Free for commercial use)",
        "description": "Large collection of royalty-free music",
        "api_available": False,
    },
    "mixkit": {
        "name": "Mixkit",
        "url": "https://mixkit.co/free-stock-music/",
        "license": "Mixkit License (Free for commercial use)",
        "description": "High-quality free music tracks",
        "api_available": False,
    },
    "uppbeat": {
        "name": "Uppbeat",
        "url": "https://uppbeat.io/",
        "license": "Free tier: 10 tracks/month",
        "description": "YouTube-friendly music",
        "api_available": False,
    },
    "youtube_audio_library": {
        "name": "YouTube Audio Library",
        "url": "https://studio.youtube.com/channel/UC/music",
        "license": "Free for YouTube videos",
        "description": "Google's free music library",
        "api_available": False,
    },
}

def get_free_bgm_sources() -> dict:
    """Return the registry of recommended free BGM sources, keyed by provider id."""
    # Callers treat the registry as read-only; hand it back directly.
    return FREE_BGM_SOURCES

View File

@@ -0,0 +1,295 @@
"""
BGM Recommender Service
Analyzes script content and recommends appropriate BGM based on mood/tone.
Uses GPT to analyze the emotional tone and suggests matching music.
"""
import os
from typing import List, Tuple, Optional
from openai import OpenAI
from pydantic import BaseModel
from app.config import settings
from app.models.schemas import TranscriptSegment
class BGMRecommendation(BaseModel):
    """BGM recommendation produced by script mood analysis."""
    mood: str  # detected mood (one of the MOOD_BGM_MAPPING keys)
    energy: str  # low, medium, high
    suggested_genres: List[str]  # genres matching the detected mood
    search_keywords: List[str]  # keywords for searching external BGM sources
    reasoning: str  # model's one-sentence explanation (prompted to be Korean)
    matched_bgm_id: Optional[str] = None  # if found in local library
# Mood to BGM mapping. Keys must match the mood values the GPT prompt in
# analyze_script_mood is allowed to return; "upbeat" doubles as the fallback
# entry when an unknown mood comes back.
MOOD_BGM_MAPPING = {
    "upbeat": {
        "genres": ["pop", "electronic", "dance"],
        "keywords": ["upbeat", "energetic", "happy", "positive"],
        "energy": "high",
    },
    "chill": {
        "genres": ["lofi", "ambient", "acoustic"],
        "keywords": ["chill", "relaxing", "calm", "peaceful"],
        "energy": "low",
    },
    "dramatic": {
        "genres": ["cinematic", "orchestral", "epic"],
        "keywords": ["dramatic", "epic", "intense", "cinematic"],
        "energy": "high",
    },
    "funny": {
        "genres": ["comedy", "quirky", "playful"],
        "keywords": ["funny", "quirky", "comedy", "playful"],
        "energy": "medium",
    },
    "emotional": {
        "genres": ["piano", "strings", "ballad"],
        "keywords": ["emotional", "sad", "touching", "heartfelt"],
        "energy": "low",
    },
    "informative": {
        "genres": ["corporate", "background", "minimal"],
        "keywords": ["corporate", "background", "tech", "modern"],
        "energy": "medium",
    },
    "exciting": {
        "genres": ["rock", "action", "sports"],
        "keywords": ["exciting", "action", "sports", "adventure"],
        "energy": "high",
    },
    "mysterious": {
        "genres": ["ambient", "dark", "suspense"],
        "keywords": ["mysterious", "suspense", "dark", "tension"],
        "energy": "medium",
    },
}
async def analyze_script_mood(
    segments: List[TranscriptSegment],
    use_translated: bool = True,
) -> Tuple[bool, str, Optional[BGMRecommendation]]:
    """
    Analyze script content to determine mood and recommend BGM.

    Args:
        segments: Transcript segments (original or translated)
        use_translated: Whether to use translated text

    Returns:
        Tuple of (success, message, recommendation)
    """
    # Fix: import json BEFORE the try block. It was previously imported inside
    # the try right before json.loads, so any earlier failure (e.g. the API
    # call raising) made the `except json.JSONDecodeError` clause itself fail
    # with NameError, masking the real error.
    import json

    if not settings.OPENAI_API_KEY:
        return False, "OpenAI API key not configured", None
    if not segments:
        return False, "No transcript segments provided", None
    # Combine script text, preferring the translation when requested/available.
    script_text = "\n".join([
        seg.translated if use_translated and seg.translated else seg.text
        for seg in segments
    ])
    try:
        client = OpenAI(api_key=settings.OPENAI_API_KEY)
        # NOTE(review): this is the synchronous OpenAI client inside an async
        # function — it blocks the event loop during the request; consider the
        # async client.
        response = client.chat.completions.create(
            model=settings.OPENAI_MODEL,
            messages=[
                {
                    "role": "system",
                    "content": """You are a music supervisor for YouTube Shorts.
Analyze the script and determine the best background music mood.
Respond in JSON format ONLY:
{
"mood": "one of: upbeat, chill, dramatic, funny, emotional, informative, exciting, mysterious",
"energy": "low, medium, or high",
"reasoning": "brief explanation in Korean (1 sentence)"
}
Consider:
- Overall emotional tone of the content
- Pacing and energy level
- Target audience engagement
- What would make viewers watch till the end"""
                },
                {
                    "role": "user",
                    "content": f"Script:\n{script_text}"
                }
            ],
            temperature=0.3,
            max_tokens=200,
        )
        # Parse response, stripping an optional markdown code fence.
        result_text = response.choices[0].message.content.strip()
        if result_text.startswith("```"):
            result_text = result_text.split("```")[1]
            if result_text.startswith("json"):
                result_text = result_text[4:]
        result = json.loads(result_text)
        mood = result.get("mood", "upbeat")
        energy = result.get("energy", "medium")
        reasoning = result.get("reasoning", "")
        # Map the detected mood to genre/keyword suggestions (fallback: upbeat).
        mood_info = MOOD_BGM_MAPPING.get(mood, MOOD_BGM_MAPPING["upbeat"])
        recommendation = BGMRecommendation(
            mood=mood,
            energy=energy,
            suggested_genres=mood_info["genres"],
            search_keywords=mood_info["keywords"],
            reasoning=reasoning,
        )
        return True, f"Mood analysis complete: {mood}", recommendation
    except json.JSONDecodeError as e:
        return False, f"Failed to parse mood analysis: {str(e)}", None
    except Exception as e:
        return False, f"Mood analysis error: {str(e)}", None
async def find_matching_bgm(
    recommendation: BGMRecommendation,
    available_bgm: List[dict],
) -> Optional[str]:
    """
    Pick the best BGM id from the local library for a mood recommendation.

    Scores each library entry by counting how many recommendation keywords
    (plus the mood itself) appear in the entry's name or id, case-insensitively.

    Args:
        recommendation: BGM recommendation from mood analysis.
        available_bgm: Library entries as dicts with 'id' and 'name' keys.

    Returns:
        The id of the highest-scoring entry, or None when nothing matches.
    """
    if not available_bgm:
        return None

    search_terms = recommendation.search_keywords + [recommendation.mood]
    top_id = None
    top_score = 0
    for entry in available_bgm:
        haystack_name = entry.get("name", "").lower()
        haystack_id = entry.get("id", "").lower()
        hits = sum(
            1
            for term in search_terms
            if term.lower() in haystack_name or term.lower() in haystack_id
        )
        # Strict > keeps the earliest entry on ties, matching library order.
        if hits > top_score:
            top_score = hits
            top_id = entry.get("id")
    return top_id if top_score > 0 else None
async def recommend_bgm_for_script(
    segments: List[TranscriptSegment],
    available_bgm: List[dict],
    use_translated: bool = True,
) -> Tuple[bool, str, Optional[BGMRecommendation]]:
    """
    Full BGM recommendation workflow.

    1. Analyze the script's mood.
    2. Try to match a BGM from the local library.
    3. Otherwise surface search keywords for external sources.

    Args:
        segments: Transcript segments.
        available_bgm: List of available BGM in library.
        use_translated: Whether to use translated text.

    Returns:
        Tuple of (success, message, recommendation with matched_bgm_id if found).
    """
    ok, msg, recommendation = await analyze_script_mood(segments, use_translated)
    if not ok or not recommendation:
        return ok, msg, recommendation

    matched_id = await find_matching_bgm(recommendation, available_bgm)
    if matched_id:
        recommendation.matched_bgm_id = matched_id
        summary = f"Mood: {recommendation.mood} | Matched BGM: {matched_id}"
    else:
        summary = f"Mood: {recommendation.mood} | No local BGM matched, search with: {', '.join(recommendation.search_keywords[:3])}"
    return True, summary, recommendation
# Predefined BGM presets for common content types. Each preset pins a mood
# (a MOOD_BGM_MAPPING key) plus content-specific search keywords; consumed by
# get_preset_recommendation.
BGM_PRESETS = {
    "cooking": {
        "mood": "chill",
        "keywords": ["cooking", "food", "kitchen", "cozy"],
    },
    "fitness": {
        "mood": "upbeat",
        "keywords": ["workout", "fitness", "energetic", "motivation"],
    },
    "tutorial": {
        "mood": "informative",
        "keywords": ["tutorial", "tech", "corporate", "background"],
    },
    "comedy": {
        "mood": "funny",
        "keywords": ["funny", "comedy", "quirky", "playful"],
    },
    "travel": {
        "mood": "exciting",
        "keywords": ["travel", "adventure", "upbeat", "inspiring"],
    },
    "asmr": {
        "mood": "chill",
        "keywords": ["asmr", "relaxing", "ambient", "soft"],
    },
    "news": {
        "mood": "informative",
        "keywords": ["news", "corporate", "serious", "background"],
    },
    "gaming": {
        "mood": "exciting",
        "keywords": ["gaming", "electronic", "action", "intense"],
    },
}
def get_preset_recommendation(content_type: str) -> Optional[BGMRecommendation]:
    """Build a BGMRecommendation for a known content type, or None if unknown.

    Looks up BGM_PRESETS case-insensitively and fills energy/genres from
    MOOD_BGM_MAPPING (falling back to the "upbeat" mood entry).
    """
    preset = BGM_PRESETS.get(content_type.lower())
    if preset is None:
        return None
    preset_mood = preset["mood"]
    mood_details = MOOD_BGM_MAPPING.get(preset_mood, MOOD_BGM_MAPPING["upbeat"])
    return BGMRecommendation(
        mood=preset_mood,
        energy=mood_details["energy"],
        suggested_genres=mood_details["genres"],
        search_keywords=preset["keywords"],
        reasoning=f"Preset for {content_type} content",
    )

View File

@@ -0,0 +1,297 @@
"""
Default BGM Initializer
Downloads pre-selected royalty-free BGM tracks on first startup.
Tracks are from Kevin MacLeod (incompetech.com) - CC-BY 4.0 License.
Free for commercial use with attribution: "Kevin MacLeod (incompetech.com)"
"""
import os
import httpx
import aiofiles
import asyncio
from typing import List, Tuple, Optional
from pydantic import BaseModel
class DefaultBGM(BaseModel):
    """Default BGM track info."""
    id: str  # stable identifier; also used as the local filename stem (<id>.mp3)
    name: str  # human-readable track name
    url: str  # direct MP3 download URL (incompetech.com)
    category: str  # mood/genre bucket: upbeat, chill, funny, cinematic, lifestyle, acoustic, electronic
    description: str  # Korean UI description of the track's intended use
# Curated list of royalty-free BGM from Kevin MacLeod (incompetech.com)
# CC-BY 4.0 License - Free for commercial use with attribution
# Attribution: "Kevin MacLeod (incompetech.com)"
# NOTE: each track's `id` doubles as the on-disk filename (<id>.mp3) used by
# initialize_default_bgm() and check_default_bgm_status().
DEFAULT_BGM_TRACKS: List[DefaultBGM] = [
    # === 활기찬/에너지 (Upbeat/Energetic) ===
    DefaultBGM(
        id="upbeat_energetic",
        name="Upbeat Energetic",
        url="https://incompetech.com/music/royalty-free/mp3-royaltyfree/Vivacity.mp3",
        category="upbeat",
        description="활기차고 에너지 넘치는 BGM - 피트니스, 챌린지 영상",
    ),
    DefaultBGM(
        id="happy_pop",
        name="Happy Pop",
        url="https://incompetech.com/music/royalty-free/mp3-royaltyfree/Carefree.mp3",
        category="upbeat",
        description="밝고 경쾌한 팝 BGM - 제품 소개, 언박싱",
    ),
    DefaultBGM(
        id="upbeat_fun",
        name="Upbeat Fun",
        url="https://incompetech.com/music/royalty-free/mp3-royaltyfree/Happy%20Happy%20Game%20Show.mp3",
        category="upbeat",
        description="신나는 게임쇼 비트 - 트렌디한 쇼츠",
    ),
    # === 차분한/편안한 (Chill/Relaxing) ===
    DefaultBGM(
        id="chill_lofi",
        name="Chill Lo-Fi",
        url="https://incompetech.com/music/royalty-free/mp3-royaltyfree/Gymnopedie%20No%201.mp3",
        category="chill",
        description="차분하고 편안한 피아노 BGM - 일상, 브이로그",
    ),
    DefaultBGM(
        id="calm_piano",
        name="Calm Piano",
        url="https://incompetech.com/music/royalty-free/mp3-royaltyfree/Prelude%20No.%201.mp3",
        category="chill",
        description="잔잔한 피아노 BGM - 감성적인 콘텐츠",
    ),
    DefaultBGM(
        id="soft_ambient",
        name="Soft Ambient",
        url="https://incompetech.com/music/royalty-free/mp3-royaltyfree/Dreamlike.mp3",
        category="chill",
        description="부드러운 앰비언트 - ASMR, 수면 콘텐츠",
    ),
    # === 유머/코미디 (Funny/Comedy) ===
    DefaultBGM(
        id="funny_comedy",
        name="Funny Comedy",
        url="https://incompetech.com/music/royalty-free/mp3-royaltyfree/Sneaky%20Snitch.mp3",
        category="funny",
        description="유쾌한 코미디 BGM - 코미디, 밈 영상",
    ),
    DefaultBGM(
        id="quirky_playful",
        name="Quirky Playful",
        url="https://incompetech.com/music/royalty-free/mp3-royaltyfree/Monkeys%20Spinning%20Monkeys.mp3",
        category="funny",
        description="장난스럽고 귀여운 BGM - 펫, 키즈 콘텐츠",
    ),
    # === 드라마틱/시네마틱 (Cinematic) ===
    DefaultBGM(
        id="cinematic_epic",
        name="Cinematic Epic",
        url="https://incompetech.com/music/royalty-free/mp3-royaltyfree/Epic%20Unease.mp3",
        category="cinematic",
        description="웅장한 시네마틱 BGM - 리뷰, 소개 영상",
    ),
    DefaultBGM(
        id="inspirational",
        name="Inspirational",
        url="https://incompetech.com/music/royalty-free/mp3-royaltyfree/Hero%20Theme.mp3",
        category="cinematic",
        description="영감을 주는 BGM - 동기부여, 성장 콘텐츠",
    ),
    # === 생활용품/제품 리뷰 (Lifestyle/Product) ===
    DefaultBGM(
        id="lifestyle_modern",
        name="Lifestyle Modern",
        url="https://incompetech.com/music/royalty-free/mp3-royaltyfree/Acoustic%20Breeze.mp3",
        category="lifestyle",
        description="모던한 라이프스타일 BGM - 제품 리뷰",
    ),
    DefaultBGM(
        id="shopping_bright",
        name="Shopping Bright",
        url="https://incompetech.com/music/royalty-free/mp3-royaltyfree/Pleasant%20Porridge.mp3",
        category="lifestyle",
        description="밝은 쇼핑 BGM - 하울, 추천 영상",
    ),
    DefaultBGM(
        id="soft_corporate",
        name="Soft Corporate",
        url="https://incompetech.com/music/royalty-free/mp3-royaltyfree/Laid%20Back%20Guitars.mp3",
        category="lifestyle",
        description="부드러운 기업형 BGM - 정보성 콘텐츠",
    ),
    # === 어쿠스틱/감성 (Acoustic/Emotional) ===
    DefaultBGM(
        id="soft_acoustic",
        name="Soft Acoustic",
        url="https://incompetech.com/music/royalty-free/mp3-royaltyfree/Peaceful.mp3",
        category="acoustic",
        description="따뜻한 어쿠스틱 BGM - 요리, 일상 브이로그",
    ),
    DefaultBGM(
        id="gentle_guitar",
        name="Gentle Guitar",
        url="https://incompetech.com/music/royalty-free/mp3-royaltyfree/Sunflower%20Slow%20Drag.mp3",
        category="acoustic",
        description="잔잔한 기타 BGM - 여행, 풍경 영상",
    ),
    # === 트렌디/일렉트로닉 (Trendy/Electronic) ===
    DefaultBGM(
        id="electronic_chill",
        name="Electronic Chill",
        url="https://incompetech.com/music/royalty-free/mp3-royaltyfree/Digital%20Lemonade.mp3",
        category="electronic",
        description="일렉트로닉 칠아웃 - 테크, 게임 콘텐츠",
    ),
    DefaultBGM(
        id="driving_beat",
        name="Driving Beat",
        url="https://incompetech.com/music/royalty-free/mp3-royaltyfree/Cipher.mp3",
        category="electronic",
        description="드라이빙 비트 - 스포츠, 액션 영상",
    ),
]
async def download_bgm_file(
    url: str,
    output_path: str,
    timeout: int = 60,
) -> Tuple[bool, str]:
    """Fetch a single BGM file over HTTP and write it to disk.

    Args:
        url: Download URL.
        output_path: Full path to save the file.
        timeout: Download timeout in seconds.

    Returns:
        Tuple of (success, message).
    """
    # Browser-like headers: some hosts reject requests without a real UA.
    request_headers = {
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        "Accept": "audio/mpeg,audio/*;q=0.9,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.9",
    }
    try:
        async with httpx.AsyncClient(follow_redirects=True, headers=request_headers) as http:
            response = await http.get(url, timeout=timeout)
            if response.status_code != 200:
                return False, f"HTTP {response.status_code}"
            # Make sure the destination directory exists before writing.
            os.makedirs(os.path.dirname(output_path), exist_ok=True)
            async with aiofiles.open(output_path, 'wb') as destination:
                await destination.write(response.content)
            return True, "Downloaded successfully"
    except httpx.TimeoutException:
        return False, "Download timeout"
    except Exception as e:
        return False, str(e)
async def initialize_default_bgm(
    bgm_dir: str,
    force: bool = False,
) -> Tuple[int, int, List[str]]:
    """Ensure the bundled royalty-free BGM tracks exist on disk.

    Downloads every track from DEFAULT_BGM_TRACKS that is not already present.

    Args:
        bgm_dir: Directory to save BGM files.
        force: Force re-download even if files exist.

    Returns:
        Tuple of (downloaded_count, skipped_count, error_messages).
    """
    os.makedirs(bgm_dir, exist_ok=True)
    download_count = 0
    skip_count = 0
    failures: List[str] = []
    for track in DEFAULT_BGM_TRACKS:
        destination = os.path.join(bgm_dir, f"{track.id}.mp3")
        # Skip tracks already on disk unless a re-download is forced.
        if os.path.exists(destination) and not force:
            skip_count += 1
            print(f"[BGM] Skipping {track.name} (already exists)")
            continue
        print(f"[BGM] Downloading {track.name}...")
        ok, detail = await download_bgm_file(track.url, destination)
        if not ok:
            failures.append(f"{track.name}: {detail}")
            print(f"[BGM] Failed to download {track.name}: {detail}")
            continue
        download_count += 1
        print(f"[BGM] Downloaded {track.name}")
    return download_count, skip_count, failures
async def get_default_bgm_list() -> List[dict]:
    """Describe the bundled default BGM tracks.

    Returns:
        List of BGM info dictionaries (id, name, category, description).
    """
    catalog: List[dict] = []
    for track in DEFAULT_BGM_TRACKS:
        catalog.append(
            {
                "id": track.id,
                "name": track.name,
                "category": track.category,
                "description": track.description,
            }
        )
    return catalog
def check_default_bgm_status(bgm_dir: str) -> dict:
    """Report which bundled default BGM tracks are present in bgm_dir.

    Args:
        bgm_dir: BGM directory path.

    Returns:
        Status dictionary with counts plus installed/missing track id lists.
    """
    present_ids: List[str] = []
    absent_ids: List[str] = []
    for track in DEFAULT_BGM_TRACKS:
        mp3_path = os.path.join(bgm_dir, f"{track.id}.mp3")
        # Route each id into the matching bucket.
        target = present_ids if os.path.exists(mp3_path) else absent_ids
        target.append(track.id)
    return {
        "total": len(DEFAULT_BGM_TRACKS),
        "installed": len(present_ids),
        "missing": len(absent_ids),
        "installed_ids": present_ids,
        "missing_ids": absent_ids,
    }

View File

@@ -0,0 +1,158 @@
import subprocess
import os
import re
from typing import Optional, Tuple
from app.config import settings
def detect_platform(url: str) -> str:
    """Identify the source platform from a video URL.

    Checks are ordered; the first matching substring wins.

    Returns:
        Platform name ("douyin", "kuaishou", "bilibili", "youtube",
        "tiktok") or "unknown".
    """
    platform_markers = (
        (("douyin", "iesdouyin"), "douyin"),
        (("kuaishou", "gifshow"), "kuaishou"),
        (("bilibili",), "bilibili"),
        (("youtube", "youtu.be"), "youtube"),
        (("tiktok",), "tiktok"),
    )
    for markers, platform in platform_markers:
        if any(marker in url for marker in markers):
            return platform
    return "unknown"
def sanitize_filename(filename: str) -> str:
    """Make a string safe to use as a filesystem name.

    Replaces characters that are illegal on common filesystems with
    underscores and caps the length at 100 characters.
    """
    cleaned = re.sub(r'[<>:"/\\|?*]', '_', filename)
    # Keep paths manageable on every platform.
    return cleaned[:100] if len(cleaned) > 100 else cleaned
def get_cookies_path(platform: str) -> Optional[str]:
    """Return a cookies file for the given platform, if one exists on disk.

    Prefers a platform-specific file (e.g. douyin.txt) over the generic
    cookies.txt; returns None when neither exists.
    """
    cookies_dir = os.path.join(os.path.dirname(settings.DOWNLOAD_DIR), "cookies")
    for candidate_name in (f"{platform}.txt", "cookies.txt"):
        candidate = os.path.join(cookies_dir, candidate_name)
        if os.path.exists(candidate):
            return candidate
    return None
async def download_video(url: str, job_id: str) -> Tuple[bool, str, Optional[str]]:
    """
    Download video using yt-dlp.

    The yt-dlp subprocess is run via asyncio.to_thread so the (potentially
    minutes-long) blocking call no longer stalls the event loop for other
    requests.

    Args:
        url: Source video URL.
        job_id: Job identifier; files are written to DOWNLOAD_DIR/<job_id>/.

    Returns:
        Tuple of (success, message, video_path)
    """
    import asyncio  # local import keeps the module-level import block unchanged

    output_dir = os.path.join(settings.DOWNLOAD_DIR, job_id)
    os.makedirs(output_dir, exist_ok=True)
    output_template = os.path.join(output_dir, "%(title).50s.%(ext)s")
    # yt-dlp command with options for Chinese platforms
    cmd = [
        "yt-dlp",
        "--no-playlist",
        "-f", "best[ext=mp4]/best",
        "--merge-output-format", "mp4",
        "-o", output_template,
        "--no-check-certificate",
        "--socket-timeout", "30",
        "--retries", "3",
        "--user-agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
    ]
    platform = detect_platform(url)
    # Add cookies if available (required for Douyin, Kuaishou)
    cookies_path = get_cookies_path(platform)
    if cookies_path:
        cmd.extend(["--cookies", cookies_path])
        print(f"Using cookies from: {cookies_path}")
    elif platform in ["douyin", "kuaishou", "bilibili"]:
        # Try to use browser cookies if no cookies file
        # Priority: Chrome > Firefox > Edge
        cmd.extend(["--cookies-from-browser", "chrome"])
        print(f"Using cookies from Chrome browser for {platform}")
    # Platform-specific options
    if platform in ["douyin", "kuaishou"]:
        # Use browser impersonation for anti-bot bypass
        cmd.extend([
            "--impersonate", "chrome-123:macos-14",
            "--extractor-args", "generic:impersonate",
        ])
    # Add proxy if configured (for geo-restricted platforms)
    if settings.PROXY_URL:
        cmd.extend(["--proxy", settings.PROXY_URL])
        print(f"Using proxy: {settings.PROXY_URL}")
    cmd.append(url)
    try:
        # Run the blocking subprocess call in a worker thread.
        result = await asyncio.to_thread(
            subprocess.run,
            cmd,
            capture_output=True,
            text=True,
            timeout=300,  # 5 minute timeout
        )
        if result.returncode != 0:
            error_msg = result.stderr or result.stdout or "Unknown error"
            return False, f"Download failed: {error_msg}", None
        # Find the downloaded file
        for file in os.listdir(output_dir):
            if file.endswith((".mp4", ".webm", ".mkv")):
                video_path = os.path.join(output_dir, file)
                return True, "Download successful", video_path
        return False, "No video file found after download", None
    except subprocess.TimeoutExpired:
        return False, "Download timed out (5 minutes)", None
    except Exception as e:
        return False, f"Download error: {str(e)}", None
def get_video_info(url: str) -> Optional[dict]:
    """Get video metadata without downloading.

    Returns the parsed yt-dlp JSON dict, or None on any failure
    (non-zero exit, timeout, or unparseable output).
    """
    import json

    cmd = [
        "yt-dlp",
        "-j",  # JSON output
        "--no-download",
    ]
    # Add proxy if configured
    if settings.PROXY_URL:
        cmd.extend(["--proxy", settings.PROXY_URL])
    cmd.append(url)
    try:
        probe = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=60,
        )
        if probe.returncode == 0:
            return json.loads(probe.stdout)
    except Exception:
        pass
    return None

View File

@@ -0,0 +1,399 @@
"""
Thumbnail Generator Service
Generates YouTube Shorts thumbnails with:
1. Frame extraction from video
2. GPT-generated catchphrase
3. Text overlay with styling
"""
import os
import subprocess
import asyncio
from typing import Optional, Tuple, List
from openai import OpenAI
from PIL import Image, ImageDraw, ImageFont
from app.config import settings
from app.models.schemas import TranscriptSegment
def get_openai_client() -> OpenAI:
    """Build an OpenAI API client from the configured API key."""
    api_key = settings.OPENAI_API_KEY
    return OpenAI(api_key=api_key)
async def extract_frame(
    video_path: str,
    output_path: str,
    timestamp: float = 2.0,
) -> Tuple[bool, str]:
    """
    Extract a single frame from video.

    Args:
        video_path: Path to video file
        output_path: Path to save thumbnail image
        timestamp: Time in seconds to extract frame

    Returns:
        Tuple of (success, message)
    """
    try:
        # -ss before -i seeks before decoding; -q:v 2 keeps JPEG quality high.
        ffmpeg_args = [
            "ffmpeg", "-y",
            "-ss", str(timestamp),
            "-i", video_path,
            "-vframes", "1",
            "-q:v", "2",
            output_path,
        ]
        proc = await asyncio.create_subprocess_exec(
            *ffmpeg_args,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        _, stderr = await proc.communicate()
        if proc.returncode != 0:
            return False, f"FFmpeg error: {stderr.decode()[:200]}"
        if not os.path.exists(output_path):
            return False, "Frame extraction failed - no output file"
        return True, "Frame extracted successfully"
    except Exception as e:
        return False, f"Frame extraction error: {str(e)}"
async def generate_catchphrase(
    transcript: List[TranscriptSegment],
    style: str = "homeshopping",
) -> Tuple[bool, str, str]:
    """
    Generate a catchy thumbnail text using GPT.

    Args:
        transcript: List of transcript segments (with translations)
        style: Style of catchphrase (homeshopping, viral, informative);
            unknown styles fall back to "homeshopping"

    Returns:
        Tuple of (success, message, catchphrase); catchphrase is "" on failure
    """
    if not settings.OPENAI_API_KEY:
        return False, "OpenAI API key not configured", ""
    try:
        client = get_openai_client()
        # Combine translated text; fall back to the raw transcript when the
        # first segment has no translation.
        if transcript and transcript[0].translated:
            full_text = " ".join([seg.translated for seg in transcript if seg.translated])
        else:
            full_text = " ".join([seg.text for seg in transcript])
        # Per-style Korean prompt fragments (runtime strings — do not edit).
        style_guides = {
            "homeshopping": """홈쇼핑 스타일의 임팩트 있는 문구를 만드세요.
- "이거 하나면 끝!" 같은 강렬한 어필
- 혜택/효과 강조
- 숫자 활용 (예: "10초만에", "50% 절약")
- 질문형도 OK (예: "아직도 힘들게?")""",
            "viral": """바이럴 쇼츠 스타일의 호기심 유발 문구를 만드세요.
- 궁금증 유발
- 반전/놀라움 암시
- 이모지 1-2개 사용 가능""",
            "informative": """정보성 콘텐츠 스타일의 명확한 문구를 만드세요.
- 핵심 정보 전달
- 간결하고 명확하게""",
        }
        style_guide = style_guides.get(style, style_guides["homeshopping"])
        system_prompt = f"""당신은 YouTube Shorts 썸네일 문구 전문가입니다.
{style_guide}
규칙:
- 반드시 15자 이내!
- 한 줄로 작성
- 한글만 사용 (영어/한자 금지)
- 출력은 문구만! (설명 없이)
예시 출력:
이거 하나면 끝!
10초면 완성!
아직도 힘들게?
진짜 이게 돼요?"""
        # Only the first 500 chars of the transcript are sent to keep the
        # prompt small.
        response = client.chat.completions.create(
            model=settings.OPENAI_MODEL,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": f"다음 영상 내용으로 썸네일 문구를 만들어주세요:\n\n{full_text[:500]}"}
            ],
            temperature=0.8,
            max_tokens=50,
        )
        catchphrase = response.choices[0].message.content.strip()
        # Clean up: strip straight and curly quotes the model may add.
        catchphrase = catchphrase.strip('"\'""''')
        # Ensure max length.
        # NOTE(review): the prompt asks for <=15 chars but the hard cap here is
        # 20 — presumably intentional slack; confirm before tightening.
        if len(catchphrase) > 20:
            catchphrase = catchphrase[:20]
        return True, "Catchphrase generated", catchphrase
    except Exception as e:
        return False, f"GPT error: {str(e)}", ""
def add_text_overlay(
    image_path: str,
    output_path: str,
    text: str,
    font_size: int = 80,
    font_color: str = "#FFFFFF",
    stroke_color: str = "#000000",
    stroke_width: int = 4,
    position: str = "center",
    font_name: str = "NanumGothicBold",
) -> Tuple[bool, str]:
    """
    Add text overlay to image using PIL.

    Text that is too wide is first wrapped to two lines (split near the
    middle, preferring a space/comma), then the font size is reduced down to
    a 40px floor until it fits 90% of the image width.

    Args:
        image_path: Input image path
        output_path: Output image path
        text: Text to overlay
        font_size: Font size in pixels
        font_color: Text color (hex)
        stroke_color: Outline color (hex)
        stroke_width: Outline thickness
        position: Text position (top, center, bottom)
        font_name: Font family name

    Returns:
        Tuple of (success, message)
    """
    try:
        # Open image
        img = Image.open(image_path)
        draw = ImageDraw.Draw(img)
        img_width, img_height = img.size
        # Maximum text width (90% of image width)
        max_text_width = int(img_width * 0.9)
        # Try to load font from common Linux/macOS locations, falling back
        # to DejaVu and finally PIL's built-in default.
        def load_font(size):
            font_paths = [
                f"/usr/share/fonts/truetype/nanum/{font_name}.ttf",
                f"/usr/share/fonts/opentype/nanum/{font_name}.otf",
                f"/System/Library/Fonts/{font_name}.ttf",
                f"/Library/Fonts/{font_name}.ttf",
                f"~/Library/Fonts/{font_name}.ttf",
                f"/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",
            ]
            for path in font_paths:
                expanded_path = os.path.expanduser(path)
                if os.path.exists(expanded_path):
                    try:
                        return ImageFont.truetype(expanded_path, size)
                    except Exception:
                        # Fixed: was a bare `except:` that also swallowed
                        # KeyboardInterrupt/SystemExit.
                        continue
            return None
        font = load_font(font_size)
        if font is None:
            font = ImageFont.load_default()
            font_size = 40
        # Check text width and adjust if necessary
        bbox = draw.textbbox((0, 0), text, font=font)
        text_width = bbox[2] - bbox[0]
        lines = [text]
        if text_width > max_text_width:
            # Try splitting into 2 lines first
            mid = len(text) // 2
            # Find best split point near middle (at space or comma if exists)
            split_pos = mid
            for i in range(mid, max(0, mid - 5), -1):
                if text[i] in ' ,、,':
                    split_pos = i + 1
                    break
            for i in range(mid, min(len(text), mid + 5)):
                if text[i] in ' ,、,':
                    split_pos = i + 1
                    break
            # Split text into 2 lines
            line1 = text[:split_pos].strip()
            line2 = text[split_pos:].strip()
            lines = [line1, line2] if line2 else [line1]
            # Check if 2-line version fits
            max_line_width = max(
                draw.textbbox((0, 0), line, font=font)[2] - draw.textbbox((0, 0), line, font=font)[0]
                for line in lines
            )
            # If still too wide, reduce font size
            while max_line_width > max_text_width and font_size > 40:
                font_size -= 5
                font = load_font(font_size)
                if font is None:
                    font = ImageFont.load_default()
                    break
                max_line_width = max(
                    draw.textbbox((0, 0), line, font=font)[2] - draw.textbbox((0, 0), line, font=font)[0]
                    for line in lines
                )
        # Calculate total text height for multi-line
        line_height = font_size + 10
        total_height = line_height * len(lines)
        # Calculate starting y position
        if position == "top":
            start_y = img_height // 6
        elif position == "bottom":
            start_y = img_height - img_height // 4 - total_height
        else:  # center
            start_y = (img_height - total_height) // 2
        # Convert hex colors to RGB
        def hex_to_rgb(hex_color):
            hex_color = hex_color.lstrip('#')
            return tuple(int(hex_color[i:i+2], 16) for i in (0, 2, 4))
        text_rgb = hex_to_rgb(font_color)
        stroke_rgb = hex_to_rgb(stroke_color)
        # Draw each line
        for i, line in enumerate(lines):
            bbox = draw.textbbox((0, 0), line, font=font)
            line_width = bbox[2] - bbox[0]
            # Account for left bearing (bbox[0]) to prevent first character cut-off
            # Some fonts/characters have non-zero left offset
            x = (img_width - line_width) // 2 - bbox[0]
            y = start_y + i * line_height
            # Draw text with stroke (outline) by stamping offset copies
            for dx in range(-stroke_width, stroke_width + 1):
                for dy in range(-stroke_width, stroke_width + 1):
                    if dx != 0 or dy != 0:
                        draw.text((x + dx, y + dy), line, font=font, fill=stroke_rgb)
            # Draw main text
            draw.text((x, y), line, font=font, fill=text_rgb)
        # Save
        img.save(output_path, "JPEG", quality=95)
        return True, "Text overlay added"
    except Exception as e:
        return False, f"Text overlay error: {str(e)}"
async def generate_thumbnail(
    job_id: str,
    video_path: str,
    transcript: List[TranscriptSegment],
    timestamp: float = 2.0,
    style: str = "homeshopping",
    custom_text: Optional[str] = None,
    font_size: int = 80,
    position: str = "center",
) -> Tuple[bool, str, Optional[str]]:
    """Produce a finished thumbnail: frame grab plus a text overlay.

    Args:
        job_id: Job ID used to name the output files.
        video_path: Path to the source video.
        transcript: Transcript segments (used for GPT caption generation).
        timestamp: Time (seconds) of the frame to extract.
        style: Catchphrase style passed to generate_catchphrase.
        custom_text: If given, overlay this text and skip GPT entirely.
        font_size: Overlay font size.
        position: Overlay position (top/center/bottom).

    Returns:
        Tuple of (success, message, thumbnail_path).
    """
    frame_path = os.path.join(settings.PROCESSED_DIR, f"{job_id}_frame.jpg")
    thumbnail_path = os.path.join(settings.PROCESSED_DIR, f"{job_id}_thumbnail.jpg")
    # 1) Grab the source frame.
    frame_ok, frame_msg = await extract_frame(video_path, frame_path, timestamp)
    if not frame_ok:
        return False, frame_msg, None
    # 2) Decide the caption text.
    if custom_text:
        caption = custom_text
    else:
        gpt_ok, _gpt_msg, caption = await generate_catchphrase(transcript, style)
        if not gpt_ok:
            # Fallback: reuse the first translated line when GPT fails.
            caption = transcript[0].translated if transcript and transcript[0].translated else "확인해보세요!"
    # 3) Burn the caption onto the frame.
    overlay_ok, overlay_msg = add_text_overlay(
        frame_path,
        thumbnail_path,
        caption,
        font_size=font_size,
        position=position,
    )
    if not overlay_ok:
        return False, overlay_msg, None
    # Remove the intermediate frame; only the final thumbnail is kept.
    if os.path.exists(frame_path):
        os.remove(frame_path)
    return True, f"Thumbnail generated: {caption}", thumbnail_path
async def get_video_timestamps(video_path: str, count: int = 5) -> List[float]:
    """
    Get evenly distributed timestamps from video for thumbnail selection.

    Args:
        video_path: Path to video
        count: Number of timestamps to return

    Returns:
        List of timestamps in seconds. On any probe failure a fixed fallback
        list is returned (note: fallback values may exceed a very short
        video's duration).
    """
    try:
        cmd = [
            "ffprobe", "-v", "error",
            "-show_entries", "format=duration",
            "-of", "default=noprint_wrappers=1:nokey=1",
            video_path
        ]
        # Timeout guards against a hung ffprobe blocking this request forever.
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=15)
        duration = float(result.stdout.strip())
        # Generate evenly distributed timestamps (skip first and last 10%)
        start = duration * 0.1
        end = duration * 0.9
        step = (end - start) / (count - 1) if count > 1 else 0
        return [start + i * step for i in range(count)]
    except Exception:
        return [1.0, 3.0, 5.0, 7.0, 10.0]  # Fallback

View File

@@ -0,0 +1,421 @@
import whisper
import asyncio
import os
from typing import List, Optional, Tuple
from app.models.schemas import TranscriptSegment
from app.config import settings
# Global model cache: the Whisper model is expensive to load, so it is
# created once and shared for the process lifetime.
_model = None
def get_whisper_model():
    """Return the shared Whisper model, loading it lazily on first use."""
    global _model
    if _model is not None:
        return _model
    print(f"Loading Whisper model: {settings.WHISPER_MODEL}")
    _model = whisper.load_model(settings.WHISPER_MODEL)
    return _model
async def check_audio_availability(video_path: str) -> Tuple[bool, str]:
    """Decide whether the video has audio that is worth transcribing.

    Returns:
        Tuple of (has_audio, message); message is one of
        "no_audio_stream", "audio_silent", or "audio_ok".
    """
    from app.services.video_processor import has_audio_stream, get_audio_volume_info, is_audio_silent

    # A missing audio stream is a hard stop for transcription.
    if not await has_audio_stream(video_path):
        return False, "no_audio_stream"
    # An existing but effectively silent track is just as unusable.
    volume_stats = await get_audio_volume_info(video_path)
    if is_audio_silent(volume_stats):
        return False, "audio_silent"
    return True, "audio_ok"
async def transcribe_video(
    video_path: str,
    use_noise_reduction: bool = True,
    noise_reduction_level: str = "medium",
    use_vocal_separation: bool = False,
    progress_callback: Optional[callable] = None,
) -> Tuple[bool, str, Optional[List[TranscriptSegment]], Optional[str]]:
    """
    Transcribe video audio using Whisper.

    Args:
        video_path: Path to video file
        use_noise_reduction: Whether to apply noise reduction before transcription
        noise_reduction_level: "light", "medium", or "heavy"
        use_vocal_separation: Whether to separate vocals from background music first
        progress_callback: Optional async callback function(step: str, progress: int) for progress updates

    Returns:
        Tuple of (success, message, segments, detected_language)
        - success=False with message="NO_AUDIO" means video has no audio
        - success=False with message="SILENT_AUDIO" means audio is too quiet
        - success=False with message="SINGING_ONLY" means only singing detected (no speech)
    """
    # Helper to call progress callback if provided
    async def report_progress(step: str, progress: int):
        print(f"[Transcriber] report_progress: {step} ({progress}%), has_callback: {progress_callback is not None}")
        if progress_callback:
            await progress_callback(step, progress)
    if not os.path.exists(video_path):
        return False, f"Video file not found: {video_path}", None, None
    # Check audio availability before doing any expensive work.
    has_audio, audio_status = await check_audio_availability(video_path)
    if not has_audio:
        if audio_status == "no_audio_stream":
            return False, "NO_AUDIO", None, None
        elif audio_status == "audio_silent":
            return False, "SILENT_AUDIO", None, None
    audio_path = video_path  # Default to video path (Whisper can handle it)
    temp_files = []  # Track temp files for cleanup
    try:
        video_dir = os.path.dirname(video_path)
        # Step 1: Vocal separation (if enabled)
        if use_vocal_separation:
            from app.services.audio_separator import separate_vocals, analyze_vocal_type
            await report_progress("vocal_separation", 15)
            print("Separating vocals from background music...")
            separation_dir = os.path.join(video_dir, "separated")
            success, message, vocals_path, _ = await separate_vocals(
                video_path,
                separation_dir
            )
            if success and vocals_path:
                print(f"Vocal separation complete: {vocals_path}")
                temp_files.append(separation_dir)
                # Analyze if vocals are speech or singing
                print("Analyzing vocal type (speech vs singing)...")
                vocal_type, confidence = await analyze_vocal_type(vocals_path)
                print(f"Vocal analysis: {vocal_type} (confidence: {confidence:.2f})")
                # Treat as singing if:
                # 1. Explicitly detected as singing
                # 2. Mixed with low confidence (< 0.6) - likely music, not clear speech
                if vocal_type == "singing" or (vocal_type == "mixed" and confidence < 0.6):
                    # Only singing/music detected - no clear speech to transcribe
                    _cleanup_temp_files(temp_files)
                    # `reason` is only used for the log line; the caller always
                    # receives "SINGING_ONLY".
                    reason = "SINGING_ONLY" if vocal_type == "singing" else "MUSIC_DOMINANT"
                    print(f"No clear speech detected ({reason}), awaiting manual subtitle")
                    return False, "SINGING_ONLY", None, None
                # Use vocals for transcription
                audio_path = vocals_path
            else:
                print(f"Vocal separation failed: {message}, continuing with original audio")
        # Step 2: Apply noise reduction (if enabled and not using separated vocals)
        if use_noise_reduction and audio_path == video_path:
            from app.services.video_processor import extract_audio_with_noise_reduction
            await report_progress("extracting_audio", 20)
            cleaned_path = os.path.join(video_dir, "audio_cleaned.wav")
            await report_progress("noise_reduction", 25)
            print(f"Applying {noise_reduction_level} noise reduction...")
            success, message = await extract_audio_with_noise_reduction(
                video_path,
                cleaned_path,
                noise_reduction_level
            )
            if success:
                print(f"Noise reduction complete: {message}")
                audio_path = cleaned_path
                temp_files.append(cleaned_path)
            else:
                print(f"Noise reduction failed: {message}, falling back to original audio")
        # Step 3: Transcribe with Whisper
        await report_progress("transcribing", 35)
        model = get_whisper_model()
        print(f"Transcribing audio: {audio_path}")
        # Run Whisper in thread pool to avoid blocking the event loop
        result = await asyncio.to_thread(
            model.transcribe,
            audio_path,
            task="transcribe",
            language=None,  # Auto-detect
            verbose=False,
            word_timestamps=True,
        )
        # Split long segments using word-level timestamps
        segments = _split_segments_by_words(
            result.get("segments", []),
            max_duration=2.0,  # Maximum segment duration in seconds (shorter for better sync)
            min_words=1,  # Minimum words per segment
        )
        # Clean up temp files
        _cleanup_temp_files(temp_files)
        detected_lang = result.get("language", "unknown")
        print(f"Detected language: {detected_lang}")
        extras = []
        if use_vocal_separation:
            extras.append("vocal separation")
        if use_noise_reduction:
            extras.append(f"noise reduction: {noise_reduction_level}")
        extra_info = f" ({', '.join(extras)})" if extras else ""
        # Return tuple with 4 elements: success, message, segments, detected_language
        return True, f"Transcription complete (detected: {detected_lang}){extra_info}", segments, detected_lang
    except Exception as e:
        _cleanup_temp_files(temp_files)
        return False, f"Transcription error: {str(e)}", None, None
def _split_segments_by_words(
    raw_segments: list,
    max_duration: float = 4.0,
    min_words: int = 2,
) -> List[TranscriptSegment]:
    """
    Split long Whisper segments into shorter ones using word-level timestamps.

    Args:
        raw_segments: Raw segments from Whisper output
        max_duration: Maximum duration for each segment in seconds
        min_words: Minimum words per segment (to avoid single-word segments)

    Returns:
        List of TranscriptSegment with shorter durations
    """
    def _make_text(words: list) -> str:
        """Join word tokens; CJK text gets the joining spaces stripped out."""
        text = " ".join(words)
        if any('\u4e00' <= c <= '\u9fff' for c in text):
            text = text.replace(" ", "")
        return text

    segments = []
    for seg in raw_segments:
        words = seg.get("words", [])
        seg_text = seg.get("text", "").strip()
        seg_start = seg.get("start", 0)
        seg_end = seg.get("end", 0)
        seg_duration = seg_end - seg_start
        # If no word timestamps or segment is short enough, use as-is
        if not words or seg_duration <= max_duration:
            segments.append(TranscriptSegment(
                start=seg_start,
                end=seg_end,
                text=seg_text,
            ))
            continue
        # Split segment using word timestamps
        current_words = []
        current_start = None
        for i, word in enumerate(words):
            word_start = word.get("start", seg_start)
            word_end = word.get("end", seg_end)
            word_text = word.get("word", "").strip()
            if not word_text:
                continue
            # Start a new segment
            if current_start is None:
                current_start = word_start
            current_words.append(word_text)
            current_duration = word_end - current_start
            # Check if we should split here
            is_last_word = (i == len(words) - 1)
            should_split = False
            if is_last_word:
                should_split = True
            elif current_duration >= max_duration and len(current_words) >= min_words:
                should_split = True
            elif current_duration >= max_duration * 0.5:
                # Split at natural break points (punctuation) more aggressively
                # NOTE(review): some CJK punctuation characters in these tuples
                # appear lost/garbled in this source — verify tuple contents.
                if word_text.endswith((',', '.', '!', '?', '', '', '', '', '', '', ';')):
                    should_split = True
            elif current_duration >= 1.0 and word_text.endswith(('', '', '', '.', '!', '?')):
                # Always split at sentence endings if we have at least 1 second of content
                should_split = True
            if should_split and current_words:
                segments.append(TranscriptSegment(
                    start=current_start,
                    end=word_end,
                    text=_make_text(current_words),
                ))
                # Reset for next segment
                current_words = []
                current_start = None
        # Flush any trailing words. Previously, if the final word token was
        # whitespace-only (skipped by `continue` above), the is_last_word split
        # never fired and the accumulated text was silently dropped.
        if current_words and current_start is not None:
            segments.append(TranscriptSegment(
                start=current_start,
                end=seg_end,
                text=_make_text(current_words),
            ))
    return segments
def _cleanup_temp_files(paths: list):
"""Clean up temporary files and directories."""
import shutil
for path in paths:
try:
if os.path.isdir(path):
shutil.rmtree(path, ignore_errors=True)
elif os.path.exists(path):
os.remove(path)
except Exception:
pass
def segments_to_srt(segments: List[TranscriptSegment], use_translated: bool = True) -> str:
"""Convert segments to SRT format."""
srt_lines = []
for i, seg in enumerate(segments, 1):
start_time = format_srt_time(seg.start)
end_time = format_srt_time(seg.end)
text = seg.translated if use_translated and seg.translated else seg.text
srt_lines.append(f"{i}")
srt_lines.append(f"{start_time} --> {end_time}")
srt_lines.append(text)
srt_lines.append("")
return "\n".join(srt_lines)
def format_srt_time(seconds: float) -> str:
"""Format seconds to SRT timestamp format (HH:MM:SS,mmm)."""
hours = int(seconds // 3600)
minutes = int((seconds % 3600) // 60)
secs = int(seconds % 60)
millis = int((seconds % 1) * 1000)
return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}"
def segments_to_ass(
    segments: List[TranscriptSegment],
    use_translated: bool = True,
    font_size: int = 28,
    font_color: str = "FFFFFF",
    outline_color: str = "000000",
    font_name: str = "NanumGothic",
    position: str = "bottom",  # top, center, bottom
    outline_width: int = 3,
    bold: bool = True,
    shadow: int = 1,
    background_box: bool = True,
    background_opacity: str = "E0",  # 00=transparent, FF=opaque
    animation: str = "none",  # none, fade, pop
    time_offset: float = 0.0,  # Delay all subtitles by this amount (for intro text)
) -> str:
    """
    Convert segments to ASS format with styling.

    Args:
        segments: List of transcript segments
        use_translated: Use translated text if available
        font_size: Font size in pixels
        font_color: Font color in hex (without #)
        outline_color: Outline color in hex (without #)
        font_name: Font family name
        position: Subtitle position - "top", "center", or "bottom"
        outline_width: Outline thickness
        bold: Use bold text
        shadow: Shadow depth (0-4)
        background_box: Show semi-transparent background box
        animation: Animation type - "none", "fade", or "pop"
        time_offset: Delay all subtitle timings by this amount in seconds (useful when intro text is shown)

    Returns:
        ASS formatted subtitle string
    """
    # ASS Alignment values:
    # 1=Bottom-Left, 2=Bottom-Center, 3=Bottom-Right
    # 4=Middle-Left, 5=Middle-Center, 6=Middle-Right
    # 7=Top-Left, 8=Top-Center, 9=Top-Right
    alignment_map = {
        "top": 8,  # Top-Center
        "center": 5,  # Middle-Center (middle of the video)
        "bottom": 2,  # Bottom-Center (default)
    }
    alignment = alignment_map.get(position, 2)
    # Adjust margin based on position (lower value = closer to the screen edge).
    # The bottom margin is kept small so these subtitles cover any original
    # burned-in subtitles.
    margin_v = 30 if position == "bottom" else (100 if position == "top" else 10)
    # Bold: -1 = bold, 0 = normal
    bold_value = -1 if bold else 0
    # BorderStyle: 1 = outline + shadow, 3 = opaque box (background)
    border_style = 3 if background_box else 1
    # BackColour alpha: use provided opacity or default
    back_alpha = background_opacity if background_box else "80"
    # ASS header (PlayRes matches the 1080x1920 Shorts canvas)
    ass_content = f"""[Script Info]
Title: Shorts Maker Subtitle
ScriptType: v4.00+
PlayDepth: 0
PlayResX: 1080
PlayResY: 1920
[V4+ Styles]
Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding
Style: Default,{font_name},{font_size},&H00{font_color},&H00FFFFFF,&H00{outline_color},&H{back_alpha}000000,{bold_value},0,0,0,100,100,0,0,{border_style},{outline_width},{shadow},{alignment},30,30,{margin_v},1
[Events]
Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text
"""
    for seg in segments:
        # Apply time offset (for intro text overlay)
        start_time = format_ass_time(seg.start + time_offset)
        end_time = format_ass_time(seg.end + time_offset)
        text = seg.translated if use_translated and seg.translated else seg.text
        # Escape special characters
        text = text.replace("\\", "\\\\").replace("{", "\\{").replace("}", "\\}")
        # Add animation effects
        if animation == "fade":
            # Fade in/out effect (250ms)
            text = f"{{\\fad(250,250)}}{text}"
        elif animation == "pop":
            # Pop-in effect with scale animation
            text = f"{{\\t(0,150,\\fscx110\\fscy110)\\t(150,300,\\fscx100\\fscy100)}}{text}"
        ass_content += f"Dialogue: 0,{start_time},{end_time},Default,,0,0,0,,{text}\n"
    return ass_content
def format_ass_time(seconds: float) -> str:
    """Format seconds to ASS timestamp format (H:MM:SS.cc).

    Works on whole centiseconds via round() to avoid the float-floor
    off-by-one of int((seconds % 1) * 100) (e.g. 0.29 -> ".28"), and
    carries overflow correctly (1.999 -> "0:00:02.00").
    """
    total_cs = round(seconds * 100)
    hours, rem = divmod(total_cs, 360000)
    minutes, rem = divmod(rem, 6000)
    secs, centis = divmod(rem, 100)
    return f"{hours}:{minutes:02d}:{secs:02d}.{centis:02d}"

View File

@@ -0,0 +1,468 @@
import re
from typing import List, Tuple, Optional
from openai import OpenAI
from app.models.schemas import TranscriptSegment
from app.config import settings
def get_openai_client() -> OpenAI:
    """Build an OpenAI client using the API key from application settings."""
    api_key = settings.OPENAI_API_KEY
    return OpenAI(api_key=api_key)
class TranslationMode:
    """Translation mode options (string constants passed to translate_segments)."""
    DIRECT = "direct"  # Direct translation (keeps the original structure)
    SUMMARIZE = "summarize"  # Summarize first, then translate
    REWRITE = "rewrite"  # Summarize + rewrite as a fresh Korean script
async def shorten_text(client: OpenAI, text: str, max_chars: int) -> str:
    """
    Shorten a Korean text to fit within a character limit.

    Asks the chat model to compress the text; if the API call fails for
    any reason, falls back to a plain truncation.

    Args:
        client: OpenAI client
        text: Text to shorten
        max_chars: Maximum character count

    Returns:
        Shortened text.
    """
    try:
        response = client.chat.completions.create(
            model=settings.OPENAI_MODEL,
            messages=[
                {
                    "role": "system",
                    "content": f"""한국어 자막을 {max_chars}자 이내로 줄이세요.
규칙:
- 반드시 {max_chars}자 이하!
- 핵심 의미만 유지
- 자연스러운 한국어
- 존댓말 유지
- 출력은 줄인 문장만!
예시:
입력: "요리할 때마다 한 시간이 걸리셨죠?" (18자)
제한: 10자
출력: "시간 오래 걸리죠" (8자)
입력: "채소 다듬는 데만 30분 걸리셨죠" (16자)
제한: 10자
출력: "채소만 30분" (6자)"""
                },
                {
                    "role": "user",
                    "content": f"입력: \"{text}\" ({len(text)}자)\n제한: {max_chars}자\n출력:"
                }
            ],
            temperature=0.3,
            max_tokens=50,
        )
        shortened = response.choices[0].message.content.strip()
        # Remove quotes, parentheses, and extra characters the model adds.
        shortened = shortened.strip('"\'""''')
        # Remove any trailing parenthetical notes like "(10자)"
        shortened = re.sub(r'\s*\([^)]*자\)\s*$', '', shortened)
        shortened = re.sub(r'\s*\(\d+자\)\s*$', '', shortened)
        # Remove any remaining quote characters (incl. curly quotes).
        shortened = shortened.replace('"', '').replace('"', '').replace('"', '')
        shortened = shortened.replace("'", '').replace("'", '').replace("'", '')
        shortened = shortened.strip()
        # If still too long, truncate cleanly to the limit.
        if len(shortened) > max_chars:
            shortened = shortened[:max_chars]
        return shortened
    except Exception:
        # Fallback: simple truncation. (Was text[:max_chars-1] + "" — the
        # dead "+ \"\"" looked like a lost ellipsis; truncating to the full
        # limit keeps one more character and matches the try-path behavior.)
        if len(text) > max_chars:
            return text[:max_chars]
        return text
async def translate_segments(
    segments: List[TranscriptSegment],
    target_language: str = "Korean",
    mode: str = TranslationMode.DIRECT,
    max_tokens: Optional[int] = None,
) -> Tuple[bool, str, List[TranscriptSegment]]:
    """
    Translate transcript segments to target language using OpenAI.

    Mutates each segment's .translated attribute in place and also returns
    the same list. Per-segment length limits (~5 Korean chars/second) are
    fed to the model and enforced afterwards via shorten_text.

    Args:
        segments: List of transcript segments
        target_language: Target language for translation
        mode: Translation mode (direct, summarize, rewrite)
        max_tokens: Maximum output tokens (for cost control)

    Returns:
        Tuple of (success, message, translated_segments)
    """
    if not settings.OPENAI_API_KEY:
        return False, "OpenAI API key not configured", segments
    try:
        client = get_openai_client()
        # Batch translate for efficiency: one request for all segments,
        # joined with '---' separators (DIRECT/SUMMARIZE modes).
        texts = [seg.text for seg in segments]
        combined_text = "\n---\n".join(texts)
        # Calculate video duration for context.
        # NOTE(review): total_duration is computed but not used below.
        total_duration = segments[-1].end if segments else 0
        # Per-segment character budgets shown to the model.
        segment_info = []
        for i, seg in enumerate(segments):
            duration = seg.end - seg.start
            max_chars = int(duration * 5)  # ~5 Korean chars per second (stricter for better sync)
            segment_info.append(f"[{i+1}] {duration:.1f}초 = 최대 {max_chars}자 (엄수!)")
        # Get custom prompt settings from config (with defaults).
        gpt_role = settings.GPT_ROLE or "친근한 유튜브 쇼츠 자막 작가"
        gpt_tone = settings.GPT_TONE or "존댓말"
        gpt_style = settings.GPT_STYLE or ""
        # Example endings for each speech register, shown to the model.
        tone_examples = {
            "존댓말": '~해요, ~이에요, ~하죠',
            "반말": '~해, ~야, ~지',
            "격식체": '~합니다, ~입니다',
        }
        tone_example = tone_examples.get(gpt_tone, tone_examples["존댓말"])
        # Additional style instruction appended to the rules list.
        style_instruction = f"\n6. Style: {gpt_style}" if gpt_style else ""
        # Select prompt based on mode.
        if mode == TranslationMode.REWRITE:
            # Build indexed timeline input with the Chinese text.
            # Segment numbers (not timestamps) handle duplicate timestamps.
            timeline_input = []
            for i, seg in enumerate(segments):
                mins = int(seg.start // 60)
                secs = int(seg.start % 60)
                timeline_input.append(f"[{i+1}] {mins}:{secs:02d} {seg.text}")
            system_prompt = f"""당신은 생활용품 유튜브 쇼츠 자막 작가입니다.
중국어 원문의 "의미"만 참고하여, 한국인이 직접 말하는 것처럼 자연스러운 자막을 작성하세요.
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
🎯 핵심 원칙: 번역이 아니라 "재창작"
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
✅ 필수 규칙:
1. 한 문장 = 한 가지 정보 (두 개 이상 금지)
2. 중복 표현 절대 금지 ("편해요"가 이미 나왔으면 다시 안 씀)
3. {gpt_tone} 사용 ({tone_example})
4. 세그먼트 수 유지: 입력 {len(segments)}개 → 출력 {len(segments)}개
5. 중국어 한자 금지, 순수 한글만
❌ 금지 표현 (번역투):
- "~할 수 있어요""~돼요", "~됩니다"
- "매우/아주/정말" 남용 → 꼭 필요할 때만
- "그것은/이것은""이거", "이건"
- "~하는 것이" → 직접 표현으로
- "편리해요/편해요" 반복 → 한 번만, 이후 다른 표현
- "좋아요/좋고요" 반복 → 구체적 장점으로 대체
🎵 쇼츠 리듬감:
- 짧게 끊어서
- 한 호흡에 하나씩
- 시청자가 따라 읽을 수 있게
📝 좋은 예시:
원문: "이 작은 박스 디자인이 참 좋네요. 평소에 씨앗 먹을 때 간편하게 먹을 수 있어요."
❌ 나쁜 번역: "이 작은 박스 디자인이 참 좋네요. 평소에 씨앗 먹을 때 간편하게 먹을 수 있어요."
✅ 좋은 재창작: "이 작은 박스, 생각보다 정말 잘 만들었어요."
원문: "테이블에 두거나 손에 들고 사용하기에도 좋고요. 침대에 누워서나 사무실에서도 간식이나 과일 먹기 정말 편해요."
❌ 나쁜 번역: "테이블에 두거나 손에 들고 사용하기에도 좋고요. 침대에 누워서나 사무실에서도 간식이나 과일 먹기 정말 편해요."
✅ 좋은 재창작 (2개로 분리):
- "테이블 위에서도, 침대에서도, 사무실에서도 사용하기 좋고"
- "과일 씻고 물기 빼는 데도 활용 가능합니다."
원문: "가정에서 필수 아이템이에요. 정말 유용하죠. 꼭 하나씩 가져야 할 제품이에요."
❌ 나쁜 번역: 그대로 3문장
✅ 좋은 재창작: "집에 하나 있으면 은근히 자주 쓰게 됩니다."{style_instruction}
출력 형식:
[번호] 시간 자막 내용
⚠️ 입력과 동일한 세그먼트 수({len(segments)}개)를 출력하세요!
⚠️ 각 [번호]는 입력과 1:1 대응해야 합니다!"""
            # REWRITE uses the indexed timeline payload instead of '---' joins.
            combined_text = "[중국어 원문]\n\n" + "\n".join(timeline_input)
        elif mode == TranslationMode.SUMMARIZE:
            system_prompt = f"""You are: {gpt_role}
Task: Translate Chinese to SHORT Korean subtitles.
Length limits (자막 싱크!):
{chr(10).join(segment_info)}
Rules:
1. Use {gpt_tone} ({tone_example})
2. Summarize to core meaning - be BRIEF
3. Max one short sentence per segment
4. {len(segments)} segments separated by '---'{style_instruction}"""
        else:  # DIRECT mode
            system_prompt = f"""You are: {gpt_role}
Task: Translate Chinese to Korean subtitles.
Length limits (자막 싱크!):
{chr(10).join(segment_info)}
Rules:
1. Use {gpt_tone} ({tone_example})
2. Keep translations SHORT and readable
3. {len(segments)} segments separated by '---'{style_instruction}"""
        # Build the API request; REWRITE gets a higher temperature for
        # more creative rewriting.
        request_params = {
            "model": settings.OPENAI_MODEL,
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": combined_text}
            ],
            "temperature": 0.65 if mode == TranslationMode.REWRITE else 0.3,
        }
        # Add max_tokens if specified (for cost control).
        effective_max_tokens = max_tokens or settings.TRANSLATION_MAX_TOKENS
        if effective_max_tokens:
            # REWRITE needs a higher floor to fit the full indexed output.
            if mode == TranslationMode.REWRITE:
                request_params["max_tokens"] = max(effective_max_tokens, 700)
            else:
                request_params["max_tokens"] = effective_max_tokens
        response = client.chat.completions.create(**request_params)
        translated_text = response.choices[0].message.content
        # Parse the model output based on mode.
        if mode == TranslationMode.REWRITE:
            # Parse indexed timeline format: "[1] 0:00 자막\n[2] 0:02 자막\n..."
            indexed_pattern = re.compile(r'^\[(\d+)\]\s*\d+:\d{2}\s+(.+)$', re.MULTILINE)
            matches = indexed_pattern.findall(translated_text)
            # Map each 1-based segment index to its rewritten line.
            translations_by_index = {}
            for idx, text in matches:
                translations_by_index[int(idx)] = text.strip()
            for i, seg in enumerate(segments):
                seg_num = i + 1  # 1-based index
                if seg_num in translations_by_index:
                    seg.translated = translations_by_index[seg_num]
                else:
                    # No matching translation found; the timestamp fallback
                    # below may still fill it in.
                    seg.translated = ""
            # Fallback: if no indexed matches at all, try the old
            # timestamp-prefixed format ("0:00 자막").
            if not matches:
                print("[Warning] No indexed format found, falling back to timestamp parsing")
                timeline_pattern = re.compile(r'^(\d+):(\d{2})\s+(.+)$', re.MULTILINE)
                timestamp_matches = timeline_pattern.findall(translated_text)
                # Map whole-second timestamps to translations.
                translations_by_time = {}
                for mins, secs, text in timestamp_matches:
                    time_sec = int(mins) * 60 + int(secs)
                    translations_by_time[time_sec] = text.strip()
                # Track used translations so two segments never share one line.
                used_translations = set()
                for seg in segments:
                    start_sec = int(seg.start)
                    matched_time = None
                    # Exact-second match first, then +/-1s fuzzy match.
                    if start_sec in translations_by_time and start_sec not in used_translations:
                        matched_time = start_sec
                    else:
                        for t in range(start_sec - 1, start_sec + 2):
                            if t in translations_by_time and t not in used_translations:
                                matched_time = t
                                break
                    if matched_time is not None:
                        seg.translated = translations_by_time[matched_time]
                        used_translations.add(matched_time)
                    else:
                        seg.translated = ""
        else:
            # DIRECT/SUMMARIZE: output is '---'-separated, positional.
            translated_parts = translated_text.split("---")
            for i, seg in enumerate(segments):
                if i < len(translated_parts):
                    seg.translated = translated_parts[i].strip()
                else:
                    seg.translated = seg.text  # Fallback to original
        # Token usage summary for the status message.
        usage = response.usage
        token_info = f"(tokens: {usage.prompt_tokens}+{usage.completion_tokens}={usage.total_tokens})"
        # Post-processing: shorten segments that exceed the character
        # budget by >30%. Skipped for REWRITE — its prompt handles length.
        shortened_count = 0
        if mode != TranslationMode.REWRITE:
            chars_per_sec = 5
            for i, seg in enumerate(segments):
                if seg.translated:
                    duration = seg.end - seg.start
                    max_chars = int(duration * chars_per_sec)
                    current_len = len(seg.translated)
                    if current_len > max_chars * 1.3 and max_chars >= 5:
                        seg.translated = await shorten_text(client, seg.translated, max_chars)
                        shortened_count += 1
                        print(f"[Shorten] Seg {i+1}: {current_len}자 → {len(seg.translated)}자 (제한:{max_chars}자)")
        shorten_info = f" [축약:{shortened_count}개]" if shortened_count > 0 else ""
        return True, f"Translation complete [{mode}] {token_info}{shorten_info}", segments
    except Exception as e:
        return False, f"Translation error: {str(e)}", segments
async def generate_shorts_script(
    segments: List[TranscriptSegment],
    style: str = "engaging",
    max_tokens: int = 500,
) -> Tuple[bool, str, Optional[str]]:
    """
    Generate a completely new Korean Shorts script from a Chinese transcript.

    Unlike translate_segments, this does not write back into the segments;
    it returns one free-form, "[M:SS] line"-formatted script string.

    Args:
        segments: Original transcript segments
        style: Script style (engaging, informative, funny, dramatic)
        max_tokens: Maximum output tokens

    Returns:
        Tuple of (success, message, script); script is None on failure.
    """
    if not settings.OPENAI_API_KEY:
        return False, "OpenAI API key not configured", None
    try:
        client = get_openai_client()
        # Combine all segment text into one prompt payload.
        full_text = " ".join([seg.text for seg in segments])
        total_duration = segments[-1].end if segments else 0
        style_guides = {
            "engaging": "Use hooks, questions, and emotional expressions. Start with attention-grabbing line.",
            "informative": "Focus on facts and clear explanations. Use simple, direct language.",
            "funny": "Add humor, wordplay, and light-hearted tone. Include relatable jokes.",
            "dramatic": "Build tension and suspense. Use impactful short sentences.",
        }
        # Unknown styles fall back to the "engaging" guide.
        style_guide = style_guides.get(style, style_guides["engaging"])
        system_prompt = f"""You are a viral Korean YouTube Shorts script writer.
Create a COMPLETELY ORIGINAL Korean script inspired by the Chinese video content.
=== CRITICAL: ANTI-PLAGIARISM RULES ===
- This is NOT translation - it's ORIGINAL CONTENT CREATION
- NEVER copy sentence structures, word order, or phrasing from original
- Extract only the CORE IDEA, then write YOUR OWN script from scratch
- Imagine you're a Korean creator who just learned this interesting fact
- Add your own personality, reactions, and Korean cultural context
=======================================
Video duration: ~{int(total_duration)} seconds
Style: {style}
Guide: {style_guide}
Output format:
[0:00] 첫 번째 대사
[0:03] 두 번째 대사
...
Requirements:
- Write in POLITE FORMAL KOREAN (존댓말/경어) - friendly but respectful
- Each line: 2-3 seconds when spoken aloud
- Start with a HOOK that grabs attention
- Use polite Korean expressions: "이거 아세요?", "정말 신기하죠", "근데 여기서 중요한 건요"
- End with engagement: question, call-to-action, or surprise
- Make it feel like ORIGINAL Korean content, not a translation"""
        response = client.chat.completions.create(
            model=settings.OPENAI_MODEL,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": f"Chinese transcript:\n{full_text}"}
            ],
            temperature=0.7,
            max_tokens=max_tokens,
        )
        script = response.choices[0].message.content
        # Token usage summary for the status message.
        usage = response.usage
        token_info = f"(tokens: {usage.total_tokens})"
        return True, f"Script generated [{style}] {token_info}", script
    except Exception as e:
        return False, f"Script generation error: {str(e)}", None
async def translate_single(
    text: str,
    target_language: str = "Korean",
    max_tokens: Optional[int] = None,
) -> Tuple[bool, str]:
    """Translate one piece of text; on any failure, return the original text.

    Returns:
        (success, translated_or_original_text)
    """
    if not settings.OPENAI_API_KEY:
        return False, text
    try:
        client = get_openai_client()
        params = {
            "model": settings.OPENAI_MODEL,
            "messages": [
                {
                    "role": "system",
                    "content": f"Translate to {target_language}. Only output the translation, nothing else."
                },
                {
                    "role": "user",
                    "content": text
                },
            ],
            "temperature": 0.3,
        }
        if max_tokens:
            params["max_tokens"] = max_tokens
        completion = client.chat.completions.create(**params)
        return True, completion.choices[0].message.content.strip()
    except Exception:
        # Best-effort: callers get the untranslated text back on failure.
        return False, text

View File

@@ -0,0 +1,659 @@
import subprocess
import asyncio
import os
from typing import Optional, Tuple
from app.config import settings
async def process_video(
    input_path: str,
    output_path: str,
    subtitle_path: Optional[str] = None,
    bgm_path: Optional[str] = None,
    bgm_volume: float = 0.3,
    keep_original_audio: bool = False,
    intro_text: Optional[str] = None,
    intro_duration: float = 0.7,
    intro_font_size: int = 100,
) -> Tuple[bool, str]:
    """
    Process video: remove audio, add subtitles, add BGM, add intro text.

    Builds a single FFmpeg invocation combining audio mixing
    (filter_complex), subtitle burn-in (ass filter) and intro drawtext
    overlays, then runs it in a worker thread.

    Args:
        input_path: Path to input video
        output_path: Path for output video
        subtitle_path: Path to ASS/SRT subtitle file
        bgm_path: Path to BGM audio file
        bgm_volume: Volume level for BGM (0.0 - 1.0)
        keep_original_audio: Whether to keep original audio
        intro_text: Text to display at the beginning of video (YouTube Shorts thumbnail)
        intro_duration: How long to display intro text (seconds)
        intro_font_size: Font size for intro text (100-120 recommended)

    Returns:
        Tuple of (success, message)
    """
    if not os.path.exists(input_path):
        return False, f"Input video not found: {input_path}"
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    # Build FFmpeg command
    cmd = ["ffmpeg", "-y"]  # -y to overwrite
    # Input video
    cmd.extend(["-i", input_path])
    # Input BGM if provided (-stream_loop must come BEFORE its -i)
    if bgm_path and os.path.exists(bgm_path):
        cmd.extend(["-stream_loop", "-1"])  # Loop BGM infinitely
        cmd.extend(["-i", bgm_path])
    # Build filter complex
    filter_parts = []
    audio_parts = []  # NOTE(review): never used below — candidate for removal
    # Audio handling: decide the audio graph and the stream to -map
    if keep_original_audio and bgm_path and os.path.exists(bgm_path):
        # Mix original audio with BGM
        filter_parts.append(f"[0:a]volume=1.0[original]")
        filter_parts.append(f"[1:a]volume={bgm_volume}[bgm]")
        filter_parts.append(f"[original][bgm]amix=inputs=2:duration=shortest[audio]")
        audio_output = "[audio]"
    elif bgm_path and os.path.exists(bgm_path):
        # BGM only (no original audio)
        filter_parts.append(f"[1:a]volume={bgm_volume}[audio]")
        audio_output = "[audio]"
    elif keep_original_audio:
        # Original audio only
        audio_output = "0:a"
    else:
        # No audio
        audio_output = None
    # Build video filter chain
    video_filters = []
    # Note: We no longer use tpad to add frozen frames, as it extends the
    # video duration. Instead, intro text is simply overlaid on the
    # existing video content.
    # 2. Add subtitle overlay if provided
    if subtitle_path and os.path.exists(subtitle_path):
        # Escape for the ffmpeg filter-graph string syntax.
        escaped_path = subtitle_path.replace("\\", "/").replace(":", "\\:").replace("'", "\\'")
        video_filters.append(f"ass='{escaped_path}'")
    # 3. Add intro text overlay if provided (shown during the first
    # intro_duration seconds)
    if intro_text:
        # Find a suitable font - try common Korean fonts
        font_options = [
            "/System/Library/Fonts/Supplemental/AppleGothic.ttf",  # macOS Korean
            "/System/Library/Fonts/AppleSDGothicNeo.ttc",  # macOS Korean
            "/usr/share/fonts/truetype/nanum/NanumGothicBold.ttf",  # Linux Korean
            "/usr/share/fonts/opentype/noto/NotoSansCJK-Bold.ttc",  # Linux CJK
        ]
        font_file = None
        for font in font_options:
            if os.path.exists(font):
                font_file = font.replace(":", "\\:")
                break
        # Adjust font size and split text if too long.
        # Shorts video is 1080 wide, so ~10-12 chars fit comfortably at 100px.
        text_len = len(intro_text)
        adjusted_font_size = intro_font_size
        # Split into 2 lines if text is long (more than 10 chars)
        lines = []
        if text_len > 10:
            # Find best split point near the middle (space/comma preferred).
            mid = text_len // 2
            split_pos = mid
            for i in range(mid, max(0, mid - 5), -1):
                if intro_text[i] in ' ,、,':
                    split_pos = i + 1
                    break
            # NOTE(review): this forward scan runs even when the backward
            # scan already found a split, so a later separator wins.
            for i in range(mid, min(text_len, mid + 5)):
                if intro_text[i] in ' ,、,':
                    split_pos = i + 1
                    break
            line1 = intro_text[:split_pos].strip()
            line2 = intro_text[split_pos:].strip()
            if line2:
                lines = [line1, line2]
            else:
                lines = [intro_text]
        else:
            lines = [intro_text]
        # Shrink the font when the longest line would overflow the frame.
        max_line_len = max(len(line) for line in lines)
        if max_line_len > 12:
            adjusted_font_size = int(intro_font_size * 10 / max_line_len)
            adjusted_font_size = max(50, min(adjusted_font_size, intro_font_size))  # Clamp between 50-100
        # Fade-out starts 0.3s before the intro ends.
        fade_out_start = max(0.1, intro_duration - 0.3)
        alpha_expr = f"if(gt(t,{fade_out_start}),(({intro_duration}-t)/0.3),1)"
        # Create one drawtext filter per line, vertically centered as a block.
        line_height = adjusted_font_size + 20
        total_height = line_height * len(lines)  # NOTE(review): unused
        for i, line in enumerate(lines):
            # NOTE(review): backslash is escaped LAST here, which re-escapes
            # the escapes just added for ' and : — verify against drawtext
            # quoting rules; escaping "\\" first is the usual order.
            escaped_text = line.replace("'", "\\'").replace(":", "\\:").replace("\\", "\\\\")
            # Calculate y position for this line (centered overall)
            if len(lines) == 1:
                y_expr = "(h-text_h)/2"
            else:
                # Center the block of lines, then position each line
                y_offset = int((i - (len(lines) - 1) / 2) * line_height)
                y_expr = f"(h-text_h)/2+{y_offset}"
            drawtext_parts = [
                f"text='{escaped_text}'",
                f"fontsize={adjusted_font_size}",
                "fontcolor=white",
                "x=(w-text_w)/2",  # Center horizontally
                f"y={y_expr}",
                f"enable='lt(t,{intro_duration})'",
                "borderw=3",
                "bordercolor=black",
                "box=1",
                "boxcolor=black@0.6",
                "boxborderw=15",
                f"alpha='{alpha_expr}'",
            ]
            if font_file:
                drawtext_parts.insert(1, f"fontfile='{font_file}'")
            video_filters.append(f"drawtext={':'.join(drawtext_parts)}")
    # Combine video filters
    video_filter_str = ",".join(video_filters) if video_filters else None
    # Construct the filter/mapping portion of the FFmpeg command
    if filter_parts or video_filter_str:
        if filter_parts and video_filter_str:
            # Audio graph + video chain merged into one -filter_complex.
            full_filter = ";".join(filter_parts) + f";[0:v]{video_filter_str}[vout]"
            cmd.extend(["-filter_complex", full_filter])
            cmd.extend(["-map", "[vout]"])
            if audio_output and audio_output.startswith("["):
                cmd.extend(["-map", audio_output])
            elif audio_output:
                cmd.extend(["-map", audio_output])
        elif video_filter_str:
            cmd.extend(["-vf", video_filter_str])
            if bgm_path and os.path.exists(bgm_path):
                # NOTE(review): ffmpeg rejects -vf combined with
                # -filter_complex in one command — confirm this branch is
                # reachable (filter_parts is non-empty whenever BGM exists,
                # so normally the branch above runs instead).
                cmd.extend(["-filter_complex", f"[1:a]volume={bgm_volume}[audio]"])
                cmd.extend(["-map", "0:v", "-map", "[audio]"])
            elif not keep_original_audio:
                cmd.extend(["-an"])  # No audio
        elif filter_parts:
            cmd.extend(["-filter_complex", ";".join(filter_parts)])
            cmd.extend(["-map", "0:v"])
            if audio_output and audio_output.startswith("["):
                cmd.extend(["-map", audio_output])
    else:
        if not keep_original_audio:
            cmd.extend(["-an"])
    # Output settings
    cmd.extend([
        "-c:v", "libx264",
        "-preset", "medium",
        "-crf", "23",
        "-c:a", "aac",
        "-b:a", "128k",
        "-shortest",
        output_path
    ])
    try:
        # Run FFmpeg in a thread pool to avoid blocking the event loop
        result = await asyncio.to_thread(
            subprocess.run,
            cmd,
            capture_output=True,
            text=True,
            timeout=600,  # 10 minute timeout
        )
        if result.returncode != 0:
            error_msg = result.stderr[-500:] if result.stderr else "Unknown error"
            return False, f"FFmpeg error: {error_msg}"
        if os.path.exists(output_path):
            return True, "Video processing complete"
        else:
            return False, "Output file not created"
    except subprocess.TimeoutExpired:
        return False, "Processing timed out"
    except Exception as e:
        return False, f"Processing error: {str(e)}"
async def get_video_duration(video_path: str) -> Optional[float]:
    """Get video duration in seconds via ffprobe, or None on any failure.

    Runs ffprobe in a worker thread so the event loop is not blocked
    (the original called subprocess.run synchronously inside an async
    def, unlike the rest of this module).
    """
    cmd = [
        "ffprobe",
        "-v", "error",
        "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1",
        video_path
    ]
    try:
        result = await asyncio.to_thread(
            subprocess.run,
            cmd,
            capture_output=True,
            text=True,
            timeout=30,
        )
        if result.returncode == 0:
            return float(result.stdout.strip())
    except Exception:
        # Missing ffprobe, timeout, or unparsable output all yield None.
        pass
    return None
async def get_video_info(video_path: str) -> Optional[dict]:
    """Collect basic video metadata (duration, width, height) via ffprobe.

    Returns:
        dict with any of "duration", "width", "height", or None when
        ffprobe fails or yields nothing usable.
    """
    import json

    probe_cmd = [
        "ffprobe",
        "-v", "error",
        "-select_streams", "v:0",
        "-show_entries", "stream=width,height,duration:format=duration",
        "-of", "json",
        video_path,
    ]
    try:
        proc = await asyncio.to_thread(
            subprocess.run,
            probe_cmd,
            capture_output=True,
            text=True,
            timeout=30,
        )
        if proc.returncode != 0:
            return None
        parsed = json.loads(proc.stdout)
        info: dict = {}
        # Duration from the container/format section (more reliable).
        fmt = parsed.get("format", {})
        if "duration" in fmt:
            info["duration"] = float(fmt["duration"])
        # Resolution from the first video stream.
        streams = parsed.get("streams") or []
        if streams:
            first = streams[0]
            info["width"] = first.get("width")
            info["height"] = first.get("height")
        return info if info else None
    except Exception:
        return None
async def trim_video(
    input_path: str,
    output_path: str,
    start_time: float,
    end_time: float,
) -> Tuple[bool, str]:
    """
    Trim video to specified time range.

    Args:
        input_path: Path to input video
        output_path: Path for output video
        start_time: Start time in seconds (clamped to >= 0)
        end_time: End time in seconds (clamped to the source duration)

    Returns:
        Tuple of (success, message)
    """
    if not os.path.exists(input_path):
        return False, f"Input video not found: {input_path}"
    # Validate and clamp the requested time range.
    duration = await get_video_duration(input_path)
    if duration is None:
        return False, "Could not get video duration"
    if start_time < 0:
        start_time = 0
    if end_time > duration:
        end_time = duration
    if start_time >= end_time:
        return False, f"Invalid time range: start ({start_time}) >= end ({end_time})"
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    trim_duration = end_time - start_time
    # Log trim parameters for debugging
    print(f"[Trim] Input: {input_path}")
    print(f"[Trim] Original duration: {duration:.3f}s")
    print(f"[Trim] Requested: start={start_time:.3f}s, end={end_time:.3f}s")
    print(f"[Trim] Output duration should be: {trim_duration:.3f}s")
    # Use -ss BEFORE -i for input seeking (faster and more reliable for end
    # trimming), combined with -t for accurate duration control.
    cmd = [
        "ffmpeg", "-y",
        "-accurate_seek",  # Enable accurate seeking
        "-ss", str(start_time),  # Input seeking (before -i)
        "-i", input_path,
        "-t", str(trim_duration),  # Duration of output
        "-c:v", "libx264",  # Re-encode video for accurate cut
        "-preset", "fast",  # Fast encoding preset
        "-crf", "18",  # High quality (lower = better)
        "-c:a", "aac",  # Re-encode audio
        "-b:a", "128k",  # Audio bitrate
        "-avoid_negative_ts", "make_zero",  # Fix timestamp issues
        output_path
    ]
    print(f"[Trim] Command: {' '.join(cmd)}")
    try:
        result = await asyncio.to_thread(
            subprocess.run,
            cmd,
            capture_output=True,
            text=True,
            timeout=120,
        )
        if result.returncode != 0:
            error_msg = result.stderr[-300:] if result.stderr else "Unknown error"
            print(f"[Trim] FFmpeg error: {error_msg}")
            return False, f"Trim failed: {error_msg}"
        if not os.path.exists(output_path):
            print("[Trim] Error: Output file not created")
            return False, "Output file not created"
        new_duration = await get_video_duration(output_path)
        if new_duration is None:
            # Bug fix: the old code formatted None with ':.3f' here, which
            # raised and made a successful trim report as a failure.
            print("[Trim] Success, but could not probe output duration")
            return True, "Video trimmed successfully"
        print(f"[Trim] Success! New duration: {new_duration:.3f}s (expected: {trim_duration:.3f}s)")
        print(f"[Trim] Difference from expected: {abs(new_duration - trim_duration):.3f}s")
        return True, f"Video trimmed successfully ({new_duration:.1f}s)"
    except subprocess.TimeoutExpired:
        print("[Trim] Error: Timeout")
        return False, "Trim operation timed out"
    except Exception as e:
        print(f"[Trim] Error: {str(e)}")
        return False, f"Trim error: {str(e)}"
async def extract_frame(
    video_path: str,
    output_path: str,
    timestamp: float,
) -> Tuple[bool, str]:
    """
    Extract a single frame from a video at the given timestamp.

    Args:
        video_path: Path to input video
        output_path: Path for output image (jpg/png)
        timestamp: Time in seconds

    Returns:
        Tuple of (success, message)
    """
    if not os.path.exists(video_path):
        return False, f"Video not found: {video_path}"
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    ffmpeg_cmd = [
        "ffmpeg", "-y",
        "-ss", str(timestamp),
        "-i", video_path,
        "-frames:v", "1",   # grab exactly one frame
        "-q:v", "2",        # high JPEG quality
        output_path,
    ]
    try:
        proc = await asyncio.to_thread(
            subprocess.run,
            ffmpeg_cmd,
            capture_output=True,
            text=True,
            timeout=30,
        )
        if proc.returncode == 0 and os.path.exists(output_path):
            return True, "Frame extracted"
        return False, proc.stderr[-200:] if proc.stderr else "Unknown error"
    except Exception as exc:
        return False, str(exc)
async def get_audio_duration(audio_path: str) -> Optional[float]:
    """Get audio duration in seconds, or None on failure."""
    return await get_video_duration(audio_path)  # Same ffprobe query works for audio-only files
async def extract_audio(video_path: str, output_path: str) -> Tuple[bool, str]:
    """Extract mono 16 kHz PCM audio (Whisper-friendly) from a video.

    Returns:
        Tuple of (success, message); message carries ffmpeg stderr on failure.
    """
    cmd = [
        "ffmpeg", "-y",
        "-i", video_path,
        "-vn",                    # drop the video stream
        "-acodec", "pcm_s16le",   # raw PCM
        "-ar", "16000",           # 16 kHz sample rate
        "-ac", "1",               # mono
        output_path
    ]
    try:
        # Run in a worker thread so the event loop is not blocked (the
        # original called subprocess.run synchronously inside an async def,
        # unlike the other helpers in this module).
        result = await asyncio.to_thread(
            subprocess.run,
            cmd,
            capture_output=True,
            text=True,
            timeout=120,
        )
        if result.returncode == 0:
            return True, "Audio extracted"
        return False, result.stderr
    except Exception as e:
        return False, str(e)
async def extract_audio_with_noise_reduction(
    video_path: str,
    output_path: str,
    noise_reduction_level: str = "medium"
) -> Tuple[bool, str]:
    """
    Extract audio from video with noise reduction for better STT accuracy.

    Args:
        video_path: Path to input video
        output_path: Path for output audio (WAV format recommended)
        noise_reduction_level: "light", "medium", or "heavy"

    Returns:
        Tuple of (success, message)
    """
    if not os.path.exists(video_path):
        return False, f"Video file not found: {video_path}"
    # Build audio filter chain based on noise reduction level
    filters = []
    # 1. High-pass filter: Remove low frequency rumble (< 80Hz)
    filters.append("highpass=f=80")
    # 2. Low-pass filter: Remove high frequency hiss (> 8000Hz for speech)
    filters.append("lowpass=f=8000")
    if noise_reduction_level == "light":
        # Light: just the basic frequency filtering above
        pass
    elif noise_reduction_level == "medium":
        # Medium: add FFT-based denoiser
        # afftdn: nr=noise reduction amount (0-100), nf=noise floor
        filters.append("afftdn=nf=-25:nr=10:nt=w")
    elif noise_reduction_level == "heavy":
        # Heavy: more aggressive noise reduction
        filters.append("afftdn=nf=-20:nr=20:nt=w")
    # Dynamic range compression to normalize volume
    # NOTE(review): original indentation was ambiguous — this may have been
    # intended as part of the "heavy" branch only; confirm.
    filters.append("acompressor=threshold=-20dB:ratio=4:attack=5:release=50")
    # 3. Normalize audio levels (EBU R128 loudness targets)
    filters.append("loudnorm=I=-16:TP=-1.5:LRA=11")
    filter_chain = ",".join(filters)
    cmd = [
        "ffmpeg", "-y",
        "-i", video_path,
        "-vn",  # No video
        "-af", filter_chain,
        "-acodec", "pcm_s16le",  # PCM format for Whisper
        "-ar", "16000",  # 16kHz sample rate (Whisper optimal)
        "-ac", "1",  # Mono
        output_path
    ]
    try:
        # Run FFmpeg in a thread pool to avoid blocking the event loop
        result = await asyncio.to_thread(
            subprocess.run,
            cmd,
            capture_output=True,
            text=True,
            timeout=120,
        )
        if result.returncode != 0:
            error_msg = result.stderr[-300:] if result.stderr else "Unknown error"
            return False, f"Audio extraction failed: {error_msg}"
        if os.path.exists(output_path):
            return True, f"Audio extracted with {noise_reduction_level} noise reduction"
        else:
            return False, "Output file not created"
    except subprocess.TimeoutExpired:
        return False, "Audio extraction timed out"
    except Exception as e:
        return False, f"Audio extraction error: {str(e)}"
async def analyze_audio_noise_level(audio_path: str) -> Optional[dict]:
    """
    Analyze audio to detect noise level.

    Runs ffmpeg's volumedetect filter and parses its stderr report.
    Returns a dict with mean_volume / max_volume (dB), or None on failure.
    """
    detect_cmd = [
        "ffmpeg",
        "-i", audio_path,
        "-af", "volumedetect",
        "-f", "null",  # discard output; we only want the stats on stderr
        "-",
    ]
    try:
        proc = subprocess.run(detect_cmd, capture_output=True, text=True, timeout=60)
        stats = {}
        # volumedetect prints its summary lines on stderr.
        for raw_line in proc.stderr.split('\n'):
            if 'mean_volume' in raw_line:
                stats['mean_volume'] = float(raw_line.split(':')[1].strip().replace(' dB', ''))
            elif 'max_volume' in raw_line:
                stats['max_volume'] = float(raw_line.split(':')[1].strip().replace(' dB', ''))
        return stats if stats else None
    except Exception:
        return None
async def has_audio_stream(video_path: str) -> bool:
    """
    Check if a media file contains at least one audio stream.

    Returns:
        True if the file has audio, False otherwise (including any
        ffprobe failure).
    """
    probe_cmd = [
        "ffprobe",
        "-v", "error",
        "-select_streams", "a",  # Audio streams only
        "-show_entries", "stream=codec_type",
        "-of", "csv=p=0",
        video_path,
    ]
    try:
        completed = subprocess.run(probe_cmd, capture_output=True, text=True, timeout=30)
    except Exception:
        return False
    # ffprobe prints "audio" once per audio stream found.
    return "audio" in completed.stdout.lower()
async def get_audio_volume_info(video_path: str) -> Optional[dict]:
    """
    Get audio volume information to detect silent audio.

    Returns:
        dict with mean_volume / max_volume (dB), or None if the file has
        no audio stream or analysis fails.
    """
    # Skip the ffmpeg pass entirely when there is no audio stream.
    if not await has_audio_stream(video_path):
        return None
    volume_cmd = [
        "ffmpeg",
        "-i", video_path,
        "-af", "volumedetect",
        "-f", "null",
        "-",
    ]
    try:
        completed = subprocess.run(volume_cmd, capture_output=True, text=True, timeout=60)
        parsed = {}
        # volumedetect reports on stderr.
        for raw in completed.stderr.split('\n'):
            if 'mean_volume' in raw:
                parsed['mean_volume'] = float(raw.split(':')[1].strip().replace(' dB', ''))
            elif 'max_volume' in raw:
                parsed['max_volume'] = float(raw.split(':')[1].strip().replace(' dB', ''))
        return parsed if parsed else None
    except Exception:
        return None
def is_audio_silent(volume_info: Optional[dict], threshold_db: float = -50.0) -> bool:
    """
    Check if audio is effectively silent (below threshold).

    Args:
        volume_info: dict from get_audio_volume_info (None/empty means no audio)
        threshold_db: Volume below this is considered silent (default -50dB)

    Returns:
        True if silent or no audio, False otherwise
    """
    if not volume_info:
        return True
    # Missing mean_volume is treated as extremely quiet (-100 dB).
    return volume_info.get('mean_volume', -100) < threshold_db