Wednesday, 24 August 2016

Spymemcached: cas function

The CAS (check-and-set) command is very useful in concurrent applications. For example, suppose you want to deduct some amount from a customer's account on every transaction.

Thread 1 reads the cust1 account balance as 1500
Thread 2 also reads the cust1 account balance as 1500
Thread 2 deducts 300 from the cust1 account: 1500 - 300 = 1200
Thread 1 deducts 100 from the cust1 account: 1500 - 100 = 1400

As the flow above shows, a total of 400 should have been deducted from the cust1 account, leaving 1100, but the final balance is 1400 because Thread 1 overwrote the update made by Thread 2. To handle this kind of scenario, we should use the CAS command.
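
The interleaving above can be replayed deterministically in plain Java. This is only a sketch: the class LostUpdateDemo and its variable names are made up for illustration, and an ordinary local variable stands in for the cached balance.

```java
// Hypothetical demo: replays the interleaving from the text in a single thread.
final class LostUpdateDemo {

  /** Returns the stored balance after both writes based on stale reads. */
  static int replay(int startBalance, int deductByThread1, int deductByThread2) {
    int stored = startBalance;

    // Both "threads" read the same starting balance.
    int readByThread1 = stored;
    int readByThread2 = stored;

    // Thread 2 writes first, then thread 1 overwrites it using its stale read.
    stored = readByThread2 - deductByThread2;
    stored = readByThread1 - deductByThread1;
    return stored;
  }

  public static void main(String[] args) {
    int actual = replay(1500, 100, 300);
    int expected = 1500 - 100 - 300;
    // prints "final balance = 1400, expected = 1100"
    System.out.println("final balance = " + actual + ", expected = " + expected);
  }
}
```

The second write silently discards the first one, which is exactly the update that a version check is meant to reject.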

Syntax
cas key flags exptime bytes unique_cas_token [noreply]
value

Option             Description
key                Name of the key.
flags              32-bit unsigned integer stored along with the data on the server and returned when you retrieve the item.
exptime            Expiry time of the data stored in the cache, in seconds.
bytes              Length, in bytes, of the data to be stored in Memcached.
unique_cas_token   Unique token number obtained from the gets command.
noreply            Optional. When you pass this parameter, the server doesn't send any reply back to you.
value              Value to be saved to Memcached.
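
To make the relationship between the options concrete, here is a small sketch that assembles a cas request from its parts. The class CasCommandBuilder and its build method are hypothetical helpers written for this post, not part of Spymemcached; the sketch assumes the value is a UTF-8 string and that each protocol line ends with \r\n.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper: formats a text-protocol cas request from its parts.
final class CasCommandBuilder {

  static String build(String key, long flags, int exptime, String value,
      long casToken, boolean noreply) {
    // "bytes" is the length of the value in bytes, not in characters.
    int bytes = value.getBytes(StandardCharsets.UTF_8).length;

    StringBuilder sb = new StringBuilder();
    sb.append("cas ").append(key).append(' ')
        .append(flags).append(' ')
        .append(exptime).append(' ')
        .append(bytes).append(' ')
        .append(casToken);
    if (noreply) {
      sb.append(" noreply");
    }
    // The command line and the data block each end with \r\n.
    sb.append("\r\n").append(value).append("\r\n");
    return sb.toString();
  }

  public static void main(String[] args) {
    // Builds the request "cas balance 0 900 4 36" followed by the value 1400.
    System.out.print(build("balance", 0, 900, "1400", 36, false));
  }
}
```

Note that bytes is derived from the value rather than passed in, so the two cannot disagree.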


$ telnet 127.0.0.1 11211
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
set balance 0 900 4
1500
STORED
gets balance
VALUE balance 0 4 36
1500
END


From the above output, you can see that the key ‘balance’ has cas version 36.
set balance 0 900 4
1200
STORED
gets balance
VALUE balance 0 4 37
1200
END


As you can see, after the balance is updated, the cas number is incremented to 37. Now let me try to update the balance by specifying the stale cas version 36.
cas balance 0 900 4 36
1400
EXISTS
gets balance
VALUE balance 0 4 37
1200
END


As you can see, the server replies EXISTS and the balance is left unchanged when I try to update the data with a stale cas version. Let me try to update the data with the current version.
cas balance 0 900 4 37
1400
STORED
gets balance
VALUE balance 0 4 38
1400
END
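
The version check seen in this session can be modelled in a few lines of Java. This is a rough in-memory sketch, not a Memcached client: the class VersionedStoreModel and its methods are hypothetical, and the counter that starts at 36 and increases by one is an assumption chosen to mirror the session above. A real server only guarantees that the token changes when the item is modified.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of the version check; not a Memcached client.
final class VersionedStoreModel {

  private static final class Entry {
    final String value;
    final long version;

    Entry(String value, long version) {
      this.value = value;
      this.version = version;
    }
  }

  private final Map<String, Entry> entries = new HashMap<>();
  private long nextVersion;

  VersionedStoreModel(long firstVersion) {
    this.nextVersion = firstVersion;
  }

  /** Unconditional write; every successful write gets a new version. */
  String set(String key, String value) {
    entries.put(key, new Entry(value, nextVersion++));
    return "STORED";
  }

  /** Conditional write: succeeds only if the caller's version is current. */
  String cas(String key, String value, long version) {
    Entry current = entries.get(key);
    if (current == null) {
      return "NOT_FOUND";
    }
    if (current.version != version) {
      return "EXISTS";
    }
    entries.put(key, new Entry(value, nextVersion++));
    return "STORED";
  }

  /** Current value of the key, or null if it is missing. */
  String valueOf(String key) {
    Entry e = entries.get(key);
    return e == null ? null : e.value;
  }

  /** Current version of the key, or -1 if it is missing. */
  long versionOf(String key) {
    Entry e = entries.get(key);
    return e == null ? -1 : e.version;
  }

  public static void main(String[] args) {
    VersionedStoreModel store = new VersionedStoreModel(36);
    store.set("balance", "1500"); // version 36
    store.set("balance", "1200"); // version 37
    System.out.println(store.cas("balance", "1400", 36)); // prints EXISTS
    System.out.println(store.cas("balance", "1400", 37)); // prints STORED
    System.out.println(store.valueOf("balance") + " @ " + store.versionOf("balance")); // prints 1400 @ 38
  }
}
```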


The following is a complete working Java application.
package com.sampe.cache;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import net.spy.memcached.MemcachedClient;

/**
 * Utility class to create client.
 * 
 * @author harikrishna_gurram
 *
 */
public class CacheClientUtil {

  private static MemcachedClient client = null;
  private static Logger logger = LogManager.getLogger();

  public static Optional<MemcachedClient> getClient(InetSocketAddress addr) {
    if (Objects.isNull(addr)) {
      logger.error("getClient: addr shouldn't be null");
      return Optional.empty();
    }

    try {
      client = new MemcachedClient(addr);
    } catch (IOException e) {
      logger.error(e);
      return Optional.empty();
    }

    return Optional.of(client);
  }

  public static Optional<MemcachedClient> getClient(List<InetSocketAddress> addrs) {
    if (Objects.isNull(addrs)) {
      logger.error("getClient: addrs shouldn't be null");
      return Optional.empty();
    }

    try {
      client = new MemcachedClient(addrs);
    } catch (IOException e) {
      logger.error(e);
      return Optional.empty();
    }

    return Optional.of(client);
  }

  public static void shutdownClient(MemcachedClient client) {
    if (Objects.isNull(client)) {
      logger.error("shutdownClient: client is null, nothing to shut down");
      return;
    }

    client.shutdown();
  }
}

package com.sampe.cache;

import java.net.InetSocketAddress;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import net.spy.memcached.CASValue;
import net.spy.memcached.MemcachedClient;

public class Test {
  private static Logger logger = LogManager.getLogger();

  public static void main(String[] args) throws InterruptedException, ExecutionException {
    /* Get MemcachedClient */
    InetSocketAddress address = new InetSocketAddress("127.0.0.1", 11211);

    Optional<MemcachedClient> client = CacheClientUtil.getClient(address);

    if (!client.isPresent()) {
      logger.error("Unable to create client instance");
      return;
    }

    MemcachedClient memClient = client.get();

    /* Set some data */
    String data = "Tutorial";

    Future<Boolean> future = memClient.set("12345", 900, data);

    System.out.println("set status:" + future.get());

    System.out.println("Value of the key 12345 is " + memClient.get("12345"));

    /* gets returns the value together with its cas version */
    CASValue<Object> casValue = memClient.gets("12345");

    System.out.println("cas value is " + casValue.getCas());

    System.out.println("\nUpdating data using cas function\n");

    /* cas returns a CASResponse (for example OK or EXISTS); it is ignored here for brevity */
    memClient.cas("12345", casValue.getCas(), 900, "Memcached Tutorial");

    System.out.println("Value of the key 12345 is " + memClient.get("12345"));

    casValue = memClient.gets("12345");

    System.out.println("cas value is " + casValue.getCas());

    CacheClientUtil.shutdownClient(memClient);

  }
}


Output
set status:true
Value of the key 12345 is Tutorial
cas value is 41

Updating data using cas function

Value of the key 12345 is Memcached Tutorial
cas value is 42
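
In a concurrent application, a rejected cas is normally followed by a retry: read the value and version again, recompute, and try once more. The sketch below shows that loop for the account example from the beginning of this post. It deliberately does not talk to Memcached; the class CasRetryDemo and its synchronized gets and cas methods are hypothetical stand-ins for the cache, so that the example runs without a server.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for one cache entry; a real application would call
// the client's gets and cas methods instead of these synchronized methods.
final class CasRetryDemo {

  private long balance;
  private long version = 1;

  CasRetryDemo(long balance) {
    this.balance = balance;
  }

  /** Snapshot of {balance, version}, like the pair returned by gets. */
  synchronized long[] gets() {
    return new long[] {balance, version};
  }

  /** Writes newBalance only if expectedVersion is still current. */
  synchronized boolean cas(long expectedVersion, long newBalance) {
    if (version != expectedVersion) {
      return false;
    }
    balance = newBalance;
    version++;
    return true;
  }

  /** Read, compute, cas; start over whenever another writer got in first. */
  void deduct(long amount) {
    while (true) {
      long[] snapshot = gets();
      if (cas(snapshot[1], snapshot[0] - amount)) {
        return;
      }
    }
  }

  /** Runs the two deductions from the example on separate threads. */
  static long run() {
    CasRetryDemo account = new CasRetryDemo(1500);
    ExecutorService pool = Executors.newFixedThreadPool(2);
    pool.execute(() -> account.deduct(100));
    pool.execute(() -> account.deduct(300));
    pool.shutdown();
    try {
      if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
        throw new IllegalStateException("deductions did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return account.gets()[0];
  }

  public static void main(String[] args) {
    System.out.println("final balance = " + run()); // prints final balance = 1100
  }
}
```

With Spymemcached the same loop would presumably call gets to obtain a CASValue, pass getCas() to cas, and retry while the returned CASResponse is EXISTS.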


