79、SpringBoot 整合 R2DBC — R2DBC 就是 JDBC 的 反应式版本, R2DBC 是 JDBC 的升级版。
★ 何谓R2DBC
R2DBC 就是 JDBC 的 反应式版本, R2DBC 是 JDBC 的升级版。
R2DBC 是 Reactive Relational Database Connectivity (关系型数据库的响应式连接) 的缩写
反应式的就是类似于消息发布者和订阅者,有消息就进行推送。R2DBC中DAO接口中方法的返回值是 Flux 或 Mono
因此,反应式的 R2DBC ,是不存在 【分页】 这种情况的。
JDBC 或者 R2DBC 都是用来对数据库进行操作的
★ Spring R2DBC
Spring Data 为 JDBC 提供了 Spring Data JDBC 项目,为 R2DBC 则提供了 Spring Data R2DBC 项目。 早期Spring项目并未包含Spring R2DBC模块,而是由Spring Data R2DBC项目来提供Spring R2DBC功能。 从Spring 5.3开始,R2DBC才从Spring Data R2DBC中分离成Spring的Spring R2DBC模块。 从Spring 5.3开始, Spring为支持R2DBC单独提供了Spring R2DBC模块。 ▲ 对比 传统: JDBC Spring JDBC(需要自己用JdbcTemplate实现DAO组件的实现类) Spring Data JDBC(只需提供接口) 反应式:R2DBC Spring R2DBC Spring Data R2DBC

★ 两个版本的 DatabaseClient:
- DatabaseClient 是 Spring Data R2DBC 的核心API。 - 早期 DatabaseClient 和它的内部类提供了大量的流式API来拼接SQL(非常像jOOQ), 例如通过select()方法模拟SQL的SELECT子句、通过from()方法模拟SQL的FROM子句 ……但这些方法的参数都是String类型,因此这样做的意义并不大,被淘汰了。 - 从Spring Data R2DBC 1.2版本开始,推荐使用 Spring R2DBC 的 DatabaseClient ——新设计的DatabaseClient直接使用sql()方法来接受String类型的SQL语句,这样更加简单、粗暴。
★ Spring Data R2DBC的功能(完全类似于Spring Data JDBC)
SpringBoot 整合 Spring Data JDBC
- DAO接口只需继承 ReactiveCrudRepository 或 ReactiveSortingRepository, Spring Data R2DBC能为DAO组件提供实现类。 - Spring Data R2DBC 支持方法名关键字查询、类似于 Spring Data JDBC。 - Spring Data R2DBC 支持用 @Query 定义查询语句 - Spring Data R2DBC 同样支持 DAO组件 添加自定义的查询方法。 - 类似 Spring Data JDBC,同样不支持 Example 查询 和 Specification 查询
★ Spring Data R2DBC 的映射(完全类似于Spring Data JDBC)
- 与Spring Data JDBC相同,同样默认使用“约定优于配置”的同名映射策略。同样支持如下注解: - @Table:映射自定义的表名。 - @Column注解:映射自定义的列名 - @Id注解:修饰标识属性 - @PersistenceConstructor:修饰主构造器
★ Spring Data R2DBC的变化
- 与Spring Data JDBC不同,R2DBC中DAO接口中方法的返回值是 Flux 或 Mono 区别:Spring Data JDBC 的 Dao 接口中的方法的返回值,基本是List集合或者是单个对象。 - 当要实现自定义的查询方法时,不再使用JdbcTemplate, 而是使用DatabaseClient(相当于JdbcTemplate)。 JdbcTemplate或者是DatabaseClient,都是被封装后,用来对数据库进行操作的
【注意点】
1、 如果你要使用反应式API来访问数据库,请务必使用数据库的反应式驱动,传统驱动是不行的。

把导入的依赖的截图显示下

2、 连接R2DBC的数据库的连接信息也是不同的, URL的协议不再是jdbc,而是r2dbc

3、 如果你要测试反应式DAO组件,请一定要使用block来保证执行完成。
因为:
block 是阻塞线程,一定要等待 Mono 或 flux 发布数据为止,所以这个 block 只能用在测试用例中
block 就相当于在等 消息发布者Mono 发布数据,如果没有发布数据,就一直等,阻塞,相当于把反应式又变成了同步的数据读取方式
如果要测试反应式API,可调用 block 方法来同步获取数据,因为查询可能有数据,也可能查不到数据,所以 block方法有几种:
.block():表示一定要取到值,一定要有数据 ; .blockOptional():如果取不到值,就返回一个null,或者是返回Optional

代码演示:
演示通过 R2DBC(关系型数据库的响应式连接) 反应式的 进行数据库连接查询。
上面,pom.xml 已经添加了对应的 R2DBC 依赖和反应式的数据库驱动,然后配置文件也改成了R2DBC 模式
接下来就是写 DAO 组件了。

自定义的Dao组件

自定义dao组件的接口实现类
当要实现自定义的查询方法时,不再使用JdbcTemplate,而是使用DatabaseClient(相当于JdbcTemplate)

这个是User类

还需要配置数据库的连接

测试:
一些方法的作用:
.block(): block 是阻塞线程,一定要等待 Mono 或 flux 发布数据为止,
表示一定要取到值,一定要有数据
所以这个 block 只能用在测试用例中
block 就相当于在等消息发布者Mono发布数据,如果没有发布数据,就一直等,阻塞,
相当于把反应式又变成了同步的数据读取方式
.blockOptional() :如果取不到值,就返回一个null,或者是返回Optional
.ifPresent(System.err::println):判断是否有数据,当数据不为空的时候,就调用打印功能
.subscribe(System.err::println):消息订阅者subscribe 获取数据
.collectList():将 Flux 中所有数据搜集成 Mono
测试都是通过的,注意的方法都在截图中解释






pom文件

完整代码:
User
package cn.ljh.app.domain;
import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.annotation.PersistenceConstructor;
import org.springframework.data.relational.core.mapping.Column;
import org.springframework.data.relational.core.mapping.Table;
/**
* author JH
*/
@Data
//此处不能添加JPA注解,因为此项目没有用到 JPA
@Table("user_inf")
public class User
{
@Column(value = "user_id")
@Id
private Integer id;
private String name;
private String password;
private int age;
/**
* @PersistenceConstructor
* 修饰主构造器。当你的映射类中有多个构造器时,
* 你希望Spring Data JDBC用哪个构造器来创建对象,就用该注解来修饰该构造器
*/
@PersistenceConstructor
public User()
{
}
public User(Integer id, String name, String password, int age)
{
this.id = id;
this.name = name;
this.password = password;
this.age = age;
}
@Override
public String toString()
{
return "User{" +
"id=" + id +
", name='" + name + '\'' +
", password='" + password + '\'' +
", age=" + age +
'}';
}
}
UserDao
package cn.ljh.app.dao;
import cn.ljh.app.domain.User;
import org.springframework.data.r2dbc.repository.Modifying;
import org.springframework.data.r2dbc.repository.Query;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
//ReactiveCrudRepository 是 Spring Data R2DBC 的反应式 API
public interface UserDao extends ReactiveCrudRepository, CustomUserDao
{
// 继承 ReactiveCrudRepository 接口后,就已经有通用的 CRUD 操作,无需自己来书写这些方法
//Spring Data R2DBC 的 DAO 组件中的所有方法的返回值类型要么是 MONO ,要么是 Flux,
//MONO 或者 Flux 都继承了 Publisher,返回多条数据,用 Flux 类型接收, 返回一条数据,用 MONO 类型接收
//方法名关键字查询---------全自动
//根据名字模糊查询
Flux findByNameLike(String namePattern);
//根据年龄大小进行范围查询
Flux findByAgeGreaterThan(int startAge);
//根据年龄区间进行范围查询
Flux findByAgeBetween(int startAge, int endAge);
//@Query 查询 --->自己定义查询语句-----半自动
//rowMapperClass 或 rowMapperRef 是用来做自定义映射,查询出来的User对象的数据,映射到Student对象的属性上面去都可以,因为是自定义的。
//根据密码模糊查询
@Query("select * from user_inf where password like :passwordPattern")
Flux findByPassword(String passwordPattern);
//根据年龄范围修改名字
@Query("update user_inf set name = :name where age between :startAge and :endAge")
//更改数据库数据需要用到这个注解
@Modifying
Mono updateNameByAge(String name, int startAge, int endAge);
}
CustomUserDao
package cn.ljh.app.dao;
import cn.ljh.app.domain.User;
import reactor.core.publisher.Flux;
//自己定义的接口,用来实现全手动的查询
public interface CustomUserDao
{
//通过名字进行模糊查询
Flux customQuery(String namePattern);
}
CustomUserDaoImpl
package cn.ljh.app.dao.impl;
import cn.ljh.app.dao.CustomUserDao;
import cn.ljh.app.domain.User;
import lombok.SneakyThrows;
import org.springframework.r2dbc.core.DatabaseClient;
import reactor.core.publisher.Flux;
//自定义接口的实现类
public class CustomUserDaoImpl implements CustomUserDao
{
private DatabaseClient databaseClient;
//通过有参构造器进行依赖注入
public CustomUserDaoImpl(DatabaseClient databaseClient)
{
this.databaseClient = databaseClient;
}
@SneakyThrows
@Override
public Flux customQuery(String namePattern)
{
//使用 DatabaseClient 来访问数据库,这个 API(DatabaseClient) 就相当于 传统的 JdbcTemplate
// :0 代表第一个占位符,jDBCTemplate 是用 "?" 做占位符。 如果写 :aaa ,那么就是命名参数做占位符 , :0 就是未知参数做占位符
Flux userFlux = this.databaseClient.sql("select * from user_inf where name like :0")
//为占位符绑定参数
.bind(0, namePattern)
//把查询出来的一行行数据(Row)转成user对象
//map的作用就是将一行(Row)数据 转成 一个目标对象(比如这里目标对象是User)
.map(row -> new User(
row.get("user_id", Integer.class),
row.get("name", String.class),
row.get("password", String.class),
row.get("age", Integer.class)
))
//就是查询取出所有数据
.all();
return userFlux;
}
}
UserDaoTest
package cn.ljh.app;
import cn.ljh.app.dao.UserDao;
import cn.ljh.app.domain.User;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.List;
import java.util.Optional;
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE)
public class UserDaoTest
{
@Autowired
private UserDao userDao;
// 继承 ReactiveCrudRepository 接口后,就已经有通用的 CRUD 操作,无需自己来书写这些方法==============
//添加user对象
@ParameterizedTest
@CsvSource({"a11a,x11xx,2", "b11b,x11xx,3"})
public void testSave(String name, String password, int age)
{
//ReactiveCrudRepository 的 save 方法
Mono save = userDao.save(new User(null, name, password, age));
//如果要测试反应式API,可调用 block 方法来同步获取数据,因为查询可能有数据,也可能查不到数据,所以 block方法有几种:
//.block():表示一定要取到值,一定要有数据 ; .blockOptional():如果取不到值,就返回一个null,或者是返回Optional
Optional userOptional = save.blockOptional();
//判断是否有数据,当数据不为空的时候,就调用打印功能
userOptional.ifPresent(System.err::println);
}
//根据id查询对象
@ParameterizedTest
@ValueSource(ints = {1})
public void testFindById(Integer id)
{
//userMono 是数据发布者
Mono userMono = userDao.findById(id);
//如果要测试反应式DAO组件,一定要使用block来保证执行完成
//block 是阻塞线程,一定要等待 Mono 或 flux 发布数据为止,所以这个 block 只能用在测试用例中
//block 就相当于在等消息发布者Mono发布数据,如果没有发布数据,就一直等,阻塞,相当于把反应式又变成了同步的数据读取方式
Optional userOptional = userMono.blockOptional();
//判断是否有数据,当数据不为空的时候,就调用打印功能
userOptional.ifPresent(System.err::println);
}
//根据id删除用户对象
@ParameterizedTest
@ValueSource(ints = {24})
public void testDelete(Integer id)
{
Mono mono = userDao.deleteById(id);
//如果要测试反应式DAO组件,一定要使用block来保证执行完成,block的作用就是一定会等待着阻塞着直到拿到数据
Optional optional = mono.blockOptional();
}
//根据id修改对象 ,有id,save就是修改
@ParameterizedTest
@CsvSource({"13,aaa,xxxx,22"})
public void testUpdate(Integer id, String name, String password, int age)
{
//跟上面添加对象的写法一样
Mono save = userDao.save(new User(id, name, password, age));
Optional userOptional = save.blockOptional();
userOptional.ifPresent(System.err::println);
}
//根据名字模糊查询
@ParameterizedTest
@ValueSource(strings = {"孙%", "%精"})
public void testFindByNameLike(String namePattern) throws InterruptedException
{
//userFlux 数据发布者
Flux userFlux = userDao.findByNameLike(namePattern);
//除了用 block 这种同步的方式获取数据之外,还可以用这个 消息订阅者subscribe 获取数据。
//为 Flux 指定了一个消息订阅者,消息订阅者是一个 Lambda 表达式,只是负责将数据打印出来
//如果只是用 subscribe ,因为反应式API是异步的,意味着有可能数据还没有到来,测试方法就已经执行结束了
userFlux.subscribe(System.err::println);
//让测试方法暂定1秒,等到Flux的数据到来---只适合数据量小的时候
Thread.sleep(1000);
}
//根据年龄大小进行范围查询
@ParameterizedTest
@ValueSource(ints = {500, 10})
public void testFindByAgeGreaterThan(int startAge) throws InterruptedException
{
//userFlux 数据发布者
Flux userFlux = userDao.findByAgeGreaterThan(startAge);
//将 Flux 中所有数据搜集成 Mono
Mono<List> listMono = userFlux.collectList();
//同步阻塞,一定要取出Mono中的所有数据
List userList = listMono.block();
//遍历打印
userList.forEach(System.err::println);
}
//根据年龄区间进行范围查询
@ParameterizedTest
@CsvSource({"15,20", "500,1000"})
public void testFindByAgeBetween(int startAge, int endAge)
{
//userFlux 数据发布者
Flux userFlux = userDao.findByAgeBetween(startAge, endAge);
//将 Flux 中所有数据搜集成 Mono
Mono<List> listMono = userFlux.collectList();
//同步阻塞,一定要取出Mono中的所有数据
List userList = listMono.block();
//遍历打印
userList.forEach(System.err::println);
}
//根据密码模糊查询
@ParameterizedTest
@ValueSource(strings = {"niu%", "%3"})
public void testFindBySql(String passwordPattern)
{
//userFlux 数据发布者
Flux userFlux = userDao.findByPassword(passwordPattern);
//将 Flux 中所有数据搜集成 Mono
Mono<List> listMono = userFlux.collectList();
//同步阻塞,一定要取出Mono中的所有数据
List userList = listMono.block();
//遍历打印
userList.forEach(System.err::println);
}
//根据年龄范围修改名字----修改
@ParameterizedTest
@CsvSource({"牛魔王aa,800,1000"})
//@Transactional 和 @Rollback(false) 是只有 spring data jdbc 才需要用到
//@Transactional //事务
//@Rollback(false) //测试的数据不进行回滚
public void testUpdateNameByAge(String name, int startAge, int endAge)
{
//mono消息发布者
Mono mono = userDao.updateNameByAge(name, startAge, endAge);
//blockOptional:同步方式获取数据,返回值为 Optional
Optional optionalInteger = mono.blockOptional();
//ifPresent :判断是否有数据,有数据则执行打印功能
optionalInteger.ifPresent(System.err::println);
}
//通过名字进行模糊查询,
@ParameterizedTest
@ValueSource(strings = {"孙%"})
public void testCustomQuery(String namePattern)
{
//userFlux 数据发布者
Flux userFlux = userDao.customQuery(namePattern);
//将 Flux 中所有数据搜集成 Mono
Mono<List> listMono = userFlux.collectList();
//同步阻塞,一定要取出Mono中的所有数据
List userList = listMono.block();
//遍历打印
userList.forEach(System.err::println);
}
}
application.properties
# 传统的mysql数据库连接 # spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver # spring.datasource.url=jdbc:mysql://localhost:3306/springboot?serverTimezone=UTC # spring.datasource.username=root # spring.datasource.password=123456 # R2DBC 反应式的数据库连接 # 或者不指定反应式的驱动,高版本的数据库基本可以不用我们去加载驱动,系统可以自己去识别驱动, # spring.datasource.driver-class-name=反应式API的驱动 spring.r2dbc.url=r2dbc:mysql://localhost:3306/springboot?serverTimezone=UTC spring.r2dbc.username=root spring.r2dbc.password=123456
pom.xml
4.0.0
org.springframework.boot
spring-boot-starter-parent
2.4.5
cn.ljh
DataR2DBC
1.0.0
DataR2DBC
11
org.springframework.boot
spring-boot-starter-data-r2dbc
dev.miku
r2dbc-mysql
mysql
mysql-connector-java
runtime
org.springframework.boot
spring-boot-devtools
runtime
true
org.projectlombok
lombok
true
org.springframework.boot
spring-boot-starter-test
test
org.springframework.boot
spring-boot-maven-plugin
org.projectlombok
lombok
本文来自网络,不代表协通编程立场,如若转载,请注明出处:https://net2asp.com/f79791dd2b.html
