Spring framework/Spring Webflux

spring webflux 5 (웹플럭스 적용기, Mono와 Flux 자주 보이는 map)

마샤와 곰 2020. 3. 20. 16:19

 

* 자주 보이는 map

 

이번에는 연산자(함수) 중에서 자주 보이는 map에 대해서 살펴보자.

map은 Javascript와 Java의 스트림에서 사용하는 map과 동일한 기능을 수행한다.

 

아주 심플하게 사용되는 형태를 살펴보자.

public static void main(String[] args) {
    String text = "abcd";
    Mono<String> mono = Mono.just(text);
    mono.map( arg-> arg.length()).subscribe( str->{  //map을 통해서 string값을 길이로 바꾸었다.
        System.out.println(str);  //숫자 4가나온다.
    });
}

크게 어렵지 않는 내용이다.

text라는 문자를 구독하다가 map을 통해 길이로 바꾸어 주었고, 바꾼 값을 출력하게 하였다.

 

웹플럭스를 사용하다보면 map이 필요한 경우가 참 많이 나오는데..

조금 더 다른 형태의 경우를 살펴보자.

앞전 시간과 동일한 개발환경에 아래 two라는 이름을 가진 메소드를 추가하여보자.

private static void two() {
    List<String> array = new ArrayList<String>();
    array.addAll(Arrays.asList(new String[]{"a", "b", "c", "d", "e", "e"}));

    Flux.fromIterable(array).groupBy( arr -> arr).map( arg->{  //첫번째 map, arg는 GroupedFlux
        Mono<Map<Object, Object>> mono = arg.count().map( count -> { //두번째 map, arg.count()는 Mono<Long>
            Map<Object, Object> item = new HashMap<>();
            item.put(arg.key(), count);
            return item;				
        });
        return mono;
    }).subscribe( (data)->{	
        System.out.println(data); 
    });
}        

 

Flux에서 fromIterable이라는 메소드를 통해서 대상을 Flux객체로 만들었다.

그리고나서 groupBy와 map이라는 메소드를 붙여주었다.

Flux를 통해서 만들어진 객체를 groupBy를 실행하여 데이터를 묶는 역할을 하였다. (마치 sql의 group by 처럼!)

그리고나서 map을 통해서 해당 데이터에서의 key와 group하면서 세어진 갯수를 Map객체에 담아서 변경하였다.

map이 여러번 들어간 이유는 groupBy로 생성된 객체가 클래스 형태이기 때문이다.

 

그리하여 대충 짐작해보건데..

기대되는 모양세는 group한 key와 count를 통해서 Map형태의 결과가 나타날 것 같다.

그런데 막상 실행하여보면,

뭐지? 왠 영문이 튀어나오네?

 

뭔가 된 것 같긴한데 영문이 튀어나왔다.

먼저 첫번째 메소드를 보자.

groupBy라는 메소드는 결과를 GroupedFlux 클래스로 리턴하여준다.

오...map으로 매핑했을때의 결과가 저거였군..

 

 

그리고나서 나온 결과를 Mono<Map<Object, Object>> 의 결과로 map 하여 리턴 하였다.

주석을 한번 달아보면..

private static void two() {
    List<String> array = new ArrayList<String>();
    array.addAll(Arrays.asList(new String[]{"a", "b", "c", "d", "e", "e"}));

    Flux.fromIterable(array).groupBy( arr -> arr).map( arg->{  //첫번째 map, arg는 GroupedFlux
        Mono<Map<Object, Object>> mono = arg.count().map( count -> { //두번째 map, arg.count()는 Mono<Long>
            Map<Object, Object> item = new HashMap<>();
            item.put(arg.key(), count);
            return item;				
        });
        return mono;  //첫번째 map 값을 Mono<Map<Object, Object>>로 바꾼다음,
    }).subscribe( (data)->{	  //해당 데이터를 구독!
        System.out.println(data); 
    });
}        

 

최종적으로 첫번째 map이 Mono<Map<Object, Object>>로 되돌려 주기 때문에 결과가 해당 클래스명으로 출력 된 것이다.

그러면 저 내용을 Map데이터로 보려면 어떻게 해야되는 것 일까?

Mono라는 객체로 되돌려 받았으니 당연히 subscribe해야된다.

subscribe안에서 subscribe가 이루어졌다.

 

그러면 저러한 구독해야되는 대상이 나중에 많아지고 복잡해지면 소스코드는 구독에, 구독에 구독이 더해지는 매우 복잡한 형태가 될 것 이다.

 

 

 

* 도와줘요~ flatMap, concatMap

 

저렇게 구독할 Flux의 객체가 단일 Flux가 아닌 구독의 구독의 형태로 되어버린경우에는 flatMap이나 concatMap을 사용한다.

두 메소드의 차이는 순서를 유지하느냐, 유지하지 않느냐의 차이이다.

flatMap은 순서를 보장하지 않으며 concatMap은 순서를 보장하여준다.

Flux의 객체를 서로 순서에 맞추어서 사용하려면 concatMap을 사용하는데...

여기서는 굳이 그럴필요가 없으므로 flatMap을 사용하였다.

 

그러면 한번 바꾸어보자!

기존 map부분을 flatMap으로 바꾸어보자.

private static void two() {
    List<String> array = new ArrayList<String>();
    array.addAll(Arrays.asList(new String[]{"a", "b", "c", "d", "e", "e"}));

    Flux.fromIterable(array).groupBy( arr -> arr).flatMap( arg->{  //flatMap으로 바꾸었다.
        Mono<Map<Object, Object>> mono = arg.count().map( count -> {
            Map<Object, Object> item = new HashMap<>();
            item.put(arg.key(), count);
            return item;				
        });
        return mono; 
    }).subscribe( (data)->{	 
        System.out.println(data); 
    });
}        

 

위 내용을 실행하여보면..

아니...Map형태로 데이터가 출력 되었다.

 

해당 map으로 변환된 Flux의 요소를 병합을 통하여 단일 Flux로 바꾸어 준 것을 볼 수 있다.

구독에 구독이 나오는경우(구독 객체가 2중으로 된 경우) 해당 함수를 사용하면 조금 더 쉽게 사용 할 수 있다.

변환된 결과를 리스트형태로도 볼 수 있는데, flatMap 뒷 부분에 collectList 메소드를 붙여주면된다.

private static void two() {
    List<String> array = new ArrayList<String>();
    array.addAll(Arrays.asList(new String[]{"a", "b", "c", "d", "e", "e"}));

    Flux.fromIterable(array).groupBy( arr -> arr).flatMap( arg->{  //flatMap으로 바꾸었다.
        Mono<Map<Object, Object>> mono = arg.count().map( count -> {
            Map<Object, Object> item = new HashMap<>();
            item.put(arg.key(), count);
            return item;				
        });
        return mono; 
    }).collectList().subscribe( (data)->{	 //리스트로 돌려줘요~
        System.out.println(data); 
    });
}        

 

요렇게 메소드를 추가하면 Map결과를 리스트 형태로 받을 수 있다.

리스트 형태로 바꾸었다.

 

subscribe를 통해서 구독한 결과는 Disposable라는 클래스를 반환한다.

private static void two() {
    List<String> array = new ArrayList<String>();
    array.addAll(Arrays.asList(new String[]{"a", "b", "c", "d", "e", "e"}));

    Disposable dispose = Flux.fromIterable(array).groupBy( arr -> arr).flatMap( arg->{  //dispose?
        Mono<Map<Object, Object>> mono = arg.count().map( count -> {
            Map<Object, Object> item = new HashMap<>();
            item.put(arg.key(), count);
            return item;				
        });
        return mono; 
    }).collectList().subscribe( (data)->{
        System.out.println(data); 
    });
    System.out.println("처분? -> "+dispose.isDisposed());
}        

 

해당 클래스는 작업을 취소 또는 삭제할 수 있는지에 대한 정보를 지니고 있다.

isDisposed 메소드를 호출하면 처분(?) 했는지에 대한 여부를 반환한다.

 

이때 구독행위를 아에 중단하려면 dispose 메소드를 호출하면 된다.

dispose를 실행하면 구독행위를 중단하여준다.

구독행위를 중단하여보기 위하여 delayElements라는 메소드를 붙여주었다. (바로 subscribe 하지 않기위해)

private static void two() {
    List<String> array = new ArrayList<String>();
    array.addAll(Arrays.asList(new String[]{"a", "b", "c", "d", "e", "e"}));

    Disposable dispose = Flux.fromIterable(array).groupBy( arr -> arr).flatMap( arg->{  //dispose?
        Mono<Map<Object, Object>> mono = arg.count().map( count -> {
            Map<Object, Object> item = new HashMap<>();
            item.put(arg.key(), count);
            return item;				
        });
        return mono; 
    }).delayElements(Duration.ofSeconds(3)).collectList().subscribe( (data)->{ //3초 딜레이
        System.out.println(data); 
    });

    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("처분가능? -> "+dispose.isDisposed());
    dispose.dispose();	 //중단!   
    System.out.println("처분가능? -> "+dispose.isDisposed());
}        

 

구독에 딜레이를 부여해서 구독을 중단하는 코드로 바꾸어 보았다.

위 내용을 실행하면 subscribe에 아무것도 나타나지 않는다.

처분여부만 나온다.

 

위 내용에 사용한 최총 코드이다.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.time.Duration;
import java.util.*;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class JustTest {

    public static void main(String[] args) {
        String text = "abcd";
        Mono<String> mono = Mono.just(text);
		
        mono.map( arg-> arg.length()).subscribe( str->{
            System.out.println(str);
        });
        
        two();
    }
    
    private static void two() {
        List<String> array = new ArrayList<String>();
        array.addAll(Arrays.asList(new String[]{"a", "b", "c", "d", "e", "e"}));

        //기본 사용법
        Flux.fromIterable(array).groupBy( arr -> arr).map( arg->{  
            Mono<Map<Object, Object>> mono = arg.count().map( count -> {
                Map<Object, Object> item = new HashMap<>();
                item.put(arg.key(), count);
                return item;				
            });
            return mono;
        }).subscribe( (data)->{	
            data.subscribe(System.out::println);  //Flux를 다시 구독~
        });

		
        //flatMap으로 변경!
        Flux.fromIterable(array).groupBy( arr -> arr).flatMap( arg->{
            Mono<Map<Object, Object>> mono = arg.count().map( count -> {
                Map<Object, Object> item = new HashMap<>();
                item.put(arg.key(), count);
                return item;				
            });
            return mono;
        }).collectList().subscribe( (data)->{
            System.out.println(data);
        });
		
        //list형식으로 변경!
        Flux.fromIterable(array).groupBy( arr -> arr).flatMap( arg->{
            Mono<Map<Object, Object>> mono = arg.count().map( count -> {
                Map<Object, Object> item = new HashMap<>();
                item.put(arg.key(), count);
                return item;				
            });
            return mono;
        }).collectList().subscribe( (data)->{
            System.out.println(data);
        });
		
        //처분여부
        Disposable dispose = Flux.fromIterable(array).groupBy( arr -> arr).flatMap( arg->{ 
            Mono<Map<Object, Object>> mono = arg.count().map( count -> {
                Map<Object, Object> item = new HashMap<>();
                item.put(arg.key(), count);
                return item;				
            });
            return mono;
        }).collectList().subscribe( (data)->{ 
            System.out.println(data);
        });
        System.out.println("처분? -> "+dispose.isDisposed());


        //처분해보기
        dispose = Flux.fromIterable(array).groupBy( arr -> arr).flatMap( arg->{ 
            Mono<Map<Object, Object>> mono = arg.count().map( count -> {
                Map<Object, Object> item = new HashMap<>();
                item.put(arg.key(), count);
                return item;				
            });
            return mono;
        }).delayElements(Duration.ofSeconds(3)).collectList().subscribe( (data)->{ 
            System.out.println(data);
        });
		
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("처분? -> "+dispose.isDisposed());
        dispose.dispose();
        System.out.println("처분? -> "+dispose.isDisposed());    
    }        
    
}

 

Flux와 Mono를 사용하면 정말 많이보게되는 메소드가 map, flatMap, concatMap이다.

조금 더 친숙해질 필요는 있는 것 같다.

그러면 여기서 궁금한 것은 문자가 담긴 array 값이 바뀐다면 subscribe의 내용이 과연 바뀌는지에 대한 여부이다.

 

뭔소린고 하니...

subscribe가 끝난 다음에 List<String> array의 값이 추가 되거나 삭제되거나 없어지거나에 대해서는 과연 subsrcibe로 다시 실행이 되는지에 대한 여부이다.

해당 내용에 대해서는 다음번에 작성하여보겠다.

spring webflux!

 

* 내용을 채우고 수정중입니다.

* 튜토리얼이나 가이드 목적보다도 개념정리에 목적을 두고 쓰고있습니다. 틀린부분이나 누락된 부분은 꼭 알려주세요!

 

 

반응형