惯性聚合 高效追踪和阅读你感兴趣的博客、新闻、科技资讯
阅读原文 在惯性聚合中打开

推荐订阅源

S
Security Affairs
freeCodeCamp Programming Tutorials: Python, JavaScript, Git & More
CTFtime.org: upcoming CTF events
CTFtime.org: upcoming CTF events
Jina AI
Jina AI
P
Palo Alto Networks Blog
GbyAI
GbyAI
大猫的无限游戏
大猫的无限游戏
A
Arctic Wolf
Hugging Face - Blog
Hugging Face - Blog
小众软件
小众软件
Y
Y Combinator Blog
T
The Blog of Author Tim Ferriss
Blog — PlanetScale
Blog — PlanetScale
S
Schneier on Security
V
Vulnerabilities – Threatpost
C
Cybersecurity and Infrastructure Security Agency CISA
雷峰网
雷峰网
T
Tenable Blog
人人都是产品经理
人人都是产品经理
T
Tor Project blog
C
Cyber Attacks, Cyber Crime and Cyber Security
AWS News Blog
AWS News Blog
Microsoft Security Blog
Microsoft Security Blog
J
Java Code Geeks
Scott Helme
Scott Helme
SecWiki News
SecWiki News
C
CERT Recently Published Vulnerability Notes
Recorded Future
Recorded Future
I
InfoQ
Security Archives - TechRepublic
Security Archives - TechRepublic
Help Net Security
Help Net Security
Cloudbric
Cloudbric
C
Check Point Blog
Engineering at Meta
Engineering at Meta
TaoSecurity Blog
TaoSecurity Blog
B
Blog
Cyber Security Advisories - MS-ISAC
Cyber Security Advisories - MS-ISAC
博客园_首页
N
News and Events Feed by Topic
云风的 BLOG
云风的 BLOG
MyScale Blog
MyScale Blog
腾讯CDC
量子位
Application and Cybersecurity Blog
Application and Cybersecurity Blog
K
Kaspersky official blog
Vercel News
Vercel News
F
Full Disclosure
T
Troy Hunt's Blog
Forbes - Security
Forbes - Security
S
Security @ Cisco Blogs

宋浩志的博客

Windows安装nacos Edge用户登录一直转圈 使用coding持续集成SpringBoot项目 使用acme.sh来申请SSL证书 CentOS安装Nginx 使用Windows内置虚拟机Hyper-v安装CentOS ruoyi-vue项目集成flyway实现自动创建表 Docker常用命令记录 vue学习笔记四-列表渲染 vue学习笔记三条件渲染 vue学习笔记二模板语法 vue学习笔记一创建项目
SpringBoot整合Socket
宋浩志 · 2022-10-27 · via 宋浩志的博客

前言

前段时间公司一个物联网项目需要通过TCP连接设备收发消息,现在我把代码整理出来,分享一下。

源代码已发布在GitHub

使用ServerSocket绑定IP和端口,

TcpSocket实现Java的Runnable的类,在run方法中使用Accept监听端口是否有客户端发送连接请求,如果有连接来了就创建SocketReceive对象然后将他扔给线程池执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package com.example.socket_demo.socket;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* @author Administrator
*/
@Component
@Slf4j
public class TcpSocket implements Runnable {
public Integer port;
private ServerSocket server;
private ExecutorService threadPool;

public TcpSocket() {
try {
port = 8081;
threadPool = Executors.newCachedThreadPool();
server = new ServerSocket(port);
} catch (Exception e) {
log.error(e.getMessage());
}
}

@Override
public void run() {
while (true) {
try {
Socket socket = server.accept();
if (socket != null) {
SocketReceive socketReceive = new SocketReceive(socket);
threadPool.submit(socketReceive);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

在线连接

AllClientsMap类存放了所有的在线连接,通过hostAddresskeySocketValue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package com.example.socket_demo.socket;

import lombok.extern.slf4j.Slf4j;

import java.net.Socket;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

@Slf4j
public class AllClientsMap {

/**
* 所有已连接设备
*/
private static final ConcurrentMap<String, Socket> ALLCLIENTS = new ConcurrentHashMap<>();

/**
* 返回设备列表
*
* @return
*/
public static ConcurrentMap<String, Socket> getAllClients() {
return ALLCLIENTS;
}

/**
* 通过key获取客户端
*
* @return
*/
public static Socket getSocketByKey(String key) {
return ALLCLIENTS.get(key);
}

/**
* 添加设备到列表
*
* @param key
* @param socket
*/
public static void put(String key, Socket socket) {
ALLCLIENTS.put(key, socket);
log.info("设备Key:{}========ip:{}已加入列表", key, socket.getInetAddress().getHostAddress());
}

/**
* 移除设备
*
* @param key
*/
public static void remove(String key) {
ALLCLIENTS.remove(key);
log.info("已移除设备Key:{}", key);
}

/**
* 返回已连接设备数量
*
* @return
*/
public static int size() {
log.info("当前设备数:{}", ALLCLIENTS.size());
return ALLCLIENTS.size();
}

/**
* 打印信息
*
* @return
*/
public static void print() {
log.info("当前设备列表信息:长度:{}", ALLCLIENTS.size());
ALLCLIENTS.forEach((key, socket) -> {
log.info("设备Key:{}========ip:{}", key, socket.getInetAddress().getHostAddress());
});
}

/**
* 是否包含
*
* @param key
* @return
*/
public static boolean contains(String key) {
return ALLCLIENTS.containsKey(key);
}
}

创建SocketReceive

SocketReceive类中,我们可以执行相关的接收消息,以及业务操作;在第64行代码的位置,可以通过ApplicationContext获取Spring Bean执行业务代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
package com.example.socket_demo.socket;

import lombok.extern.slf4j.Slf4j;

import java.io.DataInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;

@Slf4j
public class SocketReceive implements Runnable {
private Socket socket;

public SocketReceive() {
}

public SocketReceive(Socket socket) {
this.socket = socket;
}

@Override
public void run() {
while (true) {
if (null == socket) {
log.info("socket为空");
return;
}
boolean isClosed = socket.isClosed();
String hostAddress = socket.getInetAddress().getHostAddress();
if (isClosed) {
log.info("socket检测到关闭了");
if (AllClientsMap.contains(hostAddress)) {
AllClientsMap.remove(hostAddress);
AllClientsMap.print();
}
return;
}
String hostAddress = socket.getInetAddress().getHostAddress();
try {
//建立客户端信息输入流
DataInputStream in = new DataInputStream(socket.getInputStream());
//定义字节数组读取数据
byte[] bytes = new byte[1024];
int len = in.read(bytes);
if (len == -1) {
return;
}
//定义一个新数组copy,解决读取出来的数据字节不够全是0的问题
byte[] bytes1 = new byte[len];
System.arraycopy(bytes, 0, bytes1, 0, len);
log.info("客户端传的byte字节数组:" + printBytesByStringBuilder(bytes1));
String s = new String(bytes1);
log.info("客户端传的byte字节数组转换成字符串打印:" + s);
//转换hex数据
String data = byteArrayToHex(bytes1);
log.info("接收的16进制数据:" + data);
//如果服务端没有保存该socket
if(!AllClientsMap.contains(hostAddress)){
AllClientsMap.put(hostAddress, socket);
AllClientsMap.print();
}
log.debug("客户端" + hostAddress + "发送数据:{}", data);
//执行业务
System.out.println("执行业务");

//从map中获取客户端发送消息
response(AllClientsMap.getSocketByKey(hostAddress), data);

} catch (IOException e) {
if (AllClientsMap.contains(hostAddress)) {
AllClientsMap.remove(hostAddress);
AllClientsMap.print();
}
try {
socket.close();
log.error("{}断开连接", hostAddress);
return;
} catch (IOException ioException) {
log.error(ioException.getMessage());
}
}
}
}

/**
* 根据字节数组,输出对应的格式化字符串
*
* @param bytes 字节数组
* @return 字节数组字符串
*/
public static String printBytesByStringBuilder(byte[] bytes) {
StringBuilder stringBuilder = new StringBuilder();
for (byte aByte : bytes) {
stringBuilder.append(byte2String(aByte));
}
return stringBuilder.toString();
}

public static String byte2String(byte b) {
return String.format("%02x ", b);
}

/**
* 向socket发送消息
*
* @param socket 对应socket
* @param msg 消息
*/
public static void response(Socket socket, String msg) {
log.debug("向设备IP:{}发送消息:{}", socket.getInetAddress().getHostAddress(), msg);
OutputStream outputStream = null;
try {
outputStream = socket.getOutputStream();
outputStream.write(hexStringToByteArray(msg));
} catch (IOException e) {
try {
socket.close();
} catch (IOException ioException) {
log.error(ioException.getMessage());
}
log.error(e.getMessage());
}
}

/**
* 字节数组转字符串
*
* @param bytes
* @return
*/
public static String byteArrayToHex(byte[] bytes) {
StringBuilder result = new StringBuilder();
for (int index = 0, len = bytes.length; index <= len - 1; index += 1) {
int char1 = ((bytes[index] >> 4) & 0xF);
char chara1 = Character.forDigit(char1, 16);
int char2 = ((bytes[index]) & 0xF);
char chara2 = Character.forDigit(char2, 16);
result.append(chara1);
result.append(chara2);
}
return result.toString();
}

/**
* 16进制表示的字符串转换为字节数组
*
* @param hexString 16进制表示的字符串
* @return byte[] 字节数组
*/
public static byte[] hexStringToByteArray(String hexString) {
hexString = hexString.replaceAll(" ", "");
int len = hexString.length();
byte[] bytes = new byte[len / 2];
for (int i = 0; i < len; i += 2) {
// 两位一组,表示一个字节,把这样表示的16进制字符串,还原成一个字节
bytes[i / 2] = (byte) ((Character.digit(hexString.charAt(i), 16) << 4) + Character
.digit(hexString.charAt(i + 1), 16));
}
return bytes;
}
}

启动

通过继承SpringInitializingBean类,重写afterPropertiesSet方法,这个方法将在所有的属性被初始化后调用。
然后会创建一个线程执行ServerSocket的监听,初始化我们的TcpSocket对象,一旦Server接收到了连接请求后,会创建一个SocketReceive对象将其扔给线程池执行,在线程池中的SocketReceive对象可以通过ApplicationContext获取Spring Bean执行业务代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package com.example.socket_demo.socket;

import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class SpringFinishedListener implements InitializingBean {
@Autowired
private TcpSocket tcpsocket;

@Override
public void afterPropertiesSet() {
Thread serverThread = new Thread(tcpsocket);
serverThread.start();
}
}

测试工具

这里推荐一个测试工具,还挺好用的。下载连接