Exploring
首页
  • Java

    • 面向对象的思想OOP
    • 浅谈Java反射原理
    • endorsed覆盖JDK中的类
  • 认证与授权

    • LDAP概念和原理介绍
    • OAuth2介绍
  • Impala

    • Impala 介绍
  • MySQL

    • 关于MySQL的一些面试题
    • 解决MySQL不到中文数据
    • 数据库之事务与实现原理
  • Oracle

    • oracle的表空间,用户管理,表操作,函数
    • oracle的查询、视图、索引
    • plsql简单入门
  • Redis

    • 数据类型详解
    • 跳越表
    • 数据持久化的两种方式
  • 共识算法

    • gossip
  • RPC

    • GRPC初识与快速入门
    • ProtocolBuffer基本语法
  • RabbitMQ

    • RabbitMQ入门程序之HelloWorld
    • RabbitMQ之工作模式
  • Zookeeper

    • Zookeeper一文入门
  • Docker

    • Docker入门初体验
  • Maven

    • 把自己的包到Maven中央仓库
    • Maven之自定义插件
  • Nginx

    • nginx的安装
    • nginx的配置文件
    • nignx 的变量
  • Tomcat

    • Servlet3通过SPI进行注册组件
  • Vagrant

    • vagrant 初始化
    • vagrant 常用配置
    • vagrant 自己制作 box
  • Linux

    • 启动方式 Systemd
    • 后台服务
    • 防火墙与 Iptables
  • 设计模式

    • 设计模式-代理
    • 设计模式-单例模式
    • 设计模式-迭代器
  • 分布式

    • CAP 理论
  • 数据结构

    • 数据结构之堆Heap
    • 数据结构之哈希表
    • 数据结构之队列
  • 计算机网络

    • HTTP与HTTPS详解
    • 浅谈DNS协议
    • ISP中的网络层
  • 算法

    • 常用查找算法及Java实现
    • 常用排序算法及Java实现
    • 迪杰斯特拉算法
  • 操作系统

    • 操作系统之进程调度算法
    • 操作系统之进程通讯IPC
    • 操作系统之内存管理
  • 抓包

    • 生成安卓系统证书
  • 加解密

    • 常见加密算法
    • 公开秘钥基础知识
    • RSA 解析
  • Windows

    • scoop 包管理
    • windows-terminal 配置
    • 增强 PowerShell
归档
Github (opens new window)
首页
  • Java

    • 面向对象的思想OOP
    • 浅谈Java反射原理
    • endorsed覆盖JDK中的类
  • 认证与授权

    • LDAP概念和原理介绍
    • OAuth2介绍
  • Impala

    • Impala 介绍
  • MySQL

    • 关于MySQL的一些面试题
    • 解决MySQL不到中文数据
    • 数据库之事务与实现原理
  • Oracle

    • oracle的表空间,用户管理,表操作,函数
    • oracle的查询、视图、索引
    • plsql简单入门
  • Redis

    • 数据类型详解
    • 跳越表
    • 数据持久化的两种方式
  • 共识算法

    • gossip
  • RPC

    • GRPC初识与快速入门
    • ProtocolBuffer基本语法
  • RabbitMQ

    • RabbitMQ入门程序之HelloWorld
    • RabbitMQ之工作模式
  • Zookeeper

    • Zookeeper一文入门
  • Docker

    • Docker入门初体验
  • Maven

    • 把自己的包到Maven中央仓库
    • Maven之自定义插件
  • Nginx

    • nginx的安装
    • nginx的配置文件
    • nignx 的变量
  • Tomcat

    • Servlet3通过SPI进行注册组件
  • Vagrant

    • vagrant 初始化
    • vagrant 常用配置
    • vagrant 自己制作 box
  • Linux

    • 启动方式 Systemd
    • 后台服务
    • 防火墙与 Iptables
  • 设计模式

    • 设计模式-代理
    • 设计模式-单例模式
    • 设计模式-迭代器
  • 分布式

    • CAP 理论
  • 数据结构

    • 数据结构之堆Heap
    • 数据结构之哈希表
    • 数据结构之队列
  • 计算机网络

    • HTTP与HTTPS详解
    • 浅谈DNS协议
    • ISP中的网络层
  • 算法

    • 常用查找算法及Java实现
    • 常用排序算法及Java实现
    • 迪杰斯特拉算法
  • 操作系统

    • 操作系统之进程调度算法
    • 操作系统之进程通讯IPC
    • 操作系统之内存管理
  • 抓包

    • 生成安卓系统证书
  • 加解密

    • 常见加密算法
    • 公开秘钥基础知识
    • RSA 解析
  • Windows

    • scoop 包管理
    • windows-terminal 配置
    • 增强 PowerShell
归档
Github (opens new window)
  • Java

    • 基础

    • 并发与多线程

    • 日志系统

    • 单元测试

    • JVM

    • Spring

      • 理解依赖注入与松耦合
      • Spring-Aop-注解方式食用
      • spring给util类注入bean
      • Spring之Bean的生命周期
      • Spring之Bean的作用域
      • Spring中Profiles的作用及原理浅析
      • SpringBoot中的服务端推送SSE
        • 优缺点对比
          • WebSocket
          • SSE
        • Springboot的Server实现
        • 客户端实现
          • Okhttp版本
          • Java原生实现
        • 最后
    • SpringBoot

    • 一些工具

  • 语言
  • Java
  • Spring
unclezs
2021-07-19
0
目录

SpringBoot中的服务端推送SSE

服务器发送事件(Server-sent Events)是一种服务器向客户端发送事件和数据的单向通讯。Server-Sent 事件指的是网页自动获取来自服务器的更新。以前也可能做到这一点,前提是网页不得不询问是否有可用的更新。通过服务器发送事件,更新能够自动到达。

# 优缺点对比

# WebSocket

  • 优点:双工通信
  • 缺点:需专门定义数据协议,解析数据流,且部分服务器支持不完善,后台例如java spring boot 2.1.2 仅支持websocket 1.0(最高已达1.3)

# SSE

  • 优点:开发简单,和传统的http开发几乎无任何差别,客户端开发简单,有标准支持(EventSource)
  • 缺点:和websocket相比,只能单工通信,建立连接后,只能由服务端发往客户端,且占用一个连接,如需客户端向服务端通信,需额外打开一个连接

# Springboot的Server实现

/**
 * @author blog.unclezs.com
 * @date 2021/07/17
 */
@RestController
@SpringBootApplication
public class SseServer implements ApplicationContextAware {
  private ApplicationContext context;


  public static void main(String[] args) {
    SpringApplication.run(SseServer.class, args);
  }

  @GetMapping(value = "/sse/{id}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
  public ResponseEntity<SseEmitter> sseServer(@PathVariable(value = "id") String id) {
    // 防止nginx缓存请求
    HttpHeaders httpHeaders = new HttpHeaders();
    httpHeaders.set("X-Accel-Buffering", "no");
    httpHeaders.setCacheControl(CacheControl.noCache());
    SseService manager = context.getBean(SseService.class);
    SseEmitter emitter = manager.registerSseEmitter(id);
    return ResponseEntity.ok().contentType(MediaType.TEXT_EVENT_STREAM).headers(httpHeaders).body(emitter);
  }

  @Override
  public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
    this.context = applicationContext;
    new Thread(() -> {
      while (true) {
        try {
          // 模拟推送消息
          context.getBean(SseService.class).sendMessage("123", String.format("sse消息:【%s】", DateUtil.now()));
          Thread.sleep(5000);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    }).start();
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
 * @author blog.unclezs.com
 * @date 2021/07/17
 */
@Service
public class SseServiceImpl implements SseService {
  private static final Map<String, SseEmitter> SSE_EMITTERS = new HashMap<>();

  public SseEmitter registerSseEmitter(String id) {
    SseEmitter emitter = new SseEmitter(60L * 1000L);
    emitter.onCompletion(() -> System.out.println("SseEmitter is completed"));
    emitter.onTimeout(() -> System.out.println("SseEmitter is timed out"));
    emitter.onError((ex) -> System.out.println("SseEmitter got error:" + ex.getMessage()));
    SSE_EMITTERS.put(id, emitter);
    return emitter;
  }

  @Override
  public void sendMessage(String id, Object data) {
    SseEmitter emitter = SSE_EMITTERS.get(id);
    if (emitter == null) {
      return;
    }
    System.out.println("发送消息:" + id + " data: " + data);
    try {
      emitter.send(data);
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

# 客户端实现

# Okhttp版本

/**
 * @author blog.unclezs.com
 * @date 2021/07/17
 */
public class OkSseClient {
  public static void main(String[] args) {
    // 定义see接口
    Request request = new Request.Builder().url("http://127.0.0.1:8080/sse/123").build();
    OkHttpClient okHttpClient = new OkHttpClient.Builder()
            .connectTimeout(1, TimeUnit.DAYS)
            .readTimeout(1, TimeUnit.DAYS)//这边需要将超时显示设置长一点,不然刚连上就断开,之前以为调用方式错误被坑了半天
            .build();

    // 实例化EventSource,注册EventSource监听器
    RealEventSource realEventSource = new RealEventSource(request, new EventSourceListener() {
      private long callStartNanos;

      private void printEvent(String name) {
        long nowNanos = System.nanoTime();
        if (name.equals("callStart")) {
          callStartNanos = nowNanos;
        }
        long elapsedNanos = nowNanos - callStartNanos;
        System.out.printf("=====> %.3f %s%n", elapsedNanos / 1000000000d, name);
      }

      @Override
      public void onOpen(EventSource eventSource, Response response) {
        printEvent("onOpen");
      }

      @Override
      public void onEvent(EventSource eventSource, String id, String type, String data) {
        printEvent("onEvent");
        //请求到的数据
        System.out.println(data);
      }

      @Override
      public void onClosed(EventSource eventSource) {
        printEvent("onClosed");
      }

      @Override
      public void onFailure(EventSource eventSource, Throwable t, Response response) {
        t.printStackTrace();
        //这边可以监听并重新打开
        printEvent("onFailure");
      }
    });
    //真正开始请求的一步
    realEventSource.connect(okHttpClient);
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54

# Java原生实现

/**
 * @author blog.unclezs.com
 * @date 2021/07/17
 */
public class SseClient {

  /**
   * 获取SSE输入流。
   *
   * @param urlPath /
   * @return /
   * @throws IOException /
   */
  public static InputStream getSseInputStream(String urlPath) throws IOException {
    URL url = new URL(urlPath);
    HttpURLConnection urlConnection = (HttpURLConnection) url.openConnection();
    // 这儿根据自己的情况选择get或post
    urlConnection.setRequestMethod("GET");
    urlConnection.setDoOutput(true);
    urlConnection.setDoInput(true);
    urlConnection.setUseCaches(false);
    urlConnection.setRequestProperty("Connection", "Keep-Alive");
    urlConnection.setRequestProperty("Charset", "UTF-8");
    //读取过期时间(很重要,建议加上)
    urlConnection.setReadTimeout(20 * 1000);
    // text/plain模式
    urlConnection.setRequestProperty("Content-Type", "text/plain; charset=UTF-8");
    InputStream inputStream = urlConnection.getInputStream();
    return new BufferedInputStream(inputStream);
  }

  /**
   * 读取数据。
   *
   * @param is                /
   * @param sseMessageHandler /
   * @throws IOException /
   */
  public static void readStream(InputStream is, SseMessageHandler sseMessageHandler) throws IOException {
    try {
      BufferedReader reader = new BufferedReader(new InputStreamReader(is));
      String line;
      while ((line = reader.readLine()) != null) {
        // 处理数据接口
        sseMessageHandler.actMsg(is, line);
      }
      // 当服务器端主动关闭的时候,客户端无法获取到信号。现在还不清楚原因。所以无法执行的此处。
      reader.close();
    } catch (IOException e) {
      e.printStackTrace();
      throw new IOException("关闭数据流!");
    }
  }

  public static void main(String[] args) throws IOException {
    String urlPath = "http://localhost:8080/sse/123";
    InputStream inputStream = getSseInputStream(urlPath);
    readStream(inputStream, (is, line) -> System.out.println(line));
  }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61

# 最后

本文中的代码源码 (opens new window)

在 GitHub 编辑此页 (opens new window)
上次更新: 2024/02/25, 12:11:11
Spring中Profiles的作用及原理浅析
SpringBoot 监控

← Spring中Profiles的作用及原理浅析 SpringBoot 监控→

Theme by Vdoing | Copyright © 2018-2024 unclezs
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式