SpringBoot中的服务端推送SSE
服务器发送事件(Server-sent Events)是一种服务器向客户端发送事件和数据的单向通讯。Server-Sent 事件指的是网页自动获取来自服务器的更新。以前也可能做到这一点,前提是网页不得不询问是否有可用的更新。通过服务器发送事件,更新能够自动到达。
# 优缺点对比
# WebSocket
- 优点:双工通信
- 缺点:需专门定义数据协议,解析数据流,且部分服务器支持不完善,后台例如java spring boot 2.1.2 仅支持websocket 1.0(最高已达1.3)
# SSE
- 优点:开发简单,和传统的http开发几乎无任何差别,客户端开发简单,有标准支持(EventSource)
- 缺点:和websocket相比,只能单工通信,建立连接后,只能由服务端发往客户端,且占用一个连接,如需客户端向服务端通信,需额外打开一个连接
# Springboot的Server实现
/**
* @author blog.unclezs.com
* @date 2021/07/17
*/
@RestController
@SpringBootApplication
public class SseServer implements ApplicationContextAware {
private ApplicationContext context;
public static void main(String[] args) {
SpringApplication.run(SseServer.class, args);
}
@GetMapping(value = "/sse/{id}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public ResponseEntity<SseEmitter> sseServer(@PathVariable(value = "id") String id) {
// 防止nginx缓存请求
HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.set("X-Accel-Buffering", "no");
httpHeaders.setCacheControl(CacheControl.noCache());
SseService manager = context.getBean(SseService.class);
SseEmitter emitter = manager.registerSseEmitter(id);
return ResponseEntity.ok().contentType(MediaType.TEXT_EVENT_STREAM).headers(httpHeaders).body(emitter);
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.context = applicationContext;
new Thread(() -> {
while (true) {
try {
// 模拟推送消息
context.getBean(SseService.class).sendMessage("123", String.format("sse消息:【%s】", DateUtil.now()));
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
* @author blog.unclezs.com
* @date 2021/07/17
*/
@Service
public class SseServiceImpl implements SseService {
private static final Map<String, SseEmitter> SSE_EMITTERS = new HashMap<>();
public SseEmitter registerSseEmitter(String id) {
SseEmitter emitter = new SseEmitter(60L * 1000L);
emitter.onCompletion(() -> System.out.println("SseEmitter is completed"));
emitter.onTimeout(() -> System.out.println("SseEmitter is timed out"));
emitter.onError((ex) -> System.out.println("SseEmitter got error:" + ex.getMessage()));
SSE_EMITTERS.put(id, emitter);
return emitter;
}
@Override
public void sendMessage(String id, Object data) {
SseEmitter emitter = SSE_EMITTERS.get(id);
if (emitter == null) {
return;
}
System.out.println("发送消息:" + id + " data: " + data);
try {
emitter.send(data);
} catch (IOException e) {
e.printStackTrace();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# 客户端实现
# Okhttp版本
/**
* @author blog.unclezs.com
* @date 2021/07/17
*/
public class OkSseClient {
public static void main(String[] args) {
// 定义see接口
Request request = new Request.Builder().url("http://127.0.0.1:8080/sse/123").build();
OkHttpClient okHttpClient = new OkHttpClient.Builder()
.connectTimeout(1, TimeUnit.DAYS)
.readTimeout(1, TimeUnit.DAYS)//这边需要将超时显示设置长一点,不然刚连上就断开,之前以为调用方式错误被坑了半天
.build();
// 实例化EventSource,注册EventSource监听器
RealEventSource realEventSource = new RealEventSource(request, new EventSourceListener() {
private long callStartNanos;
private void printEvent(String name) {
long nowNanos = System.nanoTime();
if (name.equals("callStart")) {
callStartNanos = nowNanos;
}
long elapsedNanos = nowNanos - callStartNanos;
System.out.printf("=====> %.3f %s%n", elapsedNanos / 1000000000d, name);
}
@Override
public void onOpen(EventSource eventSource, Response response) {
printEvent("onOpen");
}
@Override
public void onEvent(EventSource eventSource, String id, String type, String data) {
printEvent("onEvent");
//请求到的数据
System.out.println(data);
}
@Override
public void onClosed(EventSource eventSource) {
printEvent("onClosed");
}
@Override
public void onFailure(EventSource eventSource, Throwable t, Response response) {
t.printStackTrace();
//这边可以监听并重新打开
printEvent("onFailure");
}
});
//真正开始请求的一步
realEventSource.connect(okHttpClient);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# Java原生实现
/**
* @author blog.unclezs.com
* @date 2021/07/17
*/
public class SseClient {
/**
* 获取SSE输入流。
*
* @param urlPath /
* @return /
* @throws IOException /
*/
public static InputStream getSseInputStream(String urlPath) throws IOException {
URL url = new URL(urlPath);
HttpURLConnection urlConnection = (HttpURLConnection) url.openConnection();
// 这儿根据自己的情况选择get或post
urlConnection.setRequestMethod("GET");
urlConnection.setDoOutput(true);
urlConnection.setDoInput(true);
urlConnection.setUseCaches(false);
urlConnection.setRequestProperty("Connection", "Keep-Alive");
urlConnection.setRequestProperty("Charset", "UTF-8");
//读取过期时间(很重要,建议加上)
urlConnection.setReadTimeout(20 * 1000);
// text/plain模式
urlConnection.setRequestProperty("Content-Type", "text/plain; charset=UTF-8");
InputStream inputStream = urlConnection.getInputStream();
return new BufferedInputStream(inputStream);
}
/**
* 读取数据。
*
* @param is /
* @param sseMessageHandler /
* @throws IOException /
*/
public static void readStream(InputStream is, SseMessageHandler sseMessageHandler) throws IOException {
try {
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
String line;
while ((line = reader.readLine()) != null) {
// 处理数据接口
sseMessageHandler.actMsg(is, line);
}
// 当服务器端主动关闭的时候,客户端无法获取到信号。现在还不清楚原因。所以无法执行的此处。
reader.close();
} catch (IOException e) {
e.printStackTrace();
throw new IOException("关闭数据流!");
}
}
public static void main(String[] args) throws IOException {
String urlPath = "http://localhost:8080/sse/123";
InputStream inputStream = getSseInputStream(urlPath);
readStream(inputStream, (is, line) -> System.out.println(line));
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# 最后
在 GitHub 编辑此页 (opens new window)
上次更新: 2024/02/25, 12:11:11