Reactor Samples - redutan/redutan.github.io GitHub Wiki

N개 병렬 조회 (+ 순서보장)

Flux<PodCast> 에서 각 PodCast 별로 다시 조회(api)한 후 조건(오디오 채널인가?)에 따라서 필터링

    /**
     * Keeps only the casts that the audio check reports as audio, preserving the input order.
     *
     * <p>Parallel processing = flatMapSequential + callable + scheduler: every cast is checked
     * through {@link #monoPodCastAndAudio(PodCast)}, and {@code flatMapSequential} emits the
     * results in the order of the source list regardless of which check finishes first.
     *
     * @param casts casts to check
     * @return the casts whose {@code PodCastAndAudio#isAudio} flag is true, in input order
     * @throws IllegalStateException if the whole pipeline does not complete within 13 seconds
     *                               (behavior of {@code Mono#block(Duration)})
     */
    private List<PodCast> filterAudioPodtyCasts(List<PodCast> casts) {
        return Flux.fromIterable(casts)
                   .flatMapSequential(cast -> monoPodCastAndAudio(cast))
                   .filter(PodCastAndAudio::isAudio)
                   // Was PodtyCastAndAudio::getCast: the element type of this stream is PodCastAndAudio.
                   .map(PodCastAndAudio::getCast)
                   .collectList()
                   .block(Duration.ofSeconds(13));
    }

    /**
     * Builds a lazy {@link Mono} that runs the audio check for one cast and pairs the
     * outcome with that cast.
     *
     * <p>The check itself ({@code audioChannelChecker.isAudio}) is executed when the Mono is
     * subscribed, on {@code Schedulers.elastic()}.
     *
     * @param cast the cast to check
     * @return a Mono emitting the cast together with its audio flag
     */
    private Mono<PodCastAndAudio> monoPodCastAndAudio(PodCast cast) {
        // Read the id eagerly; only the checker call is deferred to subscription time.
        final Long id = cast.getCastId();
        return Mono.fromCallable(() -> {
                       return audioChannelChecker.isAudio(id);
                   })
                   .subscribeOn(Schedulers.elastic())
                   .map(audioFlag -> {
                       return new PodCastAndAudio(cast, audioFlag);
                   });
    }

다양한 모델을 조회 후 조합

ํƒœ๊ทธ, ํฌ์ŠคํŠธ, ๋‰ด์Šค, ๋žญํ‚น ๋“ฑ ๋ฉ”์ธํ™”๋ฉด์— ๋…ธ์ถœ๋˜๋Š” ์—ฌ๋Ÿฌ๊ฐ€์ง€ API๋ฅผ ํ•œ ๋ฒˆ์— ํ˜ธ์ถœํ•ด์„œ ์กฐํ•ฉ

        // Parallel processing = zip + TupleUtils.function & defer + scheduler(immediate)
        // The five source Monos are zipped and the resulting tuple is spread into the
        // five-argument MainDetail constructor.
        return Mono.zip(tags(), posts(accessMemberId), news(), rankings(accessMemberId), curators(accessMemberId))
                   .map(TupleUtils.function(MainDetail::new))
                   // Any upstream error is logged and replaced by the EMPTY placeholder.
                   .doOnError(this::logError)
                   .onErrorReturn(MainDetail.EMPTY)
                   // If you do not want an exception, .timeout is also a good option.
                   // NOTE(review): the blocking timeout below sits downstream of onErrorReturn,
                   // so an exception raised by it is not converted to EMPTY.
                   .blockOptional(Duration.ofMillis(5000))    
                   .orElse(MainDetail.EMPTY);
//...
    /**
     * Loads the recommended tags asynchronously.
     *
     * <p>The querier call runs on {@code Schedulers.elastic()} when the Mono is subscribed.
     * If it fails, the error is logged and an empty list is emitted instead.
     *
     * @return a Mono emitting the recommended tags, or an empty list on error
     */
    private Mono<List<RecommendTagRow>> tags() {
        Mono<List<RecommendTagRow>> lookup =
                Mono.fromCallable(() -> recommendTagQuerier.getRecommendTags())
                    .subscribeOn(Schedulers.elastic());

        // Log first, then swallow the error by falling back to an empty list.
        return lookup.doOnError(this::logError)
                     .onErrorReturn(emptyList());
    }
/**
 * Aggregate of the data shown on the main screen: tags, posts, news, rankings and curators.
 *
 * <p>The all-args constructor is generated by Lombok from the fields below, so the field
 * declaration order is the constructor parameter order (tags, posts, news, rankings, curators).
 */
@Getter
@ToString
@NoArgsConstructor
@AllArgsConstructor
public class MainDetail {
    /** Placeholder instance built from empty lists and {@code NewsStandDto.EMPTY}. */
    public static final MainDetail EMPTY = new MainDetail(emptyList(),
                                                          emptyList(),
                                                          NewsStandDto.EMPTY,
                                                          emptyList(),
                                                          emptyList());

    // Recommended tag rows.
    private List<RecommendTagRow> tags;
    // Post details.
    private List<PostDetail> posts;
    // News stand data.
    private NewsStandDto news;
    // Ranking entries, expressed as post details.
    private List<PostDetail> rankings;
    // Recommended curator rows.
    private List<RecommendCuratorRow> curators;

다양한 모델을 조회 후 명령(setAllDatas)

    /**
     * Loads the member map and the member-group map together and stores both on the holder.
     *
     * <p>The lookups are combined with {@code Mono.zip}; this method then blocks (without a
     * timeout) until the zipped result is available and hands both maps to
     * {@code holder.setAllDatas}.
     *
     * @param wiki   wiki passed through to the two lookups
     * @param holder holder that is queried for references and receives the loaded maps
     */
    private void setLatestReferenceDatas(Wiki wiki, PageContentReferencesHolder holder) {
        Mono<Map<Long, TenantMember>> members = getMemberMap(wiki, holder);

        // Fetch the data in one go without blocking per lookup; block only until everything has been loaded.
        Mono.zip(members, getMemberGroupMap(wiki, holder))
            .blockOptional()
            .ifPresent(TupleUtils.consumer(holder::setAllDatas));
    }
    /**
     * Returns a Mono for the tenant members referenced by the holder.
     *
     * <p>When the holder has no MEMBER references, an empty map is emitted immediately and no
     * service call is made. Otherwise {@code accountService.getTenantMemberMap} is invoked on
     * {@code Schedulers.elastic()} when the Mono is subscribed.
     *
     * @param wiki   wiki whose tenant id is passed to the account service
     * @param holder source of the referenced member ids
     * @return a Mono emitting the member map keyed by {@code Long} id
     */
    Mono<Map<Long, TenantMember>> getMemberMap(Wiki wiki, PageContentReferencesHolder holder) {
        if (holder.hasReferences(PageContentReference.MEMBER)) {
            Set<Long> memberIds = holder.ids(PageContentReference.MEMBER);
            return Mono.fromCallable(() -> accountService.getTenantMemberMap(wiki.getTenantId(), memberIds))
                       .subscribeOn(Schedulers.elastic());
        }
        // Nothing referenced: skip the remote lookup entirely.
        return Mono.just(emptyMap());
    }

/**
 * Holder for the reference data of a page content (members and member groups).
 * Only the bulk setter is shown here; the rest of the class is elided.
 */
class PageContentReferencesHolder {
    /**
     * Stores both lookup results at once.
     *
     * @param memberMap      members keyed by {@code Long} id
     * @param memberGroupMap member groups keyed by {@code Long} id
     */
    void setAllDatas(Map<Long, TenantMember> memberMap, Map<Long, ProjectMemberGroup> memberGroupMap) {
        this.memberMap = memberMap;
        this.memberGroupMap = memberGroupMap;
    }
    // ...
}

Retry

푸시메시지를 발송한 후 푸시메시지가 정상적으로 발송되었는지 확인. 만약 푸시가 정상적으로 발송되지 않았으면 푸시실패 발행

  • Thread.sleep 없이 처리 가능
    // Look up the status of the sent push message, retrying while getMessageByApi
    // signals WaitingException, and fall back to PushMessageResponse.EMPTY on any error.
    Long messageId = messageIdGetter.getMessageId();
    PushMessageResponse response =
            Mono.just(messageId)
                .map(this::getMessageByApi)
                .retryWhen(
                        Retry.anyOf(WaitingException.class)               // retry when this exception occurs
                             .fixedBackoff(Duration.ofMillis(gapMillis))  // at intervals of gapMillis
                             .retryMax(retryMax)                          // retry up to retryMax times
                             .doOnRetry(this::logRetry))
                // Errors that remain after retrying are logged and replaced by EMPTY.
                .doOnError(this::logError)
                .onErrorReturn(PushMessageResponse.EMPTY)
                .blockOptional()
                .orElse(PushMessageResponse.EMPTY);

    // Publish a push-failure notification when the final status is FAIL.
    if (FAIL == response.getStatus()) {
        pushFailNotificator.send(new ContactMessagePushFail(message));
    }
//...
    /**
     * Logs the given error at WARN level, including its message and stack trace.
     * Does nothing when WARN logging is disabled.
     *
     * @param error the error to log
     */
    private void logError(Throwable error) {
        if (!log.isWarnEnabled()) {
            return;
        }
        log.warn("PushFailCallbackPostExecutor Error : " + error.getMessage(), error);
    }

    /**
     * Logs a retry attempt at INFO level: the iteration number and the message of the
     * exception that triggered the retry. Does nothing when INFO logging is disabled.
     *
     * @param context retry context supplied by the retry operator
     */
    private void logRetry(RetryContext<Object> context) {
        if (!log.isInfoEnabled()) {
            return;
        }
        log.info("Retry {} times. exception = {}", context.iteration(), context.exception().getMessage());
    }

    /**
     * Fetches the push message through the client and returns it unless its status is WAIT.
     *
     * @param messageId id of the push message to look up
     * @return the response returned by the client, when its status is not WAIT
     * @throws WaitingException when the status is WAIT
     */
    private PushMessageResponse getMessageByApi(Long messageId) {
        PushMessageResponse fetched = client.getMessage(messageId);
        if (WAIT != fetched.getStatus()) {
            return fetched;
        }
        // Condition not met yet: throw the specific exception, otherwise no retry is triggered.
        throw new WaitingException();
    }

Usage new[Elastic|...]

/**
 * Assembles {@code PageEventRow}s on a dedicated elastic scheduler that this instance owns.
 * The scheduler is created once per instance and disposed in {@link #close()}.
 */
public class PageEventAssembler implements AutoCloseable {
    ...
    // One scheduler per assembler instance, shared by every row Mono it builds.
    private final Scheduler rowScheduler = Schedulers.newElastic("page-event-row");

    ...
    /**
     * Builds a lazy Mono that converts one page event into a row on {@code rowScheduler}.
     * If the conversion fails, the error is logged and {@code PageEventRow.EMPTY} is emitted.
     *
     * @param tenant tenant passed to the body and creator conversions
     * @param source event to convert
     * @param member member passed to the body conversion; its zone is applied to the creation time
     * @return a Mono emitting the assembled row, or {@code PageEventRow.EMPTY} on error
     */
    private Mono<PageEventRow> monoPageEventRow(Tenant tenant, PageEvent source, OrganizationMember member) {
        Mono<PageEventRow> assembled = Mono.fromCallable(() -> {
            PageEventRow.Body rowBody = toBody(tenant, source, member);
            PageEventRow.Creator rowCreator = toCreator(tenant, source);
            return new PageEventRow(source.getEventId(), source.getPageId(), rowBody,
                                    ZonedDateTime.of(source.getCreatedAt(), member.getZone()), rowCreator);
        });

        // Do NOT use .subscribeOn(Schedulers.newElastic("page-event-row")) here:
        // that leads to the number of threads growing without bound.
        return assembled.doOnError(this::logError)
                        .onErrorReturn(PageEventRow.EMPTY)
                        .subscribeOn(rowScheduler);
    }

    /** Disposes the row scheduler unless it has already been disposed. */
    @Override
    public void close() {
        if (!rowScheduler.isDisposed()) {
            rowScheduler.dispose();
        }
    }
}
โš ๏ธ **GitHub.com Fallback** โš ๏ธ