Spring framework/Spring Webflux

spring webflux 5 (웹플럭스 적용기, Mono와 Flux의 Processor)

마샤와 곰 2020. 3. 25. 10:47

 

Flux나 Mono를 통해서 생성된 객체(대상, 스트림)는 subscribe로의 구독을 통해서 수행을 한다.

아래 간단한 코드를 살펴보자.

private static void three() {
    List<String> array = new ArrayList<String>();
    array.addAll(Arrays.asList(new String[]{"a", "b", "c", "d", "e", "e"}));
    Flux.fromIterable(array).collectList().subscribe( (data)->System.out.println(data));
}

 

별거 없는 코드이다.

변수 array에 대해서 list로 변환한 다음 간단하게 출력을 하게 하였다.

그러면 생각해볼 것이 저 subscribe이다.

만약 array에 데이터가 새로 추가되면 subscribe는 과연 동작을 할 것 인가?

private static void three() {
    List<String> array = new ArrayList<String>();
    array.addAll(Arrays.asList(new String[]{"a", "b", "c", "d", "e", "e"}));
    Flux.fromIterable(array).collectList().subscribe( (data)->System.out.println(data));
    array.addAll(Arrays.asList(new String[]{"1", "2", "3"}));  //데이터를 추가 하였다.
}

 

아무런 변화가 일어나지 않았다.

 

해당 구독행위는 array에 대해서 하였기 때문에 뒤에서 변화가 이루어진 부분에 대해서는 이미 끝난 행위라 아무런 변화가 없었다.

그러면 한번 구독한 행위를 계속해서 발생하게 하려면 뭘 해야 되는 것 일까?

subscribe의 내용을 계속해서 동작하게 하려면 어떻게 해야되는 것 일까?

 

 

* 안녕~ Processor!

 

Flux에서 프로세서(Processor)와 관련된 내용을 좀 찾아보면 많은 프로세서들이 존재한다.

FluxProcessor, EmitterProcessor, ReplayProcessor등등..

 

그중 EmitterProcessor는 여러개의 구독자(subscriber)가 사용할 수 있는 구독과 발행이 동시해 일어나는 프로세서이다.

또한, 구독행위가 등록되고 난 이후에 해당 이벤트가 발생하면 구독하는 대상에게 동기적으로 전달한다.

대충 이해를 한 것은 다수의 구독행위(subscribe)에 대해서 계속해서 동작하면서 해당 동작을 동기적으로 실행해준다고 이해 하였다.(네...?)

 

아우..어렵다...예를통해 살펴보자.

여기서는 EmitterProcessor를 사용하였다.

private static void three() {
    List<String> array = new ArrayList<String>();
    array.addAll(Arrays.asList(new String[]{"a", "b", "c", "d", "e", "e"}));
    Flux.fromIterable(array).collectList().subscribe( (data)->System.out.println(data));
    array.addAll(Arrays.asList(new String[]{"1", "2", "3"}));  //데이터를 추가 하였다.
    
    //프로세서 시작 구간.
    EmitterProcessor<List<String>> data = EmitterProcessor.create();
    data.subscribe(t -> System.out.println(t));  //구독자 등장
    FluxSink<List<String>> sink = data.sink();   //발행인 등장
    sink.next(array);  //구독자에게 발행
}

 

주석내용을 자세히 살펴보면, 구독자가 등장한 다음에 발행하는 발행인(FluxSink)이 등장 하였다.

발행인이 next 메소드를 통해서 구독자에게 array 변수에 담긴 데이터를 전달 하였다.

해당 결과를 살펴보자.

데이터가 출력 되었다.

 

그러면..이해를 위해서 신문사, 신문을 구독하는 아저씨(?)로 비유를 하여보자.

private static void three() {
    List<String> array = new ArrayList<String>();
    array.addAll(Arrays.asList(new String[]{"a", "b", "c", "d", "e", "e"}));
    //Flux.fromIterable(array).collectList().subscribe( (data)->System.out.println(data));
    //array.addAll(Arrays.asList(new String[]{"1", "2", "3"}));  //데이터를 추가 하였다.
    
    //프로세서 시작 구간.
    EmitterProcessor<List<String>> data = EmitterProcessor.create();  //신문사
    data.subscribe(t -> System.out.println("101호 : "+t));  //신문을 읽는 101호 아저씨
    FluxSink<List<String>> sink = data.sink();   //신문 배달부
    sink.next(array); //신문 배달 완료
		
    array.addAll(Arrays.asList(new String[]{"new", "data", "hello"}));  //신문 내용을 바꿈
			
    data.subscribe(t -> System.out.println("302호 : "+t));    //새롭게 돈을 내고 신문을 읽는 302호 아저씨 등장!
    sink.next(array);  //신문 배달
		
    array.addAll(Arrays.asList(new String[]{"1", "2", "3"}));  //신문 내용을 바꿈
    sink.next(array);  //신문 배달
}

 

EmitterProcessor는 신문사 이다.

EmitterProcessor를 subscribe 하는 행위는 신문을 읽는 구독자(subscriber) 이다.

그리고 FluxSink라는 클래스를 통해서 next 메소드를 호출하는 것은 신문을 발행하는 일이다.

next를 통해서 신문을 계속해서 발행하면 subscribe를 통해서 정의한 내용이 계속해서 동작을 하게 된다.

새로 추가된 구독자의 행위도 잘 나옴을 볼 수 있다.

 

 

앞서 사용한 구독(subscribe)의 형태는 데이터를 유지(keep)한 상태에서 구독자(subscriber)가 도착한 경우 해당 데이터를 전달하는 방식이였다.    * 요런형태들 -> Flux.from().subsrcibe()

유지한 데이터를 subscribe 메소드를 통해서 실행하는 것이 주 목적이다.

 

그런데 Processor들은 기존에 있거나 새롭게 등장한 구독자(subscriber)에게 데이터(old data)를 전달한다.

발행하는 기관을 건설하고 구독자를 모집한 뒤에 계속해서 발행하는 형태이다. 구독자에게 발행(next)을 하여 행위를 동작시키는 것이 주 목적이다.

 

작업 할 내용이 데이터가 중심이면 일반 구독형태를 사용하고, 구독자가 중심이면 프로세스를 사용하면 될 것 같다.

 

해당내용을 좀더 찾아보려면 hot publishercold publisher라는 내용으로 검색을 해서 알아볼 수 있다.

쉽지않는 Flux와 Mono..여기까지 언급된 내용 말고도 더 많은 기능과 내용이 존재한다.

다음번에는 다시 Webflux로 돌아와서 데이터베이스와의 연동에 대해서 작성하여 보겠다.

 

최종 소스코드를 첨부한다.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.time.Duration;
import java.util.*;


import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import reactor.core.Disposable;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class JustTest {
    public static void main(String[] args) {
        three();
    
    }

    private static void three() {
        List<String> array = new ArrayList<String>();
        array.addAll(Arrays.asList(new String[]{"a", "b", "c", "d", "e", "e"}));
        //Flux.fromIterable(array).collectList().subscribe( (data)->System.out.println(data));
        //array.addAll(Arrays.asList(new String[]{"1", "2", "3"}));  //데이터를 추가 하였다.
    
        //프로세서 시작 구간.
        EmitterProcessor<List<String>> data = EmitterProcessor.create();  //신문사
        data.subscribe(t -> System.out.println("101호 : "+t));  //신문을 읽는 101호 아저씨
        FluxSink<List<String>> sink = data.sink();   //신문 배달부
        sink.next(array); //신문 배달 완료
		
        array.addAll(Arrays.asList(new String[]{"new", "data", "hello"}));  //신문 내용을 바꿈
			
        data.subscribe(t -> System.out.println("302호 : "+t));    //새롭게 돈을 내고 신문을 읽는 302호 아저씨 등장!
        sink.next(array);  //신문 배달
		
        array.addAll(Arrays.asList(new String[]{"1", "2", "3"}));  //신문 내용을 바꿈
        sink.next(array);  //신문 배달
    }

}

spring webflux!

 

* 내용을 채우고 수정중입니다.

* 튜토리얼이나 가이드 목적보다도 개념정리에 목적을 두고 쓰고있습니다. 틀린부분이나 누락된 부분은 꼭 알려주세요!

 

 

반응형