개요

RxJS는 Reactive Extensions for JavaScript의 약자이며 비동기 데이터 흐름을 스트림 Observable로 다루는 라이브러리임 이벤트, API 응답, 클릭, WebSocket처럼 시간에 따라 변하는 값을 하나의 연속 흐름으로 모델링 가능

핵심 비교

  • Promise는 값 1개 처리
  • Observable은 값 0개부터 무한대까지 처리

Observable 핵심

정의

  • 시간에 따라 변경되는 데이터를 push 기반 스트림으로 표현한 객체
  • 구독 subscribe 시점에만 실행되는 lazy 특성
  • 구독 해제 unsubscribe 시 리소스 정리 가능

특징

  • lazy 실행으로 불필요한 작업 방지
  • cancel 가능으로 누수 방지
  • 0~무한개의 값 연속 처리

간단 예시

import { Observable } from 'rxjs'

const obs = new Observable(sub => {
  sub.next(1)
  sub.next(2)
  sub.complete()
})

obs.subscribe(v => console.log(v))

동작 관점

  • 생성 시 프로듀서 로직을 보관
  • subscribe 호출 시 프로듀서 실행 후 observer에게 next error complete를 push
  • 각 구독자는 독립적인 실행 흐름 가짐

Subject 핵심

정의

  • Observable과 Observer 두 역할을 동시에 수행하는 멀티캐스터
  • 외부에서 next로 값을 밀어 넣을 수 있음
  • 여러 구독자에게 동일 데이터 브로드캐스트

언제 쓰는지

  • 이벤트를 여러 구독자에게 멀티캐스트할 때
  • 외부 입력으로 스트림을 주도할 때

간단 예시

import { Subject } from 'rxjs'

const s = new Subject<number>()

s.subscribe(v => console.log('A', v))
s.subscribe(v => console.log('B', v))

s.next(1) // A와 B 모두 1 수신

Observable vs Subject 차이 요약

  • Observable은 내부 프로듀서가 데이터 생성, subscribe마다 독립 실행
  • Subject는 외부 next 가능, 모든 구독자에게 동일 데이터 전달, 이벤트 버스 역할에 적합

Teardown과 Subscription

정의

  • Observable 사용이 끝났을 때 리소스를 해제하는 정리 함수
  • subscribe 시 등록되고 unsubscribe 호출 시 실행

필요 이유

  • 이벤트 리스너 누수 방지
  • interval timeout 등 장기 실행 중지
  • 네트워크 소켓 등 연결 종료

패턴

const obs = new Observable(sub => {
  const id = setInterval(() => sub.next(Date.now()), 1000)
  return () => clearInterval(id) // teardown
})

const subscription = obs.subscribe(v => console.log(v))
setTimeout(() => subscription.unsubscribe(), 5000)

Subscription 내부 개념

  • 여러 teardown을 보관하는 컨테이너
  • unsubscribe 시 등록된 정리 로직 순차 실행

Scheduler 개념

RxJS는 emit 시점과 실행 컨텍스트를 제어하는 Scheduler를 제공함

  • 마이크로태스크 큐 기반 스케줄링
  • 매크로태스크 큐 setTimeout 기반
  • animationFrame 기반
  • queue 기반 동기 플러시

Scheduler로 시간 축과 실행 순서를 제어 가능하므로 Promise나 단순 EventEmitter 대비 구성 가능성이 큼

Operator 체인

map filter switchMap mergeMap 등 오퍼레이터는 Observable을 입력받아 새로운 Observable을 반환하는 변환기 함수

형태 요약

import { Observable } from 'rxjs'

function map<T, R>(fn: (v: T) => R) {
  return (source: Observable<T>) => new Observable<R>(sub => {
    const inner = source.subscribe({
      next: v => sub.next(fn(v)),
      error: e => sub.error(e),
      complete: () => sub.complete(),
    })
    return () => inner.unsubscribe()
  })
}

효과

  • 오퍼레이터를 이어붙여 체인 구성
  • 데이터 스트림을 함수형 파이프라인처럼 조합 가능

내부 동작 이해 정리

구성 요소

  • Observer 패턴으로 next error complete를 push
  • Push 기반 프로듀서가 값을 능동적으로 밀어넣음
  • Subscription과 Teardown으로 생명주기와 리소스 관리
  • Scheduler로 실행 컨텍스트와 타이밍 제어
  • Operators로 스트림 변환 체인 구축

흐름도

  • Observable 생성 시점에는 실행 없음
  • subscribe 호출 시 프로듀서 실행 후 observer로 값 push
  • 중간에 operator가 값을 가공
  • 최종 observer가 결과 수신
  • unsubscribe 시 teardown 실행으로 정리

베스트 프랙티스 간단 메모

  • 구독은 생성한 곳에서 해제 전략을 함께 설계할 것
  • operator 체인은 명확한 취지를 가진 최소 구성 유지
  • Subject는 상태 공유보다는 브리지나 이벤트 멀티캐스트 용도로 제한
  • 스케줄러 사용 시 테스트 환경에서 타이밍을 통제 가능한 전략 선택

한 줄 요약

RxJS는 Observer 패턴과 Push 프로듀서 Subscription과 Teardown Scheduler 그리고 Operator 체인을 조합해 비동기 이벤트 흐름을 스트림처럼 추상화함

참고자료