LLM Serving Pipeline in Detail

This document walks through the request-processing pipeline of an LLM service stage by stage and covers how to optimize each stage.


1. Pipeline Overview

1.1 Request Processing Flow

                              [Client Request]
                                     |
                                     v
+-----------------------------------------------------------------------------------+
|  [1. Ingress Layer]                                                               |
|  +-------------+    +---------------+    +------------------+                     |
|  | Rate Limit  | -> | Auth/Session  | -> | Request Validate |                     |
|  +-------------+    +---------------+    +------------------+                     |
+-----------------------------------------------------------------------------------+
                                     |
                                     v
+-----------------------------------------------------------------------------------+
|  [2. Cache Layer]                                                                 |
|  +---------------+                                                                |
|  | Redis Lookup  | --cache hit--> [Return Cached Response]                        |
|  +---------------+                                                                |
|        |                                                                          |
|    cache miss                                                                     |
+-----------------------------------------------------------------------------------+
                                     |
                                     v
+-----------------------------------------------------------------------------------+
|  [3. Retrieval Layer]                                                             |
|  +--------------+    +--------------+    +---------------+    +---------------+   |
|  | Query Rewrite| -> | Embedding    | -> | Vector Search | -> | Reranking     |   |
|  +--------------+    | (Triton)     |    | (ES/Milvus)   |    | (Triton)      |   |
|                      +--------------+    +---------------+    +---------------+   |
+-----------------------------------------------------------------------------------+
                                     |
                                     v
+-----------------------------------------------------------------------------------+
|  [4. Prompt Layer]                                                                |
|  +----------------+    +------------------+    +------------------+               |
|  | Context Window | -> | Template Render  | -> | Token Counting   |               |
|  | Management     |    | (Jinja2)         |    | & Truncation     |               |
|  +----------------+    +------------------+    +------------------+               |
+-----------------------------------------------------------------------------------+
                                     |
                                     v
+-----------------------------------------------------------------------------------+
|  [5. Inference Layer]                                                             |
|  +---------------+    +---------------+    +---------------+                      |
|  | Request Queue | -> | vLLM Engine   | -> | Token Stream  |                      |
|  +---------------+    +---------------+    +---------------+                      |
+-----------------------------------------------------------------------------------+
                                     |
                                     v
+-----------------------------------------------------------------------------------+
|  [6. Post-processing Layer]                                                       |
|  +---------------+    +---------------+    +---------------+                      |
|  | Content Filter| -> | Format/Parse  | -> | Cache Update  |                      |
|  +---------------+    +---------------+    +---------------+                      |
+-----------------------------------------------------------------------------------+
                                     |
                                     v
                              [Client Response]

1.2 Latency Distribution by Stage (Example)

Stage                 Avg latency   P95 latency   Share
Ingress               2 ms          5 ms          1%
Cache Lookup          1 ms          3 ms          0.5%
Retrieval             50 ms         120 ms        15%
Prompt Construction   5 ms          15 ms         2%
Inference             800 ms        2,000 ms      80%
Post-processing       10 ms         30 ms         1.5%
Total                 ~870 ms       ~2,200 ms     100%
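
A breakdown like the one above can be collected with a lightweight per-stage timer. The sketch below is one minimal way to do it; the StageTimer name and its usage are illustrative and not part of the pipeline code shown later.

import time
from contextlib import contextmanager

class StageTimer:
    """Collects per-stage wall-clock latencies for a single request."""

    def __init__(self):
        self.stages: dict[str, float] = {}

    @contextmanager
    def stage(self, name: str):
        start = time.perf_counter()
        try:
            yield
        finally:
            # Record the elapsed time for this stage in milliseconds.
            self.stages[name] = (time.perf_counter() - start) * 1000

# Illustrative usage inside an async handler:
#   timer = StageTimer()
#   with timer.stage("retrieval"):
#       docs = await retrieve(query)
#   timer.stages  # -> {"retrieval": 52.3, ...}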

2. Stage-by-Stage Details

2.1 Ingress Layer

Rate Limiting

import redis.asyncio as redis
from fastapi import HTTPException
from datetime import datetime

class RateLimiter:
    """
    Sliding Window Rate Limiter using Redis Sorted Set.
    """

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self.window_size = 60  # seconds
        self.max_requests = 100  # per window

    async def check(self, user_id: str) -> bool:
        key = f"ratelimit:{user_id}"
        now = datetime.now().timestamp()
        window_start = now - self.window_size

        pipe = self.redis.pipeline()

        # Remove old entries
        pipe.zremrangebyscore(key, 0, window_start)

        # Count current window
        pipe.zcard(key)

        # Add current request
        pipe.zadd(key, {str(now): now})

        # Set expiry
        pipe.expire(key, self.window_size)

        results = await pipe.execute()
        current_count = results[1]  # count before the current request was added

        if current_count >= self.max_requests:
            raise HTTPException(
                status_code=429,
                detail={
                    "error": "rate_limit_exceeded",
                    "retry_after": self.window_size,
                    "limit": self.max_requests
                }
            )

        return True

Request Validation

from pydantic import BaseModel, Field, validator
from typing import Optional

class ChatRequest(BaseModel):
    """Chat completion request schema."""

    messages: list[dict] = Field(..., min_items=1, max_items=100)
    model: str = Field(default="llama-3.1-8b-instruct")
    max_tokens: int = Field(default=1024, ge=1, le=4096)
    temperature: float = Field(default=0.7, ge=0.0, le=2.0)
    stream: bool = Field(default=True)

    # RAG options
    use_retrieval: bool = Field(default=True)
    retrieval_top_k: int = Field(default=5, ge=1, le=20)

    @validator('messages')
    def validate_messages(cls, v):
        for msg in v:
            if msg.get('role') not in ['system', 'user', 'assistant']:
                raise ValueError(f"Invalid role: {msg.get('role')}")
            if not msg.get('content'):
                raise ValueError("Message content cannot be empty")
        return v

    @validator('model')
    def validate_model(cls, v):
        allowed_models = [
            'llama-3.1-8b-instruct',
            'llama-3.1-70b-instruct',
            'qwen2.5-7b-instruct'
        ]
        if v not in allowed_models:
            raise ValueError(f"Model not available: {v}")
        return v

2.2 Cache Layer

Cache Key Generation

import hashlib
import json
from typing import Optional

def generate_cache_key(
    messages: list[dict],
    model: str,
    temperature: float,
    context_docs: Optional[list[str]] = None
) -> Optional[str]:
    """
    Generate a cache key.
    Requests with temperature > 0 are not cached (non-deterministic output).
    """
    if temperature > 0:
        return None  # Non-deterministic, skip cache

    # Normalize and hash inputs
    cache_input = {
        "messages": messages,
        "model": model,
        "context": sorted(context_docs) if context_docs else []
    }

    serialized = json.dumps(cache_input, sort_keys=True, ensure_ascii=False)
    hash_value = hashlib.sha256(serialized.encode()).hexdigest()[:16]

    return f"chat:response:{hash_value}"


class ResponseCache:
    """Redis-based response cache with TTL."""

    def __init__(self, redis_client, default_ttl: int = 3600):
        self.redis = redis_client
        self.default_ttl = default_ttl

    async def get(self, key: str) -> Optional[dict]:
        if not key:
            return None

        cached = await self.redis.get(key)
        if cached:
            return json.loads(cached)
        return None

    async def set(self, key: str, value: dict, ttl: int = None):
        if not key:
            return

        await self.redis.setex(
            key,
            ttl or self.default_ttl,
            json.dumps(value, ensure_ascii=False)
        )

Cache Strategy

Case                               Cache?   TTL   Notes
temperature = 0                    Yes      24h   Deterministic output
temperature > 0                    No       -     Non-deterministic
Same question + same context       Yes      1h    Repeats within a session
Frequently asked questions (FAQ)   Yes      7d    Manually curated
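
A minimal sketch of how the strategy table above could be expressed in code. The resolve_cache_policy helper and its is_faq / session_repeat flags are hypothetical and not part of the ResponseCache shown earlier; they only mirror the table.

from typing import Optional

def resolve_cache_policy(
    temperature: float,
    is_faq: bool = False,
    session_repeat: bool = False
) -> Optional[int]:
    """Return a TTL in seconds, or None if the response should not be cached."""
    if temperature > 0:
        return None              # non-deterministic output: skip cache
    if is_faq:
        return 7 * 24 * 3600     # FAQ: 7 days, manually curated entries
    if session_repeat:
        return 3600              # same question + context within a session: 1 hour
    return 24 * 3600             # deterministic output: 24 hours

# Illustrative usage with the ResponseCache above:
#   ttl = resolve_cache_policy(request.temperature)
#   if ttl is not None:
#       await cache.set(cache_key, {"content": final_content}, ttl=ttl)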

2.3 Retrieval Layer

Query Rewriting

class QueryRewriter:
    """
    Rewrites the user query with the conversation context taken into account.
    """

    def __init__(self, llm_client):
        self.llm = llm_client
        self.prompt_template = """
Previous conversation:
{conversation_history}

Current question: {current_query}

Considering the conversation above, rewrite the current question as a standalone question suitable for retrieval.
Replace pronouns (it, this, that, etc.) with the concrete nouns they refer to.

Rewritten question:"""

    async def rewrite(
        self,
        current_query: str,
        conversation_history: list[dict]
    ) -> str:
        # Short conversations do not need rewriting
        if len(conversation_history) <= 1:
            return current_query

        history_text = "\n".join([
            f"{msg['role']}: {msg['content']}"
            for msg in conversation_history[-4:]  # last 4 turns
        ])

        prompt = self.prompt_template.format(
            conversation_history=history_text,
            current_query=current_query
        )

        response = await self.llm.generate(
            prompt,
            max_tokens=100,
            temperature=0
        )

        return response.strip()

Embedding & Vector Search

import tritonclient.grpc as grpcclient
import numpy as np
from elasticsearch import AsyncElasticsearch
from transformers import AutoTokenizer

class EmbeddingService:
    """Triton-based embedding service."""

    def __init__(self, triton_url: str, model_name: str = "embedding"):
        self.client = grpcclient.InferenceServerClient(triton_url)
        self.model_name = model_name
        self.tokenizer = AutoTokenizer.from_pretrained("BAAI/bge-m3")

    async def embed(self, texts: list[str]) -> np.ndarray:
        # Tokenize
        encoded = self.tokenizer(
            texts,
            padding=True,
            truncation=True,
            max_length=512,
            return_tensors="np"
        )

        # Prepare inputs
        inputs = [
            grpcclient.InferInput("input_ids", encoded["input_ids"].shape, "INT64"),
            grpcclient.InferInput("attention_mask", encoded["attention_mask"].shape, "INT64")
        ]
        inputs[0].set_data_from_numpy(encoded["input_ids"])
        inputs[1].set_data_from_numpy(encoded["attention_mask"])

        # Inference
        outputs = [grpcclient.InferRequestedOutput("embeddings")]
        response = self.client.infer(self.model_name, inputs, outputs=outputs)

        embeddings = response.as_numpy("embeddings")

        # Normalize
        norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
        return embeddings / norms


class VectorSearchService:
    """Elasticsearch-based vector search."""

    def __init__(self, es_client: AsyncElasticsearch, index_name: str):
        self.es = es_client
        self.index = index_name

    async def search(
        self,
        query_vector: np.ndarray,
        top_k: int = 10,
        filters: dict = None
    ) -> list[dict]:
        query = {
            "knn": {
                "field": "embedding",
                "query_vector": query_vector.tolist(),
                "k": top_k,
                "num_candidates": top_k * 10
            }
        }

        if filters:
            query["knn"]["filter"] = filters

        response = await self.es.search(
            index=self.index,
            body=query,
            size=top_k,
            _source=["content", "metadata", "title"]
        )

        results = []
        for hit in response["hits"]["hits"]:
            results.append({
                "id": hit["_id"],
                "score": hit["_score"],
                "content": hit["_source"]["content"],
                "metadata": hit["_source"].get("metadata", {}),
                "title": hit["_source"].get("title", "")
            })

        return results

Reranking

class RerankerService:
    """Cross-encoder based reranker using Triton."""

    def __init__(self, triton_url: str, model_name: str = "reranker"):
        self.client = grpcclient.InferenceServerClient(triton_url)
        self.model_name = model_name
        self.tokenizer = AutoTokenizer.from_pretrained("BAAI/bge-reranker-v2-m3")

    async def rerank(
        self,
        query: str,
        documents: list[dict],
        top_k: int = 5
    ) -> list[dict]:
        if not documents:
            return []

        # Prepare query-document pairs
        pairs = [(query, doc["content"]) for doc in documents]

        # Tokenize
        encoded = self.tokenizer(
            pairs,
            padding=True,
            truncation=True,
            max_length=512,
            return_tensors="np"
        )

        # Inference
        inputs = [
            grpcclient.InferInput("input_ids", encoded["input_ids"].shape, "INT64"),
            grpcclient.InferInput("attention_mask", encoded["attention_mask"].shape, "INT64")
        ]
        inputs[0].set_data_from_numpy(encoded["input_ids"])
        inputs[1].set_data_from_numpy(encoded["attention_mask"])

        outputs = [grpcclient.InferRequestedOutput("scores")]
        response = self.client.infer(self.model_name, inputs, outputs=outputs)

        scores = response.as_numpy("scores").flatten()

        # Sort by score
        scored_docs = list(zip(documents, scores))
        scored_docs.sort(key=lambda x: x[1], reverse=True)

        # Return top-k with scores
        results = []
        for doc, score in scored_docs[:top_k]:
            doc["rerank_score"] = float(score)
            results.append(doc)

        return results

2.4 Prompt Layer

Context Window Management

import tiktoken

class ContextWindowManager:
    """
    Context window management.
    Keeps the prompt within the model's maximum token budget.
    """

    def __init__(self, model_name: str, max_context: int = 8192):
        self.max_context = max_context
        self.encoder = tiktoken.get_encoding("cl100k_base")

        # Reserve tokens for output
        self.output_reserve = 1024
        self.available_context = max_context - self.output_reserve

    def count_tokens(self, text: str) -> int:
        return len(self.encoder.encode(text))

    def fit_context(
        self,
        system_prompt: str,
        messages: list[dict],
        retrieved_docs: list[dict],
        max_docs: int = 5
    ) -> tuple[str, list[dict], list[dict]]:
        """
        컨텍스트 윈도우에 맞게 내용 조정.
        우선순위: system > recent messages > retrieved docs
        """
        # 1. System prompt (필수)
        system_tokens = self.count_tokens(system_prompt)
        remaining = self.available_context - system_tokens

        # 2. Messages (most recent first; at minimum the current question is included)
        fitted_messages = []
        for msg in reversed(messages):
            msg_tokens = self.count_tokens(msg["content"])
            if msg_tokens <= remaining:
                fitted_messages.insert(0, msg)
                remaining -= msg_tokens
            else:
                break

        # 3. Retrieved docs (whatever fits in the remaining budget)
        fitted_docs = []
        for doc in retrieved_docs[:max_docs]:
            doc_tokens = self.count_tokens(doc["content"])
            if doc_tokens <= remaining:
                fitted_docs.append(doc)
                remaining -= doc_tokens
            else:
                # If the document is too long, truncate it to fit the remaining budget
                if remaining > 50:
                    truncated = self._truncate_to_tokens(doc["content"], remaining - 50)
                    if truncated:
                        doc["content"] = truncated
                        fitted_docs.append(doc)
                break

        return system_prompt, fitted_messages, fitted_docs

    def _truncate_to_tokens(self, text: str, max_tokens: int) -> str:
        tokens = self.encoder.encode(text)
        if len(tokens) <= max_tokens:
            return text
        return self.encoder.decode(tokens[:max_tokens])

Prompt Template

from jinja2 import Template

class PromptBuilder:
    """RAG prompt builder."""

    SYSTEM_TEMPLATE = """You are an AI assistant that provides information about vacant houses.
Answer the question accurately, based on the reference material below.

Rules:
- Do not speculate about anything that is not in the reference material
- If you are not sure, answer "This needs to be verified"
- Keep answers concise and clear

{% if retrieved_docs %}
## Reference Material

{% for doc in retrieved_docs %}
### [{{ loop.index }}] {{ doc.title | default("Document") }}
{{ doc.content }}
{% if doc.metadata.source %}
Source: {{ doc.metadata.source }}
{% endif %}

{% endfor %}
{% endif %}
"""

    def __init__(self):
        self.system_template = Template(self.SYSTEM_TEMPLATE)

    def build(
        self,
        messages: list[dict],
        retrieved_docs: list[dict] = None
    ) -> list[dict]:
        """OpenAI 형식의 메시지 리스트 생성."""

        # System message with context
        system_content = self.system_template.render(
            retrieved_docs=retrieved_docs or []
        )

        result = [{"role": "system", "content": system_content}]

        # Add conversation messages
        for msg in messages:
            result.append({
                "role": msg["role"],
                "content": msg["content"]
            })

        return result

2.5 Inference Layer

vLLM Integration

import asyncio

from openai import AsyncOpenAI
from typing import AsyncGenerator

class VLLMClient:
    """vLLM OpenAI-compatible client."""

    def __init__(self, base_url: str):
        self.client = AsyncOpenAI(
            base_url=base_url,
            api_key="dummy"  # vLLM doesn't require key
        )

    async def generate(
        self,
        messages: list[dict],
        model: str = "llama-3.1-8b-instruct",
        max_tokens: int = 1024,
        temperature: float = 0.7,
        stream: bool = True
    ) -> AsyncGenerator[str, None]:
        """스트리밍 생성."""

        response = await self.client.chat.completions.create(
            model=model,
            messages=messages,
            max_tokens=max_tokens,
            temperature=temperature,
            stream=stream,
            # vLLM specific options
            extra_body={
                "use_beam_search": False,
                "top_k": 50,
                "repetition_penalty": 1.1
            }
        )

        if stream:
            async for chunk in response:
                if chunk.choices[0].delta.content:
                    yield chunk.choices[0].delta.content
        else:
            yield response.choices[0].message.content

    async def generate_batch(
        self,
        batch_messages: list[list[dict]],
        **kwargs
    ) -> list[str]:
        """배치 생성 (비스트리밍)."""

        tasks = [
            self.client.chat.completions.create(
                messages=messages,
                stream=False,
                **kwargs
            )
            for messages in batch_messages
        ]

        responses = await asyncio.gather(*tasks)
        return [r.choices[0].message.content for r in responses]

Request Queue Management

import asyncio
from dataclasses import dataclass, field
from typing import Any
import time

@dataclass(order=True)
class PrioritizedRequest:
    """Priority-ordered request; equal priorities dequeue in FIFO order."""
    priority: int
    timestamp: float  # compared after priority, so ties are broken by arrival time
    request_id: str = field(compare=False)
    data: Any = field(compare=False)

class RequestQueue:
    """
    Priority-queue based request management.
    - Priority 0: real-time chat
    - Priority 1: batch jobs
    - Priority 2: background processing
    """

    def __init__(self, max_size: int = 1000):
        self.queue = asyncio.PriorityQueue(maxsize=max_size)
        self.metrics = {
            "enqueued": 0,
            "dequeued": 0,
            "dropped": 0
        }

    async def enqueue(
        self,
        request: dict,
        priority: int = 0,
        timeout: float = 30.0
    ) -> str:
        """요청을 큐에 추가."""
        request_id = f"{time.time():.6f}"

        item = PrioritizedRequest(
            priority=priority,
            timestamp=time.time(),
            request_id=request_id,
            data=request
        )

        try:
            await asyncio.wait_for(
                self.queue.put(item),
                timeout=timeout
            )
            self.metrics["enqueued"] += 1
            return request_id
        except asyncio.TimeoutError:
            self.metrics["dropped"] += 1
            raise Exception("Queue full, request dropped")

    async def dequeue(self) -> PrioritizedRequest:
        """큐에서 요청 가져오기."""
        item = await self.queue.get()
        self.metrics["dequeued"] += 1
        return item

    def size(self) -> int:
        return self.queue.qsize()

2.6 Post-processing Layer

Content Filtering

import re
from typing import Optional

class ContentFilter:
    """Response filtering and validation."""

    def __init__(self):
        # PII patterns
        self.pii_patterns = [
            r'\b\d{6}-?\d{7}\b',  # Korean resident registration number
            r'\b\d{3}-\d{4}-\d{4}\b',  # phone number
            r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',  # email
        ]

        # Blocked-content patterns (Korean terms for weapons/drug manufacturing)
        self.blocked_patterns = [
            r'(?i)(폭발물|마약|무기)\s*(제조|만들)',
        ]

    def filter(self, text: str) -> tuple[str, list[str]]:
        """
        Filter the text.
        Returns: (filtered text, list of detected issues)
        """
        issues = []
        filtered = text

        # Mask PII
        for pattern in self.pii_patterns:
            if re.search(pattern, filtered):
                issues.append(f"PII detected: {pattern}")
                filtered = re.sub(pattern, "[MASKED]", filtered)

        # Check for blocked content
        for pattern in self.blocked_patterns:
            if re.search(pattern, filtered):
                issues.append(f"Blocked content: {pattern}")
                return "[The response has been filtered]", issues

        return filtered, issues


class ResponseFormatter:
    """Response formatting."""

    @staticmethod
    def format_sources(docs: list[dict]) -> str:
        """Format source attributions for the retrieved documents."""
        if not docs:
            return ""

        sources = []
        for i, doc in enumerate(docs, 1):
            source = doc.get("metadata", {}).get("source", "Unknown")
            title = doc.get("title", f"Document {i}")
            sources.append(f"[{i}] {title} ({source})")

        return "\n\n---\nReferences:\n" + "\n".join(sources)

    @staticmethod
    def format_streaming_chunk(
        content: str,
        is_first: bool = False,
        is_last: bool = False,
        metadata: dict = None
    ) -> dict:
        """SSE 스트리밍 청크 포맷."""
        chunk = {
            "type": "content",
            "content": content
        }

        if is_first:
            chunk["type"] = "start"

        if is_last:
            chunk["type"] = "end"
            if metadata:
                chunk["metadata"] = metadata

        return chunk

3. Putting the Pipeline Together

3.1 Integrated Handler

class ChatHandler:
    """통합 채팅 핸들러."""

    def __init__(
        self,
        rate_limiter: RateLimiter,
        cache: ResponseCache,
        query_rewriter: QueryRewriter,
        embedding_service: EmbeddingService,
        vector_search: VectorSearchService,
        reranker: RerankerService,
        context_manager: ContextWindowManager,
        prompt_builder: PromptBuilder,
        vllm_client: VLLMClient,
        content_filter: ContentFilter
    ):
        self.rate_limiter = rate_limiter
        self.cache = cache
        self.query_rewriter = query_rewriter
        self.embedding = embedding_service
        self.search = vector_search
        self.reranker = reranker
        self.context = context_manager
        self.prompt = prompt_builder
        self.llm = vllm_client
        self.filter = content_filter

    async def handle(
        self,
        request: ChatRequest,
        user_id: str
    ) -> AsyncGenerator[dict, None]:
        """요청 처리 메인 파이프라인."""

        start_time = time.time()
        metrics = {"stages": {}}

        # 1. Rate Limiting
        await self.rate_limiter.check(user_id)
        metrics["stages"]["rate_limit"] = time.time() - start_time

        # 2. Cache Check
        cache_key = generate_cache_key(
            request.messages,
            request.model,
            request.temperature
        )
        cached = await self.cache.get(cache_key)
        if cached:
            yield {"type": "cached", "content": cached["content"]}
            return
        metrics["stages"]["cache_check"] = time.time() - start_time

        # 3. Retrieval (if enabled)
        retrieved_docs = []
        if request.use_retrieval:
            # 3a. Query Rewrite
            current_query = request.messages[-1]["content"]
            rewritten_query = await self.query_rewriter.rewrite(
                current_query,
                request.messages
            )

            # 3b. Embedding
            query_embedding = await self.embedding.embed([rewritten_query])

            # 3c. Vector Search
            candidates = await self.search.search(
                query_embedding[0],
                top_k=request.retrieval_top_k * 2
            )

            # 3d. Reranking
            retrieved_docs = await self.reranker.rerank(
                rewritten_query,
                candidates,
                top_k=request.retrieval_top_k
            )
        metrics["stages"]["retrieval"] = time.time() - start_time

        # 4. Prompt Construction
        # The unrendered template is used here only as a rough token estimate;
        # the actual system prompt is rendered by prompt.build() below.
        system, messages, docs = self.context.fit_context(
            self.prompt.SYSTEM_TEMPLATE,
            request.messages,
            retrieved_docs
        )
        final_messages = self.prompt.build(messages, docs)
        metrics["stages"]["prompt"] = time.time() - start_time

        # 5. Inference (Streaming)
        yield {"type": "start", "metadata": {"model": request.model}}

        full_response = []
        async for chunk in self.llm.generate(
            final_messages,
            model=request.model,
            max_tokens=request.max_tokens,
            temperature=request.temperature,
            stream=True
        ):
            # 6. Content Filtering (per chunk; patterns that span chunk boundaries are not caught here)
            filtered_chunk, issues = self.filter.filter(chunk)
            full_response.append(filtered_chunk)

            yield {"type": "content", "content": filtered_chunk}

        # Final processing
        final_content = "".join(full_response)
        metrics["stages"]["inference"] = time.time() - start_time

        # Cache update
        if cache_key:
            await self.cache.set(cache_key, {"content": final_content})

        # Add sources
        sources = ResponseFormatter.format_sources(retrieved_docs)
        if sources:
            yield {"type": "sources", "content": sources}

        metrics["total_time"] = time.time() - start_time
        yield {"type": "end", "metadata": metrics}

3.2 FastAPI Endpoint

from fastapi import FastAPI, Request, Depends
from fastapi.responses import StreamingResponse
import json

app = FastAPI()

@app.post("/v1/chat/completions")
async def chat_completions(
    request: ChatRequest,
    user_id: str = Depends(get_current_user),
    handler: ChatHandler = Depends(get_chat_handler)
):
    """OpenAI 호환 Chat Completion 엔드포인트."""

    if request.stream:
        return StreamingResponse(
            stream_response(handler, request, user_id),
            media_type="text/event-stream"
        )
    else:
        return await non_stream_response(handler, request, user_id)


async def stream_response(handler, request, user_id):
    """SSE 스트리밍 응답 생성."""
    async for chunk in handler.handle(request, user_id):
        data = json.dumps(chunk, ensure_ascii=False)
        yield f"data: {data}\n\n"
    yield "data: [DONE]\n\n"

4. Optimization Points

4.1 Optimizations by Stage

Stage       Optimization                                  Expected effect
Ingress     Connection pooling, keep-alive                Lower connection overhead
Cache       Cache warm-up, compression                    Higher hit rate, lower memory use
Retrieval   Index tuning, batched embedding               ~50% lower search latency
Prompt      Token caching, precompiled templates          Faster prompt construction
Inference   Continuous batching, speculative decoding     2-3x higher throughput
Post        Async logging, parallel filtering             Lower response latency
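
As one example of the "batched embedding" item above, concurrent embedding requests can be coalesced into a single Triton call. The sketch below wraps the EmbeddingService from section 2.3 in a simple micro-batcher; the class name, the 10 ms window, and the error handling are illustrative assumptions.

import asyncio

class MicroBatchEmbedder:
    """Coalesces concurrent embed() calls into one batched Triton request."""

    def __init__(self, embedding_service, window_ms: float = 10.0):
        self.service = embedding_service
        self.window = window_ms / 1000
        self.pending: list[tuple[str, asyncio.Future]] = []
        self.lock = asyncio.Lock()

    async def embed_one(self, text: str):
        loop = asyncio.get_running_loop()
        future = loop.create_future()
        async with self.lock:
            self.pending.append((text, future))
            if len(self.pending) == 1:
                # The first request of a window schedules the flush.
                asyncio.create_task(self._flush_later())
        return await future

    async def _flush_later(self):
        await asyncio.sleep(self.window)
        async with self.lock:
            batch, self.pending = self.pending, []
        if not batch:
            return
        texts = [text for text, _ in batch]
        try:
            # One Triton call for everything that arrived within the window.
            embeddings = await self.service.embed(texts)
        except Exception as exc:
            for _, future in batch:
                if not future.done():
                    future.set_exception(exc)
            return
        for (_, future), vector in zip(batch, embeddings):
            if not future.done():
                future.set_result(vector)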

4.2 Identifying and Resolving Bottlenecks

Bottleneck identification process:

1. Profiling
   |
   v
2. Measure per-stage latency
   |
   v
3. Identify the bottleneck
   |
   +-- Inference (80%+) --> GPU upgrade, model optimization
   |
   +-- Retrieval (15%+) --> index tuning, more aggressive caching
   |
   +-- Network (5%+) --> geographic co-location, compression
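
Following the process above, the stage timings that ChatHandler emits in its final "end" chunk can be turned into per-stage shares to point at the dominant stage. A minimal sketch; the thresholds simply mirror the branches in the diagram.

def analyze_bottleneck(metrics: dict) -> str:
    """Turn the handler's cumulative stage timings into per-stage shares."""
    stages = metrics.get("stages", {})
    total = metrics.get("total_time", 0.0)
    if not stages or total <= 0:
        return "no timing data"

    # The handler records cumulative elapsed time, so take successive differences.
    shares = {}
    previous = 0.0
    for name, elapsed in stages.items():
        shares[name] = (elapsed - previous) / total
        previous = elapsed

    dominant, share = max(shares.items(), key=lambda item: item[1])
    if dominant == "inference" and share > 0.8:
        return f"inference dominates ({share:.0%}): consider GPU/model optimization"
    if dominant == "retrieval" and share > 0.15:
        return f"retrieval dominates ({share:.0%}): consider index tuning and caching"
    return f"dominant stage: {dominant} ({share:.0%})"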

4.3 Inference Optimization Details

# vLLM optimization settings
VLLM_OPTIMIZATION = {
    # Continuous Batching
    "max_num_seqs": 256,
    "max_num_batched_tokens": 32768,

    # Memory Optimization
    "gpu_memory_utilization": 0.9,
    "swap_space": 4,  # GB

    # KV Cache Optimization
    "block_size": 16,
    "enable_prefix_caching": True,

    # Quantization
    "quantization": "awq",  # or "gptq", "squeezellm"
    "dtype": "float16",

    # Speculative Decoding (draft model)
    "speculative_model": "draft-model-path",
    "num_speculative_tokens": 5,
}

4.4 Performance Benchmark Targets

Metric                         Target          How it is measured
TTFT (Time to First Token)     < 500 ms        at P95
TPOT (Time per Output Token)   < 50 ms         mean
Throughput                     > 100 req/min   with 10 concurrent users
Cache Hit Rate                 > 30%           daily average
Error Rate                     < 0.1%          share of 5xx responses
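
TTFT and TPOT can be measured directly against the streaming endpoint. A minimal client-side sketch against the OpenAI-compatible API; the base_url and model name follow the examples above but are assumptions about the actual deployment, and chunk counts are used as an approximation of token counts.

import time
from openai import OpenAI

def measure_ttft_tpot(
    base_url: str = "http://localhost:8000/v1",
    model: str = "llama-3.1-8b-instruct"
) -> dict:
    client = OpenAI(base_url=base_url, api_key="dummy")
    start = time.perf_counter()
    first_token_at = None
    chunk_count = 0

    stream = client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": "Summarize RAG in one paragraph."}],
        max_tokens=256,
        stream=True,
    )
    for chunk in stream:
        if chunk.choices and chunk.choices[0].delta.content:
            if first_token_at is None:
                first_token_at = time.perf_counter()  # TTFT reference point
            chunk_count += 1

    end = time.perf_counter()
    if first_token_at is None:
        return {"ttft_ms": None, "tpot_ms": None, "chunks": 0}
    ttft_ms = (first_token_at - start) * 1000
    tpot_ms = (end - first_token_at) * 1000 / max(chunk_count - 1, 1)
    return {"ttft_ms": ttft_ms, "tpot_ms": tpot_ms, "chunks": chunk_count}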

5. Troubleshooting Guide

5.1 Common Issues

Symptom                   Cause                  Fix
TTFT spikes               KV cache exhaustion    Free GPU memory, tune batch size
OOM errors                Long contexts          Cap max_model_len, chunk inputs
Poor retrieval quality    Embedding mismatch     Use the same embedding model, re-index
Choppy streaming          Network timeouts       Keep-alive, tune buffering
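
For the "choppy streaming" row, a common mitigation is to make sure intermediate proxies do not buffer the SSE stream. A sketch of response headers that are typically set for this; the X-Accel-Buffering header targets nginx-style proxies and is an assumption about what sits in front of the API.

from fastapi.responses import StreamingResponse

def sse_response(generator) -> StreamingResponse:
    """Wrap an async generator as an SSE response with proxy buffering disabled."""
    return StreamingResponse(
        generator,
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",   # do not cache partial streams
            "Connection": "keep-alive",    # keep the HTTP connection open
            "X-Accel-Buffering": "no",     # ask nginx-style proxies not to buffer
        },
    )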

5.2 Debugging Checklist

1. Check the logs
   - API server: trace by request_id
   - vLLM: batch state, queue depth
   - ES: slow query log

2. Check the metrics
   - GPU utilization / memory
   - Per-stage latency
   - Error rate trend

3. Profiling
   - py-spy (CPU)
   - nvidia-smi (GPU)
   - ES profile API

Document version: 1.0 | Last updated: 2025-01