DynamoDB Stream + Amazon ES (ElasticSearch, Kibana) [P024]




* Parts

- Raspberry Pi 

- AWS Lambda Java Project in Ecplise


* Contents

1. Upload sensor data to json file using the Amazon S3 Rest API

Pi4J + Amazon S3 REST API + Amazon Athena [P019]



2. Amazon Lambda S3 Trigger (node.js) : S3toDynamoDB

- source code : https://github.com/rdiot/rdiot-p022/blob/master/S3toDynamoDB/index.js

'use strict';

console.log('RDIoT S3toDynamoDB Loading post function');

var AWS = require('aws-sdk'); 

var s3 = new AWS.S3();

var dynamo = new AWS.DynamoDB.DocumentClient();


exports.handler = function(event, context, callback) {

    console.log('Received event:', JSON.stringify(event, null, 2));

    var bucket = event.Records[0]['s3']['bucket']['name'];

    var en = event.Records[0]['eventName'];

    var et = event.Records[0]['eventTime'];

    var key = event.Records[0]['s3']['object']['key'];

    var sip = event.Records[0]['requestParameters']['sourceIPAddress'];


    var params1 = {Bucket: bucket, Key: key}; 

    s3.getObject( params1, function(err,data) {

        if(err) {

            console.log(err.stack);

            callback(err);

        } else {

            var value = data.Body.toString('ascii')

            console.log(data);

            console.log("Raw text:\n" + value);


            var params2 = {

                TableName: "pi-sensor",

                Item:{

                    "id": et,

                    "event" : en,

                    "bucket": bucket,

                    "key" : key,

                    "sip": sip,

                    "value" : value

                }

            };

            console.log("Gettings IoT device details...");


            //S3 to DynamoDB

            dynamo.put(params2, function(err, data) {

                if(err) {

                    console.error("Unable to post devices. Error JSON:", JSON.stringify(err, null, 2));

                    context.fail();

                } else {

                    console.log("keepet data:", JSON.stringify(data, null, 2));

                    context.succeed('success post');

                }


            });

        }

    });

}


3. AWS DynamoDB Monitoring




4. Create IAM Role

- It must have basic Amazon ES, DynamoDB, and Lambda execution permissions

- create custom role name : LambdaDynamoDBtoES


{

  "Version": "2012-10-17",

  "Statement": [

    {

      "Effect": "Allow",

      "Action": [

        "es:ESHttpPost",

        "es:ESHttpPut",

        "dynamodb:DescribeStream",

        "dynamodb:GetRecords",

        "dynamodb:GetShardIterator",

        "dynamodb:ListStreams",

        "logs:CreateLogGroup",

        "logs:CreateLogStream",

        "logs:PutLogEvents"

      ],

      "Resource": "*"

    }

  ]

}


5. Setup Amazon ES (Elasticsearch + Kibana)

- Define domain

- Configure cluster

- Set up access

- Review

- Service dashboard

6. Amazon Lambda DynamoDB Stream Trigger (java) : DynamoDBtoES




7. Develop AWS Lambda Java Project in Eclipse

- New Project : AWS Lambda Java Project


- New AWS Maven Project : awsLambdaDynamoDBStreamtoAmazonES

 : Class Name: LambdaFunctionHandler

 : Input Type : Dynamodb Event


- Check the items of DynamoDB when running pi4j


- Check the insert event message in the Cloud Watch Log 

 : public Integer handleRequest(DynamodbEvent event, Context context) {

   context.getLogger().log("Received event: " + event);

{ApproximateCreationDateTime: Mon Aug 13 12:07:00 UTC 2018,Keys: {id={S: 2018-08-13T12:07:12.024Z,}},NewImage: {bucket={S: rdiot-test,}, id={S: 2018-08-13T12:07:12.024Z,}, sip={S: 211.245.211.252,}, event={S: ObjectCreated:Put,}, value={S: {"name":"temperature","value":33.0},}, key={S: pi_dht11_2018813079.json,}},SequenceNumber: 187352800000000003101012958,SizeBytes: 175,StreamViewType: NEW_AND_OLD_IMAGES}



- Key Code
for (DynamodbStreamRecord record : event.getRecords()) {
            context.getLogger().log(record.getEventID());
            context.getLogger().log(record.getEventName());
            context.getLogger().log(record.getDynamodb().toString());
            switch (record.getEventName()) {
            case "INSERT":
            case "MODIFY":
            // insert, update code area forAmazon  Elastic Search 
            break;
            case "REMOVE":
            // none
            break;
           
            }
    }

- Reference
https://docs.aws.amazon.com/ko_kr/amazondynamodb/latest/developerguide/Streams.Lambda.html
https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/ES.html
https://github.com/elastic/elasticsearch-js

https://docs.amazonaws.cn/en_us/elasticsearch-service/latest/developerguide/es-aws-integrations.html#es-aws-integrations-dynamodb-es

https://docs.aws.amazon.com/ko_kr/elasticsearch-service/latest/developerguide/aes-dg.pdf

Posted by RDIoT
|