Spring WebFlux
# 1. Spring-WebFlux 简介
# 1. 什么是 Spring-WebFlux
Spring 5.0 添加了 Spring-WebFlux 模块将默认的 web 服务器改为 Netty,支持 Reactive 应用,它的特点是:
- 完全非阻塞式的(non-blocking)
- 支持 Reactive Stream 背压(Reactive Streams back pressure)
- 运行在 Netty, Undertow, and Servlet 3.1+ 容器
Spring MVC 构建于 Servlet API 之上,使用的是同步阻塞式 I/O 模型,每一个请求对应一个线程去处理
# 2 Spring WebFlux 与 Spring MVC 的区别
Spring WebFlux 与 Spring MVC 的关系如下图
Spring WebFlux 并不是为了替代 Spring MVC 的,它与 Spring MVC 一起形成了两套 WEB 框架。两套框架有交集比如对 @Controller 注解的使用,以及均可使用 Tomcat、Jetty、Undertow 作为 Web 容器。
Spring MVC 还是 WebFlux?
官方建议:
- 如果已经有了一个运行良好的 SpringMVC 应用程序,则无需更改。命令式编程是编写、理解和调试代码的最简单方法,我们可以选择最多的库,因为从历史上看,大多数都是阻塞的。
- 如果是个新应用且决定使用 非阻塞 Web 技术栈,那么 WebFlux 是个不错的选择。
- 对于使用 Java8 Lambda 或者 Kotlin 且 要求不那么复杂的小型应用程序或微服务来说,WebFlux 也是一个不错的选择
- 在微服务架构中,可以混合使用 SpringMVC 和 Spring WebFlux,两个都支持基于注解的编程模型
- 评估应用程序的一种简单方法是检查其依赖关系。如果要使用阻塞持久性 API(JPA、JDBC)或网络 API,那么 Spring MVC 至少是常见架构的最佳选择
- 如果有一个调用远程服务的 Spring MVC 应用程序,请尝试响应式WebClient
- 对于一个大型团队,向非阻塞、函数式和声明式编程转变的学习曲线是陡峭的。在没有全局开关的情况下,想启动 WebFlux,可以先使用 reactive WebClient。此外,从小处着手并衡量收益。我们预计,对于广泛的应用,这种转变是不必要的。
提示
这里需要自己或者团队从各方面评估是否需要转换或者升级,确保你的改变能带来巨大收益,否则维持现状才是最佳
# 3. 优点 (为什么要用 WebFlux?)
- 吞吐量
随着每个请求的被处理时间越长、并发请求的量级越大,WebFlux 相比 SpringMVC 的整体吞吐量高的越多,平均的请求响应时间越短
- 非阻塞 (Netty)
Netty 所用的 Reactor 线程模型,就解决了阻塞 IO 的问题,具体来讲,它使用的是主从 Reactor 多线程模型
同时 Netty 自身也很好地利用了 IO 多路复用、epoll 优化、零拷贝等技术,极大程度上优化了 IO 的性能。 SpringWebFlux 底层也依赖了 Netty ,所以也获得了 Netty 带来的优势。这一点可以概括为应用的弹性或伸缩性。根据实际请求量的大小进行资源的伸缩。
# 2. 代码演示(+部分代码解析)
# 1. 代码准备
- 新建 SpringBoot 项目
- java version: 11
- jdk version: 11
- SpringBoot version: 2.7.3
- 添加项目依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
- 添加测试 Controller
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
@RestController
@RequestMapping("/webflux")
public class HelloController {
@GetMapping("/hello")
public Mono<String> hello() {
return Mono.just("Hello Spring Webflux");
}
}
- 测试
curl http://localhost:9099/webflux/hello
Hello Spring Webflux
- 测试返回对象
// HelloController.java
@GetMapping("/posts")
public Flux<Post> posts() {
WebClient webClient = WebClient.create();
Flux<Post> postFlux = webClient.get().uri("http://jsonplaceholder.typicode.com/posts").retrieve().bodyToFlux(Post.class);
return postFlux;
}
// Post.java
import lombok.Data;
import lombok.NoArgsConstructor;
@NoArgsConstructor
@Data
public class Post {
private Integer userId;
private Integer id;
private String title;
private String body;
}
http://localhost:9099/webflux/posts
WebClient
WebClient 是 Spring5 以后提供的,可以替代 RestTemplate,我们利用 WebClient 请求 jsonplaceholder 提供的 json 对象数组,将返回的结果映射成为 Post 对象,然后直接将 Post 对象列表返回给客户端
# 2. Mono 和 Flux (响应式编程中发布者角色)
可以看到启动日志中 Server 是 Netty
- Mono:返回 0 或 1 个元素,即单个对象
- Flux:返回 N 个元素,即 List 列表对象
# 3. Stream
// HelloController.java
@GetMapping(value = "/flux", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> flux() {
Flux<String> flux = Flux.fromArray(new String[]{"a", "b", "c", "d"}).map(s -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "<letter:" + s + ">";
});
return flux;
}
结果
SSE
这里的效果是因为 SSE (Server Send Event)
MediaType.TEXT_EVENT_STREAM_VALUE
服务器向浏览器推送消息的一种方案 text/event-stream
# 4. 请求分发
Spring MVC 的前端控制器是 DispatcherServlet
, 而 WebFlux 是 DispatcherHandler
,它实现了 WebHandler 接口
// DispatcherHandler.java
public Mono<Void> handle(ServerWebExchange exchange) {
if (this.handlerMappings == null) {
return this.createNotFoundError();
} else {
return CorsUtils.isPreFlightRequest(exchange.getRequest())
? this.handlePreFlight(exchange)
: Flux.fromIterable(this.handlerMappings)
.concatMap((mapping) -> {
return mapping.getHandler(exchange);
})
.next().switchIfEmpty(this.createNotFoundError()).flatMap((handler) -> {
return this.invokeHandler(exchange, handler);
})
.flatMap((result) -> {
return this.handleResult(exchange, result);
});
}
}
# 3. 与数据库连接 (opens new window)
# 1. 添加项目依赖
<!-- r2dbc mysql-->
<dependency>
<groupId>dev.miku</groupId>
<artifactId>r2dbc-mysql</artifactId>
<version>0.8.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-r2dbc</artifactId>
<version>2.7.3</version>
</dependency>
# 2. 创建连接工厂
- 建连接工厂
// config/ConnectionFactoryConfiguration.java
package com.example.demo.config;
import dev.miku.r2dbc.mysql.MySqlConnectionConfiguration;
import dev.miku.r2dbc.mysql.MySqlConnectionFactory;
import io.r2dbc.spi.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ConnectionFactoryConfiguration {
@Bean
public ConnectionFactory connectionFactory() {
return MySqlConnectionFactory.from(MySqlConnectionConfiguration.builder()
.host("localhost")
.port(3306)
.username("root")
.password("root")
.database("r2dbc")
.build());
}
}
- 创建配置类,用来启用 R2DBC 技术连接 MySQL
package com.example.demo.config;
import io.r2dbc.spi.ConnectionFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.r2dbc.config.AbstractR2dbcConfiguration;
import org.springframework.data.r2dbc.repository.config.EnableR2dbcRepositories;
@Configuration
@EnableR2dbcRepositories
public class R2dbcConfiguration extends AbstractR2dbcConfiguration {
private final ConnectionFactory connectionFactory;
public R2dbcConfiguration(@Qualifier("connectionFactory") ConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
}
@Override
public ConnectionFactory connectionFactory() {
return this.connectionFactory;
}
}
推荐
当你的项目简单不需要定制连接工厂可以采用配置文件方式
步骤1和步骤2 可以替换为 application.yml
spring:
r2dbc:
url: r2dbc:mysql://localhost:3306/r2dbc?characterEncoding=UTF-8
username: root
password: root
name: r2dbc
pool:
validation-query: SELECT 1
enabled: true
# 3. 利用 ReactiveCrudRepository
创建仓储
创建数据库并添加 PersonRepository
以及 Person 实体
create database r2dbc character set utf8;
use r2dbc;
CREATE TABLE `person` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(255) CHARACTER SET utf8 DEFAULT NULL,
`age` int(11) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
// PersonRepository.java
package com.example.demo.repository;
import com.example.demo.vo.Person;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import org.springframework.stereotype.Repository;
@Repository
public interface PersonRepository extends ReactiveCrudRepository<Person, Integer> {
}
// Person.java
package com.example.demo.vo;
import lombok.Data;
@Data
public class Person {
private int age;
private String name;
private int id;
}
# 4. 添加 DBController.java
package com.example.demo.controller;
import com.example.demo.repository.PersonRepository;
import com.example.demo.vo.Person;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
@RestController
@RequestMapping("dbtest")
public class DBController {
private final PersonRepository personRepository;
public DBController( PersonRepository personRepository) {
this.personRepository = personRepository;
}
@GetMapping("persons")
public Flux<Person> persons() {
return personRepository.findAll();
}
}
- 测试
GET
http://localhost:9099/dbtest/persons
HTTP/1.1 200 OK
transfer-encoding: chunked
Content-Type: application/json
[
{
"age": 10,
"name": "张三",
"id": 1
},
{
"age": 20,
"name": "李四",
"id": 2
}
]
# 4. 总结
Reactive Programming
作为观察者模式(Observer) 的延伸,不同于传统的命令编程方式( Imperative programming)同步拉取数据的方式,如迭代器模式(Iterator) 。而是采用数据发布者同步或异步地推送到数据流(Data Streams)的方案。
当该数据流(Data Steams)订阅者监听到传播变化时,立即作出响应动作。在实现层面上,Reactive Programming
可结合函数式编程简化面向对象语言语法的臃肿性,屏蔽并发实现的复杂细节,提供数据流的有序操作,从而达到提升代码的可读性,以及减少 Bugs 出现的目的。同时,Reactive Programming
结合背压(Backpressure
)的技术解决发布端生成数据的速率高于订阅端消费的问题。
如果说:
- Spring Cloud 是从【宏观系统层面的开发】角度在实践健壮的高可用系统+系统运维
- K8S 在【DEV OPS】层面实践更好的系统运维
- Service Mesh 在【基础设施层(infra)】实践健壮的高可用系统+系统运维,
- 那么 WebFlux(包括整个 Reactive Stack 体系的其他成员)就是从【微观项目层面的开发】角度在实践健壮的高可用系统+系统运维
# 参考
- https://mp.weixin.qq.com/s/zdSELhxMPokeE7y71vvbhw
- https://blog.csdn.net/weixin_43653599/article/details/113975523