Wednesday, November 01, 2006

SEDA Based Server Using Reactor Pattern

In one of my earlier blog entry on SEDA I mentioned that it would be interesting to marry SEDA and Reactor design pattern to implement a highly scalable, asynchronous event-driven driven network server. In this blog I describe design of a generic SEDA based server architecture and a Java implementation using Java NIO and Java Concurrent package.

A generic SEDA server architecture is shown in figure 2 below. Most server application design have following basic structure:
The SEDA based server architecture described here groups these activities of the server in distinct stages. Each stage is described in SEDA paper by Matt Welsh as shown in Figure 1 below.

Figure 1

A SEDA stage consists of an incoming event queue queue, a thread pool and an application supplied event handler. What I have not shown here is stage controller that manages stage's operation and adjusts resource allocation and scheduling dynamically because these components are not implemented in the Java implementation of this architecture discussed later.

Geeting back to the server architecture, the stages in this architecture are:

Reactor - An asynchronous I/O event handler for network events occuring on sockets. Once an event is detected it is queued for Accepor stage to handle it. The Reactor provides asynchronous network I/O multiplexing.

Acceptor - This stage basically accepts client connection request, creates an IO session (more on this later) and queues the IO Session for Reader stage to take over.

Reader - This stage reads request from the socket stream and potentially after decoding, puts the request on the input queue of the next stage - the processor.

Processor - This stage is essentially calls application supplied event handler which process the client request.

Sender - This stage encodes and sends the response back to the client.

Figure 2

By separating I/O bound activities of the server - accepting connection, reading request and sending response in its own queue and thread pool, and keeping the core request processing in its own thread pool, the system can achieve very high scalability and performance. Under extreme load condition the admission control at each queue can throttle the request admission thus allowing the server provide reasonably good performance to existing clients and not accepting new client requests (this is one of the key aspect of SEDA architecture). If the Processor stage has required data cached in memory using various caching technologies, the Processor stage may be able to avoid I/O completly (such as database calls).

Figure 3 shows the Class diagram of a SEDA based server using Reactor pattern implemented in Java 5 using java NIO and concurrent package for thread pool management.


Figure 3 SEDA Server Implemented Using Reactor Design Pattern

Now the Java source code (for brevity only key aspects of each class are shown below:

Reactor

public class Reactor {
// Static variables and initializers

private final static Reactor INSTANCE = new Reactor();
final Logger logger = LoggerFactory.getLogger(Reactor.class);
// Instance Variables
private Selector selector;
private final Map channels = new HashMap();
private ArrayBlockingQueue acceptQueue =
new ArrayBlockingQueue(50);
private ThreadPoolExecutor acceptExecutor =
new ThreadPoolExecutor(5, 10,
2000, TimeUnit.SECONDS, sessionQueue);

public void registerService(SocketAddress address,
ServiceHandler handler) throws IOException {
ServerSocketChannel ssc = null;
ssc = ServerSocketChannel.open();
ssc.configureBlocking( false );

ssc.socket().setReuseAddress(true);

// and bind.
ssc.socket().bind(address);

ssc.register( selector,

SelectionKey.OP_ACCEPT, new Acceptor(handler, ssc) );

synchronized( channels )
{
channels.put( address, ssc );
}
}

public void run() {
for( ; ; )
{
try
{

int nKeys = selector.select();

if( nKeys > 0 )
{
processIOEvents( selector.selectedKeys() );
}
}
catch( IOException e )
{
logger.error("Main event loop thread exception:" + e );
....
}
}
}

private void processIOEvents( Set keys ) throws IOException
{
Iterator it = keys.iterator();
while( it.hasNext() )
{
SelectionKey key =
( SelectionKey ) it.next();
it.remove();
dispatch(key);
}
}

private void dispatch(SelectionKey key) {
Runnable r =
(Runnable) (key.attachment());
if (r != null)
acceptExecutor.execute(r);
}
}

Acceptor

public class Acceptor implements Runnable {
private ServiceHandler handler;
private ServerSocketChannel serverSocket;
private ArrayBlockingQueue sessionQueue = new ArrayBlockingQueue(50);
private ThreadPoolExecutor sessionExecutor = new ThreadPoolExecutor(5, 10, 2000,
TimeUnit.SECONDS, sessionQueue);

public Acceptor(ServiceHandler handler, ServerSocketChannel ssc) {
this.handler = handler;
serverSocket = ssc;
}

public void run() {
try {

SocketChannel ch = serverSocket.accept();

if( ch == null )
{
return;
}
SocketSessionImpl session =
new SocketSessionImpl(selector, ch, handler );

sessionExecutor.execute(session);

}
catch( Throwable t )
{
logger.error( "Error handling request for"+ t );
}
}
}

SocketSessionImpl

public class SocketSessionImpl implements IoSession, Runnable {

//-------------------------------------------------------------------------
// Static variables & static initializers.
//-------------------------------------------------------------------------
final Logger logger = LoggerFactory.getLogger(SocketSessionImpl.class);

//-------------------------------------------------------------------------
// Instance variables.
//-------------------------------------------------------------------------
public enum SESSIONSTATE {READING, SENDING}
public final int MAXIN=4096;
public final int MAXOUT=4096;

private final SocketChannel ch;
private final ServiceHandler handler;
private final SocketAddress remoteAddress;
private final SocketAddress localAddress;
private SelectionKey sk;
private ByteBuffer input = ByteBuffer.allocate(MAXIN);
private ByteBuffer output = ByteBuffer.allocate(MAXOUT);
private int readBufferSize;
private int bytesWritten;
private int writeTimeout;
private final Map attributes = new HashMap();
private final long creationTime;
private volatile SESSIONSTATE state = SESSIONSTATE.READING;
private static ArrayBlockingQueue ioQueue = new ArrayBlockingQueue(50);
private static ThreadPoolExecutor ioExecutor =
new ThreadPoolExecutor(5, 10, 2000,
TimeUnit.SECONDS, ioQueue);
private static ArrayBlockingQueue processQueue = new ArrayBlockingQueue(50);
private static ThreadPoolExecutor processExecutor = new
ThreadPoolExecutor(5, 10, 2000,
TimeUnit.SECONDS, processQueue);

SocketSessionImpl( Selector sel, SocketChannel ch,
ServiceHandler defaultHandler) throws IOException
{
this.ch = ch;
this.handler = defaultHandler;
this.remoteAddress = ch.socket().getRemoteSocketAddress();
this.localAddress = ch.socket().getLocalSocketAddress();
this.creationTime = System.currentTimeMillis();
this.output.clear();
ch.configureBlocking(false);
sk = ch.register(sel, 0);
sk.attach(this);
sk.interestOps(SelectionKey.OP_READ);
sel.wakeup();
}

public void run() {
try {
if (state == SESSIONSTATE.READING) read();
else if (state == SESSIONSTATE.SENDING) send();
}
catch (IOException ioe) {
logger.error("run():", ioe);
}
finally {
...
}
}

public synchronized void read() throws IOException
{
try
{
input.clear();

int readBytes = 0;
int ret;

try
{
while( ( ret = ch.read( input ) ) > 0 )
{
readBytes += ret;
}
}
finally
{
input.flip();
}

BaseServiceRequest req = new BaseServiceRequest(input);
BaseServiceResponse res = new BaseServiceResponse(output);

processExecutor.execute(new Processor(req,res));

}
catch( Throwable e )
{
logger.error("read():", e);
}
finally
{
input.clear();
}
}

public synchronized void send() throws IOException
{
int count = ch.write( output );
bytesWritten = bytesWritten + count;
if( !output.hasRemaining() )
{
logger.debug("Written " + bytesWritten + " bytes. Changing state to READING.");
state = SESSIONSTATE.READING;
sk.interestOps(SelectionKey.OP_READ);
bytesWritten = 0;
output.clear();
}
}
}

Processor

// Inner Class of SocketIoSessionImpl
class Processor implements Runnable {
private ServiceRequest req;
private ServiceResponse res;

public Processor(ServiceRequest req, ServiceResponse res) {
this.req = req;
this.res = res;
}

public void run() {
try {

handler.process(req, res);

state = SESSIONSTATE.SENDING;
sk.interestOps(SelectionKey.OP_WRITE);
sk.selector().wakeup();

ioExecutor.execute(new Sender());

}
catch (Throwable t) {
logger.error("Error Processing request, Reason:", t);
}
}
}

Sender

// Inner Class of SocketIoSessionImpl
class Sender implements Runnable {
public void run() {
try {
send();
}
catch (Throwable t) {
logger.error("Error sending response, Reason:", t);
}
}

A service implementation using this framework has to implement the ServiceHandler interface which the Processor calls to invoke the service. An echo service could be implemented as:

public class EchoServiceHandler implements
ServiceHandler {

public void process(ServiceRequest req, ServiceResponse res) {
ByteBuffer rb = ( ByteBuffer ) req.getReadBuffer();
// Write the received data back to remote peer
ByteBuffer wb = res.getSendBuffer();
wb.put( rb );
wb.flip();
res.send();
}
}

Thread Pool Size

How many threads should the thread pools have? It depends on the kinds of tasks the worker threads perform. If the tasks in the work queue are compute-bound, then a thread pool of N or N+1 threads on an N-processor system will generally achieve maximum CPU utilization.

For tasks that may wait for I/O to complete i.e. they are mix of compute and I/O bound then application has to be profiled using profiling tools to get ratio of waiting time to service time for a typical service request. For an N-processor system a good thumb rule is to have N * (1 + waiting time/service time) threads in the pool for optimal performance and CPU utilization.

This page is powered by Blogger. Isn't yours?