Sunday 20 November 2016

Bandwidth Throttling in Java

In this post, I am going to explain what bandwidth throttling is, why we need it, and how to throttle bandwidth in a Java application.

Bandwidth Throttling
Bandwidth throttling is a technique to control the speed of the network connection.

Why do we control network speed?
Let me explain with two basic examples.

Example 1: System-level bandwidth control
Suppose you run a startup with 10 employees, each with a dedicated computer, and you want to migrate all of their data from those computers to a cloud server. Suppose your network bandwidth is 25 MBPS. What happens if every computer tries to use the full 25 MBPS? One computer wins the race, and the remaining computers wait until it frees up the bandwidth.

Example 2: Application-level bandwidth control
Suppose you have developed a content management client that periodically syncs the data on your system to the cloud. Assume the bandwidth allocated to your system is 10 MBPS. If the client uses the full 10 MBPS, which is the maximum allocated to your system, other applications on the same system struggle to access the network. That will definitely make your users unhappy. One way to solve this is to restrict bandwidth usage at the application level. Nowadays, most applications provide a setting that caps their bandwidth usage at a chosen value.

I am going to give two sample classes that control the bandwidth usage while uploading and downloading a document.

Class | Description
ThrottledInputStream.java | Throttles the bandwidth while downloading data.
ThrottledOutputStream.java | Throttles the bandwidth while uploading data.


Designing ThrottledInputStream.java
The first step is to extend the InputStream class and override its read methods. The following table summarizes the read() methods provided by the InputStream class.

Method | Description
public abstract int read() throws IOException | Returns the next byte of data from the input stream, or -1 if the end of the stream has been reached. Since this method is abstract, every concrete subclass must provide an implementation.
public int read(byte[] b) throws IOException | Equivalent to ‘read(b, 0, b.length)’.
public int read(byte[] b, int off, int len) throws IOException | Reads up to len bytes of data from the input stream into an array of bytes. ‘off’ specifies the start offset in array b at which the data is written.
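To make the three overloads concrete, here is a small sketch that calls each of them on a ByteArrayInputStream holding five bytes. The class name ReadOverloadsDemo and the sample data are made up for illustration and are not part of the classes in this post.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Arrays;

// Hypothetical demo class; only standard java.io calls are used.
class ReadOverloadsDemo {

    /** Returns {first byte, count from read(b, off, len), count from read(b), value at end of stream}. */
    static int[] demo() {
        try (InputStream in = new ByteArrayInputStream(new byte[] {10, 20, 30, 40, 50})) {
            int first = in.read();        // single byte: 10
            byte[] buf = new byte[4];
            int n1 = in.read(buf, 1, 2);  // copies 20, 30 into buf[1], buf[2]
            int n2 = in.read(buf);        // only 2 bytes are left, so fewer than buf.length are read
            int eof = in.read();          // end of stream
            return new int[] {first, n1, n2, eof};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo())); // prints [10, 2, 2, -1]
    }
}
```

Note that read(byte[]) returned 2 rather than 4: the array-based overloads report how many bytes were actually read, which is why a throttling wrapper has to count the return value and not the buffer size.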

How can we throttle while reading the data?
Input from User: Number of bytes to read per second

Method: On every read() call, we check the number of bytes read so far and the elapsed time. If the resulting average rate exceeds the limit, we slow the download down by making the thread sleep until the rate falls back under the limit.
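The arithmetic behind this check can be sketched on its own. The class in this post polls in fixed 30 ms steps; the hypothetical helper below instead computes the pause directly, assuming that reading bytesSoFar bytes at the limit takes at least bytesSoFar / maxBytesPerSec seconds. The name SleepCalculator and its method are assumptions made for illustration, not part of the post's classes.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration; it is not one of the classes in this post.
class SleepCalculator {

    /**
     * Returns the number of milliseconds to sleep so that the average rate
     * (bytesSoFar / elapsed time) is no higher than maxBytesPerSec.
     */
    static long sleepMillisNeeded(long bytesSoFar, long elapsedNanos, long maxBytesPerSec) {
        if (maxBytesPerSec <= 0) {
            throw new IllegalArgumentException("maxBytesPerSec must be greater than zero");
        }
        // Time that must have passed for bytesSoFar to fit within the limit.
        long requiredMillis = bytesSoFar * 1000 / maxBytesPerSec;
        long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(elapsedNanos);
        return Math.max(0, requiredMillis - elapsedMillis);
    }

    public static void main(String[] args) {
        long oneSecond = TimeUnit.SECONDS.toNanos(1);
        // 2000 bytes after 1 s at a 1000 bytes/s limit: one more second is needed.
        System.out.println(sleepMillisNeeded(2000, oneSecond, 1000)); // prints 1000
        // 500 bytes after 1 s is already under the limit: no sleep is needed.
        System.out.println(sleepMillisNeeded(500, oneSecond, 1000));  // prints 0
    }
}
```

Polling in small fixed steps is simpler to reason about; computing the pause avoids waking up repeatedly when the stream is far over its budget.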

The following is the implementation of the ThrottledInputStream class.

ThrottledInputStream.java
package bandwidthThrotling;

import java.io.*;

public class ThrottledInputStream extends InputStream {

 private final InputStream inputStream;
 private final long maxBytesPerSec;
 private final long startTime = System.nanoTime();

 private long bytesRead = 0;
 private long totalSleepTime = 0;

 private static final long SLEEP_DURATION_MS = 30;

 public ThrottledInputStream(InputStream inputStream) {
  this(inputStream, Long.MAX_VALUE);
 }

 public ThrottledInputStream(InputStream inputStream, long maxBytesPerSec) {
  if (maxBytesPerSec <= 0) {
   // A limit of zero would make throttle() sleep forever after the first byte.
   throw new IllegalArgumentException("maxBytesPerSec should be greater than zero");
  }
  if (inputStream == null) {
   throw new IllegalArgumentException("inputStream shouldn't be null");
  }

  this.inputStream = inputStream;
  this.maxBytesPerSec = maxBytesPerSec;
 }

 @Override
 public void close() throws IOException {
  inputStream.close();
 }

 @Override
 public int read() throws IOException {
  throttle();
  int data = inputStream.read();
  if (data != -1) {
   bytesRead++;
  }
  return data;
 }

 @Override
 public int read(byte[] b) throws IOException {
  throttle();
  int readLen = inputStream.read(b);
  if (readLen != -1) {
   bytesRead += readLen;
  }
  return readLen;
 }

 @Override
 public int read(byte[] b, int off, int len) throws IOException {
  throttle();
  int readLen = inputStream.read(b, off, len);
  if (readLen != -1) {
   bytesRead += readLen;
  }
  return readLen;
 }

 private void throttle() throws IOException {
  while (getBytesPerSec() > maxBytesPerSec) {
   try {
    Thread.sleep(SLEEP_DURATION_MS);
    totalSleepTime += SLEEP_DURATION_MS;
   } catch (InterruptedException e) {
    // Restore the interrupt flag so callers can still observe the interruption.
    Thread.currentThread().interrupt();
    throw new IOException("Thread interrupted", e);
   }
  }
 }

 public long getTotalBytesRead() {
  return bytesRead;
 }

 /**
  * Return the number of bytes read per second
  */
 public long getBytesPerSec() {
  long elapsed = (System.nanoTime() - startTime) / 1000000000;
  if (elapsed == 0) {
   return bytesRead;
  } else {
   return bytesRead / elapsed;
  }
 }

 public long getTotalSleepTime() {
  return totalSleepTime;
 }

 @Override
 public String toString() {
  return "ThrottledInputStream{" + "bytesRead=" + bytesRead + ", maxBytesPerSec=" + maxBytesPerSec
    + ", bytesPerSec=" + getBytesPerSec() + ", totalSleepTimeInSeconds=" + totalSleepTime / 1000 + '}';
 }
}

Testing the application
I am going to download the file ‘abc.pdf’, which is about 131 KB in size. The following test application has three methods.

Method | Description
test1() | Tests the ThrottledInputStream by downloading at 1000 bytes/second
test2() | Tests the ThrottledInputStream by downloading at 2000 bytes/second
test3() | Tests the ThrottledInputStream by downloading at 3000 bytes/second

ThrottledInputStreamTest.java
package bandwidthThrotling;

import java.io.*;

public class ThrottledInputStreamTest {
 private static final File file = new File("abc.pdf");

 public static void test1() throws IOException {
  InputStream is = new FileInputStream(file);

  try (ThrottledInputStream throttledStream = new ThrottledInputStream(is, 1000);) {

   int data;

   while ((data = throttledStream.read()) != -1) {
    // System.out.print((char) data);
   }

   System.out.println("test1 : " + throttledStream.toString());
  }

 }

 public static void test2() throws IOException {
  InputStream is = new FileInputStream(file);

  try (ThrottledInputStream throttledStream = new ThrottledInputStream(is, 2000);) {

   int data;

   while ((data = throttledStream.read()) != -1) {
    // System.out.print((char) data);
   }

   System.out.println("test2 : " + throttledStream.toString());
  }

 }

 public static void test3() throws IOException {
  InputStream is = new FileInputStream(file);

  try (ThrottledInputStream throttledStream = new ThrottledInputStream(is, 3000);
    BufferedReader br = new BufferedReader(new InputStreamReader(throttledStream))) {

   String data;

   while ((data = br.readLine()) != null) {
    //System.out.print(data);
   }

   System.out.println("test3 : " + throttledStream.toString());
  }

 }

 public static void main(String args[]) throws IOException {
  test1();
  test2();
  test3();
 }
}

Output
test1 : ThrottledInputStream{bytesRead=133830, maxBytesPerSec=1000, bytesPerSec=998, totalSleepTimeInSeconds=130}
test2 : ThrottledInputStream{bytesRead=133830, maxBytesPerSec=2000, bytesPerSec=1997, totalSleepTimeInSeconds=64}
test3 : ThrottledInputStream{bytesRead=133830, maxBytesPerSec=3000, bytesPerSec=2974, totalSleepTimeInSeconds=44}
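As a rough sanity check on these numbers, a stream limited to maxBytesPerSec cannot deliver N bytes in less than N / maxBytesPerSec seconds. The sketch below computes that lower bound for the 133830 bytes reported above; the class name ExpectedDuration is hypothetical.

```java
import java.util.Locale;

// Hypothetical helper for illustration; it is not one of the classes in this post.
class ExpectedDuration {

    /** Lower bound, in seconds, on the time needed to move totalBytes at maxBytesPerSec. */
    static double minSeconds(long totalBytes, long maxBytesPerSec) {
        return (double) totalBytes / maxBytesPerSec;
    }

    public static void main(String[] args) {
        long fileSize = 133830; // bytesRead value taken from the output above
        for (long rate : new long[] {1000, 2000, 3000}) {
            System.out.println(String.format(Locale.ROOT,
                "%d bytes/s -> at least %.1f s", rate, minSeconds(fileSize, rate)));
        }
    }
}
```

The bounds come out at roughly 134, 67, and 45 seconds. The reported totalSleepTimeInSeconds values (130, 64, 44) sit just under these figures, which is what one would expect if almost the whole run is spent sleeping.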


Designing ThrottledOutputStream.java
The first step is to extend the OutputStream class and override its write methods. The following table summarizes the write() methods provided by the OutputStream class.

Method | Description
public void write(byte[] b) throws IOException | Writes b.length bytes from the specified byte array to this output stream.
public void write(byte[] b, int off, int len) throws IOException | Writes len bytes from the specified byte array starting at offset off to this output stream.
public abstract void write(int b) throws IOException | Writes the specified byte to this output stream. Since this method is abstract, subclasses must provide an implementation.

How can we throttle while writing the data?
Input from User: Maximum number of bytes to write per second

Method: Before writing data to the stream, check the total number of bytes written so far. If (total bytes written / elapsed time) is greater than the maximum number of bytes per second, sleep until the rate drops back to the limit.
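One detail matters for array writes: a single large write() could push far more than one second's budget through in one call. A common way around this is to split the buffer into chunks no larger than the per-second limit and run the rate check before each chunk. The sketch below shows only the splitting step; the class ChunkPlanner and its method are hypothetical names used for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; it is not one of the classes in this post.
class ChunkPlanner {

    /** Returns {offset, length} pairs covering [off, off + len), each length at most maxChunk. */
    static List<int[]> plan(int off, int len, int maxChunk) {
        if (maxChunk <= 0) {
            throw new IllegalArgumentException("maxChunk must be greater than zero");
        }
        List<int[]> chunks = new ArrayList<>();
        int pos = off;
        int remaining = len;
        while (remaining > 0) {
            int n = Math.min(remaining, maxChunk);
            chunks.add(new int[] {pos, n});
            pos += n;
            remaining -= n;
        }
        return chunks;
    }

    public static void main(String[] args) {
        // A 2500-byte write at a 1000 bytes/s limit becomes three chunks.
        for (int[] chunk : plan(0, 2500, 1000)) {
            System.out.println("offset=" + chunk[0] + ", length=" + chunk[1]);
        }
    }
}
```

A throttled write(byte[] b, int off, int len) could then loop over these pairs, calling its rate check and the wrapped stream's write(b, offset, length) for each one.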

ThrottledOutputStream.java
package bandwidthThrotling;

import java.io.IOException;
import java.io.OutputStream;

public class ThrottledOutputStream extends OutputStream {
 private OutputStream outputStream;
 private final long maxBytesPerSecond;
 private final long startTime = System.nanoTime();

 private long bytesWrite = 0;
 private long totalSleepTime = 0;
 private static final long SLEEP_DURATION_MS = 30;

 public ThrottledOutputStream(OutputStream outputStream) {
  this(outputStream, Long.MAX_VALUE);
 }

 public ThrottledOutputStream(OutputStream outputStream, long maxBytesPerSecond) {
  if (outputStream == null) {
   throw new IllegalArgumentException("outputStream shouldn't be null");
  }

  if (maxBytesPerSecond <= 0) {
   throw new IllegalArgumentException("maxBytesPerSecond should be greater than zero");
  }

  this.outputStream = outputStream;
  this.maxBytesPerSecond = maxBytesPerSecond;
 }

 @Override
 public void write(int arg0) throws IOException {
  throttle();
  outputStream.write(arg0);
  bytesWrite++;
 }

 @Override
 public void write(byte[] b, int off, int len) throws IOException {
  if (len < maxBytesPerSecond) {
   throttle();
   bytesWrite = bytesWrite + len;
   outputStream.write(b, off, len);
   return;
  }

  long currentOffSet = off;
  long remainingBytesToWrite = len;

  do {
   throttle();
   remainingBytesToWrite = remainingBytesToWrite - maxBytesPerSecond;
   bytesWrite = bytesWrite + maxBytesPerSecond;
   outputStream.write(b, (int) currentOffSet, (int) maxBytesPerSecond);
   currentOffSet = currentOffSet + maxBytesPerSecond;

  } while (remainingBytesToWrite > maxBytesPerSecond);

  throttle();
  bytesWrite = bytesWrite + remainingBytesToWrite;
  outputStream.write(b, (int) currentOffSet, (int) remainingBytesToWrite);
 }

 @Override
 public void write(byte[] b) throws IOException {
  this.write(b, 0, b.length);
 }

 public void throttle() throws IOException {
  while (getBytesPerSec() > maxBytesPerSecond) {
   try {
    Thread.sleep(SLEEP_DURATION_MS);
    totalSleepTime += SLEEP_DURATION_MS;
   } catch (InterruptedException e) {
    // Restore the interrupt flag so callers can still observe the interruption.
    Thread.currentThread().interrupt();
    throw new IOException("Thread interrupted", e);
   }
  }
 }

 /**
  * Return the number of bytes written per second
  */
 public long getBytesPerSec() {
  long elapsed = (System.nanoTime() - startTime) / 1000000000;
  if (elapsed == 0) {
   return bytesWrite;
  } else {
   return bytesWrite / elapsed;
  }
 }

 @Override
 public String toString() {
  return "ThrottledOutputStream{" + "bytesWrite=" + bytesWrite + ", maxBytesPerSecond=" + maxBytesPerSecond
    + ", bytesPerSec=" + getBytesPerSec() + ", totalSleepTimeInSeconds=" + totalSleepTime / 1000 + '}';
 }

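 @Override
 public void flush() throws IOException {
  // OutputStream.flush() is a no-op by default, so delegate explicitly.
  outputStream.flush();
 }

 @Override
 public void close() throws IOException {
  outputStream.close();
 }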
}

ThrottledOutputStreamTest.java
package bandwidthThrotling;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;

public class ThrottledOutputStreamTest {

 private static final File file1 = new File("test1.txt");
 private static final File file2 = new File("test2.txt");

 public static void test1() throws IOException {
  OutputStream os = new FileOutputStream(file1);

  try (ThrottledOutputStream throttledStream = new ThrottledOutputStream(os, 1000);) {

   for (int i = 0; i < 10000; i++) {
    String str = "Hello World, Throttled Stream\n";

    throttledStream.write(str.getBytes());
   }
   System.out.println("test1 : " + throttledStream.toString());
  }

 }

 public static void test2() throws IOException {
  OutputStream os = new FileOutputStream(file2);

  try (ThrottledOutputStream throttledStream = new ThrottledOutputStream(os, 2000);) {

   for (int i = 0; i < 10000; i++) {
    String str = "Hello World, Throttled Stream\n";

    throttledStream.write(str.getBytes(), 0, str.length());
   }
   System.out.println("test2 : " + throttledStream.toString());
  }
 }

 public static void main(String args[]) throws IOException {
  test1();
  test2();
 }
}


Output
test1 : ThrottledOutputStream{bytesWrite=300000, maxBytesPerSecond=1000, bytesPerSec=1000, totalSleepTimeInSeconds=295}
test2 : ThrottledOutputStream{bytesWrite=300000, maxBytesPerSecond=2000, bytesPerSec=2000, totalSleepTimeInSeconds=147}

