Monday 23 November 2015

Elasticsearch: Java: Bulk API: To perform bulk operations


The Bulk API lets you perform multiple operations (index, update, delete) in a single request. One way to create a bulk request is with the BulkRequestBuilder class.

// Assumes `client` is an org.elasticsearch.client.Client and `jsonBuilder()` is
// statically imported from XContentFactory -- TODO confirm in your setup.
String _index = "organization";
String _type = "employee";
// Document ids are Strings; the int literal 1 would not compile here.
String _id1 = "1";
String _id2 = "2";

BulkRequestBuilder bulkRequest = client.prepareBulk();

// either use client#prepare, or use Requests# to directly build index/delete requests
bulkRequest.add(client.prepareIndex(_index, _type, _id1)
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "Krishna")
                        .field("postDate", new Date())
                        .field("message", "trying out Elasticsearch")
                    .endObject()
                  )
        );

bulkRequest.add(client.prepareIndex(_index, _type, _id2)
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "PYR")
                        .field("postDate", new Date())
                        .field("message", "another post")
                    .endObject()
                  )
        );

// Blocks until the whole bulk request has been executed.
BulkResponse bulkResponse = bulkRequest.execute().actionGet();
if (bulkResponse.hasFailures()) {
    // process failures by iterating through each bulk response item;
    // buildFailureMessage() summarizes every failed item in a single string.
    System.err.println(bulkResponse.buildFailureMessage());
}

Using BulkProcessor
The BulkProcessor class offers a simple interface to flush bulk operations automatically based on the number or size of requests, or after a given period.

import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;

String _index = "organization";
String _type = "employee";
// Document ids are Strings; the int literal 1 would not compile here.
String _id1 = "1";
String _id2 = "2";

BulkProcessor bulkProcessor = BulkProcessor.builder(
        client,
        new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId,
                                   BulkRequest request) {
                // Called just before the bulk request is executed.
            }

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  BulkResponse response) {
                // Called after the bulk request has executed; individual
                // items may still have failed.
                if (response.hasFailures()) {
                    System.err.println(response.buildFailureMessage());
                }
            }

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  Throwable failure) {
                // Called when the whole bulk request failed with a Throwable.
                System.err.println("Bulk " + executionId + " failed: " + failure);
            }
        })
        .setBulkActions(10000)
        .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
        .setFlushInterval(TimeValue.timeValueSeconds(5))
        .setConcurrentRequests(1)
        .build();

// Replace the placeholder with your document source (Map, JSON string, XContentBuilder, ...).
bulkProcessor.add(new IndexRequest(_index, _type, _id1).source(/* your doc here */));
bulkProcessor.add(new DeleteRequest(_index, _type, _id2));

// Flush any pending actions and release the processor when you are done;
// otherwise requests still buffered below the thresholds may never be sent.
bulkProcessor.close();


@Override
public void beforeBulk(long executionId,BulkRequest request) { ... }
This method is called just before the bulk request is executed.

@Override
public void afterBulk(long executionId, BulkRequest request,BulkResponse response) { ... }
This method is called after the bulk request has been executed. You can check for failed requests with response.hasFailures().

@Override
public void afterBulk(long executionId,BulkRequest request,Throwable failure) { ... }
This method is called when the bulk request failed and raised a Throwable.

setBulkActions(10000)
Execute the bulk request every 10,000 requests.

setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
Flush the bulk request every 1 GB.

setFlushInterval(TimeValue.timeValueSeconds(5))
Flush the bulk request every 5 seconds, regardless of the number of requests.

.setConcurrentRequests(1)
Sets the number of concurrent requests. A value of 0 means that only a single request will be allowed to execute. A value of 1 means that 1 concurrent request is allowed to execute while new bulk requests are being accumulated.

By default, BulkProcessor:
    sets bulkActions to 1000
    sets bulkSize to 5mb
    does not set flushInterval
    sets concurrentRequests to 1

The following application demonstrates a simple bulk utility.

Step 1: Define a simple model class, Employee.

package com.self_learn.model;

import java.util.ArrayList;
import java.util.List;

import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

/**
 * Simple employee document used by the bulk demo.
 *
 * <p>Lombok generates a getter and setter for every field, plus
 * equals/hashCode and toString over all fields.
 */
@Getter
@Setter
@EqualsAndHashCode
@ToString
public class Employee {
  private String age;
  private String firstName;
  private String lastName;
  private List<String> hobbies = new ArrayList<>();
}


Step 2: Define a utility class to get a Client instance.

package com.self_learn.util;

import static com.self_learn.util.IPUtil.isValidHosts;
import static com.self_learn.util.IPUtil.isValidPorts;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

import com.google.common.base.Preconditions;

public class TransportClientUtil {
  /**
   * Clients already created by {@link #getTransportClient(String, Map)},
   * keyed first by cluster name and then by a snapshot of the host/port map.
   * This cache is not synchronized.
   */
  private static final Map<String, Map<Map<String, Integer>, Client>> localMap = new HashMap<>();

  /**
   * Take machine name and port addresses as map and return transport client.
   * Key is host name, value is port number.
   *
   * <p>Clients are cached per (cluster name, host/port map), so repeated calls
   * with equal arguments return the same client instance.
   *
   * @param clusterName
   *            name of the cluster to join; must not be null
   * @param map
   *            host to port mapping; hosts must pass
   *            {@code IPUtil.isValidHosts} and ports
   *            {@code IPUtil.isValidPorts}
   * @return cached or newly created client connected to every address in map
   * @throws NullPointerException
   *             if clusterName or map is null
   * @throws IllegalStateException
   *             if map contains an invalid host or port
   * @throws UnknownHostException
   *             if a host cannot be resolved
   */
  public static Client getTransportClient(String clusterName,
      Map<String, Integer> map) throws UnknownHostException {

    Preconditions.checkNotNull(clusterName,
        "clusterName shouldn't be empty");
    Preconditions.checkNotNull(map, "Map shouldn't be empty");

    Map<Map<String, Integer>, Client> clusterClients = localMap.get(clusterName);
    if (clusterClients != null && clusterClients.containsKey(map))
      return clusterClients.get(map);

    Preconditions.checkState(isValidHostPorts(map),
        "Map contains invalid host (or) port");

    // Resolve every address before creating the client so that an
    // UnknownHostException cannot leak an unused TransportClient.
    InetSocketTransportAddress[] addresses = getInetSocketTransportAddresses(map);

    TransportClient client = newTransportClient(clusterName);
    client.addTransportAddresses(addresses);

    if (clusterClients == null) {
      clusterClients = new HashMap<>();
      localMap.put(clusterName, clusterClients);
    }
    // Key on a copy so later mutation of the caller's map cannot corrupt the cache.
    clusterClients.put(new HashMap<>(map), client);
    return client;
  }

  /**
   * @param map
   *            host to port mapping
   * @return true, if all the entries in map are valid host, ports. Else
   *         false.
   */
  private static boolean isValidHostPorts(Map<String, Integer> map) {
    Set<String> hostNames = map.keySet();
    Set<Integer> ports = new HashSet<>(map.values());

    if (!isValidHosts(hostNames.toArray(new String[hostNames.size()])))
      return false;

    if (!isValidPorts(ports.toArray(new Integer[ports.size()])))
      return false;

    return true;
  }

  /**
   * Resolves every host in map and pairs it with its port.
   *
   * @param map
   *            host to port mapping
   * @return one transport address per map entry (no null slots)
   * @throws UnknownHostException
   *             if a host cannot be resolved
   */
  private static InetSocketTransportAddress[] getInetSocketTransportAddresses(
      Map<String, Integer> map) throws UnknownHostException {
    InetSocketTransportAddress[] addresses = new InetSocketTransportAddress[map
        .size()];
    int count = 0;

    for (Map.Entry<String, Integer> entry : map.entrySet()) {
      InetAddress addr = InetAddress.getByName(entry.getKey());
      // Advance the slot index for every entry; previously every address
      // overwrote slot 0 and the remaining slots were left null.
      addresses[count++] = new InetSocketTransportAddress(addr,
          entry.getValue());
    }

    return addresses;
  }

  /**
   * Builds a sniffing TransportClient for clusterName with no addresses yet.
   */
  private static TransportClient newTransportClient(String clusterName) {
    Settings settings = ImmutableSettings.settingsBuilder()
        .put("cluster.name", clusterName)
        .put("client.transport.sniff", true).build();
    return new TransportClient(settings);
  }

  /**
   * Get transport client for localhost (127.0.0.1). Unlike
   * {@link #getTransportClient(String, Map)}, a new client is created on every
   * call.
   * 
   * @param clusterName
   *            name of the cluster to join
   * @param port
   *            transport port on 127.0.0.1
   * @return newly created client
   * @throws UnknownHostException
   *             if 127.0.0.1 cannot be resolved
   */
  public static Client getLocalTransportClient(String clusterName, int port)
      throws UnknownHostException {
    InetAddress addr = InetAddress.getByName("127.0.0.1");
    TransportClient client = newTransportClient(clusterName);
    client.addTransportAddress(new InetSocketTransportAddress(addr, port));
    return client;
  }

}


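The following IPUtil class validates hosts and ports. Note that isValidHosts accepts only IP address literals (checked with InetAddressValidator), so host names such as "localhost" are rejected.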

package com.self_learn.util;

import org.apache.commons.validator.routines.InetAddressValidator;

import com.google.common.base.Preconditions;

/**
 * Validate IP addresses, ports.
 *
 * <p>Note: host validation accepts only IP address literals, as checked by
 * {@link InetAddressValidator#isValid(String)}; DNS host names such as
 * "localhost" are rejected.
 * 
 * @author harikrishna_gurram
 */
public class IPUtil {
  private static final InetAddressValidator inetAddressValidator = InetAddressValidator
      .getInstance();

  /**
   * @param ipAddress
   *            IP address literal to check; must not be null
   * @return true if ip address is valid, else false
   * @throws NullPointerException
   *             if ipAddress is null
   */
  public static boolean isValidIPAddress(String ipAddress) {
    Preconditions.checkNotNull(ipAddress, "IP address should not be null");
    return inetAddressValidator.isValid(ipAddress);
  }

  /**
   * @param port
   *            : Port number
   * @return true if port number is in the range 1..65535, else false
   */
  public static boolean isValidPort(int port) {
    return port > 0 && port < 65536;
  }

  /**
   * @param hostNames
   *            host addresses to check; each must be an IP address literal
   * @return true if all the elements of array represent valid IP addresses,
   *         else false.
   * @throws NullPointerException
   *             if hostNames or any of its elements is null
   */
  public static boolean isValidHosts(String[] hostNames) {
    Preconditions.checkNotNull(hostNames, "Host names shouldn't be empty");
    for (String hostName : hostNames) {
      if (!isValidIPAddress(hostName)) {
        return false;
      }
    }
    return true;
  }

  /**
   * 
   * @param ports
   *            ports to check; must not be null
   * @return true if all the elements of array represent valid ports, else
   *         false (a null element is treated as invalid).
   * @throws NullPointerException
   *             if ports itself is null
   */
  public static boolean isValidPorts(Integer[] ports) {
    Preconditions.checkNotNull(ports, "ports shouldn't be empty");
    for (Integer port : ports) {
      // Check for null before unboxing, which would otherwise throw an NPE.
      if (port == null || !isValidPort(port)) {
        return false;
      }
    }
    return true;
  }

}


Step 3: The BulkRequestType enum defines constants that identify the type of a request: INDEX, UPDATE, UPSERT_UPDATE, and DELETE.

package com.self_learn.constants;

/**
 * Kind of operation a single entry of a bulk request performs.
 *
 * @author harikrishna_gurram
 */
public enum BulkRequestType {
  /** Index a document from its source. */
  INDEX,
  /** Apply a partial document to an existing document. */
  UPDATE,
  /** Apply a partial document, supplying an upsert document as well. */
  UPSERT_UPDATE,
  /** Delete a document by id. */
  DELETE
}


Step 4: Define the BulkObject model. Each BulkObject instance represents a single INDEX, UPDATE, UPSERT_UPDATE, or DELETE request.

package com.self_learn.model;

import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

import com.self_learn.constants.BulkRequestType;

/**
 * Describes one operation of a bulk request: the target document address
 * (_index, _type, _id), the request type, and the optional document source
 * and upsert source.
 *
 * <p>Lombok generates a getter and setter for every field (for example
 * {@code get_index()}), plus equals/hashCode and toString over all fields.
 */
@Getter
@Setter
@EqualsAndHashCode
@ToString
public class BulkObject {

  private String _index;
  private String _type;
  private String _id;
  private String _source;
  private String _upsertSource;
  private BulkRequestType type;

  /** Creates an object with no source and no upsert source. */
  public BulkObject(String _index, String _type, String _id,
      BulkRequestType type) {
    this(_index, _type, _id, type, null, null);
  }

  /** Creates an object with a source but no upsert source. */
  public BulkObject(String _index, String _type, String _id,
      BulkRequestType type, String _source) {
    this(_index, _type, _id, type, _source, null);
  }

  /** Canonical constructor: every other constructor delegates here. */
  public BulkObject(String _index, String _type, String _id,
      BulkRequestType type, String _source, String _upsertSource) {
    this._index = _index;
    this._type = _type;
    this._id = _id;
    this.type = type;
    this._source = _source;
    this._upsertSource = _upsertSource;
  }
}


Step 5: Define the BulkUtil class, which takes a set of BulkObjects and processes them.

package com.self_learn.util;

import java.util.Set;

import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.base.Preconditions;

import com.self_learn.constants.BulkRequestType;
import com.self_learn.model.BulkObject;

/**
 * Utility class to execute bulk requests.
 *
 * <p>Each operation is described by a {@link BulkObject}, built with one of the
 * {@code get*Object} factory methods, and a set of them is executed with
 * {@link #execute(Client, Set)}.
 * 
 * @author harikrishna_gurram
 */
public class BulkUtil {

  private static BulkObject getBulkObject(String _index, String _type,
      String _id, BulkRequestType type) {
    return new BulkObject(_index, _type, _id, type);
  }

  private static BulkObject getBulkObject(String _index, String _type,
      String _id, BulkRequestType type, String _source) {
    return new BulkObject(_index, _type, _id, type, _source);
  }

  private static BulkObject getBulkObject(String _index, String _type,
      String _id, BulkRequestType type, String _source,
      String _upsertSource) {
    return new BulkObject(_index, _type, _id, type, _source, _upsertSource);
  }

  /** Creates an INDEX object whose source is the given JSON string. */
  public static BulkObject getIndexObject(String _index, String _type,
      String _id, String _source) {
    return getBulkObject(_index, _type, _id, BulkRequestType.INDEX, _source);
  }

  /** Creates a DELETE object for the given document address. */
  public static BulkObject getDeleteObject(String _index, String _type,
      String _id) {
    return getBulkObject(_index, _type, _id, BulkRequestType.DELETE);
  }

  /** Creates an UPDATE object whose partial document is the given JSON string. */
  public static BulkObject getUpdateObject(String _index, String _type,
      String _id, String src) {
    return getBulkObject(_index, _type, _id, BulkRequestType.UPDATE, src);
  }

  /** Creates an UPSERT_UPDATE object from JSON strings for the doc and upsert doc. */
  public static BulkObject getUpsertUpdateObject(String _index, String _type,
      String _id, String _source, String _upsertSource) {
    return getBulkObject(_index, _type, _id, BulkRequestType.UPSERT_UPDATE,
        _source, _upsertSource);
  }

  /** Creates an INDEX object, serializing _source with {@code JSONUtil.getJson}. */
  public static BulkObject getIndexObject(String _index, String _type,
      String _id, Object _source) {
    return getIndexObject(_index, _type, _id, JSONUtil.getJson(_source));
  }

  /** Creates an UPDATE object, serializing _source with {@code JSONUtil.getJson}. */
  public static BulkObject getUpdateObject(String _index, String _type,
      String _id, Object _source) {
    return getUpdateObject(_index, _type, _id, JSONUtil.getJson(_source));
  }

  /** Creates an UPSERT_UPDATE object, serializing both documents with {@code JSONUtil.getJson}. */
  public static BulkObject getUpsertUpdateObject(String _index, String _type,
      String _id, Object _source, Object _upsertSource) {
    return getUpsertUpdateObject(_index, _type, _id,
        JSONUtil.getJson(_source), JSONUtil.getJson(_upsertSource));
  }

  private static boolean isNull(Object obj) {
    return (obj == null);
  }

  private static boolean isNotNull(Object obj) {
    return !isNull(obj);
  }

  /**
   * Validate BulkObject based on Request type.
   * 
   * <p>Every type requires a non-null _index, _type and _id. In addition:
   * <ul>
   * <li>{@link BulkRequestType#INDEX} requires _source.</li>
   * <li>{@link BulkRequestType#UPDATE} requires _source.</li>
   * <li>{@link BulkRequestType#UPSERT_UPDATE} requires _source and
   * _upsertSource.</li>
   * <li>{@link BulkRequestType#DELETE} requires nothing more.</li>
   * </ul>
   * 
   * @param obj
   *            object to validate; may be null
   * @return true if obj is non-null, has a non-null request type, and has all
   *         the fields that type requires; false otherwise
   */
  private static boolean isValidBulkObject(BulkObject obj) {
    // A null type would otherwise throw NullPointerException in the switch.
    if (obj == null || obj.getType() == null)
      return false;

    // Every request type needs a complete document address.
    if (isNull(obj.get_index()) || isNull(obj.get_type())
        || isNull(obj.get_id()))
      return false;

    switch (obj.getType()) {
    case INDEX:
    case UPDATE:
      return isNotNull(obj.get_source());
    case UPSERT_UPDATE:
      return isNotNull(obj.get_source()) && isNotNull(obj.get_upsertSource());
    case DELETE:
      return true;
    default:
      return false;
    }
  }

  /**
   * Builds one bulk request from bulkReq and executes it, blocking until the
   * response is available.
   * 
   * <p>Each BulkObject is translated according to its type:
   * <ul>
   * <li>{@link BulkRequestType#INDEX}: {@code IndexRequest} with _source.</li>
   * <li>{@link BulkRequestType#UPDATE}: {@code UpdateRequest} with _source as
   * the partial doc.</li>
   * <li>{@link BulkRequestType#UPSERT_UPDATE}: {@code UpdateRequest} with
   * _source as doc and _upsertSource as upsert.</li>
   * <li>{@link BulkRequestType#DELETE}: {@code DeleteRequest}.</li>
   * </ul>
   * 
   * <p>If BulkObject is invalid, then the request is simply ignored.
   * Operations are added in the iteration order of bulkReq. A HashSet has no
   * defined order, so pass an ordered set (e.g. LinkedHashSet) when order
   * matters.
   * 
   * <p>NOTE(review): if no valid object remains, Elasticsearch presumably
   * rejects the empty bulk request with a validation error. Confirm this.
   * 
   * @param client
   *            client used to execute the request; must not be null
   * @param bulkReq
   *            operations to execute; must not be null
   * @return response of the bulk request
   * @throws NullPointerException
   *             if client or bulkReq is null
   */
  public static BulkResponse execute(Client client, Set<BulkObject> bulkReq) {
    Preconditions.checkNotNull(client, "client shouldn't be null");
    Preconditions.checkNotNull(bulkReq, "bulkReq shouldn't be null");

    BulkRequestBuilder bulkRequest = client.prepareBulk();

    for (BulkObject obj : bulkReq) {
      if (!isValidBulkObject(obj))
        continue;

      String _index = obj.get_index();
      String _type = obj.get_type();
      String _id = obj.get_id();
      String _source = obj.get_source();
      String _upsertSource = obj.get_upsertSource();

      BulkRequestType type = obj.getType();

      switch (type) {
      case INDEX:
        bulkRequest.add(new IndexRequest(_index, _type, _id)
            .source(_source));
        break;
      case UPDATE:
        bulkRequest.add(new UpdateRequest(_index, _type, _id)
            .doc(_source));
        break;
      case UPSERT_UPDATE:
        bulkRequest.add(new UpdateRequest(_index, _type, _id).doc(
            _source).upsert(_upsertSource));
        break;
      case DELETE:
        bulkRequest.add(new DeleteRequest(_index, _type, _id));
        break;

      default:
        // Unreachable while isValidBulkObject rejects unknown types.
        System.out.println("Invalid Request");
      }
    }

    return bulkRequest.execute().actionGet();
  }
}


Step 6: Define the ResponseUtil class, which returns query responses in JSON format.

package com.self_learn.util;

import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetResponse;

import com.google.common.base.Preconditions;

/**
 * Utility class to return response in string format.
 * 
 * @author harikrishna_gurram
 */
public class ResponseUtil {

  /**
   * Returns source of the result as string.
   * 
   * @param response
   *            get response to read; must not be null
   * @param pretty
   *            currently unused: the source is returned exactly as
   *            {@link GetResponse#getSourceAsString()} produces it
   * @return the value of {@code response.getSourceAsString()}
   * @throws NullPointerException
   *             if response is null
   */
  public static String getSource(GetResponse response, boolean pretty) {
    Preconditions.checkNotNull(response, "response shouldn't be null");
    return response.getSourceAsString();
  }

  /**
   * Serializes a response object, for example a {@link BulkResponse}, to JSON
   * using {@code JSONUtil}.
   * 
   * @param response
   *            response to serialize; must not be null
   * @param pretty
   *            true to use {@code JSONUtil.getPrettyJson}, false to use
   *            {@code JSONUtil.getJson}
   * @return JSON representation of response
   * @throws NullPointerException
   *             if response is null
   */
  public static String getResponseInfo(Object response, boolean pretty) {
    // Same contract as getSource: reject null up front.
    Preconditions.checkNotNull(response, "response shouldn't be null");
    return pretty ? JSONUtil.getPrettyJson(response) : JSONUtil.getJson(response);
  }
}


Step 8: Main.java demonstrates how to use the BulkUtil class. (The JSONUtil helper used by BulkUtil and ResponseUtil is not shown in this post.)

package com.self_learn.test;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;

import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;

import com.self_learn.model.BulkObject;
import com.self_learn.model.Employee;
import com.self_learn.util.BulkUtil;
import com.self_learn.util.ResponseUtil;
import com.self_learn.util.TransportClientUtil;

public class Main {
  private static final String clusterName = "my_cluster_1";
  private static final String _index = "organization";
  private static final String _type = "employee";

  /**
   * Runs three bulk requests against a local cluster on port 9300.
   * <ol>
   * <li>Index employees 1-5.</li>
   * <li>Delete employees 1 and 4.</li>
   * <li>Update employee 2 and index employee 6.</li>
   * </ol>
   * Each response is printed as pretty JSON.
   */
  public static void main(String[] args) throws IOException,
      InterruptedException, ExecutionException {
    /* Get client instance for cluster */
    Client client = TransportClientUtil.getLocalTransportClient(
        clusterName, 9300);
    try {
      Set<BulkObject> objects = new HashSet<>();

      /* First 5 index requests */
      for (int i = 1; i <= 5; i++) {
        objects.add(BulkUtil.getIndexObject(_index, _type, "" + i,
            newEmployee(i)));
      }
      printResponse(BulkUtil.execute(client, objects));

      /* Delete documents 1 and 4 */
      objects = new HashSet<>();
      objects.add(BulkUtil.getDeleteObject(_index, _type, "1"));
      objects.add(BulkUtil.getDeleteObject(_index, _type, "4"));
      printResponse(BulkUtil.execute(client, objects));

      /* Update document 2 and index document 6 */
      objects = new HashSet<>();
      Map<String, Object> doc = new HashMap<>();
      doc.put("firstName", "Krishna");
      doc.put("age", "26");
      objects.add(BulkUtil.getUpdateObject(_index, _type, "2", doc));
      objects.add(BulkUtil.getIndexObject(_index, _type, "6", newEmployee(6)));
      printResponse(BulkUtil.execute(client, objects));
    } finally {
      // Release the client even if one of the requests throws.
      client.close();
    }
  }

  /** Builds the demo employee with number i. */
  private static Employee newEmployee(int i) {
    Employee emp = new Employee();
    emp.setAge("" + i);
    emp.setFirstName("firstName " + i);
    emp.setLastName("lastName " + i);
    emp.setHobbies(Arrays.asList("hobbies " + i));
    return emp;
  }

  /** Prints the response as pretty JSON and reports any failed items on stderr. */
  private static void printResponse(BulkResponse response) {
    System.out.println(ResponseUtil.getResponseInfo(response, true));
    if (response.hasFailures()) {
      System.err.println(response.buildFailureMessage());
    }
  }
}

When you run Main.java, you will get the following output.
Sep 10, 2015 2:02:14 PM org.elasticsearch.plugins.PluginsService <init>
INFO: [Turner Century] loaded [], sites []
{
  "responses": [
    {
      "id": 0,
      "opType": "index",
      "response": {
        "index": "organization",
        "id": "2",
        "type": "employee",
        "version": 1,
        "created": true
      }
    },
    {
      "id": 1,
      "opType": "index",
      "response": {
        "index": "organization",
        "id": "4",
        "type": "employee",
        "version": 1,
        "created": true
      }
    },
    {
      "id": 2,
      "opType": "index",
      "response": {
        "index": "organization",
        "id": "1",
        "type": "employee",
        "version": 1,
        "created": true
      }
    },
    {
      "id": 3,
      "opType": "index",
      "response": {
        "index": "organization",
        "id": "3",
        "type": "employee",
        "version": 1,
        "created": true
      }
    },
    {
      "id": 4,
      "opType": "index",
      "response": {
        "index": "organization",
        "id": "5",
        "type": "employee",
        "version": 1,
        "created": true
      }
    }
  ],
  "tookInMillis": 2,
  "remoteAddress": {
    "address": {}
  }
}
{
  "responses": [
    {
      "id": 0,
      "opType": "delete",
      "response": {
        "index": "organization",
        "id": "4",
        "type": "employee",
        "version": 2,
        "found": true
      }
    },
    {
      "id": 1,
      "opType": "delete",
      "response": {
        "index": "organization",
        "id": "1",
        "type": "employee",
        "version": 2,
        "found": true
      }
    }
  ],
  "tookInMillis": 1,
  "remoteAddress": {
    "address": {}
  }
}
{
  "responses": [
    {
      "id": 0,
      "opType": "update",
      "response": {
        "index": "organization",
        "id": "2",
        "type": "employee",
        "version": 2,
        "created": false
      }
    },
    {
      "id": 1,
      "opType": "index",
      "response": {
        "index": "organization",
        "id": "6",
        "type": "employee",
        "version": 1,
        "created": true
      }
    }
  ],
  "tookInMillis": 1,
  "remoteAddress": {
    "address": {}
  }
}





Previous                                                 Next                                                 Home

No comments:

Post a Comment