CompletableFuture 로 IO Bound 작업을 병렬화하기
— CompletableFutre, Java, Async — 6 min read
Java 에서 I/O Bound 작업을 병렬화 하는 법
들어가기에 앞서
미디어 파일에 대한 처리가 필요할 떄가 있다. 예를 들어 동영상의 경우 썸네일을 추출한다거나 여러 화질로 트랜스코딩을 진행해야 한다. 한 번에 여러 개의 동영상에 대한 처리 요구가 들어오게 되면 I/O 작업 때문에 처리 시간이 길어진다. 따라서 이 작업들을 병렬화하여 처리 시간을 줄일 수 있는 Java 의 CompletableFuture 과 적절한 ThreadPool 구성에 대한 개념까지 알아보자.
Our Goal
- CompletableFuture 의 기본 개념을 이해하고 응용할 수 있다.
- I/O Bound 작업에 알맞는 Thread Pool 의 구성을 이해하고 설정할 수 있다.
구현 과정
-
환경
- Spring Boot
- FFmpeg
- CompletableFuture
-
CompletableFuture 란?
Java8 부터 등장한 비동기 작업을 유연하게 수행할 수 있도록 지원해주는 클래스이다. 기존에 존재하던
Future
의 한계점들을 보완하기 위해 등장했다.- 블로킹을 통해서만 이후 결과 처리 가능
- 여러 비동기 작업 조합 불가능
- 예외 처리 불가능
이러한 한계점을 극복한 CopmletableFuture 로 IO Bound 작업을 병렬화하는 과정에 대해 살펴보자.
-
FFmpeg 을 통한 동영상 썸네일 추출 작업
IO Bound 작업으로 동영상 썸네일 추출하는 작업을 선택하였다. 도구는 FFmpeg 을 선택하였고 이를 Spring Boot 에서 쉽게 사용할 수 있도록 FFmpeg Wrapper 라이브러리를 적용해주었다.
implementation 'net.bramp.ffmpeg:ffmpeg:0.8.0'- FFmpeg 을 통한 썸네일 추출
public Supplier<String> extractThumbnail(String videoPath) throws IOException {ClassPathResource classPathResource = new ClassPathResource("thumbnails");final String path = classPathResource.getURI().getPath();return () -> {log.info("starting extracted thumbnail of {}", videoPath);final String thumbnailFilename = UUID.randomUUID().toString();FFmpegBuilder fFmpegBuilder = new FFmpegBuilder().addInput(videoPath).addExtraArgs("-ss", "00:00:01").addOutput(path + "/" + thumbnailFilename + ".png").setFrames(1).done();fFmpegExecutor.createJob(fFmpegBuilder).run();return thumbnailFilename;};} -
CompletableFuture 를 이용한 병렬화
public List<String> extractThumbnailsInParallel(List<String> videos) throws IOException{List<CompletableFuture<String>> completableFutures = new ArrayList<>();for (String video : videos) {CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(extractThumbnail(video), executor).exceptionally(e -> {log.error("error occurring in extracted thumbnail of {} ", video, e);return exception;});completableFutures.add(completableFuture);}final CompletableFuture<List<String>> completableFuture = CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0])).thenApply(v -> completableFutures.stream().map(CompletableFuture::join).collect(Collectors.toList()));return completableFuture.join();}CompletableFuture
를 통해 썸네일 추출 작업을 병렬화 한 코드이다. 필자는 Return Value 가 필요한 상황이기 때문에supplyAsync
를 사용하였고, 만약 필요없다면runAsync
를 사용해주면 된다.CompletableFuture.allOf
메서드를 통해 새로운 CompletableFuture 를 생성하고 이를 다시 현재 스레드에 join 하여 Async 로 동작하게 한 작업들이 모두 끝나면 Blocking 으로 결과 값을 받을 수 있다.이때 주의할 점은
exceptionally
나handle
메서드를 통해 예외를 제어해주지 않으면 현재 스레드에서 예외가 발생하게 된다.물론 join 없이 Non-Blocking 으로 구현한다면 현재 스레드에서 예외가 발생하지 않지만 비동기 작업이 정상적으로 이루어지지 않는 것에 대한 대비를 해주어야 하기 때문에 예외 처리를 해주어야 한다.
- join 과 get 의 차이점
- join 은 unchecked exception 을 throw 하고 get 은 checked exception 을 throw 한다.
- get 은 timeout 을 제어할 수 있다.
- join 은 interrupt 가 불가능하지만 get 은 가능하다.
- join 과 get 의 차이점
-
ForkJoinPool
ForkJoinPool 은 CompletableFutre 에서 default 로 설정되는 thread pool 이다. 이의 특징은 CPU Bound 연산을 수행할 때 work-stealing 알고리즘을 사용하여 작업이 밀려있는 다른 thread 의 작업 queue 에서 작업을 가져와 수행하기 때문에 CPU 를 좀 더 효율적으로 사용할 수 있다는 점이다.
이를 이용할 때 중요한 점은 가능한 가장 작은 단위로 작업을 분할할수록 work-stealing 알고리즘이 잘 작동하여 효율이 높아진다는 것이다.
필자는 IO Bound 작업이고 해당 작업과 서버 환경에서 세밀하게 thread pool 옵션들을 설정하기 위해
ThreadPoolExecutor
로 적용하였다. -
마치며
CompletableFuture 는 본 글에서 소개된 내용 말고도 연산을 여러 단계로 연계하거나 결합할 수 있는 다양한 기능을 가진 메서드들이 많아 유연한 비동기 프로그래밍을 지원한다. 참고에 잘 정리된 글에 대한 링크를 남겨두었으니 관심있게 보면 좋을 것 같다.
또한 thread pool 에 대해 짧게 다루어봤는데 좀 더 깊게 알아보고 성능 테스트까지 해보고 싶어 공부하고 정리해서 포스팅 해 볼 것이다. 본 포스팅에 사용된 코드는 여기에서 확인할 수 있다.