신발 입찰 거래 프로젝트를 진행하면서 알림 기능을 구현해야 했다.
기능을 적용해야 할 부분은 다음과 같다.
1. 즉시 판매의 경우 구매 입찰을 등록한 구매자에게 결제 알림이 전송되어야 한다.
구매자 1에게 알림을 전송하는 방법은 여러가지가 있는데 Sse를 사용해보기로 했다
위키백과에 SSE에 대한 설명이 나와있다.
SSE ( 서버 전송 이벤트 )는 클라이언트가 HTTP 연결을 통해 서버로부터 자동 업데이트를 수신할 수 있도록 하는 서버 푸시 기술이며, 초기 클라이언트 연결이 설정된 후 서버가 클라이언트를 향한 데이터 전송을 시작하는 방법을 설명합니다. 이는 일반적으로 브라우저 클라이언트에 메시지 업데이트 또는 지속적인 데이터 스트림을 보내는 데 사용되며 클라이언트가 이벤트 스트림을 수신하기 위해 특정 URL을 요청하는 EventSource라는 JavaScript API를 통해 기본 브라우저 간 스트리밍을 향상시키도록 설계되었습니다. SSE의 미디어 유형은 text/event-stream 입니다 .
여기서 sse는 메모리에 저장되어 재부팅시 데이터가 유실되기 때문에 필요한 데이터(지금까지 온 알림 목록)는 DB에 저장하여 반환하기로 했다.
컨트롤러
@GetMapping(value = "/subscribe", produces = "text/event-stream")
public SseEmitter subscribe(@RequestHeader(value = "Last-Event-Id", required = false, defaultValue = "") String lastEventId,
Authentication authentication) {
return notificationService.subscribe(authentication.getName(), lastEventId);
}
설명
구매자 1은 subscribe메서드를 통해 서버와 연결 상태를 유지한다.(연결 요청을 처리하기 위해서, MIME Typ 을 text/event-stream형태로 받아줘야한다.) 헤더에 들어가는 Last-Event-ID 값은 이전에 받지 못한 이벤트가 존재하는 경우 받은 마지막 이벤트 ID 값을 넘겨 그 이후의 데이터(받지 못한 데이터)부터 받을 수 있게 할 수 있는 정보를 의미한다.
서비스
public SseEmitter subscribe(String memberId, String lastEventId) {
String emitterId = makeTimeIncludeId(memberId);
SseEmitter emitter = emitterRepository.save(emitterId, new SseEmitter(SSE_TIME_OUT));
emitter.onCompletion(() -> emitterRepository.deleteById(emitterId));
emitter.onTimeout(() -> emitterRepository.deleteById(emitterId));
String eventId = makeTimeIncludeId(memberId);
sendNotification(emitter, eventId, emitterId, "EventStream Created. [userId=" + memberId + "]");
// 클라이언트가 미수신한 Event 목록이 존재할 경우 전송하여 Event 유실을 예방
if (hasLostData(lastEventId)) {
sendLostData(lastEventId, memberId, emitterId, emitter);
}
return emitter;
}
도메인
@Entity
@Getter
@Setter
@Builder
@AllArgsConstructor
public class Notification {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
@Column(name = "notification_number")
private Long notificationNumber;
@Column(name = "domain_number")
private Long domainNumber; // (notificationType + domainNumber)로 어떤 Repository단의 정보를 가져와야 하는지 찾기 위한 속성
@Column(name = "notification_content")
private String notificationContent;
@Enumerated(EnumType.STRING)
private NotificationType notificationType;
@ManyToOne
@JoinColumn(name = "receiver_number")
private Member receiver;
@Column(name = "created_at")
private LocalDateTime createdAt;
@Column(name = "is_read")
private boolean isRead;
protected Notification() {
}
}
현재는 결제 관련 알림만 구현해 놓은 상황이라 notificationType이 PAYMENT밖에없지만 여러 종류의 알림이 있을 수 있고알림마다 필요한 정보를 db단에서 가져오기 위해 domainNumber를 사용했다.
ex) notificationType = PAYMENT이고 domainNumber가 1이면 결제 DB 데이터 중 row번호(고유값)가 1번인 데이터를 가져옴
레포지토리 구현
public interface EmitterRepository {
SseEmitter save(String emitterId, SseEmitter sseEmitter);
void saveEventCache(String emitterId, Object event);
Map<String, SseEmitter> findAllEmitterStartWithByMemberId(String receiverId);
Map<String, Object> findAllEventCacheStartWithByMemberId(String receiverId);
void deleteById(String id);
void deleteAllEmitterStartWithMemberId(String memberId);
void deleteAllEventCacheStartWithMemberId(String memberId);
}
@Repository
@NoArgsConstructor
public class EmitterRepositoryImpl implements EmitterRepository {
private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
private final Map<String, Object> eventCache = new ConcurrentHashMap<>();
@Override
public SseEmitter save(String emitterId, SseEmitter sseEmitter) {
emitters.put(emitterId, sseEmitter);
return sseEmitter;
}
@Override
public void saveEventCache(String eventCacheId, Object event) {
eventCache.put(eventCacheId, event);
}
@Override
public Map<String, SseEmitter> findAllEmitterStartWithByMemberId(String memberNumber) {
return emitters.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(memberNumber))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
@Override
public Map<String, Object> findAllEventCacheStartWithByMemberId(String memberId) {
return eventCache.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(memberId))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
@Override
public void deleteById(String id) {
emitters.remove(id);
}
@Override
public void deleteAllEmitterStartWithMemberId(String memberId) {
emitters.forEach(
(key, emitter) -> {
if (key.startsWith(memberId)) {
emitters.remove(key);
}
}
);
}
@Override
public void deleteAllEventCacheStartWithMemberId(String memberId) {
eventCache.forEach(
(key, emitter) -> {
if (key.startsWith(memberId)) {
eventCache.remove(key);
}
}
);
}
}
public interface NotificationRepository extends JpaRepository<Notification, Long> {
// 알림 목록 보기 API에서 반환해줄 데이터
@Query(value = "select n from Notification n where n.receiver.memberId=:memberId order by n.createdAt desc ")
List<Notification> findNotificationsByReceiverIdOrderByCreatedAtDesc(@Param("memberId") String memberId);
}
전체 서비스 로직
public SseEmitter subscribe(String memberId, String lastEventId) {
String emitterId = makeTimeIncludeId(memberId);
SseEmitter emitter = emitterRepository.save(emitterId, new SseEmitter(SSE_TIME_OUT));
emitter.onCompletion(() -> emitterRepository.deleteById(emitterId));
emitter.onTimeout(() -> emitterRepository.deleteById(emitterId));
String eventId = makeTimeIncludeId(memberId);
sendNotification(emitter, eventId, emitterId, "EventStream Created. [userId=" + memberId + "]");
// 클라이언트가 미수신한 Event 목록이 존재할 경우 전송하여 Event 유실을 예방
if (hasLostData(lastEventId)) {
sendLostData(lastEventId, memberId, emitterId, emitter);
}
return emitter;
}
private String makeTimeIncludeId(String memberId) {
return memberId + "_" + System.currentTimeMillis();
}
private void sendNotification(SseEmitter emitter, String eventId, String emitterId, Object data) {
try {
emitter.send(SseEmitter.event()
.id(eventId)
.data(data)
.name("notification"));
} catch (IOException e) {
emitterRepository.deleteById(emitterId);
}
}
private boolean hasLostData(String lastEventId) {
return !lastEventId.isEmpty();
}
private void sendLostData(String lastEventId, String memberId, String emitterId, SseEmitter emitter) {
Map<String, Object> eventCaches = emitterRepository.findAllEventCacheStartWithByMemberId(memberId);
eventCaches.entrySet().stream()
.filter(entry -> lastEventId.compareTo(entry.getKey()) < 0)
.forEach(entry -> sendNotification(emitter, entry.getKey(), emitterId, entry.getValue()));
}
public void send(NotificationRequest notificationRequest) {
Notification notification = notificationRepository.save(createNotification(
notificationRequest.domainNumber(),
notificationRequest.receiver(),
notificationRequest.content(),
notificationRequest.notificationType()));
String receiverId = String.valueOf(notificationRequest.receiver().getMemberId());
String eventId = receiverId + "_" + System.currentTimeMillis();
Map<String, SseEmitter> emitters = emitterRepository.findAllEmitterStartWithByMemberId(receiverId);
emitters.forEach(
(key, emitter) -> {
emitterRepository.saveEventCache(key, notification);
sendNotification(emitter, eventId, key, NotificationResponse.createNotificationResponse(notification, notificationRequest));
}
);
}
private Notification createNotification(Long domainNumber, Member receiver, String content, NotificationType notificationType) {
LocalDateTime now = LocalDateTime.now();
return Notification.builder()
.domainNumber(domainNumber)
.receiver(receiver)
.notificationContent(content)
.notificationType(notificationType)
.createdAt(now)
.isRead(false)
.build();
}
위 서비스 로직 중 send 메서드는 즉시 판매 메서드를 호출하는 서비스 계층에서 EventPublisher를 주입받아 즉시 판매 이벤트를 발행하면 EventListener는 리스닝하고 있다가 발행된 이벤트를 처리해주게 된다.
private final ApplicationEventPublisher eventPublisher; // ApplicationEventPublisher 주입
@Transactional
public SellingBidResponse sellingNow(SellingBidRequest sellingBidRequest, String memberId) {
Member seller = memberRepository.findByMemberId(memberId)
.orElseThrow(() -> new MemberNotFoundException(ErrorCode.MEMBER_NOT_FOUND));
Bid buyBidInfo = findTargetBuyBidOne(sellingBidRequest);
dealRepository.save(
DealMapper.selllNowToDeal(buyBidInfo, seller.getMemberNumber()));
notifySellInfo(buyBidInfo, createNotificationRequest(buyBidInfo)); // 이벤트 메서드 호출
return BidMapper.toSellingBidResponse(buyBidInfo);
}
private void notifySellInfo(Bid bid, NotificationRequest notificationRequest) {
bid.publishEvent(eventPublisher, notificationRequest); //publishEvent 는 도메인에서 구현
}
@Entity
@Getter
@Setter
@Builder
@AllArgsConstructor
public class Bid {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
@Column(name = "bid_number")
private long bidNumber;
@ManyToOne
@JoinColumn(name = "product_number", nullable = false)
private Product product;
@ManyToOne
@JoinColumn(name = "member_number", nullable = false)
private Member member;
@Column(name = "bid_product_size", nullable = false)
private String size;
@Column(name = "bid_price", nullable = false)
private int bidPrice;
@Column(name = "created_at", nullable = false)
private LocalDateTime createdAt;
@Column(name = "bid_deadline")
private LocalDateTime bidDeadLine;
@Enumerated(EnumType.STRING)
private BidStatus bidStatus;
@Enumerated(EnumType.STRING)
private BidType bidType;
protected Bid() {
}
public void publishEvent(ApplicationEventPublisher eventPublisher, NotificationRequest notificationRequest) {
eventPublisher.publishEvent(notificationRequest);
}
}
⚠️주의
로컬 환경에서 되던 알림 기능이 배포 환경에서는 잘 안될수도 있습니다.
저같은 경우는 백엔드 서버에서 AWS EC2, Https, nginx의 reverse proxy기능을 사용중이였고 배포환경에서 알림이 클라이언트 측에 전송이 되지 않았습니다.
nginx의 다음 설정을 추가하여 배포 환경에서 알림이 잘 전송되었습니다.
가상 서버 환경 설정 편집기 들어가기
location에 다음 두 줄 추가
proxy_set_header Connection '';
proxy_http_version 1.1;
테스트
사용자 계정 2개를 만들고 브라우저를 2개(크롬, 시크릿 모드)를 띄워서 해당 시나리오를 테스트 해보면 다음과 같이 즉시 판매하기시 구매자 계쩡으로 알림이 전송되는걸 확인할 수 있다.
위 코드는 아래 블로그를 참고하며 도움을 많이 받았다. (본인 프로젝트에 맞게 수정)
'SideProject' 카테고리의 다른 글
스위프 5기 FADE 프로젝트 돌아보기 후기 (0) | 2024.08.12 |
---|---|
Redis Cache 적용해서 Tps를 높여보기 (2) | 2024.03.05 |
구매 서비스에서 재고 동시성 이슈 해결해보기(2) - Redisson (1) | 2024.02.28 |
구매 서비스에서 재고 동시성 이슈 해결해보기(1) - JPA LOCK (0) | 2024.02.26 |