개발자 이야기/스프링 프레임워크

Spirng - WebFlux 기본 개념

재테크 도전하는 개발자 2024. 5. 3. 21:46

1.웹플럭스 (WebFlux)
스프링 웹플럭스는 리액티브 스택 웹 프레임워크로, 완전한 논블로킹 (non-blocking) 방식으로 동작합니다.
Reactive Streams back pressure를 지원하여 자원을 효율적으로 사용할 수 있습니다.
기존의 스프링 웹 MVC는 서블릿 API와 서블릿 컨테이너를 위해 개발되었지만, 웹플럭스는 리액티브 스트림 라이브러리를 활용하여 비동기 처리를 간단하게 구현할 수 있습니다.
웹플럭스를 사용하면 비동기 프로그래밍을 효율적으로 처리할 수 있으며, 리액터를 활용하여 데이터를 다루는 방법을 익히면 더욱 효과적으로 개발할 수 있습니다! 

 

-------------------------------------------

 

2.리액터 (Reactor)
리액터는 리액티브 프로그래밍을 지원하는 라이브러리입니다.
ReactiveX의 연산자 어휘와 풍부한 연산자 세트를 통해 데이터를 처리할 수 있는 Mono와 Flux API 유형을 제공합니다.
Mono는 0 또는 1개의 요소를 처리하며, Flux는 0개 이상의 요소를 처리합니다.

-------------------------------------------

 

3.명령형 프로그래밍 vs 반응형 프로그래밍 (with 선언형 프로그래밍)
무슨 차이인지?

 

명령형 프로그래밍은 개발자에 의해서 작성된 코드가 정해진 순서대로 실행되는 방식의 프로그래밍입니다.

코드가 순서대로 실행되므로 디버거가 코드의 명령문을 명확하게 가리킬 수 있고 다음 코드라인이 무엇인지 명확하게 알 수 있습니다. 

 

반응형 프로그래밍은 주변 환경과 끊임없이 상호작용을 하는 프로그래밍입니다.

프로그램이 주도하는 것이 아니라 환경 (예: 데이터)이 변하면 이벤트를 받아 동작하도록 만드는 프로그래밍 기법입니다.

반응형 프로그램은 외부 요구에 끊임없이 반응하고 처리를 합니다. 

코드가 순서대로 실행되지 않습니다.

 

선언형 프로그래밍은 명령형 프로그래밍은 "어떻게"를 강조하고, 선언형 프로그래밍은 "무엇"을 강조합니다.

예를 들어, SQL 쿼리에서 "어떤 정보를 가져올지"보다 "어떻게 가져올지"에 더 관심이 있습니다.

선언형 프로그래밍은 함수형, 논리형, 제한형 프로그래밍 언어 등으로 작성되며, SQL, HTML 등이 해당됩니다.

 

-------------------------------------------

4.반응형 스트림(Reactive Stream)

- Publisher : 발행자 데이터를 생성하고, Subscriber에게 전송합니다.
- Subscriber : 구독자 Publisher로부터 데이터를 받아들이고, 소비합니다.
- Subscription : 구독 Subscriber가 처리할 데이터의 양을 정의합니다.

 

-------------------------------------------


5.Mono (0~1) vs Flux (0~n)
Mono : 1개만 반응
Flux : n개 모두 반응
무슨 차이인지?

 

5-1.Flux
Flux는 Reactive Stream의 Publisher에 해당하는 객체입니다.
Publisher는 데이터를 방출하는 역할을 합니다.
Flux는 0개부터 N개까지의 T 타입의 원소를 방출합니다.
onNext 이벤트가 발생하면 원소를 방출하게 됩니다.
onComplete 이벤트가 발생하면 스트림이 완료됩니다.
onError 이벤트가 발생하면 에러를 발생시킵니다.
이러한 터미널 이벤트가 발생하지 않으면 Flux는 무한히 유지됩니다.
Flux는 다양한 Operator를 가지고 있어 데이터를 변형하거나 조작할 수 있습니다.


5-2.Mono
Mono도 Flux와 마찬가지로 Reactive Stream의 Publisher를 상속받은 객체입니다.
하지만 Mono는 0개부터 1개까지의 T 타입의 원소를 방출합니다.
Mono는 단일 값을 처리하는데 사용됩니다.
onNext 신호를 통해 최대 하나의 아이템을 방출하고, onComplete 신호를 통해 스트림이 종료되었다는 것을 표현합니다.
onError를 통해 실패를 나타낼 수 있습니다.
Mono는 주로 단일 값 처리에 사용되고, Flux는 여러 개의 값을 처리할 때 사용됩니다. 이 두 가지를 적절하게 활용하여 비동기 프로그래밍을 설계할 수 있습니다.123

 

-------------------------------------------

 

6. Publisher, Subscriber  life Cycle

 

6-1.Publisher Life Cycle
- 데이터 생성: Publisher는 데이터를 생성합니다. 이 데이터는 스트림으로 흐르게 됩니다.


-  데이터 흐름: 생성된 데이터는 Subscriber에게 전달됩니다. 이때 데이터는 비동기적으로 흐르며, back pressure를 고려하여 전송됩니다.

 

※ back pressure : 백 프레셔는 비동기 컴포넌트 사이에서 데이터 흐름을 제어하는 메커니즘입니다. 발행자 (Publisher)가 데이터를 얼마나 빠르게 또는 느리게 생산할지를 구독자 (Subscriber)에게 제어할 수 있도록 합니다. 리액티브 스트림에서 백 프레셔는 비동기적인 상호 작용을 정의하는 작은 스펙입니다


-  데이터 완료: Publisher가 모든 데이터를 생성하면 완료 신호를 보냅니다.


6-2.Subscriber Life Cycle
-  구독 요청: Subscriber는 Publisher에게 구독을 요청합니다.


-  데이터 수신: Publisher는 데이터를 생성하고 Subscriber에게 전달합니다.


-  데이터 처리: Subscriber는 받은 데이터를 처리합니다. 이때 back pressure를 통해 데이터 흐름을 제어합니다.

 

-  완료 또는 에러 처리: 모든 데이터를 처리한 후 완료 또는 에러 상태로 전환됩니다.
이렇게 Publisher와 Subscriber가 상호작용하여 리액티브 스트림을 구성합니다. 이를 통해 논블로킹하게 데이터를 처리하고, back pressure를 통해 안정적인 동작을 보장할 수 있습니다.

 

-------------------------------------------

 

7.기본적인 명령어

 

7-1. Mono.just()
Mono.just()는 단일 값을 갖는 Mono를 생성합니다. 이 값은 한 번 방출되고, 그 후에 Mono가 완료됩니다
예를 들어, Mono.just("Hello, World!")는 "Hello, World!"라는 값을 가지는 Mono를 생성합니다.


7-2. Mono.fromCallable()
Mono.fromCallable()은 Callable 객체를 사용하여 Mono를 생성합니다. Callable이 실행되고 결과 값이 Mono로 방출됩니다.
예를 들어, Mono.fromCallable(() -> someExpensiveOperation())는 비용이 많이 드는 작업의 결과를 Mono로 감싸 반환합니다.


7-3. Flux.fromIterable()
Flux.fromIterable()은 Iterable(예: List, Set)의 요소를 Flux로 변환합니다. 각 요소는 순서대로 방출됩니다
예를 들어, Flux.fromIterable(Arrays.asList("A", "B", "C"))는 “A”, “B”, "C"를 순서대로 방출하는 Flux를 생성합니다.


7-4. Flux.just()
Flux.just()는 여러 값을 가지는 Flux를 생성합니다. 인자로 전달된 값들이 순서대로 방출되고, 그 후에 Flux가 완료됩니다
예를 들어, Flux.just(1, 2, 3)는 1, 2, 3을 순서대로 방출하는 Flux를 생성합니다.


7-5. doOnNext(), doOnComplete(), doOnError()
이들은 각각 요소가 방출될 때, Flux나 Mono가 완료될 때, 또는 에러가 발생했을 때 실행되는 콜백입니다.
예를 들어, flux.doOnNext(item -> log.info("Received item: " + item))는 요소가 방출될 때 로그를 남깁니다.


7-6. map
map은 Flux나 Mono의 각 요소를 변환합니다. 함수를 적용하여 요소를 변환하고 새로운 Flux나 Mono를 생성합니다.
예를 들어, flux.map(item -> item * 2)는 각 요소를 2배로 변환한 Flux를 생성합니다.


7-7. filter
filter는 조건을 만족하는 요소만을 방출하는 연산자입니다. 예를 들어, flux.filter(item -> item > 0)는 양수인 요소만을 방출하는 Flux를 생성합니다.


7-8. flatMap
flatMap은 각 요소를 다른 Flux나 Mono로 변환하고, 그 결과를 하나의 Flux로 합칩니다. 비동기 작업에 유용합니다
예를 들어, flux.flatMap(item -> getRelatedItems(item))는 각 요소를 관련된 항목으로 변환한 후 합쳐진 Flux를 생성합니다.


7-9. then
then은 이전 작업이 완료된 후 다음 작업을 실행합니다. 결과를 반환하지 않습니다.
예를 들어, flux.then(Mono.just("Done!"))는 Flux가 완료된 후 "Done!"을 방출하는 Mono를 생성합니다.


7-10. onErrorResume
에러가 발생했을 때 대체 Mono나 Flux를 반환합니다. 예를 들어, flux.onErrorResume(e -> Mono.just("Fallback"))는 에러가 발생하면 "Fallback"을 방출하는 Mono를 생성합니다.

 

-------------------------------------------

 

8.콜드스트림, 핫스트림 차이

 

콜드 스트림 (Cold Stream): 콜드 스트림은 데이터를 생성하는 시점에서 구독자가 없는 상태로 시작합니다. 이는 데이터를 발행하는 측에서 구독자가 생기기 전까지 데이터를 생성하지 않는 특성을 가지고 있습니다. 콜드 스트림은 일반적으로 컬렉션, Sequence, RxJava의 Observable 등과 같은 데이터 소스에서 나옵니다. 콜드 스트림은 구독자가 생긴 후에야 데이터를 생성하며, 각 구독자는 독립적으로 데이터를 받습니다.


핫 스트림 (Hot Stream): 핫 스트림은 데이터를 생성하는 시점과 상관없이 항상 데이터를 발행합니다. 핫 스트림은 이미 데이터를 생성하고 있으며, 구독자가 생기면 해당 데이터를 구독자에게 전달합니다. 웹플럭스(Flux)에서는 핫 스트림을 사용하여 이벤트 기반 프로그래밍을 지원합니다.
( 구독자가 생기든 말든 상관없이 계속해서 데이터를 발행합니다)


예를 들어, 웹플럭스에서는 Flux 클래스를 사용하여 핫 스트림을 생성할 수 있습니다. 이 스트림은 이미 데이터를 발행하고 있으며, 구독자가 생기면 해당 데이터를 구독자에게 전달합니다. 반면에 콜드 스트림은 구독자가 생기기 전까지 데이터를 생성하지 않습니다.


이제 웹플럭스에서 콜드 스트림과 핫 스트림을 어떻게 다루는지 살펴보겠습니다. 웹플럭스는 리액티브 프로그래밍을 위한 라이브러리로, 코틀린 코루틴과 함께 사용됩니다. 웹플럭스에서는 콜드 스트림을 핫 스트림으로 변경하는 방법으로 shareIn 연산자를 제공합니다. 이 연산자를 사용하면 콜드 스트림을 주어진 코루틴 스코프 내에서 시작되는 핫 스트림으로 변경할 수 있습니다. 또한, replay 매개변수를 통해 최근 값들을 새로운 구독자에게 다시 발행할 수 있습니다.

 

※ 코루틴 : 코루틴은 프로그래밍에서 루틴의 일종으로서, 협동 루틴이라 할 수 있습니다. "코루틴"이라는 단어에서 "Co"는 “with” 또는 "together"를 의미합니다. 즉, 코루틴은 함께 수행되는 함수입니다
코루틴은 서브 루틴과는 다른 특징을 가지고 있습니다. 서브 루틴은 무조건 순차적으로 수행되어야 하지만, 코루틴은 함께 수행되며 서로 무제한 양보를 할 수 있습니다. 이는 스레드의 자원을 최대한 활용할 수 있게 해줍니다.
간단히 말하면, 코루틴은 실행의 지연과 재개를 허용함으로써 비선점적 멀티태스킹을 위한 서브 루틴을 일반화한 컴퓨터 프로그램 구성 요소입니다. 코루틴은 비동기 작업을 효율적으로 처리하는 데 사용되며, Kotlin과 같은 언어에서 지원됩니다

예를 들어, 다음과 같은 코드에서 shareIn 연산자를 사용하여 콜드 스트림을 핫 스트림으로 변경할 수 있습니다:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

private suspend fun initHeavyLogic() {
    delay(1000)  // 1초 동안 대기한 다음
    println("initHeavyLogic")  // "initHeavyLogic"을 출력하는 비동기 작업을 수행
}

// 코루틴은 특정 범위 내에서 실행되며, 해당 범위를 지정하여 코루틴이 어떤 스레드에서 실행되는지, 어떤 라이프사이클에 종속되는지 등을 결정함.
// Dispatchers.IO는 I/O 작업에 최적화된 디스패처(dispatcher)를 의미합니다. 
// 따라서 이 코루틴 범위 내에서 실행되는 코루틴은 I/O 작업을 처리하기에 적합합니다. 
// 코루틴 범위를 지정함으로써 코루틴이 어떤 스레드에서 실행되는지 명시적으로 설정할 수 있습니다
val coroutineScope = CoroutineScope(Dispatchers.IO)  // 스레드에서 코루틴을 실행할 수 있는 범위를 정의합니다.

val counter: Flow<Int> = flow {  // Flow는 비동기적인 데이터 스트림을 처리하기 위한 기능을 제공, 블록 내에서 데이터를 생성합니다.
    initHeavyLogic()  // 함수를 호출하여 초기 로직을 실행합니다.
    var count = 0
    while (true) {  // 무한 루프를 통해 count 값을 생성하고, 200ms마다 데이터를 발행합니다.
        emit(count++)  // 값을 발행
        delay(200)
    }
    // shareIn()을 사용하여 플로우를 공유하고, Eagerly 모드로 시작합니다.
    // 공유 플로우는 여러 구독자가 동시에 값을 수신할 수 있으며, 각 구독자는 독립적으로 데이터를 받습니다.
}.shareIn(  
    scope = coroutineScope,  // 코루틴 범위를 정의하는 객체. 현재 코루틴이 coroutineScope 내에서 실행됨을 의미
    started = SharingStarted.Eagerly  // SharingStarted.Eagerly는 즉시 감지를 의미. 코루틴이 시작되자마자 값을 방출하기 시작.
)

fun main(args: Array<String>) {
    println("Started")  // "Started"를 출력합니다.
    runBlocking {  // runBlocking 블록 내에서 두 개의 코루틴을 실행합니다.
        launch {  // 첫 번째 코루틴
            println("#1 Launched")  // "#1 Launched"를 출력하고, 
            counter    // counter 플로우에서 처음 10개의 값을 수집하여 출력합니다.
                .take(10)
                .collect { println("#1:$it") }
        }
        launch {  // 두번째 코루틴
            delay(2000)
            println("#2 Launched")  // 2초 후에 "#2 Launched"를 출력하고, 
            counter  // counter 플로우에서 다음 10개의 값을 수집하여 출력합니다.
                .take(10)
                .collect { println("#2:$it") }
        }
    }
    println("Finished")  //  "Finished"를 출력합니다.
}

 

shareIn 모드 中 핫스트림

  • Eagerly 모드:
    • 구독자가 처음 나타나기 전에도 상위 플로우가 시작됩니다.
    • 이 경우 replay 매개변수로 지정된 최신 값 이후의 모든 값은 즉시 삭제됩니다.
    • 즉, 구독자가 생기기 전에도 데이터를 발행하며, 최신 값 이후의 데이터는 삭제되지 않습니다.
    • 이는 핫 스트림과 유사한 특성입니다.

 

shareIn 모드 中  콜드스트림

  • Lazily 모드:
    • 구독자가 처음 나타난 후에 상위 플로우가 시작됩니다.
    • 이로 인해 첫 번째 구독자는 모든 발행된 값을 받지만, 이후 구독자는 최신 replay 값만 보장받습니다.
    • 즉, 구독자가 생기기 전까지 데이터를 생성하지 않으며, 구독자가 생긴 후에야 데이터를 생성합니다. 이는 콜드 스트림과 유사한 특성입니다.
  • WhileSubscribed 모드:
    • 구독자가 처음 나타날 때 상위 플로우가 시작되고, 마지막 구독자가 사라질 때 즉시 중단됩니다.
    • 이 경우 replay 값은 구독자 없이도 계속 캐시됩니다.
    • 즉, 구독자가 있을 때만 데이터를 생성하며, 모든 구독자가 사라지면 중단됩니다. 이 역시 콜드 스트림과 유사한 특성입니다.

 

-------------------------------------------


9.리액티브 시스템

리액티브 시스템은 리액티브한 원리와 원칙을 적용하여 개발된 소프트웨어 시스템을 의미합니다. 이러한 시스템은 주로 빠른 응답성과 내결함성을 강조하며, 비동기적인 처리, 메시지 기반 아키텍처, 이벤트 주도 구조 등을 활용하여 확장성과 탄력성을 갖출 수 있습니다.

우리는 응답이 잘 되고(클라이언트의 요청에 즉각적으로 응답하고), 탄력적이며 유연하고 메시지 기반으로 동작하는 반응을 잘하는 시스템 을 기대합니다. 우리는 이것을 리액티브 시스템(Reactive Systems)라고 부릅니다.

 

9-1. 응답성(Responsive)

시스템이 가능한 한 즉각적으로 응답하는 것을 응답성이 있다고 합니다.

응답성 있는 시스템은 신속하고 일관성 있는 응답 시간을 제공하고, 신뢰할 수 있는 상한선을 설정하여 일관된 서비스 품질을 제공합니다. 이러한 일관된 동작은 오류 처리를 단순화하고, 일반 사용자에게 신뢰를 조성하고, 새로운 상호작용을 촉진합니다.

 

9-2. 탄력성(Resilient)

시스템이 장애 에 직면하더라도 응답성을 유지 하는 것을 탄력성이 있다고 합니다.

탄력성은 복제, 봉쇄, 격리, 위임에 의해 실현됩니다. 장애는 각각의 구성 요소 에 포함되며 구성 요소들은 서로 분리되어 있기 때문에 이는 시스템이 부분적으로 고장이 나더라도, 전체 시스템을 위험하게 하지 않고 복구 할 수 있도록 보장합니다.

 

9-3. 유연성(Elastic)

시스템이 작업량이 변화하더라도 응답성을 유지하는 것을 유연성이라고 합니다. 리액티브 시스템은 입력 속도의 변화에 따라 이러한 입력에 할당된 자원을 증가시키거나 감소키면서 변화에 대응합니다.

 

9-4. 메시지 주도(Message Driven)

리액티브 시스템은 비동기 메시지 전달 에 의존하여 구성 요소 사이에서 느슨한 결합, 격리, 위치 투명성 을 보장하는 경계를 형성합니다.

이 경계는 장애 를 메시지로 지정하는 수단을 제공합니다. 명시적인 메시지 전달은 시스템에 메시지 큐를 생성하고, 모니터링하며 필요시 배압 을 적용함으로써 유연성을 부여하고, 부하 관리와 흐름제어를 가능하게 합니다.

논블로킹 통신은 수신자가 활성화가 되어 있을 때만 자원 을 소비할 수 있기 때문에 시스템 부하를 억제할 수 있습니다.

이벤트가 발생하거나 요청이 들어오면 해당 메시지를 큐에 넣고, 수신 컴포넌트들은 이 메시지를 비동기적으로 받아 처리합니다. 이러한 방식으로 컴포넌트 간의 결합도를 낮추고 확장성과 유연성을 강화할 수 있습니다.

리액티브 시스템은 데이터나 이벤트가 발생하면 즉시 반응하여 처리하는 것이 중요합니다. 이를 위해 변화 전파를 하는 과정에서 push 방식을 사용합니다.

 

-------------------------------------------


10.리액티브 스트림즈

리액티브 프로그래밍을 구현한 라이브러리들이 정해진 표준없이 구현되기 시작했습니다. 이에 하나의 규칙을 정해서, 여러 리액티브 프로그래밍 구현체들이 상호 변환 가능하도록 만들자는 목소리가 나오기 시작했습니다.

그렇게 만들어진 비동기 스트림 처리를 위한 표준(규칙)이 리액티브 스트림즈입니다.

 

이 표준은 데이터 스트림을 Non-Blocking이면서 비동기적인 방식으로 처리하기 위해 만들어졌습니다. 리액티브 스트림즈는 다양한 라이브러리에서 구현되어 있으며, 리액티브한 코드 작성과 구성을 용이하게 해 줍니다.

리액티브 스트림즈의 핵심 컴포넌트는 다음과 같습니다:

Publisher:
데이터를 생성하고 통지(발행, 게시, 방출)하는 역할을 합니다.
구독자(Subscriber)가 데이터를 받을 수 있도록 데이터를 제공합니다.


Subscriber:
구독한 Publisher로부터 통지된 데이터를 전달받아서 처리합니다.
Publisher로부터 데이터를 받기 위해 구독(subscribe)합니다.


Subscription:
Publisher에 요청할 데이터의 개수를 지정하고, 데이터의 구독을 취소하는 역할을 합니다.
데이터의 요청 개수를 지정하여 데이터 흐름을 조절합니다.


Processor:
Publisher와 Subscriber의 기능을 모두 가지고 있습니다.
다른 Publisher를 구독하거나, 다른 Subscriber가 구독할 수 있습니다.
리액티브 스트림즈를 통해 구현된 라이브러리로는 RxJava, Reactor, Akka Streams, Java 9 Flow API 등이 있습니다. 이 중에서 Spring Framework와 가장 궁합이 잘 맞는 구현체는 Reactor입니다. 이를 활용하여 비동기적이고 효율적인 코드를 작성할 수 있습니다.

 

 

발행자와 구독자 간의 데이터 흐름을 조절하고 백 프레셔( backPressure )를 지원합니다. 예를 들어, 데이터 저장소가 HTTP 서버에 응답에 사용할 데이터를 생성할 때 백 프레셔를 통해 데이터 흐름을 제어할 수 있습니다

 

-------------------------------------------

 

참고 : https://adjh54.tistory.com/232

 

[Java] Spring Boot Webflux 이해하기 -1 : 흐름 및 주요 특징 이해

해당 글에서는 Spring Boot Webflux에 대해 이해하고 전체적인 흐름, 특징에 대해서 이해를 돕기 위해 작성한 글입니다. 1) Spring Boot Webflux 💡 Spring Boot Webflux - 반응형 및 비동기적인 웹 애플리케이션

adjh54.tistory.com

 

참고2 : https://velog.io/@nittre/%EB%B8%94%EB%A1%9C%ED%82%B9-Vs.-%EB%85%BC%EB%B8%94%EB%A1%9C%ED%82%B9-%EB%8F%99%EA%B8%B0-Vs.-%EB%B9%84%EB%8F%99%EA%B8%B0

 

블로킹 Vs. 논블로킹, 동기 Vs. 비동기

와 드디어 이해했다 속이 후련~

velog.io

 

참고3 : 리액티브, 리액티브 시스템, 리액티브 프로그래밍, 리액티브 스트림즈......?! (velog.io)

 

리액티브, 리액티브 시스템, 리액티브 프로그래밍, 리액티브 스트림즈......?!

Spring Webflux를 이해하기 위한 Reactive 편

velog.io