Thursday 30 April 2015

Apache Avro

Apache Avro is a data serialization system. Serialization is the process of converting an object's state into a sequence of bytes; de-serialization is the reverse process, which restores the object's state from those bytes.

Avro provides:
   1.   Rich data structures.
   2.   A compact, fast, binary data format.
   3.   A container file, to store persistent data.
   4.   Remote procedure call (RPC).
   5.   Simple integration with dynamic languages. Code generation is not required to read or write data files or to use or implement RPC protocols. Code generation is an optional optimization, only worth implementing for statically typed languages.
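One reason the binary format is compact is how numbers are stored. According to the Avro specification, `int` and `long` values are written with zig-zag encoding followed by a variable-length (7 bits per byte) encoding, so small values, positive or negative, take a single byte. The plain-Java sketch below illustrates that rule without the Avro library. The class name `ZigZagVarint` is hypothetical and is not part of Avro's API.

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;

/** Plain-Java sketch of Avro's zig-zag + varint encoding for int/long values. */
public class ZigZagVarint {

    /** Encodes a long: zig-zag maps signed to unsigned, then 7 bits per byte. */
    public static byte[] encodeLong(long value) {
        long n = (value << 1) ^ (value >> 63); // zig-zag: 0->0, -1->1, 1->2, -2->3, ...
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((n & ~0x7FL) != 0) {
            out.write((int) ((n & 0x7F) | 0x80)); // low 7 bits, high bit = "more bytes follow"
            n >>>= 7;
        }
        out.write((int) n); // last byte has the high bit clear
        return out.toByteArray();
    }

    /** Decodes the bytes produced by encodeLong. */
    public static long decodeLong(byte[] bytes) {
        long n = 0;
        int shift = 0;
        for (byte b : bytes) {
            n |= (long) (b & 0x7F) << shift;
            shift += 7;
            if ((b & 0x80) == 0) {
                break;
            }
        }
        return (n >>> 1) ^ -(n & 1); // undo zig-zag
    }

    public static void main(String[] args) {
        for (long v : new long[] {0, -1, 1, 27, 64, -300}) {
            System.out.println(v + " -> " + Arrays.toString(encodeLong(v)));
        }
    }
}
```

Running `main` prints, among other lines, `27 -> [54]`: the employee age of 27 used later in this tutorial fits in one byte.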

Avro relies on schemas. When Avro data is stored in a file, its schema is stored with it, so the file can later be processed by any program. Avro schemas are defined in JSON. I will explain this in detail by serializing and de-serializing an Employee object using Avro.
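Because the schema travels with the data, an Avro data file can be inspected without knowing its schema in advance. According to the Avro specification, an object container file starts with the ASCII bytes `O`, `b`, `j` followed by the byte 1, then a metadata map whose `avro.schema` entry holds the schema JSON, then a 16-byte sync marker. The plain-Java sketch below reads that header without the Avro library. The class and method names are hypothetical, and it assumes a well-formed file; with the Avro library you would simply call `getSchema()` on a `DataFileReader`.

```java
import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

/** Reads the header metadata of an Avro object container file without the Avro library. */
public class AvroHeaderPeek {

    /** Every container file starts with the ASCII bytes 'O', 'b', 'j' followed by 1. */
    static final byte[] MAGIC = {'O', 'b', 'j', 1};

    /** Reads a zig-zag encoded variable-length long. */
    static long readLong(DataInputStream in) throws IOException {
        long n = 0;
        int shift = 0;
        int b;
        do {
            b = in.readUnsignedByte();
            n |= (long) (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (n >>> 1) ^ -(n & 1);
    }

    /** Reads Avro "bytes" (strings use the same layout): a long length, then the data. */
    static byte[] readBytes(DataInputStream in) throws IOException {
        byte[] buf = new byte[(int) readLong(in)];
        in.readFully(buf);
        return buf;
    }

    /** Returns the header's metadata map, e.g. "avro.schema" -> schema JSON. */
    public static Map<String, byte[]> readMetadata(InputStream raw) throws IOException {
        DataInputStream in = new DataInputStream(raw);
        byte[] magic = new byte[4];
        in.readFully(magic);
        if (!Arrays.equals(magic, MAGIC)) {
            throw new IOException("Not an Avro object container file");
        }
        Map<String, byte[]> meta = new LinkedHashMap<String, byte[]>();
        // A map is written as blocks of entries; a block count of 0 ends the map.
        for (long count = readLong(in); count != 0; count = readLong(in)) {
            if (count < 0) { // negative count: a block size in bytes follows; skip it
                count = -count;
                readLong(in);
            }
            for (long i = 0; i < count; i++) {
                String key = new String(readBytes(in), StandardCharsets.UTF_8);
                meta.put(key, readBytes(in));
            }
        }
        // The 16-byte sync marker and the data blocks follow; they are not needed here.
        return meta;
    }

    public static void main(String[] args) throws IOException {
        String fileName = args.length > 0 ? args[0] : "ser.out";
        InputStream in = new FileInputStream(fileName);
        try {
            byte[] schema = readMetadata(in).get("avro.schema");
            System.out.println(schema == null ? "no avro.schema entry"
                    : new String(schema, StandardCharsets.UTF_8));
        } finally {
            in.close();
        }
    }
}
```

Pointed at the ser.out file produced at the end of this tutorial, it should print the Employee schema as JSON.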

In brief, these are the steps:
   1.   Define the schema for Employee.
   2.   Generate a Java class from the Employee schema.
   3.   Use the generated class to create one or more employee objects.
   4.   Serialize the employee object created in step 3.
   5.   De-serialize and print the employee object.

Step 1: Define Employee schema

Create a file named “employee.avsc” with the following content.
{"namespace": "tutorial.model",
 "type": "record",
 "name": "Employee",
 "fields": [
     {"name": "firstName", "type": "string"},
     {"name": "lastName", "type": "string"},
     {"name": "age",  "type": "int"},
     {"name": "id",  "type": "string"},
     {"name" : "company", "type" : "string"}
 ]
}
When you compile the above schema, it generates an Employee.java file in the package “tutorial.model”, which is taken from the schema's namespace. As per the schema, Employee has five fields: firstName, lastName, age, id and company. The fields firstName, lastName, id and company are of Avro type string (exposed as CharSequence in the generated Java class), and age is of type int.
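The field order in the schema matters. According to the Avro specification, a record is encoded as the values of its fields concatenated in the order the schema declares them, with no field names or tags in the data; that is why a reader needs the writer's schema. A `string` is written as its UTF-8 byte length (a zig-zag varint `long`) followed by the bytes, and an `int` as a zig-zag varint. The plain-Java sketch below hand-encodes the employee values used later in this tutorial to show the layout. `EmployeeBinarySketch` is a hypothetical name; real code should use Avro's `SpecificDatumWriter`.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

/** Hand-encodes one Employee record following the Avro binary encoding rules. */
public class EmployeeBinarySketch {

    /** Zig-zag + varint encoding, used for Avro int and long. */
    static void writeLong(ByteArrayOutputStream out, long value) {
        long n = (value << 1) ^ (value >> 63);
        while ((n & ~0x7FL) != 0) {
            out.write((int) ((n & 0x7F) | 0x80));
            n >>>= 7;
        }
        out.write((int) n);
    }

    /** Avro string: byte length as a long, then the UTF-8 bytes. */
    static void writeString(ByteArrayOutputStream out, String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        writeLong(out, utf8.length);
        out.write(utf8, 0, utf8.length);
    }

    /** Fields are written in schema order: firstName, lastName, age, id, company. */
    public static byte[] encodeEmployee(String firstName, String lastName, int age,
                                        String id, String company) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeString(out, firstName);
        writeString(out, lastName);
        writeLong(out, age);
        writeString(out, id);
        writeString(out, company);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] bytes = encodeEmployee("Hari krishna", "Gurram", 27, "E432123", "ABCD");
        System.out.println(bytes.length + " bytes");
    }
}
```

For these values the record body is 34 bytes (prints `34 bytes`); the data file written later also contains a header and block framing around it.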

Step 2: Generate Employee.java from the employee.avsc file.

The avro-tools jar is used to generate code from the schema file.

Syntax
java -jar avro-tools-1.7.7.jar compile schema <schema file> <destination>

To generate the code for employee schema, run the following command.

java -jar avro-tools-1.7.7.jar compile schema employee.avsc .

This generates Employee.java under tutorial/model/ in the destination directory (the path follows the schema's namespace). Now we can use Employee.java to serialize and de-serialize employee data with Avro.

Step 3: Create an Eclipse Maven project “avro_tutorial”.
File -> New -> Other

Select Maven Project and press Next.

Select the check box “Create a simple project (skip archetype selection)” and press Next.
Enter “avro_tutorial” as both the Group Id and the Artifact Id, and press Finish.

Step 4: Open “pom.xml” and add the Avro dependency.

pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>
 <groupId>avro_tutorial</groupId>
 <artifactId>avro_tutorial</artifactId>
 <version>0.0.1-SNAPSHOT</version>
 <properties>
  <avro_version>1.7.7</avro_version>
 </properties>

 <dependencies>
  <dependency>
   <groupId>org.apache.avro</groupId>
   <artifactId>avro</artifactId>
   <version>${avro_version}</version>
  </dependency>
 </dependencies>
</project>


Step 5: Create a new package “tutorial.model” and place the Employee.java file generated in Step 2 in it.

Employee.java
/**
 * Autogenerated by Avro
 * 
 * DO NOT EDIT DIRECTLY
 */
package tutorial.model;

@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
public class Employee extends org.apache.avro.specific.SpecificRecordBase
  implements org.apache.avro.specific.SpecificRecord {
 public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser()
   .parse("{\"type\":\"record\",\"name\":\"Employee\",\"namespace\":\"tutorial.model\",\"fields\":[{\"name\":\"firstName\",\"type\":\"string\"},{\"name\":\"lastName\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"int\"},{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"company\",\"type\":\"string\"}]}");

 public static org.apache.avro.Schema getClassSchema() {
  return SCHEMA$;
 }

 @Deprecated
 public java.lang.CharSequence firstName;
 @Deprecated
 public java.lang.CharSequence lastName;
 @Deprecated
 public int age;
 @Deprecated
 public java.lang.CharSequence id;
 @Deprecated
 public java.lang.CharSequence company;

 /**
  * Default constructor. Note that this does not initialize fields to their
  * default values from the schema. If that is desired then one should use
  * <code>newBuilder()</code>.
  */
 public Employee() {
 }

 /**
  * All-args constructor.
  */
 public Employee(java.lang.CharSequence firstName,
   java.lang.CharSequence lastName, java.lang.Integer age,
   java.lang.CharSequence id, java.lang.CharSequence company) {
  this.firstName = firstName;
  this.lastName = lastName;
  this.age = age;
  this.id = id;
  this.company = company;
 }

 public org.apache.avro.Schema getSchema() {
  return SCHEMA$;
 }

 // Used by DatumWriter. Applications should not call.
 public java.lang.Object get(int field$) {
  switch (field$) {
  case 0:
   return firstName;
  case 1:
   return lastName;
  case 2:
   return age;
  case 3:
   return id;
  case 4:
   return company;
  default:
   throw new org.apache.avro.AvroRuntimeException("Bad index");
  }
 }

 // Used by DatumReader. Applications should not call.
 @SuppressWarnings(value = "unchecked")
 public void put(int field$, java.lang.Object value$) {
  switch (field$) {
  case 0:
   firstName = (java.lang.CharSequence) value$;
   break;
  case 1:
   lastName = (java.lang.CharSequence) value$;
   break;
  case 2:
   age = (java.lang.Integer) value$;
   break;
  case 3:
   id = (java.lang.CharSequence) value$;
   break;
  case 4:
   company = (java.lang.CharSequence) value$;
   break;
  default:
   throw new org.apache.avro.AvroRuntimeException("Bad index");
  }
 }

 /**
  * Gets the value of the 'firstName' field.
  */
 public java.lang.CharSequence getFirstName() {
  return firstName;
 }

 /**
  * Sets the value of the 'firstName' field.
  * 
  * @param value
  *            the value to set.
  */
 public void setFirstName(java.lang.CharSequence value) {
  this.firstName = value;
 }

 /**
  * Gets the value of the 'lastName' field.
  */
 public java.lang.CharSequence getLastName() {
  return lastName;
 }

 /**
  * Sets the value of the 'lastName' field.
  * 
  * @param value
  *            the value to set.
  */
 public void setLastName(java.lang.CharSequence value) {
  this.lastName = value;
 }

 /**
  * Gets the value of the 'age' field.
  */
 public java.lang.Integer getAge() {
  return age;
 }

 /**
  * Sets the value of the 'age' field.
  * 
  * @param value
  *            the value to set.
  */
 public void setAge(java.lang.Integer value) {
  this.age = value;
 }

 /**
  * Gets the value of the 'id' field.
  */
 public java.lang.CharSequence getId() {
  return id;
 }

 /**
  * Sets the value of the 'id' field.
  * 
  * @param value
  *            the value to set.
  */
 public void setId(java.lang.CharSequence value) {
  this.id = value;
 }

 /**
  * Gets the value of the 'company' field.
  */
 public java.lang.CharSequence getCompany() {
  return company;
 }

 /**
  * Sets the value of the 'company' field.
  * 
  * @param value
  *            the value to set.
  */
 public void setCompany(java.lang.CharSequence value) {
  this.company = value;
 }

 /** Creates a new Employee RecordBuilder */
 public static tutorial.model.Employee.Builder newBuilder() {
  return new tutorial.model.Employee.Builder();
 }

 /** Creates a new Employee RecordBuilder by copying an existing Builder */
 public static tutorial.model.Employee.Builder newBuilder(
   tutorial.model.Employee.Builder other) {
  return new tutorial.model.Employee.Builder(other);
 }

 /**
  * Creates a new Employee RecordBuilder by copying an existing Employee
  * instance
  */
 public static tutorial.model.Employee.Builder newBuilder(
   tutorial.model.Employee other) {
  return new tutorial.model.Employee.Builder(other);
 }

 /**
  * RecordBuilder for Employee instances.
  */
 public static class Builder extends
   org.apache.avro.specific.SpecificRecordBuilderBase<Employee>
   implements org.apache.avro.data.RecordBuilder<Employee> {

  private java.lang.CharSequence firstName;
  private java.lang.CharSequence lastName;
  private int age;
  private java.lang.CharSequence id;
  private java.lang.CharSequence company;

  /** Creates a new Builder */
  private Builder() {
   super(tutorial.model.Employee.SCHEMA$);
  }

  /** Creates a Builder by copying an existing Builder */
  private Builder(tutorial.model.Employee.Builder other) {
   super(other);
   if (isValidValue(fields()[0], other.firstName)) {
    this.firstName = data().deepCopy(fields()[0].schema(),
      other.firstName);
    fieldSetFlags()[0] = true;
   }
   if (isValidValue(fields()[1], other.lastName)) {
    this.lastName = data().deepCopy(fields()[1].schema(),
      other.lastName);
    fieldSetFlags()[1] = true;
   }
   if (isValidValue(fields()[2], other.age)) {
    this.age = data().deepCopy(fields()[2].schema(), other.age);
    fieldSetFlags()[2] = true;
   }
   if (isValidValue(fields()[3], other.id)) {
    this.id = data().deepCopy(fields()[3].schema(), other.id);
    fieldSetFlags()[3] = true;
   }
   if (isValidValue(fields()[4], other.company)) {
    this.company = data().deepCopy(fields()[4].schema(),
      other.company);
    fieldSetFlags()[4] = true;
   }
  }

  /** Creates a Builder by copying an existing Employee instance */
  private Builder(tutorial.model.Employee other) {
   super(tutorial.model.Employee.SCHEMA$);
   if (isValidValue(fields()[0], other.firstName)) {
    this.firstName = data().deepCopy(fields()[0].schema(),
      other.firstName);
    fieldSetFlags()[0] = true;
   }
   if (isValidValue(fields()[1], other.lastName)) {
    this.lastName = data().deepCopy(fields()[1].schema(),
      other.lastName);
    fieldSetFlags()[1] = true;
   }
   if (isValidValue(fields()[2], other.age)) {
    this.age = data().deepCopy(fields()[2].schema(), other.age);
    fieldSetFlags()[2] = true;
   }
   if (isValidValue(fields()[3], other.id)) {
    this.id = data().deepCopy(fields()[3].schema(), other.id);
    fieldSetFlags()[3] = true;
   }
   if (isValidValue(fields()[4], other.company)) {
    this.company = data().deepCopy(fields()[4].schema(),
      other.company);
    fieldSetFlags()[4] = true;
   }
  }

  /** Gets the value of the 'firstName' field */
  public java.lang.CharSequence getFirstName() {
   return firstName;
  }

  /** Sets the value of the 'firstName' field */
  public tutorial.model.Employee.Builder setFirstName(
    java.lang.CharSequence value) {
   validate(fields()[0], value);
   this.firstName = value;
   fieldSetFlags()[0] = true;
   return this;
  }

  /** Checks whether the 'firstName' field has been set */
  public boolean hasFirstName() {
   return fieldSetFlags()[0];
  }

  /** Clears the value of the 'firstName' field */
  public tutorial.model.Employee.Builder clearFirstName() {
   firstName = null;
   fieldSetFlags()[0] = false;
   return this;
  }

  /** Gets the value of the 'lastName' field */
  public java.lang.CharSequence getLastName() {
   return lastName;
  }

  /** Sets the value of the 'lastName' field */
  public tutorial.model.Employee.Builder setLastName(
    java.lang.CharSequence value) {
   validate(fields()[1], value);
   this.lastName = value;
   fieldSetFlags()[1] = true;
   return this;
  }

  /** Checks whether the 'lastName' field has been set */
  public boolean hasLastName() {
   return fieldSetFlags()[1];
  }

  /** Clears the value of the 'lastName' field */
  public tutorial.model.Employee.Builder clearLastName() {
   lastName = null;
   fieldSetFlags()[1] = false;
   return this;
  }

  /** Gets the value of the 'age' field */
  public java.lang.Integer getAge() {
   return age;
  }

  /** Sets the value of the 'age' field */
  public tutorial.model.Employee.Builder setAge(int value) {
   validate(fields()[2], value);
   this.age = value;
   fieldSetFlags()[2] = true;
   return this;
  }

  /** Checks whether the 'age' field has been set */
  public boolean hasAge() {
   return fieldSetFlags()[2];
  }

  /** Clears the value of the 'age' field */
  public tutorial.model.Employee.Builder clearAge() {
   fieldSetFlags()[2] = false;
   return this;
  }

  /** Gets the value of the 'id' field */
  public java.lang.CharSequence getId() {
   return id;
  }

  /** Sets the value of the 'id' field */
  public tutorial.model.Employee.Builder setId(
    java.lang.CharSequence value) {
   validate(fields()[3], value);
   this.id = value;
   fieldSetFlags()[3] = true;
   return this;
  }

  /** Checks whether the 'id' field has been set */
  public boolean hasId() {
   return fieldSetFlags()[3];
  }

  /** Clears the value of the 'id' field */
  public tutorial.model.Employee.Builder clearId() {
   id = null;
   fieldSetFlags()[3] = false;
   return this;
  }

  /** Gets the value of the 'company' field */
  public java.lang.CharSequence getCompany() {
   return company;
  }

  /** Sets the value of the 'company' field */
  public tutorial.model.Employee.Builder setCompany(
    java.lang.CharSequence value) {
   validate(fields()[4], value);
   this.company = value;
   fieldSetFlags()[4] = true;
   return this;
  }

  /** Checks whether the 'company' field has been set */
  public boolean hasCompany() {
   return fieldSetFlags()[4];
  }

  /** Clears the value of the 'company' field */
  public tutorial.model.Employee.Builder clearCompany() {
   company = null;
   fieldSetFlags()[4] = false;
   return this;
  }

  public Employee build() {
   try {
    Employee record = new Employee();
    record.firstName = fieldSetFlags()[0] ? this.firstName
      : (java.lang.CharSequence) defaultValue(fields()[0]);
    record.lastName = fieldSetFlags()[1] ? this.lastName
      : (java.lang.CharSequence) defaultValue(fields()[1]);
    record.age = fieldSetFlags()[2] ? this.age
      : (java.lang.Integer) defaultValue(fields()[2]);
    record.id = fieldSetFlags()[3] ? this.id
      : (java.lang.CharSequence) defaultValue(fields()[3]);
    record.company = fieldSetFlags()[4] ? this.company
      : (java.lang.CharSequence) defaultValue(fields()[4]);
    return record;
   } catch (Exception e) {
    throw new org.apache.avro.AvroRuntimeException(e);
   }
  }
 }
}
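The `get(int)` and `put(int, Object)` methods marked "Applications should not call" are what let Avro's reading and writing code handle any generated class: fields are addressed by their position in the schema. In Avro this contract is the `org.apache.avro.generic.IndexedRecord` interface, which `SpecificRecord` extends. The dependency-free sketch below mimics it with a hypothetical `PositionalRecord` interface and a tiny hypothetical `Person` class, and copies a record field by field the way a datum reader or writer walks a schema. It is an illustration of the idea, not Avro's actual implementation; in particular, the hypothetical `fieldCount()` stands in for the field list Avro reads from the schema.

```java
import java.util.Arrays;

public class PositionalAccessSketch {

    /** Hypothetical stand-in for Avro's IndexedRecord: fields addressed by schema position. */
    public interface PositionalRecord {
        int fieldCount();
        Object get(int field);
        void put(int field, Object value);
    }

    /** Hypothetical two-field record, shaped like the generated Employee class. */
    public static class Person implements PositionalRecord {
        CharSequence name;
        int age;

        public int fieldCount() {
            return 2;
        }

        public Object get(int field) {
            switch (field) {
                case 0: return name;
                case 1: return age;
                default: throw new IndexOutOfBoundsException("Bad index: " + field);
            }
        }

        public void put(int field, Object value) {
            switch (field) {
                case 0: name = (CharSequence) value; break;
                case 1: age = (Integer) value; break;
                default: throw new IndexOutOfBoundsException("Bad index: " + field);
            }
        }
    }

    /** Copies every field by position, without knowing the concrete class. */
    public static void copy(PositionalRecord from, PositionalRecord to) {
        for (int i = 0; i < from.fieldCount(); i++) {
            to.put(i, from.get(i));
        }
    }

    /** Collects field values in schema order, as a writer would before encoding them. */
    public static Object[] values(PositionalRecord r) {
        Object[] out = new Object[r.fieldCount()];
        for (int i = 0; i < out.length; i++) {
            out[i] = r.get(i);
        }
        return out;
    }

    public static void main(String[] args) {
        Person a = new Person();
        a.put(0, "Hari krishna");
        a.put(1, 27);
        Person b = new Person();
        copy(a, b);
        System.out.println(Arrays.toString(values(b))); // [Hari krishna, 27]
    }
}
```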


Step 6: Create a package “tutorial.main” and create the class EmployeeUtil.java inside it.
package tutorial.main;

import java.io.File;
import java.io.IOException;

import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;

import tutorial.model.Employee;

public class EmployeeUtil {
 /**
  * Serializes a single Employee to an Avro data file. The schema is written
  * into the file header, followed by the encoded record.
  */
 public static void serializeEmployee(Employee emp, String fileName)
   throws IOException {
  DatumWriter<Employee> employeeDatumWriter = new SpecificDatumWriter<Employee>(
    Employee.class);
  DataFileWriter<Employee> dataFileWriter = new DataFileWriter<Employee>(
    employeeDatumWriter);
  try {
   dataFileWriter.create(emp.getSchema(), new File(fileName));
   dataFileWriter.append(emp);
  } finally {
   // Always close the writer: close() flushes buffered records and releases the file
   dataFileWriter.close();
  }
 }

 /**
  * Reads all records from an Avro data file and returns the last one (null
  * if the file contains no records).
  */
 public static Employee deSerializeEmployee(String fileName)
   throws IOException {
  // De-serialize employee from disk
  File file = new File(fileName);
  DatumReader<Employee> employeeDatumReader = new SpecificDatumReader<Employee>(
    Employee.class);
  DataFileReader<Employee> dataFileReader = new DataFileReader<Employee>(
    file, employeeDatumReader);
  Employee employee = null;
  try {
   while (dataFileReader.hasNext()) {
    /*
     * Reuse employee object by passing it to next(). This saves us from
     * allocating and garbage collecting many objects for files with
     * many items.
     */
    employee = dataFileReader.next(employee);
   }
  } finally {
   dataFileReader.close();
  }
  return employee;
 }

}
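For reference, this is roughly the layout `DataFileWriter` produces in `serializeEmployee`, as described in the Avro specification: `create()` writes a header (the magic bytes, a metadata map holding `avro.schema` and typically `avro.codec`, and a 16-byte sync marker), and buffered records are then written as data blocks, each holding an object count, the byte size of the encoded objects, the objects themselves, and the sync marker again. The dependency-free sketch below builds such a file in memory for a single record of schema `"int"` with the uncompressed `null` codec. The class and method names are hypothetical, the sync marker here simply comes from `SecureRandom`, and real code should keep using `DataFileWriter`.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;

/** Builds a minimal Avro object container file (null codec) holding one "int" record. */
public class ContainerLayoutSketch {

    /** Zig-zag + varint encoding for Avro int/long. */
    static void writeLong(ByteArrayOutputStream out, long value) {
        long n = (value << 1) ^ (value >> 63);
        while ((n & ~0x7FL) != 0) {
            out.write((int) ((n & 0x7F) | 0x80));
            n >>>= 7;
        }
        out.write((int) n);
    }

    /** Avro bytes/string: length as a long, then the raw bytes. */
    static void writeBytes(ByteArrayOutputStream out, byte[] data) {
        writeLong(out, data.length);
        out.write(data, 0, data.length);
    }

    static byte[] utf8(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    public static byte[] writeContainer(int value) {
        byte[] sync = new byte[16];
        new SecureRandom().nextBytes(sync); // random per-file sync marker

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // Header: magic bytes, metadata map, sync marker
        out.write('O');
        out.write('b');
        out.write('j');
        out.write(1);
        writeLong(out, 2);                    // one map block with two entries
        writeBytes(out, utf8("avro.schema"));
        writeBytes(out, utf8("\"int\""));     // the schema, as JSON
        writeBytes(out, utf8("avro.codec"));
        writeBytes(out, utf8("null"));        // no compression
        writeLong(out, 0);                    // end of the metadata map
        out.write(sync, 0, sync.length);

        // One data block: object count, byte size of the objects, the objects, sync marker
        ByteArrayOutputStream objects = new ByteArrayOutputStream();
        writeLong(objects, value);            // a single int record
        writeLong(out, 1);
        writeLong(out, objects.size());
        out.write(objects.toByteArray(), 0, objects.size());
        out.write(sync, 0, sync.length);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        System.out.println(writeContainer(27).length + " bytes");
    }
}
```

For the value 27 this prints `75 bytes`: 56 bytes of header and 19 bytes for the single data block, which shows that for one tiny record the framing dominates, while for many records it is amortized.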

Step 7: Create “Main.java” inside the package “tutorial.main”.

package tutorial.main;

import java.io.IOException;

import tutorial.model.Employee;

public class Main {
 public static void main(String[] args) throws IOException {
  Employee emp = new Employee();
  
  /* Initialize emp */
  emp.setAge(27);
  emp.setCompany("ABCD");
  emp.setFirstName("Hari krishna");
  emp.setId("E432123");
  emp.setLastName("Gurram");
  
  /* Serialize Employee */
  EmployeeUtil.serializeEmployee(emp, "ser.out");
  
  /* De-serialize Employee */
  Employee emp1 = EmployeeUtil.deSerializeEmployee("ser.out");
  
  System.out.println(emp1);
  
 }
}

Run “Main.java”; you should see the following output.
{"firstName": "Hari krishna", "lastName": "Gurram", "age": 27, "id": "E432123", "company": "ABCD"}
  
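The final project structure looks like this:

avro_tutorial
   pom.xml
   src/main/java
      tutorial.main
         EmployeeUtil.java
         Main.java
      tutorial.model
         Employee.java
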