Comment sections and user-submitted content are an attack surface. Spam bots, coordinated harassment, phishing links disguised as helpful replies — if you ship a public-facing form or discussion feature, you will encounter all of these within days. Rule-based filters (regex, keyword lists) have ~60-70% precision at best and generate constant maintenance overhead. An LLM-based classifier can handle nuanced toxic content, context-dependent spam, and subtle manipulation that keyword filters miss entirely.
This tutorial builds a complete moderation pipeline in Python: receive a comment, classify it with an LLM, cache repeated inputs, process batches efficiently, and route borderline cases to a human review queue. The same architecture works for form submissions, support tickets, forum posts, and any other user-generated text. For organizations managing content at scale, this pairs well with the broader security controls described in practical security guides.
Architecture overview
User comment
│
▼
Cache lookup (Redis/dict) ──hit──▶ cached decision
│ miss
▼
Batch accumulator (up to 20 items or 500ms)
│
▼
LLM classifier (structured JSON output)
│
├── safe ──────────────▶ publish immediately
├── spam/toxic ────────▶ auto-reject + log
└── borderline (< 0.75)▶ human review queue
Setup
pip install openai redis pydantic python-dotenv
For local development, run Redis:
docker run -d -p 6379:6379 redis:alpine
Data models
# models.py
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
import time
class ContentCategory(str, Enum):
SAFE = "safe"
SPAM = "spam"
TOXIC = "toxic"
PHISHING = "phishing"
BORDERLINE = "borderline"
@dataclass
class ModerationResult:
category: ContentCategory
confidence: float # 0.0 – 1.0
reasoning: str # one sentence
flags: list[str] # e.g. ["contains_url", "promotional_language"]
needs_human_review: bool
processing_time_ms: float
cache_hit: bool = False
@dataclass
class PendingComment:
comment_id: str
text: str
user_id: str
submitted_at: float = field(default_factory=time.time)
context: Optional[str] = None # e.g. article slug or thread title
LLM classifier with structured output
# classifier.py
import json
import hashlib
import time
import logging
from typing import Optional
from openai import OpenAI
from models import ContentCategory, ModerationResult
logger = logging.getLogger(__name__)
llm_client = OpenAI(
api_key="your_api_key",
base_url="https://api.your-llm-provider.com/v1",
)
SYSTEM_PROMPT = """You are a content moderation classifier. Analyze submitted text and return a JSON object with exactly these fields:
- "category": one of "safe", "spam", "toxic", "phishing", "borderline"
- "confidence": float 0.0-1.0 (your certainty in the classification)
- "reasoning": one sentence explaining the decision
- "flags": array of strings identifying specific issues (empty array if safe)
Categories:
- safe: legitimate user content, on-topic discussion, genuine questions
- spam: promotional content, repeated phrases, unsolicited advertising, SEO link drops
- toxic: harassment, hate speech, threats, personal attacks, profanity targeting users
- phishing: credential harvesting, fake login prompts, deceptive links, scam patterns
- borderline: ambiguous content that requires human judgment
Return ONLY the JSON object. No prose."""
def classify_single(text: str, context: Optional[str] = None,
model: str = "gpt-4o-mini") -> dict:
"""Call LLM for a single text. Returns raw parsed dict."""
user_content = f"Text to classify:\n{text}"
if context:
user_content = f"Context: {context}\n\n{user_content}"
response = llm_client.chat.completions.create(
model=model,
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": user_content},
],
temperature=0.0, # deterministic for classification
max_tokens=200,
response_format={"type": "json_object"}, # forces JSON output
)
raw = response.choices[0].message.content
return json.loads(raw)
def classify_batch(texts: list[dict], model: str = "gpt-4o-mini") -> list[dict]:
"""
Classify multiple texts in a single API call.
texts: [{"id": str, "text": str, "context": Optional[str]}]
Returns results in the same order.
"""
if not texts:
return []
# Build a numbered batch prompt
items_block = "\n\n".join(
f'Item {i+1} (id={item["id"]}):\n{item["text"][:800]}'
+ (f'\nContext: {item["context"]}' if item.get("context") else "")
for i, item in enumerate(texts)
)
batch_prompt = f"""Classify each of the following {len(texts)} items.
Return a JSON array where each element corresponds to one item, in order.
Each element must have: id, category, confidence, reasoning, flags.
{items_block}"""
response = llm_client.chat.completions.create(
model=model,
messages=[
{"role": "system", "content": SYSTEM_PROMPT.replace(
"Return ONLY the JSON object.",
"Return ONLY a JSON array of objects, one per item."
)},
{"role": "user", "content": batch_prompt},
],
temperature=0.0,
max_tokens=100 * len(texts),
response_format={"type": "json_object"},
)
raw = response.choices[0].message.content
parsed = json.loads(raw)
# The model may return {"results": [...]} or a bare array
if isinstance(parsed, dict):
for key in ("results", "items", "classifications"):
if key in parsed and isinstance(parsed[key], list):
return parsed[key]
if isinstance(parsed, list):
return parsed
logger.warning("Unexpected batch response structure: %s", raw[:200])
return []
Cost estimation
Before running at scale, understand what you're paying.
# cost.py
# Approximate token counts for moderation
SYSTEM_PROMPT_TOKENS = 180 # fixed per call
TOKENS_PER_COMMENT = 60 # average user comment
OUTPUT_TOKENS = 80 # JSON response
def estimate_cost(num_comments: int,
batch_size: int = 20,
input_price_per_1k: float = 0.00015, # gpt-4o-mini pricing
output_price_per_1k: float = 0.0006) -> dict:
"""
Estimate API cost for moderating num_comments.
Batch calls amortize the system prompt cost.
"""
num_batches = -(-num_comments // batch_size) # ceil division
# Per batch: 1 system prompt + all comments
input_tokens_per_batch = SYSTEM_PROMPT_TOKENS + (TOKENS_PER_COMMENT * batch_size)
output_tokens_per_batch = OUTPUT_TOKENS * batch_size
total_input = input_tokens_per_batch * num_batches
total_output = output_tokens_per_batch * num_batches
cost = (total_input / 1000 * input_price_per_1k +
total_output / 1000 * output_price_per_1k)
return {
"comments": num_comments,
"batches": num_batches,
"total_input_tokens": total_input,
"total_output_tokens": total_output,
"estimated_cost_usd": round(cost, 4),
"cost_per_comment_usd": round(cost / num_comments, 6),
}
# Example
print(estimate_cost(10_000))
# → {'comments': 10000, 'batches': 500, ..., 'estimated_cost_usd': 0.516, ...}
10,000 comments for ~$0.52 with batching. Without batching (single calls), the system prompt overhead alone triples the cost.
Caching layer
Comments are often duplicates — bots submit the same message, users paste the same promotional text. Cache identical inputs permanently; cache near-identical inputs with normalized hashing.
# cache.py
import hashlib
import json
import redis
from typing import Optional
class ModerationCache:
def __init__(self, redis_url: str = "redis://localhost:6379"):
self.r = redis.from_url(redis_url, decode_responses=True)
self.ttl_safe = 3600 * 24 * 7 # cache safe results for 7 days
self.ttl_unsafe = 3600 * 24 * 30 # cache rejections for 30 days
def _make_key(self, text: str) -> str:
"""Normalize and hash for cache key."""
normalized = " ".join(text.lower().split()) # collapse whitespace
return "moderation:" + hashlib.sha256(normalized.encode()).hexdigest()
def get(self, text: str) -> Optional[dict]:
key = self._make_key(text)
cached = self.r.get(key)
if cached:
return json.loads(cached)
return None
def set(self, text: str, result: dict):
key = self._make_key(text)
is_safe = result.get("category") == "safe"
ttl = self.ttl_safe if is_safe else self.ttl_unsafe
self.r.setex(key, ttl, json.dumps(result))
def get_many(self, texts: list[str]) -> dict[str, Optional[dict]]:
"""Batch cache lookup."""
keys = [self._make_key(t) for t in texts]
values = self.r.mget(keys)
return {
text: json.loads(val) if val else None
for text, val in zip(texts, values)
}
Human review queue
# review_queue.py
import json
import time
import redis
from dataclasses import asdict
from models import PendingComment, ModerationResult
QUEUE_KEY = "moderation:human_review"
class HumanReviewQueue:
def __init__(self, redis_url: str = "redis://localhost:6379"):
self.r = redis.from_url(redis_url, decode_responses=True)
def push(self, comment: PendingComment, result: ModerationResult):
entry = {
"comment": asdict(comment),
"moderation": {
"category": result.category,
"confidence": result.confidence,
"reasoning": result.reasoning,
"flags": result.flags,
},
"queued_at": time.time(),
}
self.r.lpush(QUEUE_KEY, json.dumps(entry))
def pop(self, count: int = 10) -> list[dict]:
"""Fetch next items for human reviewers."""
items = []
for _ in range(count):
raw = self.r.rpop(QUEUE_KEY)
if raw is None:
break
items.append(json.loads(raw))
return items
def queue_length(self) -> int:
return self.r.llen(QUEUE_KEY)
The full pipeline
# pipeline.py
import time
import logging
from typing import Optional
from models import ContentCategory, ModerationResult, PendingComment
from classifier import classify_single, classify_batch
from cache import ModerationCache
from review_queue import HumanReviewQueue
logger = logging.getLogger(__name__)
CONFIDENCE_THRESHOLD = 0.75 # below this → human review
class ModerationPipeline:
def __init__(self, redis_url: str = "redis://localhost:6379"):
self.cache = ModerationCache(redis_url)
self.review_queue = HumanReviewQueue(redis_url)
def _parse_result(self, raw: dict, start_time: float,
cache_hit: bool = False) -> ModerationResult:
confidence = float(raw.get("confidence", 0.5))
category_str = raw.get("category", "borderline").lower()
try:
category = ContentCategory(category_str)
except ValueError:
category = ContentCategory.BORDERLINE
needs_review = (
category == ContentCategory.BORDERLINE
or confidence < CONFIDENCE_THRESHOLD
)
return ModerationResult(
category=category,
confidence=confidence,
reasoning=raw.get("reasoning", ""),
flags=raw.get("flags", []),
needs_human_review=needs_review,
processing_time_ms=(time.time() - start_time) * 1000,
cache_hit=cache_hit,
)
def moderate(self, comment: PendingComment) -> ModerationResult:
"""Moderate a single comment with caching."""
start = time.time()
# Cache lookup
cached = self.cache.get(comment.text)
if cached:
result = self._parse_result(cached, start, cache_hit=True)
if result.needs_human_review:
self.review_queue.push(comment, result)
return result
# LLM classification
try:
raw = classify_single(comment.text, context=comment.context)
except Exception as e:
logger.error("LLM classification failed: %s", e)
# Fail safe: send to human review
fallback = ModerationResult(
category=ContentCategory.BORDERLINE,
confidence=0.0,
reasoning=f"Classification failed: {e}",
flags=["llm_error"],
needs_human_review=True,
processing_time_ms=(time.time() - start) * 1000,
)
self.review_queue.push(comment, fallback)
return fallback
self.cache.set(comment.text, raw)
result = self._parse_result(raw, start)
if result.needs_human_review:
self.review_queue.push(comment, result)
return result
def moderate_batch(self, comments: list[PendingComment]) -> list[ModerationResult]:
"""Moderate a batch efficiently, using cache for hits."""
start = time.time()
# Bulk cache lookup
cache_results = self.cache.get_many([c.text for c in comments])
to_classify = []
results: dict[str, ModerationResult] = {}
for comment in comments:
cached = cache_results.get(comment.text)
if cached:
r = self._parse_result(cached, start, cache_hit=True)
results[comment.comment_id] = r
if r.needs_human_review:
self.review_queue.push(comment, r)
else:
to_classify.append(comment)
# Batch LLM call for cache misses
if to_classify:
batch_input = [
{"id": c.comment_id, "text": c.text, "context": c.context}
for c in to_classify
]
try:
raw_results = classify_batch(batch_input)
raw_by_id = {r.get("id", ""): r for r in raw_results}
except Exception as e:
logger.error("Batch classification failed: %s", e)
raw_by_id = {}
for comment in to_classify:
raw = raw_by_id.get(comment.comment_id, {
"category": "borderline",
"confidence": 0.0,
"reasoning": "batch_classification_failed",
"flags": ["llm_error"],
})
self.cache.set(comment.text, raw)
r = self._parse_result(raw, start)
results[comment.comment_id] = r
if r.needs_human_review:
self.review_queue.push(comment, r)
return [results[c.comment_id] for c in comments if c.comment_id in results]
Usage example
# main.py
from pipeline import ModerationPipeline
from models import PendingComment
pipeline = ModerationPipeline()
# Single comment
comment = PendingComment(
comment_id="cmt_001",
text="Great article! Check out my FREE SEO tool at bit.ly/xyz — doubles traffic guaranteed!",
user_id="user_42",
context="Article: Introduction to NIS 2"
)
result = pipeline.moderate(comment)
print(f"Category: {result.category.value}")
print(f"Confidence: {result.confidence:.0%}")
print(f"Flags: {result.flags}")
print(f"Needs review: {result.needs_human_review}")
print(f"Time: {result.processing_time_ms:.0f}ms (cached: {result.cache_hit})")
# Batch processing
batch = [
PendingComment("cmt_002", "This helped me understand the topic, thanks!", "user_1"),
PendingComment("cmt_003", "CLICK HERE NOW!!! WIN €500 AMAZON VOUCHER", "user_2"),
PendingComment("cmt_004", "I disagree with point 3, here's why...", "user_3"),
]
results = pipeline.moderate_batch(batch)
for comment, result in zip(batch, results):
print(f"{comment.comment_id}: {result.category.value} ({result.confidence:.0%})")
Deployment considerations
Action mapping: Define what happens for each category in your application layer, not the pipeline. The pipeline only classifies. Auto-reject spam and phishing. Auto-approve high-confidence safe comments. Hold borderline and low-confidence results.
Review queue throughput: Track your queue_length() over time. If it grows faster than your reviewers clear it, lower the CONFIDENCE_THRESHOLD to 0.80 or add a second LLM pass for borderlines using a stronger model.
Monitoring: Log every classification with its confidence, category, and processing time. Alert when LLM error rate exceeds 2% or when cache hit rate drops below 20% (the latter suggests unusually high content diversity, which may indicate a bot campaign).
False positive rate: Sample 1% of auto-approved comments for human spot-check. If you're seeing more than 1 in 500 false negatives, tighten the confidence threshold or add domain-specific examples to the system prompt.
This pipeline handles tens of thousands of comments per day for under $5 in API costs, with the cache absorbing 30–60% of volume in typical deployments.




















