惯性聚合 高效追踪和阅读你感兴趣的博客、新闻、科技资讯
阅读原文 在惯性聚合中打开

推荐订阅源

P
Privacy International News Feed
Martin Fowler
Martin Fowler
D
Docker
Y
Y Combinator Blog
云风的 BLOG
云风的 BLOG
U
Unit 42
T
Tailwind CSS Blog
J
Java Code Geeks
G
Google Developers Blog
MongoDB | Blog
MongoDB | Blog
阮一峰的网络日志
阮一峰的网络日志
WordPress大学
WordPress大学
月光博客
月光博客
大猫的无限游戏
大猫的无限游戏
美团技术团队
F
Fortinet All Blogs
N
News and Events Feed by Topic
Exploit-DB.com RSS Feed
Exploit-DB.com RSS Feed
Hacker News - Newest:
Hacker News - Newest: "LLM"
The GitHub Blog
The GitHub Blog
cs.AI updates on arXiv.org
cs.AI updates on arXiv.org
Recorded Future
Recorded Future
N
Netflix TechBlog - Medium
Google DeepMind News
Google DeepMind News
Hacker News: Ask HN
Hacker News: Ask HN
L
LINUX DO - 最新话题
Microsoft Security Blog
Microsoft Security Blog
N
News and Events Feed by Topic
I
Intezer
TaoSecurity Blog
TaoSecurity Blog
NISL@THU
NISL@THU
小众软件
小众软件
博客园 - 聂微东
博客园 - Franky
有赞技术团队
有赞技术团队
P
Palo Alto Networks Blog
爱范儿
爱范儿
H
Hacker News: Front Page
C
Cyber Attacks, Cyber Crime and Cyber Security
C
Cisco Blogs
P
Proofpoint News Feed
I
InfoQ
Google DeepMind News
Google DeepMind News
OSCHINA 社区最新新闻
OSCHINA 社区最新新闻
Vercel News
Vercel News
H
Heimdal Security Blog
C
Cybersecurity and Infrastructure Security Agency CISA
Application and Cybersecurity Blog
Application and Cybersecurity Blog
Threat Intelligence Blog | Flashpoint
Threat Intelligence Blog | Flashpoint
量子位

LINUX DO - 最新话题

谷歌云盘下载700g数据集,求方法 OpenAI推出了100美元的Pro订阅后,plus的Codex 5小时限额大幅缩水 之前买的super grok居然还没掉 关于CPA认证文件周限 佬们,默认CDK的要求是什么等级啊? 最新版本的微信群聊机器人方案 有没有人知道如何free号没有封,那么是否可以循环使用,因为我看主要是周限 L站改版了?吓我一跳,我以为我浏览器崩了 淘宝这种宽带可信吗,500兆移动宽带月费8元到2099年 docker内部应用访问宿主机mysql和redis时被拒绝connection refuse Erp全栈想转行做Ai有什么推荐的吗 boost有bug 佬们,有没有靠谱点的 Plus 购买渠道 大妈,狗妈用的 lg 服务有源头开源项目吗? 有人有能过验证码打码的嘛 上次帖里好像发过通过大模型来打码的 gpt plus 封号似乎也太快了点,一天就给封号了 按流量/token收费的国产官方AI推荐 我算是知道了为什么Oracle总是ABC了 佬友们帮我分析一下 ChatGPT Team账号只有一个人使用和4个席位邀请满了使用的总额度是一样的吗? gpt-free 10个带rt CPA反代claude是默认1m吗? 我终于敢说我做出来windows上tmux的替代了,目标windows/全平台最强的终端Ai编程工具 claude pro升级max,除了原来的$20,好像还能再领一次$100 关于AI agent的知识框架 独乐乐不如众乐乐,分享一下我的的AI对话程序 佬们自建网站支付问题是怎么解决的 怎么能让gpt模仿claude风格输出 codex free已经死了,下一个会是plus或者team吗 请问chatgpt pro里的fast模式,速度快了,降智吗 天才程序员想要复活,还有可用的codex公益站么 里斯本丸沉没照进现代了 [富可敌国] [一叶知秋API]友仔们 我们换域名了~~ 记得更新一下哦 有点莫名其妙,被阿里云警告了 从道观回家之前,我和师兄问道 【picpi 皮皮公益站】为了防止有人拿去卖,邀请码发放规则更新。 美国 FAA: 我们需要你,游戏玩家,来当空管吧 vibe时用文言省tok吗? 有没有用? 会降表现吗? Codex CLI 官方这个 imagegen 的 Skill 到底是干啥的?哪有对应工具啊? 求问关于尼区和美区开通Claude 换设备登录telegram国内号码老账号 需要收费咋办? 发现hotmail的额度特别耐用 最近还有能正常用的claude中转站吗? 避雷闲鱼上面的CC中转站 现在cursor的优势是什么呢? OpenAI 回应马斯克要求罢免奥尔特曼:搞法律突袭,扰乱诉讼 谁在吹opencode go套餐啊,又慢量又少 【SamAltman】奥特曼被燃烧瓶袭击后的回应 咸鱼上359买的claude MAX 5x ,美国假家宽,看看能活几天 想问问跳蚤市场开的Pro和Plus 虚拟卡链接求助 [开源插件] 做了一个适合科研佬的GPT插件 【AI小说】拿AI跑了一部小说,佬们看看质量怎么样 总是能在首页看到opus4.6鞭尸推送 这个别名邮箱可以注册gpt 一个人在外地的话,佬们周末都做什么 你们ddg还能行不 获取不到新的邮箱 了····· claude code修复codex windows升级0.120.0 无法打开问题 我现在Zeabur上搭建了CPA服务,怎么再接入new api来做分发 杭州有么有佬友在搞AI应用这块的,四年前端转AI开发 汇丰、渣打两家银行获得香港稳定币牌照 【开源推广】 AIUsage:聚合多个 AI 平台配额与用量的 高颜值 macOS端 CPA看板 APP Newapi吃服务器内存多吗 中行跨境通疑限制无卡连续交易 或为应对盗刷 突然不能用表情回应话题了 codex是不是降额度了 反馈关于 “快问快答”标签的乱象 opencode版本1.4.3 无法上传图片问题 想问一下怎么解决这个问题,就是终端太多? codex更新到0.120.0之后无法加载以前的会话 sub2api怎么部署? 分享一个自用的南京继续教育平台视频自动播放下一集的油猴脚本 zotero9出来了 Claude正在向我推销付费项目,那能让你轻易得逞嘛 甲骨文用脚本开出来4个2+12咋办啊佬们,我还是免费号 各个厂的coding plan lite都绝版了? claude code 20美金账户问题 联通元景套餐续费问题 ai时代下的一些思考(诚邀大家讨论) 出境易GPT订阅pro求助 今年到目前股市的操作。 刚收到短信之前跑路的那家可以兑换了 佬们都用境外服务器做什么呢? 甲骨文4+24 求助领pro时候报错-付款页面出错。请重试。如果问题依然存在,请访问help.openai.com。 cloudflare 浏览器渲染增加了 CDP与mcp支持 SUB2API 导入 rt 时报错显示 Request failed with status code 502 如何解决 讨论一下怎么整理笔记 codex0.120.0更新后无法启动,回退 0.119.0正常使用 冰佬的公益站也不行了吗 三角洲直接给我封了10年 有佬友知道怎么起诉么 88VIP邀请 经过排查大概确定反重力代理报错问题了 【求助】openrouter 今年4月用国内visa卡充值后导致封禁,无法使用外国模型 奥特曼家被炸 自用,高信息量回复收集 求助sub2api分组问题 【新人报道】注册成功了 分享100个codex free账号 招聘 深圳客户端开发(flutter) 20k+
openwebui接入gpt-image-2的PIPE函数
alizoed · 2026-04-22 · via LINUX DO - 最新话题
Vibe的,目前我自己测试没问题,可以修改传入的quality还有分辨率 适合不习惯用那个绘图功能,喜欢单独选模型用的佬友,可填base url,要适配dall-e-3格式的API的 title: BLTCY GPT-Image-2 description: Minimal OpenAI-compatible GPT-Image-2 generation and multi-turn editing for Open WebUI author: Codex version: 0.2.0 licence: MIT “”" import base64 import hashlib import io import json import logging import mimetypes import re import time import uuid from typing import Any, AsyncIterable, Literal, Optional import httpx from fastapi import BackgroundTasks, Request, UploadFile from httpx import Response from open_webui.env import GLOBAL_LOG_LEVEL from open_webui.models.users import UserModel, Users from open_webui.routers.files import get_file_content_by_id, upload_file from pydantic import BaseModel, Field from starlette.datastructures import Headers from starlette.responses import StreamingResponse logger = logging.getLogger(name) logger.setLevel(GLOBAL_LOG_LEVEL) MODEL_ID = “gpt-image-2” DEFAULT_TIMEOUT = 600 MAX_REFERENCE_IMAGES = 4 class APIException(Exception): def init(self, status: int, content: str, response: Optional[Response] = None): self._status = status self._content = content self._response = response def __str__(self) -> str: try: data = json.loads(self._content) if isinstance(data, dict): if isinstance(data.get("error"), dict): return data["error"].get("message") or self._content if data.get("message"): return str(data["message"]) except Exception: pass if self._response is not None: try: self._response.raise_for_status() except Exception as err: return str(err) return self._content or "Unknown API error" class Pipe: class Valves(BaseModel): base_url: str = Field( default=“https://api.openai.com/v1”, title=“Base URL”, description=“OpenAI-compatible base URL”, ) api_key: str = Field(default=“”, title=“API Key”) default_quality: Literal[“”, “auto”, “low”, “medium”, “high”] = Field( default=“”, title=“默认质量”, description=“留空表示不传;可填 auto/low/medium/high”, ) default_size: str = Field( default=“”, title=“默认分辨率”, description=“留空表示不传;可填 auto 或 1024x1024/1536x1024 等”, ) def __init__(self): self.valves = self.Valves() def pipes(self): return [{"id": MODEL_ID, "name": MODEL_ID}] async def pipe( self, body: dict, __user__: dict, __request__: Request, ) -> StreamingResponse: return StreamingResponse( self._pipe(body=body, __user__=__user__, __request__=__request__), media_type="text/event-stream", ) async def _pipe( self, body: dict, __user__: dict, __request__: Request ) -> AsyncIterable[str]: if not self.valves.api_key: raise APIException(status=401, content="请先配置 API Key。") user = Users.get_user_by_id(__user__["id"]) context = await self._build_context( user=user, body=body, max_reference_images=MAX_REFERENCE_IMAGES, ) options = self._resolve_request_options() prompt = context["prompt"].strip() if not prompt and context["reference_images"]: prompt = "请基于参考图继续编辑" if not prompt: raise APIException(status=400, content="没有解析到可用的提示词。") endpoint, json_payload, form_data, files = self._build_request( prompt=prompt, reference_images=context["reference_images"], options=options, ) async with httpx.AsyncClient( base_url=self.valves.base_url.rstrip("/"), headers={"Authorization": f"Bearer {self.valves.api_key}"}, trust_env=True, timeout=DEFAULT_TIMEOUT, ) as client: if files: response = await client.post(url=endpoint, data=form_data, files=files) else: response = await client.post(url=endpoint, json=json_payload) if response.status_code != 200: raise APIException( status=response.status_code, content=response.text, response=response, ) data = response.json() results = await self._parse_response( data=data, __request__=__request__, user=user, ) usage = self._build_usage(data.get("usage")) content = "\n\n".join(results) is_stream = bool(body.get("stream")) if not is_stream: yield self._format_data( is_stream=False, model=MODEL_ID, content=content, usage=usage, finish_reason="stop", ) return yield self._format_data( is_stream=True, model=MODEL_ID, content=content, usage=None, finish_reason=None if usage else "stop", ) if usage: yield self._format_data( is_stream=True, model=MODEL_ID, content=None, usage=usage, finish_reason="stop", ) yield "data: [DONE]\n\n" async def _build_context( self, user: UserModel, body: dict, max_reference_images: int, ) -> dict[str, Any]: messages = [m for m in body.get("messages", []) if m.get("role") != "system"] recent_messages = messages[-6:] latest_user_texts: list[str] = [] reference_images: list[dict[str, Any]] = [] seen_digests: set[str] = set() for message in recent_messages: texts, images = await self._extract_message_parts( user=user, message_content=message.get("content"), ) if message.get("role") == "user": latest_user_texts = [text for text in texts if text.strip()] for image in images: if image["digest"] in seen_digests: continue seen_digests.add(image["digest"]) reference_images.append(image) if len(reference_images) > max_reference_images: reference_images = reference_images[-max_reference_images:] return { "prompt": "\n".join(latest_user_texts).strip(), "reference_images": reference_images, } async def _extract_message_parts( self, user: UserModel, message_content: Any, ) -> tuple[list[str], list[dict[str, Any]]]: texts: list[str] = [] images: list[dict[str, Any]] = [] if isinstance(message_content, str): file_ids = self._extract_generated_file_ids(message_content) for file_id in file_ids: image = await self._load_image_from_openwebui_file( user=user, file_id=file_id ) if image: images.append(image) cleaned_text = re.sub(r"!\[[^\]]*\]\([^)]+\)", " ", message_content) for line in cleaned_text.splitlines(): line = line.strip() if line: texts.append(line) return texts, images if isinstance(message_content, list): for item in message_content: item_type = item.get("type") if item_type == "text": text = (item.get("text") or "").strip() if text: texts.append(text) continue if item_type == "image_url": image_url = (item.get("image_url") or {}).get("url", "") image = await self._load_image_from_url( user=user, image_url=image_url ) if image: images.append(image) return texts, images return texts, images async def _load_image_from_url( self, user: UserModel, image_url: str, ) -> Optional[dict[str, Any]]: if not image_url: return None if image_url.startswith("data:"): header, encoded = image_url.split(",", 1) mime_type = header.split(";")[0].split(":")[1] binary = base64.b64decode(encoded) return self._build_image_reference( binary=binary, mime_type=mime_type, filename=f"image-{uuid.uuid4().hex}{self._guess_extension(mime_type)}", ) file_id = self._extract_file_id_from_url(image_url) if file_id: return await self._load_image_from_openwebui_file( user=user, file_id=file_id ) if image_url.startswith(("http://", "https://")): async with httpx.AsyncClient( trust_env=True, timeout=DEFAULT_TIMEOUT, ) as client: response = await client.get(image_url) response.raise_for_status() mime_type = response.headers.get("content-type", "image/png").split( ";" )[0] return self._build_image_reference( binary=response.content, mime_type=mime_type, filename=f"image-{uuid.uuid4().hex}{self._guess_extension(mime_type)}", ) return None async def _load_image_from_openwebui_file( self, user: UserModel, file_id: str, ) -> Optional[dict[str, Any]]: try: file_response = await get_file_content_by_id(id=file_id, user=user) except Exception: return None mime_type = mimetypes.guess_type(file_response.path)[0] or "image/png" with open(file_response.path, "rb") as file: binary = file.read() return self._build_image_reference( binary=binary, mime_type=mime_type, filename=getattr(file_response, "filename", None) or f"image-{file_id}{self._guess_extension(mime_type)}", ) def _build_image_reference( self, binary: bytes, mime_type: str, filename: str, ) -> dict[str, Any]: return { "binary": binary, "mime_type": mime_type, "filename": filename, "digest": hashlib.sha256(binary).hexdigest(), } def _resolve_request_options(self) -> dict[str, Any]: options = { "quality": self.valves.default_quality, "size": self.valves.default_size, } size = options["size"] or "" if size and size != "auto": self._validate_size_string(size) self._validate_gpt_image_2_size(size) return options def _validate_size_string(self, size: str) -> None: if size == "auto": return if re.fullmatch(r"\d+x\d+", size): return raise APIException( status=400, content="size 格式不合法,应为 auto 或 1024x1024 这种格式。", ) def _validate_gpt_image_2_size(self, size: str) -> None: width_str, height_str = size.split("x", 1) width = int(width_str) height = int(height_str) long_edge = max(width, height) short_edge = min(width, height) total_pixels = width * height if long_edge > 3840: raise APIException( status=400, content="gpt-image-2 的 size 最大边不能超过 3840。", ) if width % 16 != 0 or height % 16 != 0: raise APIException( status=400, content="gpt-image-2 的宽高都必须是 16 的倍数。", ) if long_edge / short_edge > 3: raise APIException( status=400, content="gpt-image-2 的长宽比不能超过 3:1。", ) if total_pixels < 655360 or total_pixels > 8294400: raise APIException( status=400, content="gpt-image-2 的总像素必须在 655,360 到 8,294,400 之间。", ) def _build_request( self, prompt: str, reference_images: list[dict[str, Any]], options: dict[str, Any], ) -> tuple[ str, Optional[dict[str, Any]], Optional[dict[str, Any]], list[tuple[str, tuple[str, bytes, str]]], ]: if reference_images: data = { "model": MODEL_ID, "prompt": prompt, } self._assign_optional_request_fields(data, options) files: list[tuple[str, tuple[str, bytes, str]]] = [] field_name = "image[]" if len(reference_images) > 1 else "image" for image in reference_images: files.append( ( field_name, (image["filename"], image["binary"], image["mime_type"]), ) ) return "/images/edits", None, data, files payload = { "model": MODEL_ID, "prompt": prompt, } self._assign_optional_request_fields(payload, options) return "/images/generations", payload, None, [] def _assign_optional_request_fields( self, payload: dict[str, Any], options: dict[str, Any], ) -> None: if options.get("quality"): payload["quality"] = options["quality"] if options.get("size"): payload["size"] = options["size"] async def _parse_response( self, data: dict, __request__: Request, user: UserModel, ) -> list[str]: results: list[str] = [] revised_prompts: list[str] = [] for item in data.get("data", []): revised_prompt = item.get("revised_prompt") if revised_prompt and revised_prompt not in revised_prompts: revised_prompts.append(revised_prompt) if item.get("b64_json"): results.append( self._upload_image( __request__=__request__, user=user, image_data=item["b64_json"], mime_type=self._get_response_mime_type(item), ) ) continue if item.get("url"): image_data, mime_type = await self._download_image(item["url"]) results.append( self._upload_image( __request__=__request__, user=user, image_data=image_data, mime_type=mime_type, ) ) if revised_prompts: results.insert(0, "模型重写后的提示词:\n" + "\n".join(revised_prompts)) if not results: raise APIException(status=500, content="接口没有返回可用图片。") return results async def _download_image(self, url: str) -> tuple[str, str]: async with httpx.AsyncClient( trust_env=True, timeout=DEFAULT_TIMEOUT, ) as client: response = await client.get(url) response.raise_for_status() mime_type = response.headers.get("content-type", "image/png").split(";")[0] encoded = base64.b64encode(response.content).decode("utf-8") return encoded, mime_type def _build_usage(self, usage: Optional[dict[str, Any]]) -> Optional[dict[str, Any]]: if not usage: return None prompt_tokens = usage.get("input_tokens", 0) completion_tokens = usage.get("output_tokens", 0) total_tokens = usage.get("total_tokens", prompt_tokens + completion_tokens) metadata = { key: value for key, value in usage.items() if key not in {"input_tokens", "output_tokens", "total_tokens"} } return { "prompt_tokens": prompt_tokens, "completion_tokens": completion_tokens, "total_tokens": total_tokens, "prompt_tokens_details": usage.get("input_tokens_details", {}), "metadata": metadata, } def _upload_image( self, __request__: Request, user: UserModel, image_data: str, mime_type: str, ) -> str: extension = self._guess_extension(mime_type) file_item = upload_file( request=__request__, background_tasks=BackgroundTasks(), file=UploadFile( file=io.BytesIO(base64.b64decode(image_data)), filename=f"generated-image-{uuid.uuid4().hex}{extension}", headers=Headers({"content-type": mime_type}), ), process=False, user=user, metadata={"mime_type": mime_type}, ) image_url = __request__.app.url_path_for( "get_file_content_by_id", id=file_item.id ) return f"![bltcy-image-{file_item.id}]({image_url})" def _format_data( self, is_stream: bool, model: Optional[str], content: Optional[str], usage: Optional[dict[str, Any]], finish_reason: Optional[str], ) -> str: data: dict[str, Any] = { "id": f"chat.{uuid.uuid4().hex}", "object": "chat.completion.chunk", "choices": [], "created": int(time.time()), "model": model, } if content is not None: data["choices"] = [ { "finish_reason": finish_reason, "index": 0, "delta" if is_stream else "message": { "content": content, }, } ] if usage: data["usage"] = usage return f"data: {json.dumps(data, ensure_ascii=False)}\n\n" def _extract_generated_file_ids(self, text: str) -> list[str]: return re.findall( r"!\[bltcy-image-([^\]]+)\]", text, ) def _extract_file_id_from_url(self, url: str) -> Optional[str]: match = re.search(r"/files/([^/]+)/content", url) if match: return match.group(1) return None def _guess_extension(self, mime_type: str) -> str: return mimetypes.guess_extension(mime_type) or ".png" def _get_response_mime_type(self, item: dict[str, Any]) -> str: if item.get("mime_type"): return item["mime_type"] output_format = item.get("output_format") if output_format == "jpeg": return "image/jpeg" if output_format == "webp": return "image/webp" return "image/png" 1 个帖子 - 1 位参与者 阅读完整话题