ch11 동시성 - LenKIM/everyone-is-effective-java-study GitHub Wiki
스레드는 여러 활동을 동시에 수행할 수 있게 해준다. 하지만 동시성 프로그래밍은 단일 스레드 프로그래밍보다 어렵다.
잘못될 수 있는 일이 늘어나고 문제를 재현하기도 어려워지기 때문이다.
- 동기화는 배타적 실행뿐 아니라 스레드 사이의 안정적인 통신에 꼭 필요하다. : 이는 한 스레드가 만든 변화가 다른 스레드에게 언제 어떻게 보이는지를 규정한 자바의 메모리 모델 때문이다.
- Tread.stop 은 사용하지 말자 : 다른 스레드를 멈추는 올바른 방법은, 첫 번째 스레드는 자신의 boolean필드를 폴링하면서 그 값이 true가 되면 멈춘다. 이 필드를 false로 초기화해놓고, 다른 스레드에서 이 스레드를 멈추고자 할 때 true로 변경하는 식이다.boolean 필드를 읽고 쓰는 작업은 원자적이라 어떤 프로그래머는 이런 필드에 접근할 때 동기화를 제거하기도 한다.
// 코드 78-1 잘못된 코드 - 이 프로그램은 얼마나 오래 실행될까? (415쪽)
public class StopThread {
private static boolean stopRequested;
public static void main(String[] args)
throws InterruptedException {
Thread backgroundThread = new Thread(() -> {
int i = 0;
while (!stopRequested)
i++;
});
backgroundThread.start();
TimeUnit.SECONDS.sleep(1);
stopRequested = true;
}
}
이는 잘못된 코드다. 왜일까?
원인은 동기화에 있다. 동기화하지 않으면 메인 스레드가 수정한 값을 백그라운드 스레드가 언제쯤에나 보게 될지 보증할 수 없다. // 스레드-세이프하지않다.
//원래 코드
while(!stopRquested)
i++;
//최적화한 코드
if(!stopRequested)
while(true)
i++
이는 Hoisting 라는 최적화 기법이다. 이 결과 프로그램은 응답 불가(liveness failure)상태가 되어 더 이상 진적이 없다.
이를 해결하기 위한 방법으로 synchronized
를 활용하자. 적절히 동기화해 스레드가 정상 종료된다.
// 코드 78-2 적절히 동기화해 스레드가 정상 종료한다. (416쪽)
public class StopThread {
private static boolean stopRequested;
private static synchronized void requestStop() {
stopRequested = true;
}
private static synchronized boolean stopRequested() {
return stopRequested;
}
public static void main(String[] args)
throws InterruptedException {
Thread backgroundThread = new Thread(() -> {
int i = 0;
while (!stopRequested()) {
i++;
System.out.println(i);
}
});
backgroundThread.start();
TimeUnit.SECONDS.sleep(1);
requestStop();
}
}
- 쓰기 메서드(requestStop)과 읽기 메서드(stopReqeusted) 모두를 동기화했음에 주목하자.
만약 쓰기 메서드만 동기화해서는 충분하지 않습니다. 쓰기와 읽기 모두가 동기화되지 않으면 동작을 보장하지 않는다.
동기화는 배타적 수행과 스레드 간 통신이라는 두 가지 기능을 수행하는데, 이 코드에서는 그중 통신 목적으로만 사용한 것
더 빠른 대안으로 volatile
를 사용하는 방법도 있다. volatile
한정자는 배타적 수행과는 상관없고, 항상 가장 최근에 기록된 값을 읽게 됨을 보장합니다.
// 코드 78-3 volatile 필드를 사용해 스레드가 정상 종료한다. (417쪽)
public class StopThread {
private static volatile boolean stopRequested;
public static void main(String[] args)
throws InterruptedException {
Thread backgroundThread = new Thread(() -> {
int i = 0;
while (!stopRequested)
i++;
});
backgroundThread.start();
TimeUnit.SECONDS.sleep(1);
stopRequested = true;
stopRequested = false;
stopRequested = false;
}
}
그러나, volatile 도 주의깊게 사용해야 한다. 만약 매번 고유한 값을 반환할 의도라면 굳이 필요하지 않습니다.
또는 java.util.concurrent.atomic
을 활용해서 락-프리 동기화를 해결하자.
private static final volatile int num = 0;
// private static final AtomicLong nextSerialNum = new AtomicLong();
public static long generatedSerialNum(){
return num++;
// return nextSerialNum.getAndIncrement();
}
애초에 가변데이터를 사용하지 않는 것이 정답일 수 있다.
- 가변 데이터는 단일 스레드에서만 쓰도록 하자.
- 응답 불가와 안전 실패를 피하려면 동기화 메서드나 동기화 블록 안에서는 제어를 클라이언트에 양도하면 안 된다.
- 동기화된 영역 안에서는 재정의할 수 있는 메서드를 호출해서는 안되며, 클라이언트가 넘겨준 함수 객체를 호출해서도 안된다.
예시를 보자.
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
// 관찰자 패턴을 구현하여, 원소가 추가되면 알려주는 집합 (420-425쪽)
public class ObservableSet<E> extends ForwardingSet<E> {
public ObservableSet(Set<E> set) { super(set); }
// 코드 79-1 잘못된 코드. 동기화 블록 안에서 외계인 메서드를 호출한다. (420쪽)
// private final List<SetObserver<E>> observers
// = new ArrayList<>();
// public void addObserver(SetObserver<E> observer) {
// synchronized(observers) {
// observers.add(observer);
// }
// }
//
// public boolean removeObserver(SetObserver<E> observer) {
// synchronized(observers) {
// return observers.remove(observer);
// }
// }
// private void notifyElementAdded(E element) {
// synchronized(observers) {
// for (SetObserver<E> observer : observers)
// observer.added(this, element);
// }
// }
// // 코드 79-3 외계인 메서드를 동기화 블록 바깥으로 옮겼다. - 열린 호출 (424쪽)
// private void notifyElementAdded(E element) {
// List<SetObserver<E>> snapshot = null;
// synchronized(observers) {
// snapshot = new ArrayList<>(observers);
// }
// for (SetObserver<E> observer : snapshot)
// observer.added(this, element);
// }
// 코드 79-4 CopyOnWriteArrayList를 사용해 구현한 스레드 안전하고 관찰 가능한 집합 (425쪽)
private final List<SetObserver<E>> observers =
new CopyOnWriteArrayList<>();
public void addObserver(SetObserver<E> observer) {
observers.add(observer);
}
public boolean removeObserver(SetObserver<E> observer) {
return observers.remove(observer);
}
private void notifyElementAdded(E element) {
for (SetObserver<E> observer : observers)
observer.added(this, element);
}
@Override public boolean add(E element) {
boolean added = super.add(element);
if (added)
notifyElementAdded(element);
return added;
}
@Override public boolean addAll(Collection<? extends E> c) {
boolean result = false;
for (E element : c)
result |= add(element); // notifyElementAdded를 호출한다.
return result;
}
}
// 집합 관찰자 콜백 인터페이스 (421쪽)
public interface SetObserver<E> {
// ObservableSet에 원소가 더해지면 호출된다.
void added(ObservableSet<E> set, E element);
}
java.util.concurrent
라는 패키지가 등장한다. 이 패키지는 실행자 프레임워크(Executor Framework)라고 하는 인터페이스 기반의 유연한 테스크 실행 기능을 담고 있다.
그래서, 뛰어난 작업 큐를 단 한줄로 생성할 수 있다.
ExcecutorSerivce exec = Excutors.newSingleThreadExecturoe();
다음은 이 실행자에 실행할 테스크(task)를 넘기는 방법이다.
exec.excute(runnable);
그리고 다음은 실행자를 우하하게 종료시키는 방법이다.
exec.shutdown();
실행자 서비스는 그 외에도
- 특정 태스크가 완료되기를 기다린다.
- 태스크 모음 중 아무것 하나(invokeAny 메소드) 혹은 모든 태스크(invoke All 메서드)가 완료되기를 기다린다.
- 실행자 서비스가 종료하기를 기다린다(awaitTermination 메서드)
- 완료된 테스크들의 결과를 차례로 받는다(ExecutorCompletionService 이용)
- 태스크를 특정 시간에 혹은 주기적으로 실행하게 한다.(ScheduledThreadPoolExecutor)
작업 큐를 손수 만드는 일은 삼가야 하고, 스레드를 직접 다루는 것도 일반적으로 삼가야 한다.
스레드를 직접 다루면 Thread가 작업 단위와 수행 메커니즘 역할을 모두 수행하게 된다. 반면 실행자 프레임워크에서는 작업 단위와 실행 메커니즘이 분리된다. 작업 단위를 나타내는 핵심 추상 개념이 태스크다.
태스크에는 두 가지가 있다.
- Runnable
- Callable(Runnable과 유사하지만, 값을 반환받고 임의의 예외를 던질 수 있다.)
/**
* Returns a {@link Callable} object that, when
* called, runs the given task and returns the given result. This
* can be useful when applying methods requiring a
* {@code Callable} to an otherwise resultless action.
* @param task the task to run
* @param result the result to return
* @param <T> the type of the result
* @return a callable object
* @throws NullPointerException if task null
*/
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
태스크를 수행하는 일반적인 매커니즘이 바로 실행자 서비스다. 태스크 수행을 실행자 서비스에 맡기면 원하는 태스크 수행 정책을 선택할 수 있고, 생각이 바뀌면 언제든 바꿀 수 있다. 핵심은 실행자 프레임워크가 작업 수행을 담당해준다는 것!
자바7이 되면서 실행자 프레임워크에서느 포크-조인(fork-join)태스크를 지원하도록 확장되었지만, 포크-조인을 언제 써봤던가... 취준생이였던가.
ForkJoinTask의 인스턴스는 작은 하위 태스크로 나뉠 수 있고, ForkJoinPool을 구성하는 스레드들이 이 태스크들을 처리하며, 일을 먼저 끝낸 스레드는 다른 스레드의 남은 태스크를 가져와 대신 처리할 수 있다.
동시성 유틸리는 java.util.concurrent
를 말합니다.
이는 크게 세 범주로 나눔
- 실행자 프레임워크
- 동시성 컬렉션(concurrent collection)
- 동기화 장치(synchronizer)
실행자 프레임워크는 아이템80 에서 살펴봤고, 나머지 2가지에 대해서 알아보자.
동시성 컬렉션은 List,Queue,Map 같은 표준 컬렉션 인터페이스에 동시성을 가미해 구현한 고성능 컬렉션.
-
동시성을 무력화하는 건 불가능하며, 외부에서 락을 추가로 사용하면 오히려 속도가 느려진다.
-
동시성을 무력화하는 것을 못하므로, 여러 메서드를 원자적으로 묶어 호출하는 일 또한 불가능하다. 그래서 여러 기본 동작을 하나의 원자적 동작으로 묶는 '상태 의존적 수정' 메서드들이 추가되었다. 예를 들면 Map의
putIfAbsend(key, value)
메서드는 주어진 키에 매핑된 값이 아직 없을 때만 새 값을 집어넣습니다. 그리고 기존 값이 있었다면 그 값을 반환하고, 없었다면 null을 반환합니다. 이 메서드 덕에 스레드 안전한 정규화 맵을 쉽게 구현할 수 있습니다.import java.util.concurrent.*; // ConcurrentMap으로 구현한 동시성 정규화 맵 public class Intern { // 코드 81-1 ConcurrentMap으로 구현한 동시성 정규화 맵 - 최적은 아니다. (432쪽) private static final ConcurrentMap<String, String> map = new ConcurrentHashMap<>(); // public static String intern(String s) { // String previousValue = map.putIfAbsent(s, s); // return previousValue == null ? s : previousValue; // } // 코드 81-2 ConcurrentMap으로 구현한 동시성 정규화 맵 - 더 빠르다! (432쪽) public static String intern(String s) { String result = map.get(s); if (result == null) { result = map.putIfAbsent(s, s); if (result == null) result = s; } return result; } }
/** Implementation for put and putIfAbsent */ final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode()); int binCount = 0; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0) tab = initTable(); else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } addCount(1L, binCount); return null; }
Concurrentmap으로 구현한 동시성 정규화 맵의 경우에는 get같은 검색 기능에 최적화되었다. 따라서 get을 먼저 호출하여 필요할 때만 putIfAbsent를 호출하면 더 빠르다.
-
Collections.synchronizedMap 보다는 ConcurrentHashMap을 사용하는 게 더 좋다. 동기화된 맵을 동시성 맵으로 교체하는 것만으로 동시성 애플리케이션의 성능이 극적으로 개선된다.
-
그 외에도 Queue를 확장한 BlockingQueue에 추가된 메서드 중 take는 큐의 첫 원소를 꺼냅니다. 이때 만약 큐가 비었다면 새로운 원소가 추가될 때까지 기랍니다. 이런 특성 덕에 BlockingQueue는 작업 큐(생산자-소비자 큐)로 쓰기에 적합합니다. 작업 큐는 하나 이상의 생산자(producer)스레드가 작업(work)을 큐에 추가하고, 하나 이상의 소비자(consumer)스레드가 큐에 있는 작업을 꺼내 처리하는 형태.
-
그러므로, ThreadPoolExceutor 안에는 BlockingQueue를 자주 사용합니다.
그 밖에도 자주 사용하는 동기화 장치로는 CountDownLatch와 Semaphore가 있고, CyclicBarrier와 Exchanger는 그보다 덜 쓰입니다. 그리고 가장 강력한 동기화 장치는 바로 Phaser입니다.
카운트다운 래치는 일회성 장벽으로, 하나 이상의 스레드가 또다른 하나 이상의 스레드 작업이 끝날 때까지 기다리게 합니다. 유일한 생성자로 int 값을 받으며, 이 값이 래치의 countDown 메서드를 몇번 호출해야 대기 중인 스레드들을 깨우는지 결정합니다.
이를 활용한 CountDownLatch - 동시 실행 시간을 재는 간단한 프레임워크
// 코드 81-3 동시 실행 시간을 재는 간단한 프레임워크 (433-434쪽)
public class ConcurrentTimer {
private ConcurrentTimer() { } // 인스턴스 생성 불가
public static long time(Executor executor, int concurrency,
Runnable action) throws InterruptedException {
CountDownLatch ready = new CountDownLatch(concurrency);
CountDownLatch start = new CountDownLatch(1);
CountDownLatch done = new CountDownLatch(concurrency);
for (int i = 0; i < concurrency; i++) {
executor.execute(() -> {
ready.countDown(); // 타이머에게 준비를 마쳤음을 알린다.
try {
start.await(); // 모든 작업자 스레드가 준비될 때까지 기다린다.
action.run();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
done.countDown(); // 타이머에게 작업을 마쳤음을 알린다.
}
});
}
ready.await(); // 모든 작업자가 준비될 때까지 기다린다.
long startNanos = System.nanoTime();
start.countDown(); // 작업자들을 깨운다.
done.await(); // 모든 작업자가 일을 끝마치기를 기다린다.
return System.nanoTime() - startNanos;
}
}
API 문서에 안정성 수준을 나타내는데, 이떄 synchronized 한정자에 대한 오해가 있다.
- 자바독이 기본 옵션에서 생성된 API문서에는 synchronized 한정자가 포함되지 않는다. 메서드 선언에는 synchronized 한정자를 선언할지는 구현 이슈일뿐 API에 속하지 않는다. 따라서 이것만으로 스레드 세이프하다고 믿을 수 없다.
- 멀티스레드 환경에서도 API를 안전하게 사용하게 하려면 클래스가 지원하는 스레드 안전성 수준을 정확히 명시해야 한다.
- 다음은 안전성이 높은 순이다.
- 불변(Immutable)
- 무조건적 스레드 안전(unconditionally thread-safe) - AtomicLong, ConcurrentHashMap
- 조건부 스레드 안전(conditionally thread-safe) - Collections.synchronizedXX 레퍼 메서드들이 여기에 속하는데, 이 컬렉션들이 반환한 반복자는 외부에서 동기화해야 한다.
- 스레드 안전하지 않음(not thread-safe)
- 스레드 적대적(thread-hostile)
- 다음은 안전성이 높은 순이다.
/**
* Returns a synchronized (thread-safe) map backed by the specified
* map. In order to guarantee serial access, it is critical that
* <strong>all</strong> access to the backing map is accomplished
* through the returned map.<p>
*
* It is imperative that the user manually synchronize on the returned
* map when iterating over any of its collection views:
* <pre>
* Map m = Collections.synchronizedMap(new HashMap());
* ...
* Set s = m.keySet(); // Needn't be in synchronized block
* ...
* synchronized (m) { // Synchronizing on m, not s!
* Iterator i = s.iterator(); // Must be in synchronized block
* while (i.hasNext())
* foo(i.next());
* }
* </pre>
* Failure to follow this advice may result in non-deterministic behavior.
*
* <p>The returned map will be serializable if the specified map is
* serializable.
*
* @param <K> the class of the map keys
* @param <V> the class of the map values
* @param m the map to be "wrapped" in a synchronized map.
* @return a synchronized view of the specified map.
*/
public static <K,V> Map<K,V> synchronizedMap(Map<K,V> m) {
return new SynchronizedMap<>(m);
}
synchronizedMap의 javadoc이 좋은 예시이다. 만약 docs의 예시에 따르지 않으면 예측할 수 없는 동작이 일어난다.
여기서, mutex를 final로 선언한 것을 주목하자~!! lock 필드는 항상 final로 선언해야 한다.
지연초가화는 필드의 초기화 시점을 그 값이 처음 필요할 때까지 늦추는 기법이다.
이것을 왜 신중히 해야하는가?
지연초기화는 양날의 검이다. 클래스 혹은 인스턴스 생성 시의 초기화 비용은 줄지만 그 대신 지연 초기화하는 필드에 접근하는 비용은 커진다. 지연 초기화하려는 필드들 중 결국 초기화가 이뤄지는 비율에 따라, 실제 초기화에 드는 비용에 따라, 초기화된 각 필드를 얼마나 빈번히 호출하느냐에 따라 지연 초기화가 실제로는 성능이 느려지게 할 수도 있다.
- 대부분의 상황에서 일반적인 초기화가 지연 초기화보다 낫다.
일반적인 방법은
private static FieldType computeFieldValue() {
return new FieldType();
}
// 코드 83-1 인스턴스 필드를 초기화하는 일반적인 방법 (443쪽)
private final FieldType field1 = computeFieldValue();
그러나, 지연 초기화가 초기화 순환성(initialization circularity)을 깨뜨릴 것 같으면 synchronized를 단 접근자를 사용하자.
// 코드 83-2 인스턴스 필드의 지연 초기화 - synchronized 접근자 방식 (443쪽)
private FieldType field2;
private synchronized FieldType getField2() {
if (field2 == null)
field2 = computeFieldValue();
return field2;
}
만약 성능 때문에 정적 필드를 지연 초기화해야 한다면 지연 초기화 홀더 클래스(lazy initialization holder class) 관용구를 사용하자.
// 코드 83-3 정적 필드용 지연 초기화 홀더 클래스 관용구 (443쪽)
private static class FieldHolder {
static final FieldType field = computeFieldValue();
}
private static FieldType getField() { return FieldHolder.field; }
성능 때문에 인스턴스 필드를 지연 초기화해야 한다면 이중검사(double-check) 관용구를 사용하자.
// 코드 83-4 인스턴스 필드 지연 초기화용 이중검사 관용구 (444쪽)
private volatile FieldType field4;
private FieldType getField4() {
FieldType result = field4;
if (result != null) // 첫 번째 검사 (락 사용 안 함)
return result;
synchronized(this) {
if (field4 == null) // 두 번째 검사 (락 사용)
field4 = computeFieldValue();
return field4;
}
}
// 코드 83-5 단일검사 관용구 - 초기화가 중복해서 일어날 수 있다! (445쪽)
private volatile FieldType field5;
private FieldType getField5() {
FieldType result = field5;
if (result == null)
field5 = result = computeFieldValue();
return result;
}
위 코드에서 volatile 을 주의깊게 보자. 초기화를 중복으로 일으키는 문제가 있지만- 중복을 체크할 필요가 없다.
대부분의 필드는 지연시키지 말고 곧바로 초기화해야 한다. 성능 때문에 혹은 위험한 초기화 순환을 막기 위해 꼭 지연 초기화를 써야 한다면 올바른 지연 초기화 기법을 사용하자. 인스턴스 필드에는 이중검사 관용구를, 정적 필드에는 지연 초기화 홀더 클래스 관용구를 사용하자. 반복해 초기화해도 괜찮은 인스턴스 필드에는 단일검사 관용구도 고려 대상이다.
정확성이나 성능이 스레드 스케줄러에 따라 달라지는 프로그램이라면 다른 플랫폼에 이식하기 어렵다.
- 실행 가능한 스레드 수를 적게 유지하는 주요 기법은 각 스레드가 무언가 유용한 작업을 완료한 후에는 다음 일거리가 생길 때까지 대기하도록 하는 것
- 스레드는 당장 처리해야 할 작업이 없다면 실행돼서는 안 된다.
- 스레드는 절대 바쁜 대기 상태가 되면 안 된다. 공유 객체의 상태가 바뀔 때까지 쉬지 않고 검사해서는 안 된다는 뜻이다.
// 코드 84-1 끔찍한 CountDownLatch 구현 - 바쁜 대기 버전! (447쪽)
public class SlowCountDownLatch {
private int count;
public SlowCountDownLatch(int count) {
if (count < 0)
throw new IllegalArgumentException(count + " < 0");
this.count = count;
}
public void await() {
while (true) {
synchronized(this) {
if (count == 0)
return;
}
}
}
public synchronized void countDown() {
if (count != 0)
count--;
}
}
Thread.yield ????? 가 뭐지... 스레드 우선순위는 또 뭘까....
스레드 우선순위는 이미 잘 동작하는 프로그램의 서비스 품질을 높이기 위해 드물게 쓰일 수는 있지만, 간신히 동작하는 프로그램을 '고치는 용도'로 사용해서는 절대 안 된다.