Spring Batch ‐ 스프링 배치 청크 프로세스 이해 - dnwls16071/Backend_Study_TIL GitHub Wiki
- Chunk란, 여러 개의 아이템을 묶은 하나의 덩어리, 블록을 의미한다.
- 한 번에 하나씩 아이템을 입력 받아 Chunk 단위의 덩어리로 만든 후, Chunk 단위로 트랜잭션을 처리한다. 즉, Chunk 단위의 Commit과 Rollback이 이루어진다.
- 일반적으로 대용량 데이터를 한 번에 처리하는 것이 아니라 청크 단위로 쪼개어서 더 이상 처리할 데이터가 없을 때까지 반복해서 입출력하는데 사용된다.
-
Chunk<I>
vsChunk<O>
-
Chunk<I>
는 ItemReader로부터 읽은 하나의 아이템을 Chunk에서 정한 개수만큼 반복해서 저장하는 타입 -
Chunk<O>
는 ItemReader로부터 전달받은Chunk<I>
를 참조해서 ItemProcessor에서 적절하게 가공, 필터링한 다음 ItemWriter에 전달하는 타입
-
@Bean
public Step step1() {
return new StepBuilder("step1", jobRepository)
.<String, String> chunk(3, platformTransactionManager)
.reader(new ListItemReader<>(Arrays.asList("item1", "item2", "item3", "item4", "item5")))
.processor(new ItemProcessor<String, String>() {
@Override
public String process(String item) throws Exception {
Thread.sleep(300);
System.out.println("item = " + item);
return "my" + item;
}
})
.writer(new ItemWriter<String>() {
@Override
public void write(Chunk<? extends String> items) throws Exception {
Thread.sleep(300);
System.out.println("items = " + items);
}
})
.build();
}
- ChunkOrientedTasklet은 스프링 배치에서 제공하는 Tasklet의 구현체로서 Chunk 지향 프로세스를 담당하는 도메인 객체이다.
- ItemReader, ItemWriter, ItemProcessor를 사용해 Chunk 기반의 데이터 입출력 처리를 담당한다.
- TaskletStep에 의해 반복적으로 실행되며, ChunkOrientedTasklet이 실행될 때마다 매번 새로운 트랜잭션이 생성되어 처리가 이루어진다.
- Exception이 발생할 경우, 해당 Chunk는 롤백이 되며 이전에 커밋한 Chunk는 완료된 상태가 유지된다.
- 내부적으로 ItemReader를 핸들링하는 ChunkProvider와 ItemProcessor, ItemWriter를 핸들링하는 ChunkProcessor 타입의 구현체를 가진다.
@Bean
public Step step1() {
return new StepBuilder("step1", jobRepository)
.<String, String> chunk(3, platformTransactionManager) // Chunk 사이즈 지정한다.
.reader(new ListItemReader<>(Arrays.asList("item1", "item2", "item3", "item4", "item5"))) // 데이터를 읽는다.
.processor(new ItemProcessor<String, String>() { // 데이터를 가공한다.
@Override
public String process(String item) throws Exception {
Thread.sleep(300);
System.out.println("item = " + item);
return "my_" + item;
}
})
.writer(new ItemWriter<String>() { // 데이터를 쓴다.
@Override
public void write(Chunk<? extends String> items) throws Exception {
Thread.sleep(300);
System.out.println("items = " + items);
}
})
.build();
}
-
ChunkProvider
- ItemReader를 사용해서 소스로부터 아이템을 Chunk Size만큼 읽어서 Chunk 단위로 만들어 제공하는 도메인 객체이다.
- Chunk를 만들고, 내부적으로 반복문을 사용해서 ItemReader.read()를 호출해서 item을 계속 Chunk에 쌓는다.
- 외부로부터 ChunkProvider가 호출될 때마다 항상 새로운 Chunk가 생성된다.
- 반복문 종료 시점
- Chunk Size만큼 읽으면 반복문이 종료되고 ChunkProcessor로 넘어간다.
- ItemReader가 읽은 item이 null일 경우, 반복문 종료 및 해당 Step 반복문까지 종료된다.
- 기본 구현체로서 SimpleChunkProvider와 FaultTolerantChunkProvider가 있다.
-
ChunkProcessor
- ItemProcessor를 사용해서 Item을 변형, 가공, 필터링하고 ItemWriter를 사용해서 Chunk 데이터를 저장 출력한다.
- Chunk를 만들고 앞에서부터 넘어온 Chunk의 item을 한 건씩 처리한 후 Chunk에 저장한다.
- 외부로부터 ChunkProcessor가 호출될 때마다 항상 새로운 Chunk가 생성된다.
- ItemProcessor는 설정 시 선택사항으로서 만약 객체가 존재하지 않을 경우 ItemReader에서 읽은 item 그대로가 Chunk에 저장된다.
- ItemProcessor 처리가 완료되면 Chunk에 있는 List을 ItemWriter에게 전달한다.
- ItemWriter 처리가 완료되면 Chunk 트랜잭션이 종료되고 Step 반복문에서 ChunkOrientedTasklet가 새롭게 실행된다.
- ItemWriter는 Chunk Size만큼 데이터를 Commit하기 때문에 Chunk Size는 곧 Commit Interval이 된다.
- 기본 구현체로서 SimpleChunkProcessor와 FaultTolerantChunkProcessor가 있다.
- ItemReader
- 다양한 입력으로부터 데이터를 읽어서 제공하는 인터페이스
- Flat 파일 : csv, txt
- XML, Json
- Database
- JMS, Rabbit MQ같은 메시지 큐 서비스
- Custom Reader
- ChunkOrientedTasklet 실행 시 필수적 요소로 설정해야 한다.
- 다양한 입력으로부터 데이터를 읽어서 제공하는 인터페이스
- ItemWriter
- Chunk 단위로 데이터를 받아 일괄 출력 작업을 위한 인터페이스
- Flat 파일 : csv, txt
- XML, Json
- Database
- JMS, Rabbit MQ같은 메시지 큐 서비스
- Custom Writer
- 아이템 하나가 아닌 아이템 리스트를 전달받는다.
- ChunkOrientedTasklet 실행 시 필수적 요소로 설정해야 한다.
- Chunk 단위로 데이터를 받아 일괄 출력 작업을 위한 인터페이스
- ItemProcessor
- 데이터를 출력하기 전에 데이터를 가공, 변형, 필터링하는 역할
- ItemReader 및 ItemWriter와 분리되어 비즈니스 로직을 구현할 수 있다.
- ItemReader로부터 받은 아이템을 특정 타입으로 변환해서 ItemWriter에 넘겨줄 수 있다.
- ItemReader로부터 받은 아이템 중 필터 과정을 거쳐 원하는 아이템들만 ItemWriter에게 넘겨줄 수 있다.
- ChunkOrientedTasklet 실행 시 선택적 요소이기 때문에 청크 기반 프로세싱에서 ItemProcessor 단계가 필수 단계는 아니다.
@Component
public class CustomItemReader implements ItemReader<Customer> {
private final List<Customer> list;
public CustomItemReader(List<Customer> list) {
this.list = new ArrayList<>(list);
}
@Override
public Customer read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
if (!list.isEmpty()) {
return list.remove(0);
};
return null;
}
}
@Component
@RequiredArgsConstructor
public class CustomItemProcessor implements ItemProcessor<Customer, Customer> {
@Override
public Customer process(Customer customer) throws Exception {
customer.setName(customer.getName());
return customer;
}
}
@Component
@RequiredArgsConstructor
public class CustomItemWriter implements ItemWriter<Customer> {
@Override
public void write(Chunk<? extends Customer> chunk) throws Exception {
chunk.forEach(System.out::println);
}
}
- ItemReader와 ItemWriter 처리 과정 중 상태를 저장하고 오류가 발생하면 해당 상태를 참조하여 실패한 곳에서 재시작하도록 지원한다.
- 리소스를 열고 닫아야 하며 입출력 장치 초기화 등의 작업을 해야 하는 경우 사용한다.
- ExecutionContext를 매개변수로 받아서 상태 정보를 업데이트 한다.
- ItemReader 및 ItemWriter는 ItemStream을 구현해야 한다.
- 실제 트랜잭션의 시작 지점 : RepeatTemplate를 통해 ItemReader를 시작하는 시점에서부터
- Chunk 사이즈만큼 아이템을 담으면서 ItemReader를 반복한다.
- Chunk 사이즈를 초과했다면 SimpleChunkProcessor에 읽었던 아이템을 전달한다.
- ItemProcessor에서 Iterator를 돌려 아이템을 Chunk에 추가하고 Chunk가 ItemWriter에 담긴 아이템들을 넘기고 ItemWriter가 DB에 저장한다.
- 커밋 주기는 청크 사이즈와 같다.