Tuesday, 7 June 2022

Apache Atlas: data lineage example

In this post, I am going to explain how to create a data lineage in Apache Atlas. Data lineage describes how data flows through a system: where it originated, what transformations were applied to it, and where it moved.

 


 

I am going to demonstrate how to build the lineage shown in the image above. As the image shows, there are two processes and five datasets (files) defined.

 

To achieve this lineage, I first need to define five files:

a.   Sales_from_canada.txt

b.   Sales_from_germany.txt

c.    Global_sales.txt

d.   Profit_in_Germany.txt

e.   Profit_in_Canada.txt

 

Next, I need to define two processes:

a.   mergeSalesDaily -> takes Sales_from_canada.txt and Sales_from_germany.txt as input and produces Global_sales.txt as output

b.   mapReduceSales -> takes Global_sales.txt as input and generates Profit_in_Germany.txt and Profit_in_Canada.txt as output
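Before touching Atlas, it can help to write the target lineage down as plain data. The following is a minimal Python sketch of the two processes listed above; the `LINEAGE` dictionary and the `upstream` helper are illustrative names of my own and have nothing to do with the Atlas API.

```python
# The lineage described above: process name -> its input and output files.
LINEAGE = {
    "mergeSalesDaily": {
        "inputs": ["Sales_from_canada.txt", "Sales_from_germany.txt"],
        "outputs": ["Global_sales.txt"],
    },
    "mapReduceSales": {
        "inputs": ["Global_sales.txt"],
        "outputs": ["Profit_in_Germany.txt", "Profit_in_Canada.txt"],
    },
}


def upstream(dataset, lineage=LINEAGE):
    """Return every file that `dataset` is derived from, directly or indirectly."""
    found = set()
    for process in lineage.values():
        if dataset in process["outputs"]:
            for source in process["inputs"]:
                found.add(source)
                # Walk further back through the process that produced `source`.
                found |= upstream(source, lineage)
    return found


if __name__ == "__main__":
    # Lists the global sales file plus the two per-country sales files.
    print(sorted(upstream("Profit_in_Canada.txt")))
```

Walking backwards from a profit file reaches the global sales file and then both per-country sales files, which is the same path the Atlas lineage diagram is meant to show.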

 

 

Define data files

 

salesFromGermany.json

{
  "entity": {
    "typeName": "DataSet",
    "attributes": {
      "owner": "Krishna Maj",
      "qualifiedName": "DataSet_Germany_sales",
      "description": "sales_from_Germany.txt",
      "name": "sales_from_germany.txt"
    }
  }
}

 

salesFromCanada.json

{
  "entity": {
    "typeName": "DataSet",
    "attributes": {
      "owner": "Krishna Maj",
      "qualifiedName": "DataSet_Canada_sales",
      "description": "sales_from_Canada.txt",
      "name": "sales_from_Canada.txt"
    }
  }
}

 

profitInCanada.json

{
  "entity": {
    "typeName": "DataSet",
    "attributes": {
      "owner": "Krishna Maj",
      "qualifiedName": "DataSet_profit_Canada",
      "description": "profit in Canada",
      "name": "profit_in_Canada.txt"
    }
  }
}

 

profitInGermany.json

{
  "entity": {
    "typeName": "DataSet",
    "attributes": {
      "owner": "Krishna Maj",
      "qualifiedName": "profit_in_Germany",
      "description": "profit in Germany",
      "name": "profit_in_Germany.txt"
    }
  }
}

globalSalesGermanyCanada.json

{
  "entity": {
    "typeName": "DataSet",
    "attributes": {
      "owner": "Krishna Maj",
      "qualifiedName": "global sales Germany canada",
      "description": "global sales germany canada",
      "name": "global_sales_canada_germany.txt"
    }
  }
}
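The five files differ only in four attribute values, so they can also be generated instead of written by hand. This is a minimal Python sketch under that assumption; `dataset_entity`, `DATASETS` and `write_payloads` are hypothetical helper names, and the values are copied from the JSON files above.

```python
import json
from pathlib import Path


def dataset_entity(name, qualified_name, description, owner="Krishna Maj"):
    """Build the request body for one DataSet entity."""
    return {
        "entity": {
            "typeName": "DataSet",
            "attributes": {
                "owner": owner,
                "qualifiedName": qualified_name,
                "description": description,
                "name": name,
            },
        }
    }


# file name -> (name, qualifiedName, description), copied from the files above
DATASETS = {
    "salesFromGermany.json": ("sales_from_germany.txt", "DataSet_Germany_sales", "sales_from_Germany.txt"),
    "salesFromCanada.json": ("sales_from_Canada.txt", "DataSet_Canada_sales", "sales_from_Canada.txt"),
    "profitInCanada.json": ("profit_in_Canada.txt", "DataSet_profit_Canada", "profit in Canada"),
    "profitInGermany.json": ("profit_in_Germany.txt", "profit_in_Germany", "profit in Germany"),
    "globalSalesGermanyCanada.json": (
        "global_sales_canada_germany.txt",
        "global sales Germany canada",
        "global sales germany canada",
    ),
}


def write_payloads(directory="."):
    """Write one JSON file per dataset into `directory`."""
    for file_name, (name, qualified_name, description) in DATASETS.items():
        body = dataset_entity(name, qualified_name, description)
        Path(directory, file_name).write_text(json.dumps(body, indent=2), encoding="utf-8")
```

Calling `write_payloads()` recreates the five files in the current directory.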

Execute the commands below to onboard the dataset files.

curl -X POST -u admin:admin -H 'accept: application/json'  -H 'cache-control: no-cache'  -H 'content-type: application/json'  http://localhost:21000/api/atlas/v2/entity -d @salesFromGermany.json

curl -X POST -u admin:admin -H 'accept: application/json'  -H 'cache-control: no-cache'  -H 'content-type: application/json'  http://localhost:21000/api/atlas/v2/entity -d @salesFromCanada.json

curl -X POST -u admin:admin -H 'accept: application/json'  -H 'cache-control: no-cache'  -H 'content-type: application/json'  http://localhost:21000/api/atlas/v2/entity -d @profitInCanada.json

curl -X POST -u admin:admin -H 'accept: application/json'  -H 'cache-control: no-cache'  -H 'content-type: application/json'  http://localhost:21000/api/atlas/v2/entity -d @profitInGermany.json

curl -X POST -u admin:admin -H 'accept: application/json'  -H 'cache-control: no-cache'  -H 'content-type: application/json'  http://localhost:21000/api/atlas/v2/entity -d @globalSalesGermanyCanada.json
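If you prefer a script over repeated curl calls, the same request can be sent from Python using only the standard library. This is a sketch, assuming the same host, port and admin:admin credentials as the curl commands; `build_request` and `post_json_file` are helper names I made up.

```python
import base64
import json
import urllib.request

# Same base URL as in the curl commands.
ATLAS_URL = "http://localhost:21000/api/atlas/v2"


def build_request(path, payload, user="admin", password="admin"):
    """Build (but do not send) a POST request equivalent to the curl call."""
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    return urllib.request.Request(
        url=f"{ATLAS_URL}/{path}",
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Accept": "application/json",
            "Content-Type": "application/json",
            "Authorization": f"Basic {token}",
        },
        method="POST",
    )


def post_json_file(path, file_name):
    """Send the contents of a JSON file to Atlas and return the parsed reply."""
    with open(file_name, encoding="utf-8") as handle:
        payload = json.load(handle)
    with urllib.request.urlopen(build_request(path, payload)) as response:
        return json.load(response)


# Usage (needs a running Atlas instance and the JSON file on disk):
# print(post_json_file("entity", "salesFromGermany.json"))
```

The request is built separately from sending it, so the URL and headers can be inspected without a running server.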

Atlas assigns a generated id (GUID) to each entity. In my case,

 

salesFromGermany.json generated the id "19e1b839-bf0e-450d-8bdd-2e44b00ea87c"

salesFromCanada.json generated the id "3508b7bf-ad31-48ed-9013-030d9c3d10e8"

profitInCanada.json generated the id "7012df53-a1e6-464d-a407-39e04714cf8d"

profitInGermany.json generated the id "f1646aa0-5144-4e74-9907-6c1e6b96fdbf"

globalSalesGermanyCanada.json generated the id "95324b7d-e6f9-4fa7-a2a4-83c194beba54"
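These ids are needed later, so it is handy to pull them out of the response instead of copying them by hand. The sketch below assumes the entity endpoint replies with a `mutatedEntities` object that groups entity headers under `CREATE` or `UPDATE`; that is my understanding of the Atlas v2 API, so check it against your own output. The `sample` value is hand-written in that assumed shape and is not captured output.

```python
def mutated_guids(response):
    """Return the GUIDs of all entities reported as created or updated."""
    guids = []
    for operation in ("CREATE", "UPDATE"):
        for header in response.get("mutatedEntities", {}).get(operation, []):
            guids.append(header["guid"])
    return guids


# Hand-written sample in the assumed response shape (not real server output).
sample = {
    "mutatedEntities": {
        "CREATE": [
            {"typeName": "DataSet", "guid": "19e1b839-bf0e-450d-8bdd-2e44b00ea87c"}
        ]
    }
}

if __name__ == "__main__":
    print(mutated_guids(sample))
```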

 

 

Define merger_process and map_reduce_process types

 

mergerProcess.json

{
  "entityDefs": [
    {
      "category": "ENTITY",
      "name": "merger_process",
      "description": "Merge the data",
      "attributeDefs": [
        {
          "name": "startTime",
          "typeName": "long",
          "isOptional": true,
          "cardinality": "SINGLE",
          "valuesMinCount": 0,
          "valuesMaxCount": 1,
          "isUnique": false,
          "isIndexable": false,
          "includeInNotification": false,
          "searchWeight": -1
        },
        {
          "name": "endTime",
          "typeName": "long",
          "isOptional": true,
          "cardinality": "SINGLE",
          "valuesMinCount": 0,
          "valuesMaxCount": 1,
          "isUnique": false,
          "isIndexable": false,
          "includeInNotification": false,
          "searchWeight": -1
        }
      ],
      "superTypes": [
        "Process"
      ]
    }
  ]
}

mapAndReduceProcess.json

{
  "entityDefs": [
    {
      "category": "ENTITY",
      "name": "map_reduce_process",
      "description": "Map and Reduce process",
      "attributeDefs": [
        {
          "name": "startTime",
          "typeName": "long",
          "isOptional": true,
          "cardinality": "SINGLE",
          "valuesMinCount": 0,
          "valuesMaxCount": 1,
          "isUnique": false,
          "isIndexable": false,
          "includeInNotification": false,
          "searchWeight": -1
        },
        {
          "name": "endTime",
          "typeName": "long",
          "isOptional": true,
          "cardinality": "SINGLE",
          "valuesMinCount": 0,
          "valuesMaxCount": 1,
          "isUnique": false,
          "isIndexable": false,
          "includeInNotification": false,
          "searchWeight": -1
        }
      ],
      "superTypes": [
        "Process"
      ]
    }
  ]
}
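The two type definitions are identical apart from name and description, so they can be produced by one small function. This is a Python sketch with hypothetical helper names (`optional_long`, `process_typedef`); the attribute settings are copied from the JSON above.

```python
import json


def optional_long(name):
    """Attribute definition for an optional, single-valued long."""
    return {
        "name": name,
        "typeName": "long",
        "isOptional": True,
        "cardinality": "SINGLE",
        "valuesMinCount": 0,
        "valuesMaxCount": 1,
        "isUnique": False,
        "isIndexable": False,
        "includeInNotification": False,
        "searchWeight": -1,
    }


def process_typedef(name, description):
    """Build a typedefs request body for a custom subtype of Process."""
    return {
        "entityDefs": [
            {
                "category": "ENTITY",
                "name": name,
                "description": description,
                "attributeDefs": [optional_long("startTime"), optional_long("endTime")],
                "superTypes": ["Process"],
            }
        ]
    }


if __name__ == "__main__":
    print(json.dumps(process_typedef("merger_process", "Merge the data"), indent=2))
```

Because both types extend `Process`, they only need to declare the attributes that `Process` does not already have.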

Execute the commands below to register these process types in Atlas.

curl -X POST -u admin:admin -H 'accept: application/json'  -H 'cache-control: no-cache'  -H 'content-type: application/json'  http://localhost:21000/api/atlas/v2/types/typedefs -d @mapAndReduceProcess.json

 

curl -X POST -u admin:admin -H 'accept: application/json'  -H 'cache-control: no-cache'  -H 'content-type: application/json'  http://localhost:21000/api/atlas/v2/types/typedefs -d @mergerProcess.json


bash-3.2$ curl -X POST -u admin:admin -H 'accept: application/json'  -H 'cache-control: no-cache'  -H 'content-type: application/json'  http://localhost:21000/api/atlas/v2/types/typedefs -d @mapAndReduceProcess.json
{"enumDefs":[],"structDefs":[],"classificationDefs":[],"entityDefs":[{"category":"ENTITY","guid":"5029c27f-a750-4806-8b1d-8c4668d1ed87","createdBy":"admin","updatedBy":"admin","createTime":1643376135667,"updateTime":1643376135667,"version":1,"name":"map_reduce_process","description":"Map and Reduce process","typeVersion":"1.0","attributeDefs":[{"name":"startTime","typeName":"long","isOptional":true,"cardinality":"SINGLE","valuesMinCount":0,"valuesMaxCount":1,"isUnique":false,"isIndexable":false,"includeInNotification":false,"searchWeight":-1},{"name":"endTime","typeName":"long","isOptional":true,"cardinality":"SINGLE","valuesMinCount":0,"valuesMaxCount":1,"isUnique":false,"isIndexable":false,"includeInNotification":false,"searchWeight":-1}],"superTypes":["Process"],"subTypes":[],"relationshipAttributeDefs":[{"name":"outputs","typeName":"array<DataSet>","isOptional":true,"cardinality":"SET","valuesMinCount":0,"valuesMaxCount":2147483647,"isUnique":false,"isIndexable":false,"includeInNotification":false,"searchWeight":-1,"relationshipTypeName":"process_dataset_outputs","isLegacyAttribute":true},{"name":"inputs","typeName":"array<DataSet>","isOptional":true,"cardinality":"SET","valuesMinCount":0,"valuesMaxCount":2147483647,"isUnique":false,"isIndexable":false,"includeInNotification":false,"searchWeight":-1,"relationshipTypeName":"dataset_process_inputs","isLegacyAttribute":true},{"name":"meanings","typeName":"array<AtlasGlossaryTerm>","isOptional":true,"cardinality":"SET","valuesMinCount":-1,"valuesMaxCount":-1,"isUnique":false,"isIndexable":false,"includeInNotification":false,"searchWeight":-1,"relationshipTypeName":"AtlasGlossarySemanticAssignment","isLegacyAttribute":false}],"businessAttributeDefs":{}}],"relationshipDefs":[],"businessMetadataDefs":[]}bash-3.2$ 
bash-3.2$ curl -X POST -u admin:admin -H 'accept: application/json'  -H 'cache-control: no-cache'  -H 'content-type: application/json'  http://localhost:21000/api/atlas/v2/types/typedefs -d @mergerProcess.json
{"enumDefs":[],"structDefs":[],"classificationDefs":[],"entityDefs":[{"category":"ENTITY","guid":"16606f89-aec6-4b02-a58a-7eb7bb82a41d","createdBy":"admin","updatedBy":"admin","createTime":1643376147173,"updateTime":1643376147173,"version":1,"name":"merger_process","description":"Merge the data","typeVersion":"1.0","attributeDefs":[{"name":"startTime","typeName":"long","isOptional":true,"cardinality":"SINGLE","valuesMinCount":0,"valuesMaxCount":1,"isUnique":false,"isIndexable":false,"includeInNotification":false,"searchWeight":-1},{"name":"endTime","typeName":"long","isOptional":true,"cardinality":"SINGLE","valuesMinCount":0,"valuesMaxCount":1,"isUnique":false,"isIndexable":false,"includeInNotification":false,"searchWeight":-1}],"superTypes":["Process"],"subTypes":[],"relationshipAttributeDefs":[{"name":"outputs","typeName":"array<DataSet>","isOptional":true,"cardinality":"SET","valuesMinCount":0,"valuesMaxCount":2147483647,"isUnique":false,"isIndexable":false,"includeInNotification":false,"searchWeight":-1,"relationshipTypeName":"process_dataset_outputs","isLegacyAttribute":true},{"name":"inputs","typeName":"array<DataSet>","isOptional":true,"cardinality":"SET","valuesMinCount":0,"valuesMaxCount":2147483647,"isUnique":false,"isIndexable":false,"includeInNotification":false,"searchWeight":-1,"relationshipTypeName":"dataset_process_inputs","isLegacyAttribute":true},{"name":"meanings","typeName":"array<AtlasGlossaryTerm>","isOptional":true,"cardinality":"SET","valuesMinCount":-1,"valuesMaxCount":-1,"isUnique":false,"isIndexable":false,"includeInNotification":false,"searchWeight":-1,"relationshipTypeName":"AtlasGlossarySemanticAssignment","isLegacyAttribute":false}],"businessAttributeDefs":{}}],"relationshipDefs":[],"businessMetadataDefs":[]}bash-3.2$ 
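The responses are hard to read on one line. The part that matters for lineage is `relationshipAttributeDefs`: the new types did not declare `inputs` or `outputs`, yet both appear there because they come from the `Process` supertype. Here is a small Python sketch that pulls out just that part; `relationship_summary` is a made-up helper name, and `sample` is trimmed from the response shown above.

```python
def relationship_summary(response):
    """Map each entity type to its relationship attributes and their relationship types."""
    summary = {}
    for entity_def in response.get("entityDefs", []):
        summary[entity_def["name"]] = {
            attr["name"]: attr["relationshipTypeName"]
            for attr in entity_def.get("relationshipAttributeDefs", [])
        }
    return summary


# Trimmed from the merger_process response above; all other fields are omitted.
sample = {
    "entityDefs": [
        {
            "name": "merger_process",
            "superTypes": ["Process"],
            "relationshipAttributeDefs": [
                {"name": "outputs", "relationshipTypeName": "process_dataset_outputs"},
                {"name": "inputs", "relationshipTypeName": "dataset_process_inputs"},
                {"name": "meanings", "relationshipTypeName": "AtlasGlossarySemanticAssignment"},
            ],
        }
    ]
}

if __name__ == "__main__":
    for type_name, attrs in relationship_summary(sample).items():
        print(type_name, attrs)
```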

Create first lineage

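Let’s create a merger_process instance whose inputs are the datasets defined in salesFromGermany.json and salesFromCanada.json, and whose output is the dataset defined in globalSalesGermanyCanada.json. The guid values below are the ids that Atlas generated for those datasets.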

 

mergerProcessInstance.json

{
  "entity": {
    "typeName": "merger_process",
    "attributes": {
      "owner": "Krishna Maj",
      "outputs": [
        {
          "guid": "95324b7d-e6f9-4fa7-a2a4-83c194beba54",
          "typeName": "DataSet"
        }
      ],
      "qualifiedName": "mergeSalesDaily@file_merge",
      "displayName": "Merge Sales",
      "inputs": [
        {
          "guid": "19e1b839-bf0e-450d-8bdd-2e44b00ea87c",
          "typeName": "DataSet"
        },
        {
          "guid": "3508b7bf-ad31-48ed-9013-030d9c3d10e8",
          "typeName": "DataSet"
        }
      ],
      "description": "Daily merge sales data to global sales file",
      "name": "mergeSalesDailyFromGermanyCanada",
      "startTime": 1643363556,
      "endTime": 1643364556,
      "processId":"salesMergeProcessGermanyCanada"
    }
  }
}
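Since the process instance is mostly a list of GUID references, it can be assembled from the ids rather than typed by hand. This is a Python sketch with hypothetical helpers (`dataset_ref`, `process_entity`); the values are the ones used in mergerProcessInstance.json above.

```python
import json


def dataset_ref(guid):
    """Reference to an existing DataSet entity by its GUID."""
    return {"guid": guid, "typeName": "DataSet"}


def process_entity(type_name, name, qualified_name, input_guids, output_guids, **attributes):
    """Build the request body for a process instance that links inputs to outputs."""
    body = {
        "name": name,
        "qualifiedName": qualified_name,
        "inputs": [dataset_ref(guid) for guid in input_guids],
        "outputs": [dataset_ref(guid) for guid in output_guids],
    }
    body.update(attributes)  # owner, description, startTime, endTime, ...
    return {"entity": {"typeName": type_name, "attributes": body}}


merge = process_entity(
    "merger_process",
    name="mergeSalesDailyFromGermanyCanada",
    qualified_name="mergeSalesDaily@file_merge",
    input_guids=[
        "19e1b839-bf0e-450d-8bdd-2e44b00ea87c",  # sales from Germany
        "3508b7bf-ad31-48ed-9013-030d9c3d10e8",  # sales from Canada
    ],
    output_guids=["95324b7d-e6f9-4fa7-a2a4-83c194beba54"],  # global sales
    owner="Krishna Maj",
    displayName="Merge Sales",
    description="Daily merge sales data to global sales file",
    startTime=1643363556,
    endTime=1643364556,
    processId="salesMergeProcessGermanyCanada",
)

if __name__ == "__main__":
    print(json.dumps(merge, indent=2))
```

It is the `inputs` and `outputs` references that produce the edges in the lineage graph; the remaining attributes are descriptive.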

Execute the command below to create the entity.

curl -X POST -u admin:admin -H 'accept: application/json'  -H 'cache-control: no-cache'  -H 'content-type: application/json'  http://localhost:21000/api/atlas/v2/entity -d @mergerProcessInstance.json


Now open the lineage of the global sales file (registered in Atlas as global_sales_canada_germany.txt); you will see a diagram like the one below.





 

Create second lineage

 

 

mapReduceProcessInstance.json

{
  "entity": {
    "typeName": "map_reduce_process",
    "attributes": {
      "owner": "Krishna Maj",
      "outputs": [
        {
          "guid": "7012df53-a1e6-464d-a407-39e04714cf8d",
          "typeName": "DataSet"
        },
        {
          "guid": "f1646aa0-5144-4e74-9907-6c1e6b96fdbf",
          "typeName": "DataSet"
        }
      ],
      "qualifiedName": "mapReduceProcess@mapReduceGermanyCanada",
      "displayName": "Map Reduce sales Germany Canada",
      "inputs": [
        {
          "guid": "95324b7d-e6f9-4fa7-a2a4-83c194beba54",
          "typeName": "DataSet"
        }
      ],
      "description": "map reduce sales daily in Germany, canada",
      "name": "map reduce sales daily in Germany, canada",
      "startTime": 1643363556,
      "endTime": 1643364556,
      "processId":"mapReduceProcess"
    }
  }
}

 

Execute the command below to create the lineage.

 

curl -X POST -u admin:admin -H 'accept: application/json'  -H 'cache-control: no-cache'  -H 'content-type: application/json'  http://localhost:21000/api/atlas/v2/entity -d @mapReduceProcessInstance.json

 

Reload the UI and open the lineage of the global sales file (global_sales_canada_germany.txt in Atlas) again; you will see the complete lineage, like the one below.
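The lineage can also be read over REST instead of through the UI. The sketch below assumes the Atlas v2 lineage endpoint (`/lineage/{guid}` with `direction` and `depth` query parameters) and a response containing `guidEntityMap` and `relations`; that is my understanding of the API, so verify it on your own instance. The helper names are made up, and `sample` is a hand-written illustration whose process GUID is a placeholder.

```python
from urllib.parse import urlencode

# Same base URL as in the curl commands.
ATLAS_URL = "http://localhost:21000/api/atlas/v2"


def lineage_url(guid, direction="BOTH", depth=3):
    """URL for fetching the lineage around one entity."""
    query = urlencode({"direction": direction, "depth": depth})
    return f"{ATLAS_URL}/lineage/{guid}?{query}"


def lineage_edges(lineage_info):
    """Turn a lineage response into (from, to) pairs of display names."""
    entities = lineage_info.get("guidEntityMap", {})

    def label(guid):
        # Fall back to the raw GUID when no display name is available.
        return entities.get(guid, {}).get("displayText", guid)

    return [
        (label(rel["fromEntityId"]), label(rel["toEntityId"]))
        for rel in lineage_info.get("relations", [])
    ]


# Hand-written illustration in the assumed shape; "process-guid" is a placeholder.
sample = {
    "guidEntityMap": {
        "95324b7d-e6f9-4fa7-a2a4-83c194beba54": {"displayText": "global_sales_canada_germany.txt"},
        "process-guid": {"displayText": "map reduce sales daily in Germany, canada"},
    },
    "relations": [
        {"fromEntityId": "95324b7d-e6f9-4fa7-a2a4-83c194beba54", "toEntityId": "process-guid"}
    ],
}

if __name__ == "__main__":
    print(lineage_url("95324b7d-e6f9-4fa7-a2a4-83c194beba54"))
    print(lineage_edges(sample))
```

The generated URL can be requested with curl and `-u admin:admin`, the same way as the other calls in this post.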

 

 


You can download all the JSON files from this link.



 
