惯性聚合 高效追踪和阅读你感兴趣的博客、新闻、科技资讯
阅读原文 在惯性聚合中打开

推荐订阅源

T
The Exploit Database - CXSecurity.com
A
Arctic Wolf
K
Kaspersky official blog
T
Threat Research - Cisco Blogs
PCI Perspectives
PCI Perspectives
www.infosecurity-magazine.com
www.infosecurity-magazine.com
P
Privacy International News Feed
K
KPMG report finds enterprise disconnect between AI and its ROI | CIO
U
Unit 42
cs.CL updates on arXiv.org
cs.CL updates on arXiv.org
Simon Willison's Weblog
Simon Willison's Weblog
P
Privacy & Cybersecurity Law Blog
O
OpenAI News
量子位
奇客Solidot–传递最新科技情报
奇客Solidot–传递最新科技情报
C
Cisco Blogs
AWS News Blog
AWS News Blog
Vercel News
Vercel News
Microsoft Security Blog
Microsoft Security Blog
Threat Intelligence Blog | Flashpoint
Threat Intelligence Blog | Flashpoint
美团技术团队
T
Threatpost
S
Schneier on Security
Cyber Security Advisories - MS-ISAC
Cyber Security Advisories - MS-ISAC
C
Cyber Attacks, Cyber Crime and Cyber Security
Last Week in AI
Last Week in AI
C
CERT Recently Published Vulnerability Notes
Blog — PlanetScale
Blog — PlanetScale
C
Cybersecurity and Infrastructure Security Agency CISA
F
Full Disclosure
博客园_首页
N
Netflix TechBlog - Medium
Security Latest
Security Latest
有赞技术团队
有赞技术团队
Google DeepMind News
Google DeepMind News
钛媒体:引领未来商业与生活新知
钛媒体:引领未来商业与生活新知
The Register - Security
The Register - Security
Application and Cybersecurity Blog
Application and Cybersecurity Blog
Recent Announcements
Recent Announcements
博客园 - Franky
P
Palo Alto Networks Blog
Project Zero
Project Zero
CTFtime.org: upcoming CTF events
CTFtime.org: upcoming CTF events
H
Help Net Security
Hacker News: Ask HN
Hacker News: Ask HN
Cisco Talos Blog
Cisco Talos Blog
H
Heimdal Security Blog
The Hacker News
The Hacker News
博客园 - 【当耐特】
GbyAI
GbyAI

宋浩志的博客

Windows安装nacos Edge用户登录一直转圈 使用coding持续集成SpringBoot项目 使用acme.sh来申请SSL证书 CentOS安装Nginx 使用Windows内置虚拟机Hyper-v安装CentOS ruoyi-vue项目集成flyway实现自动创建表 Docker常用命令记录 vue学习笔记四-列表渲染 vue学习笔记三条件渲染 vue学习笔记二模板语法 vue学习笔记一创建项目
SpringBoot整合Socket
宋浩志 · 2022-10-27 · via 宋浩志的博客

前言

前段时间公司一个物联网项目需要通过TCP连接设备收发消息,现在我把代码整理出来,分享一下。

源代码已发布在GitHub

使用ServerSocket绑定IP和端口,

TcpSocket实现Java的Runnable的类,在run方法中使用Accept监听端口是否有客户端发送连接请求,如果有连接来了就创建SocketReceive对象然后将他扔给线程池执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package com.example.socket_demo.socket;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* @author Administrator
*/
@Component
@Slf4j
public class TcpSocket implements Runnable {
public Integer port;
private ServerSocket server;
private ExecutorService threadPool;

public TcpSocket() {
try {
port = 8081;
threadPool = Executors.newCachedThreadPool();
server = new ServerSocket(port);
} catch (Exception e) {
log.error(e.getMessage());
}
}

@Override
public void run() {
while (true) {
try {
Socket socket = server.accept();
if (socket != null) {
SocketReceive socketReceive = new SocketReceive(socket);
threadPool.submit(socketReceive);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

在线连接

AllClientsMap类存放了所有的在线连接,通过hostAddresskeySocketValue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package com.example.socket_demo.socket;

import lombok.extern.slf4j.Slf4j;

import java.net.Socket;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

@Slf4j
public class AllClientsMap {

/**
* 所有已连接设备
*/
private static final ConcurrentMap<String, Socket> ALLCLIENTS = new ConcurrentHashMap<>();

/**
* 返回设备列表
*
* @return
*/
public static ConcurrentMap<String, Socket> getAllClients() {
return ALLCLIENTS;
}

/**
* 通过key获取客户端
*
* @return
*/
public static Socket getSocketByKey(String key) {
return ALLCLIENTS.get(key);
}

/**
* 添加设备到列表
*
* @param key
* @param socket
*/
public static void put(String key, Socket socket) {
ALLCLIENTS.put(key, socket);
log.info("设备Key:{}========ip:{}已加入列表", key, socket.getInetAddress().getHostAddress());
}

/**
* 移除设备
*
* @param key
*/
public static void remove(String key) {
ALLCLIENTS.remove(key);
log.info("已移除设备Key:{}", key);
}

/**
* 返回已连接设备数量
*
* @return
*/
public static int size() {
log.info("当前设备数:{}", ALLCLIENTS.size());
return ALLCLIENTS.size();
}

/**
* 打印信息
*
* @return
*/
public static void print() {
log.info("当前设备列表信息:长度:{}", ALLCLIENTS.size());
ALLCLIENTS.forEach((key, socket) -> {
log.info("设备Key:{}========ip:{}", key, socket.getInetAddress().getHostAddress());
});
}

/**
* 是否包含
*
* @param key
* @return
*/
public static boolean contains(String key) {
return ALLCLIENTS.containsKey(key);
}
}

创建SocketReceive

SocketReceive类中,我们可以执行相关的接收消息,以及业务操作;在第64行代码的位置,可以通过ApplicationContext获取Spring Bean执行业务代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
package com.example.socket_demo.socket;

import lombok.extern.slf4j.Slf4j;

import java.io.DataInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;

@Slf4j
public class SocketReceive implements Runnable {
private Socket socket;

public SocketReceive() {
}

public SocketReceive(Socket socket) {
this.socket = socket;
}

@Override
public void run() {
while (true) {
if (null == socket) {
log.info("socket为空");
return;
}
boolean isClosed = socket.isClosed();
String hostAddress = socket.getInetAddress().getHostAddress();
if (isClosed) {
log.info("socket检测到关闭了");
if (AllClientsMap.contains(hostAddress)) {
AllClientsMap.remove(hostAddress);
AllClientsMap.print();
}
return;
}
String hostAddress = socket.getInetAddress().getHostAddress();
try {
//建立客户端信息输入流
DataInputStream in = new DataInputStream(socket.getInputStream());
//定义字节数组读取数据
byte[] bytes = new byte[1024];
int len = in.read(bytes);
if (len == -1) {
return;
}
//定义一个新数组copy,解决读取出来的数据字节不够全是0的问题
byte[] bytes1 = new byte[len];
System.arraycopy(bytes, 0, bytes1, 0, len);
log.info("客户端传的byte字节数组:" + printBytesByStringBuilder(bytes1));
String s = new String(bytes1);
log.info("客户端传的byte字节数组转换成字符串打印:" + s);
//转换hex数据
String data = byteArrayToHex(bytes1);
log.info("接收的16进制数据:" + data);
//如果服务端没有保存该socket
if(!AllClientsMap.contains(hostAddress)){
AllClientsMap.put(hostAddress, socket);
AllClientsMap.print();
}
log.debug("客户端" + hostAddress + "发送数据:{}", data);
//执行业务
System.out.println("执行业务");

//从map中获取客户端发送消息
response(AllClientsMap.getSocketByKey(hostAddress), data);

} catch (IOException e) {
if (AllClientsMap.contains(hostAddress)) {
AllClientsMap.remove(hostAddress);
AllClientsMap.print();
}
try {
socket.close();
log.error("{}断开连接", hostAddress);
return;
} catch (IOException ioException) {
log.error(ioException.getMessage());
}
}
}
}

/**
* 根据字节数组,输出对应的格式化字符串
*
* @param bytes 字节数组
* @return 字节数组字符串
*/
public static String printBytesByStringBuilder(byte[] bytes) {
StringBuilder stringBuilder = new StringBuilder();
for (byte aByte : bytes) {
stringBuilder.append(byte2String(aByte));
}
return stringBuilder.toString();
}

public static String byte2String(byte b) {
return String.format("%02x ", b);
}

/**
* 向socket发送消息
*
* @param socket 对应socket
* @param msg 消息
*/
public static void response(Socket socket, String msg) {
log.debug("向设备IP:{}发送消息:{}", socket.getInetAddress().getHostAddress(), msg);
OutputStream outputStream = null;
try {
outputStream = socket.getOutputStream();
outputStream.write(hexStringToByteArray(msg));
} catch (IOException e) {
try {
socket.close();
} catch (IOException ioException) {
log.error(ioException.getMessage());
}
log.error(e.getMessage());
}
}

/**
* 字节数组转字符串
*
* @param bytes
* @return
*/
public static String byteArrayToHex(byte[] bytes) {
StringBuilder result = new StringBuilder();
for (int index = 0, len = bytes.length; index <= len - 1; index += 1) {
int char1 = ((bytes[index] >> 4) & 0xF);
char chara1 = Character.forDigit(char1, 16);
int char2 = ((bytes[index]) & 0xF);
char chara2 = Character.forDigit(char2, 16);
result.append(chara1);
result.append(chara2);
}
return result.toString();
}

/**
* 16进制表示的字符串转换为字节数组
*
* @param hexString 16进制表示的字符串
* @return byte[] 字节数组
*/
public static byte[] hexStringToByteArray(String hexString) {
hexString = hexString.replaceAll(" ", "");
int len = hexString.length();
byte[] bytes = new byte[len / 2];
for (int i = 0; i < len; i += 2) {
// 两位一组,表示一个字节,把这样表示的16进制字符串,还原成一个字节
bytes[i / 2] = (byte) ((Character.digit(hexString.charAt(i), 16) << 4) + Character
.digit(hexString.charAt(i + 1), 16));
}
return bytes;
}
}

启动

通过继承SpringInitializingBean类,重写afterPropertiesSet方法,这个方法将在所有的属性被初始化后调用。
然后会创建一个线程执行ServerSocket的监听,初始化我们的TcpSocket对象,一旦Server接收到了连接请求后,会创建一个SocketReceive对象将其扔给线程池执行,在线程池中的SocketReceive对象可以通过ApplicationContext获取Spring Bean执行业务代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package com.example.socket_demo.socket;

import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class SpringFinishedListener implements InitializingBean {
@Autowired
private TcpSocket tcpsocket;

@Override
public void afterPropertiesSet() {
Thread serverThread = new Thread(tcpsocket);
serverThread.start();
}
}

测试工具

这里推荐一个测试工具,还挺好用的。下载连接