Monday 23 November 2015

Elasticsearch: Java : Upsert update

This post is continuation to my previous post. In previous post, I explained how to update a document; there is one problem with updateDocument method. If the document that you want to update is not exist, then it throws an exception. We can handle these kind of situations using upsert.

How upsert solves my problem.
If the document does not already exist, the contents of the upsert element will be inserted as a new document.
This post is continuation to my previous post, so I am not going to give all java classes here. I updated UpdateUtil class by adding new function, upsertUpdate.
package com.self_learn.util;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.xcontent.XContentBuilder;

import com.google.common.base.Preconditions;

/**
 * Provides various utility methods to update a document.
 * 
 * @author harikrishna_gurram
 */
public class UpdateUtil {

 /**
  * Updates document with given fields (Fields are given as map)
  * 
  * @param client
  * @param _index
  * @param _type
  * @param _id
  * @param map
  * @return {@link UpdateResponse}
  * @throws IOException
  * @throws InterruptedException
  * @throws ExecutionException
  */
 public static UpdateResponse updateDocument(Client client, String _index,
   String _type, String _id, Map<String, Object> map)
   throws IOException, InterruptedException, ExecutionException {
  Preconditions.checkNotNull(client, "client should not be null");
  Preconditions.checkNotNull(_index, "_index should not be null");
  Preconditions.checkNotNull(_type, "_type should not be null");
  Preconditions.checkNotNull(_id, "_id should not be null");
  Preconditions.checkNotNull(map, "map should not be null");

  Set<String> keys = map.keySet();

  XContentBuilder builder = jsonBuilder().startObject();
  for (String key : keys) {
   builder.field(key, map.get(key));
  }
  builder.endObject();

  UpdateRequest updateRequest = new UpdateRequest();
  updateRequest.index(_index);
  updateRequest.type(_type);
  updateRequest.id(_id);
  updateRequest.doc(builder);

  UpdateResponse resposne = client.update(updateRequest).get();
  return resposne;
 }

 /**
  * If the document does not already exist, the contents of the upsertMap
  * will be inserted as a new document.
  * 
  * @param client
  * @param _index
  * @param _type
  * @param _id
  * @param map
  * @param upsertMap
  * @return
  * @throws IOException
  * @throws ExecutionException
  * @throws InterruptedException
  */
 public static UpdateResponse upsertUpdate(Client client, String _index,
   String _type, String _id, Map<String, Object> map,
   Map<String, Object> upsertMap) throws IOException,
   InterruptedException, ExecutionException {
  Preconditions.checkNotNull(client, "client should not be null");
  Preconditions.checkNotNull(_index, "_index should not be null");
  Preconditions.checkNotNull(_type, "_type should not be null");
  Preconditions.checkNotNull(_id, "_id should not be null");
  Preconditions.checkNotNull(map, "map should not be null");
  Preconditions.checkNotNull(upsertMap, "upsertMap should not be null");

  IndexRequest indexRequest = new IndexRequest(_index, _type, _id);
  Set<String> keys1 = upsertMap.keySet();
  XContentBuilder builder1 = jsonBuilder().startObject();
  for (String key : keys1) {
   builder1.field(key, upsertMap.get(key));
  }
  builder1.endObject();
  indexRequest.source(builder1);

  Set<String> keys = map.keySet();
  XContentBuilder builder = jsonBuilder().startObject();
  for (String key : keys) {
   builder.field(key, map.get(key));
  }
  builder.endObject();

  UpdateRequest updateRequest = new UpdateRequest();
  updateRequest.index(_index);
  updateRequest.type(_type);
  updateRequest.id(_id);
  updateRequest.doc(builder);
  updateRequest.upsert(indexRequest);

  UpdateResponse response = client.update(updateRequest).get();
  return response;
 }
}

package com.self_learn.test;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Client;

import com.self_learn.util.ResponseUtil;
import com.self_learn.util.SearchUtil;
import com.self_learn.util.TransportClientUtil;
import com.self_learn.util.UpdateUtil;

public class Main {
 private static String clusterName = "my_cluster_1";
 private static String _index = "organization";
 private static String _type = "employee";

 private static void printStars() {
  System.out.println("*************************************************");
 }

 public static void main(String args[]) throws IOException,
   InterruptedException, ExecutionException {
  /* Get client instance for cluster */
  Client client = TransportClientUtil.getLocalTransportClient(
    clusterName, 9300);

  Map<String, Object> map = new HashMap<>();
  map.put("firstName", "Krishna");
  map.put("hobbies", Arrays.asList("Playing Cricket", "Watching movies"));

  Map<String, Object> upsertMap = new HashMap<>();
  upsertMap.put("firstName", "dummy");
  upsertMap.put("hobbies", Arrays.asList("hobby1", "hobby2"));

  /* Update document */
  UpdateResponse updateResponse = UpdateUtil.upsertUpdate(client, _index,
    _type, "7", map, upsertMap);
  System.out.println(ResponseUtil.getResponseInfo(updateResponse));
  printStars();

  /* Query for the object with id 1 */
  System.out.println("Reading data from Elasticsearch");
  GetResponse response = SearchUtil.getDocumentById(client, _index,
    _type, "7");
  System.out.println(ResponseUtil.getResponseInfo(response));

  client.close();
 }
}


Since document with _id 7 is not exist, when you ran above application, you will get following output.

Sep 09, 2015 9:13:06 PM org.elasticsearch.plugins.PluginsService <init>
INFO: [Apocalypse] loaded [], sites []
_index: organization
_type: employee
_id: 7
_version: 1
created: true
*************************************************
Reading data from Elasticsearch
_index: organization
_type: employee
_id: 7
_version: 1
_source: {"firstName":"dummy","hobbies":["hobby1","hobby2"]}



Prevoius                                                 Next                                                 Home

No comments:

Post a Comment