Blog Entry
Quarkus中使用Hibernate Reactive进行异步保存
Quarkus中使用Hibernate Reactive进行异步保存
- Created
- 2023/08/09
- Updated
- 2023/08/09
环境
> java -version
openjdk version "11.0.19" 2023-04-18
OpenJDK Runtime Environment GraalVM CE 22.3.2 (build 11.0.19+7-jvmci-22.3-b18)
OpenJDK 64-Bit Server VM GraalVM CE 22.3.2 (build 11.0.19+7-jvmci-22.3-b18, mixed mode, sharing)
> quarkus -version
3.2.2.Final
quarkus extensions
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-hibernate-reactive-panache</artifactId>
</dependency>

诉求
整体项目是基于reactive方式,查询数据库通过Hibernate Reactive,所以调用链都是通过Uni<?>来传递;
具体来说,在登录成功后,需要进行登录信息的入库,而这个入库操作成功与否并不是很重要,完全可以作为一个异步操作;
这只是一个小的应用场景,不排除有其他异步操作,像异步日志,异步通知之类。
相关类
package com.xkyii.spry.web.service;
import com.xkyii.spry.web.entity.SysLoginInfo;import com.xkyii.spry.web.entity.SysUser;import com.xkyii.spry.web.repository.SysLoginInfoRepository;import io.quarkus.hibernate.reactive.panache.Panache;import io.quarkus.hibernate.reactive.panache.common.WithTransaction;import io.smallrye.mutiny.Uni;import io.vertx.core.Vertx;import jakarta.enterprise.context.ApplicationScoped;import jakarta.inject.Inject;import org.jboss.logging.Logger;
import java.util.Date;
@ApplicationScoped
public class SysLoginInfoService {

    @Inject
    Logger logger;

    @Inject
    SysLoginInfoRepository loginInfoRepository;

    /**
     * Builds a login record for the given user and hands it to the repository.
     *
     * @param user the user whose user name is copied into the record
     * @return the {@code Uni} returned by the repository's {@code persist} call
     */
    public Uni<SysLoginInfo> create(SysUser user) {
        final SysLoginInfo record = buildRecord(user);
        logger.info("创建登录日志");
        return loginInfoRepository.persist(record);
    }

    /**
     * Creates a new record carrying the user's name, the current time and
     * fixed literal values for every other field.
     */
    private SysLoginInfo buildRecord(SysUser user) {
        final SysLoginInfo record = new SysLoginInfo();
        record.setUserName(user.getUserName());
        record.setIpaddr("127.0.0.1");
        record.setLoginLocation("内网IP");
        record.setBrowser("Chrome 11");
        record.setOs("Unknown");
        record.setStatus("1");
        record.setMsg("成功");
        record.setLoginTime(new Date());
        return record;
    }
}

尝试
Java方式 ❌
尝试了几种:
import java.util.concurrent.Executor;import java.util.concurrent.ExecutorService;import java.util.concurrent.ScheduledExecutorService;
@ApplicationScopedpublic class SysUserService {
@Inject Executor executor;
@Inject ExecutorService executorService;
@Inject ScheduledExecutorService scheduledExecutorService;
@WithTransaction public Uni<LoginOutput> login(LoginCommand input) {
String username = input.getUsername(); return userRepository.find("userName", username).firstResult() // 省略其他步骤 // ... // 保存登录日志 (想要个异步效果) .onItem().invoke(u -> {
// java.lang.IllegalStateException: HR000069: Detected use of the reactive Session from a different Thread than the one which was used to open the reactive Session - this suggests an invalid integration; original thread [245]: 'vert.x-eventloop-thread-3' current Thread [2456]: 'executor-thread-1' executor.execute(() -> { loginInfoService.create(u) .subscribe().with(x -> logger.info("创建登录日志 with executor"), Throwable::printStackTrace); });
// java.lang.IllegalStateException: HR000069: Detected use of the reactive Session from a different Thread than the one which was used to open the reactive Session - this suggests an invalid integration; original thread [244]: 'vert.x-eventloop-thread-2' current Thread [2589]: 'executor-thread-1' executorService.execute(() -> { loginInfoService.create(u) .subscribe().with(x -> logger.info("创建登录日志 with executorService"), Throwable::printStackTrace); });
// java.lang.IllegalStateException: HR000068: This method should exclusively be invoked from a Vert.x EventLoop thread; currently running on thread 'executor-thread-1' scheduledExecutorService.schedule(() -> { loginInfoService.create(u) .subscribe().with(x -> logger.info("创建登录日志 with scheduledExecutorService"), Throwable::printStackTrace); }, 10, TimeUnit.MILLISECONDS); } // 生成token .onItem().transform(u -> new LoginOutput(tokenService.generateToken(u))) ; }}这几种方式,用来运行常规的异步任务应该是没问题的,但是与hibernate reactive配合并不成功;
Vertx方式 ❌
@ApplicationScopedpublic class SysUserService {
@WithTransaction public Uni<LoginOutput> login(LoginCommand input) {
String username = input.getUsername(); return userRepository.find("userName", username).firstResult() // 省略其他步骤 // ... // 保存登录日志 (想要个异步效果) .onItem().invoke(u -> {
// 1 // 运行在同一个线程,没报错,但是也没有保存成功。 Vertx.currentContext().runOnContext(e -> { loginInfoService.create(u) .subscribe().with(x -> logger.info("创建登录日志 with Vertx.currentContext()"), Throwable::printStackTrace); });
// 2 // java.lang.IllegalStateException: HR000069: Detected use of the reactive Session from a different Thread than the one which was used to open the reactive Session - this suggests an invalid integration; original thread [245]: 'vert.x-eventloop-thread-3' current Thread [1056]: 'vert.x-worker-thread-2' vertx.executeBlocking(e -> { loginInfoService.create(u) .emitOn(Infrastructure.getDefaultWorkerPool()) .subscribe().with(x -> logger.info("创建登录日志 with vertx.executeBlocking"), Throwable::printStackTrace); }); } // 生成token .onItem().transform(u -> new LoginOutput(tokenService.generateToken(u))) ; }}- 没有报错,但是数据并没有成功入库,应该是没有正确开启
session的缘故,并且由于和当前的调用一定会运行在同一个线程,所以并不能达到效果
- 报错了(HR000069:在worker线程上使用了由eventloop线程打开的session)
Smallrye方式 ❌
@ApplicationScopedpublic class SysUserService {
@WithTransaction public Uni<LoginOutput> login(LoginCommand input) {
String username = input.getUsername(); return userRepository.find("userName", username).firstResult() // 省略其他步骤 // ... // 保存登录日志 (想要个异步效果) .onItem().invoke(u -> {
// 1 // 保存成功,但是实际上没有达到异步效果,只是subscribe在另一个线程而已 loginInfoService.create(u) .emitOn(Infrastructure.getDefaultWorkerPool()) .subscribe().with(x -> logger.info("创建登录日志 emitOn Infrastructure.getDefaultWorkerPool()"), Throwable::printStackTrace);
// 2 // java.lang.IllegalStateException: HR000069: Detected use of the reactive Session from a different Thread than the one which was used to open the reactive Session - this suggests an invalid integration; original thread [244]: 'vert.x-eventloop-thread-2' current Thread [1911]: 'executor-thread-1' loginInfoService.create(u) .runSubscriptionOn(Infrastructure.getDefaultWorkerPool()) .subscribe().with(x -> logger.info("创建登录日志 runSubscriptionOn Infrastructure.getDefaultWorkerPool()"), Throwable::printStackTrace); } // 生成token .onItem().transform(u -> new LoginOutput(tokenService.generateToken(u))) ; }}- 不是异步效果,执行仍然在当前线程,只是订阅结果到
worker线程了
- 执行在异步线程,但是报错了(HR000069)
EventLoop ✔
经过前面的尝试,以及对报错内容分析,其实已经大致找到了原因,简单来说就是Hibernate Reactive的session不是线程安全的:它必须在Vert.x的EventLoop thread上使用(对应HR000068),并且只能在打开它的那个线程上使用(对应HR000069),不能是worker thread或者其他线程;
整理了一个方案,有点蹩脚,但是简单测试一下是可以用:
import io.vertx.core.Vertx;import io.vertx.core.impl.ContextInternal;import io.vertx.core.impl.EventLoopContext;import io.vertx.core.impl.VertxInternal;
import static io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle.setContextSafe;
@ApplicationScopedpublic class SysUserService {
@WithTransaction public Uni<LoginOutput> login(LoginCommand input) {
String username = input.getUsername(); return userRepository.find("userName", username).firstResult() // 省略其他步骤 // ... // 保存登录日志 (想要个异步效果) .onItem().invoke(u -> {
// 既然只能运行在EventLoop thread,而当前eventloop thread又达不到异步效果,就尝试在别的eventloop thread上运行 VertxInternal vxi = (VertxInternal) vertx; Executor delegate = vertx.nettyEventLoopGroup(); EventLoopContext context = vxi.createEventLoopContext(); ContextInternal internal = (ContextInternal) VertxContext.getOrCreateDuplicatedContext(context); setContextSafe(internal, true); delegate.execute(() -> internal.dispatch(() -> { loginInfoService.create(u) .subscribe().with(x -> logger.info("创建登录日志 with getOrCreateDuplicatedContext"), Throwable::printStackTrace); })); } // 生成token .onItem().transform(u -> new LoginOutput(tokenService.generateToken(u))) ; }}EventBus ✔
这是推荐方式了,业务分离,实现简单
- 建立一个事件消费端,这里直接把 `create` 方法调整了一下
package com.xkyii.spry.web.service;
import com.xkyii.spry.web.entity.SysLoginInfo;import com.xkyii.spry.web.entity.SysUser;import com.xkyii.spry.web.repository.SysLoginInfoRepository;import io.quarkus.hibernate.reactive.panache.common.WithTransaction;import io.quarkus.vertx.ConsumeEvent;import io.smallrye.mutiny.Uni;import jakarta.enterprise.context.ApplicationScoped;import jakarta.inject.Inject;import org.jboss.logging.Logger;
import java.util.Date;
@ApplicationScoped
public class SysLoginInfoService {

    @Inject
    Logger logger;

    @Inject
    SysLoginInfoRepository loginInfoRepository;

    /**
     * Event-bus consumer for the address
     * {@code SysLoginInfoService-create-with-SysUser}: builds a login record
     * for the received user and persists it through the repository.
     *
     * @param user the user whose user name is copied into the record
     * @return the {@code Uni} returned by the repository's {@code persist}
     *         call, with failure logging attached
     */
    @ConsumeEvent("SysLoginInfoService-create-with-SysUser")
    @WithTransaction
    public Uni<SysLoginInfo> create(SysUser user) {
        SysLoginInfo info = new SysLoginInfo();
        info.setUserName(user.getUserName());
        info.setIpaddr("127.0.0.1");
        info.setLoginLocation("内网IP");
        info.setBrowser("Chrome 11");
        info.setOs("Unknown");
        info.setStatus("1");
        info.setMsg("成功");
        info.setLoginTime(new Date());

        logger.info("创建登录日志");

        // The sender shown in this article publishes without waiting for a reply,
        // so log a failed save here; the failure is still propagated unchanged.
        return loginInfoRepository.persist(info)
                .onFailure().invoke(e -> logger.error("创建登录日志失败", e));
    }
}

- 通过
EventBus调用即可:
import io.vertx.core.Vertx;
@ApplicationScopedpublic class SysUserService { @Inject Vertx vertx;
@WithTransaction public Uni<LoginOutput> login(LoginCommand input) {
String username = input.getUsername(); return userRepository.find("userName", username).firstResult() // 省略其他步骤 // ... // 保存登录日志 (想要个异步效果) .onItem().invoke(u -> { vertx.eventBus().publish("SysLoginInfoService-create-with-SysUser", u); } // 生成token .onItem().transform(u -> new LoginOutput(tokenService.generateToken(u))) ; }}