LLM Serving Pipeline in Detail
This document walks through the request-processing pipeline of the LLM service stage by stage and covers the optimization options for each stage.
1. Pipeline Overview
1.1 Request Processing Flow
```
[Client Request]
|
v
+-----------------------------------------------------------------------------------+
| [1. Ingress Layer] |
| +-------------+ +---------------+ +------------------+ |
| | Rate Limit | -> | Auth/Session | -> | Request Validate | |
| +-------------+ +---------------+ +------------------+ |
+-----------------------------------------------------------------------------------+
|
v
+-----------------------------------------------------------------------------------+
| [2. Cache Layer] |
| +---------------+ |
| | Redis Lookup | --cache hit--> [Return Cached Response] |
| +---------------+ |
| | |
| cache miss |
+-----------------------------------------------------------------------------------+
|
v
+-----------------------------------------------------------------------------------+
| [3. Retrieval Layer] |
| +--------------+ +--------------+ +---------------+ +---------------+ |
| | Query Rewrite| -> | Embedding | -> | Vector Search | -> | Reranking | |
| +--------------+ | (Triton) | | (ES/Milvus) | | (Triton) | |
| +--------------+ +---------------+ +---------------+ |
+-----------------------------------------------------------------------------------+
|
v
+-----------------------------------------------------------------------------------+
| [4. Prompt Layer] |
| +----------------+ +------------------+ +------------------+ |
| | Context Window | -> | Template Render | -> | Token Counting | |
| | Management | | (Jinja2) | | & Truncation | |
| +----------------+ +------------------+ +------------------+ |
+-----------------------------------------------------------------------------------+
|
v
+-----------------------------------------------------------------------------------+
| [5. Inference Layer] |
| +---------------+ +---------------+ +---------------+ |
| | Request Queue | -> | vLLM Engine | -> | Token Stream | |
| +---------------+ +---------------+ +---------------+ |
+-----------------------------------------------------------------------------------+
|
v
+-----------------------------------------------------------------------------------+
| [6. Post-processing Layer] |
| +---------------+ +---------------+ +---------------+ |
| | Content Filter| -> | Format/Parse | -> | Cache Update | |
| +---------------+ +---------------+ +---------------+ |
+-----------------------------------------------------------------------------------+
|
v
[Client Response]
```
1.2 Latency Distribution by Stage (Example)
| Stage | Avg. latency | P95 latency | Share |
|---|---|---|---|
| Ingress | 2 ms | 5 ms | 1% |
| Cache Lookup | 1 ms | 3 ms | 0.5% |
| Retrieval | 50 ms | 120 ms | 15% |
| Prompt Construction | 5 ms | 15 ms | 2% |
| Inference | 800 ms | 2,000 ms | 80% |
| Post-processing | 10 ms | 30 ms | 1.5% |
| Total | ~870 ms | ~2,200 ms | 100% |
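A distribution like the one above is easiest to reproduce if every stage is timed with the same helper. A minimal sketch (the `StageTimer` helper and its field names are illustrative, not part of the service code):

```python
import time
from contextlib import contextmanager


class StageTimer:
    """Collects per-stage wall-clock latencies for a single request."""

    def __init__(self):
        self.stages: dict[str, float] = {}

    @contextmanager
    def stage(self, name: str):
        start = time.perf_counter()
        try:
            yield
        finally:
            # Record elapsed seconds for this stage only
            self.stages[name] = time.perf_counter() - start

# Usage: wrap each pipeline step, then export timer.stages to metrics/logging.
# timer = StageTimer()
# with timer.stage("retrieval"):
#     docs = await retrieve(query)   # inside an async function
```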
2. Stage-by-Stage Details
2.1 Ingress Layer
Rate Limiting
```python
import redis.asyncio as redis
from fastapi import HTTPException
from datetime import datetime


class RateLimiter:
    """
    Sliding Window Rate Limiter using a Redis Sorted Set.
    """

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self.window_size = 60    # seconds
        self.max_requests = 100  # per window

    async def check(self, user_id: str) -> bool:
        key = f"ratelimit:{user_id}"
        now = datetime.now().timestamp()
        window_start = now - self.window_size

        pipe = self.redis.pipeline()
        # Remove old entries
        pipe.zremrangebyscore(key, 0, window_start)
        # Count current window
        pipe.zcard(key)
        # Add current request
        pipe.zadd(key, {str(now): now})
        # Set expiry
        pipe.expire(key, self.window_size)
        results = await pipe.execute()

        current_count = results[1]
        if current_count >= self.max_requests:
            raise HTTPException(
                status_code=429,
                detail={
                    "error": "rate_limit_exceeded",
                    "retry_after": self.window_size,
                    "limit": self.max_requests
                }
            )
        return True
```
Request Validation
```python
from pydantic import BaseModel, Field, validator
from typing import Optional


class ChatRequest(BaseModel):
    """Chat completion request schema."""
    messages: list[dict] = Field(..., min_items=1, max_items=100)
    model: str = Field(default="llama-3.1-8b-instruct")
    max_tokens: int = Field(default=1024, ge=1, le=4096)
    temperature: float = Field(default=0.7, ge=0.0, le=2.0)
    stream: bool = Field(default=True)

    # RAG options
    use_retrieval: bool = Field(default=True)
    retrieval_top_k: int = Field(default=5, ge=1, le=20)

    @validator('messages')
    def validate_messages(cls, v):
        for msg in v:
            if msg.get('role') not in ['system', 'user', 'assistant']:
                raise ValueError(f"Invalid role: {msg.get('role')}")
            if not msg.get('content'):
                raise ValueError("Message content cannot be empty")
        return v

    @validator('model')
    def validate_model(cls, v):
        allowed_models = [
            'llama-3.1-8b-instruct',
            'llama-3.1-70b-instruct',
            'qwen2.5-7b-instruct'
        ]
        if v not in allowed_models:
            raise ValueError(f"Model not available: {v}")
        return v
```
2.2 Cache Layer
Cache Key Generation
```python
import hashlib
import json
from typing import Optional


def generate_cache_key(
    messages: list[dict],
    model: str,
    temperature: float,
    context_docs: list[str] = None
) -> Optional[str]:
    """
    Build a cache key.
    Responses generated with temperature > 0 are not cached (non-deterministic output).
    """
    if temperature > 0:
        return None  # Non-deterministic, skip cache

    # Normalize and hash inputs
    cache_input = {
        "messages": messages,
        "model": model,
        "context": sorted(context_docs) if context_docs else []
    }
    serialized = json.dumps(cache_input, sort_keys=True, ensure_ascii=False)
    hash_value = hashlib.sha256(serialized.encode()).hexdigest()[:16]
    return f"chat:response:{hash_value}"


class ResponseCache:
    """Redis-based response cache with TTL."""

    def __init__(self, redis_client, default_ttl: int = 3600):
        self.redis = redis_client
        self.default_ttl = default_ttl

    async def get(self, key: str) -> Optional[dict]:
        if not key:
            return None
        cached = await self.redis.get(key)
        if cached:
            return json.loads(cached)
        return None

    async def set(self, key: str, value: dict, ttl: int = None):
        if not key:
            return
        await self.redis.setex(
            key,
            ttl or self.default_ttl,
            json.dumps(value, ensure_ascii=False)
        )
```
Caching Strategy
| Case | Cached | TTL | Notes |
|---|---|---|---|
| temperature = 0 | Yes | 24h | Deterministic output |
| temperature > 0 | No | - | Non-deterministic |
| Same question + same context | Yes | 1h | Repeated within a session |
| Frequently asked questions (FAQ) | Yes | 7d | Manually curated |
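The strategy table maps directly onto a small TTL-selection helper. A minimal sketch (the `resolve_ttl` function and the `is_faq`/`session_repeat` flags are illustrative assumptions, not part of the service code):

```python
from typing import Optional

# TTLs in seconds, mirroring the table above
TTL_DETERMINISTIC = 24 * 3600
TTL_SESSION_REPEAT = 3600
TTL_FAQ = 7 * 24 * 3600


def resolve_ttl(
    temperature: float,
    is_faq: bool = False,
    session_repeat: bool = False,
) -> Optional[int]:
    """Return the cache TTL for a request, or None if it must not be cached."""
    if temperature > 0:
        return None                # non-deterministic -> never cache
    if is_faq:
        return TTL_FAQ             # manually curated FAQ entries live longest
    if session_repeat:
        return TTL_SESSION_REPEAT  # same question + same context within a session
    return TTL_DETERMINISTIC       # deterministic output, default 24h

# Usage with the ResponseCache above:
# ttl = resolve_ttl(request.temperature)
# if ttl:
#     await cache.set(cache_key, {"content": answer}, ttl=ttl)
```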
2.3 Retrieval Layer
Query Rewriting
```python
class QueryRewriter:
    """
    Rewrites the query into a self-contained form, taking the conversation context into account.
    """

    def __init__(self, llm_client):
        self.llm = llm_client
        self.prompt_template = """
이전 대화:
{conversation_history}
현재 질문: {current_query}
위 대화 맥락을 고려하여, 검색에 적합한 독립적인 질문으로 재작성하세요.
대명사(그것, 이것 등)를 구체적인 명사로 바꾸세요.
재작성된 질문:"""

    async def rewrite(
        self,
        current_query: str,
        conversation_history: list[dict]
    ) -> str:
        # Short conversations need no rewriting
        if len(conversation_history) <= 1:
            return current_query

        history_text = "\n".join([
            f"{msg['role']}: {msg['content']}"
            for msg in conversation_history[-4:]  # last 4 turns
        ])

        prompt = self.prompt_template.format(
            conversation_history=history_text,
            current_query=current_query
        )

        response = await self.llm.generate(
            prompt,
            max_tokens=100,
            temperature=0
        )
        return response.strip()
```
Embedding & Vector Search
```python
import tritonclient.grpc as grpcclient
import numpy as np
from elasticsearch import AsyncElasticsearch
from transformers import AutoTokenizer


class EmbeddingService:
    """Triton-based embedding service."""

    def __init__(self, triton_url: str, model_name: str = "embedding"):
        self.client = grpcclient.InferenceServerClient(triton_url)
        self.model_name = model_name
        self.tokenizer = AutoTokenizer.from_pretrained("BAAI/bge-m3")

    async def embed(self, texts: list[str]) -> np.ndarray:
        # Tokenize
        encoded = self.tokenizer(
            texts,
            padding=True,
            truncation=True,
            max_length=512,
            return_tensors="np"
        )

        # Prepare inputs
        inputs = [
            grpcclient.InferInput("input_ids", encoded["input_ids"].shape, "INT64"),
            grpcclient.InferInput("attention_mask", encoded["attention_mask"].shape, "INT64")
        ]
        inputs[0].set_data_from_numpy(encoded["input_ids"])
        inputs[1].set_data_from_numpy(encoded["attention_mask"])

        # Inference
        outputs = [grpcclient.InferRequestedOutput("embeddings")]
        response = self.client.infer(self.model_name, inputs, outputs=outputs)
        embeddings = response.as_numpy("embeddings")

        # Normalize
        norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
        return embeddings / norms


class VectorSearchService:
    """Elasticsearch-based vector search."""

    def __init__(self, es_client: AsyncElasticsearch, index_name: str):
        self.es = es_client
        self.index = index_name

    async def search(
        self,
        query_vector: np.ndarray,
        top_k: int = 10,
        filters: dict = None
    ) -> list[dict]:
        query = {
            "knn": {
                "field": "embedding",
                "query_vector": query_vector.tolist(),
                "k": top_k,
                "num_candidates": top_k * 10
            }
        }
        if filters:
            query["knn"]["filter"] = filters

        response = await self.es.search(
            index=self.index,
            body=query,
            size=top_k,
            _source=["content", "metadata", "title"]
        )

        results = []
        for hit in response["hits"]["hits"]:
            results.append({
                "id": hit["_id"],
                "score": hit["_score"],
                "content": hit["_source"]["content"],
                "metadata": hit["_source"].get("metadata", {}),
                "title": hit["_source"].get("title", "")
            })
        return results
```
Reranking
```python
class RerankerService:
    """Cross-encoder based reranker using Triton."""

    def __init__(self, triton_url: str, model_name: str = "reranker"):
        self.client = grpcclient.InferenceServerClient(triton_url)
        self.model_name = model_name
        self.tokenizer = AutoTokenizer.from_pretrained("BAAI/bge-reranker-v2-m3")

    async def rerank(
        self,
        query: str,
        documents: list[dict],
        top_k: int = 5
    ) -> list[dict]:
        if not documents:
            return []

        # Prepare query-document pairs
        pairs = [(query, doc["content"]) for doc in documents]

        # Tokenize
        encoded = self.tokenizer(
            pairs,
            padding=True,
            truncation=True,
            max_length=512,
            return_tensors="np"
        )

        # Inference
        inputs = [
            grpcclient.InferInput("input_ids", encoded["input_ids"].shape, "INT64"),
            grpcclient.InferInput("attention_mask", encoded["attention_mask"].shape, "INT64")
        ]
        inputs[0].set_data_from_numpy(encoded["input_ids"])
        inputs[1].set_data_from_numpy(encoded["attention_mask"])

        outputs = [grpcclient.InferRequestedOutput("scores")]
        response = self.client.infer(self.model_name, inputs, outputs=outputs)
        scores = response.as_numpy("scores").flatten()

        # Sort by score
        scored_docs = list(zip(documents, scores))
        scored_docs.sort(key=lambda x: x[1], reverse=True)

        # Return top-k with scores
        results = []
        for doc, score in scored_docs[:top_k]:
            doc["rerank_score"] = float(score)
            results.append(doc)
        return results
```
2.4 Prompt Layer
Context Window Management
```python
import tiktoken


class ContextWindowManager:
    """
    Context window management.
    Keeps the assembled prompt within the model's maximum token budget.
    """

    def __init__(self, model_name: str, max_context: int = 8192):
        self.max_context = max_context
        # cl100k_base is an approximation for non-OpenAI tokenizers; use the
        # serving model's own tokenizer if exact counts are required.
        self.encoder = tiktoken.get_encoding("cl100k_base")
        # Reserve tokens for output
        self.output_reserve = 1024
        self.available_context = max_context - self.output_reserve

    def count_tokens(self, text: str) -> int:
        return len(self.encoder.encode(text))

    def fit_context(
        self,
        system_prompt: str,
        messages: list[dict],
        retrieved_docs: list[dict],
        max_docs: int = 5
    ) -> tuple[str, list[dict], list[dict]]:
        """
        Fit the content into the context window.
        Priority: system > recent messages > retrieved docs
        """
        # 1. System prompt (always included)
        system_tokens = self.count_tokens(system_prompt)
        remaining = self.available_context - system_tokens

        # 2. Messages (newest first; the current question is always kept)
        fitted_messages = []
        for msg in reversed(messages):
            msg_tokens = self.count_tokens(msg["content"])
            if msg_tokens <= remaining:
                fitted_messages.insert(0, msg)
                remaining -= msg_tokens
            else:
                break

        # 3. Retrieved docs (as many as fit into the remaining budget)
        fitted_docs = []
        for doc in retrieved_docs[:max_docs]:
            doc_tokens = self.count_tokens(doc["content"])
            if doc_tokens <= remaining:
                fitted_docs.append(doc)
                remaining -= doc_tokens
            else:
                # If the document is too long, truncate it to fit
                truncated = self._truncate_to_tokens(doc["content"], remaining - 50)
                if truncated:
                    doc["content"] = truncated
                    fitted_docs.append(doc)
                break

        return system_prompt, fitted_messages, fitted_docs

    def _truncate_to_tokens(self, text: str, max_tokens: int) -> str:
        if max_tokens <= 0:
            return ""
        tokens = self.encoder.encode(text)
        if len(tokens) <= max_tokens:
            return text
        return self.encoder.decode(tokens[:max_tokens])
```
Prompt Template
```python
from jinja2 import Template


class PromptBuilder:
    """RAG prompt builder."""

    SYSTEM_TEMPLATE = """당신은 빈집 정보를 제공하는 AI 어시스턴트입니다.
아래 참고 자료를 바탕으로 질문에 정확하게 답변하세요.
규칙:
- 참고 자료에 없는 내용은 추측하지 마세요
- 확실하지 않은 경우 "확인이 필요합니다"라고 답변하세요
- 답변은 간결하고 명확하게 작성하세요
{% if retrieved_docs %}
## 참고 자료
{% for doc in retrieved_docs %}
### [{{ loop.index }}] {{ doc.title | default("문서") }}
{{ doc.content }}
{% if doc.metadata.source %}
출처: {{ doc.metadata.source }}
{% endif %}
{% endfor %}
{% endif %}
"""

    def __init__(self):
        self.system_template = Template(self.SYSTEM_TEMPLATE)

    def build(
        self,
        messages: list[dict],
        retrieved_docs: list[dict] = None
    ) -> list[dict]:
        """Build an OpenAI-style message list."""
        # System message with context
        system_content = self.system_template.render(
            retrieved_docs=retrieved_docs or []
        )
        result = [{"role": "system", "content": system_content}]

        # Add the conversation messages
        for msg in messages:
            result.append({
                "role": msg["role"],
                "content": msg["content"]
            })
        return result
```
2.5 Inference Layer
vLLM Integration
```python
import asyncio
from openai import AsyncOpenAI
from typing import AsyncGenerator


class VLLMClient:
    """vLLM OpenAI-compatible client."""

    def __init__(self, base_url: str):
        self.client = AsyncOpenAI(
            base_url=base_url,
            api_key="dummy"  # vLLM does not require a key
        )

    async def generate(
        self,
        messages: list[dict],
        model: str = "llama-3.1-8b-instruct",
        max_tokens: int = 1024,
        temperature: float = 0.7,
        stream: bool = True
    ) -> AsyncGenerator[str, None]:
        """Streaming generation."""
        response = await self.client.chat.completions.create(
            model=model,
            messages=messages,
            max_tokens=max_tokens,
            temperature=temperature,
            stream=stream,
            # vLLM-specific options
            extra_body={
                "use_beam_search": False,
                "top_k": 50,
                "repetition_penalty": 1.1
            }
        )
        if stream:
            async for chunk in response:
                if chunk.choices[0].delta.content:
                    yield chunk.choices[0].delta.content
        else:
            yield response.choices[0].message.content

    async def generate_batch(
        self,
        batch_messages: list[list[dict]],
        **kwargs
    ) -> list[str]:
        """Batch generation (non-streaming); pass model, max_tokens, etc. via kwargs."""
        tasks = [
            self.client.chat.completions.create(
                messages=messages,
                stream=False,
                **kwargs
            )
            for messages in batch_messages
        ]
        responses = await asyncio.gather(*tasks)
        return [r.choices[0].message.content for r in responses]
```
Request Queue Management
```python
import asyncio
from dataclasses import dataclass, field
from typing import Any
import time


@dataclass(order=True)
class PrioritizedRequest:
    """Request ordered by priority (lower value = higher priority)."""
    priority: int
    timestamp: float = field(compare=False)
    request_id: str = field(compare=False)
    data: Any = field(compare=False)


class RequestQueue:
    """
    Priority-queue based request management.
    - Priority 0: real-time chat
    - Priority 1: batch jobs
    - Priority 2: background processing
    """

    def __init__(self, max_size: int = 1000):
        self.queue = asyncio.PriorityQueue(maxsize=max_size)
        self.metrics = {
            "enqueued": 0,
            "dequeued": 0,
            "dropped": 0
        }

    async def enqueue(
        self,
        request: dict,
        priority: int = 0,
        timeout: float = 30.0
    ) -> str:
        """Add a request to the queue."""
        request_id = f"{time.time():.6f}"
        item = PrioritizedRequest(
            priority=priority,
            timestamp=time.time(),
            request_id=request_id,
            data=request
        )
        try:
            await asyncio.wait_for(
                self.queue.put(item),
                timeout=timeout
            )
            self.metrics["enqueued"] += 1
            return request_id
        except asyncio.TimeoutError:
            self.metrics["dropped"] += 1
            raise Exception("Queue full, request dropped")

    async def dequeue(self) -> PrioritizedRequest:
        """Take the next request from the queue."""
        item = await self.queue.get()
        self.metrics["dequeued"] += 1
        return item

    def size(self) -> int:
        return self.queue.qsize()
```
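The queue is only useful together with a consumer loop that drains it and forwards requests to the inference engine. A minimal worker sketch, assuming a hypothetical `process_request` coroutine that performs the actual inference:

```python
import asyncio


async def queue_worker(queue: "RequestQueue", process_request, concurrency: int = 4):
    """Drain the priority queue with a bounded number of concurrent in-flight requests."""
    semaphore = asyncio.Semaphore(concurrency)

    async def _run(item):
        async with semaphore:
            try:
                await process_request(item.data)  # hypothetical inference call
            finally:
                queue.queue.task_done()

    while True:
        item = await queue.dequeue()              # highest-priority request first
        asyncio.create_task(_run(item))

# Startup (e.g. in a FastAPI lifespan handler):
# asyncio.create_task(queue_worker(request_queue, process_request))
```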
2.6 Post-processing Layer
Content Filtering
```python
import re
from typing import Optional


class ContentFilter:
    """Response filtering and validation."""

    def __init__(self):
        # PII patterns
        self.pii_patterns = [
            r'\b\d{6}-?\d{7}\b',                                    # Korean resident registration number
            r'\b\d{3}-\d{4}-\d{4}\b',                               # phone number
            r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',  # email address
        ]
        # Blocked-content patterns
        self.blocked_patterns = [
            r'(?i)(폭발물|마약|무기)\s*(제조|만들)',
        ]

    def filter(self, text: str) -> tuple[str, list[str]]:
        """
        Filter the text.
        Returns: (filtered text, list of detected issues)
        """
        issues = []
        filtered = text

        # Mask PII
        for pattern in self.pii_patterns:
            if re.search(pattern, filtered):
                issues.append(f"PII detected: {pattern}")
                filtered = re.sub(pattern, "[MASKED]", filtered)

        # Check for blocked content
        for pattern in self.blocked_patterns:
            if re.search(pattern, filtered):
                issues.append(f"Blocked content: {pattern}")
                return "[응답이 필터링되었습니다]", issues

        return filtered, issues


class ResponseFormatter:
    """Response formatting."""

    @staticmethod
    def format_sources(docs: list[dict]) -> str:
        """Format the list of reference sources."""
        if not docs:
            return ""
        sources = []
        for i, doc in enumerate(docs, 1):
            source = doc.get("metadata", {}).get("source", "Unknown")
            title = doc.get("title", f"문서 {i}")
            sources.append(f"[{i}] {title} ({source})")
        return "\n\n---\n참고 자료:\n" + "\n".join(sources)

    @staticmethod
    def format_streaming_chunk(
        content: str,
        is_first: bool = False,
        is_last: bool = False,
        metadata: dict = None
    ) -> dict:
        """Format an SSE streaming chunk."""
        chunk = {
            "type": "content",
            "content": content
        }
        if is_first:
            chunk["type"] = "start"
        if is_last:
            chunk["type"] = "end"
        if metadata:
            chunk["metadata"] = metadata
        return chunk
```
3. End-to-End Pipeline Integration
3.1 Unified Handler
```python
class ChatHandler:
    """Unified chat handler that wires the pipeline stages together."""

    def __init__(
        self,
        rate_limiter: RateLimiter,
        cache: ResponseCache,
        query_rewriter: QueryRewriter,
        embedding_service: EmbeddingService,
        vector_search: VectorSearchService,
        reranker: RerankerService,
        context_manager: ContextWindowManager,
        prompt_builder: PromptBuilder,
        vllm_client: VLLMClient,
        content_filter: ContentFilter
    ):
        self.rate_limiter = rate_limiter
        self.cache = cache
        self.query_rewriter = query_rewriter
        self.embedding = embedding_service
        self.search = vector_search
        self.reranker = reranker
        self.context = context_manager
        self.prompt = prompt_builder
        self.llm = vllm_client
        self.filter = content_filter

    async def handle(
        self,
        request: ChatRequest,
        user_id: str
    ) -> AsyncGenerator[dict, None]:
        """Main request-processing pipeline."""
        start_time = time.time()
        stage_start = start_time
        metrics = {"stages": {}}

        # 1. Rate Limiting
        await self.rate_limiter.check(user_id)
        metrics["stages"]["rate_limit"] = time.time() - stage_start
        stage_start = time.time()

        # 2. Cache Check
        cache_key = generate_cache_key(
            request.messages,
            request.model,
            request.temperature
        )
        cached = await self.cache.get(cache_key)
        if cached:
            yield {"type": "cached", "content": cached["content"]}
            return
        metrics["stages"]["cache_check"] = time.time() - stage_start
        stage_start = time.time()

        # 3. Retrieval (if enabled)
        retrieved_docs = []
        if request.use_retrieval:
            # 3a. Query Rewrite
            current_query = request.messages[-1]["content"]
            rewritten_query = await self.query_rewriter.rewrite(
                current_query,
                request.messages
            )
            # 3b. Embedding
            query_embedding = await self.embedding.embed([rewritten_query])
            # 3c. Vector Search (over-fetch candidates for the reranker)
            candidates = await self.search.search(
                query_embedding[0],
                top_k=request.retrieval_top_k * 2
            )
            # 3d. Reranking
            retrieved_docs = await self.reranker.rerank(
                rewritten_query,
                candidates,
                top_k=request.retrieval_top_k
            )
        metrics["stages"]["retrieval"] = time.time() - stage_start
        stage_start = time.time()

        # 4. Prompt Construction (the raw template is passed only to budget the system prompt's size)
        system, messages, docs = self.context.fit_context(
            self.prompt.SYSTEM_TEMPLATE,
            request.messages,
            retrieved_docs
        )
        final_messages = self.prompt.build(messages, docs)
        metrics["stages"]["prompt"] = time.time() - stage_start
        stage_start = time.time()

        # 5. Inference (Streaming)
        yield {"type": "start", "metadata": {"model": request.model}}

        full_response = []
        async for chunk in self.llm.generate(
            final_messages,
            model=request.model,
            max_tokens=request.max_tokens,
            temperature=request.temperature,
            stream=True
        ):
            # 6. Content Filtering (per chunk; patterns spanning chunk boundaries are not caught here)
            filtered_chunk, issues = self.filter.filter(chunk)
            full_response.append(filtered_chunk)
            yield {"type": "content", "content": filtered_chunk}

        # Final processing
        final_content = "".join(full_response)
        metrics["stages"]["inference"] = time.time() - stage_start

        # Cache update
        if cache_key:
            await self.cache.set(cache_key, {"content": final_content})

        # Add sources
        sources = ResponseFormatter.format_sources(retrieved_docs)
        if sources:
            yield {"type": "sources", "content": sources}

        metrics["total_time"] = time.time() - start_time
        yield {"type": "end", "metadata": metrics}
```
3.2 FastAPI Endpoint
```python
from fastapi import FastAPI, Request, Depends
from fastapi.responses import StreamingResponse
import json

app = FastAPI()


@app.post("/v1/chat/completions")
async def chat_completions(
    request: ChatRequest,
    user_id: str = Depends(get_current_user),         # auth dependency, defined elsewhere
    handler: ChatHandler = Depends(get_chat_handler)   # DI factory, defined elsewhere
):
    """OpenAI-compatible Chat Completion endpoint."""
    if request.stream:
        return StreamingResponse(
            stream_response(handler, request, user_id),
            media_type="text/event-stream"
        )
    else:
        return await non_stream_response(handler, request, user_id)


async def stream_response(handler, request, user_id):
    """Yield the handler output as SSE events."""
    async for chunk in handler.handle(request, user_id):
        data = json.dumps(chunk, ensure_ascii=False)
        yield f"data: {data}\n\n"
    yield "data: [DONE]\n\n"
```
4. Optimization Points
4.1 Optimizations by Stage
| Stage | Optimization | Expected effect |
|---|---|---|
| Ingress | Connection pooling, keep-alive | Lower connection overhead |
| Cache | Cache warming, compression | Higher hit rate, lower memory use |
| Retrieval | Index optimization, batched embedding | ~50% lower search latency |
| Prompt | Token-count caching, precompiled templates (see the sketch below) | Faster prompt construction |
| Inference | Continuous batching, speculative decoding | 2-3x higher throughput |
| Post | Asynchronous logging, parallel filtering | Faster responses |
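For the Prompt row, both optimizations are a few lines each. A minimal sketch, assuming the `PromptBuilder` and a `ContextWindowManager` instance defined above (the cache size is an arbitrary illustration):

```python
from functools import lru_cache
from jinja2 import Environment

# 1. Precompile templates once at startup instead of per request.
jinja_env = Environment()
compiled_system_template = jinja_env.from_string(PromptBuilder.SYSTEM_TEMPLATE)

# 2. Memoize token counts for strings that repeat across requests
#    (system prompt, FAQ documents, pinned context).
#    context_manager is assumed to be a ContextWindowManager instance.
@lru_cache(maxsize=4096)
def cached_token_count(text: str) -> int:
    return context_manager.count_tokens(text)
```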
4.2 Bottleneck Identification and Resolution
Bottleneck identification process:
```
1. Profiling
   |
   v
2. Measure per-stage latency
   |
   v
3. Identify the bottleneck
   |
   +-- Inference (80%+) --> GPU upgrade, model optimization
   |
   +-- Retrieval (15%+) --> index tuning, more aggressive caching
   |
   +-- Network (5%+)    --> co-location, compression
```
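Steps 2 and 3 can be automated from the `metrics["stages"]` dict that `ChatHandler.handle` already emits. A minimal sketch (the threshold values mirror the branches above and are illustrative):

```python
def find_bottleneck(stage_latencies: dict[str, float]) -> tuple[str, float]:
    """Return (stage name, share of total latency) for the slowest stage."""
    total = sum(stage_latencies.values()) or 1e-9
    stage, latency = max(stage_latencies.items(), key=lambda kv: kv[1])
    return stage, latency / total

# Example with the per-stage metrics emitted by ChatHandler:
# stage, share = find_bottleneck(metrics["stages"])
# if stage == "inference" and share > 0.8:
#     ...  # GPU upgrade / model optimization
# elif stage == "retrieval" and share > 0.15:
#     ...  # index tuning / caching
```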
4.3 Inference Optimization Details
```python
# vLLM optimization settings
VLLM_OPTIMIZATION = {
    # Continuous Batching
    "max_num_seqs": 256,
    "max_num_batched_tokens": 32768,

    # Memory Optimization
    "gpu_memory_utilization": 0.9,
    "swap_space": 4,  # GB

    # KV Cache Optimization
    "block_size": 16,
    "enable_prefix_caching": True,

    # Quantization
    "quantization": "awq",  # or "gptq", "squeezellm"
    "dtype": "float16",

    # Speculative Decoding (draft model)
    "speculative_model": "draft-model-path",
    "num_speculative_tokens": 5,
}
```
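These keys correspond to engine arguments of vLLM's OpenAI-compatible server. A hedged launch sketch is shown below; the model name is a placeholder and flag names can differ between vLLM releases, so verify against `--help` for the installed version:

```python
import subprocess

# Build the server command from the settings above; boolean flags are passed without a value.
cmd = ["python", "-m", "vllm.entrypoints.openai.api_server",
       "--model", "meta-llama/Llama-3.1-8B-Instruct"]  # placeholder model path
for key, value in VLLM_OPTIMIZATION.items():
    flag = "--" + key.replace("_", "-")
    if isinstance(value, bool):
        if value:
            cmd.append(flag)
    else:
        cmd.extend([flag, str(value)])

print(" ".join(cmd))        # inspect before launching
# subprocess.run(cmd, check=True)
```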
4.4 Performance Benchmark Targets
| Metric | Target | How measured |
|---|---|---|
| TTFT (Time to First Token) | < 500 ms | P95 |
| TPOT (Time per Output Token) | < 50 ms | Mean |
| Throughput | > 100 req/min | 10 concurrent users |
| Cache Hit Rate | > 30% | Daily average |
| Error Rate | < 0.1% | Share of 5xx responses |
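TTFT and TPOT can be measured directly against the streaming endpoint. A minimal sketch using the OpenAI-compatible API (base URL and model name are placeholders; chunk gaps approximate per-token time):

```python
import time
import asyncio
from openai import AsyncOpenAI


async def measure_ttft_tpot(prompt: str) -> tuple[float, float]:
    """Return (TTFT, average TPOT) in seconds for a single streamed completion."""
    client = AsyncOpenAI(base_url="http://localhost:8000/v1", api_key="dummy")
    start = time.perf_counter()
    first_token_at = None
    token_times = []

    stream = await client.chat.completions.create(
        model="llama-3.1-8b-instruct",
        messages=[{"role": "user", "content": prompt}],
        max_tokens=256,
        stream=True,
    )
    async for chunk in stream:
        if chunk.choices and chunk.choices[0].delta.content:
            now = time.perf_counter()
            if first_token_at is None:
                first_token_at = now
            token_times.append(now)

    ttft = (first_token_at or time.perf_counter()) - start
    # TPOT: mean gap between consecutive streamed chunks
    gaps = [b - a for a, b in zip(token_times, token_times[1:])]
    tpot = sum(gaps) / len(gaps) if gaps else 0.0
    return ttft, tpot

# ttft, tpot = asyncio.run(measure_ttft_tpot("빈집 관련 지원 제도를 알려주세요"))
```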
5. Troubleshooting Guide
5.1 Common Issues
| Symptom | Cause | Fix |
|---|---|---|
| TTFT spikes | KV cache exhaustion | Free GPU memory, tune batch size |
| OOM errors | Overly long contexts | Cap max_model_len, chunk inputs |
| Degraded retrieval quality | Embedding model mismatch | Use the same model for indexing and querying, reindex |
| Streaming stalls | Network timeouts | Keep-alive, adjust buffering |
5.2 Debugging Checklist
1. Check the logs
   - API server: trace by request_id
   - vLLM: batch state, queue depth
   - ES: slow query log
2. Check the metrics
   - GPU utilization/memory
   - Per-stage latency
   - Error-rate trend
3. Profile
   - py-spy (CPU)
   - nvidia-smi (GPU)
   - ES profile API
Document version: 1.0
Last updated: 2025-01