In this post, I explain what bandwidth throttling is, why we need it, and how to throttle bandwidth in a Java application.
Bandwidth Throttling
Bandwidth throttling is a technique for deliberately limiting the rate at which data is sent or received over a network connection.
Why do we control network speed?
Let me explain with two basic examples.
Example 1: System-level bandwidth control
Suppose you run a startup with 10 employees, each with a dedicated computer, and you want to migrate all your employees' data from their computers to a cloud server. Suppose your network bandwidth is 25 Mbps. What happens if every computer tries to use the full 25 Mbps? The computers compete for the same link: whichever transfer grabs the bandwidth first crowds out the rest, and the remaining computers wait until bandwidth is freed.
Example 2: Application-level bandwidth control
Suppose you developed a content management client that periodically syncs data from your system to the cloud. Assume the bandwidth allocated to your system is 10 Mbps. If the client uses the full 10 Mbps, which is the maximum allocated to your system, the other applications on the system are starved of network access. That will definitely make your users unhappy. One way to solve this is to restrict bandwidth usage at the application level. Nowadays, many applications provide a setting to control their own bandwidth usage; by setting it to some value, you cap how much bandwidth the application consumes.
I am going to present two sample classes that control bandwidth usage while uploading and downloading a document.
| Class | Description |
| --- | --- |
| ThrottledInputStream.java | Throttles the bandwidth while downloading data. |
| ThrottledOutputStream.java | Throttles the bandwidth while uploading data. |
Designing ThrottledInputStream.java
The first step is to extend the InputStream class and override its read methods. The following table summarizes the read() methods provided by the InputStream class.
| Method | Description |
| --- | --- |
| public abstract int read() throws IOException | Returns the next byte of data from the input stream, or -1 if the end of the stream has been reached. Since this method is abstract, every concrete subclass must implement it. |
| public int read(byte[] b) throws IOException | Equivalent to ‘read(b, 0, b.length)’. |
| public int read(byte[] b, int off, int len) throws IOException | Reads up to len bytes of data from the input stream into an array of bytes. ‘off’ specifies the start offset in array b at which the data is written. |
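To see how these overloads relate in practice, here is a minimal sketch that uses the JDK's ByteArrayInputStream as a stand-in source. The class name ReadOverloadsDemo, the drain helper, and the sample string are made up for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

final class ReadOverloadsDemo {

    /** Drains the stream with the bulk overload and returns how many bytes were read. */
    static int drain(InputStream in, int bufferSize) throws IOException {
        if (bufferSize <= 0) {
            throw new IllegalArgumentException("bufferSize must be greater than zero");
        }
        byte[] buffer = new byte[bufferSize];
        int total = 0;
        int n;
        // read(buffer) is shorthand for read(buffer, 0, buffer.length);
        // it returns the number of bytes stored, or -1 at end of stream.
        while ((n = in.read(buffer)) != -1) {
            total += n;
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = "hello, throttling".getBytes(StandardCharsets.US_ASCII); // 17 bytes
        try (InputStream in = new ByteArrayInputStream(data)) {
            int first = in.read();   // single-byte overload: 104, the code for 'h'
            int rest = drain(in, 4); // bulk overload: the remaining 16 bytes
            System.out.println(first + " " + rest); // prints "104 16"
        }
    }
}
```

Because callers may use any of the three overloads, a wrapper that wants to count every byte has to account for all of them.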
How can we throttle while reading the data?
Input from user: Number of bytes to read per second
Method: On every read() call, check the number of bytes read so far and the elapsed time. If the resulting rate (bytes read / elapsed time) exceeds the limit, slow the download down by putting the thread to sleep until the rate falls back under the limit.
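The check described above boils down to one calculation: given the bytes read so far and the target rate, how long should the thread wait? The following is a minimal sketch of that calculation, assuming a hypothetical helper named ThrottleMath (it is not part of the classes in this post). It computes the delay directly, whereas the implementation below polls in fixed 30 ms steps.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper, for illustration only. */
final class ThrottleMath {

    private ThrottleMath() {
    }

    /**
     * Returns how many milliseconds the caller should sleep so that
     * bytesSoFar divided by the elapsed time does not exceed maxBytesPerSec.
     */
    static long sleepMillisNeeded(long bytesSoFar, long elapsedNanos, long maxBytesPerSec) {
        if (maxBytesPerSec <= 0) {
            throw new IllegalArgumentException("maxBytesPerSec must be greater than zero");
        }
        // Earliest point (in ms since the start) at which bytesSoFar is allowed
        // at the target rate. Assumes bytesSoFar * 1000 fits in a long.
        long earliestMillis = bytesSoFar * 1000L / maxBytesPerSec;
        long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(elapsedNanos);
        return Math.max(0L, earliestMillis - elapsedMillis);
    }

    public static void main(String[] args) {
        // 3000 bytes read during the first second with a 1000 bytes/second limit:
        // the transfer is 2 seconds ahead of schedule.
        System.out.println(sleepMillisNeeded(3000, TimeUnit.SECONDS.toNanos(1), 1000)); // prints 2000
    }
}
```

Polling in small fixed steps, as the class below does, is simpler to reason about; computing the delay up front avoids repeated wake-ups. Either approach enforces the same average rate.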
The following is the implementation of the ThrottledInputStream class.
ThrottledInputStream.java
    package bandwidthThrotling;

    import java.io.IOException;
    import java.io.InputStream;

    /** Wraps an InputStream and limits the average read rate to maxBytesPerSec. */
    public class ThrottledInputStream extends InputStream {

        private static final long SLEEP_DURATION_MS = 30;

        private final InputStream inputStream;
        private final long maxBytesPerSec;
        private final long startTime = System.nanoTime();

        private long bytesRead = 0;
        private long totalSleepTime = 0;

        /** Creates a stream with no effective limit. */
        public ThrottledInputStream(InputStream inputStream) {
            this(inputStream, Long.MAX_VALUE);
        }

        public ThrottledInputStream(InputStream inputStream, long maxBytesPerSec) {
            if (maxBytesPerSec < 0) {
                throw new IllegalArgumentException("maxBytesPerSec shouldn't be negative");
            }
            if (inputStream == null) {
                throw new IllegalArgumentException("inputStream shouldn't be null");
            }
            this.inputStream = inputStream;
            this.maxBytesPerSec = maxBytesPerSec;
        }

        @Override
        public void close() throws IOException {
            inputStream.close();
        }

        @Override
        public int read() throws IOException {
            throttle();
            int data = inputStream.read();
            if (data != -1) {
                bytesRead++;
            }
            return data;
        }

        @Override
        public int read(byte[] b) throws IOException {
            throttle();
            int readLen = inputStream.read(b);
            if (readLen != -1) {
                bytesRead += readLen;
            }
            return readLen;
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            throttle();
            int readLen = inputStream.read(b, off, len);
            if (readLen != -1) {
                bytesRead += readLen;
            }
            return readLen;
        }

        /** Sleeps in short steps until the average rate is back under the limit. */
        private void throttle() throws IOException {
            while (getBytesPerSec() > maxBytesPerSec) {
                try {
                    Thread.sleep(SLEEP_DURATION_MS);
                    totalSleepTime += SLEEP_DURATION_MS;
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so callers can still observe the interruption.
                    Thread.currentThread().interrupt();
                    throw new IOException("Thread interrupted", e);
                }
            }
        }

        public long getTotalBytesRead() {
            return bytesRead;
        }

        /**
         * Returns the average number of bytes read per second since the stream
         * was created. Elapsed time is truncated to whole seconds.
         */
        public long getBytesPerSec() {
            long elapsed = (System.nanoTime() - startTime) / 1000000000;
            if (elapsed == 0) {
                return bytesRead;
            } else {
                return bytesRead / elapsed;
            }
        }

        /** Returns the total time spent sleeping, in milliseconds. */
        public long getTotalSleepTime() {
            return totalSleepTime;
        }

        @Override
        public String toString() {
            return "ThrottledInputStream{" + "bytesRead=" + bytesRead
                    + ", maxBytesPerSec=" + maxBytesPerSec
                    + ", bytesPerSec=" + getBytesPerSec()
                    + ", totalSleepTimeInSeconds=" + totalSleepTime / 1000 + '}';
        }
    }
Testing the application
To simulate a download, I am going to read the local file ‘abc.pdf’, which is 131 KB in size, through the throttled stream. The following test application has three methods.
| Method | Description |
| --- | --- |
| test1() | Tests ThrottledInputStream with a limit of 1000 bytes/second. |
| test2() | Tests ThrottledInputStream with a limit of 2000 bytes/second. |
| test3() | Tests ThrottledInputStream with a limit of 3000 bytes/second. |
ThrottledInputStreamTest.java
    package bandwidthThrotling;

    import java.io.BufferedReader;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.InputStreamReader;

    public class ThrottledInputStreamTest {

        private static final File file = new File("abc.pdf");

        /** Reads the file one byte at a time, limited to 1000 bytes/second. */
        public static void test1() throws IOException {
            try (InputStream is = new FileInputStream(file);
                    ThrottledInputStream throttledStream = new ThrottledInputStream(is, 1000)) {
                int data;
                while ((data = throttledStream.read()) != -1) {
                    // System.out.print((char) data);
                }
                System.out.println("test1 : " + throttledStream.toString());
            }
        }

        /** Reads the file one byte at a time, limited to 2000 bytes/second. */
        public static void test2() throws IOException {
            try (InputStream is = new FileInputStream(file);
                    ThrottledInputStream throttledStream = new ThrottledInputStream(is, 2000)) {
                int data;
                while ((data = throttledStream.read()) != -1) {
                    // System.out.print((char) data);
                }
                System.out.println("test2 : " + throttledStream.toString());
            }
        }

        /** Reads the file through a BufferedReader, limited to 3000 bytes/second. */
        public static void test3() throws IOException {
            try (InputStream is = new FileInputStream(file);
                    ThrottledInputStream throttledStream = new ThrottledInputStream(is, 3000);
                    BufferedReader br = new BufferedReader(new InputStreamReader(throttledStream))) {
                String data;
                while ((data = br.readLine()) != null) {
                    // System.out.print(data);
                }
                System.out.println("test3 : " + throttledStream.toString());
            }
        }

        public static void main(String[] args) throws IOException {
            test1();
            test2();
            test3();
        }
    }
Output
    test1 : ThrottledInputStream{bytesRead=133830, maxBytesPerSec=1000, bytesPerSec=998, totalSleepTimeInSeconds=130}
    test2 : ThrottledInputStream{bytesRead=133830, maxBytesPerSec=2000, bytesPerSec=1997, totalSleepTimeInSeconds=64}
    test3 : ThrottledInputStream{bytesRead=133830, maxBytesPerSec=3000, bytesPerSec=2974, totalSleepTimeInSeconds=44}
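As a sanity check on these numbers: with a cap of R bytes/second, reading N bytes should take about N / R seconds. Here is a minimal sketch of that arithmetic, using a hypothetical ExpectedDuration helper that is not part of the classes above.

```java
import java.util.Locale;

/** Hypothetical helper, for illustration only. */
final class ExpectedDuration {

    private ExpectedDuration() {
    }

    /** Returns the time, in seconds, needed to move totalBytes at maxBytesPerSec. */
    static double expectedSeconds(long totalBytes, long maxBytesPerSec) {
        if (maxBytesPerSec <= 0) {
            throw new IllegalArgumentException("maxBytesPerSec must be greater than zero");
        }
        return (double) totalBytes / maxBytesPerSec;
    }

    public static void main(String[] args) {
        long totalBytes = 133_830; // bytesRead as reported in the output above
        for (long rate : new long[] {1000, 2000, 3000}) {
            System.out.println(String.format(Locale.ROOT, "%d bytes/s -> about %.1f s",
                    rate, expectedSeconds(totalBytes, rate)));
        }
    }
}
```

For the 133,830 bytes reported above, this gives roughly 134, 67, and 45 seconds for the three limits, which lines up with the reported sleep times of 130, 64, and 44 seconds. The small gap is plausibly because each sleep lasts slightly longer than the 30 ms that is counted, plus the time spent on the reads themselves.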
Designing ThrottledOutputStream.java
The first step is to extend the OutputStream class and override its write methods. The following table summarizes the write() methods provided by the OutputStream class.
| Method | Description |
| --- | --- |
| public void write(byte[] b) throws IOException | Writes b.length bytes from the specified byte array to this output stream. |
| public void write(byte[] b, int off, int len) throws IOException | Writes len bytes from the specified byte array, starting at offset off, to this output stream. |
| public abstract void write(int b) throws IOException | Writes the specified byte to this output stream. Since this method is abstract, subclasses must provide an implementation. |
How can we throttle while writing the data?
Input from user: Maximum number of bytes to write per second
Method: While writing data to the stream, throttle by checking the total number of bytes written so far. If (total bytes written / elapsed time) is greater than the maximum number of bytes per second, sleep until the rate drops back to the limit.
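One detail worth illustrating is what to do with a single write that is larger than one second's budget. The implementation below splits such a write into pieces of at most the per-second limit and throttles before each piece. The following is a minimal sketch of just that splitting step, assuming a hypothetical helper named ChunkPlanner that only computes the piece sizes.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper, for illustration only. */
final class ChunkPlanner {

    private ChunkPlanner() {
    }

    /** Returns the sizes of the pieces that a write of len bytes is split into. */
    static List<Integer> chunkSizes(int len, long maxBytesPerSecond) {
        if (len < 0) {
            throw new IllegalArgumentException("len shouldn't be negative");
        }
        if (maxBytesPerSecond <= 0) {
            throw new IllegalArgumentException("maxBytesPerSecond should be greater than zero");
        }
        List<Integer> sizes = new ArrayList<>();
        int remaining = len;
        while (remaining > 0) {
            // The result never exceeds remaining, so the cast to int is safe
            // even when the limit is larger than Integer.MAX_VALUE.
            int chunk = (int) Math.min(remaining, maxBytesPerSecond);
            sizes.add(chunk);
            remaining -= chunk;
        }
        return sizes;
    }

    public static void main(String[] args) {
        // A 2500-byte write with a 1000 bytes/second limit becomes three pieces.
        System.out.println(chunkSizes(2500, 1000)); // prints [1000, 1000, 500]
    }
}
```

Splitting this way means no single call can push more than one second's worth of data downstream between two throttle checks.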
ThrottledOutputStream.java
    package bandwidthThrotling;

    import java.io.IOException;
    import java.io.OutputStream;

    /** Wraps an OutputStream and limits the average write rate to maxBytesPerSecond. */
    public class ThrottledOutputStream extends OutputStream {

        private static final long SLEEP_DURATION_MS = 30;

        private final OutputStream outputStream;
        private final long maxBytesPerSecond;
        private final long startTime = System.nanoTime();

        private long bytesWrite = 0;
        private long totalSleepTime = 0;

        /** Creates a stream with no effective limit. */
        public ThrottledOutputStream(OutputStream outputStream) {
            this(outputStream, Long.MAX_VALUE);
        }

        public ThrottledOutputStream(OutputStream outputStream, long maxBytesPerSecond) {
            if (outputStream == null) {
                throw new IllegalArgumentException("outputStream shouldn't be null");
            }
            if (maxBytesPerSecond <= 0) {
                throw new IllegalArgumentException("maxBytesPerSecond should be greater than zero");
            }
            this.outputStream = outputStream;
            this.maxBytesPerSecond = maxBytesPerSecond;
        }

        @Override
        public void write(int b) throws IOException {
            throttle();
            outputStream.write(b);
            bytesWrite++;
        }

        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            if (off < 0 || len < 0 || len > b.length - off) {
                throw new IndexOutOfBoundsException();
            }
            int currentOffset = off;
            int remaining = len;
            // Write in chunks of at most maxBytesPerSecond so that one large call
            // cannot exceed a second's budget between two throttle checks.
            while (remaining > 0) {
                throttle();
                int chunk = (int) Math.min(remaining, maxBytesPerSecond);
                outputStream.write(b, currentOffset, chunk);
                bytesWrite += chunk;
                currentOffset += chunk;
                remaining -= chunk;
            }
        }

        @Override
        public void write(byte[] b) throws IOException {
            this.write(b, 0, b.length);
        }

        /** Sleeps in short steps until the average rate is back under the limit. */
        public void throttle() throws IOException {
            while (getBytesPerSec() > maxBytesPerSecond) {
                try {
                    Thread.sleep(SLEEP_DURATION_MS);
                    totalSleepTime += SLEEP_DURATION_MS;
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so callers can still observe the interruption.
                    Thread.currentThread().interrupt();
                    throw new IOException("Thread interrupted", e);
                }
            }
        }

        /**
         * Returns the average number of bytes written per second since the stream
         * was created. Elapsed time is truncated to whole seconds.
         */
        public long getBytesPerSec() {
            long elapsed = (System.nanoTime() - startTime) / 1000000000;
            if (elapsed == 0) {
                return bytesWrite;
            } else {
                return bytesWrite / elapsed;
            }
        }

        @Override
        public String toString() {
            return "ThrottledOutputStream{" + "bytesWrite=" + bytesWrite
                    + ", maxBytesPerSecond=" + maxBytesPerSecond
                    + ", bytesPerSec=" + getBytesPerSec()
                    + ", totalSleepTimeInSeconds=" + totalSleepTime / 1000 + '}';
        }

        @Override
        public void flush() throws IOException {
            // OutputStream.flush() does nothing by default, so delegate explicitly.
            outputStream.flush();
        }

        @Override
        public void close() throws IOException {
            outputStream.close();
        }
    }
ThrottledOutputStreamTest.java
    package bandwidthThrotling;

    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;
    import java.nio.charset.StandardCharsets;

    public class ThrottledOutputStreamTest {

        private static final File file1 = new File("test1.txt");
        private static final File file2 = new File("test2.txt");

        /** Writes 10000 lines with write(byte[]), limited to 1000 bytes/second. */
        public static void test1() throws IOException {
            try (OutputStream os = new FileOutputStream(file1);
                    ThrottledOutputStream throttledStream = new ThrottledOutputStream(os, 1000)) {
                for (int i = 0; i < 10000; i++) {
                    String str = "Hello World, Throttled Stream\n";
                    throttledStream.write(str.getBytes(StandardCharsets.UTF_8));
                }
                System.out.println("test1 : " + throttledStream.toString());
            }
        }

        /** Writes 10000 lines with write(byte[], int, int), limited to 2000 bytes/second. */
        public static void test2() throws IOException {
            try (OutputStream os = new FileOutputStream(file2);
                    ThrottledOutputStream throttledStream = new ThrottledOutputStream(os, 2000)) {
                for (int i = 0; i < 10000; i++) {
                    String str = "Hello World, Throttled Stream\n";
                    // Use the byte count, not the character count, as the length.
                    byte[] bytes = str.getBytes(StandardCharsets.UTF_8);
                    throttledStream.write(bytes, 0, bytes.length);
                }
                System.out.println("test2 : " + throttledStream.toString());
            }
        }

        public static void main(String[] args) throws IOException {
            test1();
            test2();
        }
    }
Output
    test1 : ThrottledOutputStream{bytesWrite=300000, maxBytesPerSecond=1000, bytesPerSec=1000, totalSleepTimeInSeconds=295}
    test2 : ThrottledOutputStream{bytesWrite=300000, maxBytesPerSecond=2000, bytesPerSec=2000, totalSleepTimeInSeconds=147}