Monday 23 November 2015

Utility classes for Elasticsearch



package com.self_learn.constants;

/**
 * Kinds of operation that one entry of a bulk request can carry.
 * 
 * @author harikrishna_gurram
 */
public enum BulkRequestType {
 /** Entry that indexes a document. */
 INDEX,
 /** Entry that updates a document from a partial document. */
 UPDATE,
 /** Entry that updates a document and also carries an upsert document. */
 UPSERT_UPDATE,
 /** Entry that deletes a document. */
 DELETE
}


package com.self_learn.model;

import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

import com.self_learn.constants.BulkRequestType;

/**
 * One entry of a bulk request: the coordinates of the target document, the
 * kind of operation and, where the operation needs them, the payloads.
 * 
 * Getters, setters, equals/hashCode and toString are generated by Lombok for
 * every field. All fields are mutable and take part in equals/hashCode.
 */
@Getter
@Setter
@ToString
@EqualsAndHashCode
public class BulkObject {

 /** Index the operation targets. */
 private String _index;
 /** Type the operation targets. */
 private String _type;
 /** Id of the document the operation targets. */
 private String _id;
 /** Document body; left null by the four-argument constructor. */
 private String _source;
 /** Upsert body; set only by the six-argument constructor. */
 private String _upsertSource;
 /** Kind of operation this entry represents. */
 private BulkRequestType type;

 /**
  * Creates an entry that carries no payload.
  */
 public BulkObject(String _index, String _type, String _id,
   BulkRequestType type) {
  this(_index, _type, _id, type, null, null);
 }

 /**
  * Creates an entry that carries a document body.
  */
 public BulkObject(String _index, String _type, String _id,
   BulkRequestType type, String _source) {
  this(_index, _type, _id, type, _source, null);
 }

 /**
  * Creates an entry that carries a document body and an upsert body.
  */
 public BulkObject(String _index, String _type, String _id,
   BulkRequestType type, String _source, String _upsertSource) {
  this._index = _index;
  this._type = _type;
  this._id = _id;
  this.type = type;
  this._source = _source;
  this._upsertSource = _upsertSource;
 }
}


package com.self_learn.util;

import java.util.Set;

import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.base.Preconditions;
import org.elasticsearch.common.lang3.StringUtils;

import com.self_learn.constants.BulkRequestType;
import com.self_learn.model.BulkObject;

/**
 * Utility class to build {@link BulkObject}s and to execute them as one bulk
 * request.
 * 
 * @author harikrishna_gurram
 */
public class BulkUtil {

 /**
  * Rejects null document coordinates.
  * 
  * @throws NullPointerException
  *             if _index, _type or _id is null
  */
 private static void checkCoordinatesNotNull(String _index, String _type,
   String _id) {
  Preconditions.checkNotNull(_index, "_index shouldn't be null");
  Preconditions.checkNotNull(_type, "_type shouldn't be null");
  Preconditions.checkNotNull(_id, "_id shouldn't be null");
 }

 /**
  * Rejects empty document coordinates. Kept separate from the null checks
  * so that callers can run their payload null checks in between, which
  * keeps the order in which violations are reported unchanged.
  * 
  * @throws IllegalStateException
  *             if _index, _type or _id is empty
  */
 private static void checkCoordinatesNotEmpty(String _index, String _type,
   String _id) {
  Preconditions.checkState((!StringUtils.isEmpty(_index)),
    "_index should not be empty");
  Preconditions.checkState((!StringUtils.isEmpty(_type)),
    "_type should not be empty");
  Preconditions.checkState((!StringUtils.isEmpty(_id)),
    "_id should not be empty");
 }

 /**
  * Builds an {@link BulkRequestType#INDEX} entry from a document string.
  * 
  * @param _index
  *            target index, neither null nor empty
  * @param _type
  *            target type, neither null nor empty
  * @param _id
  *            document id, neither null nor empty
  * @param _source
  *            document body, not null
  * @return the new entry
  * @throws NullPointerException
  *             if any argument is null
  * @throws IllegalStateException
  *             if _index, _type or _id is empty
  */
 public static BulkObject getIndexObject(String _index, String _type,
   String _id, String _source) {
  checkCoordinatesNotNull(_index, _type, _id);
  Preconditions.checkNotNull(_source, "_source shouldn't be null");
  checkCoordinatesNotEmpty(_index, _type, _id);

  return new BulkObject(_index, _type, _id, BulkRequestType.INDEX,
    _source);
 }

 /**
  * Builds a {@link BulkRequestType#DELETE} entry.
  * 
  * @param _index
  *            target index, neither null nor empty
  * @param _type
  *            target type, neither null nor empty
  * @param _id
  *            document id, neither null nor empty
  * @return the new entry
  * @throws NullPointerException
  *             if any argument is null
  * @throws IllegalStateException
  *             if any argument is empty
  */
 public static BulkObject getDeleteObject(String _index, String _type,
   String _id) {
  checkCoordinatesNotNull(_index, _type, _id);
  checkCoordinatesNotEmpty(_index, _type, _id);
  return new BulkObject(_index, _type, _id, BulkRequestType.DELETE);
 }

 /**
  * Builds an {@link BulkRequestType#UPDATE} entry from a document string.
  * 
  * @param _index
  *            target index, neither null nor empty
  * @param _type
  *            target type, neither null nor empty
  * @param _id
  *            document id, neither null nor empty
  * @param _source
  *            update document, not null
  * @return the new entry
  * @throws NullPointerException
  *             if any argument is null
  * @throws IllegalStateException
  *             if _index, _type or _id is empty
  */
 public static BulkObject getUpdateObject(String _index, String _type,
   String _id, String _source) {
  checkCoordinatesNotNull(_index, _type, _id);
  Preconditions.checkNotNull(_source, "_source shouldn't be null");
  checkCoordinatesNotEmpty(_index, _type, _id);
  return new BulkObject(_index, _type, _id, BulkRequestType.UPDATE,
    _source);
 }

 /**
  * Builds an {@link BulkRequestType#UPSERT_UPDATE} entry from document
  * strings.
  * 
  * @param _index
  *            target index, neither null nor empty
  * @param _type
  *            target type, neither null nor empty
  * @param _id
  *            document id, neither null nor empty
  * @param _source
  *            update document, not null
  * @param _upsertSource
  *            upsert document, not null
  * @return the new entry
  * @throws NullPointerException
  *             if any argument is null
  * @throws IllegalStateException
  *             if _index, _type or _id is empty
  */
 public static BulkObject getUpsertUpdateObject(String _index, String _type,
   String _id, String _source, String _upsertSource) {
  checkCoordinatesNotNull(_index, _type, _id);
  Preconditions.checkNotNull(_source, "_source shouldn't be null");
  Preconditions.checkNotNull(_upsertSource,
    "_upsertSource shouldn't be null");
  checkCoordinatesNotEmpty(_index, _type, _id);
  return new BulkObject(_index, _type, _id,
    BulkRequestType.UPSERT_UPDATE, _source, _upsertSource);
 }

 /**
  * Same as {@link #getIndexObject(String, String, String, String)}, with
  * the document body obtained from {@code JSONUtil.getJson(_source)}.
  * 
  * @throws NullPointerException
  *             if any argument is null
  * @throws IllegalStateException
  *             if _index, _type or _id is empty
  */
 public static BulkObject getIndexObject(String _index, String _type,
   String _id, Object _source) {
  // Validate before serializing so that a bad coordinate is reported
  // without first converting the object to JSON.
  checkCoordinatesNotNull(_index, _type, _id);
  Preconditions.checkNotNull(_source, "_source shouldn't be null");
  checkCoordinatesNotEmpty(_index, _type, _id);

  return getIndexObject(_index, _type, _id, JSONUtil.getJson(_source));
 }

 /**
  * Same as {@link #getUpdateObject(String, String, String, String)}, with
  * the update document obtained from {@code JSONUtil.getJson(_source)}.
  * 
  * @throws NullPointerException
  *             if any argument is null
  * @throws IllegalStateException
  *             if _index, _type or _id is empty
  */
 public static BulkObject getUpdateObject(String _index, String _type,
   String _id, Object _source) {
  checkCoordinatesNotNull(_index, _type, _id);
  Preconditions.checkNotNull(_source, "_source shouldn't be null");
  checkCoordinatesNotEmpty(_index, _type, _id);
  return getUpdateObject(_index, _type, _id, JSONUtil.getJson(_source));
 }

 /**
  * Same as
  * {@link #getUpsertUpdateObject(String, String, String, String, String)},
  * with both documents obtained from {@code JSONUtil.getJson}.
  * 
  * @throws NullPointerException
  *             if any argument is null
  * @throws IllegalStateException
  *             if _index, _type or _id is empty
  */
 public static BulkObject getUpsertUpdateObject(String _index, String _type,
   String _id, Object _source, Object _upsertSource) {
  checkCoordinatesNotNull(_index, _type, _id);
  Preconditions.checkNotNull(_source, "_source shouldn't be null");
  Preconditions.checkNotNull(_upsertSource,
    "_upsertSource shouldn't be null");
  checkCoordinatesNotEmpty(_index, _type, _id);

  return getUpsertUpdateObject(_index, _type, _id,
    JSONUtil.getJson(_source), JSONUtil.getJson(_upsertSource));
 }

 /**
  * Validates a BulkObject against its request type.
  * 
  * Every type requires a non-null _index, _type and _id. In addition,
  * {@link BulkRequestType#INDEX} and {@link BulkRequestType#UPDATE} require
  * a non-null _source, and {@link BulkRequestType#UPSERT_UPDATE} requires a
  * non-null _source and _upsertSource. {@link BulkRequestType#DELETE} has
  * no further requirement.
  * 
  * @param obj
  *            entry to validate, may be null
  * @return true if the entry carries everything its type needs; false
  *         otherwise, including when the entry or its type is null
  */
 private static boolean isValidBulkObject(BulkObject obj) {
  // An entry without a type cannot be dispatched; report it as invalid
  // instead of failing inside the switch below.
  if (obj == null || obj.getType() == null)
   return false;

  boolean hasCoordinates = obj.get_index() != null
    && obj.get_type() != null && obj.get_id() != null;
  if (!hasCoordinates)
   return false;

  switch (obj.getType()) {
  case INDEX:
  case UPDATE:
   return obj.get_source() != null;
  case UPSERT_UPDATE:
   return obj.get_source() != null && obj.get_upsertSource() != null;
  case DELETE:
   return true;
  default:
   return false;
  }
 }

 /**
  * Executes the given entries as one bulk request. The type of each entry
  * selects the request that is added for it:
  * 
  * {@link BulkRequestType#INDEX} adds an index request,
  * {@link BulkRequestType#UPDATE} adds an update request,
  * {@link BulkRequestType#UPSERT_UPDATE} adds an update request with an
  * upsert document, {@link BulkRequestType#DELETE} adds a delete request.
  * 
  * Entries that are null or invalid for their type are silently skipped.
  * 
  * NOTE(review): when every entry is skipped the bulk request is executed
  * without any action; Elasticsearch presumably rejects such a request -
  * confirm and decide whether callers need a guard.
  * 
  * @param client
  *            client used to execute the bulk request, not null
  * @param bulkReq
  *            entries to execute, not null
  * @return the response of the bulk request
  * @throws NullPointerException
  *             if client or bulkReq is null
  */
 public static BulkResponse execute(Client client, Set<BulkObject> bulkReq) {
  Preconditions.checkNotNull(client, "client shouldn't be null");
  Preconditions.checkNotNull(bulkReq, "bulkReq shouldn't be null");

  BulkRequestBuilder bulkRequest = client.prepareBulk();

  for (BulkObject obj : bulkReq) {
   if (!isValidBulkObject(obj))
    continue;

   String _index = obj.get_index();
   String _type = obj.get_type();
   String _id = obj.get_id();

   switch (obj.getType()) {
   case INDEX:
    bulkRequest.add(new IndexRequest(_index, _type, _id)
      .source(obj.get_source()));
    break;
   case UPDATE:
    bulkRequest.add(new UpdateRequest(_index, _type, _id).doc(obj
      .get_source()));
    break;
   case UPSERT_UPDATE:
    bulkRequest.add(new UpdateRequest(_index, _type, _id).doc(
      obj.get_source()).upsert(obj.get_upsertSource()));
    break;
   case DELETE:
    bulkRequest.add(new DeleteRequest(_index, _type, _id));
    break;

   default:
    // Only reachable if a new BulkRequestType is added without
    // being handled here.
    System.out.println("Invalid Request");
   }
  }

  return bulkRequest.execute().actionGet();
 }
}


package com.self_learn.util;

import static org.elasticsearch.node.NodeBuilder.nodeBuilder;

import java.util.HashMap;
import java.util.Map;

import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.node.Node;

import com.google.common.base.Preconditions;

/**
 * Starts, caches and closes Elasticsearch {@link Node}s and hands out their
 * {@link Client}s.
 * 
 * The nodes are kept in static fields and maps that are read and written
 * without any synchronization.
 * 
 * @author harikrishna_gurram
 */
public class ClientUtil {
 private static Node localNode = null;
 private static Node localPassiveNode = null;
 private static final Map<String, Node> activeNodes = new HashMap<>();
 private static final Map<String, Node> passiveNodes = new HashMap<>();

 /**
  * Starts a node with HTTP disabled and the client flag set, without an
  * explicit cluster name.
  */
 private static Node startLocalClientNode() {
  return nodeBuilder()
    .settings(
      ImmutableSettings.settingsBuilder().put(
        "http.enabled", false)).client(true).node();
 }

 /**
  * Closes the node cached for the given cluster and removes it from the
  * cache, so that a later lookup starts a fresh node instead of handing out
  * a client of the closed one.
  */
 private static void closeClusterNode(Map<String, Node> nodes,
   String clusterName) {
  Preconditions
    .checkNotNull(clusterName, "clusterName shouldn't be null");
  Node node = nodes.get(clusterName);
  Preconditions.checkNotNull(node, "There is no node for cluster "
    + clusterName);
  node.close();
  nodes.remove(clusterName);
 }

 /**
  * Closes the node created by {@link #getClient()} and forgets it.
  * 
  * @throws NullPointerException
  *             if no such node is currently cached
  */
 public static void closeLocalNode() {
  Preconditions.checkNotNull(localNode, "localNode is not initialized");
  localNode.close();
  // Drop the reference so the next getClient() starts a new node.
  localNode = null;
 }

 /**
  * Closes the node created by {@link #getPassiveClient()} and forgets it.
  * 
  * @throws NullPointerException
  *             if no such node is currently cached
  */
 public static void closeLocalPassiveNode() {
  Preconditions.checkNotNull(localPassiveNode,
    "localPassiveNode is not initialized");
  localPassiveNode.close();
  localPassiveNode = null;
 }

 /**
  * Closes the node created by {@link #getClientForCluster(String)} for the
  * given cluster and forgets it.
  * 
  * @param clusterName
  *            name of the cluster, not null
  * @throws NullPointerException
  *             if clusterName is null or no node is cached for it
  */
 public static void closeActiveNode(String clusterName) {
  closeClusterNode(activeNodes, clusterName);
 }

 /**
  * Closes the node created by {@link #getPassiveClientForCluster(String)}
  * for the given cluster and forgets it.
  * 
  * @param clusterName
  *            name of the cluster, not null
  * @throws NullPointerException
  *             if clusterName is null or no node is cached for it
  */
 public static void closePassiveNode(String clusterName) {
  closeClusterNode(passiveNodes, clusterName);
 }

 /**
  * Returns a client of the cached local node, starting the node on first
  * use. Every call returns a client; previously only the call that started
  * the node did, and later calls returned null.
  * 
  * NOTE(review): the node is built with client(true), exactly like the one
  * behind {@link #getPassiveClient()}, whereas
  * {@link #getClientForCluster(String)} does not set that flag - confirm
  * whether this node was meant to be a client-only node.
  * 
  * @return client of the local node, never null
  */
 public static Client getClient() {
  if (localNode == null) {
   localNode = startLocalClientNode();
  }
  return localNode.client();
 }

 /**
  * Returns a client of the cached local passive node (a node that is not
  * meant to store data), starting the node on first use. Every call
  * returns a client.
  * 
  * @return client of the local passive node, never null
  */
 public static Client getPassiveClient() {
  if (localPassiveNode == null) {
   localPassiveNode = startLocalClientNode();
  }
  return localPassiveNode.client();
 }

 /**
  * Returns a client of the node cached for the given cluster, starting and
  * caching a node with that cluster name on first use.
  * 
  * @param clusterName
  *            name of the cluster, not null
  * @return client of the node for that cluster
  * @throws NullPointerException
  *             if clusterName is null
  */
 public static Client getClientForCluster(String clusterName) {
  Preconditions
    .checkNotNull(clusterName, "clusterName shouldn't be null");

  Node node = activeNodes.get(clusterName);
  if (node == null) {
   node = nodeBuilder()
     .settings(
       ImmutableSettings.settingsBuilder().put(
         "http.enabled", false))
     .clusterName(clusterName).node();
   activeNodes.put(clusterName, node);
  }
  return node.client();
 }

 /**
  * Returns a client of the passive node (a node that is not meant to store
  * data) cached for the given cluster, starting and caching such a node on
  * first use.
  * 
  * @param clusterName
  *            name of the cluster, not null
  * @return client of the passive node for that cluster
  * @throws NullPointerException
  *             if clusterName is null
  */
 public static Client getPassiveClientForCluster(String clusterName) {
  Preconditions
    .checkNotNull(clusterName, "clusterName shouldn't be null");

  Node node = passiveNodes.get(clusterName);
  if (node == null) {
   node = nodeBuilder()
     .settings(
       ImmutableSettings.settingsBuilder().put(
         "http.enabled", false)).client(true)
     .clusterName(clusterName).node();
   passiveNodes.put(clusterName, node);
  }
  return node.client();
 }
}


package com.self_learn.util;

import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.lang3.StringUtils;

import com.google.common.base.Preconditions;

/**
 * Utility methods to delete documents from Elasticsearch.
 * 
 * @author harikrishna_gurram
 */
public class DeleteUtil {

 /**
  * Deletes one document and waits for the result.
  * 
  * @param client
  *            client used to run the delete, not null
  * @param _index
  *            index of the document, neither null nor empty
  * @param _type
  *            type of the document, neither null nor empty
  * @param _id
  *            id of the document, neither null nor empty
  * @return the response of the delete request
  * @throws NullPointerException
  *             if any argument is null
  * @throws IllegalStateException
  *             if _index, _type or _id is empty
  */
 public static DeleteResponse deleteDocument(Client client, String _index,
   String _type, String _id) {
  // Null checks first, for every argument ...
  Preconditions.checkNotNull(client, "client should not be null");
  Preconditions.checkNotNull(_index, "_index should not be null");
  Preconditions.checkNotNull(_type, "_type should not be null");
  Preconditions.checkNotNull(_id, "_id should not be null");

  // ... then the emptiness checks for the document coordinates.
  Preconditions.checkState(!StringUtils.isEmpty(_index),
    "_index should not be empty");
  Preconditions.checkState(!StringUtils.isEmpty(_type),
    "_type should not be empty");
  Preconditions.checkState(!StringUtils.isEmpty(_id),
    "_id should not be empty");

  return client.prepareDelete(_index, _type, _id).execute().actionGet();
 }
}


package com.self_learn.util;

import java.util.Set;

import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.hppc.ObjectLookupContainer;
import org.elasticsearch.common.lang3.StringUtils;

import com.google.common.base.Preconditions;

/**
 * Utility methods to store data in Elasticsearch and to inspect indices.
 * 
 * @author harikrishna_gurram
 */
public class IndexUtil {

 /**
  * Indexes the given document and waits for the result.
  * 
  * @param client
  *            client used to index the data, not null
  * @param _index
  *            index the document is stored in, neither null nor empty
  * @param _type
  *            type the document is stored under, neither null nor empty
  * @param _id
  *            id of the document, neither null nor empty
  * @param document
  *            body of the document, not null
  * @return {@link IndexResponse}
  * @throws NullPointerException
  *             if any argument is null
  * @throws IllegalStateException
  *             if _index, _type or _id is empty
  */
 public static IndexResponse indexData(Client client, String _index,
   String _type, String _id, String document) {
  Preconditions.checkNotNull(client, "client should not be null");
  Preconditions.checkNotNull(_index, "_index should not be null");
  Preconditions.checkNotNull(_type, "_type should not be null");
  Preconditions.checkNotNull(_id, "_id should not be null");
  Preconditions.checkNotNull(document, "data should not be null");

  Preconditions.checkState((!StringUtils.isEmpty(_index)),
    "_index should not be empty");
  Preconditions.checkState((!StringUtils.isEmpty(_type)),
    "_type should not be empty");
  Preconditions.checkState((!StringUtils.isEmpty(_id)),
    "_id should not be empty");

  return client.prepareIndex(_index, _type, _id).setSource(document)
    .execute().actionGet();
 }

 /**
  * Indexes the given object; the document body is obtained from
  * {@code JSONUtil.getJson(obj)}.
  * 
  * @param client
  *            client used to index the data, not null
  * @param _index
  *            index the document is stored in, neither null nor empty
  * @param _type
  *            type the document is stored under, neither null nor empty
  * @param _id
  *            id of the document, neither null nor empty
  * @param obj
  *            object to index, not null
  * @return {@link IndexResponse}
  * @throws NullPointerException
  *             if any argument is null
  * @throws IllegalStateException
  *             if _index, _type or _id is empty
  */
 public static IndexResponse indexData(Client client, String _index,
   String _type, String _id, Object obj) {
  // Validated here as well so that bad arguments are reported before
  // the object is serialized.
  Preconditions.checkNotNull(client, "client should not be null");
  Preconditions.checkNotNull(_index, "_index should not be null");
  Preconditions.checkNotNull(_type, "_type should not be null");
  Preconditions.checkNotNull(_id, "_id should not be null");
  Preconditions.checkNotNull(obj, "data should not be null");
  Preconditions.checkState((!StringUtils.isEmpty(_index)),
    "_index should not be empty");
  Preconditions.checkState((!StringUtils.isEmpty(_type)),
    "_type should not be empty");
  Preconditions.checkState((!StringUtils.isEmpty(_id)),
    "_id should not be empty");

  return indexData(client, _index, _type, _id, JSONUtil.getJson(obj));
 }

 /**
  * Returns the aliases recorded in the cluster state metadata.
  * 
  * @param client
  *            client used to read the cluster state, not null
  * @return the alias map taken from the cluster state metadata
  * @throws NullPointerException
  *             if client is null
  */
 public static ImmutableOpenMap<String, ImmutableOpenMap<String, AliasMetaData>> getAllIndicesAlases(
   Client client) {
  Preconditions.checkNotNull(client, "Client shouldn't be null");
  return client.admin().cluster().prepareState().execute().actionGet()
    .getState().getMetaData().aliases();
 }

 /**
  * Returns the names of the indices reported by an indices-stats request.
  * 
  * @param client
  *            client used to run the request, not null
  * @return names of the indices
  * @throws NullPointerException
  *             if client is null
  */
 public static Set<String> getAllIndices(Client client) {
  Preconditions.checkNotNull(client, "client shouldn't be null");
  return client.admin().indices().stats(new IndicesStatsRequest())
    .actionGet().getIndices().keySet();
 }

 /**
  * Returns all mappings of the given index, read from the cluster state.
  * 
  * @param client
  *            client used to read the cluster state, not null
  * @param index
  *            name of the index, neither null nor empty
  * @return mappings of the index
  * @throws NullPointerException
  *             if client or index is null, or if the cluster state has no
  *             metadata for the index
  * @throws IllegalStateException
  *             if index is empty
  */
 public static ImmutableOpenMap<String, MappingMetaData> getMappingsOfIndex(
   Client client, String index) {
  Preconditions.checkNotNull(client, "client shouldn't be null");
  Preconditions.checkNotNull(index, "index shouldn't be null");
  Preconditions.checkState((!StringUtils.isEmpty(index)),
    "index should not be empty");
  ClusterStateResponse clusterStateResponse = client.admin().cluster()
    .prepareState().execute().actionGet();
  // The metadata lookup yields null for an unknown index; fail with a
  // message that names the index instead of a bare NullPointerException.
  return Preconditions.checkNotNull(
    clusterStateResponse.getState().getMetaData().index(index),
    "no metadata found for index %s", index).getMappings();
 }

 /**
  * Returns the names of all types that have a mapping in the given index.
  * 
  * @param client
  *            client used to read the cluster state, not null
  * @param index
  *            name of the index, neither null nor empty
  * @return keys of the index's mappings
  * @throws NullPointerException
  *             if client or index is null, or if the cluster state has no
  *             metadata for the index
  * @throws IllegalStateException
  *             if index is empty
  */
 public static ObjectLookupContainer<String> getAllTypesFromIndex(
   Client client, String index) {
  // getMappingsOfIndex performs the same argument checks, in the same
  // order and with the same messages.
  return getMappingsOfIndex(client, index).keys();
 }
}


package com.self_learn.util;

import org.apache.commons.validator.routines.InetAddressValidator;
import org.elasticsearch.common.lang3.StringUtils;

import com.google.common.base.Preconditions;

/**
 * Validates IP addresses and port numbers.
 * 
 * @author harikrishna_gurram
 */
public class IPUtil {
 private static final InetAddressValidator inetAddressValidator = InetAddressValidator
   .getInstance();

 /**
  * Checks a single address with the shared {@link InetAddressValidator}.
  * 
  * @param ipAddress
  *            address to check, neither null nor empty
  * @return true if ip address is valid, else false
  * @throws NullPointerException
  *             if ipAddress is null
  * @throws IllegalStateException
  *             if ipAddress is empty
  */
 public static boolean isValidIPAddress(String ipAddress) {
  Preconditions.checkNotNull(ipAddress, "IP address should not be null");
  Preconditions.checkState((!StringUtils.isEmpty(ipAddress)),
    "ipAddress should not be empty");
  return inetAddressValidator.isValid(ipAddress);
 }

 /**
  * @param port
  *            : Port number
  * @return true if the port lies in the range 1..65535, else false
  */
 public static boolean isValidPort(int port) {
  return port > 0 && port < 65536;
 }

 /**
  * Checks every element of the array as an IP address.
  * 
  * @param hostNames
  *            addresses to check, not null
  * @return true if all elements are valid IP addresses (also for an empty
  *         array); false as soon as one element is null, empty or not a
  *         valid address
  * @throws NullPointerException
  *             if the array itself is null
  */
 public static boolean isValidHosts(String[] hostNames) {
  Preconditions.checkNotNull(hostNames, "Host names shouldn't be empty");
  for (String hostName : hostNames) {
   // A null or empty element is simply an invalid host; answer false
   // rather than letting isValidIPAddress throw for it.
   if (StringUtils.isEmpty(hostName) || !isValidIPAddress(hostName)) {
    return false;
   }
  }
  return true;
 }

 /**
  * Checks every element of the array as a port number.
  * 
  * @param ports
  *            ports to check, not null
  * @return true if all elements are valid ports (also for an empty array);
  *         false as soon as one element is null or out of range
  * @throws NullPointerException
  *             if the array itself is null
  */
 public static boolean isValidPorts(Integer[] ports) {
  Preconditions.checkNotNull(ports, "ports shouldn't be empty");
  for (Integer port : ports) {
   // Test for null before unboxing: a null element is an invalid
   // port, not a reason to fail with a NullPointerException.
   if (port == null || !isValidPort(port)) {
    return false;
   }
  }
  return true;
 }

}


package com.self_learn.util;

import org.elasticsearch.common.base.Preconditions;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

/**
 * Converts objects to JSON strings and back, using Gson.
 * 
 * @author harikrishna_gurram
 */
public class JSONUtil {
 /** Serializer with Gson's default settings. */
 private static final Gson compactGson = new Gson();
 /** Serializer with pretty printing switched on. */
 private static final Gson prettyGson = new GsonBuilder()
   .setPrettyPrinting().create();

 /**
  * Serializes an object with the default serializer.
  * 
  * @param obj
  *            object to serialize, not null
  * @return json string of this object.
  * @throws NullPointerException
  *             if obj is null
  */
 public static String getJson(Object obj) {
  return compactGson.toJson(Preconditions.checkNotNull(obj,
    "obj shouldn't be null"));
 }

 /**
  * Serializes an object with the pretty-printing serializer.
  * 
  * @param obj
  *            object to serialize, not null
  * @return json string of this object (Pretty json).
  * @throws NullPointerException
  *             if obj is null
  */
 public static String getPrettyJson(Object obj) {
  return prettyGson.toJson(Preconditions.checkNotNull(obj,
    "obj shouldn't be null"));
 }

 /**
  * Converts the given json string to an object of the given class.
  * 
  * @param json
  *            json text to read, not null
  * @param clazz
  *            class of the object to create, not null
  * @return an object populated from the json data
  * @throws NullPointerException
  *             if json or clazz is null
  */
 public static <T> T getObject(String json, Class<T> clazz) {
  Preconditions.checkNotNull(json, "json data shouldn't be null");
  Preconditions.checkNotNull(clazz, "clazz shouldn't be null");
  T parsed = compactGson.fromJson(json, clazz);
  return parsed;
 }
}

package com.self_learn.util;

import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetResponse;

import com.google.common.base.Preconditions;

/**
 * Utility class to return responses in string format.
 * 
 * @author harikrishna_gurram
 */
public class ResponseUtil {

 /**
  * Returns the source of the result as a string.
  * 
  * @param response
  *            get response to read the source from, not null
  * @param pretty
  *            false returns the source string exactly as the response
  *            provides it; true returns the source re-serialized through
  *            {@code JSONUtil.getPrettyJson}
  * @return the source, or null when the response carries no source
  * @throws NullPointerException
  *             if response is null
  */
 public static String getSource(GetResponse response, boolean pretty) {
  Preconditions.checkNotNull(response, "response shouldn't be null");
  if (!pretty) {
   return response.getSourceAsString();
  }
  // Honour the flag, which used to be ignored. The source may be
  // missing, and JSONUtil rejects null, so keep returning null then.
  Object source = response.getSource();
  return (source == null) ? null : JSONUtil.getPrettyJson(source);
 }

 /**
  * @param response
  *            any response object, e.g. a {@link BulkResponse}; not null
  * @return the {@code toString()} representation of the response
  * @throws NullPointerException
  *             if response is null
  */
 public static String getResponseInfo(Object response) {
  Preconditions.checkNotNull(response, "response shouldn't be null");
  return response.toString();
 }
}


package com.self_learn.util;

import org.elasticsearch.action.count.CountResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.lang3.StringUtils;
import org.elasticsearch.index.query.QueryBuilder;

import com.google.common.base.Preconditions;

/**
 * Provide various utility methods to query Elasticsearch.
 * 
 * @author harikrishna_gurram
 */
public class SearchUtil {

 /**
  * Fetches one document, addressed by _index, _type and _id.
  * 
  * @param client
  *            client used to run the get request, not null
  * @param _index
  *            index of the document, neither null nor empty
  * @param _type
  *            type of the document, neither null nor empty
  * @param _id
  *            id of the document, neither null nor empty
  * @return the response of the get request
  * @throws NullPointerException
  *             if any argument is null
  * @throws IllegalStateException
  *             if _index, _type or _id is empty
  */
 public static GetResponse getDocumentById(Client client, String _index,
   String _type, String _id) {
  // Null checks come first, for every argument ...
  Preconditions.checkNotNull(client, "client should not be null");
  Preconditions.checkNotNull(_index, "_index should not be null");
  Preconditions.checkNotNull(_type, "_type should not be null");
  Preconditions.checkNotNull(_id, "_id should not be null");

  // ... followed by the emptiness checks for the coordinates.
  Preconditions.checkState(!StringUtils.isEmpty(_index),
    "_index should not be empty");
  Preconditions.checkState(!StringUtils.isEmpty(_type),
    "_type should not be empty");
  Preconditions.checkState(!StringUtils.isEmpty(_id),
    "_id should not be empty");

  return client.prepareGet(_index, _type, _id).execute().actionGet();
 }

 /**
  * Runs a search with no index, type, query or size restriction.
  * 
  * NOTE(review): no size is set, so presumably only the server's default
  * number of hits is returned rather than every document - confirm.
  * 
  * @param client
  *            client used to run the search, not null
  * @return the search response
  * @throws NullPointerException
  *             if client is null
  */
 public static SearchResponse getAllDocuemnts(Client client) {
  Client checked = Preconditions.checkNotNull(client,
    "client shouldn't be null");
  return checked.prepareSearch().execute().actionGet();
 }

 /**
  * Runs an unrestricted search that asks for at most n hits.
  * 
  * @param client
  *            client used to run the search, not null
  * @param n
  *            number of documents to ask for, must be greater than 0
  * @return first n documents from cluster
  * @throws NullPointerException
  *             if client is null
  * @throws IllegalStateException
  *             if n is not greater than 0
  */
 public static SearchResponse getFirstNDocuments(Client client, int n) {
  Preconditions.checkNotNull(client, "client shouldn't be null");
  boolean positive = n > 0;
  Preconditions.checkState(positive, "n must greater than 0");
  return client.prepareSearch().setSize(n).execute().actionGet();
 }

 /**
  * Returns one page of documents from the cluster.
  * 
  * @param client
  *            client used to run the search, not null
  * @param from
  *            offset of the first hit to return; 0 addresses the first
  *            hit, negative values are rejected
  * @param size
  *            number of documents to return, must be greater than 0
  * @return the search response for the requested page
  * @throws NullPointerException
  *             if client is null
  * @throws IllegalStateException
  *             if from is negative or size is not greater than 0
  */
 public static SearchResponse getDocuments(Client client, int from, int size) {
  Preconditions.checkNotNull(client, "client shouldn't be null");
  // Offset 0 is the first hit; rejecting it made the first page
  // unreachable through this method.
  Preconditions.checkState((from >= 0), "from must not be negative");
  Preconditions.checkState((size > 0), "size must greater than 0");
  return client.prepareSearch().setSize(size).setFrom(from).execute()
    .actionGet();
 }

 /**
  * Runs a search restricted to the given indices, without a query or an
  * explicit size.
  * 
  * NOTE(review): no size is set, so presumably only the server's default
  * number of hits is returned rather than every document - confirm.
  * 
  * @param client
  *            client used to run the search, not null
  * @param indices
  *            indices to search, not null
  * @return the search response
  * @throws NullPointerException
  *             if client or indices is null
  */
 public static SearchResponse getAllDocuments(Client client,
   String... indices) {
  Preconditions.checkNotNull(client, "client shouldn't be null");
  // Message previously read "indeices".
  Preconditions.checkNotNull(indices, "indices shouldn't be null");
  return client.prepareSearch(indices).execute().actionGet();
 }

 /**
  * Runs a search restricted to the given indices that asks for at most n
  * hits.
  * 
  * @param client
  *            client used to run the search, not null
  * @param n
  *            number of documents to ask for, must be greater than 0
  * @param indices
  *            indices to search, not null
  * @return first n documents from given indices
  * @throws NullPointerException
  *             if client or indices is null
  * @throws IllegalStateException
  *             if n is not greater than 0
  */
 public static SearchResponse getFirstNDocuments(Client client, int n,
   String... indices) {
  Preconditions.checkNotNull(client, "client shouldn't be null");
  boolean positive = n > 0;
  Preconditions.checkState(positive, "n must greater than 0");
  Preconditions.checkNotNull(indices, "indices shouldn't be null");
  return client.prepareSearch(indices).setSize(n).execute().actionGet();
 }

 /**
  * Returns one page of documents from the given indices.
  * 
  * @param client
  *            client used to run the search, not null
  * @param from
  *            offset of the first hit to return; 0 addresses the first
  *            hit, negative values are rejected
  * @param size
  *            number of documents to return, must be greater than 0
  * @param indices
  *            indices to search, not null
  * @return the search response for the requested page
  * @throws NullPointerException
  *             if client or indices is null
  * @throws IllegalStateException
  *             if from is negative or size is not greater than 0
  */
 public static SearchResponse getDocuments(Client client, int from,
   int size, String... indices) {
  Preconditions.checkNotNull(client, "client shouldn't be null");
  // Offset 0 is the first hit; rejecting it made the first page
  // unreachable through this method.
  Preconditions.checkState((from >= 0), "from must not be negative");
  Preconditions.checkState((size > 0), "size must greater than 0");
  Preconditions.checkNotNull(indices, "indices shouldn't be null");
  return client.prepareSearch(indices).setSize(size).setFrom(from)
    .execute().actionGet();
 }

 /**
  * Runs a search restricted to the given indices and types, without a
  * query or an explicit size.
  * 
  * NOTE(review): no size is set, so presumably only the server's default
  * number of hits is returned rather than every document - confirm.
  * 
  * @param client
  *            client used to run the search, not null
  * @param indices
  *            indices to search, not null
  * @param types
  *            types to search, not null
  * @return {@link SearchResponse}
  * @throws NullPointerException
  *             if any argument is null
  */
 public static SearchResponse getAllDocuments(Client client,
   String[] indices, String[] types) {
  Preconditions.checkNotNull(client, "client shouldn't be null");
  // Message previously read "indeices".
  Preconditions.checkNotNull(indices, "indices shouldn't be null");
  Preconditions.checkNotNull(types, "types shouldn't be null");

  return client.prepareSearch(indices).setTypes(types).execute()
    .actionGet();
 }

 /**
  * Runs a search restricted to one index and one type, without a query or
  * an explicit size.
  * 
  * NOTE(review): no size is set, so presumably only the server's default
  * number of hits is returned rather than every document - confirm.
  * 
  * @param client
  *            client used to run the search, not null
  * @param _index
  *            index to search, neither null nor empty
  * @param _type
  *            type to search, neither null nor empty
  * @return {@link SearchResponse}
  * @throws NullPointerException
  *             if any argument is null
  * @throws IllegalStateException
  *             if _index or _type is empty
  */
 public static SearchResponse getAllDocuments(Client client, String _index,
   String _type) {
  Preconditions.checkNotNull(client, "client shouldn't be null");
  // The messages used to name "indeices" and "types", which are not
  // parameters of this overload.
  Preconditions.checkNotNull(_index, "_index shouldn't be null");
  Preconditions.checkNotNull(_type, "_type shouldn't be null");

  Preconditions.checkState((!StringUtils.isEmpty(_index)),
    "_index should not be empty");
  Preconditions.checkState((!StringUtils.isEmpty(_type)),
    "_type should not be empty");

  return client.prepareSearch(_index).setTypes(_type).execute()
    .actionGet();
 }

 /**
  * Runs a search restricted to the given indices and types that asks for
  * at most n hits.
  * 
  * @param client
  *            client used to run the search, not null
  * @param n
  *            number of documents to ask for, must be greater than 0
  * @param indices
  *            indices to search, not null
  * @param types
  *            types to search, not null
  * @return first n documents from given indices and types.
  * @throws NullPointerException
  *             if client, indices or types is null
  * @throws IllegalStateException
  *             if n is not greater than 0
  */
 public static SearchResponse getFirstNDocuments(Client client, int n,
   String[] indices, String[] types) {
  Preconditions.checkNotNull(client, "client shouldn't be null");
  Preconditions.checkState((n > 0), "n must greater than 0");
  // Message previously read "indeices".
  Preconditions.checkNotNull(indices, "indices shouldn't be null");
  Preconditions.checkNotNull(types, "types shouldn't be null");

  return client.prepareSearch(indices).setTypes(types).setSize(n)
    .execute().actionGet();
 }

 /**
  * Runs a search restricted to one index and one type that asks for at
  * most n hits.
  * 
  * @param client
  *            client used to run the search, not null
  * @param n
  *            number of documents to ask for, must be greater than 0
  * @param _index
  *            index to search, neither null nor empty
  * @param _type
  *            type to search, neither null nor empty
  * @return first n documents from given index and type.
  * @throws NullPointerException
  *             if client, _index or _type is null
  * @throws IllegalStateException
  *             if n is not greater than 0, or _index or _type is empty
  */
 public static SearchResponse getFirstNDocuments(Client client, int n,
   String _index, String _type) {
  Preconditions.checkNotNull(client, "client shouldn't be null");
  boolean positive = n > 0;
  Preconditions.checkState(positive, "n must greater than 0");
  Preconditions.checkNotNull(_index, "index shouldn't be null");
  Preconditions.checkNotNull(_type, "type shouldn't be null");

  // Emptiness is only tested once both names are known to be non-null.
  Preconditions.checkState(!StringUtils.isEmpty(_index),
    "_index should not be empty");
  Preconditions.checkState(!StringUtils.isEmpty(_type),
    "_type should not be empty");

  return client.prepareSearch(_index).setTypes(_type).setSize(n)
    .execute().actionGet();
 }

 /**
  * Returns one page of documents from the given indices and types.
  * 
  * @param client
  *            client used to run the search, not null
  * @param from
  *            offset of the first hit to return; 0 addresses the first
  *            hit, negative values are rejected
  * @param size
  *            number of documents to return, must be greater than 0
  * @param indices
  *            indices to search, not null
  * @param types
  *            types to search, not null
  * @return the search response for the requested page
  * @throws NullPointerException
  *             if client, indices or types is null
  * @throws IllegalStateException
  *             if from is negative or size is not greater than 0
  */
 public static SearchResponse getDocuments(Client client, int from,
   int size, String[] indices, String[] types) {
  Preconditions.checkNotNull(client, "client shouldn't be null");
  // Offset 0 is the first hit; rejecting it made the first page
  // unreachable through this method.
  Preconditions.checkState((from >= 0), "from must not be negative");
  Preconditions.checkState((size > 0), "size must greater than 0");
  // Message previously read "indeices".
  Preconditions.checkNotNull(indices, "indices shouldn't be null");
  Preconditions.checkNotNull(types, "types shouldn't be null");

  return client.prepareSearch(indices).setTypes(types).setSize(size)
    .setFrom(from).execute().actionGet();
 }

 /**
  * Return one page of documents from a single index and type: the hits
  * from offset 'from' up to 'from+size'.
  * 
  * @param client
  *            client used to run the search; must not be null
  * @param from
  *            offset of the first hit to return; 0 selects the first page
  *            and negative values are rejected
  * @param size
  *            Number of documents to return; must be greater than 0
  * @param _index
  *            index to search; must be neither null nor empty
  * @param _type
  *            type to search; must be neither null nor empty
  * @return response of the search for the requested page
  */
 public static SearchResponse getDocuments(Client client, int from,
   int size, String _index, String _type) {
  Preconditions.checkNotNull(client, "client shouldn't be null");
  // Offset 0 is the first hit, so it has to be accepted; only negative
  // offsets are invalid.
  Preconditions.checkState((from >= 0),
    "from must be greater than or equal to 0");
  Preconditions.checkState((size > 0), "size must greater than 0");
  Preconditions.checkNotNull(_index, "index shouldn't be null");
  Preconditions.checkNotNull(_type, "type shouldn't be null");

  Preconditions.checkState((!StringUtils.isEmpty(_index)),
    "_index should not be empty");
  Preconditions.checkState((!StringUtils.isEmpty(_type)),
    "_type should not be empty");

  return client.prepareSearch(_index).setTypes(_type).setSize(size)
    .setFrom(from).execute().actionGet();
 }

 /**
  * Count the documents that match a query.
  * 
  * @param client
  *            client used to run the count request; must not be null
  * @param builder
  *            query whose matches are counted; must not be null
  * @param indices
  *            indices to count in; must not be null
  * @return count reported by the count response
  */
 public static long getNumberOfMatches(Client client, QueryBuilder builder,
   String... indices) {
  Preconditions.checkNotNull(client, "client shouldn't be null");
  Preconditions.checkNotNull(builder, "builder shouldn't be null");
  Preconditions.checkNotNull(indices, "indices shouldn't be null");

  // Run the count request and hand back only the number it reports.
  return client.prepareCount(indices).setQuery(builder).execute()
    .actionGet().getCount();
 }

 /**
  * Run a query against the given indices and return the search response.
  * No size or offset is set on the request.
  * 
  * @param client
  *            client used to run the search; must not be null
  * @param builder
  *            query to execute; must not be null
  * @param indices
  *            indices to search; must not be null
  * @return response of the search
  */
 public static SearchResponse getDocuments(Client client,
   QueryBuilder builder, String... indices) {
  Preconditions.checkNotNull(client, "Client shouldn't be null");
  Preconditions.checkNotNull(builder, "builder shouldn't be null");
  Preconditions.checkNotNull(indices, "indices shouldn't be null");

  SearchResponse matches = client.prepareSearch(indices)
    .setQuery(builder).execute().actionGet();
  return matches;
 }

 /**
  * Run a query against the given indices and return at most n matching
  * documents.
  * 
  * @param client
  *            client used to run the search; must not be null
  * @param n
  *            maximum number of documents to return; must be greater than 0
  * @param builder
  *            query to execute; must not be null
  * @param indices
  *            indices to search; must not be null
  * @return response of the search, limited to n hits
  */
 public static SearchResponse getFirstNDocuments(Client client, int n,
   QueryBuilder builder, String... indices) {
  Preconditions.checkNotNull(client, "client shouldn't be null");
  Preconditions.checkState(n > 0, "n must greater than 0");
  Preconditions.checkNotNull(builder, "builder shouldn't be null");
  Preconditions.checkNotNull(indices, "indices shouldn't be null");

  SearchResponse firstMatches = client.prepareSearch(indices).setSize(n)
    .setQuery(builder).execute().actionGet();
  return firstMatches;
 }

 /**
  * Run a query against the given indices and return one page of matches:
  * the hits from offset 'from' up to 'from+size'.
  * 
  * @param client
  *            client used to run the search; must not be null
  * @param from
  *            offset of the first hit to return; 0 selects the first page
  *            and negative values are rejected
  * @param size
  *            number of documents to return; must be greater than 0
  * @param builder
  *            specifies query; must not be null
  * @param indices
  *            indices to query; must not be null
  * @return documents from offset 'from' to 'from+size'
  */
 public static SearchResponse getDocuments(Client client, int from,
   int size, QueryBuilder builder, String... indices) {
  Preconditions.checkNotNull(client, "client shouldn't be null");
  // Offset 0 is the first hit, so it has to be accepted; only negative
  // offsets are invalid.
  Preconditions.checkState((from >= 0),
    "from must be greater than or equal to 0");
  Preconditions.checkState((size > 0), "size must greater than 0");
  Preconditions.checkNotNull(builder, "builder shouldn't be null");
  Preconditions.checkNotNull(indices, "indices shouldn't be null");

  return client.prepareSearch(indices).setQuery(builder).setSize(size)
    .setFrom(from).execute().actionGet();
 }

 /**
  * Run a query against the given indices and types and return the search
  * response. No size or offset is set on the request.
  * 
  * @param client
  *            client used to run the search; must not be null
  * @param builder
  *            query to execute; must not be null
  * @param indices
  *            indices to search; must not be null
  * @param types
  *            types to search; must not be null
  * @return response of the search
  */
 public static SearchResponse getDocuments(Client client,
   QueryBuilder builder, String[] indices, String[] types) {
  Preconditions.checkNotNull(client, "client shouldn't be null");
  Preconditions.checkNotNull(builder, "builder shouldn't be null");
  Preconditions.checkNotNull(indices, "indices shouldn't be null");
  Preconditions.checkNotNull(types, "types shouldn't be null");

  SearchResponse matches = client.prepareSearch(indices)
    .setQuery(builder).setTypes(types).execute().actionGet();
  return matches;
 }

 /**
  * Run a query against a single index and type and return the search
  * response. No size or offset is set on the request.
  * 
  * @param client
  *            client used to run the search; must not be null
  * @param builder
  *            query to execute; must not be null
  * @param _index
  *            index to search; must be neither null nor empty
  * @param _type
  *            type to search; must be neither null nor empty
  * @return response of the search
  */
 public static SearchResponse getDocuments(Client client,
   QueryBuilder builder, String _index, String _type) {
  Preconditions.checkNotNull(client, "client shouldn't be null");
  Preconditions.checkNotNull(builder, "builder shouldn't be null");
  Preconditions.checkNotNull(_index, "index shouldn't be null");
  Preconditions.checkNotNull(_type, "type shouldn't be null");

  // Both strings are known to be non-null at this point, so a plain
  // isEmpty() is enough.
  Preconditions.checkState(!_index.isEmpty(), "_index should not be empty");
  Preconditions.checkState(!_type.isEmpty(), "_type should not be empty");

  SearchResponse matches = client.prepareSearch(_index)
    .setQuery(builder).setTypes(_type).execute().actionGet();
  return matches;
 }

 /**
  * Run a query against the given indices and types and return at most n
  * matching documents.
  * 
  * @param client
  *            client used to run the search; must not be null
  * @param n
  *            maximum number of documents to return; must be greater than 0
  * @param builder
  *            query to execute; must not be null
  * @param indices
  *            indices to search; must not be null
  * @param types
  *            types to search; must not be null
  * @return response of the search, limited to n hits
  */
 public static SearchResponse getFirstNDocuments(Client client, int n,
   QueryBuilder builder, String[] indices, String[] types) {
  Preconditions.checkNotNull(client, "client shouldn't be null");
  Preconditions.checkState(n > 0, "n must greater than 0");
  Preconditions.checkNotNull(builder, "builder shouldn't be null");
  Preconditions.checkNotNull(indices, "indices shouldn't be null");
  Preconditions.checkNotNull(types, "types shouldn't be null");

  SearchResponse firstMatches = client.prepareSearch(indices)
    .setQuery(builder).setTypes(types).setSize(n).execute()
    .actionGet();
  return firstMatches;
 }

 /**
  * Run a query against one index and the given types and return at most n
  * matching documents.
  * 
  * @param client
  *            client used to run the search; must not be null
  * @param n
  *            maximum number of documents to return; must be greater than 0
  * @param builder
  *            query to execute; must not be null
  * @param index
  *            index to search; must not be null (emptiness is not checked
  *            here)
  * @param type
  *            types to search; must not be null
  * @return response of the search, limited to n hits
  */
 public static SearchResponse getFirstNDocuments(Client client, int n,
   QueryBuilder builder, String index, String[] type) {
  Preconditions.checkNotNull(client, "client shouldn't be null");
  Preconditions.checkState(n > 0, "n must greater than 0");
  Preconditions.checkNotNull(builder, "builder shouldn't be null");
  Preconditions.checkNotNull(index, "index shouldn't be null");
  Preconditions.checkNotNull(type, "types shouldn't be null");

  SearchResponse firstMatches = client.prepareSearch(index)
    .setQuery(builder).setTypes(type).setSize(n).execute()
    .actionGet();
  return firstMatches;
 }

 /**
  * Run a query against the given indices and types and return one page of
  * matches: the hits from offset 'from' up to 'from+size'.
  * 
  * @param client
  *            client used to run the search; must not be null
  * @param from
  *            offset of the first hit to return; 0 selects the first page
  *            and negative values are rejected
  * @param size
  *            Number of documents to return; must be greater than 0
  * @param builder
  *            specifies the query; must not be null
  * @param indices
  *            Indices to query; must not be null
  * @param types
  *            types to query; must not be null
  * @return response of the search for the requested page
  */
 public static SearchResponse getDocuments(Client client, int from,
   int size, QueryBuilder builder, String[] indices, String[] types) {
  Preconditions.checkNotNull(client, "client shouldn't be null");
  // Offset 0 is the first hit, so it has to be accepted; only negative
  // offsets are invalid.
  Preconditions.checkState((from >= 0),
    "from must be greater than or equal to 0");
  Preconditions.checkState((size > 0), "size must greater than 0");
  Preconditions.checkNotNull(builder, "builder shouldn't be null");
  Preconditions.checkNotNull(indices, "indices shouldn't be null");
  Preconditions.checkNotNull(types, "types shouldn't be null");

  return client.prepareSearch(indices).setTypes(types).setSize(size)
    .setFrom(from).setQuery(builder).execute().actionGet();
 }

 /**
  * Run a query against a single index and type and return one page of
  * matches: the hits from offset 'from' up to 'from+size'.
  * 
  * @param client
  *            client used to run the search; must not be null
  * @param from
  *            offset of the first hit to return; 0 selects the first page
  *            and negative values are rejected
  * @param size
  *            Number of documents to return; must be greater than 0
  * @param builder
  *            specifies the query; must not be null
  * @param _index
  *            index to query; must be neither null nor empty
  * @param _type
  *            type to query; must be neither null nor empty
  * @return response of the search for the requested page
  */
 public static SearchResponse getDocuments(Client client, int from,
   int size, QueryBuilder builder, String _index, String _type) {
  Preconditions.checkNotNull(client, "client shouldn't be null");
  // Offset 0 is the first hit, so it has to be accepted; only negative
  // offsets are invalid.
  Preconditions.checkState((from >= 0),
    "from must be greater than or equal to 0");
  Preconditions.checkState((size > 0), "size must greater than 0");
  Preconditions.checkNotNull(builder, "builder shouldn't be null");
  Preconditions.checkNotNull(_index, "index shouldn't be null");
  Preconditions.checkNotNull(_type, "types shouldn't be null");
  Preconditions.checkState((!StringUtils.isEmpty(_index)),
    "index should not be empty");
  Preconditions.checkState((!StringUtils.isEmpty(_type)),
    "type should not be empty");

  return client.prepareSearch(_index).setTypes(_type).setSize(size)
    .setFrom(from).setQuery(builder).execute().actionGet();
 }

}

package com.self_learn.util;

import static com.self_learn.util.IPUtil.isValidHosts;
import static com.self_learn.util.IPUtil.isValidPorts;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

import com.google.common.base.Preconditions;

/**
 * Builds Elasticsearch transport clients and caches the ones created from a
 * host/port map so that a repeated request for the same cluster and the same
 * hosts gets the same client back.
 * 
 * NOTE: the cache is a plain HashMap with no synchronization.
 */
public class TransportClientUtil {
 /**
  * Clients built so far, keyed first by cluster name and then by a private
  * copy of the host-to-port map they were built from.
  */
 private static Map<String, Map<Map<String, Integer>, Client>> localMap = new HashMap<>();

 /**
  * Take machine name and port addresses as map and return transport client.
  * Key is host name, value is port number. A client that was already built
  * for the same cluster name and an equal map is returned from the cache.
  * 
  * @param clusterName
  *            value for the "cluster.name" setting; must not be null
  * @param map
  *            host name to port number; must not be null
  * @return cached or newly created transport client
  * @throws UnknownHostException
  *             if a host name in the map cannot be resolved
  */
 public static Client getTransportClient(String clusterName,
   Map<String, Integer> map) throws UnknownHostException {

  Preconditions.checkNotNull(clusterName,
    "clusterName shouldn't be empty");
  Preconditions.checkNotNull(map, "Map shouldn't be empty");

  // The cluster name is part of the cache key: the same hosts requested
  // for another cluster need a client with different settings.
  Map<Map<String, Integer>, Client> clientsOfCluster = localMap
    .get(clusterName);
  if (clientsOfCluster != null && clientsOfCluster.containsKey(map))
   return clientsOfCluster.get(map);

  Preconditions.checkState(isValidHostPorts(map),
    "Map contains invalid host (or) port");

  // Resolve the hosts before the client exists, so a failed lookup does
  // not leave a half-configured client behind.
  InetSocketTransportAddress addresses[] = getInetSocketTransportAddresses(map);

  Settings settings = ImmutableSettings.settingsBuilder()
    .put("cluster.name", clusterName)
    .put("client.transport.sniff", true).build();

  TransportClient client = new TransportClient(settings);
  client.addTransportAddresses(addresses);

  if (clientsOfCluster == null) {
   clientsOfCluster = new HashMap<>();
   localMap.put(clusterName, clientsOfCluster);
  }
  // Store a copy as the key: if the caller changes its map afterwards,
  // the cache entry must stay reachable under the original content.
  clientsOfCluster.put(new HashMap<>(map), client);
  return client;
 }

 /**
  * @param map
  *            host name to port number
  * @return true, if all the entries in map are valid host, ports. Else
  *         false.
  */
 private static boolean isValidHostPorts(Map<String, Integer> map) {
  Set<String> hostNames = map.keySet();
  // Duplicate port numbers only need to be validated once.
  Set<Integer> ports = new HashSet<>(map.values());

  if (!isValidHosts(hostNames.toArray(new String[hostNames.size()])))
   return false;

  if (!isValidPorts(ports.toArray(new Integer[ports.size()])))
   return false;

  return true;
 }

 /**
  * Resolve every host in the map and pair it with its port.
  * 
  * @param map
  *            host name to port number
  * @return one transport address per map entry
  * @throws UnknownHostException
  *             if a host name cannot be resolved
  */
 private static InetSocketTransportAddress[] getInetSocketTransportAddresses(
   Map<String, Integer> map) throws UnknownHostException {
  InetSocketTransportAddress addresses[] = new InetSocketTransportAddress[map
    .size()];
  int count = 0;

  for (Map.Entry<String, Integer> entry : map.entrySet()) {
   InetAddress addr = InetAddress.getByName(entry.getKey());
   // Advance the index on every entry; otherwise each address would
   // overwrite slot 0 and the remaining slots would stay null.
   addresses[count++] = new InetSocketTransportAddress(addr,
     entry.getValue());
  }

  return addresses;
 }

 /**
  * Get transport client for localhost (127.0.0.1). The client is created
  * on every call and is not cached.
  * 
  * @param clusterName
  *            value for the "cluster.name" setting
  * @param port
  *            port on 127.0.0.1 to connect to
  * @return newly created transport client
  * @throws UnknownHostException
  *             declared because of the address lookup
  */
 public static Client getLocalTransportClient(String clusterName, int port)
   throws UnknownHostException {
  Settings settings = ImmutableSettings.settingsBuilder()
    .put("cluster.name", clusterName)
    .put("client.transport.sniff", true).build();

  TransportClient client = new TransportClient(settings);
  InetAddress addr = InetAddress.getByName("127.0.0.1");
  InetSocketTransportAddress address = new InetSocketTransportAddress(
    addr, port);

  client.addTransportAddress(address);
  return client;
 }

}


package com.self_learn.util;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.lang3.StringUtils;

import com.google.common.base.Preconditions;

/**
 * Provides various utility methods to update a document.
 * 
 * @author harikrishna_gurram
 */
public class UpdateUtil {

 /**
  * Null checks shared by all update methods: client and the three parts
  * that identify the target document.
  */
 private static void checkTargetNotNull(Client client, String _index,
   String _type, String _id) {
  Preconditions.checkNotNull(client, "client should not be null");
  Preconditions.checkNotNull(_index, "_index should not be null");
  Preconditions.checkNotNull(_type, "_type should not be null");
  Preconditions.checkNotNull(_id, "_id should not be null");
 }

 /**
  * Empty-string checks for the three parts that identify the target
  * document. Callers run the null checks first, so the arguments are
  * non-null here.
  */
 private static void checkTargetNotEmpty(String _index, String _type,
   String _id) {
  Preconditions.checkState(!_index.isEmpty(), "_index should not be empty");
  Preconditions.checkState(!_type.isEmpty(), "_type should not be empty");
  Preconditions.checkState(!_id.isEmpty(), "_id should not be empty");
 }

 /**
  * Update a document with the fields given as a map. The map is converted
  * with JSONUtil.getJson and passed on to the String variant.
  * 
  * @param client
  *            client used to send the update
  * @param _index
  *            index of the document
  * @param _type
  *            type of the document
  * @param _id
  *            id of the document
  * @param data
  *            fields to update; must not be null
  * @return {@link UpdateResponse}
  * @throws IOException
  * @throws InterruptedException
  * @throws ExecutionException
  */
 public static UpdateResponse updateDocument(Client client, String _index,
   String _type, String _id, Map<String, Object> data)
   throws IOException, InterruptedException, ExecutionException {
  Preconditions.checkNotNull(data, "data should not be null");

  String json = JSONUtil.getJson(data);
  return updateDocument(client, _index, _type, _id, json);
 }

 /**
  * Update a document with the partial document given as a string.
  * 
  * @param client
  *            client used to send the update; must not be null
  * @param _index
  *            index of the document; must be neither null nor empty
  * @param _type
  *            type of the document; must be neither null nor empty
  * @param _id
  *            id of the document; must be neither null nor empty
  * @param data
  *            partial document to apply; must not be null
  * @return {@link UpdateResponse}
  * @throws InterruptedException
  * @throws ExecutionException
  */
 public static UpdateResponse updateDocument(Client client, String _index,
   String _type, String _id, String data) throws InterruptedException,
   ExecutionException {
  // All null checks come before any empty check.
  checkTargetNotNull(client, _index, _type, _id);
  Preconditions.checkNotNull(data, "data should not be null");
  checkTargetNotEmpty(_index, _type, _id);

  UpdateRequest request = new UpdateRequest(_index, _type, _id);
  request.doc(data);

  return client.update(request).get();
 }

 /**
  * If the document does not already exist, the contents of the upsert map
  * will be inserted as a new document. Both maps are converted with
  * JSONUtil.getJson and passed on to the String variant.
  * 
  * @param client
  *            client used to send the update
  * @param _index
  *            index of the document
  * @param _type
  *            type of the document
  * @param _id
  *            id of the document
  * @param _source
  *            fields to update; must not be null
  * @param _upsertSource
  *            document set as the upsert; must not be null
  * @return {@link UpdateResponse}
  * @throws IOException
  * @throws ExecutionException
  * @throws InterruptedException
  */
 public static UpdateResponse upsertUpdate(Client client, String _index,
   String _type, String _id, Map<String, Object> _source,
   Map<String, Object> _upsertSource) throws IOException,
   InterruptedException, ExecutionException {
  Preconditions.checkNotNull(_source, "data should not be null");
  Preconditions.checkNotNull(_upsertSource,
    "upsertData should not be null");

  String sourceJson = JSONUtil.getJson(_source);
  String upsertJson = JSONUtil.getJson(_upsertSource);
  return upsertUpdate(client, _index, _type, _id, sourceJson, upsertJson);
 }

 /**
  * Update a document with the partial document in _source and set
  * _upsertSource as the upsert of the request.
  * 
  * @param client
  *            client used to send the update; must not be null
  * @param _index
  *            index of the document; must be neither null nor empty
  * @param _type
  *            type of the document; must be neither null nor empty
  * @param _id
  *            id of the document; must be neither null nor empty
  * @param _source
  *            partial document to apply; must not be null
  * @param _upsertSource
  *            document set as the upsert; must not be null
  * @return {@link UpdateResponse}
  * @throws IOException
  * @throws ExecutionException
  * @throws InterruptedException
  */
 public static UpdateResponse upsertUpdate(Client client, String _index,
   String _type, String _id, String _source, String _upsertSource)
   throws IOException, InterruptedException, ExecutionException {
  // All null checks come before any empty check.
  checkTargetNotNull(client, _index, _type, _id);
  Preconditions.checkNotNull(_source, "data should not be null");
  Preconditions.checkNotNull(_upsertSource,
    "upsertData should not be null");
  checkTargetNotEmpty(_index, _type, _id);

  UpdateRequest request = new UpdateRequest(_index, _type, _id);
  request.doc(_source);
  request.upsert(_upsertSource);

  return client.update(request).get();
 }
}



Previous                                                 Next                                                 Home

No comments:

Post a Comment