IT/Java

Java - Multi-Thread Programming

Normal_One 2019. 11. 17. 18:00

  신규 프로젝트에 들어가게 되었는데 API 통신 내부에서 병렬로 각각 처리를 해야 하는 임무를 받았습니다. 그래서 이번 기회에 열심히 Multi Thread에 대해 공부하게 되었고 나름 스스로 만족할 정도로 공부를 끝냈습니다. 아래 내용은 제가 공부한 Multi Thread에 대한 내용입니다. 

 

 - Thread 실행 Class

 Thread를 실행할 Class로 ThreadPoolExecutor를 생성하여 Thread Pool을 관리하는 방식입니다. 중간에 Thread를 Sleep 시키는 이유는 ThreadPoolExecutor가 어떤 방식으로 돌아가는지 Console로 확인하고 싶어서 넣었습니다.   ThreadPoolExecutor는 기본적으로 선언한 CorePoolSize 만큼 Thread를 실행하다가 CorePoolSize 이상의 Thread 실행이 들어오면 BlockingQueue에 Runnable들을 담습니다. 그러다가 BlockingQueue까지 꽉 차게 되면 그때 점진적으로 MaximumPoolSize까지 Thread 실행 개수를 늘립니다. 

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package testProject;
 
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
 
public class test {
    
    private static int corePoolSize = 4;
    private static int maximumPoolSize = 4;
    private static int keepAliveTime = 60;
    private static int blockingQueueSize = 20;
    private static BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(blockingQueueSize);
    
    /**
     * @parameter 
     * corePoolSize : 최초 생성되어 유지되는 Thread 갯 수
     * maximumPoolSize : 최대 생성되는 Thread 갯 수
     * keepAliveTime : Core size로 Thread 실행되고 있을 때 IDLE 상태로 있는 Thread의 유지 시간
     * TimeUnit : KeepAliveTime의 단위 설정(시, 분, 초 등)
     * BlockingQueue<Runnable> : corePoolSize를 넘어 Thread가 들어올 때 임시 보관하는 Queue
     * ThreadFactory : Thread 생성 팩토리
     * RejectedExecutionHandler : maximum Size 및 Queue Size 초과하여 RejectedExecutionException 발생 시 실행하는 Handler 
     */
    //private static ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, blockingQueue, Executors.defaultThreadFactory(), new customRejectedHandler());
    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, blockingQueue, Executors.defaultThreadFactory());
    
    public static void main(String[] args) throws InterruptedException {
        int seq = 0;
        Future<String> future = null;
        
        while(true){
            seq++;
            Runnable r1 = new testRunnable(seq);
            Runnable r2 = new testRunnable2(seq);
            Callable c1 = new testCallable(seq);    
 
            try {                
                executor.submit(r1);
                executor.submit(r2);
                future = executor.submit(c1);
                
                System.out.println("Active Count : " + executor.getActiveCount());
                System.out.println("Queue Size : " + executor.getQueue().size());
                
                //System.out.println("Future : " + future.get());
                //System.out.println("Future : " + future.isDone() );
                
            } catch (RejectedExecutionException e) {
                /**
                 * RejectedExecutionException 발생 시 대기 Queue의 Runnable 삭제 처리
                 */
                executor.remove(r1);
                executor.remove(r2);
                executor.remove(new FutureTask(c1));
                
                System.out.println("exception 발생 - seq : " + seq);
            } /*catch (ExecutionException e) {
                e.printStackTrace();
            }*/
            
            Thread.sleep(500);
        }
    }
}
 
cs

 31줄에 RejectedExecutionHandler를 만들어 놓고 쓰지 않은 이유는 Exception 발생 시 순서가 밀려서 Queue에 저장 된 Runnable 및 Callable를 지워서 Exception 발생 시 세 가지 Task 모두 실행되지 않게 하고 싶어서였습니다. RejectedExecutionHandler로 따로 Exception을 관리하고 싶다면 아래 제가 예제로 만들어둔 RejectedExecutionHandler를 참조하시면 됩니다. RejectedExecutionHandler를 선언하지 않으면 AbortPolicy로 설정되는데, Exception을 발생시킵니다. 이 외에도 여러 기본적인 설정들이 있는데

http://cris.joongbu.ac.kr/course/2018-1/jcp/api/java/util/concurrent/class-use/RejectedExecutionHandler.html 

 여기에 잘 설명되어 있으니 참조하시면 됩니다.

 

 Callable을 Remove할 때는 FutureTask를 이용했는데 기본적으로 BlockingQueue가 Runnable들을 담는 Queue이기 때문에 Callable을 Runnable로 캐스팅할 필요가 있는데, Callable을 Runnable로 캐스팅하려면 일반적인 방식으로는 불가능하고 FutureTask를 이용해야 합니다. 

 

 try 선언부 마지막을 보면 future.get()이 주석된 게 보이는데, future.get()을 하면 Callable이 Return 하는 결과 값을 받아볼 수 있습니다. 다만, future.get()을 넣어두면 Callable이 끝날 때까지 무작정 기다리기 때문에 필요에 따라 써야 될 것 같습니다.

 

 

 - Runnable 1번 

작업들이 밀려서 Queue Size 이상으로 담겨 버리는 것까지 확인하고 싶어서 Thread.sleep를 넣었습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package testProject;
 
public class testRunnable implements Runnable {
    
    int seq = 0;
    public testRunnable(int seq) {
        this.seq = seq;
    }
    
    public void run() {
        System.out.println("testRunnable1 : " + seq);
        
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
 
cs

 

- Runnable 2번

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package testProject;
 
public class testRunnable2  implements Runnable {
    
    int seq = 0;
    public testRunnable2(int seq) {
        this.seq = seq;
    }
    
    public void run() {
        System.out.println("testRunnable2 : " + seq);
        
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
 
cs

 

- Callable 1번

결과 값을 Return 받는 것도 해보고 싶어서 Callable로 만들었습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package testProject;
 
import java.util.concurrent.Callable;
 
public class testCallable  implements Callable<String> {
    
    int seq = 0;
    public testCallable(int seq) {
        this.seq = seq;
    }
 
    @Override
    public String call() throws Exception {
        System.out.println("testRunnable3 : " + seq);
        
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        return "SUCCESS";
    }
}
cs

 

- RejectedExecutionHandler

 RejectedExecutionException 발생 시 사용할 Handler 입니다. 단점은 여기서는 run 안에 있는 변수들을 꺼내올 수 있는 방법이 없다는 것이었습니다. 따라서 Exception 발생 시 다시 Runnable을 실행하는 용도 정도로만 사용 가능할 것으로 보입니다. Runnable 안에 변수를 확인할 수 있는 방법이 있다면 누가 좀 알려주시면 감사하겠습니다(Google에는 일단 없었습니다...).

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package testProject;
 
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
 
public class customRejectedHandler implements RejectedExecutionHandler{
 
    @Override
    public void rejectedExecution(Runnable run, ThreadPoolExecutor executor) {
        /**
         * Custom 내용 구현
         */
    }
 
}
 
cs

 

  역시 새로운 걸 배우는 건 재밌는 것 같습니다. 물론 일로 떨어지지 않으면 혼자 나서서 공부하지는 않지만, 일로 떨어지면 주말에 공부를 하곤 하니 이 정도면 괜찮은 개발자 아닌가 하고 혼자 위로해 봅니다. 날씨가 점점 추워지는 데 모두 감기 조심하시길 바랍니다.