"""
Thumbnail Generator Service

Generates YouTube Shorts thumbnails with:
1. Frame extraction from video
2. GPT-generated catchphrase
3. Text overlay with styling
"""

import os
import subprocess
import asyncio
from typing import Optional, Tuple, List

from openai import OpenAI
from PIL import Image, ImageDraw, ImageFont

from app.config import settings
from app.models.schemas import TranscriptSegment


# How much of ffmpeg's stderr is echoed back in an error message.
_STDERR_TAIL_CHARS = 200

# Only this many leading characters of the transcript are sent to the model.
_MAX_TRANSCRIPT_CHARS = 500

# Hard cap applied to the model's reply. The prompt asks for 15 characters;
# the cap leaves a little slack before truncating.
_MAX_CATCHPHRASE_CHARS = 20

# Characters trimmed from both ends of the model's reply: ASCII quotes plus
# typographic (curly) double and single quotes.
_QUOTE_CHARS = "\"'\u201c\u201d\u2018\u2019"

# Font size used with the built-in fallback font, and the floor below which
# the auto-shrink loop stops reducing the size.
_MIN_FONT_SIZE = 40
_FONT_SHRINK_STEP = 5

# Extra vertical pixels added to the font size to get the line pitch.
_LINE_SPACING = 10

# Characters after which a long text may be broken into two lines:
# space, comma, ideographic comma (U+3001), fullwidth comma (U+FF0C).
_SPLIT_CHARS = " ,\u3001\uff0c"

# How far (in characters) from the middle a split character is searched for.
_SPLIT_SEARCH_RADIUS = 5

# Timestamps returned when the video duration cannot be determined.
_FALLBACK_TIMESTAMPS = [1.0, 3.0, 5.0, 7.0, 10.0]

# NOTE(review): this file was reformatted from a whitespace-collapsed copy;
# the line breaks inside the prompt literals below were reconstructed from
# their bullet structure - confirm against the deployed prompt text.
_STYLE_GUIDES = {
    "homeshopping": """홈쇼핑 스타일의 임팩트 있는 문구를 만드세요.
- "이거 하나면 끝!" 같은 강렬한 어필
- 혜택/효과 강조
- 숫자 활용 (예: "10초만에", "50% 절약")
- 질문형도 OK (예: "아직도 힘들게?")""",
    "viral": """바이럴 쇼츠 스타일의 호기심 유발 문구를 만드세요.
- 궁금증 유발
- 반전/놀라움 암시
- 이모지 1-2개 사용 가능""",
    "informative": """정보성 콘텐츠 스타일의 명확한 문구를 만드세요.
- 핵심 정보 전달
- 간결하고 명확하게""",
}


def get_openai_client() -> OpenAI:
    """Get OpenAI client."""
    return OpenAI(api_key=settings.OPENAI_API_KEY)


async def extract_frame(
    video_path: str,
    output_path: str,
    timestamp: float = 2.0,
) -> Tuple[bool, str]:
    """
    Extract a single frame from video.

    Args:
        video_path: Path to video file
        output_path: Path to save thumbnail image
        timestamp: Time in seconds to extract frame

    Returns:
        Tuple of (success, message). Failures are reported through the
        tuple; this function does not raise.
    """
    cmd = [
        "ffmpeg", "-y",
        "-ss", str(timestamp),
        "-i", video_path,
        "-vframes", "1",
        "-q:v", "2",  # High quality JPEG
        output_path,
    ]

    try:
        # Remove any leftover output so the existence check below reflects
        # this run, not a file written by an earlier one.
        if os.path.exists(output_path):
            os.remove(output_path)

        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        _, stderr = await process.communicate()

        if process.returncode != 0:
            # Keep the *tail* of stderr: ffmpeg normally prints its
            # version/configuration banner first and the actual error last.
            # errors="replace" keeps undecodable bytes from masking the error.
            detail = stderr.decode(errors="replace").strip()
            return False, f"FFmpeg error: {detail[-_STDERR_TAIL_CHARS:]}"

        if not os.path.exists(output_path):
            return False, "Frame extraction failed - no output file"

        return True, "Frame extracted successfully"

    except Exception as e:
        return False, f"Frame extraction error: {str(e)}"


def _build_system_prompt(style: str) -> str:
    """Build the system prompt for *style*, defaulting to "homeshopping"."""
    style_guide = _STYLE_GUIDES.get(style, _STYLE_GUIDES["homeshopping"])
    return f"""당신은 YouTube Shorts 썸네일 문구 전문가입니다.

{style_guide}

규칙:
- 반드시 15자 이내!
- 한 줄로 작성
- 한글만 사용 (영어/한자 금지)
- 출력은 문구만! (설명 없이)

예시 출력:
이거 하나면 끝!
10초면 완성!
아직도 힘들게?
진짜 이게 돼요?"""


def _clean_catchphrase(raw: Optional[str]) -> str:
    """Trim whitespace and wrapping quotes from *raw* and cap its length."""
    cleaned = (raw or "").strip().strip(_QUOTE_CHARS).strip()
    return cleaned[:_MAX_CATCHPHRASE_CHARS]


async def generate_catchphrase(
    transcript: List[TranscriptSegment],
    style: str = "homeshopping",
) -> Tuple[bool, str, str]:
    """
    Generate a catchy thumbnail text using GPT.

    Args:
        transcript: List of transcript segments (with translations)
        style: Style of catchphrase (homeshopping, viral, informative);
            unknown values fall back to "homeshopping"

    Returns:
        Tuple of (success, message, catchphrase). On failure the catchphrase
        is an empty string. An empty reply from the model counts as failure
        so callers can apply their own fallback text.
    """
    if not settings.OPENAI_API_KEY:
        return False, "OpenAI API key not configured", ""

    try:
        segments = transcript or []

        # Use the translations when the first segment has one; otherwise
        # fall back to the original text of every segment.
        if segments and segments[0].translated:
            full_text = " ".join(seg.translated for seg in segments if seg.translated)
        else:
            full_text = " ".join(seg.text for seg in segments if seg.text)

        client = get_openai_client()
        messages = [
            {"role": "system", "content": _build_system_prompt(style)},
            {
                "role": "user",
                "content": f"다음 영상 내용으로 썸네일 문구를 만들어주세요:\n\n{full_text[:_MAX_TRANSCRIPT_CHARS]}",
            },
        ]

        def _request():
            return client.chat.completions.create(
                model=settings.OPENAI_MODEL,
                messages=messages,
                temperature=0.8,
                max_tokens=50,
            )

        # The client used here is synchronous; run it in the default executor
        # so the event loop is not blocked while waiting for the API.
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(None, _request)

        catchphrase = _clean_catchphrase(response.choices[0].message.content)
        if not catchphrase:
            return False, "GPT returned an empty catchphrase", ""

        return True, "Catchphrase generated", catchphrase

    except Exception as e:
        return False, f"GPT error: {str(e)}", ""


def _load_font(font_name: str, size: int) -> Optional[ImageFont.FreeTypeFont]:
    """Return the first loadable font among the known locations, or None."""
    candidates = [
        f"/usr/share/fonts/truetype/nanum/{font_name}.ttf",
        f"/usr/share/fonts/opentype/nanum/{font_name}.otf",
        f"/System/Library/Fonts/{font_name}.ttf",
        f"/Library/Fonts/{font_name}.ttf",
        f"~/Library/Fonts/{font_name}.ttf",
        "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",
    ]
    for candidate in candidates:
        path = os.path.expanduser(candidate)
        if not os.path.exists(path):
            continue
        try:
            return ImageFont.truetype(path, size)
        except (OSError, ValueError):
            # Unreadable font file or unusable size: try the next candidate.
            continue
    return None


def _hex_to_rgb(hex_color: str) -> Tuple[int, int, int]:
    """Convert "#RRGGBB" (or shorthand "#RGB") to an (r, g, b) tuple.

    Raises:
        ValueError: if the string is not valid hexadecimal colour notation.
    """
    digits = hex_color.lstrip("#")
    if len(digits) == 3:
        digits = "".join(ch * 2 for ch in digits)
    return tuple(int(digits[i:i + 2], 16) for i in (0, 2, 4))


def _text_width(draw: ImageDraw.ImageDraw, text: str, font) -> int:
    """Return the rendered width of *text* in pixels."""
    bbox = draw.textbbox((0, 0), text, font=font)
    return bbox[2] - bbox[0]


def _split_two_lines(text: str) -> List[str]:
    """Split *text* into at most two lines near its middle.

    The break is placed right after the split character closest to the
    middle (searching a few characters either side, preferring the later
    position on a tie); with no split character nearby the text is cut
    exactly in the middle. Empty halves are dropped.
    """
    mid = len(text) // 2
    lower = max(0, mid - _SPLIT_SEARCH_RADIUS)          # exclusive bound
    upper = min(len(text), mid + _SPLIT_SEARCH_RADIUS)  # exclusive bound

    split_pos = mid
    for offset in range(_SPLIT_SEARCH_RADIUS):
        ahead = mid + offset
        behind = mid - offset
        if ahead < upper and text[ahead] in _SPLIT_CHARS:
            split_pos = ahead + 1
            break
        if behind > lower and text[behind] in _SPLIT_CHARS:
            split_pos = behind + 1
            break

    halves = (text[:split_pos].strip(), text[split_pos:].strip())
    return [part for part in halves if part] or [text]


def add_text_overlay(
    image_path: str,
    output_path: str,
    text: str,
    font_size: int = 80,
    font_color: str = "#FFFFFF",
    stroke_color: str = "#000000",
    stroke_width: int = 4,
    position: str = "center",
    font_name: str = "NanumGothicBold",
) -> Tuple[bool, str]:
    """
    Add text overlay to image using PIL.

    Text wider than 90% of the image is split into two lines; if it is still
    too wide, the font is shrunk in steps until it fits or the size is no
    longer above the minimum. The result is always written as a JPEG.

    Args:
        image_path: Input image path
        output_path: Output image path
        text: Text to overlay
        font_size: Font size in pixels
        font_color: Text color (hex, "#RRGGBB" or "#RGB")
        stroke_color: Outline color (hex, "#RRGGBB" or "#RGB")
        stroke_width: Outline thickness
        position: Text position (top, center, bottom); any other value is
            treated as center
        font_name: Font family name

    Returns:
        Tuple of (success, message). Failures are reported through the
        tuple; this function does not raise.
    """
    try:
        # Validate colours up front so a bad value fails before any drawing.
        text_rgb = _hex_to_rgb(font_color)
        stroke_rgb = _hex_to_rgb(stroke_color)

        # Load the pixels and release the file handle immediately. Converting
        # to RGB also lets palette/alpha inputs be saved as JPEG.
        with Image.open(image_path) as source:
            img = source.convert("RGB")
        draw = ImageDraw.Draw(img)

        img_width, img_height = img.size

        # Maximum text width (90% of image width)
        max_text_width = int(img_width * 0.9)

        font = _load_font(font_name, font_size)
        if font is None:
            font = ImageFont.load_default()
            font_size = _MIN_FONT_SIZE

        lines = [text]
        if _text_width(draw, text, font) > max_text_width:
            lines = _split_two_lines(text)
            widest = max(_text_width(draw, line, font) for line in lines)

            # If still too wide, reduce font size
            while widest > max_text_width and font_size > _MIN_FONT_SIZE:
                font_size -= _FONT_SHRINK_STEP
                smaller = _load_font(font_name, font_size)
                if smaller is None:
                    font = ImageFont.load_default()
                    break
                font = smaller
                widest = max(_text_width(draw, line, font) for line in lines)

        # Vertical layout for one or two lines
        line_height = font_size + _LINE_SPACING
        total_height = line_height * len(lines)

        if position == "top":
            start_y = img_height // 6
        elif position == "bottom":
            start_y = img_height - img_height // 4 - total_height
        else:  # center
            start_y = (img_height - total_height) // 2

        # The outline is produced by stamping the text at every offset inside
        # a (2 * stroke_width + 1) square around the final position.
        stroke_offsets = [
            (dx, dy)
            for dx in range(-stroke_width, stroke_width + 1)
            for dy in range(-stroke_width, stroke_width + 1)
            if dx != 0 or dy != 0
        ]

        for index, line in enumerate(lines):
            bbox = draw.textbbox((0, 0), line, font=font)
            line_width = bbox[2] - bbox[0]
            # Subtract the left bearing (bbox[0]) so that glyphs with a
            # non-zero left offset are still centred and not cut off.
            x = (img_width - line_width) // 2 - bbox[0]
            y = start_y + index * line_height

            for dx, dy in stroke_offsets:
                draw.text((x + dx, y + dy), line, font=font, fill=stroke_rgb)

            # Main text on top of the outline
            draw.text((x, y), line, font=font, fill=text_rgb)

        img.save(output_path, "JPEG", quality=95)

        return True, "Text overlay added"

    except Exception as e:
        return False, f"Text overlay error: {str(e)}"


async def generate_thumbnail(
    job_id: str,
    video_path: str,
    transcript: List[TranscriptSegment],
    timestamp: float = 2.0,
    style: str = "homeshopping",
    custom_text: Optional[str] = None,
    font_size: int = 80,
    position: str = "center",
) -> Tuple[bool, str, Optional[str]]:
    """
    Generate a complete thumbnail with text overlay.

    Args:
        job_id: Job ID for naming
        video_path: Path to video file
        transcript: Transcript segments
        timestamp: Time to extract frame
        style: Catchphrase style
        custom_text: Custom text (skip GPT generation)
        font_size: Font size
        position: Text position

    Returns:
        Tuple of (success, message, thumbnail_path). thumbnail_path is None
        on failure.
    """
    frame_path = os.path.join(settings.PROCESSED_DIR, f"{job_id}_frame.jpg")
    thumbnail_path = os.path.join(settings.PROCESSED_DIR, f"{job_id}_thumbnail.jpg")

    try:
        # Step 1: Extract frame
        success, msg = await extract_frame(video_path, frame_path, timestamp)
        if not success:
            return False, msg, None

        # Step 2: Generate or use custom text
        if custom_text:
            catchphrase = custom_text
        else:
            success, msg, catchphrase = await generate_catchphrase(transcript, style)
            if not success or not catchphrase:
                # Fallback: first translation, else a generic call to action
                if transcript and transcript[0].translated:
                    catchphrase = transcript[0].translated
                else:
                    catchphrase = "확인해보세요!"

        # Step 3: Add text overlay
        success, msg = add_text_overlay(
            frame_path,
            thumbnail_path,
            catchphrase,
            font_size=font_size,
            position=position,
        )
        if not success:
            return False, msg, None

        return True, f"Thumbnail generated: {catchphrase}", thumbnail_path

    finally:
        # The intermediate frame is removed on every exit path, not only on
        # success. Cleanup is best-effort: a failed delete must not turn a
        # finished thumbnail into an error.
        try:
            if os.path.exists(frame_path):
                os.remove(frame_path)
        except OSError:
            pass


async def get_video_timestamps(video_path: str, count: int = 5) -> List[float]:
    """
    Get evenly distributed timestamps from video for thumbnail selection.

    Args:
        video_path: Path to video
        count: Number of timestamps to return

    Returns:
        List of timestamps in seconds, spread between 10% and 90% of the
        video duration. An empty list when count is not positive. If the
        duration cannot be determined, up to `count` entries of a fixed
        fallback list (at most five) are returned instead.
    """
    if count <= 0:
        return []

    cmd = [
        "ffprobe",
        "-v", "error",
        "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1",
        video_path,
    ]

    try:
        # subprocess.run blocks, so run it in the default executor to keep
        # the event loop responsive while ffprobe works.
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(
            None,
            lambda: subprocess.run(cmd, capture_output=True, text=True),
        )

        duration = float(result.stdout.strip())
        # Rejects zero, negative, infinite and NaN durations in one test.
        if not 0 < duration < float("inf"):
            raise ValueError(f"Unusable duration: {duration}")

        # Generate evenly distributed timestamps (skip first and last 10%)
        start = duration * 0.1
        end = duration * 0.9
        step = (end - start) / (count - 1) if count > 1 else 0

        return [start + i * step for i in range(count)]

    except Exception:
        # Deliberate best-effort: callers always get a usable list.
        return list(_FALLBACK_TIMESTAMPS[:count])