Friday, 15 December 2023

Building a Persistent Key-Value Store with a Log File

 

In this post, I am going to explain the process of constructing a key-value storage system from scratch. The methodology employed here is based on the principles outlined in the Bitcask paper (https://riak.com/assets/bitcask-intro.pdf).

 

Our approach keeps a hash map in RAM that maps each key to an offset in the data file, complemented by an append-only (log-structured) file on disk to optimize write operations.

 

Key considerations for this approach include:

a.   Keys are stored in memory in a HashMap-like data structure for fast lookups. Every key, along with its offset in the data file, is kept in RAM.

b.   The actual <key, value> pairs are stored in a file.

c.    When appending a new key-value pair to the file, the hash map is updated to reflect the offset of the recently written data. This mechanism accommodates both the insertion of new keys and updates to existing ones.

d.   To retrieve a value, the hash map is employed to locate the offset in the data file. Subsequently, the system seeks to that location and reads the corresponding value.

e.   Write operations follow an append-only paradigm, ensuring that writes progress sequentially without the need for seeking. The system always considers the latest entry for a given key in the file.

f.     To optimize space utilization, old and deleted values undergo compaction or merging processes.

 

Let me explain the procedure in detail.

 

Step 1: Suppose I want to store a key-value pair in the storage system, such as <1, hello>.

 

In this scenario:

a.   We open a file and append the data <1, hello> to it.

b.   The key and the starting offset of this entry are stored in the HashMap.

 

This process can be illustrated in the figure below.

 


 

Let me add the following entries.

 

<2, hi>

<3, there>

 


 

How to read an entry?

To retrieve the value linked to the key 2, I will query the HashMap using the key 2, obtain the offset of that entry, and then navigate to the corresponding file offset location to retrieve and return the value.

 

How to update the value associated with an entry?

We simply append this new entry to the file and update the offset value linked to this key in the HashMap. This process can be visualized in the image below.

 

engine.writeKey("2", "Good morning....");

 


 

How to delete an entry?

Similar to the update operation, the delete operation is lightweight, involving a disk write and an in-memory update. In the delete operation, the previous entries related to the deleted keys remain untouched and are explicitly cleared during merging and compaction.

 

To delete an entry, we need to choose a sentinel value that will not collide with any real value. Suppose I choose 'DELETED' to represent a deleted entry; the delete operation is then equivalent to write(key, "DELETED").

 

Suppose I delete the entry with key 1; the diagram illustrates the resulting state as follows.




Compaction:

An issue with this strategy is the potential accumulation of numerous stale entries over time. It becomes necessary to periodically remove these stale entries to prevent any adverse impact on the application's performance.
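The core of compaction is simple: rewrite only the latest value of every live key into a fresh file and rebuild the offset map against it. Below is a minimal standalone sketch of that idea (the class and method names here are my own; a fuller version appears in the StorageEngine class later in this post).

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.HashMap;
import java.util.Map;

public class CompactionSketch {

	// Rewrite only the latest value of every live key into a new file and
	// return the rebuilt key -> offset map for the compacted file.
	public static Map<String, Long> compact(Map<String, String> liveEntries,
			String newFilePath) throws IOException {
		Map<String, Long> newOffsets = new HashMap<>();
		try (RandomAccessFile raf = new RandomAccessFile(newFilePath, "rw")) {
			for (Map.Entry<String, String> entry : liveEntries.entrySet()) {
				long offset = raf.getFilePointer();
				raf.writeUTF(entry.getKey());
				raf.writeUTF(entry.getValue());
				newOffsets.put(entry.getKey(), offset);
			}
		}
		return newOffsets;
	}
}
```

Because stale and deleted versions are simply never copied, the new file contains exactly one record per live key.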

 

Implementation of the Storage Engine described above

Since we need to access file content at specific offsets, RandomAccessFile is a good fit: it supports seeking to an arbitrary position in a file and reading or writing there, which is enough to implement CRUD (Create, Read, Update, Delete) operations.
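To see the RandomAccessFile operations used throughout this post in isolation, the small sketch below (the file path is supplied by the caller) appends two UTF entries and then seeks back to read the second one.

```java
import java.io.IOException;
import java.io.RandomAccessFile;

public class RandomAccessFileDemo {

	public static String demo(String filePath) throws IOException {
		try (RandomAccessFile raf = new RandomAccessFile(filePath, "rw")) {
			// Append two <key, value> entries to the end of the file
			raf.seek(raf.length());
			raf.writeUTF("1");
			raf.writeUTF("hello");

			// Remember where the second entry starts before writing it
			long offsetOfSecondEntry = raf.length();
			raf.writeUTF("2");
			raf.writeUTF("hi");

			// Jump straight to the second entry and read it back
			raf.seek(offsetOfSecondEntry);
			String key = raf.readUTF();   // the key "2"
			return raf.readUTF();         // the value "hi"
		}
	}
}
```

This seek-then-read pattern is exactly what the storage engine relies on: the HashMap remembers the offset, and RandomAccessFile jumps there.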

 

Write an entry to the storage engine

void write(String key, String value) throws IOException {
	// Move the file pointer to the end of the file
	long offset = randomAccessFile.length();
	randomAccessFile.seek(offset);

	randomAccessFile.writeUTF(key);
	randomAccessFile.writeUTF(value);

	if(this.deletePlaceholder.equals(value)) {
		offsetMapForKeys.remove(key);
	}else {
		offsetMapForKeys.put(key, offset);
	}
	
}

 

The above snippet appends a key-value pair to the file while maintaining an offset map for later retrieval. If the value is the delete placeholder, the key is removed from the map instead.

 

Read an entry from the storage engine

String read(String key) throws IOException {
	Long offset = offsetMapForKeys.get(key);
	if (offset == null) {
		return null;
	}

	randomAccessFile.seek(offset);

	// First UTF represents the key
	randomAccessFile.readUTF();

	String value = randomAccessFile.readUTF();

	if (deletePlaceholder.equals(value)) {
		return null;
	}
	return value;
}

The above snippet looks up the file offset associated with the given key and returns the value stored at that offset in the file.

 

Delete a key

public void delete(String key) throws IOException {
	write(key, deletePlaceholder);
}

The complete working application is given below.

 

Define utility class

 

DateUtil.java

package com.sample.loghash.util;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class DateUtil {

	public static String getTimestamp() {
		LocalDateTime now = LocalDateTime.now();
		DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy_MM_dd_HH_mm_ss");
		return now.format(formatter);
	}

}

FileUtil.java

package com.sample.loghash.util;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class FileUtil {
	public static void createFileIfNotExists(String file) throws IOException {
		Path path = Paths.get(file);

		try {
			// Create the file, including any necessary parent directories
			Files.createDirectories(path.getParent());
			Files.createFile(path);
			//System.out.println("File created successfully: " + path);

		} catch (FileAlreadyExistsException e) {
			System.out.println("File already exists: " + path);
			//e.printStackTrace();

		} catch (IOException e) {
			//e.printStackTrace();
			System.err.println("An error occurred while creating the file: " + e.getMessage());
		}

	}

	public static String readFileContentViaReadingAllBytes(String filePath) throws IOException {
		return new String(Files.readAllBytes(Paths.get(filePath)), StandardCharsets.UTF_8);
	}

	public static void write(final String filePath, final String data) {

		if (data == null || data.isEmpty()) {
			//System.out.println("data is null (or) empty");
			return;
		}

		if (filePath == null) {
			//System.out.println("file is null");
			return;
		}

		File file = new File(filePath);

		try (BufferedWriter writer = new BufferedWriter(new FileWriter(file))) {
			writer.write(data);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	public static void renameFile(String oldFileName, String newFileName) {
		File oldFile = new File(oldFileName);
		File newFile = new File(newFileName);

		if (oldFile.exists()) {
			boolean success = oldFile.renameTo(newFile);

			if (success) {
				//System.out.println("File renamed successfully.");
			} else {
				System.err.println("Failed to rename the file.");
			}
		} else {
			System.err.println("File does not exist: " + oldFileName);
		}
	}

	public static String getParentDirectory(String filePath) {
		Path path = Paths.get(filePath);
		Path parent = path.getParent();

		if (parent != null) {
			return parent.toString();
		} else {
			return "No parent directory (it might be the root directory)";
		}
	}

	public static String getFileNameWithoutExtension(String filePath) {
		Path path = Paths.get(filePath);
		String fileNameWithExtension = path.getFileName().toString();

		int lastDotIndex = fileNameWithExtension.lastIndexOf('.');

		if (lastDotIndex != -1) {
			return fileNameWithExtension.substring(0, lastDotIndex);
		} else {
			// File has no extension
			return fileNameWithExtension;
		}
	}

	public static String getFileExtension(String filePath) {
		Path path = Paths.get(filePath);
		String fileNameWithExtension = path.getFileName().toString();

		int lastDotIndex = fileNameWithExtension.lastIndexOf('.');

		if (lastDotIndex != -1 && lastDotIndex < fileNameWithExtension.length() - 1) {
			return fileNameWithExtension.substring(lastDotIndex + 1);
		} else {
			// No file extension
			return "";
		}
	}

}

JsonUtil.java

package com.sample.loghash.util;

import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.Map;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.reflect.TypeToken;

public class JsonUtil {
	private static Gson gson = new Gson();
	private static Gson excludedGson = new GsonBuilder().excludeFieldsWithoutExposeAnnotation().create();
	private static Gson prettyGson = new GsonBuilder().setPrettyPrinting().create();

	public static String getJson(Object obj) {
		return gson.toJson(obj);
	}

	public static String getOnlyExposedJson(Object obj) {
		return excludedGson.toJson(obj);
	}

	public static String getPrettyJson(Object obj) {
		return prettyGson.toJson(obj);
	}

	public static <T> T getObject(String json, Class<T> clazz) {
		return gson.fromJson(json, clazz);
	}

	public static Map<String, Long> offsetMap(String json) {
		if (json == null || json.isEmpty()) {
			return new HashMap<>();
		}
		Type type = new TypeToken<Map<String, Long>>() {
		}.getType();
		return gson.fromJson(json, type);
	}
}

Define the storage engine class.

 

StorageEngine.java


package com.sample.loghash.storage;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.HashMap;
import java.util.Map;

import com.sample.loghash.util.DateUtil;
import com.sample.loghash.util.FileUtil;
import com.sample.loghash.util.JsonUtil;

/**
 * Keep in mind that concurrent access to the same file by multiple threads or
 * processes may lead to unexpected behavior, so proper synchronization
 * mechanisms should be used if necessary.
 */
public class StorageEngine implements Closeable {
	private Map<String, Long> offsetMapForKeys;

	private String contentFilePath;
	private String offsetFilePath;
	private RandomAccessFile randomAccessFile;
	private boolean compact_operation_in_progress = false;
	private final String deletePlaceholder;

	public static StorageEngine of(String contentFilePath, String offsetFilePath, String deletePlaceholder)
			throws IOException {
		return new StorageEngine(contentFilePath, offsetFilePath, deletePlaceholder);
	}

	private StorageEngine(String contentFilePath, String offsetFilePath, String deletePlaceholder) throws IOException {
		FileUtil.createFileIfNotExists(contentFilePath);
		FileUtil.createFileIfNotExists(offsetFilePath);

		this.contentFilePath = contentFilePath;
		this.offsetFilePath = offsetFilePath;

		String offsetFileContent = FileUtil.readFileContentViaReadingAllBytes(offsetFilePath);
		offsetMapForKeys = JsonUtil.offsetMap(offsetFileContent);
		randomAccessFile = new RandomAccessFile(contentFilePath, "rw");
		this.deletePlaceholder = deletePlaceholder;
		System.out.println("***************** Storage Engine is ready to use ***************** ");
	}

	private void checkForCompactOperation() throws IOException {

		if (compact_operation_in_progress) {
			throw new IOException("Compact operation is in progress");
		}
	}

	private void write(String key, String value) throws IOException {
		// Move the file pointer to the end of the file
		long offset = randomAccessFile.length();
		randomAccessFile.seek(offset);

		randomAccessFile.writeUTF(key);
		randomAccessFile.writeUTF(value);

		if(this.deletePlaceholder.equals(value)) {
			offsetMapForKeys.remove(key);
		}else {
			offsetMapForKeys.put(key, offset);
		}
		
	}

	private void write(Map<String, String> map) throws IOException {

		for (Map.Entry<String, String> entry : map.entrySet()) {
			write(entry.getKey(), entry.getValue());
		}

	}

	private String read(String key) throws IOException {
		Long offset = offsetMapForKeys.get(key);
		if (offset == null) {
			return null;
		}

		randomAccessFile.seek(offset);

		// First UTF represents the key
		randomAccessFile.readUTF();

		String value = randomAccessFile.readUTF();

		if (deletePlaceholder.equals(value)) {
			return null;
		}
		return value;
	}

	public void writeKey(String key, String value) throws IOException {
		checkForCompactOperation();
		write(key, value);
	}

	public void writeKeys(Map<String, String> map) throws IOException {

		for (Map.Entry<String, String> entry : map.entrySet()) {
			writeKey(entry.getKey(), entry.getValue());
		}

	}

	public void delete(String key) throws IOException {
		writeKey(key, deletePlaceholder);
	}

	public String readKey(String key) throws IOException {
		checkForCompactOperation();
		return read(key);
	}

	public Map<String, String> readKeys() throws IOException {
		Map<String, String> map = new HashMap<>();
		for (String key : offsetMapForKeys.keySet()) {
			String value = readKey(key);
			if (value != null) {
				map.put(key, value);
			}

		}

		return map;
	}

	/**
	 * Drop all the stale entries
	 * 
	 * @throws IOException
	 */
	public void compact() throws IOException {

		try {
			Map<String, String> activeEntries = readKeys();

			compact_operation_in_progress = true;

			// Close the file Handle
			close();

			// Take backup of old one and write new one
			String timestamp = DateUtil.getTimestamp();
			String parentDirPath = FileUtil.getParentDirectory(contentFilePath);
			String backupContentFilePath = FileUtil.getFileNameWithoutExtension(contentFilePath) + timestamp + "."
					+ FileUtil.getFileExtension(contentFilePath);
			String backupOffsetFilePath = FileUtil.getFileNameWithoutExtension(offsetFilePath) + timestamp + "."
					+ FileUtil.getFileExtension(offsetFilePath);

			FileUtil.renameFile(contentFilePath, parentDirPath + File.separator + backupContentFilePath);
			FileUtil.renameFile(offsetFilePath, parentDirPath + File.separator + backupOffsetFilePath);

			FileUtil.createFileIfNotExists(contentFilePath);
			FileUtil.createFileIfNotExists(offsetFilePath);

			randomAccessFile = new RandomAccessFile(contentFilePath, "rw");
			write(activeEntries);

		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			compact_operation_in_progress = false;
		}

	}

	@Override
	public void close() {
		String offsetsInfo = JsonUtil.getJson(offsetMapForKeys);
		FileUtil.write(offsetFilePath, offsetsInfo);
		try {
			randomAccessFile.close();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

}

Define the Employee class.

 

Employee.java

package com.sample.app.dto;

public class Employee {
	private String id;
	private String name;
	private String city;

	public Employee() {
	}

	public Employee(String id, String name, String city) {
		this.id = id;
		this.name = name;
		this.city = city;
	}

	public String getId() {
		return id;
	}

	public void setId(String id) {
		this.id = id;
	}

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	public String getCity() {
		return city;
	}

	public void setCity(String city) {
		this.city = city;
	}

}

Define an App class to test the storage engine.

 

App.java

package com.sample.app;

import java.io.IOException;

import com.sample.app.dto.Employee;
import com.sample.loghash.storage.*;
import com.sample.loghash.util.JsonUtil;

import java.util.*;

public class App {

	private static List<Employee> getNEmployees(int noOfEmployees, int startId) {
		List<Employee> emps = new ArrayList<>();
		for (int i = startId; i < (startId + noOfEmployees); i++) {
			emps.add(new Employee("" + i, "name_" + i, "city_" + i));
		}
		return emps;
	}

	private static Map<String, String> convertToKeyValuePairs(List<Employee> emps) {
		Map<String, String> resultMap = new HashMap<>();

		for (Employee emp : emps) {
			String key = emp.getId();
			String value = JsonUtil.getJson(emp);
			resultMap.put(key, value);
		}
		return resultMap;
	}

	public static void main(String[] args) throws IOException {
		String contentFilePath = "/Users/Shared/storage-engine/content.dat";
		String offsetFilePath = "/Users/Shared/storage-engine/offsets.json";

		try (StorageEngine engine = StorageEngine.of(contentFilePath, offsetFilePath, "DELETED")) {

			List<Employee> emps = getNEmployees(5, 1);
			Map<String, String> entries = convertToKeyValuePairs(emps);
			engine.writeKeys(entries);

			System.out.println("All the entries in storage engine are : ");
			System.out.println(engine.readKeys());

			System.out.println("\nUpdate entry with id '3'");
			entries = convertToKeyValuePairs(Arrays.asList(new Employee("3", "updated_name", "updated_city")));
			engine.writeKeys(entries);

			System.out.println("\nAll the entries in storage engine are : ");
			System.out.println(engine.readKeys());

			System.out.println("\nDelete entries with keys 1 and 4");
			engine.delete("1");
			engine.delete("4");
			System.out.println("\nAll the entries in storage engine are : ");
			System.out.println(engine.readKeys());

			System.out.println("\nEntry with key 2 is ");
			System.out.println(engine.readKey("2"));
			
			// Call this when there are many stale entries to clean up
			// engine.compact();
		}
	}

}

Output

***************** Storage Engine is ready to use ***************** 
All the entries in storage engine are : 
{1={"id":"1","name":"name_1","city":"city_1"}, 2={"id":"2","name":"name_2","city":"city_2"}, 3={"id":"3","name":"name_3","city":"city_3"}, 4={"id":"4","name":"name_4","city":"city_4"}, 5={"id":"5","name":"name_5","city":"city_5"}}

Update entry with id '3'

All the entries in storage engine are : 
{1={"id":"1","name":"name_1","city":"city_1"}, 2={"id":"2","name":"name_2","city":"city_2"}, 3={"id":"3","name":"updated_name","city":"updated_city"}, 4={"id":"4","name":"name_4","city":"city_4"}, 5={"id":"5","name":"name_5","city":"city_5"}}

Delete entries with keys 1 and 4

All the entries in storage engine are : 
{2={"id":"2","name":"name_2","city":"city_2"}, 3={"id":"3","name":"updated_name","city":"updated_city"}, 5={"id":"5","name":"name_5","city":"city_5"}}

Entry with key 2 is 
{"id":"2","name":"name_2","city":"city_2"}

Limitation on value lengths

This example uses the readUTF and writeUTF methods to read and write the data in the file. Both methods have a limitation on the length of strings they can handle: writeUTF encodes a string in modified UTF-8 and stores its length in two bytes, so the encoded form can be at most 65,535 bytes (non-ASCII characters occupy more than one byte each). Exceeding this limit causes writeUTF to throw a UTFDataFormatException.

 

You can download this application from this link.


Enhancements to the above app

a. Rather than maintaining a separate JSON file for storing offsets, we can scan the actual content file on startup and populate the HashMap from it. This approach helps with crash recovery, since the offset file may be missing or stale after a crash. Refer to https://github.com/harikrishna553/java-libs/blob/master/log-structured-hash-table/log-structured-hash-table/src/main/java/com/sample/loghash/storage/StorageEngineRecoverFromFile.java
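Rebuilding the offset map from the content file itself is a sequential scan. The sketch below is my own simplified take on that idea, assuming the same writeUTF record layout and delete-placeholder convention used in this post (see the linked StorageEngineRecoverFromFile for the real version).

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.HashMap;
import java.util.Map;

public class OffsetMapRecovery {

	// Scan the whole log once. Later records for a key overwrite earlier
	// ones, and the delete placeholder removes the key entirely.
	public static Map<String, Long> rebuild(String contentFilePath,
			String deletePlaceholder) throws IOException {
		Map<String, Long> offsets = new HashMap<>();
		try (RandomAccessFile raf = new RandomAccessFile(contentFilePath, "r")) {
			while (raf.getFilePointer() < raf.length()) {
				long offset = raf.getFilePointer();
				String key = raf.readUTF();
				String value = raf.readUTF();
				if (deletePlaceholder.equals(value)) {
					offsets.remove(key);
				} else {
					offsets.put(key, offset);
				}
			}
		}
		return offsets;
	}
}
```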

 

b. The application is susceptible to crashes at any point, even in the middle of appending a record to the log. Incorporating checksums enables the detection and exclusion of corrupted sections in the log.
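One common way to do this (not implemented in the code above) is to prefix every record with a CRC32 of its contents and verify it on read. A hedged sketch, with a record layout of my own choosing:

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

public class ChecksummedRecord {

	private static long crcOf(String key, String value) {
		CRC32 crc = new CRC32();
		crc.update(key.getBytes(StandardCharsets.UTF_8));
		crc.update(value.getBytes(StandardCharsets.UTF_8));
		return crc.getValue();
	}

	// Record layout: [crc: long][key: UTF][value: UTF]
	public static void write(RandomAccessFile raf, String key, String value) throws IOException {
		raf.seek(raf.length());
		raf.writeLong(crcOf(key, value));
		raf.writeUTF(key);
		raf.writeUTF(value);
	}

	// Returns the value, or null if the stored checksum does not match
	// (for example, because a crash interrupted the append).
	public static String read(RandomAccessFile raf, long offset) throws IOException {
		raf.seek(offset);
		long storedCrc = raf.readLong();
		String key = raf.readUTF();
		String value = raf.readUTF();
		return storedCrc == crcOf(key, value) ? value : null;
	}
}
```

On startup, records whose checksum fails verification can simply be skipped while rebuilding the offset map.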

 

c. Concurrent reading of data is permissible, but concurrency must be managed during the writing process.
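One standard option is java.util.concurrent.locks.ReentrantReadWriteLock, which allows many concurrent readers while serializing writers. The sketch below illustrates the pattern with a plain in-memory map to stay self-contained; in the real engine the same locks would wrap readKey and writeKey.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ConcurrentKeyValueStore {
	private final Map<String, String> store = new HashMap<>();
	private final ReadWriteLock lock = new ReentrantReadWriteLock();

	// Many readers may hold the read lock at the same time
	public String read(String key) {
		lock.readLock().lock();
		try {
			return store.get(key);
		} finally {
			lock.readLock().unlock();
		}
	}

	// Writers get exclusive access
	public void write(String key, String value) {
		lock.writeLock().lock();
		try {
			store.put(key, value);
		} finally {
			lock.writeLock().unlock();
		}
	}
}
```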

 

d. While storing <key, value> pairs, it's possible to include timestamps.

 

e. In this context, I utilized the writeUTF and readUTF methods, which have limitations on value length. Alternatively, you can employ a binary format that initially encodes the length of a string in bytes, followed by the raw string.
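A minimal sketch of such a length-prefixed binary format (an int length followed by the raw UTF-8 bytes, which lifts the 65,535-byte limit of writeUTF; the class name is my own):

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;

public class LengthPrefixedFormat {

	// Layout: [length: int][raw UTF-8 bytes]
	// Supports strings up to Integer.MAX_VALUE encoded bytes.
	public static void writeString(RandomAccessFile raf, String s) throws IOException {
		byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
		raf.writeInt(bytes.length);
		raf.write(bytes);
	}

	public static String readString(RandomAccessFile raf) throws IOException {
		int length = raf.readInt();
		byte[] bytes = new byte[length];
		raf.readFully(bytes);
		return new String(bytes, StandardCharsets.UTF_8);
	}
}
```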

 

References

http://highscalability.com/blog/2011/1/10/riaks-bitcask-a-log-structured-hash-table-for-fast-keyvalue.html/




