Monday, August 25, 2008

Thread Executors in java

With Java 1.4, when we used to write threaded applications, we did not have much options in limiting and reusing the same thread for the available tasks. I had written applications earlier where each process that needed to be executed freely (or a process which can be executed on a thread) used to create its own thread. And once a thread was created and used, it was simply discarded. New threads were created whenever needed.

With java 1.5, there is an executor class which allows you to create controlled thread pools and execute the list of tasks on the same number of threads. If the number of tasks exceeds the number of threads then these extra tasks just wait for a thread to become available and then start their execution.

A simple threaded server using java 1.4 :

1 import java.io.*;
2 import java.net.*;
3 import java.util.*;
4
5 public class testPool implements Runnable
6 {
7   private int port;
8   private int active, total;
9
10   public testPool(int port)
11   {
12     this.port = port;
13     System.out.println("Thread pool constructor - creating thread");
14     new Thread(this).start();
15   }
16
17   public void run()
18   {
19     try
20     {
21       System.out.println("Thread pool - new thread created");
22       ServerSocket ss = new ServerSocket(port);
23       while(true)
24       {
25         Socket s = ss.accept();
26         total++;
27         new Handler(s);
28       }
29     }catch(Exception ex)
30     {
31       ex.printStackTrace();
32     }
33   }
34
35   public class Handler implements Runnable
36   {
37     private Socket socket;
38     public Handler(Socket s)
39     {
40       this.socket = s;
41       System.out.println("Handler constructor - creating thread");
42       new Thread(this).start();
43     }
44     public void run()
45     {
46       System.out.println(Thread.currentThread().getName()+" - Handler - new thread created");
47       active++;
48       boolean loop = true;
49       try
50       {
51         BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
52         DataOutputStream out = new DataOutputStream(socket.getOutputStream());
53         while(loop)
54         {
55           String cmd = in.readLine();
56           if(cmd.equals("QUIT"))
57           {
58             loop = false;
59             socket.close();
60           }
61           else if(cmd.equals("INFO"))
62           {
63             System.out.println("Active Connections : "+active);
64             System.out.println("Total Connections : "+total);
65           }
66           else
67           {
68             System.out.println("Command = "+cmd);
69           }
70         }
71       }catch(Exception ex)
72       {
73         ex.printStackTrace();
74       }
75       active--;
76     }
77   }
78
79   public static void main(String[] args)
80   {
81     System.out.println("Starting Daemon");
82     new testPool(Integer.parseInt(args[0]));
83   }
84 }


Compile and run the program

jayant@jayantbox:~/myprogs/java$ java -cp . testPool 5000
Starting Daemon
Thread pool constructor - creating thread
Thread pool - new thread created


Now, as you keep on connecting to localhost 5000 port, you can see that new threads are created. Even when the old connections are closed, the same thread is not used again but new threads are created.

so, if i do

telnet localhost 5000

from 3 consoles and issue the "INFO" command from 4th console, the output is

jayant@jayantbox:~/myprogs/java$ java -cp . testPool 5000
Starting Daemon
Thread pool constructor - creating thread
Thread pool - new thread created
Handler constructor - creating thread
Thread-1 - Handler - new thread created
Handler constructor - creating thread
Thread-2 - Handler - new thread created
Handler constructor - creating thread
Thread-3 - Handler - new thread created
Handler constructor - creating thread
Thread-4 - Handler - new thread created
Active Connections : 1
Total Connections : 4

4 threads are created. In this case, everytime you connect to the server, a new thread would be created.

Now, lets modify the code to work on java 1.5 using thread Executors...

Add the following code snippets at or after the following line numbers

4 import java.util.concurrent.*;
10  private int pool;
12  this.pool=2;
22  ExecutorService threadExecutor = Executors.newFixedThreadPool(pool);
27  threadExecutor.execute(new Handler(s));

And comment the following 2 lines

27  new Handler(s);
42  new Thread(this).start();


Not lets run the program again on port 5000 and try connecting to it. We have created a pool of 2 threads here. Observer that:


  • pool-1-thread-1 & pool-1-thread-2 are the two threads created.

  • After two connections to the server, the connections are not rejected, but are queued.

  • If a thread becomes available, a connection from the queue is assigned to the thread for processing

  • The same two threads are used again and again. No new threads are created.

  • Connections which are in queue are not able to process any requests unless they are assigned to a worker thread



Process for testing

console 1: telnet localhost 5000
console 2: telnet localhost 5000
console 3: telnet localhost 5000
console 1: from 1
console 2: from 2
console 3: from 3
console 1: INFO
console 1: 1 quitting
console 1: QUIT


Output for the testing process


jjayant@jayantbox:~/myprogs/java$ java -cp . testPool 5000
Starting Daemon
Thread pool constructor - creating thread
Thread pool - new thread created
Handler constructor - creating thread
pool-1-thread-1 - Handler - new thread created
Handler constructor - creating thread
pool-1-thread-2 - Handler - new thread created
Handler constructor - creating thread
Command = from 1
Command = from 2
Active Connections : 2
Total Connections : 3
Command = 1 quitting
pool-1-thread-1 - Handler - new thread created
Command = from 3


As seen, processing of connection 3 starts only after connection 1 is closed by the client.

The benefits of using a thread executor instead of a normal thread are

a] Overhead of creating and destroying threads is avoided.
b] A queue of threads is created automatically when the no of tasks exceeds the no of threads in the pool. The tasks in queue are automatically executed when the threads become free.
c] A limit (maximum no of threads) could be imposed thus controlling the available resources for delivering better performance.

No comments: