Tuesday 6 February 2024

Rate limiting: Token Bucket Algorithm implementation in Java

 

The Token Bucket Algorithm helps to control the rate at which requests are made to a system or server. It works by simulating a bucket with a limited number of tokens inside. Each token represents permission to process a request.

 

When Tokens are Available

If there are available tokens in the bucket when a request arrives, the request is processed immediately. The token is then consumed from the bucket, reducing the available token count.

 

When No Tokens are Available

If no tokens are available when a request arrives, the behaviour depends on how the rate limiter is implemented.

a.   Delay: the request waits (it is queued or put on hold) until there are enough tokens in the bucket to process it.

b.   Reject: the request is rejected straight away. The rate limiter will not allow it to be processed until there are sufficient tokens in the bucket, so the caller has to retry later.
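The difference between the two behaviours can be seen with the JDK's java.util.concurrent.Semaphore, used here only as an analogy in which permits stand in for tokens. This is a minimal sketch, not the implementation shown later in this post; the class and method names (NoTokenPolicies, rejectIfEmpty, delayUntilAvailable) are made up for illustration.

```java
import java.util.concurrent.Semaphore;

class NoTokenPolicies {

	// REJECT-style: returns false immediately when no permit (token) is left.
	static boolean rejectIfEmpty(Semaphore tokens) {
		return tokens.tryAcquire();
	}

	// DELAY-style: blocks until another thread releases a permit (token).
	// Returns false only if the waiting thread is interrupted.
	static boolean delayUntilAvailable(Semaphore tokens) {
		try {
			tokens.acquire();
			return true;
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			return false;
		}
	}

	public static void main(String[] args) throws InterruptedException {
		Semaphore tokens = new Semaphore(1); // one token in the bucket

		System.out.println(rejectIfEmpty(tokens)); // true, the token is taken
		System.out.println(rejectIfEmpty(tokens)); // false, bucket is empty

		// Simulate a refill that happens 200 ms later on another thread.
		Thread refiller = new Thread(() -> {
			try {
				Thread.sleep(200);
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
			}
			tokens.release();
		});
		refiller.start();

		// Blocks for roughly 200 ms, then succeeds.
		System.out.println(delayUntilAvailable(tokens)); // true
		refiller.join();
	}
}
```

With the reject behaviour the caller decides what to do next (retry, back off, return an error). With the delay behaviour the calling thread is held until a token shows up.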

 

Here's how it works:

 

Setting up the Bucket:

You decide how big the bucket should be, which sets how many tokens it can hold. This caps the size of a burst, that is, how many requests can be served back to back. You also set how quickly the bucket gets refilled with new tokens, which determines the sustained rate of requests.

 

Processing Requests:

When a request comes in, the system checks if there are any tokens available in the bucket. If there are tokens, one is taken out, and the request is handled. If the bucket is empty, the request is either rejected or put in a waiting state, depending on how you set it up.

 

Refilling the Bucket:

Over time, the bucket gets more tokens added to it according to the refill rate. This helps deal with sudden increases in requests while still keeping within the overall rate limit.

 

In the Token Bucket algorithm, the refill rate determines how frequently new tokens are added to the bucket. In this post the refill rate is expressed as an interval in seconds: the time it takes to add one token. If the refill rate is set to 10 seconds and the bucket size is 5, one token is added to the bucket every 10 seconds until the bucket is full.

 

Once the bucket is full (it reaches its maximum size of 5 tokens), further tokens won't be added until tokens are consumed and space becomes available in the bucket.
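The three steps above (set up, process, refill) can be sketched with the same numbers: a bucket of 5 tokens and one new token every 10 seconds. This is a simplified illustration and not the implementation given later in this post. To keep it deterministic, the current time is passed in as a parameter instead of being read from the system clock, and the names (SimpleTokenBucket, availableTokens) are hypothetical.

```java
class SimpleTokenBucket {
	private final int capacity;              // bucket size
	private final long refillIntervalMillis; // time needed to add one token
	private int tokens;
	private long lastRefillMillis;

	SimpleTokenBucket(int capacity, long refillIntervalMillis, long nowMillis) {
		this.capacity = capacity;
		this.refillIntervalMillis = refillIntervalMillis;
		this.tokens = capacity; // the bucket starts full
		this.lastRefillMillis = nowMillis;
	}

	// Adds one token per completed interval, never exceeding the capacity.
	private void refill(long nowMillis) {
		long intervals = (nowMillis - lastRefillMillis) / refillIntervalMillis;
		if (intervals > 0) {
			tokens = (int) Math.min(capacity, tokens + intervals);
			// Advance by whole intervals so leftover time counts towards the next token.
			lastRefillMillis += intervals * refillIntervalMillis;
		}
	}

	// Takes one token if available; returns false when the bucket is empty.
	boolean tryConsume(long nowMillis) {
		refill(nowMillis);
		if (tokens == 0) {
			return false;
		}
		tokens--;
		return true;
	}

	int availableTokens(long nowMillis) {
		refill(nowMillis);
		return tokens;
	}

	public static void main(String[] args) {
		SimpleTokenBucket bucket = new SimpleTokenBucket(5, 10_000, 0);

		for (int i = 0; i < 5; i++) {
			System.out.println(bucket.tryConsume(0)); // true, five times (the burst)
		}
		System.out.println(bucket.tryConsume(0));       // false, bucket is empty
		System.out.println(bucket.tryConsume(9_999));   // false, interval not yet complete
		System.out.println(bucket.tryConsume(10_000));  // true, one token was added
		System.out.println(bucket.availableTokens(200_000)); // 5, capped at the bucket size
	}
}
```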

 

Advantages:

a.   Easy to understand and set up.

b.   You can adjust the bucket size and refill rate to fit your needs.

c.    It can handle short bursts of traffic over the average rate.

d.   It treats all requests alike: every request costs exactly one token.

 

Disadvantages:

a.   A single lock-protected bucket can become a bottleneck when many threads request tokens at a very high rate.

b.   It can be tricky to keep the token count consistent across multiple servers in a distributed environment.

 


 

 

Below is a complete working application.

 

RateLimiter.java 

package com.sample.app.ratelimiter.interfaces;

import com.sample.app.ratelimiter.exceptions.RateLimiterException;

/**
 * Interface for defining a rate-limiting algorithm.
 */
public interface RateLimiter {

	/**
	 * Checks if access to a resource is allowed based on the rate-limiting
	 * algorithm.
	 * 
	 * @param resourceIdentifier The identifier of the resource (e.g., URI, file
	 *                           path, connection).
	 * @return True if access is allowed, false otherwise.
	 */
	boolean allowAccess(String resourceIdentifier) throws RateLimiterException;

	/**
	 * Resets the rate-limiting state for a given resource.
	 * 
	 * The reset method in the RateLimiter interface serves the purpose of resetting
	 * the rate-limiting state for a specific resource. Let's delve into why this
	 * method is useful:
	 * 
	 * <ol>
	 * 
	 * <li>Clearing state: The rate-limiting algorithms often maintain internal
	 * state information for each resource they are controlling access to. This
	 * state may include variables such as the current token count in a token
	 * bucket, the count of requests within a fixed window, or the state of a
	 * priority queue. The reset method allows you to clear this state, essentially
	 * resetting the algorithm's tracking for that particular resource.</li>
	 * 
	 * <li>Handling State Changes: In dynamic systems, the characteristics of
	 * resources may change over time. For example, the rate of access to a specific
	 * URI might need adjustment due to changes in usage patterns or system
	 * conditions. By providing a reset method, you enable users of your
	 * rate-limiting library to dynamically modify the rate-limiting parameters or
	 * reset the state of a resource when necessary.</li>
	 * 
	 * <li>Error Recovery: If an error condition occurs while processing requests
	 * for a resource, it may be necessary to reset the rate-limiting state to
	 * recover from that error gracefully. For instance, if there's an unexpected
	 * surge in traffic that causes the rate limiter to behave erroneously,
	 * resetting the state can help restore normal operation.</li>
	 * 
	 * <li>Testing and Debugging: During testing and debugging of applications using
	 * the rate-limiting library, being able to reset the rate-limiting state for
	 * specific resources can be invaluable. It allows developers to start with a
	 * clean slate for each test scenario and ensures consistent behavior across
	 * multiple test runs.</li>
	 * 
	 * </ol>
	 * 
	 * @param resourceIdentifier The identifier of the resource (e.g., URI, file
	 *                           path, connection).
	 */
	void reset(String resourceIdentifier) throws RateLimiterException;
}

 

Define utility classes.

 

NumberUtil.java
package com.sample.app.ratelmiter.util;

public class NumberUtil {

	public static void checkForPositiveNumber(Integer val, String message) {
		if (val == null || val <= 0) {
			throw new IllegalArgumentException(message);
		}
	}
}

 

TimeUtil.java

package com.sample.app.ratelmiter.util;

import java.util.concurrent.TimeUnit;

public class TimeUtil {
	
	public static int milliSecondsForSeconds(int n) {
		return n * 1000;
	}
	
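	public static void sleepNSeconds(int noOfSeconds) {
		try {
			TimeUnit.SECONDS.sleep(noOfSeconds);
		} catch (InterruptedException e) {
			// Restore the interrupt flag so callers can still detect the interruption
			Thread.currentThread().interrupt();
		}
	}

	public static void sleepNMilliSeconds(int noOfMilliseconds) {
		try {
			TimeUnit.MILLISECONDS.sleep(noOfMilliseconds);
		} catch (InterruptedException e) {
			// Restore the interrupt flag so callers can still detect the interruption
			Thread.currentThread().interrupt();
		}
	}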
}

 

Define Exception classes.

 

RateLimiterException.java

 

package com.sample.app.ratelimiter.exceptions;

public class RateLimiterException extends Exception {

	public RateLimiterException(Throwable t) {
		super(t);
	}
	
	public RateLimiterException(String msg) {
		super(msg);
	}

}

TokenBucketNotFoundException.java

package com.sample.app.ratelimiter.exceptions;

public class TokenBucketNotFoundException extends RateLimiterException {

	public TokenBucketNotFoundException(Throwable t) {
		super(t);
	}

	public TokenBucketNotFoundException(String msg) {
		super(msg);
	}

}

Define TokenBucket specific classes.

 

TokenBucketNewRequestStrategy.java

package com.sample.app.ratelimiter.tokenbucket;

public enum TokenBucketNewRequestStrategy {
	/**
	 * When no tokens are available, the request is delayed until a token becomes available.
	 */
	DELAY, 
	
	/**
	 * When no tokens are available, the request is rejected straight away.
	 */
	REJECT

}

TokenBucket.java

package com.sample.app.ratelimiter.tokenbucket;

import com.sample.app.ratelmiter.util.NumberUtil;

public class TokenBucket {
	// Fixed state of the token bucket
	private final int bucketSize;
	private final int refillRateInSeconds;
	private final TokenBucketNewRequestStrategy tokenBucketNewRequestStrategy;

	// Dynamic state
	private int tokens;
	private long lastRefillTime;

	private TokenBucket(int bucketSize, int refillRate, TokenBucketNewRequestStrategy tokenBucketNewRequestStrategy) {
		NumberUtil.checkForPositiveNumber(bucketSize, "bucketSize should be positive");
		NumberUtil.checkForPositiveNumber(refillRate, "refillRate should be positive");

		this.bucketSize = bucketSize;
		this.refillRateInSeconds = refillRate;
		this.tokens = bucketSize;
		this.lastRefillTime = System.currentTimeMillis();
		this.tokenBucketNewRequestStrategy = tokenBucketNewRequestStrategy;
	}

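	/**
	 * Refills the bucket and then tries to consume one token. With the DELAY
	 * strategy the calling thread waits until a token is available.
	 * 
	 * @return true if a token was consumed, false if no token was available or the
	 *         thread was interrupted while waiting
	 */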
	public synchronized boolean tryConsume() {
		refillTokens();

		if (tokenBucketNewRequestStrategy == TokenBucketNewRequestStrategy.DELAY) {
			while (tokens <= 0) {
				try {

					// if waitTime is <=0, means that wait time is already passed
					long waitTime = calculateWaitTime();
					System.out.println("waitTime : " + waitTime);
					if (waitTime > 0) {
						wait(waitTime);
					}

					this.refillTokens();
				} catch (InterruptedException e) {
					Thread.currentThread().interrupt();
					return false; // Interrupted while waiting
				}
			}
		}

		if (tokens > 0) {
			tokens--;
			return true;
		}
		return false;

	}

	private void refillTokens() {
		long currentTime = System.currentTimeMillis();
		long elapsedTime = currentTime - lastRefillTime;
		long refillIntervalInMillis = refillRateInSeconds * 1000L;
		long tokensToAdd = elapsedTime / refillIntervalInMillis;

		if (tokensToAdd > 0) {
			tokens = (int) Math.min(bucketSize, tokens + tokensToAdd);

			// Always advance the refill time, even when the bucket is already full.
			// Otherwise idle time piles up and is credited as extra tokens right
			// after the next consumption. Advance by whole intervals only, so the
			// leftover time still counts towards the next token.
			lastRefillTime += tokensToAdd * refillIntervalInMillis;
		}

		this.notify();
	}

	private long calculateWaitTime() {
		long currentTime = System.currentTimeMillis();
		long elapsedTime = currentTime - lastRefillTime;
		// 1000L avoids int overflow for large refill intervals
		long millisecondsToWait = ((refillRateInSeconds * 1000L) - elapsedTime);
		return millisecondsToWait;
	}

	public static TokenBucket tokenBucket(int bucketSize, int refillRate,
			TokenBucketNewRequestStrategy tokenBucketNewRequestStrategy) {
		return new TokenBucket(bucketSize, refillRate, tokenBucketNewRequestStrategy);
	}

	public static TokenBucket delayRequestTokenBucket(int bucketSize, int refillRate) {
		return tokenBucket(bucketSize, refillRate, TokenBucketNewRequestStrategy.DELAY);
	}

	public static TokenBucket rejectRequestTokenBucket(int bucketSize, int refillRate) {
		return tokenBucket(bucketSize, refillRate, TokenBucketNewRequestStrategy.REJECT);
	}

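	/**
	 * Returns a new bucket with the same configuration and a full set of tokens.
	 * The current token count and last refill time are not copied.
	 */
	public TokenBucket deepClone() {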
		return new TokenBucket(this.bucketSize, this.refillRateInSeconds, this.tokenBucketNewRequestStrategy);
	}

	@Override
	public String toString() {
		return "TokenBucket [bucketSize=" + bucketSize + ", refillRateInSeconds=" + refillRateInSeconds
				+ ", tokenBucketNewRequestStrategy=" + tokenBucketNewRequestStrategy + ", tokens=" + tokens
				+ ", lastRefillTime=" + lastRefillTime + "]";
	}

}

TokenBucketRateLimiter.java

package com.sample.app.ratelimiter.tokenbucket;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.sample.app.ratelimiter.exceptions.TokenBucketNotFoundException;
import com.sample.app.ratelimiter.interfaces.RateLimiter;

/**
 * Implementation of the Token Bucket rate-limiting algorithm.
 * 
 * <p>
 * The Token Bucket algorithm uses a metaphor of a bucket that can hold a
 * maximum number of tokens. Tokens are added to the bucket at a constant rate.
 * Each request consumes a certain number of tokens from the bucket. If there
 * are not enough tokens in the bucket, the request is delayed or rejected.
 * </p>
 */
public class TokenBucketRateLimiter implements RateLimiter {
	private final Map<String, TokenBucket> BUCKETS;

	public TokenBucketRateLimiter() {
		BUCKETS = new ConcurrentHashMap<>();
	}

	private TokenBucket getBucketMappedToThisResource(String resourceIdentifier) throws TokenBucketNotFoundException {
		TokenBucket bucket = BUCKETS.get(resourceIdentifier);
		if (bucket == null) {
			throw new TokenBucketNotFoundException("No bucket mapped to the resource identifier");
		}
		return bucket;
	}

	@Override
	public boolean allowAccess(String resourceIdentifier) throws TokenBucketNotFoundException {
		TokenBucket tokenBucket = getBucketMappedToThisResource(resourceIdentifier);
		return tokenBucket.tryConsume();
	}

	@Override
	public void reset(String resourceIdentifier) throws TokenBucketNotFoundException {
		TokenBucket tokenBucket = getBucketMappedToThisResource(resourceIdentifier);
		// Replace the bucket with a fresh one that has the same configuration and a full set of tokens
		BUCKETS.put(resourceIdentifier, tokenBucket.deepClone());
	}

	public void mapBucketToAResource(String resourceIdentifier, TokenBucket tokenBucket) {
		BUCKETS.put(resourceIdentifier, tokenBucket);
	}
}

Test applications

TokenBucketRateLimiterDemoForDelayStrategy.java

package com.sample.app.ratelimiter.tokenbucket;

import java.util.Date;
import java.util.UUID;

import com.sample.app.ratelimiter.exceptions.TokenBucketNotFoundException;
import com.sample.app.ratelmiter.util.TimeUtil;

public class TokenBucketRateLimiterDemoForDelayStrategy {

	public static void main(String[] args) throws TokenBucketNotFoundException {
		TokenBucketRateLimiter rateLimiter = new TokenBucketRateLimiter();

		String resourceIdentifier = UUID.randomUUID().toString();
		// Bucket of 5 tokens, refilled with one token every 2 seconds
		TokenBucket tokenBucket = TokenBucket.delayRequestTokenBucket(5, 2);

		rateLimiter.mapBucketToAResource(resourceIdentifier, tokenBucket);

		for (int i = 0; i < 100; i++) {
			// Go through the rate limiter instead of calling the bucket directly
			boolean canIConsume = rateLimiter.allowAccess(resourceIdentifier);

			if (canIConsume) {
				System.out.println("Token is assigned " + new Date());
			} else {
				TimeUtil.sleepNMilliSeconds(500);
				System.out.println("Tokens are not available");
			}

		}
	}

}

TokenBucketRateLimiterDemoForRejectStrategy.java

package com.sample.app.ratelimiter.tokenbucket;

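import java.util.UUID;

import com.sample.app.ratelimiter.exceptions.TokenBucketNotFoundException;
import com.sample.app.ratelmiter.util.TimeUtil;

public class TokenBucketRateLimiterDemoForRejectStrategy {

	public static void main(String[] args) throws TokenBucketNotFoundException {
		TokenBucketRateLimiter rateLimiter = new TokenBucketRateLimiter();

		String resourceIdentifier = UUID.randomUUID().toString();
		// Bucket of 5 tokens, refilled with one token every 5 seconds
		TokenBucket tokenBucket = TokenBucket.rejectRequestTokenBucket(5, 5);

		rateLimiter.mapBucketToAResource(resourceIdentifier, tokenBucket);

		for (int i = 0; i < 100; i++) {
			// Go through the rate limiter instead of calling the bucket directly
			boolean canIConsume = rateLimiter.allowAccess(resourceIdentifier);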

			if (canIConsume) {
				System.out.println("Token is assigned");
			} else {
				TimeUtil.sleepNMilliSeconds(500);
				System.out.println("Tokens are not available");
			}

		}
	}

}

Explanation of the core methods of the TokenBucket class.

The refillTokens() method refills the bucket based on the time elapsed since the last refill and the refill rate. It adds tokens only when at least one full refill interval has passed, never exceeds the bucket size, and updates the last refill time accordingly. Finally, it calls notify() to wake up a thread that may be waiting for a token.

 

The calculateWaitTime() method returns the number of milliseconds until the next token is due. A value of zero or less means the refill interval has already passed.
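As a worked example of that arithmetic, here is a small standalone sketch. It assumes the refill interval is given in milliseconds and, unlike calculateWaitTime(), clamps the result to zero instead of returning a negative value. The class and method names (WaitTime, millisUntilNextToken) are hypothetical.

```java
class WaitTime {

	// Time left until the current refill interval completes, never negative.
	static long millisUntilNextToken(long lastRefillMillis, long nowMillis, long refillIntervalMillis) {
		long elapsed = nowMillis - lastRefillMillis;
		return Math.max(0L, refillIntervalMillis - elapsed);
	}

	public static void main(String[] args) {
		// Refill interval of 10 seconds, last refill at t = 0.
		System.out.println(millisUntilNextToken(0, 4_000, 10_000));  // 6000: 4 s passed, 6 s to go
		System.out.println(millisUntilNextToken(0, 12_000, 10_000)); // 0: the token is already due
	}
}
```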

 

The tryConsume() method is synchronized. It first refills the bucket. With the DELAY strategy, the calling thread then waits until a token is available; with the REJECT strategy it does not wait. The method returns true if a token was consumed, and false if no token was available or the thread was interrupted while waiting.
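The waiting pattern behind the DELAY strategy (check the condition in a loop, wait with a timeout, restore the interrupt flag) can be looked at in isolation. The following is a minimal sketch with hypothetical names (GuardedTokens, take, put) and an arbitrary 50 ms timeout. Tokens are added explicitly through put() rather than by elapsed time, so it only illustrates the wait loop and not the refill logic.

```java
class GuardedTokens {
	private int tokens;

	GuardedTokens(int initialTokens) {
		this.tokens = initialTokens;
	}

	// Blocks until a token is available; returns false if interrupted while waiting.
	synchronized boolean take() {
		while (tokens <= 0) { // re-check the condition after every wake-up
			try {
				wait(50); // releases the monitor while waiting
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt(); // keep the interruption visible to the caller
				return false;
			}
		}
		tokens--;
		return true;
	}

	// Adds one token and wakes up all waiting threads.
	synchronized void put() {
		tokens++;
		notifyAll();
	}

	public static void main(String[] args) throws InterruptedException {
		GuardedTokens bucket = new GuardedTokens(0);

		// Another thread adds a token after about 100 ms.
		Thread producer = new Thread(() -> {
			try {
				Thread.sleep(100);
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
			}
			bucket.put();
		});
		producer.start();

		System.out.println("took a token: " + bucket.take()); // took a token: true
		producer.join();
	}
}
```

The loop matters because a thread can wake up without a token being available, either because the timeout expired or because another thread took the token first.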





 

 

 
