In this post, I am going to explain how to create a data lineage in Apache Atlas. Data lineage describes how data flows through a system: where it originates, what transformations are applied to it, where it moves, and so on.
I am going to demonstrate how to build the lineage shown in the image above. As you can see in the image, it contains two processes and five datasets (files).
To build this lineage, I first need to define five files:
a. Sales_from_canada.txt
b. Sales_from_germany.txt
c. Global_sales.txt
d. Profit_in_Germany.txt
e. Profit_in_Canada.txt
Next, I need to define two processes:
a. mergeSalesDaily -> takes Sales_from_canada.txt and Sales_from_germany.txt as input and produces global_sales.txt as output.
b. mapReduceSales -> takes global_sales.txt as input and generates Profit_in_Germany.txt and Profit_in_Canada.txt as output.
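Conceptually, these two process definitions form a small directed graph: each process consumes its input files and produces its output files. The following is a minimal Python sketch, independent of Atlas, that models the lineage as a plain dictionary and walks it backwards to find every upstream file of a dataset. The `PROCESSES` layout and the `upstream` helper are illustrative names, not part of Atlas.

```python
# Illustrative model of the lineage above: process -> (input files, output files).
# The names come from the two lists; nothing here talks to Atlas.
PROCESSES = {
    "mergeSalesDaily": (
        ["Sales_from_canada.txt", "Sales_from_germany.txt"],
        ["Global_sales.txt"],
    ),
    "mapReduceSales": (
        ["Global_sales.txt"],
        ["Profit_in_Germany.txt", "Profit_in_Canada.txt"],
    ),
}


def upstream(dataset, processes=PROCESSES):
    """Return every file that `dataset` is derived from, directly or indirectly."""
    found = set()
    pending = [dataset]
    while pending:
        current = pending.pop()
        # Any process that outputs `current` makes its inputs upstream of it.
        for inputs, outputs in processes.values():
            if current in outputs:
                for source in inputs:
                    if source not in found:
                        found.add(source)
                        pending.append(source)
    return found


print(sorted(upstream("Profit_in_Canada.txt")))
# ['Global_sales.txt', 'Sales_from_canada.txt', 'Sales_from_germany.txt']
```

Atlas stores the same information as `inputs` and `outputs` on process entities, which is what the rest of this post sets up.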
Define data files
salesFromGermany.json
{
"entity": {
"typeName": "DataSet",
"attributes": {
"owner": "Krishna Maj",
"qualifiedName": "DataSet_Germany_sales",
"description": "sales_from_Germany.txt",
"name": "sales_from_germany.txt"
}
}
}
salesFromCanada.json
{
"entity": {
"typeName": "DataSet",
"attributes": {
"owner": "Krishna Maj",
"qualifiedName": "DataSet_Canada_sales",
"description": "sales_from_Canada.txt",
"name": "sales_from_Canada.txt"
}
}
}
profitInCanada.json
{
"entity": {
"typeName": "DataSet",
"attributes": {
"owner": "Krishna Maj",
"qualifiedName": "DataSet_profit_Canada",
"description": "profit in Canada",
"name": "profit_in_Canada.txt"
}
}
}
profitInGermany.json
{
"entity": {
"typeName": "DataSet",
"attributes": {
"owner": "Krishna Maj",
"qualifiedName": "profit_in_Germany",
"description": "profit in Germany",
"name": "profit_in_Germany.txt"
}
}
}
globalSalesGermanyCanada.json
{
"entity": {
"typeName": "DataSet",
"attributes": {
"owner": "Krishna Maj",
"qualifiedName": "global sales Germany canada",
"description": "global sales germany canada",
"name": "global_sales_canada_germany.txt"
}
}
}
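The five files above differ only in their attribute values, so they can also be generated. Below is a minimal Python sketch that builds the same request bodies using only the standard library. The `DATASETS` table, `dataset_entity` and `write_payloads` are illustrative names, and the values are copied from the JSON above.

```python
import json
from pathlib import Path

# (file name, qualifiedName, description, name), copied from the JSON files above.
DATASETS = [
    ("salesFromGermany.json", "DataSet_Germany_sales",
     "sales_from_Germany.txt", "sales_from_germany.txt"),
    ("salesFromCanada.json", "DataSet_Canada_sales",
     "sales_from_Canada.txt", "sales_from_Canada.txt"),
    ("profitInCanada.json", "DataSet_profit_Canada",
     "profit in Canada", "profit_in_Canada.txt"),
    ("profitInGermany.json", "profit_in_Germany",
     "profit in Germany", "profit_in_Germany.txt"),
    ("globalSalesGermanyCanada.json", "global sales Germany canada",
     "global sales germany canada", "global_sales_canada_germany.txt"),
]


def dataset_entity(qualified_name, description, name, owner="Krishna Maj"):
    """Request body for POST /api/atlas/v2/entity describing a plain DataSet."""
    return {
        "entity": {
            "typeName": "DataSet",
            "attributes": {
                "owner": owner,
                "qualifiedName": qualified_name,
                "description": description,
                "name": name,
            },
        }
    }


def write_payloads(directory="."):
    """Write one JSON file per dataset into `directory` and return the paths."""
    paths = []
    for file_name, qualified_name, description, name in DATASETS:
        path = Path(directory) / file_name
        body = dataset_entity(qualified_name, description, name)
        path.write_text(json.dumps(body, indent=2), encoding="utf-8")
        paths.append(path)
    return paths
```

Calling `write_payloads()` recreates the five files in the current directory, ready for the curl commands below. Keep each `qualifiedName` unique. Atlas treats it as the entity's identity, so reusing one updates an existing entity instead of creating a new one.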
Execute the commands below to register the five datasets in Atlas.
curl -X POST -u admin:admin -H 'accept: application/json' -H 'cache-control: no-cache' -H 'content-type: application/json' http://localhost:21000/api/atlas/v2/entity -d @salesFromGermany.json
curl -X POST -u admin:admin -H 'accept: application/json' -H 'cache-control: no-cache' -H 'content-type: application/json' http://localhost:21000/api/atlas/v2/entity -d @salesFromCanada.json
curl -X POST -u admin:admin -H 'accept: application/json' -H 'cache-control: no-cache' -H 'content-type: application/json' http://localhost:21000/api/atlas/v2/entity -d @profitInCanada.json
curl -X POST -u admin:admin -H 'accept: application/json' -H 'cache-control: no-cache' -H 'content-type: application/json' http://localhost:21000/api/atlas/v2/entity -d @profitInGermany.json
curl -X POST -u admin:admin -H 'accept: application/json' -H 'cache-control: no-cache' -H 'content-type: application/json' http://localhost:21000/api/atlas/v2/entity -d @globalSalesGermanyCanada.json
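The same POST can be scripted so you do not have to copy GUIDs out of each response by hand. Below is a minimal sketch using Python's standard library, with the localhost:21000 endpoint and admin:admin credentials from the curl commands above. It reads the GUID from the `mutatedEntities` section of the entity-mutation response. That is the response shape I would expect from the v2 API, so check it against what your Atlas version returns. `post_json`, `created_guid` and `register_file` are illustrative names.

```python
import base64
import json
import urllib.request

# Endpoint and credentials taken from the curl commands above.
ATLAS = "http://localhost:21000/api/atlas/v2"
AUTH = "Basic " + base64.b64encode(b"admin:admin").decode("ascii")


def post_json(url, body):
    """POST a JSON body with basic auth and return the decoded JSON response."""
    request = urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Authorization": AUTH,
            "Accept": "application/json",
            "Content-Type": "application/json",
        },
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))


def created_guid(mutation_response):
    """GUID of the first created (else updated) entity, or None if nothing changed.

    Assumed response shape: {"mutatedEntities": {"CREATE": [{"guid": ...}], ...}}.
    """
    mutated = mutation_response.get("mutatedEntities", {})
    for operation in ("CREATE", "UPDATE"):
        headers = mutated.get(operation) or []
        if headers:
            return headers[0].get("guid")
    return None


def register_file(path):
    """Send one payload file (e.g. salesFromGermany.json) and return its GUID."""
    with open(path, encoding="utf-8") as handle:
        return created_guid(post_json(ATLAS + "/entity", json.load(handle)))
```

Looping `register_file` over the five file names prints one GUID per dataset. If an identical entity already exists, the response may list no mutations and the helper returns `None`.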
In my case, the entities were assigned the following GUIDs (yours will be different):
salesFromGermany.json generated the id "19e1b839-bf0e-450d-8bdd-2e44b00ea87c"
salesFromCanada.json generated the id "3508b7bf-ad31-48ed-9013-030d9c3d10e8"
profitInCanada.json generated the id "7012df53-a1e6-464d-a407-39e04714cf8d"
profitInGermany.json generated the id "f1646aa0-5144-4e74-9907-6c1e6b96fdbf"
globalSalesGermanyCanada.json generated the id "95324b7d-e6f9-4fa7-a2a4-83c194beba54"
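If you lose track of a GUID, Atlas can also look up an entity by its type and `qualifiedName` through the `entity/uniqueAttribute/type/{typeName}` endpoint. Below is a minimal sketch that again uses localhost:21000 and admin:admin. `lookup_url` and `guid_for` are illustrative names, and the `entity.guid` path reflects the response layout I expect.

```python
import base64
import json
import urllib.parse
import urllib.request

ATLAS = "http://localhost:21000/api/atlas/v2"
AUTH = "Basic " + base64.b64encode(b"admin:admin").decode("ascii")


def lookup_url(type_name, qualified_name, base=ATLAS):
    """URL for GET entity/uniqueAttribute/type/{typeName}?attr:qualifiedName=..."""
    # Keep the ':' in 'attr:qualifiedName' readable; spaces in values become '+'.
    query = urllib.parse.urlencode({"attr:qualifiedName": qualified_name}, safe=":")
    return f"{base}/entity/uniqueAttribute/type/{urllib.parse.quote(type_name)}?{query}"


def guid_for(type_name, qualified_name):
    """Fetch the entity and return its GUID (assumed layout: {"entity": {"guid": ...}})."""
    request = urllib.request.Request(
        lookup_url(type_name, qualified_name),
        headers={"Authorization": AUTH, "Accept": "application/json"},
    )
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))["entity"]["guid"]
```

For example, `guid_for("DataSet", "DataSet_Germany_sales")` should return the GUID of the German sales dataset.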
Define merger_process and map_reduce_process types
mergerProcess.json
{
"entityDefs": [
{
"category": "ENTITY",
"name": "merger_process",
"description": "Merge the data",
"attributeDefs": [
{
"name": "startTime",
"typeName": "long",
"isOptional": true,
"cardinality": "SINGLE",
"valuesMinCount": 0,
"valuesMaxCount": 1,
"isUnique": false,
"isIndexable": false,
"includeInNotification": false,
"searchWeight": -1
},
{
"name": "endTime",
"typeName": "long",
"isOptional": true,
"cardinality": "SINGLE",
"valuesMinCount": 0,
"valuesMaxCount": 1,
"isUnique": false,
"isIndexable": false,
"includeInNotification": false,
"searchWeight": -1
}
],
"superTypes": [
"Process"
]
}
]
}
mapAndReduceProcess.json
{
"entityDefs": [
{
"category": "ENTITY",
"name": "map_reduce_process",
"description": "Map and Reduce process",
"attributeDefs": [
{
"name": "startTime",
"typeName": "long",
"isOptional": true,
"cardinality": "SINGLE",
"valuesMinCount": 0,
"valuesMaxCount": 1,
"isUnique": false,
"isIndexable": false,
"includeInNotification": false,
"searchWeight": -1
},
{
"name": "endTime",
"typeName": "long",
"isOptional": true,
"cardinality": "SINGLE",
"valuesMinCount": 0,
"valuesMaxCount": 1,
"isUnique": false,
"isIndexable": false,
"includeInNotification": false,
"searchWeight": -1
}
],
"superTypes": [
"Process"
]
}
]
}
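The two type definitions are identical apart from their name and description. Below is a minimal sketch that builds both with one helper and bundles them into a single body for `POST /api/atlas/v2/types/typedefs`. The endpoint accepts a list of `entityDefs`, so one request can register both types. `long_attribute` and `process_type` are illustrative names.

```python
import json


def long_attribute(name):
    """Optional single-valued long attribute, matching startTime/endTime above."""
    return {
        "name": name,
        "typeName": "long",
        "isOptional": True,
        "cardinality": "SINGLE",
        "valuesMinCount": 0,
        "valuesMaxCount": 1,
        "isUnique": False,
        "isIndexable": False,
        "includeInNotification": False,
        "searchWeight": -1,
    }


def process_type(name, description):
    """Entity type extending Process, which supplies the inputs/outputs attributes."""
    return {
        "category": "ENTITY",
        "name": name,
        "description": description,
        "attributeDefs": [long_attribute("startTime"), long_attribute("endTime")],
        "superTypes": ["Process"],
    }


typedefs = {
    "entityDefs": [
        process_type("merger_process", "Merge the data"),
        process_type("map_reduce_process", "Map and Reduce process"),
    ]
}

# Redirect this output to a file and pass it to curl with -d @<file>.
print(json.dumps(typedefs, indent=2))
```

As far as I know, Atlas rejects a create request for a type that already exists, so send this combined body only in place of the two separate files, not in addition to them.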
Execute the commands below to register these process types in Atlas.
curl -X POST -u admin:admin -H 'accept: application/json' -H 'cache-control: no-cache' -H 'content-type: application/json' http://localhost:21000/api/atlas/v2/types/typedefs -d @mapAndReduceProcess.json
curl -X POST -u admin:admin -H 'accept: application/json' -H 'cache-control: no-cache' -H 'content-type: application/json' http://localhost:21000/api/atlas/v2/types/typedefs -d @mergerProcess.json
bash-3.2$ curl -X POST -u admin:admin -H 'accept: application/json' -H 'cache-control: no-cache' -H 'content-type: application/json' http://localhost:21000/api/atlas/v2/types/typedefs -d @mapAndReduceProcess.json
{"enumDefs":[],"structDefs":[],"classificationDefs":[],"entityDefs":[{"category":"ENTITY","guid":"5029c27f-a750-4806-8b1d-8c4668d1ed87","createdBy":"admin","updatedBy":"admin","createTime":1643376135667,"updateTime":1643376135667,"version":1,"name":"map_reduce_process","description":"Map and Reduce process","typeVersion":"1.0","attributeDefs":[{"name":"startTime","typeName":"long","isOptional":true,"cardinality":"SINGLE","valuesMinCount":0,"valuesMaxCount":1,"isUnique":false,"isIndexable":false,"includeInNotification":false,"searchWeight":-1},{"name":"endTime","typeName":"long","isOptional":true,"cardinality":"SINGLE","valuesMinCount":0,"valuesMaxCount":1,"isUnique":false,"isIndexable":false,"includeInNotification":false,"searchWeight":-1}],"superTypes":["Process"],"subTypes":[],"relationshipAttributeDefs":[{"name":"outputs","typeName":"array<DataSet>","isOptional":true,"cardinality":"SET","valuesMinCount":0,"valuesMaxCount":2147483647,"isUnique":false,"isIndexable":false,"includeInNotification":false,"searchWeight":-1,"relationshipTypeName":"process_dataset_outputs","isLegacyAttribute":true},{"name":"inputs","typeName":"array<DataSet>","isOptional":true,"cardinality":"SET","valuesMinCount":0,"valuesMaxCount":2147483647,"isUnique":false,"isIndexable":false,"includeInNotification":false,"searchWeight":-1,"relationshipTypeName":"dataset_process_inputs","isLegacyAttribute":true},{"name":"meanings","typeName":"array<AtlasGlossaryTerm>","isOptional":true,"cardinality":"SET","valuesMinCount":-1,"valuesMaxCount":-1,"isUnique":false,"isIndexable":false,"includeInNotification":false,"searchWeight":-1,"relationshipTypeName":"AtlasGlossarySemanticAssignment","isLegacyAttribute":false}],"businessAttributeDefs":{}}],"relationshipDefs":[],"businessMetadataDefs":[]}bash-3.2$
bash-3.2$ curl -X POST -u admin:admin -H 'accept: application/json' -H 'cache-control: no-cache' -H 'content-type: application/json' http://localhost:21000/api/atlas/v2/types/typedefs -d @mergerProcess.json
{"enumDefs":[],"structDefs":[],"classificationDefs":[],"entityDefs":[{"category":"ENTITY","guid":"16606f89-aec6-4b02-a58a-7eb7bb82a41d","createdBy":"admin","updatedBy":"admin","createTime":1643376147173,"updateTime":1643376147173,"version":1,"name":"merger_process","description":"Merge the data","typeVersion":"1.0","attributeDefs":[{"name":"startTime","typeName":"long","isOptional":true,"cardinality":"SINGLE","valuesMinCount":0,"valuesMaxCount":1,"isUnique":false,"isIndexable":false,"includeInNotification":false,"searchWeight":-1},{"name":"endTime","typeName":"long","isOptional":true,"cardinality":"SINGLE","valuesMinCount":0,"valuesMaxCount":1,"isUnique":false,"isIndexable":false,"includeInNotification":false,"searchWeight":-1}],"superTypes":["Process"],"subTypes":[],"relationshipAttributeDefs":[{"name":"outputs","typeName":"array<DataSet>","isOptional":true,"cardinality":"SET","valuesMinCount":0,"valuesMaxCount":2147483647,"isUnique":false,"isIndexable":false,"includeInNotification":false,"searchWeight":-1,"relationshipTypeName":"process_dataset_outputs","isLegacyAttribute":true},{"name":"inputs","typeName":"array<DataSet>","isOptional":true,"cardinality":"SET","valuesMinCount":0,"valuesMaxCount":2147483647,"isUnique":false,"isIndexable":false,"includeInNotification":false,"searchWeight":-1,"relationshipTypeName":"dataset_process_inputs","isLegacyAttribute":true},{"name":"meanings","typeName":"array<AtlasGlossaryTerm>","isOptional":true,"cardinality":"SET","valuesMinCount":-1,"valuesMaxCount":-1,"isUnique":false,"isIndexable":false,"includeInNotification":false,"searchWeight":-1,"relationshipTypeName":"AtlasGlossarySemanticAssignment","isLegacyAttribute":false}],"businessAttributeDefs":{}}],"relationshipDefs":[],"businessMetadataDefs":[]}bash-3.2$
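Both responses show why `superTypes` is set to `Process`. Each new type gains `inputs` and `outputs` relationship attributes, backed by the `dataset_process_inputs` and `process_dataset_outputs` relationship types, and these are the links Atlas draws as lineage. Below is a small sketch that extracts them from a typedefs response. The function name is illustrative, and the sample is a trimmed copy of the map_reduce_process response shown above.

```python
def lineage_attributes(typedefs_response):
    """Map each entity type name to its relationship attributes that point at DataSets."""
    result = {}
    for entity_def in typedefs_response.get("entityDefs", []):
        names = [
            attr["name"]
            for attr in entity_def.get("relationshipAttributeDefs", [])
            if attr.get("typeName") == "array<DataSet>"
        ]
        result[entity_def["name"]] = sorted(names)
    return result


# Trimmed copy of the map_reduce_process response above.
sample = {
    "entityDefs": [{
        "name": "map_reduce_process",
        "superTypes": ["Process"],
        "relationshipAttributeDefs": [
            {"name": "outputs", "typeName": "array<DataSet>",
             "relationshipTypeName": "process_dataset_outputs"},
            {"name": "inputs", "typeName": "array<DataSet>",
             "relationshipTypeName": "dataset_process_inputs"},
            {"name": "meanings", "typeName": "array<AtlasGlossaryTerm>",
             "relationshipTypeName": "AtlasGlossarySemanticAssignment"},
        ],
    }]
}

print(lineage_attributes(sample))  # {'map_reduce_process': ['inputs', 'outputs']}
```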
Create first lineage
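Let’s create a merger_process instance whose inputs are the datasets defined in salesFromGermany.json and salesFromCanada.json, and whose output is the dataset defined in globalSalesGermanyCanada.json. The GUIDs below are the ones generated in my environment; replace them with yours.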
mergerProcessInstance.json
{
"entity": {
"typeName": "merger_process",
"attributes": {
"owner": "Krishna Maj",
"outputs": [
{
"guid": "95324b7d-e6f9-4fa7-a2a4-83c194beba54",
"typeName": "DataSet"
}
],
"qualifiedName": "mergeSalesDaily@file_merge",
"displayName": "Merge Sales",
"inputs": [
{
"guid": "19e1b839-bf0e-450d-8bdd-2e44b00ea87c",
"typeName": "DataSet"
},
{
"guid": "3508b7bf-ad31-48ed-9013-030d9c3d10e8",
"typeName": "DataSet"
}
],
"description": "Daily merge sales data to global sales file",
"name": "mergeSalesDailyFromGermanyCanada",
"startTime": 1643363556,
"endTime": 1643364556,
"processId":"salesMergeProcessGermanyCanada"
}
}
}
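Hard-coding GUIDs ties this payload to one Atlas installation. As far as I know, an Atlas object reference (`AtlasObjectId`) can also identify an existing entity by `typeName` plus `uniqueAttributes`. Below is a sketch that builds the same merger_process body from qualifiedNames instead of GUIDs. Verify that your Atlas version resolves such references before relying on this. `ref` and `process_instance` are illustrative names.

```python
import json


def ref(qualified_name, type_name="DataSet"):
    """Reference an existing entity by qualifiedName instead of GUID (assumed to be supported)."""
    return {"typeName": type_name, "uniqueAttributes": {"qualifiedName": qualified_name}}


def process_instance(type_name, qualified_name, name, inputs, outputs, **extra):
    """Request body for POST /api/atlas/v2/entity for a Process subtype."""
    attributes = {
        "qualifiedName": qualified_name,
        "name": name,
        "inputs": [ref(q) for q in inputs],
        "outputs": [ref(q) for q in outputs],
    }
    attributes.update(extra)  # owner, startTime, endTime, description, ...
    return {"entity": {"typeName": type_name, "attributes": attributes}}


merge = process_instance(
    "merger_process",
    "mergeSalesDaily@file_merge",
    "mergeSalesDailyFromGermanyCanada",
    inputs=["DataSet_Germany_sales", "DataSet_Canada_sales"],
    outputs=["global sales Germany canada"],
    owner="Krishna Maj",
    startTime=1643363556,
    endTime=1643364556,
)
print(json.dumps(merge, indent=2))
```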
Execute the command below to create the process entity.
curl -X POST -u admin:admin -H 'accept: application/json' -H 'cache-control: no-cache' -H 'content-type: application/json' http://localhost:21000/api/atlas/v2/entity -d @mergerProcessInstance.json
Now open the global sales dataset in the Atlas UI (it is registered as global_sales_canada_germany.txt) and go to its Lineage tab; you should see a diagram like the one below.
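The same graph is also available from the REST API via `GET /api/atlas/v2/lineage/{guid}`, which accepts `direction` (INPUT, OUTPUT or BOTH) and `depth` query parameters. Below is a sketch that fetches the graph and turns it into readable edges. The field names `guidEntityMap` and `relations` (with `fromEntityId`/`toEntityId`) are what I expect in the lineage response, so compare them against your own output. `fetch_lineage` and `edges` are illustrative names.

```python
import base64
import json
import urllib.request

ATLAS = "http://localhost:21000/api/atlas/v2"
AUTH = "Basic " + base64.b64encode(b"admin:admin").decode("ascii")


def fetch_lineage(guid, direction="BOTH", depth=3):
    """GET the lineage graph centred on one entity."""
    url = f"{ATLAS}/lineage/{guid}?direction={direction}&depth={depth}"
    request = urllib.request.Request(
        url, headers={"Authorization": AUTH, "Accept": "application/json"}
    )
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))


def edges(lineage):
    """Turn the assumed `relations` list into (from, to) pairs of readable names."""
    entities = lineage.get("guidEntityMap", {})

    def label(guid):
        header = entities.get(guid, {})
        return (header.get("displayText")
                or header.get("attributes", {}).get("name")
                or guid)

    return [(label(r["fromEntityId"]), label(r["toEntityId"]))
            for r in lineage.get("relations", [])]
```

With the GUID from my environment, `edges(fetch_lineage("95324b7d-e6f9-4fa7-a2a4-83c194beba54"))` should list the dataset-to-process and process-to-dataset links around the global sales file.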
Create second lineage
mapReduceProcessInstance.json
{
"entity": {
"typeName": "map_reduce_process",
"attributes": {
"owner": "Krishna Maj",
"outputs": [
{
"guid": "7012df53-a1e6-464d-a407-39e04714cf8d",
"typeName": "DataSet"
},
{
"guid": "f1646aa0-5144-4e74-9907-6c1e6b96fdbf",
"typeName": "DataSet"
}
],
"qualifiedName": "mapReduceProcess@mapReduceGermanyCanada",
"displayName": "Map Reduce sales Germany Canada",
"inputs": [
{
"guid": "95324b7d-e6f9-4fa7-a2a4-83c194beba54",
"typeName": "DataSet"
}
],
"description": "map reduce sales daily in Germany, canada",
"name": "map reduce sales daily in Germany, canada",
"startTime": 1643363556,
"endTime": 1643364556,
"processId":"mapReduceProcess"
}
}
}
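The two lineages join into one graph only because the output of the merger_process instance and the input of the map_reduce_process instance reference the same dataset GUID (95324b7d-e6f9-4fa7-a2a4-83c194beba54, the global sales file). Below is a small sanity check over the two payload files, assuming they are in the current directory. The function names are illustrative.

```python
import json


def referenced_guids(payload, key):
    """GUIDs listed under attributes[key] ("inputs" or "outputs") of an entity payload."""
    items = payload["entity"]["attributes"].get(key, [])
    return {item["guid"] for item in items if "guid" in item}


def shared_datasets(upstream_payload, downstream_payload):
    """Datasets produced by the first process and consumed by the second."""
    return (referenced_guids(upstream_payload, "outputs")
            & referenced_guids(downstream_payload, "inputs"))


def check_files(upstream_path="mergerProcessInstance.json",
                downstream_path="mapReduceProcessInstance.json"):
    """Raise if the two process payloads share no dataset."""
    with open(upstream_path, encoding="utf-8") as first, \
         open(downstream_path, encoding="utf-8") as second:
        shared = shared_datasets(json.load(first), json.load(second))
    if not shared:
        raise ValueError("the processes share no dataset, so their lineages will not connect")
    return shared
```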
Execute the command below to create the second lineage.
curl -X POST -u admin:admin -H 'accept: application/json' -H 'cache-control: no-cache' -H 'content-type: application/json' http://localhost:21000/api/atlas/v2/entity -d @mapReduceProcessInstance.json
Reload the UI and open the lineage of the global sales file (global_sales_canada_germany.txt) again; you should now see both processes, like below.
You can download all the JSON files from this link.