웹플럭스에서는 논블럭킹 방식으로 데이터베이스에 접근한다.
현재까지 연결하여 지원가능한 관계형 데이터베이스는 5개이다. (2020.03 기준)
1. Postgres (io.r2dbc:r2dbc-postgresql)
2. H2 (io.r2dbc:r2dbc-h2)
3. Microsoft SQL Server (io.r2dbc:r2dbc-mssql)
4. MySQL (com.github.mirromutth:r2dbc-mysql)
5. jasync-sql MySQL (com.github.jasync-sql:jasync-r2dbc-mysql)
나머지 데이터 베이스는 지원하더라도 Blocking 방식으로 지원되므로...위 언급한 5개의 데이터 베이스를 제외하고는 아직 사용하지 않는 것이 좋다고 한다.
왜나하면, 웹플럭스는 논블럭킹 방식으로 만들어져 있는데..데이터베이스에서 갑자기 블럭해버리면 속도 및 성능이 저하되기 때문이다.
이번에는 Mysql을 사용하여 기능을 한번 구현하여 보았다.
필요한 라이브러리 종류이다. 2개는 커넥션과 관련된 라이브러리이고, 1개는 드라이버 종류이다.
* 메이븐 기준
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-pool</artifactId>
<version>0.8.0.RELEASE</version>
</dependency>
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-pool</artifactId>
<version>0.8.0.BUILD-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>dev.miku</groupId>
<artifactId>r2dbc-mysql</artifactId>
<version>0.8.1.RELEASE</version>
</dependency>
r2dbc라는 라이브러리를 사용하면 논블럭킹에 적합한..
복잡한 말을 쓰지말자.
r2dbc라이브러리를 활용해야 웹플럭스에 적합한 형태로 데이터베이스에 연결할 수 있다는 점을 기억하자.
첫번째 작업은 커넥션을 만들고 만든 커넥션을 pool 형태로 관리를 해 주는 작업을 하는 것 이다.
private ConnectionFactory factory;
private ConnectionPool pool;
@PostConstruct
public void init() {
factory = ConnectionFactories.get("r2dbcs:pool:mysql://비번:아이디@주소:포트/db명");
ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(factory)
.maxIdleTime(Duration.ofMillis(1000))
.maxSize(20)
.build();
pool = new ConnectionPool(configuration);
}
PostConstruct를 활용하여 빈 객체가 생성된 뒤에 데이터베이스 커넥션과 관련된 내용이 동작하도록 하였다.
크게 어렵지 않는 코드이다.
여기서 사용할 대표적인 객체는 바로 pool 객체이다. pool객체를 활용하여 데이터베이스와 연동하면 된다.
매번 그랬듯 요녀석도 Mono와 Flux를 활용하여 데이터베이스와 관련된 기능을 작성 해 주어야 한다.
먼저 단일 정보를 가져오는 기능을 만들어 보자.
public Mono<HashMap<Object, Object>> selectSingle() {
Mono<HashMap<Object, Object>> mono = Mono.from(pool.create()).map(connection -> //커넥션 가공
Flux.from(connection.createStatement("select name, desc, date from test where id_=?ids").bind("ids", "2").execute())
.concatMap( result-> //1차 변환
result.map((row, rowMetadata)-> { //결과 재 조립 후 리턴
HashMap<Object, Object> item = new HashMap<>();
item.put("names", row.get("name",String.class));
item.put("desc", row.get("desc",String.class));
item.put("date", row.get("date",Object.class));
return item;
})
).doFinally( (st)->{connection.close();})
).flatMap( ccc -> Mono.from(ccc)); //2차 변환
return mono;
}
세상에...반응형 코딩이 알아보기 어렵다고는 하지만 정말 이정도일줄은 몰랐다.
단순히 결과를 매핑하는데만 해도 이정도의 코드가 들어가야한다니..
재미있는 것은 해당 코드에서 바라보고 있는 test테이블의 데이터가 몇십 또는 몇백이상의 데이터가 존재하더라도 결과는 항상 1개라는 것 이다.
왜냐하면, Mono라는 녀석은 결과를 0~1개만 갖을 수 있기 때문이다.
그러면, 해당 내용을 n개를 갖을 수 있도록 Flux를 사용하여 보자.
public Flux<HashMap<Object, Object>> selectMany() {
Flux<HashMap<Object, Object>> flux = Flux.from(pool.create()).concatMap(connection -> //커넥션 가공 및 1차 변환
Flux.from(connection.createStatement("select name, desc, date from test where id_=?ids").bind("ids", "2").execute())
.concatMap( result-> //2차 변환
result.map((row, rowMetadata)-> { //결과 재 조립 후 리턴
HashMap<Object, Object> item = new HashMap<>();
item.put("names", row.get("name",String.class));
item.put("desc", row.get("desc",String.class));
item.put("date", row.get("date",Object.class));
return item;
})
).doFinally( (st)->{connection.close();})
);
return flux;
}
Flux를 사용해서 결과를 n개로 가져올 수 있도록 변환하였다.
마찬가지로 엄청나게 복잡한 코드가 압박으로 다가온다. 과연 저 코드를 보고 단번에 이해할 수 있을까..?
쿼리같은 경우는 bind 메소드를 호출하여 필요한 데이터를 넣어주면 되며, "?값"을 통해서 바꿀 필드명을 입력하면 된다.
그러면 등록, 수정 및 삭제는 어떻게 하는 것 일까?
단순하게도 createStatement 부분에 쿼리를 바꾸면 간단하게 해결된다.
public Mono<? extends Result> insertTest() {
return Mono.from(pool.create()).map( con ->
Mono.from(
con.createStatement(" insert into test(name, desc, date) values (?name, ?desc, ?date) ")
.bind("name", "abcd").bind("desc", "desc").bind("date", LocalDateTime.now()).execute()
).doFinally( st-> con.close()).map( result -> result)
).flatMap( c-> Mono.from(c));
}
최종 소스코드 모습이다.
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.HashMap;
import javax.annotation.PostConstruct;
import org.springframework.stereotype.Repository;
import io.r2dbc.pool.ConnectionPool;
import io.r2dbc.pool.ConnectionPoolConfiguration;
import io.r2dbc.spi.ConnectionFactories;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.Result;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Repository
public class DatabaseConfig {
private ConnectionFactory factory;
private ConnectionPool pool;
//연결
@PostConstruct
public void init() {
factory = ConnectionFactories.get("r2dbcs:pool:mysql://비번:아이디@주소:포트/db명");
ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(factory)
.maxIdleTime(Duration.ofMillis(1000))
.maxSize(20)
.build();
pool = new ConnectionPool(configuration);
}
//단일 셀렉트, 조회할 테이블 데이터가 n개이면 이런식으로 쓰면 안된다.
public Mono<HashMap<Object, Object>> selectSingle() {
Mono<HashMap<Object, Object>> mono = Mono.from(pool.create()).map(connection -> //커넥션 가공
Flux.from(connection.createStatement("select name, desc, date from test where id_=?ids").bind("ids", "2").execute())
.concatMap( result-> //1차 변환
result.map((row, rowMetadata)-> { //결과 재 조립 후 리턴
HashMap<Object, Object> item = new HashMap<>();
item.put("names", row.get("name",String.class));
item.put("desc", row.get("desc",String.class));
item.put("date", row.get("date",Object.class));
return item;
})
).doFinally( (st)->{connection.close();})
).flatMap( ccc -> Mono.from(ccc)); //2차 변환
return mono;
}
//단순 셀렉트 예제
public Flux<HashMap<Object, Object>> selectMany() {
Flux<HashMap<Object, Object>> flux = Flux.from(pool.create()).concatMap(connection -> //커넥션 가공 및 1차 변환
Flux.from(connection.createStatement("select name, desc, date from test where id_=?ids").bind("ids", "2").execute())
.concatMap( result-> //2차 변환
result.map((row, rowMetadata)-> { //결과 재 조립 후 리턴
HashMap<Object, Object> item = new HashMap<>();
item.put("names", row.get("name",String.class));
item.put("desc", row.get("desc",String.class));
item.put("date", row.get("date",Object.class));
return item;
})
).doFinally( (st)->{connection.close();})
);
return flux;
}
//저장 예제
public Mono<? extends Result> insertTest() {
return Mono.from(pool.create()).map( con ->
Mono.from(
con.createStatement(" insert into test(name, desc, date) values (?name, ?desc, ?date) ")
.bind("name", "abcd").bind("desc", "desc").bind("date", LocalDateTime.now()).execute()
).doFinally( st-> con.close()).map( result -> result)
).flatMap( c-> Mono.from(c));
}
}
여기서의 핵심은 createStatement에 원하는 쿼리를 써 주고 쿼리에 "?명칭" 을 입력하여 bind 메소드를 통해서 치환할 데이터를 입력하면 되는 부분이다.
코드량이 너무 복잡한 것으로 보아 이런식으로 사용했다가는 아무도 이해하지 못한 코드가 나올 것 같다.
만약 쿼리가 복잡하다면...Mybatis가 엄청 그리워 질 것 같다.
그러면, 이를 좀더 깔끔하게 해결하기위해서 JPA방식으로 바꾸면 어떨까?
다음번에는 몽고를 JPA방식으로 한번 써 보아야겠다. * 물론 r2dbc에서도 JPA형태를 지원하고 있습니다!
spring webflux!
* 내용을 채우고 수정중입니다.
* 튜토리얼이나 가이드 목적보다도 개념정리에 목적을 두고 쓰고있습니다. 틀린부분이나 누락된 부분은 꼭 알려주세요!
'Spring framework > Spring Webflux' 카테고리의 다른 글
spring webflux 8 (웹플럭스 적용기, 웹 필터, 인터셉터) (2) | 2020.04.06 |
---|---|
spring webflux 7 (웹플럭스 적용기, MongoDb) (0) | 2020.04.01 |
spring webflux 5 (웹플럭스 적용기, Mono와 Flux의 Processor) (0) | 2020.03.25 |
spring webflux 5 (웹플럭스 적용기, Mono와 Flux 자주 보이는 map) (0) | 2020.03.20 |
spring webflux 4 (웹플럭스 적용기, Mono와 Flux) (0) | 2020.03.18 |
댓글