惯性聚合 高效追踪和阅读你感兴趣的博客、新闻、科技资讯
阅读原文 在惯性聚合中打开

推荐订阅源

Engineering at Meta
Engineering at Meta
博客园_首页
H
Help Net Security
WordPress大学
WordPress大学
让小产品的独立变现更简单 - ezindie.com
让小产品的独立变现更简单 - ezindie.com
罗磊的独立博客
博客园 - 三生石上(FineUI控件)
B
Blog
I
InfoQ
SecWiki News
SecWiki News
T
Tailwind CSS Blog
Spread Privacy
Spread Privacy
freeCodeCamp Programming Tutorials: Python, JavaScript, Git & More
V
Vulnerabilities – Threatpost
N
Netflix TechBlog - Medium
P
Palo Alto Networks Blog
CTFtime.org: upcoming CTF events
CTFtime.org: upcoming CTF events
Vercel News
Vercel News
Threat Intelligence Blog | Flashpoint
Threat Intelligence Blog | Flashpoint
K
Kaspersky official blog
M
MIT News - Artificial intelligence
S
Schneier on Security
T
Threat Research - Cisco Blogs
F
Fortinet All Blogs
Cyberwarzone
Cyberwarzone
Scott Helme
Scott Helme
aimingoo的专栏
aimingoo的专栏
Martin Fowler
Martin Fowler
MyScale Blog
MyScale Blog
The Cloudflare Blog
Recent Announcements
Recent Announcements
Security Latest
Security Latest
G
GRAHAM CLULEY
IT之家
IT之家
Y
Y Combinator Blog
The Last Watchdog
The Last Watchdog
腾讯CDC
Google DeepMind News
Google DeepMind News
V
V2EX
S
Securelist
TaoSecurity Blog
TaoSecurity Blog
B
Blog RSS Feed
S
SegmentFault 最新的问题
博客园 - 叶小钗
P
Proofpoint News Feed
云风的 BLOG
云风的 BLOG
Project Zero
Project Zero
G
Google Developers Blog
Google DeepMind News
Google DeepMind News
F
Full Disclosure

Wslll

手搓一个ios记账应用:快捷指令、PWA应用、AI分析 一种针对长篇幅学术文章的特征提取循环翻译模式 开源分享:经过运行验证的自动SNP Calling脚本 使用glnexus进行joint call的一些经验 wslll blog:基于Python的自托管博客应用 wslll blog的介绍以及2025年不稳定运行的原因 使用SimpleM计算有效检验数量 期刊发布:基于重测序的原始群体和改良群体的遗传多样性研究 使用cloudflare worker转发openai api并设置关键词屏蔽 期刊发布:遗传改良对世界水产养殖业发展的推动作用 你好,世界!
一个AnthropicToOpenai的本地运行的API转换脚本(用于ClaudeCode等)
wslll · 2026-01-12 · via Wslll

前言

近期各类国产模型的 Coding 能力感觉都是突飞猛涨,因此我也是不断在测试、体验不同模型。**一般对于新的模型我都会用 NewAPI 统一管理。**但是在使用 claude code 调用时,因为有些模型默认是 OpenAI 格式的调用方式,因为懒得在 NewAPI 中修改设置,于是弄了一个下面这样的脚本。只需要在服务器本地后台运行,即可自动转换格式。

基于 Python 开发,支持调用历史记录,默认将 Anthropic 格式转换为 OpenAI 格式,同时支持基本的并发限制设置。

使用方式很简单:
python main.py --baseurl https://your_newapi_url --port 8080 --limit 30
这里就是说,自己可以设置调用地址,本地监听端口以及并发限制。然后运行即可。

代码

import uvicorn
import httpx
import json
import uuid
import asyncio
import argparse
import shutil
import os
import time
from datetime import datetime, timedelta
from pathlib import Path
from fastapi import FastAPI, Request, BackgroundTasks
from fastapi.responses import StreamingResponse, JSONResponse
from typing import List, Dict, Any
# ================= 命令行参数解析 =================
parser = argparse.ArgumentParser(description="Anthropic to OpenAI API Proxy with Logging")
parser.add_argument("--baseurl", type=str, required=True, help="Target OpenAI Base URL (e.g., https://api3.wlai.vip)")
parser.add_argument("--port", type=int, default=49091, help="Listen port")
parser.add_argument("--limit", type=int, default=10, help="Concurrency limit")
# 解析参数(如果脚本作为模块导入不执行)
if __name__ == "__main__":
args = parser.parse_args()
else:
# 兼容 uvicorn 命令行启动,设置默认值
args = argparse.Namespace(baseurl="https://api3.wlai.vip", port=49091, limit=10)
# ================= 全局配置 =================
# 处理 Base URL,确保拼接正确
BASE_URL = args.baseurl.rstrip("/")
if not BASE_URL.endswith("/v1/chat/completions"):
if BASE_URL.endswith("/v1"):
TARGET_URL = f"{BASE_URL}/chat/completions"
else:
TARGET_URL = f"{BASE_URL}/v1/chat/completions"
else:
TARGET_URL = BASE_URL
LISTEN_PORT = args.port
MAX_CONCURRENT = args.limit
LOG_RETENTION_DAYS = 7
LOG_BASE_DIR = Path("logs")
# 信号量
request_semaphore = asyncio.Semaphore(MAX_CONCURRENT)
app = FastAPI()
# ================= 日志与清理模块 =================
def cleanup_old_logs():
"""清理超过 7 天的日志目录"""
if not LOG_BASE_DIR.exists():
return

cutoff_date = datetime.now() - timedelta(days=LOG_RETENTION_DAYS)

# 遍历 logs 下的日期目录 (格式 YYYY-MM-DD)
for date_dir in LOG_BASE_DIR.iterdir():
if date_dir.is_dir():
try:
dir_date = datetime.strptime(date_dir.name, "%Y-%m-%d")
if dir_date < cutoff_date:
print(f"🧹 Cleaning up old logs: {date_dir}")
shutil.rmtree(date_dir)
except ValueError:
continue # 忽略不符合日期格式的目录
def save_request_log(request_id: str, request_data: dict, response_status: int, error_msg: str = None):
"""保存日志到文件系统"""
try:
now = datetime.now()
# 目录结构: logs/2023-10-27/14/
date_str = now.strftime("%Y-%m-%d")
hour_str = now.strftime("%H")

log_dir = LOG_BASE_DIR / date_str / hour_str
log_dir.mkdir(parents=True, exist_ok=True)

log_file = log_dir / f"{request_id}.json"

log_content = {
"timestamp": now.isoformat(),
"request_id": request_id,
"target_url": TARGET_URL,
"status_code": response_status,
"error": error_msg,
"request": {
"model": request_data.get("model"),
"max_tokens": request_data.get("max_tokens"),
"temperature": request_data.get("temperature"),
# 为了日志整洁,message 内容可以考虑截断,这里选择完整记录
"messages_count": len(request_data.get("messages", [])),
"full_body": request_data
}
}

with open(log_file, "w", encoding="utf-8") as f:
json.dump(log_content, f, ensure_ascii=False, indent=2)

except Exception as e:
print(f"⚠️ Failed to write log: {e}")
# ================= API 逻辑 =================
def convert_to_openai_messages(anthropic_body: Dict[str, Any]) -> List[Dict[str, str]]:
messages = []
if "system" in anthropic_body and anthropic_body["system"]:
system_content = anthropic_body["system"]
if isinstance(system_content, list):
text_parts = [item["text"] for item in system_content if item.get("type") == "text"]
system_text = "\n".join(text_parts)
else:
system_text = str(system_content)
messages.append({"role": "system", "content": system_text})

for msg in anthropic_body.get("messages", []):
role = msg["role"]
content = msg["content"]
final_content = ""
if isinstance(content, str):
final_content = content
elif isinstance(content, list):
text_parts = []
for item in content:
if item.get("type") == "text":
text_parts.append(item["text"])
# 简单处理图片:Claude code 一般只发文本,如果发图片需要更复杂的转换
final_content = "\n".join(text_parts)
messages.append({"role": role, "content": final_content})
return messages
def create_event(event_type: str, data: Dict[str, Any]) -> str:
return f"event: {event_type}\ndata: {json.dumps(data)}\n\n"
@app.on_event("startup")
async def startup_event():
# 启动时清理日志
cleanup_old_logs()
@app.post("/v1/messages")
async def proxy_messages(request: Request, background_tasks: BackgroundTasks):
request_id = f"req_{uuid.uuid4().hex[:8]}"
anthropic_body = {}

try:
# 1. 获取请求体
anthropic_body = await request.json()

# 2. Header 处理
auth_header = request.headers.get("x-api-key") or request.headers.get("authorization")
if auth_header and not auth_header.startswith("Bearer "):
auth_header = f"Bearer {auth_header}"

headers = {
"Authorization": auth_header,
"Content-Type": "application/json"
}
# 3. 转换为 OpenAI 格式
openai_body = {
"model": anthropic_body.get("model"),
"messages": convert_to_openai_messages(anthropic_body),
"stream": True,
"max_tokens": anthropic_body.get("max_tokens", 4096),
}
if "stop_sequences" in anthropic_body:
openai_body["stop"] = anthropic_body["stop_sequences"]
if "temperature" in anthropic_body:
openai_body["temperature"] = anthropic_body["temperature"]
# 4. 上游请求处理 (带信号量和日志记录)
async def upstream_generator():
log_status = 200
log_error = None

# 记录进入队列的时间
queue_start = time.time()

async with request_semaphore:
# 计算等待时间(可选:如果等待太久可以打日志)
wait_time = time.time() - queue_start

# 重试机制
max_retries = 3
for attempt in range(max_retries):
try:
async with httpx.AsyncClient(timeout=120.0) as client:
async with client.stream("POST", TARGET_URL, headers=headers, json=openai_body) as response:

# 429 处理
if response.status_code == 429:
print(f"⚠️ [429] Rate Limit. Retrying... ({attempt+1}/{max_retries})")
await asyncio.sleep(2 + attempt)
continue

# 错误处理
if response.status_code != 200:
log_status = response.status_code
error_content = await response.aread()
error_text = error_content.decode('utf-8')
log_error = error_text
print(f"❌ Error {response.status_code}: {error_text}")

# 记录失败日志
background_tasks.add_task(save_request_log, request_id, anthropic_body, log_status, log_error)

yield create_event("error", {
"type": "error",
"error": {"type": "api_error", "message": f"Upstream: {response.status_code}"}
})
return
# 成功连接
print(f"✅ [{request_id}] Streaming... (Waited: {wait_time:.2f}s)")

# 发送 Anthropic 协议头
yield create_event("message_start", {
"type": "message_start",
"message": {
"id": request_id,
"type": "message",
"role": "assistant",
"content": [],
"model": anthropic_body.get("model"),
"stop_reason": None,
"stop_sequence": None,
"usage": {"input_tokens": 0, "output_tokens": 0}
}
})
yield create_event("content_block_start", {
"type": "content_block_start",
"index": 0,
"content_block": {"type": "text", "text": ""}
})
# 流式转发
async for line in response.aiter_lines():
if not line.strip(): continue
if line.startswith("data: "):
data_str = line[6:]
if data_str.strip() == "[DONE]": continue
try:
chunk = json.loads(data_str)
if chunk.get("choices"):
content = chunk["choices"][0].get("delta", {}).get("content", "")
if content:
yield create_event("content_block_delta", {
"type": "content_block_delta",
"index": 0,
"delta": {"type": "text_delta", "text": content}
})
except:
continue
# 结束流
yield create_event("content_block_stop", {"type": "content_block_stop", "index": 0})
yield create_event("message_delta", {
"type": "message_delta",
"delta": {"stop_reason": "end_turn"},
"usage": {"output_tokens": 0}
})
yield create_event("message_stop", {"type": "message_stop"})

# 记录成功日志
background_tasks.add_task(save_request_log, request_id, anthropic_body, 200, None)
return
except Exception as e:
if attempt == max_retries - 1:
print(f"❌ Stream Exception: {e}")
log_status = 500
log_error = str(e)
background_tasks.add_task(save_request_log, request_id, anthropic_body, 500, str(e))
yield create_event("error", {"type": "error", "error": {"type": "api_error", "message": str(e)}})
await asyncio.sleep(1)
return StreamingResponse(upstream_generator(), media_type="text/event-stream")
except Exception as e:
print(f"Server Error: {e}")
# 记录严重错误日志
background_tasks.add_task(save_request_log, request_id, anthropic_body, 500, str(e))
return JSONResponse(status_code=500, content={"error": {"type": "server_error", "message": str(e)}})
if __name__ == "__main__":
print(f"🚀 Proxy Starting...")
print(f" Target: {TARGET_URL}")
print(f" Listen: 0.0.0.0:{LISTEN_PORT}")
print(f" Limit: {MAX_CONCURRENT} concurrent requests")
print(f" Logging: ./logs/ (7 days retention)")

uvicorn.run(app, host="0.0.0.0", port=LISTEN_PORT, log_level="warning")