Saturday 28 December 2019

Spring boot WebFlux: Handler and routing functions Handler Functions


Incoming http requests are handled by handler functions. A handler function takes a ServerRequest and returns a Mono<ServerResponse>.

Example
public Mono<ServerResponse> getAllEmployees(ServerRequest serverRequest) {
 Flux<Employee> emps = employeeRepo.findAll();

 return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(emps, Employee.class);
}

Router functions
Router functions route the incoming htto request to handler functions.


Example
route(GET("/api/v2/employees").and(accept(MediaType.APPLICATION_JSON)), employeeHandler::getAllEmployees)

Handler functions to perform CRUD operations

Handler function to get all employees
public Mono<ServerResponse> getAllEmployees(ServerRequest serverRequest) {
 Flux<Employee> emps = employeeRepo.findAll();

 return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(emps, Employee.class);
}


Handler function to get specific employee
public Mono<ServerResponse> getEmployee(ServerRequest serverRequest) {
 String id = serverRequest.pathVariable("id");

 Mono<Employee> empMono = employeeRepo.findById(id);

 return empMono.flatMap(emp -> ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(fromObject(emp)))
   .switchIfEmpty(NOT_FOUND);

}


Handler function to create new employee
public Mono<ServerResponse> createEmployee(ServerRequest serverRequest) {
 Mono<Employee> empMono = serverRequest.bodyToMono(Employee.class);

 return empMono.flatMap(emp -> ServerResponse.status(HttpStatus.CREATED).contentType(MediaType.APPLICATION_JSON)
   .body(employeeRepo.save(emp), Employee.class));
}


Handler function to update an employee
public Mono<ServerResponse> updateEmployee(ServerRequest serverRequest) {
 String id = serverRequest.pathVariable("id");

 Mono<Employee> persistedEmpMono = employeeRepo.findById(id);

 Mono<Employee> payLoadMono = serverRequest.bodyToMono(Employee.class);

 return payLoadMono
   .zipWith(persistedEmpMono,
     (employee, persistedEmployee) -> new Employee(persistedEmployee.getId(),
       employee.getFirstName(), employee.getLastName()))
   .flatMap(employee -> ServerResponse.ok().contentType(MediaType.APPLICATION_JSON)
     .body(employeeRepo.save(employee), Employee.class))
   .switchIfEmpty(NOT_FOUND);

}


Handler function to delete an employee
public Mono<ServerResponse> deleteEmployee(ServerRequest serverRequest) {

 String id = serverRequest.pathVariable("id");

 Mono<Employee> persistedEmpMono = employeeRepo.findById(id);

 return persistedEmpMono.flatMap(employee -> ServerResponse.ok().build(employeeRepo.delete(employee)))
   .switchIfEmpty(NOT_FOUND);

}


Handler function to delete all the employees
public Mono<ServerResponse> deleteAllEmployees(ServerRequest serverRequest) {
 return ServerResponse.ok().build(employeeRepo.deleteAll());
}


Handler to receive push events from server
public Mono<ServerResponse> employeeEvents(ServerRequest serverRequest) {
 Flux<EmployeeEvent> empEvents = Flux.interval(Duration.ofSeconds(1))
   .map(val -> new EmployeeEvent(val, "Employee Event"));

 return ServerResponse.ok().contentType(MediaType.TEXT_EVENT_STREAM).body(empEvents, EmployeeEvent.class);
}


Router function to route the incoming requests to specific handler functions
@Bean
RouterFunction<ServerResponse> routes(EmployeeHandler employeeHandler) {
 return route(GET("/api/v2/employees").and(accept(MediaType.APPLICATION_JSON)), employeeHandler::getAllEmployees)
   .andRoute(POST("/api/v2/employees").and(accept(MediaType.APPLICATION_JSON)),
     employeeHandler::createEmployee)
   .andRoute(GET("/api/v2/employees/{id}").and(accept(MediaType.APPLICATION_JSON)),
     employeeHandler::getEmployee)
   .andRoute(PUT("/api/v2/employees/{id}").and(accept(MediaType.APPLICATION_JSON)),
     employeeHandler::updateEmployee)
   .andRoute(DELETE("/api/v2/employees").and(accept(MediaType.APPLICATION_JSON)),
     employeeHandler::deleteAllEmployees)
   .andRoute(DELETE("/api/v2/employees/{id}").and(accept(MediaType.APPLICATION_JSON)),
     employeeHandler::deleteEmployee)
   .andRoute(GET("/api/v2/employees/events").and(accept(MediaType.APPLICATION_JSON)),
     employeeHandler::employeeEvents);
}

Find the below working example.


Employee.java
package com.sample.app.entity;

import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;

@Document
public class Employee {

 @Id
 private String id;

 private String firstName;

 private String lastName;

 public Employee() {
 }

 public Employee(String firstName, String lastName) {
  this.firstName = firstName;
  this.lastName = lastName;
 }

 public Employee(String id, String firstName, String lastName) {
  super();
  this.id = id;
  this.firstName = firstName;
  this.lastName = lastName;
 }

 public String getFirstName() {
  return firstName;
 }

 public String getId() {
  return id;
 }

 public String getLastName() {
  return lastName;
 }

 public void setFirstName(String firstName) {
  this.firstName = firstName;
 }

 public void setId(String id) {
  this.id = id;
 }

 public void setLastName(String lastName) {
  this.lastName = lastName;
 }

 @Override
 public String toString() {
  StringBuilder builder = new StringBuilder();
  builder.append("Employee [id=").append(id).append(", firstName=").append(firstName).append(", lastName=")
    .append(lastName).append("]");
  return builder.toString();
 }

}


EmployeeEvent.java
package com.sample.app.events;

public class EmployeeEvent {
 private Long eventId;
 private String eventType;

 public EmployeeEvent(Long eventId, String eventType) {
  super();
  this.eventId = eventId;
  this.eventType = eventType;
 }

 public Long getEventId() {
  return eventId;
 }

 public void setEventId(Long eventId) {
  this.eventId = eventId;
 }

 public String getEventType() {
  return eventType;
 }

 public void setEventType(String eventType) {
  this.eventType = eventType;
 }

 @Override
 public String toString() {
  StringBuilder builder = new StringBuilder();
  builder.append("EmployeeEvent [eventId=").append(eventId).append(", eventType=").append(eventType).append("]");
  return builder.toString();
 }

}


EmployeeHandler.java
package com.sample.app.handler;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerResponse;
import org.springframework.web.reactive.function.server.ServerRequest;

import com.sample.app.entity.Employee;
import com.sample.app.events.EmployeeEvent;
import com.sample.app.repository.EmployeeRepository;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import static org.springframework.web.reactive.function.BodyInserters.fromObject;

import java.time.Duration;

@Component
public class EmployeeHandler {
 private static final Mono<ServerResponse> NOT_FOUND = ServerResponse.notFound().build();

 @Autowired
 private EmployeeRepository employeeRepo;

 public Mono<ServerResponse> getAllEmployees(ServerRequest serverRequest) {
  Flux<Employee> emps = employeeRepo.findAll();

  return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(emps, Employee.class);
 }

 public Mono<ServerResponse> getEmployee(ServerRequest serverRequest) {
  String id = serverRequest.pathVariable("id");

  Mono<Employee> empMono = employeeRepo.findById(id);

  return empMono.flatMap(emp -> ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(fromObject(emp)))
    .switchIfEmpty(NOT_FOUND);

 }

 public Mono<ServerResponse> createEmployee(ServerRequest serverRequest) {
  Mono<Employee> empMono = serverRequest.bodyToMono(Employee.class);

  return empMono.flatMap(emp -> ServerResponse.status(HttpStatus.CREATED).contentType(MediaType.APPLICATION_JSON)
    .body(employeeRepo.save(emp), Employee.class));
 }

 public Mono<ServerResponse> updateEmployee(ServerRequest serverRequest) {
  String id = serverRequest.pathVariable("id");

  Mono<Employee> persistedEmpMono = employeeRepo.findById(id);

  Mono<Employee> payLoadMono = serverRequest.bodyToMono(Employee.class);

  return payLoadMono
    .zipWith(persistedEmpMono,
      (employee, persistedEmployee) -> new Employee(persistedEmployee.getId(),
        employee.getFirstName(), employee.getLastName()))
    .flatMap(employee -> ServerResponse.ok().contentType(MediaType.APPLICATION_JSON)
      .body(employeeRepo.save(employee), Employee.class))
    .switchIfEmpty(NOT_FOUND);

 }

 public Mono<ServerResponse> deleteEmployee(ServerRequest serverRequest) {

  String id = serverRequest.pathVariable("id");

  Mono<Employee> persistedEmpMono = employeeRepo.findById(id);

  return persistedEmpMono.flatMap(employee -> ServerResponse.ok().build(employeeRepo.delete(employee)))
    .switchIfEmpty(NOT_FOUND);

 }

 public Mono<ServerResponse> deleteAllEmployees(ServerRequest serverRequest) {
  return ServerResponse.ok().build(employeeRepo.deleteAll());
 }

 public Mono<ServerResponse> employeeEvents(ServerRequest serverRequest) {
  Flux<EmployeeEvent> empEvents = Flux.interval(Duration.ofSeconds(1))
    .map(val -> new EmployeeEvent(val, "Employee Event"));

  return ServerResponse.ok().contentType(MediaType.TEXT_EVENT_STREAM).body(empEvents, EmployeeEvent.class);
 }
}


EmployeeRepository.java
package com.sample.app.repository;

import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
import org.springframework.stereotype.Repository;

import com.sample.app.entity.Employee;

import reactor.core.publisher.Flux;

@Repository
public interface EmployeeRepository extends ReactiveMongoRepository<Employee, String> {

 public Flux<Employee> findByFirstName(String firstName);
}


EmployeeController.java
package com.sample.app.comntroller;

import java.time.Duration;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import com.sample.app.entity.Employee;
import com.sample.app.events.EmployeeEvent;
import com.sample.app.repository.EmployeeRepository;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@RestController
@RequestMapping("api/v1/")
public class EmployeeController {

 @Autowired
 private EmployeeRepository empRepository;

 @RequestMapping(value = "employees", method = RequestMethod.GET)
 public Flux<Employee> all() {
  return empRepository.findAll();
 }

 @RequestMapping(value = "employees", method = RequestMethod.DELETE)
 public Mono<Void> deleteAll() {
  return empRepository.deleteAll();
 }

 @RequestMapping(value = "employees/{id}", method = RequestMethod.GET)
 public Mono<ResponseEntity<Employee>> byId(@PathVariable("id") String id) {
  return empRepository.findById(id).map(emp -> ResponseEntity.ok(emp))
    .defaultIfEmpty(ResponseEntity.notFound().build());
 }

 @RequestMapping(value = "employees/{id}", method = RequestMethod.PUT)
 public Mono<ResponseEntity<Employee>> update(@PathVariable("id") String id, @RequestBody Employee emp) {

  return empRepository.findById(id).flatMap(persistedEmp -> {
   persistedEmp.setFirstName(emp.getFirstName());
   persistedEmp.setLastName(emp.getLastName());
   return empRepository.save(persistedEmp);
  }).map(persistedEmp -> ResponseEntity.status(HttpStatus.CREATED).body(persistedEmp))
    .defaultIfEmpty(ResponseEntity.notFound().build());
 }

 @RequestMapping(value = "employees/{id}", method = RequestMethod.DELETE)
 public Mono<ResponseEntity<Void>> delete(@PathVariable("id") String id) {

  return empRepository.findById(id)
    .flatMap(persistedEmployee -> empRepository.delete(persistedEmployee)
      .then(Mono.just(ResponseEntity.ok().<Void>build())))
    .defaultIfEmpty(ResponseEntity.notFound().build());

 }

 @RequestMapping(value = "employees", method = RequestMethod.POST)
 public Mono<ResponseEntity<Employee>> create(@RequestBody Employee emp) {

  return empRepository.save(emp)
    .map(persistedEmp -> ResponseEntity.status(HttpStatus.CREATED).body(persistedEmp));
 }

 @RequestMapping(value = "employees/events", method = RequestMethod.GET, produces = MediaType.TEXT_EVENT_STREAM_VALUE)
 public Flux<EmployeeEvent> getEmployeeEvents() {
  return Flux.interval(Duration.ofSeconds(1)).map(val -> new EmployeeEvent(val, "Employee Event"));

 }
}


App.java
package com.sample.app;

import static org.springframework.web.reactive.function.server.RequestPredicates.DELETE;
import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RequestPredicates.POST;
import static org.springframework.web.reactive.function.server.RequestPredicates.PUT;
import static org.springframework.web.reactive.function.server.RequestPredicates.accept;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;

import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;

import com.sample.app.entity.Employee;
import com.sample.app.handler.EmployeeHandler;
import com.sample.app.repository.EmployeeRepository;

import reactor.core.publisher.Flux;

@SpringBootApplication
public class App {
 public static void main(String[] args) {

  SpringApplication.run(App.class, args);
 }

 @Bean
 public CommandLineRunner demo(EmployeeRepository empRepository) {
  return (args) -> {
   Employee emp1 = new Employee("Phalgun", "Garimella");
   Employee emp2 = new Employee("Sankalp", "Dubey");
   Employee emp3 = new Employee("Arpan", "Debroy");

   Flux<Employee> empsFlux = Flux.just(emp1, emp2, emp3);

   empsFlux.map(emp -> empRepository.save(emp))
     .subscribe(result -> System.out.println("Created employee : " + result.block()));

   System.out.println("\nGetting all the employees from database\n");
   empRepository.findAll().subscribe(System.out::println);

  };
 }

 @Bean
 RouterFunction<ServerResponse> routes(EmployeeHandler employeeHandler) {
  return route(GET("/api/v2/employees").and(accept(MediaType.APPLICATION_JSON)), employeeHandler::getAllEmployees)
    .andRoute(POST("/api/v2/employees").and(accept(MediaType.APPLICATION_JSON)),
      employeeHandler::createEmployee)
    .andRoute(GET("/api/v2/employees/{id}").and(accept(MediaType.APPLICATION_JSON)),
      employeeHandler::getEmployee)
    .andRoute(PUT("/api/v2/employees/{id}").and(accept(MediaType.APPLICATION_JSON)),
      employeeHandler::updateEmployee)
    .andRoute(DELETE("/api/v2/employees").and(accept(MediaType.APPLICATION_JSON)),
      employeeHandler::deleteAllEmployees)
    .andRoute(DELETE("/api/v2/employees/{id}").and(accept(MediaType.APPLICATION_JSON)),
      employeeHandler::deleteEmployee)
    .andRoute(GET("/api/v2/employees/events").and(accept(MediaType.APPLICATION_JSON)),
      employeeHandler::employeeEvents);

 }

}


pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>
 <groupId>springWebflux</groupId>
 <artifactId>springWebflux</artifactId>
 <version>1</version>

 <parent>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-parent</artifactId>
  <version>2.1.6.RELEASE</version>
 </parent>


 <dependencies>
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-webflux</artifactId>
  </dependency>

  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
  </dependency>


  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-test</artifactId>
   <scope>test</scope>
  </dependency>

  <dependency>
   <groupId>de.flapdoodle.embed</groupId>
   <artifactId>de.flapdoodle.embed.mongo</artifactId>
   <!-- <scope>test</scope> -->
  </dependency>


 </dependencies>
</project>

Run App.java.

How to get all the Employees?
Method: GET

API: localhost:8080/api/v2/employees/

How to get specific employee details?
Method: GET

URL: localhost:8080/api/v2/employees/{employee_id}

How to update specific employee details?
Method: PUT
API: localhost:8080/api/v2/employees/{id}
RequestBody:
{
    "firstName": "Ram",
    "lastName": "Gurram"

}

How to delete an employee?
Method: DELETE
API: localhost:8080/api/v2/employees/{id}

How to delete all the employees?
Method: DELETE
API: localhost:8080/api/v2/employees

You can download complete working application from this link.

Previous                                                    Next                                                    Home

No comments:

Post a Comment