





















同时解决了重复图连续上传的bug。
计算MD5(CPU密集型)
查询数据库MD5验重(尽量一次批量查询、避免多次查询)
图片处理(CPU密集型)
图片写入(重复的图片不写入、只返回消息提醒)
数据库写入(数据库通常为并发瓶颈,且需要返回自增ID所以无法利用批量插入!)
收集结果,并按照顺序返回(实际上也希望按照顺序写库!)
使用try-with-resources确保流关闭,并使用日志记录异常
顺序保证,似乎瓶颈是写库
并行流vs线程池?
存图和写库是IO密集型,所以可以使用线程池来提高效率
<!--1.请求异步处理-->
<!--2.任务并发处理-->
<!--2.1 线程池-->
<!--2.2 并行调用-->
<!--2.3 任务封装起手式-->
<!--进度提醒-->
<!--异常捕获-->
<!--日志记录-->
初步想法是:图片验重、图片处理、图片存储采用并发无序的方式,但写库需要保证原有顺序,且需要兼顾自增+批量处理(需要调整xml)
关于并发+顺序的问题:https://www.cnblogs.com/rever/p/14768553.html
有几种实现方式:
Thread.join()
单线程线程池(意义?)
使用volatile关键字修饰的信号量实现
使用Lock和信号量实现
这涉及到了可见性,原子性,有序性的概念,还有自旋的概念:https://www.cnblogs.com/james0/p/9280144.html
2.1JMM-java memory model
解决并发过程中如何处理可见性,原子性,有序性的问题
runnable ,thread。
并发编程中的2个关键问题:
a. 线程间如何通信 --wait() notify() notifyAll()
a) 共享内存 -隐式通信
b) 消息传递 - 显示通信
b. 线程之间如何同步
在共享内存的并发模型中,同步是显示做的; synchronized
在消息传递的并发模型中,由于小时发送必须在消息接受之前,所以同步是隐式的
mybatis的批量插入:https://www.cnblogs.com/Marydon20170307/p/17747109.html
<insert id="batchInsertUsers" parameterType="java.util.List">
INSERT INTO user_info (name, age, created_time)
VALUES
<foreach collection="list" item="user" separator=",">
(#{user.name}, #{user.age}, NOW())
</foreach>
</insert>
避免内存溢出
分批次插入(如每1000条/次),并控制每次提交后等待时间,防止内存占用过高1。事务管理
批量插入建议使用SqlSession的commit方法,确保数据一致性1。性能优化
数据库端配置:调整innodb_buffer_pool_size、innodb_log_file_size等参数1。
应用端配置:使用连接池(如HikariCP),并设置合理的批处理大小(如1000条/次)
Thumbnails库支持并发处理图片吗?是的
当前链的筛选器或servlet不支持异步操作
20-Sep-2025 20:11:08.916 警告 [catalina-exec-39] org.apache.catalina.connector.Request.startAsync 无法启动async,因为处理链中的下列类不支持async[com.zhuyu.login.servlet.GalleryServlet]
java.lang.IllegalStateException: 当前链的筛选器或servlet不支持异步操作。Servlet显示声明异步即可:
@WebServlet(value = "/galleryServlet",asyncSupported = true) <insert id="batchAddImgs" parameterType="java.util.List" useGeneratedKeys="true" keyProperty="id">
insert into gallery(cid,uid,src,thumbSrc,MD5,imgtype,w,h)
VALUES
<foreach collection="list" item="g" separator=",">
(#{g.cid},#{g.uid},#{g.src},#{g.thumbSrc},#{g.MD5},#{g.imgtype},#{g.w},#{g.h})
</foreach>
</insert>批量查询传参失败,修改如下
<select id="batchImgById" parameterType="java.util.List" resultType="com.zhuyu.login.javabean.Gallery">
select * from gallery where id in
<foreach collection="list" item="gid" separator="," open="(" close=")">
#{gid}
</foreach>
</select>{"error":"处理失败: ### Error querying database. Cause: java.sql.SQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near ' (5254) , (5255) , (525' at line 4 ### The error may exist in galleryMapper.xml ### The error may involve com.zhuyu.login.dao.GalleryDao.batchImgById-Inline ### The error occurred while setting parameters ### SQL: select * from gallery where id in (?) , (?) , (?) , (?) , (?) , (?) , (?) , (?) , (?) , (?) , (?) , (?) , (?) , (?) , (?) , (?) , (?) , (?) , (?) , (?) , (?) , (?) , (?) , (?) , (?) , (?) , (?) , (?) , (?) , (?) , (?) , (?) , (?) , (?) , (?) , (?) , (?) , (?) , (?) , (?) , (?) ### Cause: java.sql.SQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near ' (5254) , (5255) , (525' at line 4"}修改jdbc连接,支持批量操作(该操作实际上是jdbc自己的批处理,针对的是多条sql语句insert、update、delete的优化,而非本案通过foreach拼接的单条sql。)
jdbc:mysql://localhost:3306/mybatis?rewriteBatchedStatements=true该优化需要结合ExecutorType.BATCH使用如下,这里暂不修改:
SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH)验重操作失败,错误写法:
ConcurrentHashMap<String,Boolean> md5s;
boolean isDuplicate = false;
if(md5s.contains(md5)){
isDuplicate = true;
}else{
md5s.put(md5,true);
}根据ds的说法,这段代码的问题在于检查然后行动(check-then-act) 模式不是原子操作。在多线程环境下,可能发生以下情况:
线程A检查md5s.contains(md5),返回false
线程B检查md5s.contains(md5),也返回false
线程A执行md5s.put(md5,true)
线程B也执行md5s.put(md5,true),但实际上这是重复的
这样两个线程都会认为图片不重复,导致重复图片被处理。
正确写法:
ConcurrentHashMap<String,Boolean> md5s;
boolean isDuplicate = false;
// 使用putIfAbsent的返回值判断是否已存在(保证原子性)
if(md5s.putIfAbsent(md5, Boolean.TRUE) != null) {
isDuplicate = true;
}// 伪代码
for loop{
future = completionService.take();
result = future.get();
if(result.true){
galleries.add(result.getGallery());
}
}修改逻辑是:图片可以异步处理,但是返回的result可以暂时按照顺序存储:
// 伪代码
GalleryUploadProcessResult[] orderedResults = new GalleryUploadProcessResult[futures.size()];
for (future:futures){
future = completionService.take();
result = future.get();
// 将结果放入正确的位置
orderedResults[result.getOriginalIndex()] = result;
}
for (result:results){
if(result.true){
galleries.add(result.getGallery());
}
}排查后端:
前端排查:
之所以称worker的debug为玄学,是因为单纯用firefox不行,换到Chrome也不行,但是通过Chrome的控制台直接查看worker日志,发现广播成功了、但是由于chrome更严格的同源策略,发现在project.zhuyu.xyz页面的监听无法捕捉到iframe(www.zhuyu.xyz)的广播。然后再次用Firefox测试,它就可行了。。。。🤦♂️
图片并发处理的基本思路是:线程池配置 -> 构造异步上下文AsyncContext -> 通过异步监听器AsyncListener管理、ExecutorService提交异步 -> 通过UploadTask(实现了Runnable)进行流程的定义 -> 请求解析、路径处理、进度更新 -> 并发任务封装与实现:Future -> CompletionService -> ExecutorCompletionService(ExecutorService)-> 真正的图片处理模块FileProcessor,并发存图结束后,再根据验重情况进行批量写库
线上测试暂时OK,可以看到最卡顿的阶段依然是图片处理阶段,而非网络(带宽)、数据库。最近某云在搞2核升级4核的免费活动,再想想要不要升级呢。
最后附上后端完全的代码实现如下:
// 添加类级别变量(线程池配置)
private static final Logger logger = LoggerFactory.getLogger(GalleryServlet.class);
private static final int MAX_UPLOAD_THREADS = Runtime.getRuntime().availableProcessors() * 2;
private static final ExecutorService uploadExecutor = Executors.newFixedThreadPool(MAX_UPLOAD_THREADS);
private static final ConcurrentMap<String, UploadProgress> progressMap = new ConcurrentHashMap<>();
// 传图接口
protected void addTry(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException, FileUploadException {
// 启用异步处理
final AsyncContext asyncContext = req.startAsync(req, resp);
asyncContext.setTimeout(30 * 60 * 1000); // 30分钟超时
// 生成进度ID
String progressId = UUID.randomUUID().toString();
progressMap.put(progressId, new UploadProgress(0, "初始化上传"));
//md5暂存(重复图检测)
ConcurrentHashMap<String,Boolean> md5s = new ConcurrentHashMap<>();
// 设置异步监听器
asyncContext.addListener(new AsyncListener() {
@Override
public void onComplete(AsyncEvent event) throws IOException {
progressMap.remove(progressId);
}
@Override
public void onTimeout(AsyncEvent event) throws IOException {
logger.error("上传处理超时: {}", progressId);
progressMap.remove(progressId);
try {
event.getAsyncContext().getResponse().getWriter().write(
"{\"error\":\"处理超时\"}");
} catch (IOException e) {
logger.error("超时响应失败", e);
}
}
@Override
public void onError(AsyncEvent event) throws IOException {
logger.error("上传处理错误: {}", progressId, event.getThrowable());
progressMap.remove(progressId);
}
@Override
public void onStartAsync(AsyncEvent event) throws IOException {
// 不需要实现
}
});
// 提交异步任务
uploadExecutor.submit(new UploadTask(asyncContext, progressId,md5s));
}
// 上传任务类
private class UploadTask implements Runnable {
private final AsyncContext asyncContext;
private final String progressId;
private int uid = 0;
ConcurrentHashMap<String,Boolean> md5s;
public UploadTask(AsyncContext asyncContext, String progressId,ConcurrentHashMap<String,Boolean> md5s) {
this.asyncContext = asyncContext;
this.progressId = progressId;
this.md5s = md5s;
}
@Override
public void run() {
HttpServletRequest req = (HttpServletRequest) asyncContext.getRequest();
HttpServletResponse resp = (HttpServletResponse) asyncContext.getResponse();
try {
req.setCharacterEncoding("utf-8");
uid = Integer.parseInt(req.getParameter("uid"));
String imgtype = req.getParameter("gallerytype");
Integer cid = Integer.parseInt(req.getParameter("cid"));
String basePath = FileUtil.getBasePath(req);
String path = "media/caseGallery/"+cid+"/"+imgtype;
// 更新进度
updateProgress(10, "创建目录结构");
// 创建目录
FileUtil.checkExist(basePath + path + "/original");
FileUtil.checkExist(basePath + path + "/thumb");
// 解析请求
DiskFileItemFactory diskFileItemFactory = new DiskFileItemFactory();
ServletFileUpload servletFileUpload = new ServletFileUpload(diskFileItemFactory);
List<FileItem> listFile = servletFileUpload.parseRequest(req);
int totalFiles = listFile.size();
// 更新进度
updateProgress(20, "开始处理 " + totalFiles + " 个文件");
// 使用CompletionService实现并发处理
CompletionService<GalleryUploadProcessResult> completionService =
new ExecutorCompletionService<>(uploadExecutor);
List<Future<GalleryUploadProcessResult>> futures = new ArrayList<>();
List<Gallery> galleryList = new ArrayList<>();
String msg = "";
// 提交所有文件处理任务
for (int i = 0; i < listFile.size(); i++) {
FileItem img = listFile.get(i);
futures.add(completionService.submit(
new FileProcessor(img, i, path, basePath, uid, cid, imgtype)));
}
//galleries
List<Gallery> galleries = new ArrayList<>();
// 创建一个数组来存储结果,保持原始顺序
GalleryUploadProcessResult[] orderedResults = new GalleryUploadProcessResult[futures.size()];
// 等待所有任务完成并收集结果
for (int i = 0; i < futures.size(); i++) {
try {
// 更新进度
int progress = 20 + (int)((i * 60.0) / futures.size());
updateProgress(progress, "处理文件中 (" + (i+1) + "/" + futures.size() + ")");
Future<GalleryUploadProcessResult> future = completionService.take();
GalleryUploadProcessResult result = future.get();
// 将结果放入正确的位置
orderedResults[result.getOriginalIndex()] = result;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("处理被中断", e);
msg += "\n处理被中断";
} catch (ExecutionException e) {
logger.error("处理执行错误", e);
msg += "\n处理执行错误: " + e.getMessage();
}
}
//顺序处理
for (int i = 0;i < orderedResults.length;i++ ){
GalleryUploadProcessResult result = orderedResults[i];
if (result.isSuccess()) {
if (result.isDuplicate()) {
msg += "\n存在重复上传,已经自动忽略:" + result.getFileName();
logger.info("重复文件已跳过: {}", result.getFileName());
} else {
galleries.add(result.getGallery());
}
} else {
logger.error("文件处理失败: {}", result.getFileName(), result.getError());
msg += "\n文件处理失败: " + result.getFileName() + " - " + result.getError().getMessage();
}
}
// 更新进度
updateProgress(90, "完成文件处理,正在写库");
//批量写库
if(!CollectionUtil.isEmpty(galleries)){
List<Integer> gids = galleryService.batchAddImgs(galleries);
galleryList = galleryService.batchImgById(gids);
}
// 准备响应
Map<String,Object> map = new HashMap<>();
map.put("cid", cid);
map.put("imgtype", imgtype);
int count = galleryService.imgCount(map);
map.put("count", count);
map.put("uploaded", galleryList);
map.put("msg", msg);
map.put("progressId", progressId);
String json = JsonBuilderUtil.collectionToJson(map);
resp.getWriter().write(json);
// 更新进度
updateProgress(100, "上传完成");
} catch (Exception e) {
logger.error("上传处理失败", e);
try {
resp.getWriter().write("{\"error\":\"处理失败: " + e.getMessage() + "\"}");
} catch (IOException ex) {
logger.error("发送错误响应失败", ex);
}
} finally {
asyncContext.complete();
}
}
private void updateProgress(int progress, String message) {
String msgJson = "{\"socketJsonType\":\"galleryUploadProcess\",\"content\":\""+progress+"%\",\"statusCode\":0}";
try {
ServerManager.broadCast(""+uid,msgJson);
}catch (Exception e){
e.printStackTrace();
}
}
// 文件处理器
private class FileProcessor implements Callable<GalleryUploadProcessResult> {
private final FileItem img;
private final int index;
private final String path;
private final String basePath;
private final Integer uid;
private final Integer cid;
private final String imgtype;
public FileProcessor(FileItem img, int index, String path, String basePath,
Integer uid, Integer cid, String imgtype) {
this.img = img;
this.index = index;
this.path = path;
this.basePath = basePath;
this.uid = uid;
this.cid = cid;
this.imgtype = imgtype;
}
@Override
public GalleryUploadProcessResult call() {
GalleryUploadProcessResult result = new GalleryUploadProcessResult();
result.setFileName(img.getName());
try {
// 计算MD5并检查重复
String md5;
try (InputStream inputStream = img.getInputStream()) {
md5 = DigestUtils.md5Hex(inputStream);
}
boolean isDuplicate = false;
// 使用putIfAbsent的返回值判断是否已存在(注意保证原子性)
if(md5s.putIfAbsent(md5, Boolean.TRUE) != null) {
isDuplicate = true;
}
// 检查是否重复
String existingImg = "";
if (!isDuplicate){
existingImg = galleryService.imgByMD5(md5);
}
if (isDuplicate || (existingImg != null && !existingImg.isEmpty())) {
result.setDuplicate(true);
result.setSuccess(true);
return result;
}
// 保存图片
Gallery gallery = saveImg(img, index, path, basePath);
gallery.setCid(cid);
gallery.setUid(uid);
gallery.setMD5(md5);
gallery.setImgtype(Byte.parseByte(imgtype));
result.setGallery(gallery);
result.setSuccess(true);
} catch (Exception e) {
result.setSuccess(false);
result.setError(e);
logger.error("处理文件失败: {}", img.getName(), e);
}
return result;
}
}
}
此内容由惯性聚合(RSS阅读器)自动聚合整理,仅供阅读参考。 原文来自 — 版权归原作者所有。