Sunday 16 June 2019

Selectors: Asynchronous Operations with NIO


Blocking IO Operations
All the standard IO operations are blocking. Suppose you call read() on an InputStream: the thread that called read() blocks until data is ready to be read. While blocked, the thread cannot perform any other work, which wastes resources.

Asynchronous IO operation
With NIO, we can make IO operations asynchronous. We register interest in a read operation and continue to work on something else. When the data is ready to be read, we are notified.

Why does Asynchronous IO matter?
Compared to CPU-bound and in-memory operations, IO operations (such as file and network access) are slow. With asynchronous IO, a single thread can request some data and perform other operations in the meantime; when the data is ready, the thread is notified. This keeps the thread busy with other tasks, so a single thread can handle multiple operations.

What is a Selector?
A Selector is used to inspect one or more NIO channels and determine which channels are ready for writing or reading.

Why use a selector?
Using a selector, a single thread can handle multiple channels.


How to create a selector?
Selector selector = Selector.open();

How to register a channel with a selector?
Use the register method to register a channel with a selector.
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);

serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

The channel must be in non-blocking mode to be used by the selector.
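The non-blocking requirement can be checked directly. The sketch below is illustrative only: the class name BlockingModeCheck and the helper canRegister are hypothetical names. It relies on register() throwing IllegalBlockingModeException when the channel is still in blocking mode.

```java
import java.io.IOException;
import java.nio.channels.IllegalBlockingModeException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Hypothetical demo class, not part of the application in this post.
final class BlockingModeCheck {

    // Tries to register a channel and reports whether the selector accepted it.
    static boolean canRegister(boolean blocking) throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel channel = ServerSocketChannel.open()) {
            channel.configureBlocking(blocking);
            try {
                channel.register(selector, SelectionKey.OP_ACCEPT);
                return true;
            } catch (IllegalBlockingModeException e) {
                // Thrown because the channel is still in blocking mode.
                return false;
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("blocking channel registered: " + canRegister(true));      // false
        System.out.println("non-blocking channel registered: " + canRegister(false)); // true
    }
}
```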

The second argument of the register method specifies which events you want to listen for on this channel via the selector.
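The register method also returns a SelectionKey that remembers the interest set you passed, and the interest set can be changed later with interestOps(int). A minimal sketch, assuming an unconnected SocketChannel is enough for illustration (RegistrationKeyDemo and interestBeforeAndAfter are hypothetical names):

```java
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

// Hypothetical demo class, not part of the application in this post.
final class RegistrationKeyDemo {

    // Returns the key's interest set right after registration and after an update.
    static int[] interestBeforeAndAfter() throws IOException {
        try (Selector selector = Selector.open();
             SocketChannel channel = SocketChannel.open()) {
            channel.configureBlocking(false);

            // register() returns the key that ties this channel to this selector.
            SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
            int before = key.interestOps();

            // The interest set can be replaced at any time.
            key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
            int after = key.interestOps();

            return new int[] { before, after };
        }
    }

    public static void main(String[] args) throws IOException {
        int[] ops = interestBeforeAndAfter();
        System.out.println("read only: " + (ops[0] == SelectionKey.OP_READ));
        System.out.println("read and write: " + (ops[1] == (SelectionKey.OP_READ | SelectionKey.OP_WRITE)));
    }
}
```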

There are four types of events. The table below summarizes them.
Event         Description
OP_READ       Specifies read operation
OP_WRITE      Specifies write operation
OP_ACCEPT     Specifies socket accept operation
OP_CONNECT    Specifies socket connect operation

If you are interested in more than one event, combine them using the bitwise OR (|) operator.

int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE | SelectionKey.OP_CONNECT;
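Because an interest set is a plain int bit mask, individual events can be tested, added, or removed with bitwise operators. The sketch below is illustrative; InterestSetDemo, contains, and isValidFor are hypothetical helper names. It also uses validOps(), which reports the events a channel type supports (a ServerSocketChannel supports only OP_ACCEPT).

```java
import java.io.IOException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;

// Hypothetical demo class, not part of the application in this post.
final class InterestSetDemo {

    // True when every bit of 'op' is present in 'ops'.
    static boolean contains(int ops, int op) {
        return (ops & op) == op;
    }

    // True when 'ops' only contains events the channel supports.
    static boolean isValidFor(SelectableChannel channel, int ops) {
        return (ops & ~channel.validOps()) == 0;
    }

    public static void main(String[] args) throws IOException {
        int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
        System.out.println(contains(interestSet, SelectionKey.OP_READ));   // true
        System.out.println(contains(interestSet, SelectionKey.OP_ACCEPT)); // false

        // Remove OP_WRITE from the set again.
        interestSet &= ~SelectionKey.OP_WRITE;
        System.out.println(contains(interestSet, SelectionKey.OP_WRITE));  // false

        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            System.out.println(isValidFor(server, SelectionKey.OP_ACCEPT)); // true
            System.out.println(isValidFor(server, SelectionKey.OP_READ));   // false
        }
    }
}
```

Registering a channel for an event outside its validOps() makes register() throw IllegalArgumentException, so a check like isValidFor can be useful while experimenting.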

How to build a server using a Selector?
Step 1: Create a ServerSocketChannel and put it in non-blocking mode.

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);

Step 2: Get the ServerSocket from the ServerSocketChannel and bind it to a port.
ServerSocket serverSocket = serverSocketChannel.socket();
serverSocket.bind(new InetSocketAddress(1234));

Step 3: Create a selector and register the server socket channel with the selector.
Selector selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

Step 4: Wait for the events to come.
int noOfEvents = selector.select();

The select() method selects a set of keys whose corresponding channels are ready for I/O operations. It blocks until at least one channel is selected.
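Besides the blocking select(), Selector also offers selectNow(), which never blocks, and select(long timeout), which blocks for at most the given number of milliseconds. A minimal sketch, assuming an idle server channel bound to an ephemeral loopback port that nobody connects to (SelectVariants and pollIdleServer are hypothetical names):

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Hypothetical demo class, not part of the application in this post.
final class SelectVariants {

    // Returns { selectNow() result, select(timeout) result } for an idle server.
    // timeoutMillis must be positive: select(0) blocks indefinitely.
    static int[] pollIdleServer(long timeoutMillis) throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            // Port 0 asks the operating system for any free port.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            server.register(selector, SelectionKey.OP_ACCEPT);

            int now = selector.selectNow();             // returns immediately
            int timed = selector.select(timeoutMillis); // waits up to timeoutMillis
            return new int[] { now, timed };
        }
    }

    public static void main(String[] args) throws IOException {
        int[] results = pollIdleServer(200);
        // Both values are 0 as long as no client connects during the wait.
        System.out.println("selectNow: " + results[0] + ", select(200): " + results[1]);
    }
}
```

A thread blocked in select() can also be released from another thread by calling selector.wakeup().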

Step 5: Get the selected keys, iterate over them, and take the necessary action based on each key's ready-operation set.
Set<SelectionKey> selectionKeys = selector.selectedKeys();

for (SelectionKey selectionKey : selectionKeys) {
 if ((selectionKey.readyOps() & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) {

  // Accepting the client connection.
  System.out.println("Accepting the connection.......");
  ServerSocketChannel tempChannel = (ServerSocketChannel) selectionKey.channel();
  SocketChannel socketChannel = tempChannel.accept();
  socketChannel.configureBlocking(false);
  socketChannel.register(selector, SelectionKey.OP_READ);

 } else if ((selectionKey.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {

  // Reading the content from the channel. try-with-resources closes the
  // channel after one read, which also cancels its key.
  try (SocketChannel tempChannel = (SocketChannel) selectionKey.channel()) {
   ByteBuffer buffer = ByteBuffer.allocate(1024);
   tempChannel.read(buffer);
   buffer.flip();

   CharBuffer charBuffer = StandardCharsets.UTF_8.decode(buffer);

   // toString() returns only the decoded characters, whereas array() may
   // expose unused trailing slots of the backing array.
   String data = charBuffer.toString();

   System.out.println("Received : " + data);
  }

 }

}

// The selector never removes keys from the selected-key set on its own.
// Clear the set after the loop; removing keys inside the for-each loop
// risks a ConcurrentModificationException.
selectionKeys.clear();
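The ready-operation checks above can also be written with the SelectionKey convenience methods isAcceptable() and isReadable(), and the selected keys can be removed one by one through an explicit Iterator. The sketch below is one possible way to exercise that loop in a single process over the loopback interface. LoopbackSelectorDemo and roundTrip are hypothetical names, and the 1024-byte buffer is an assumption that limits how much of the message is read.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

// Hypothetical demo class, not part of the application in this post.
final class LoopbackSelectorDemo {

    // Sends 'message' over a loopback connection and returns what the
    // selector loop read (at most 1024 bytes).
    static String roundTrip(String message) throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            server.register(selector, SelectionKey.OP_ACCEPT);

            // Blocking client: connect, write everything, then close.
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                ByteBuffer out = StandardCharsets.UTF_8.encode(message);
                while (out.hasRemaining()) {
                    client.write(out);
                }
            }

            ByteBuffer in = ByteBuffer.allocate(1024);
            boolean done = false;
            while (!done) {
                selector.select();
                Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                while (keys.hasNext()) {
                    SelectionKey key = keys.next();
                    keys.remove(); // safe removal while iterating

                    if (key.isAcceptable()) {
                        SocketChannel accepted = ((ServerSocketChannel) key.channel()).accept();
                        if (accepted != null) {
                            accepted.configureBlocking(false);
                            accepted.register(selector, SelectionKey.OP_READ);
                        }
                    } else if (key.isReadable()) {
                        SocketChannel channel = (SocketChannel) key.channel();
                        // Stop at end of stream or when the buffer is full.
                        if (channel.read(in) == -1 || !in.hasRemaining()) {
                            channel.close();
                            done = true;
                        }
                    }
                }
            }
            in.flip();
            return StandardCharsets.UTF_8.decode(in).toString();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("Received : " + roundTrip("Hello World"));
    }
}
```

Unlike the loop above, this sketch keeps reading until the client closes the connection, so a message that arrives in several pieces is still collected in full.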

Build a client socket to communicate with the server socket
Step 1: Open a SocketChannel for the given address.
InetSocketAddress inetSocketAddress = new InetSocketAddress("localhost", 1234);
SocketChannel socketChannel = SocketChannel.open(inetSocketAddress);

Step 2: Get the Socket instance from socketChannel.
Socket socket = socketChannel.socket();

Step 3: Write the data to SocketChannel.
CharBuffer charBuffer = CharBuffer.allocate(1024);
charBuffer.put("Hello World");
charBuffer.flip();

ByteBuffer byteBuffer = StandardCharsets.UTF_8.encode(charBuffer);

socketChannel.write(byteBuffer);

Step 4: Close the socket.
socket.close();
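Step 3 builds a CharBuffer by hand, but Charset.encode also accepts a String directly and returns a ByteBuffer that is ready to be written. The sketch below shows that shortcut together with a small write loop. ClientWriteSketch, toUtf8, and writeFully are hypothetical names, and the in-memory channel stands in for the SocketChannel so the example runs without a server.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class, not part of the application in this post.
final class ClientWriteSketch {

    // Encodes text as UTF-8; the returned buffer is ready to be written.
    static ByteBuffer toUtf8(String text) {
        return StandardCharsets.UTF_8.encode(text);
    }

    // Keeps writing until the buffer is drained. Intended for blocking
    // channels such as the client in this post; on a non-blocking channel
    // this loop would spin while the socket's send buffer is full.
    static void writeFully(WritableByteChannel channel, ByteBuffer buffer) throws IOException {
        while (buffer.hasRemaining()) {
            channel.write(buffer);
        }
    }

    public static void main(String[] args) throws IOException {
        ByteBuffer buffer = toUtf8("Hello World");
        System.out.println(buffer.remaining()); // 11

        // An in-memory sink replaces the SocketChannel for this demo.
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        writeFully(Channels.newChannel(sink), buffer);
        System.out.println(new String(sink.toByteArray(), StandardCharsets.UTF_8)); // Hello World
    }
}
```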

The complete working application is below.


AsyncServerSocket.java
package com.sample.app;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Set;

public class AsyncServerSocket {

 public static void main(String... args) throws IOException {

  // Create ServerSocketChannel and make it to work in non-blocking mode
  ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
  serverSocketChannel.configureBlocking(false);

  // Create an instance of ServerSocket from ServerSocketChannel and bind it to
  // the port
  ServerSocket serverSocket = serverSocketChannel.socket();
  serverSocket.bind(new InetSocketAddress(1234));

  // Create a Selector
  Selector selector = Selector.open();

  // Register the ServerSocketChannel with the selector. IO Channel is going to
  // send OP_ACCEPT events to the selector.
  serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

  while (true) {
   System.out.println("Waiting for the events......");

   // select() is a blocking call. It returns once at least one channel is
   // selected.
   int noOfEvents = selector.select();
   System.out.println("Received " + noOfEvents + " events");

   Set<SelectionKey> selectionKeys = selector.selectedKeys();

   for (SelectionKey selectionKey : selectionKeys) {
    if ((selectionKey.readyOps() & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) {

     // Accepting the client connection.
     System.out.println("Accepting the connection.......");
     ServerSocketChannel tempChannel = (ServerSocketChannel) selectionKey.channel();
     SocketChannel socketChannel = tempChannel.accept();
     socketChannel.configureBlocking(false);
     socketChannel.register(selector, SelectionKey.OP_READ);

    } else if ((selectionKey.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {

     // Reading the content from the channel. try-with-resources closes the
     // channel after one read, which also cancels its key.
     try (SocketChannel tempChannel = (SocketChannel) selectionKey.channel()) {
      ByteBuffer buffer = ByteBuffer.allocate(1024);
      tempChannel.read(buffer);
      buffer.flip();

      CharBuffer charBuffer = StandardCharsets.UTF_8.decode(buffer);

      // toString() returns only the decoded characters, whereas array() may
      // expose unused trailing slots of the backing array.
      String data = charBuffer.toString();

      System.out.println("Received : " + data);
     }

    }

   }

   // The selector never removes keys from the selected-key set on its own.
   // Clear the set after the loop; removing keys inside the for-each loop
   // risks a ConcurrentModificationException.
   selectionKeys.clear();
  }
 }
}


MySocketClient.java
package com.sample.app;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

public class MySocketClient {

 public static void main(String args[]) throws IOException {

  InetSocketAddress inetSocketAddress = new InetSocketAddress("localhost", 1234);
  SocketChannel socketChannel = SocketChannel.open(inetSocketAddress);

  Socket socket = socketChannel.socket();

  CharBuffer charBuffer = CharBuffer.allocate(1024);
  charBuffer.put("Hello World");
  charBuffer.flip();

  ByteBuffer byteBuffer = StandardCharsets.UTF_8.encode(charBuffer);

  socketChannel.write(byteBuffer);

  socket.close();

 }
}

Run the 'AsyncServerSocket' application.

Run the 'MySocketClient' application.

You can see the messages below in the console of the 'AsyncServerSocket' application.

Waiting for the events......
Received 1 events
Accepting the connection.......
Waiting for the events......
Received 1 events
Received : Hello World
Waiting for the events......


