Bulk API
The bulk API lets you perform multiple index, update and delete operations in a single request. One way to create a bulk request is with the BulkRequestBuilder class.
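import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import java.util.Date;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;

// client is an org.elasticsearch.client.Client; jsonBuilder() throws IOException,
// so the enclosing method must declare or handle it.
String _index = "organization";
String _type = "employee";
String _id1 = "1"; // document ids are strings
String _id2 = "2";

BulkRequestBuilder bulkRequest = client.prepareBulk();

// either use client#prepare, or use Requests# to directly build index/delete requests
bulkRequest.add(client.prepareIndex(_index, _type, _id1)
        .setSource(jsonBuilder()
                .startObject()
                    .field("user", "Krishna")
                    .field("postDate", new Date())
                    .field("message", "trying out Elasticsearch")
                .endObject()
        )
);

bulkRequest.add(client.prepareIndex(_index, _type, _id2)
        .setSource(jsonBuilder()
                .startObject()
                    .field("user", "PYR")
                    .field("postDate", new Date())
                    .field("message", "another post")
                .endObject()
        )
);

BulkResponse bulkResponse = bulkRequest.execute().actionGet();
if (bulkResponse.hasFailures()) {
    // process failures by iterating through each bulk response item
    for (BulkItemResponse item : bulkResponse) {
        if (item.isFailed()) {
            System.err.println(item.getId() + ": " + item.getFailureMessage());
        }
    }
}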
Using BulkProcessor
The BulkProcessor class offers a simple interface to flush bulk operations automatically based on the number or size of requests, or after a given period.
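import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;

String _index = "organization";
String _type = "employee";
String _id1 = "1";
String _id2 = "2";

BulkProcessor bulkProcessor = BulkProcessor.builder(
        client,
        new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                System.out.println("Executing bulk " + executionId + " with "
                        + request.numberOfActions() + " actions");
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                if (response.hasFailures()) {
                    System.err.println(response.buildFailureMessage());
                }
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                failure.printStackTrace();
            }
        })
        .setBulkActions(10000)
        .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
        .setFlushInterval(TimeValue.timeValueSeconds(5))
        .setConcurrentRequests(1)
        .build();

// example document; any JSON string works here
bulkProcessor.add(new IndexRequest(_index, _type, _id1).source("{\"user\":\"Krishna\"}"));
bulkProcessor.add(new DeleteRequest(_index, _type, _id2));

// close() flushes any requests still pending and stops the flush scheduler
bulkProcessor.close();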
@Override
public void beforeBulk(long executionId, BulkRequest request) { ... }
This method is called just before the bulk is executed.
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { ... }
This method is called after the bulk is executed. You can check for failed requests with response.hasFailures().
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) { ... }
This method is called when the bulk failed and raised a Throwable.
setBulkActions(10000)
We want to execute the bulk every 10,000 requests.
setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
We want to flush the bulk every 1 GB of data.
setFlushInterval(TimeValue.timeValueSeconds(5))
We want to flush the bulk every 5 seconds, whatever the number of requests.
setConcurrentRequests(1)
Set the number of concurrent requests. A value of 0 means that only a single request is allowed to execute at a time. A value of 1 means one concurrent bulk request is allowed to execute while new bulk requests are accumulated.
By default, BulkProcessor:
sets bulkActions to 1000
sets bulkSize to 5 MB
does not set flushInterval
sets concurrentRequests to 1
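The count, size and time thresholds above can be modelled with a few lines of plain Java. The sketch below is a simplified, hypothetical model (the names MiniBatcher and MiniBatcherDemo are invented here), not Elasticsearch's BulkProcessor: it flushes synchronously in the calling thread, which roughly corresponds to setConcurrentRequests(0), and it measures batch size as the UTF-8 length of each request body rather than the way Elasticsearch estimates it.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
 * Hypothetical, simplified model of BulkProcessor's flush rules (not Elasticsearch code).
 * A batch is flushed when it holds maxActions requests, when its payload reaches
 * maxBytes, or, if intervalMillis > 0, on a fixed timer.
 */
class MiniBatcher implements AutoCloseable {
    private final int maxActions;                 // like setBulkActions(...)
    private final long maxBytes;                  // like setBulkSize(...)
    private final Consumer<List<String>> sink;    // stands in for "send one bulk request"
    private final ScheduledExecutorService timer; // like setFlushInterval(...); null if unused
    private final List<String> pending = new ArrayList<>();
    private long pendingBytes = 0;

    MiniBatcher(int maxActions, long maxBytes, long intervalMillis,
                Consumer<List<String>> sink) {
        this.maxActions = maxActions;
        this.maxBytes = maxBytes;
        this.sink = sink;
        if (intervalMillis > 0) {
            this.timer = Executors.newSingleThreadScheduledExecutor();
            this.timer.scheduleAtFixedRate(this::flush, intervalMillis, intervalMillis,
                    TimeUnit.MILLISECONDS);
        } else {
            this.timer = null;
        }
    }

    /** Queue one request body; flush if the count or size threshold is reached. */
    synchronized void add(String body) {
        pending.add(body);
        pendingBytes += body.getBytes(StandardCharsets.UTF_8).length;
        if (pending.size() >= maxActions || pendingBytes >= maxBytes) {
            flush();
        }
    }

    /** Hand the pending batch to the sink and start a new, empty batch. */
    synchronized void flush() {
        if (pending.isEmpty()) {
            return;
        }
        sink.accept(new ArrayList<>(pending));
        pending.clear();
        pendingBytes = 0;
    }

    /** Stop the timer and flush whatever is left over. */
    @Override
    public synchronized void close() {
        if (timer != null) {
            timer.shutdownNow();
        }
        flush();
    }
}

class MiniBatcherDemo {
    public static void main(String[] args) {
        List<Integer> batchSizes = new ArrayList<>();
        // Count threshold of 3, size threshold far away, no timer.
        try (MiniBatcher batcher = new MiniBatcher(3, 1_000_000L, 0L,
                batch -> batchSizes.add(batch.size()))) {
            for (int i = 0; i < 7; i++) {
                batcher.add("{\"n\":" + i + "}");
            }
        } // close() flushes the last, partial batch
        System.out.println(batchSizes); // [3, 3, 1]
    }
}
```

close() plays the same role as BulkProcessor.close() in the earlier example: anything still queued when you are done gets flushed, which is why the demo ends with a final batch of one.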
The following application demonstrates a simple bulk utility.
Step 1: Define a simple model class, Employee.
package com.self_learn.model;

import java.util.ArrayList;
import java.util.List;

import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

@EqualsAndHashCode()
@ToString
public class Employee {
    @Getter @Setter
    private String age;
    @Getter @Setter
    private String firstName;
    @Getter @Setter
    private String lastName;
    @Getter @Setter
    private List<String> hobbies = new ArrayList<>();
}
Step 2: Define a utility class that returns a Client instance.
package com.self_learn.util;

import static com.self_learn.util.IPUtil.isValidHosts;
import static com.self_learn.util.IPUtil.isValidPorts;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

import com.google.common.base.Preconditions;

public class TransportClientUtil {
    private static Map<Map<String, Integer>, Client> localMap = new HashMap<>();

    /**
     * Take machine names and port numbers as a map and return a transport client.
     * Key is the host name, value is the port number.
     *
     * @throws UnknownHostException
     */
    public static Client getTransportClient(String clusterName,
            Map<String, Integer> map) throws UnknownHostException {
        Preconditions.checkNotNull(clusterName, "clusterName shouldn't be null");
        Preconditions.checkNotNull(map, "map shouldn't be null");

        if (localMap.containsKey(map))
            return localMap.get(map);

        Preconditions.checkState(isValidHostPorts(map),
                "Map contains invalid host (or) port");

        Settings settings = ImmutableSettings.settingsBuilder()
                .put("cluster.name", clusterName)
                .put("client.transport.sniff", true).build();

        TransportClient client = new TransportClient(settings);
        InetSocketTransportAddress addresses[] = getInetSocketTransportAddresses(map);
        client.addTransportAddresses(addresses);
        localMap.put(map, client);
        return client;
    }

    /**
     * @param map
     * @return true, if all the entries in map are valid hosts and ports. Else false.
     */
    private static boolean isValidHostPorts(Map<String, Integer> map) {
        Set<String> hostNames = map.keySet();
        Set<Integer> ports = new HashSet<>(map.values());

        if (!isValidHosts(hostNames.toArray(new String[hostNames.size()])))
            return false;
        if (!isValidPorts(ports.toArray(new Integer[ports.size()])))
            return false;
        return true;
    }

    private static InetSocketTransportAddress[] getInetSocketTransportAddresses(
            Map<String, Integer> map) throws UnknownHostException {
        InetSocketTransportAddress addresses[] = new InetSocketTransportAddress[map.size()];
        int count = 0;
        Set<String> keys = map.keySet();
        for (String key : keys) {
            InetAddress addr = InetAddress.getByName(key);
            InetSocketTransportAddress address = new InetSocketTransportAddress(
                    addr, map.get(key));
            // count++ gives every host its own slot in the array
            addresses[count++] = address;
        }
        return addresses;
    }

    /**
     * Get transport client for localhost.
     *
     * @param clusterName
     * @param port
     * @return
     * @throws UnknownHostException
     */
    public static Client getLocalTransportClient(String clusterName, int port)
            throws UnknownHostException {
        Settings settings = ImmutableSettings.settingsBuilder()
                .put("cluster.name", clusterName)
                .put("client.transport.sniff", true).build();

        TransportClient client = new TransportClient(settings);

        InetAddress addr = InetAddress.getByName("127.0.0.1");
        InetSocketTransportAddress address = new InetSocketTransportAddress(
                addr, port);

        client.addTransportAddress(address);
        return client;
    }
}
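The following IPUtil class validates host addresses and ports. Note that isValidHosts accepts only literal IP addresses such as 127.0.0.1, not host names such as localhost.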
package com.self_learn.util;

import org.apache.commons.validator.routines.InetAddressValidator;

import com.google.common.base.Preconditions;

/**
 * Validate IP addresses, ports
 *
 * @author harikrishna_gurram
 */
public class IPUtil {
    private static InetAddressValidator inetAddressValidator = InetAddressValidator
            .getInstance();

    /**
     * @param ipAddress
     * @return true if ip address is valid, else false
     */
    public static boolean isValidIPAddress(String ipAddress) {
        Preconditions.checkNotNull(ipAddress, "IP address should not be null");
        return inetAddressValidator.isValid(ipAddress);
    }

    /**
     * @param port
     *            : Port number
     * @return true if port number is valid, else false
     */
    public static boolean isValidPort(int port) {
        if (port > 0 && port < 65536)
            return true;
        return false;
    }

    /**
     * @param hostNames
     * @return true if all the elements of the array are valid IP addresses, else false.
     */
    public static boolean isValidHosts(String[] hostNames) {
        Preconditions.checkNotNull(hostNames, "Host names shouldn't be empty");
        for (String hostName : hostNames) {
            if (!isValidIPAddress(hostName)) {
                return false;
            }
        }
        return true;
    }

    /**
     * @param ports
     * @return true if all the elements of the array are valid ports, else false.
     */
    public static boolean isValidPorts(Integer[] ports) {
        Preconditions.checkNotNull(ports, "ports shouldn't be empty");
        for (int port : ports) {
            if (!isValidPort(port)) {
                return false;
            }
        }
        return true;
    }
}
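IPUtil depends on Apache Commons Validator. If you only need to check IPv4 literals and port numbers, the JDK alone is enough. The class below is a hypothetical, dependency-free sketch (SimpleHostCheck is not part of the post's code) that performs roughly the same check as isValidIPAddress for dotted-quad IPv4 addresses and the same 1 to 65535 range as isValidPort. It deliberately does not handle IPv6.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical JDK-only counterpart to IPUtil's checks; accepts dotted-quad IPv4 literals only. */
final class SimpleHostCheck {
    // Four dot-separated groups of one to three ASCII digits; ranges are checked below.
    private static final Pattern DOTTED_QUAD =
            Pattern.compile("(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})");

    private SimpleHostCheck() {
    }

    /** True for addresses such as 127.0.0.1; false for names such as localhost. */
    static boolean isValidIPv4(String address) {
        if (address == null) {
            return false;
        }
        Matcher m = DOTTED_QUAD.matcher(address);
        if (!m.matches()) {
            return false;
        }
        for (int group = 1; group <= 4; group++) {
            if (Integer.parseInt(m.group(group)) > 255) {
                return false;
            }
        }
        return true;
    }

    /** Same rule as IPUtil.isValidPort: ports 1 through 65535. */
    static boolean isValidPort(int port) {
        return port > 0 && port < 65536;
    }

    public static void main(String[] args) {
        System.out.println(isValidIPv4("127.0.0.1")); // true
        System.out.println(isValidIPv4("localhost")); // false
        System.out.println(isValidPort(9300));        // true
    }
}
```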
Step 3: The BulkRequestType enum defines constants that identify the type of request: index, update, upsert update and delete.
package com.self_learn.constants;

/**
 * Specifies the type of BulkRequest
 *
 * @author harikrishna_gurram
 */
public enum BulkRequestType {
    INDEX, UPDATE, UPSERT_UPDATE, DELETE
}
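Each BulkRequestType ends up as one action inside the bulk request. The Java client sends these over the transport protocol, but the equivalent body for the Elasticsearch 1.x REST _bulk endpoint makes the mapping easy to see: an action line naming _index, _type and _id, followed (except for delete) by a source line. The helper below is a hypothetical sketch (BulkBodySketch is an invented name, and the enum is redeclared so the block compiles on its own); it assumes the source strings are already valid JSON and does not escape the index, type or id values.

```java
// Mirrors the BulkRequestType enum from Step 3 so that this sketch compiles on its own.
enum BulkRequestType { INDEX, UPDATE, UPSERT_UPDATE, DELETE }

/**
 * Hypothetical helper that renders one request in the newline-delimited
 * format of the Elasticsearch 1.x REST _bulk endpoint. Values are inserted
 * verbatim: ids are not escaped and sources must already be JSON.
 */
final class BulkBodySketch {
    private BulkBodySketch() {
    }

    // Action line, e.g. {"index":{"_index":"organization","_type":"employee","_id":"1"}}
    private static String actionLine(String action, String index, String type, String id) {
        return "{\"" + action + "\":{\"_index\":\"" + index + "\",\"_type\":\"" + type
                + "\",\"_id\":\"" + id + "\"}}\n";
    }

    static String render(BulkRequestType op, String index, String type, String id,
                         String source, String upsertSource) {
        switch (op) {
            case INDEX:         // action line + full document
                return actionLine("index", index, type, id) + source + "\n";
            case UPDATE:        // action line + partial document under "doc"
                return actionLine("update", index, type, id) + "{\"doc\":" + source + "}\n";
            case UPSERT_UPDATE: // "upsert" is indexed if the document does not exist yet
                return actionLine("update", index, type, id)
                        + "{\"doc\":" + source + ",\"upsert\":" + upsertSource + "}\n";
            case DELETE:        // action line only, no source line
                return actionLine("delete", index, type, id);
            default:
                throw new IllegalArgumentException("Unknown request type: " + op);
        }
    }

    public static void main(String[] args) {
        System.out.print(render(BulkRequestType.INDEX, "organization", "employee", "6",
                "{\"firstName\":\"firstName 6\"}", null));
        System.out.print(render(BulkRequestType.DELETE, "organization", "employee", "4",
                null, null));
    }
}
```

For example, rendering a DELETE for id 4 yields the single line {"delete":{"_index":"organization","_type":"employee","_id":"4"}} followed by a newline, while the update types wrap their source in a "doc" object.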
Step 4: Define the BulkObject model class. Each BulkObject instance represents one request: INDEX, UPDATE, UPSERT_UPDATE or DELETE.
package com.self_learn.model;

import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

import com.self_learn.constants.BulkRequestType;

@EqualsAndHashCode()
@ToString
public class BulkObject {
    @Getter @Setter
    private String _index;
    @Getter @Setter
    private String _type;
    @Getter @Setter
    private String _id;
    @Getter @Setter
    private String _source;
    @Getter @Setter
    private String _upsertSource;
    @Getter @Setter
    private BulkRequestType type;

    public BulkObject(String _index, String _type, String _id, BulkRequestType type) {
        this._index = _index;
        this._type = _type;
        this._id = _id;
        this.type = type;
    }

    public BulkObject(String _index, String _type, String _id, BulkRequestType type,
            String _source) {
        this(_index, _type, _id, type);
        this._source = _source;
    }

    public BulkObject(String _index, String _type, String _id, BulkRequestType type,
            String _source, String _upsertSource) {
        this(_index, _type, _id, type, _source);
        this._upsertSource = _upsertSource;
    }
}
Step 5: Define the BulkUtil class, which takes a set of BulkObjects and processes them.
package com.self_learn.util;

import java.util.Set;

import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.base.Preconditions;

import com.self_learn.constants.BulkRequestType;
import com.self_learn.model.BulkObject;

/**
 * Utility class to execute bulk requests.
 *
 * @author harikrishna_gurram
 */
public class BulkUtil {

    private static BulkObject getBulkObject(String _index, String _type,
            String _id, BulkRequestType type) {
        return new BulkObject(_index, _type, _id, type);
    }

    private static BulkObject getBulkObject(String _index, String _type,
            String _id, BulkRequestType type, String _source) {
        return new BulkObject(_index, _type, _id, type, _source);
    }

    private static BulkObject getBulkObject(String _index, String _type,
            String _id, BulkRequestType type, String _source, String _upsertSource) {
        return new BulkObject(_index, _type, _id, type, _source, _upsertSource);
    }

    public static BulkObject getIndexObject(String _index, String _type,
            String _id, String _source) {
        return getBulkObject(_index, _type, _id, BulkRequestType.INDEX, _source);
    }

    public static BulkObject getDeleteObject(String _index, String _type, String _id) {
        return getBulkObject(_index, _type, _id, BulkRequestType.DELETE);
    }

    public static BulkObject getUpdateObject(String _index, String _type,
            String _id, String src) {
        return getBulkObject(_index, _type, _id, BulkRequestType.UPDATE, src);
    }

    public static BulkObject getUpsertUpdateObject(String _index, String _type,
            String _id, String _source, String _upsertSource) {
        return getBulkObject(_index, _type, _id, BulkRequestType.UPSERT_UPDATE,
                _source, _upsertSource);
    }

    public static BulkObject getIndexObject(String _index, String _type,
            String _id, Object _source) {
        return getIndexObject(_index, _type, _id, JSONUtil.getJson(_source));
    }

    public static BulkObject getUpdateObject(String _index, String _type,
            String _id, Object _source) {
        return getUpdateObject(_index, _type, _id, JSONUtil.getJson(_source));
    }

    public static BulkObject getUpsertUpdateObject(String _index, String _type,
            String _id, Object _source, Object _upsertSource) {
        return getUpsertUpdateObject(_index, _type, _id, JSONUtil.getJson(_source),
                JSONUtil.getJson(_upsertSource));
    }

    private static boolean isNull(Object obj) {
        return (obj == null);
    }

    private static boolean isNotNull(Object obj) {
        return !isNull(obj);
    }

    /**
     * Validate a BulkObject based on its request type.
     *
     * If the request type
     *
     * a. is {@link BulkRequestType#INDEX}, the BulkObject must have _index, _type, _id, _source.
     *
     * b. is {@link BulkRequestType#UPDATE}, the BulkObject must have _index, _type, _id, _source.
     *
     * c. is {@link BulkRequestType#UPSERT_UPDATE}, the BulkObject must have _index, _type,
     * _id, _source, _upsertSource.
     *
     * d. is {@link BulkRequestType#DELETE}, the BulkObject must have _index, _type, _id.
     *
     * @param obj
     *            the request to validate; its type can be INDEX, UPDATE, UPSERT_UPDATE, DELETE.
     * @return true if obj has every field its request type needs
     */
    private static boolean isValidBulkObject(BulkObject obj) {
        // a missing request type would make the switch below throw a NullPointerException
        if (obj == null || obj.getType() == null)
            return false;

        String _index = obj.get_index();
        String _type = obj.get_type();
        String _id = obj.get_id();
        String _source = obj.get_source();
        String _upsertSource = obj.get_upsertSource();
        BulkRequestType type = obj.getType();

        switch (type) {
        case INDEX:
            if (isNotNull(_index) && isNotNull(_type) && isNotNull(_id)
                    && isNotNull(_source))
                return true;
            break;
        case UPDATE:
            if (isNotNull(_index) && isNotNull(_type) && isNotNull(_id)
                    && isNotNull(_source))
                return true;
            break;
        case UPSERT_UPDATE:
            if (isNotNull(_index) && isNotNull(_type) && isNotNull(_id)
                    && isNotNull(_source) && isNotNull(_upsertSource))
                return true;
            break;
        case DELETE:
            if (isNotNull(_index) && isNotNull(_type) && isNotNull(_id))
                return true;
            break;
        }
        return false;
    }

    /**
     * Takes a set of requests and executes them as one bulk request. The type
     * field of each BulkObject decides the kind of request:
     *
     * {@link BulkRequestType#INDEX} represents an index request
     * {@link BulkRequestType#UPDATE} represents an update request
     * {@link BulkRequestType#UPSERT_UPDATE} represents an upsert update request
     * {@link BulkRequestType#DELETE} represents a delete request
     *
     * If a BulkObject is invalid, the request is simply ignored.
     *
     * @param client
     * @param bulkReq
     * @return the response of the bulk request
     */
    public static BulkResponse execute(Client client, Set<BulkObject> bulkReq) {
        Preconditions.checkNotNull(client, "client shouldn't be null");
        Preconditions.checkNotNull(bulkReq, "bulkReq shouldn't be null");

        BulkRequestBuilder bulkRequest = client.prepareBulk();

        for (BulkObject obj : bulkReq) {
            if (!isValidBulkObject(obj))
                continue;

            String _index = obj.get_index();
            String _type = obj.get_type();
            String _id = obj.get_id();
            String _source = obj.get_source();
            String _upsertSource = obj.get_upsertSource();
            BulkRequestType type = obj.getType();

            switch (type) {
            case INDEX:
                bulkRequest.add(new IndexRequest(_index, _type, _id).source(_source));
                break;
            case UPDATE:
                bulkRequest.add(new UpdateRequest(_index, _type, _id).doc(_source));
                break;
            case UPSERT_UPDATE:
                bulkRequest.add(new UpdateRequest(_index, _type, _id).doc(_source)
                        .upsert(_upsertSource));
                break;
            case DELETE:
                bulkRequest.add(new DeleteRequest(_index, _type, _id));
                break;
            default:
                System.out.println("Invalid Request");
            }
        }
        return bulkRequest.execute().actionGet();
    }
}
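BulkUtil.execute adds requests to the bulk in the iteration order of the Set it receives, and the HashSet used in Main does not preserve insertion order. When order matters, for example indexing a document and then deleting it in the same batch, passing a LinkedHashSet keeps the order in which requests were added; to my understanding, Elasticsearch applies bulk items that target the same document in request order. Because BulkObject uses Lombok's @EqualsAndHashCode, any Set also drops an exact duplicate request. The sketch shows both effects with a hypothetical stand-in class, Req, instead of the real BulkObject, so that it compiles with only the JDK.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

/** Hypothetical stand-in for BulkObject: value equality on type and id, like @EqualsAndHashCode. */
final class Req {
    final String type;
    final String id;

    Req(String type, String id) {
        this.type = type;
        this.id = id;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof Req)) return false;
        Req other = (Req) o;
        return type.equals(other.type) && id.equals(other.id);
    }

    @Override
    public int hashCode() {
        return Objects.hash(type, id);
    }

    @Override
    public String toString() {
        return type + ":" + id;
    }
}

final class OrderDemo {
    /** Returns the order in which a loop like the one in BulkUtil.execute would see the requests. */
    static List<String> iterationOrder(Set<Req> requests) {
        List<String> order = new ArrayList<>();
        for (Req r : requests) {
            order.add(r.toString());
        }
        return order;
    }

    public static void main(String[] args) {
        Set<Req> ordered = new LinkedHashSet<>();
        ordered.add(new Req("INDEX", "7"));
        ordered.add(new Req("DELETE", "7"));
        ordered.add(new Req("INDEX", "7")); // equal to the first element, so not added again
        System.out.println(iterationOrder(ordered)); // [INDEX:7, DELETE:7]
    }
}
```

If you genuinely need the same request twice in one bulk, a List<BulkObject> is a better parameter type than a Set.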
Step 6: Define the ResponseUtil class, which returns the response of a query in JSON format.
package com.self_learn.util;

import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetResponse;

import com.google.common.base.Preconditions;

/**
 * Utility class to return response in string format.
 *
 * @author harikrishna_gurram
 */
public class ResponseUtil {

    /**
     * Returns source of the result as string.
     *
     * @param response
     * @return
     */
    public static String getSource(GetResponse response, boolean pretty) {
        Preconditions.checkNotNull(response, "response shouldn't be null");
        return response.getSourceAsString();
    }

    /**
     * @param response
     * @return string representation of {@link BulkResponse}
     */
    public static String getResponseInfo(Object response, boolean pretty) {
        if (pretty)
            return JSONUtil.getPrettyJson(response);
        return JSONUtil.getJson(response);
    }
}
Step 8: Main.java demonstrates how to use the BulkUtil class.
package com.self_learn.test;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;

import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;

import com.self_learn.model.BulkObject;
import com.self_learn.model.Employee;
import com.self_learn.util.BulkUtil;
import com.self_learn.util.ResponseUtil;
import com.self_learn.util.TransportClientUtil;

public class Main {
    private static String clusterName = "my_cluster_1";
    private static String _index = "organization";
    private static String _type = "employee";

    public static void main(String args[]) throws IOException,
            InterruptedException, ExecutionException {

        /* Get client instance for cluster */
        Client client = TransportClientUtil.getLocalTransportClient(
                clusterName, 9300);

        Set<BulkObject> objects = new HashSet<>();

        /* First 5 index requests */
        for (int i = 1; i <= 5; i++) {
            Employee emp = new Employee();
            emp.setAge("" + i);
            emp.setFirstName("firstName " + i);
            emp.setLastName("lastName " + i);
            emp.setHobbies(Arrays.asList("hobbies " + i));

            BulkObject obj = BulkUtil.getIndexObject(_index, _type, "" + i, emp);
            objects.add(obj);
        }

        BulkResponse response = BulkUtil.execute(client, objects);
        System.out.println(ResponseUtil.getResponseInfo(response, true));

        /* Delete documents 1 and 4 */
        objects = new HashSet<>();
        objects.add(BulkUtil.getDeleteObject(_index, _type, "1"));
        objects.add(BulkUtil.getDeleteObject(_index, _type, "4"));

        response = BulkUtil.execute(client, objects);
        System.out.println(ResponseUtil.getResponseInfo(response, true));

        /* Update document 2 and index document 6 */
        objects = new HashSet<>();
        Map<String, Object> doc = new HashMap<>();
        doc.put("firstName", "Krishna");
        doc.put("age", "26");
        objects.add(BulkUtil.getUpdateObject(_index, _type, "2", doc));

        Employee emp = new Employee();
        emp.setAge("" + 6);
        emp.setFirstName("firstName " + 6);
        emp.setLastName("lastName " + 6);
        emp.setHobbies(Arrays.asList("hobbies " + 6));
        objects.add(BulkUtil.getIndexObject(_index, _type, "6", emp));

        response = BulkUtil.execute(client, objects);
        System.out.println(ResponseUtil.getResponseInfo(response, true));

        client.close();
    }
}
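When you run Main.java, you should see output like the following. Because BulkUtil.execute receives a HashSet, the requests are not sent in insertion order, which is why the ids in the first response appear as 2, 4, 1, 3, 5.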
Sep 10, 2015 2:02:14 PM org.elasticsearch.plugins.PluginsService <init>
INFO: [Turner Century] loaded [], sites []
{
  "responses": [
    {
      "id": 0,
      "opType": "index",
      "response": {
        "index": "organization",
        "id": "2",
        "type": "employee",
        "version": 1,
        "created": true
      }
    },
    {
      "id": 1,
      "opType": "index",
      "response": {
        "index": "organization",
        "id": "4",
        "type": "employee",
        "version": 1,
        "created": true
      }
    },
    {
      "id": 2,
      "opType": "index",
      "response": {
        "index": "organization",
        "id": "1",
        "type": "employee",
        "version": 1,
        "created": true
      }
    },
    {
      "id": 3,
      "opType": "index",
      "response": {
        "index": "organization",
        "id": "3",
        "type": "employee",
        "version": 1,
        "created": true
      }
    },
    {
      "id": 4,
      "opType": "index",
      "response": {
        "index": "organization",
        "id": "5",
        "type": "employee",
        "version": 1,
        "created": true
      }
    }
  ],
  "tookInMillis": 2,
  "remoteAddress": {
    "address": {}
  }
}
{
  "responses": [
    {
      "id": 0,
      "opType": "delete",
      "response": {
        "index": "organization",
        "id": "4",
        "type": "employee",
        "version": 2,
        "found": true
      }
    },
    {
      "id": 1,
      "opType": "delete",
      "response": {
        "index": "organization",
        "id": "1",
        "type": "employee",
        "version": 2,
        "found": true
      }
    }
  ],
  "tookInMillis": 1,
  "remoteAddress": {
    "address": {}
  }
}
{
  "responses": [
    {
      "id": 0,
      "opType": "update",
      "response": {
        "index": "organization",
        "id": "2",
        "type": "employee",
        "version": 2,
        "created": false
      }
    },
    {
      "id": 1,
      "opType": "index",
      "response": {
        "index": "organization",
        "id": "6",
        "type": "employee",
        "version": 1,
        "created": true
      }
    }
  ],
  "tookInMillis": 1,
  "remoteAddress": {
    "address": {}
  }
}