mirror of
https://github.com/won-ktds/smarketing-backend.git
synced 2026-06-12 20:39:09 +00:00
release
This commit is contained in:
@@ -0,0 +1 @@
|
||||
# Package initialization file
|
||||
@@ -0,0 +1,237 @@
|
||||
"""
|
||||
AI 클라이언트 유틸리티
|
||||
Claude AI 및 OpenAI API 호출을 담당
|
||||
"""
|
||||
import os
|
||||
import base64
|
||||
import requests
|
||||
from typing import Optional, List
|
||||
import anthropic
|
||||
import openai
|
||||
from PIL import Image
|
||||
import io
|
||||
from utils.blob_storage import BlobStorageClient
|
||||
|
||||
|
||||
class AIClient:
|
||||
"""AI API 클라이언트 클래스"""
|
||||
|
||||
def __init__(self):
|
||||
"""AI 클라이언트 초기화"""
|
||||
self.claude_api_key = os.getenv('CLAUDE_API_KEY')
|
||||
self.openai_api_key = os.getenv('OPENAI_API_KEY')
|
||||
|
||||
# Blob Storage 클라이언트 초기화
|
||||
self.blob_client = BlobStorageClient()
|
||||
|
||||
# Claude 클라이언트 초기화
|
||||
if self.claude_api_key:
|
||||
self.claude_client = anthropic.Anthropic(api_key=self.claude_api_key)
|
||||
else:
|
||||
self.claude_client = None
|
||||
|
||||
# OpenAI 클라이언트 초기화
|
||||
if self.openai_api_key:
|
||||
self.openai_client = openai.OpenAI(api_key=self.openai_api_key)
|
||||
else:
|
||||
self.openai_client = None
|
||||
|
||||
def download_image_from_url(self, image_url: str) -> str:
|
||||
"""
|
||||
URL에서 이미지를 다운로드하여 임시 파일로 저장
|
||||
Args:
|
||||
image_url: 다운로드할 이미지 URL
|
||||
Returns:
|
||||
임시 저장된 파일 경로
|
||||
"""
|
||||
try:
|
||||
response = requests.get(image_url, timeout=30)
|
||||
response.raise_for_status()
|
||||
|
||||
# 임시 파일로 저장
|
||||
import tempfile
|
||||
import uuid
|
||||
|
||||
file_extension = image_url.split('.')[-1] if '.' in image_url else 'jpg'
|
||||
temp_filename = f"temp_{uuid.uuid4()}.{file_extension}"
|
||||
temp_path = os.path.join('uploads', 'temp', temp_filename)
|
||||
|
||||
# 디렉토리 생성
|
||||
os.makedirs(os.path.dirname(temp_path), exist_ok=True)
|
||||
|
||||
with open(temp_path, 'wb') as f:
|
||||
f.write(response.content)
|
||||
|
||||
return temp_path
|
||||
|
||||
except Exception as e:
|
||||
print(f"이미지 다운로드 실패 {image_url}: {e}")
|
||||
return None
|
||||
|
||||
def generate_image_with_openai(self, prompt: str, size: str = "1024x1536") -> str:
|
||||
"""
|
||||
gpt를 사용하여 이미지 생성
|
||||
Args:
|
||||
prompt: 이미지 생성 프롬프트
|
||||
size: 이미지 크기 (1024x1536)
|
||||
Returns:
|
||||
Azure Blob Storage에 저장된 이미지 URL
|
||||
"""
|
||||
try:
|
||||
if not self.openai_client:
|
||||
raise Exception("OpenAI API 키가 설정되지 않았습니다.")
|
||||
|
||||
response = self.openai_client.images.generate(
|
||||
model="gpt-image-1",
|
||||
prompt=prompt,
|
||||
size=size,
|
||||
n=1,
|
||||
)
|
||||
|
||||
# base64 이미지 데이터 추출
|
||||
b64_data = response.data[0].b64_json
|
||||
image_data = base64.b64decode(b64_data)
|
||||
|
||||
# Azure Blob Storage에 업로드
|
||||
blob_url = self.blob_client.upload_image(image_data, 'png')
|
||||
|
||||
print(f"✅ 이미지 생성 및 업로드 완료: {blob_url}")
|
||||
return blob_url
|
||||
|
||||
except Exception as e:
|
||||
raise Exception(f"이미지 생성 실패: {str(e)}")
|
||||
|
||||
def generate_text(self, prompt: str, max_tokens: int = 1000) -> str:
|
||||
"""
|
||||
텍스트 생성 (Claude 우선, 실패시 OpenAI 사용)
|
||||
"""
|
||||
# Claude AI 시도
|
||||
if self.claude_client:
|
||||
try:
|
||||
response = self.claude_client.messages.create(
|
||||
model="claude-3-5-sonnet-20240620",
|
||||
max_tokens=max_tokens,
|
||||
messages=[
|
||||
{"role": "user", "content": prompt}
|
||||
]
|
||||
)
|
||||
return response.content[0].text
|
||||
except Exception as e:
|
||||
print(f"Claude AI 호출 실패: {e}")
|
||||
|
||||
# OpenAI 시도
|
||||
if self.openai_client:
|
||||
try:
|
||||
response = self.openai_client.chat.completions.create(
|
||||
model="gpt-4o",
|
||||
messages=[
|
||||
{"role": "user", "content": prompt}
|
||||
],
|
||||
max_tokens=max_tokens
|
||||
)
|
||||
return response.choices[0].message.content
|
||||
except Exception as e:
|
||||
print(f"OpenAI 호출 실패: {e}")
|
||||
|
||||
# 기본 응답
|
||||
return self._generate_fallback_content(prompt)
|
||||
|
||||
def analyze_image(self, image_path: str) -> str:
|
||||
"""
|
||||
이미지 분석 및 설명 생성
|
||||
"""
|
||||
try:
|
||||
# 이미지를 base64로 인코딩
|
||||
image_base64 = self._encode_image_to_base64(image_path)
|
||||
|
||||
# Claude Vision API 시도
|
||||
if self.claude_client:
|
||||
try:
|
||||
response = self.claude_client.messages.create(
|
||||
model="claude-3-5-sonnet-20240620",
|
||||
max_tokens=500,
|
||||
messages=[
|
||||
{
|
||||
"role": "user",
|
||||
"content": [
|
||||
{
|
||||
"type": "text",
|
||||
"text": "이 이미지를 보고 음식점 마케팅에 활용할 수 있도록 매력적으로 설명해주세요. 음식이라면 맛있어 보이는 특징을, 매장이라면 분위기를, 이벤트라면 특별함을 강조해서 한국어로 50자 이내로 설명해주세요."
|
||||
},
|
||||
{
|
||||
"type": "image",
|
||||
"source": {
|
||||
"type": "base64",
|
||||
"media_type": "image/jpeg",
|
||||
"data": image_base64
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
)
|
||||
return response.content[0].text
|
||||
except Exception as e:
|
||||
print(f"Claude 이미지 분석 실패: {e}")
|
||||
|
||||
# OpenAI Vision API 시도
|
||||
if self.openai_client:
|
||||
try:
|
||||
response = self.openai_client.chat.completions.create(
|
||||
model="gpt-4o",
|
||||
messages=[
|
||||
{
|
||||
"role": "user",
|
||||
"content": [
|
||||
{
|
||||
"type": "text",
|
||||
"text": "이 이미지를 보고 음식점 마케팅에 활용할 수 있도록 매력적으로 설명해주세요. 한국어로 50자 이내로 설명해주세요."
|
||||
},
|
||||
{
|
||||
"type": "image_url",
|
||||
"image_url": {
|
||||
"url": f"data:image/jpeg;base64,{image_base64}"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
],
|
||||
max_tokens=300
|
||||
)
|
||||
return response.choices[0].message.content
|
||||
except Exception as e:
|
||||
print(f"OpenAI 이미지 분석 실패: {e}")
|
||||
|
||||
except Exception as e:
|
||||
print(f"이미지 분석 전체 실패: {e}")
|
||||
|
||||
return "맛있고 매력적인 음식점의 특별한 순간"
|
||||
|
||||
def _encode_image_to_base64(self, image_path: str) -> str:
|
||||
"""이미지 파일을 base64로 인코딩"""
|
||||
with open(image_path, "rb") as image_file:
|
||||
image = Image.open(image_file)
|
||||
if image.width > 1024 or image.height > 1024:
|
||||
image.thumbnail((1024, 1024), Image.Resampling.LANCZOS)
|
||||
|
||||
if image.mode == 'RGBA':
|
||||
background = Image.new('RGB', image.size, (255, 255, 255))
|
||||
background.paste(image, mask=image.split()[-1])
|
||||
image = background
|
||||
|
||||
img_buffer = io.BytesIO()
|
||||
image.save(img_buffer, format='JPEG', quality=85)
|
||||
img_buffer.seek(0)
|
||||
return base64.b64encode(img_buffer.getvalue()).decode('utf-8')
|
||||
|
||||
def _generate_fallback_content(self, prompt: str) -> str:
|
||||
"""AI 서비스 실패시 기본 콘텐츠 생성"""
|
||||
if "콘텐츠" in prompt or "게시글" in prompt:
|
||||
return """안녕하세요! 오늘도 맛있는 하루 되세요 😊
|
||||
우리 가게의 특별한 메뉴를 소개합니다!
|
||||
정성껏 준비한 음식으로 여러분을 맞이하겠습니다.
|
||||
많은 관심과 사랑 부탁드려요!"""
|
||||
elif "포스터" in prompt:
|
||||
return "특별한 이벤트\n지금 바로 확인하세요\n우리 가게에서 만나요\n놓치지 마세요!"
|
||||
else:
|
||||
return "안녕하세요! 우리 가게를 찾아주셔서 감사합니다."
|
||||
@@ -0,0 +1,117 @@
|
||||
"""
|
||||
Azure Blob Storage 유틸리티
|
||||
이미지 업로드 및 URL 생성 기능 제공
|
||||
"""
|
||||
import os
|
||||
from datetime import datetime
|
||||
from typing import Optional
|
||||
from azure.storage.blob import BlobServiceClient, ContentSettings
|
||||
from config.config import Config
|
||||
|
||||
|
||||
class BlobStorageClient:
|
||||
"""Azure Blob Storage 클라이언트 클래스"""
|
||||
|
||||
def __init__(self):
|
||||
"""Blob Storage 클라이언트 초기화"""
|
||||
self.account_name = Config.AZURE_STORAGE_ACCOUNT_NAME
|
||||
self.account_key = Config.AZURE_STORAGE_ACCOUNT_KEY
|
||||
self.container_name = Config.AZURE_STORAGE_CONTAINER_NAME
|
||||
|
||||
if not self.account_key:
|
||||
raise ValueError("Azure Storage Account Key가 설정되지 않았습니다.")
|
||||
|
||||
# Connection String 생성
|
||||
connection_string = f"DefaultEndpointsProtocol=https;AccountName={self.account_name};AccountKey={self.account_key};EndpointSuffix=core.windows.net"
|
||||
|
||||
# Blob Service Client 초기화
|
||||
self.blob_service_client = BlobServiceClient.from_connection_string(connection_string)
|
||||
|
||||
def upload_image(self, image_data: bytes, file_extension: str = 'png') -> str:
|
||||
"""
|
||||
이미지를 Blob Storage에 업로드
|
||||
|
||||
Args:
|
||||
image_data: 업로드할 이미지 바이트 데이터
|
||||
file_extension: 파일 확장자 (기본값: 'png')
|
||||
|
||||
Returns:
|
||||
업로드된 이미지의 Public URL
|
||||
"""
|
||||
try:
|
||||
# 파일명 생성: poster_YYYYMMDDHHMMSS.png
|
||||
timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
|
||||
blob_name = f"poster_{timestamp}.{file_extension}"
|
||||
|
||||
# Content Type 설정
|
||||
content_settings = ContentSettings(content_type=f'image/{file_extension}')
|
||||
|
||||
# Blob 업로드
|
||||
blob_client = self.blob_service_client.get_blob_client(
|
||||
container=self.container_name,
|
||||
blob=blob_name
|
||||
)
|
||||
|
||||
blob_client.upload_blob(
|
||||
image_data,
|
||||
content_settings=content_settings,
|
||||
overwrite=True
|
||||
)
|
||||
|
||||
# Public URL 생성
|
||||
blob_url = f"https://{self.account_name}.blob.core.windows.net/{self.container_name}/{blob_name}"
|
||||
|
||||
print(f"✅ 이미지 업로드 완료: {blob_url}")
|
||||
return blob_url
|
||||
|
||||
except Exception as e:
|
||||
print(f"❌ Blob Storage 업로드 실패: {str(e)}")
|
||||
raise Exception(f"이미지 업로드 실패: {str(e)}")
|
||||
|
||||
def upload_file(self, file_path: str) -> str:
|
||||
"""
|
||||
로컬 파일을 Blob Storage에 업로드
|
||||
|
||||
Args:
|
||||
file_path: 업로드할 로컬 파일 경로
|
||||
|
||||
Returns:
|
||||
업로드된 파일의 Public URL
|
||||
"""
|
||||
try:
|
||||
# 파일 확장자 추출
|
||||
file_extension = os.path.splitext(file_path)[1][1:].lower()
|
||||
|
||||
# 파일 읽기
|
||||
with open(file_path, 'rb') as file:
|
||||
file_data = file.read()
|
||||
|
||||
# 업로드
|
||||
return self.upload_image(file_data, file_extension)
|
||||
|
||||
except Exception as e:
|
||||
print(f"❌ 파일 업로드 실패: {str(e)}")
|
||||
raise Exception(f"파일 업로드 실패: {str(e)}")
|
||||
|
||||
def delete_blob(self, blob_name: str) -> bool:
|
||||
"""
|
||||
Blob 삭제
|
||||
|
||||
Args:
|
||||
blob_name: 삭제할 Blob 이름
|
||||
|
||||
Returns:
|
||||
삭제 성공 여부
|
||||
"""
|
||||
try:
|
||||
blob_client = self.blob_service_client.get_blob_client(
|
||||
container=self.container_name,
|
||||
blob=blob_name
|
||||
)
|
||||
blob_client.delete_blob()
|
||||
print(f"✅ Blob 삭제 완료: {blob_name}")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
print(f"❌ Blob 삭제 실패: {str(e)}")
|
||||
return False
|
||||
@@ -0,0 +1,166 @@
|
||||
"""
|
||||
이미지 처리 유틸리티
|
||||
이미지 분석, 변환, 최적화 기능 제공
|
||||
"""
|
||||
import os
|
||||
from typing import Dict, Any, Tuple
|
||||
from PIL import Image, ImageOps
|
||||
import io
|
||||
class ImageProcessor:
|
||||
"""이미지 처리 클래스"""
|
||||
def __init__(self):
|
||||
"""이미지 프로세서 초기화"""
|
||||
self.supported_formats = {'JPEG', 'PNG', 'WEBP', 'GIF'}
|
||||
self.max_size = (2048, 2048) # 최대 크기
|
||||
self.thumbnail_size = (400, 400) # 썸네일 크기
|
||||
def get_image_info(self, image_path: str) -> Dict[str, Any]:
|
||||
"""
|
||||
이미지 기본 정보 추출
|
||||
Args:
|
||||
image_path: 이미지 파일 경로
|
||||
Returns:
|
||||
이미지 정보 딕셔너리
|
||||
"""
|
||||
try:
|
||||
with Image.open(image_path) as image:
|
||||
info = {
|
||||
'filename': os.path.basename(image_path),
|
||||
'format': image.format,
|
||||
'mode': image.mode,
|
||||
'size': image.size,
|
||||
'width': image.width,
|
||||
'height': image.height,
|
||||
'file_size': os.path.getsize(image_path),
|
||||
'aspect_ratio': round(image.width / image.height, 2) if image.height > 0 else 0
|
||||
}
|
||||
# 이미지 특성 분석
|
||||
info['is_landscape'] = image.width > image.height
|
||||
info['is_portrait'] = image.height > image.width
|
||||
info['is_square'] = abs(image.width - image.height) < 50
|
||||
return info
|
||||
except Exception as e:
|
||||
return {
|
||||
'filename': os.path.basename(image_path),
|
||||
'error': str(e)
|
||||
}
|
||||
def resize_image(self, image_path: str, target_size: Tuple[int, int],
|
||||
maintain_aspect: bool = True) -> Image.Image:
|
||||
"""
|
||||
이미지 크기 조정
|
||||
Args:
|
||||
image_path: 원본 이미지 경로
|
||||
target_size: 목표 크기 (width, height)
|
||||
maintain_aspect: 종횡비 유지 여부
|
||||
Returns:
|
||||
리사이즈된 PIL 이미지
|
||||
"""
|
||||
try:
|
||||
with Image.open(image_path) as image:
|
||||
if maintain_aspect:
|
||||
# 종횡비 유지하며 리사이즈
|
||||
image.thumbnail(target_size, Image.Resampling.LANCZOS)
|
||||
return image.copy()
|
||||
else:
|
||||
# 강제 리사이즈
|
||||
return image.resize(target_size, Image.Resampling.LANCZOS)
|
||||
except Exception as e:
|
||||
raise Exception(f"이미지 리사이즈 실패: {str(e)}")
|
||||
def optimize_image(self, image_path: str, quality: int = 85) -> bytes:
|
||||
"""
|
||||
이미지 최적화 (파일 크기 줄이기)
|
||||
Args:
|
||||
image_path: 원본 이미지 경로
|
||||
quality: JPEG 품질 (1-100)
|
||||
Returns:
|
||||
최적화된 이미지 바이트
|
||||
"""
|
||||
try:
|
||||
with Image.open(image_path) as image:
|
||||
# RGBA를 RGB로 변환 (JPEG 저장을 위해)
|
||||
if image.mode == 'RGBA':
|
||||
background = Image.new('RGB', image.size, (255, 255, 255))
|
||||
background.paste(image, mask=image.split()[-1])
|
||||
image = background
|
||||
# 크기가 너무 크면 줄이기
|
||||
if image.width > self.max_size[0] or image.height > self.max_size[1]:
|
||||
image.thumbnail(self.max_size, Image.Resampling.LANCZOS)
|
||||
# 바이트 스트림으로 저장
|
||||
img_buffer = io.BytesIO()
|
||||
image.save(img_buffer, format='JPEG', quality=quality, optimize=True)
|
||||
return img_buffer.getvalue()
|
||||
except Exception as e:
|
||||
raise Exception(f"이미지 최적화 실패: {str(e)}")
|
||||
def create_thumbnail(self, image_path: str, size: Tuple[int, int] = None) -> Image.Image:
|
||||
"""
|
||||
썸네일 생성
|
||||
Args:
|
||||
image_path: 원본 이미지 경로
|
||||
size: 썸네일 크기 (기본값: self.thumbnail_size)
|
||||
Returns:
|
||||
썸네일 PIL 이미지
|
||||
"""
|
||||
if size is None:
|
||||
size = self.thumbnail_size
|
||||
try:
|
||||
with Image.open(image_path) as image:
|
||||
# 정사각형 썸네일 생성
|
||||
thumbnail = ImageOps.fit(image, size, Image.Resampling.LANCZOS)
|
||||
return thumbnail
|
||||
except Exception as e:
|
||||
raise Exception(f"썸네일 생성 실패: {str(e)}")
|
||||
def analyze_colors(self, image_path: str, num_colors: int = 5) -> list:
|
||||
"""
|
||||
이미지의 주요 색상 추출
|
||||
Args:
|
||||
image_path: 이미지 파일 경로
|
||||
num_colors: 추출할 색상 개수
|
||||
Returns:
|
||||
주요 색상 리스트 [(R, G, B), ...]
|
||||
"""
|
||||
try:
|
||||
with Image.open(image_path) as image:
|
||||
# RGB로 변환
|
||||
if image.mode != 'RGB':
|
||||
image = image.convert('RGB')
|
||||
# 이미지 크기 줄여서 처리 속도 향상
|
||||
image.thumbnail((150, 150))
|
||||
# 색상 히스토그램 생성
|
||||
colors = image.getcolors(maxcolors=256*256*256)
|
||||
if colors:
|
||||
# 빈도순으로 정렬
|
||||
colors.sort(key=lambda x: x[0], reverse=True)
|
||||
# 상위 색상들 반환
|
||||
dominant_colors = []
|
||||
for count, color in colors[:num_colors]:
|
||||
dominant_colors.append(color)
|
||||
return dominant_colors
|
||||
return [(128, 128, 128)] # 기본 회색
|
||||
except Exception as e:
|
||||
print(f"색상 분석 실패: {e}")
|
||||
return [(128, 128, 128)] # 기본 회색
|
||||
def is_food_image(self, image_path: str) -> bool:
|
||||
"""
|
||||
음식 이미지 여부 간단 판별
|
||||
(실제로는 AI 모델이 필요하지만, 여기서는 기본적인 휴리스틱 사용)
|
||||
Args:
|
||||
image_path: 이미지 파일 경로
|
||||
Returns:
|
||||
음식 이미지 여부
|
||||
"""
|
||||
try:
|
||||
# 파일명에서 키워드 확인
|
||||
filename = os.path.basename(image_path).lower()
|
||||
food_keywords = ['food', 'meal', 'dish', 'menu', '음식', '메뉴', '요리']
|
||||
for keyword in food_keywords:
|
||||
if keyword in filename:
|
||||
return True
|
||||
# 색상 분석으로 간단 판별 (음식은 따뜻한 색조가 많음)
|
||||
colors = self.analyze_colors(image_path, 3)
|
||||
warm_color_count = 0
|
||||
for r, g, b in colors:
|
||||
# 따뜻한 색상 (빨강, 노랑, 주황 계열) 확인
|
||||
if r > 150 or (r > g and r > b):
|
||||
warm_color_count += 1
|
||||
return warm_color_count >= 2
|
||||
except:
|
||||
return False
|
||||
Reference in New Issue
Block a user