Jpa로 데이터 수정 작업 후 수정된 데이터를 카프카에 전송하는 방법

 

API를 통해 Mysql DB에서 데이터를 등록 및 수정 작업을 마친 후, 카프카를 통해 최신화된 데이터를 다른 서버에 전송하는 로직이 있다.

@Service
@RequiredArgsConstructor
public class Service {

    private final KafkaService kafkaService;
    private final Reader reader;
    private final Writer writer;
    
    @Transactional
    public void add(Request request) {
        // 데이터 저장
        Entity entity = writer.save(request);
        
        // 카프카로 데이터 전송
        kafkaService.send(savedEntity);
    }
    
    @Transactional
    public void update(Request request) {
        Entity entity = reader.findById(request.getId());
        
        writer.update(request, updatedEntity);
		
        // 카프카로 수정된 데이터 전송
        kafkaService.send(updatedEntity);
    }
    
    
}

 

※ 계층 구조
일반적인 Controller -> Service -> Repository구조에서 계층을 추가하여
Controller -> Service -> Component(Reader, Writer) -> Repository 로 사용중이다.
Component는 도메인별로 Reader, Writer가 존재하며 Service에서는 데이터를 조합한다.

 

다음은 수정 로직중에서 Spring Data Jpa를 아래와 같이 사용하고 있었다.

  • 변경감지를 하기 위해 findById()롤 데이터를 조회
  • 새로 추가된 데이터일 경우 save()를 사용하여 저장
  • 삭제 되는 데이터일 경우 @Modifying를 사용하여 데이터 삭제 (별도 옵션 미사용)

등록시에는 API로 보낸 데이터들이 엔티티에 존재하기 때문에 데이터 정합성에 문제가 없었다.

하지만, Mysql에는 수정된 데이터가 반영이 되었지만, 카프카에 전송시에는 수정되기전 데이터가 보내지고 있었다.

예를 들어, 기존 데이터가 1,2,3이 존재하였다. 3을 제거하고 4를 추가하면 1,2,4가 된다. DB에는1,2,4가 반영이 되었지만 실제 카프카에 전송시에는 1,2,3이 보내지고 있었다.

 

아래와 같이 요청값을 바탕으로 카프카에 전송하기 위한 데이터를 별도로 만드는 방법도 있다.

    @Transactional
    public void update(Request request) {
        Entity entity = reader.findById(request.getId());

        writer.update(request, updatedEntity);

        //카프카 전용 엔티티
        Entity KafkaEntity = new Entity();
        KafkaEntity.setData(reqeust);

        // 카프카로 수정된 데이터 전송
        kafkaService.send(KafkaEntity);
    }

하지만, 카프카에 보내야 되는 객체 구조가 복잡할 경우 코드량이 증가하는 단점이 존재한다.

 

Jpa의 원리를 이용한 방법이 있었다. 위에 설명한 수정 로직 일부의 작동 원리를 다시 살펴보자.

  • findById()를 통해 데이터 조회 후 영속성 컨텍스트에 담는다. 영속성 컨텍스트에 같은 @Id가 존재하면 DB에 조회하지 않고 영속성 컨텍스트에 있는 데이터를 꺼내서 사용한다.
  • save()는 쓰기 지연 SQL 저장소에 임시 저장되고 트랜잭션이 끝날떄 커밋을 수행하여 저장되어 있던 query들을 날린다.
  • @Modifying는 쓰기지연 저장소에 담는게 아니라 DB에 바로 쿼리를 수행한다.

데이터 싱크가 틀어진 원인은 영속성 컨텍스트에 담긴 최신화되지 않은 데이터를 가져다가 카프카로 전송해서 발생했다. 

수정작업이 끝난 이후 엔티티매니저를 통해 쓰기 지연 저장소에 담긴 쿼리도 flush()를 하여 DB에 반영해준뒤 영속성 컨텍스트를 비워준다. 그리고 다시 조회를 하면 수정된 데이터가 반영된 엔티티를 구할 수 있다.

 

아래와 같이 코드를 수정하였다.

@Service
@RequiredArgsConstructor
public class Service {

    private final KafkaService kafkaService;
    private final Reader reader;
    private final Writer writer;
    @PersistenceContext
    private EntityManager entityManager;
    
    @Transactional
    public void update(Request request) {
        Entity entity = reader.findById(request.getId());
        
        writer.update(request, updatedEntity);
        
        //플러시
        entityManager.flush();

        //영속성 컨텍스트 비워줌
        entityManager.clear();
        
        //JPQL로 join fetch 사용하여 조회
        Entity kafkaEntity = reader.findById(request.getId());
		
        // 카프카로 수정된 데이터 전송
        kafkaService.send(kafkaEntity);
    }
    
}

- @Transactional이 붙어있기 때문에 flush(), clear() 후에 RuntimeException이 발생해도 최종적으로는 DB에 반영되지 않는다.

 

작성하다가 의문이 든점

- flush()로 DB에 반영은 했지만 commit은 되지 않은 상태에서 JPQL로 조회를 했을때 수정된 데이터가 조회되는 이유는 뭘까?

=> 자기자신의 스레드에서 DB에 반영한 사항은 읽어 올 수 있다. 단지 다른 스레드에서 최신화된 데이터를 못읽는것 뿐이다(commit 전이기 때문에)

 

자료조사하다 새롭게 알게된 사실들

 

※ JPQL bulkDelete 작동 원리

Q. bulkUpdate 후에 영속성 컨텍스트 초기화를 하지 않고 select 쿼리 후 조회했을 때는 바뀐게 반영되지 않는데 bulkDelete에서는 바뀐게 반영된다. 왜 그럴까?

A. bulkUpdate 후에는 해당 엔티티들의 개수와 id는 변하지 않았기 때문에 DB에서 조회한 id 값들도 같을 것이고 1차 캐시에서 해당 엔티티들도 그대로 있을테니 1차 캐시에 있는 엔티티들을 불러온 것이고, bulkDelete 후에는 DB에서 조회한 id 들에 한해서만 영속성 컨텍스트 1차 캐시에서 값을 조회해오는 것이라 DB에서 삭제된 1차 캐시의 해당 엔티티는 불러오지 않는다.

https://www.inflearn.com/questions/531772/%EC%82%AD%EC%A0%9C-%EB%B2%8C%ED%81%AC-%EC%97%B0%EC%82%B0-flush-%ED%98%B8%EC%B6%9C

 

※ JPQL bulkDelete 작동 원리

JPQL을 사용시 DDL이든 DML이든 Query를 실행하기 전, 자동으로 flush() 메소드를 호출합니다.
이유는 EntityManager의 기본 flush 옵션이 FlushModeType.AUTO로 되어있기 때문입니다.
만약 커밋할 때만 flush 메소드를 호출하게 하고 싶다면, flushMode 타입을 COMMIT으로 설정해주면 됩니다.

단, 반드시 flush가 발생하는 것은 아니고 상황에 따라 다르다. 엔티티 작업이 겹치지 않으면 flush가 발생하지 않는다. 자세한 내용은 아래블로그 참조
https://jiwondev.tistory.com/242

 

https://blog.neonkid.xyz/234

 

※ 더티체킹

dirty-checking을 통해 데이터를 수정하는 쿼리는 flush 하는 시점에 더티체킹이 일어난다. 참고로 update 쿼리는 콘솔에 찍히지 않는다.

 

참고

https://w97ww.tistory.com/m/109

 

★ 정리 굿

https://jiwondev.tistory.com/242

 

'Tech > JPA' 카테고리의 다른 글

Jpa save vs saveAndFlush vs saveAll  (4) 2024.01.25
detached entity passed to persist 에러  (0) 2021.12.01