DynamoDB Stream + Amazon ES (ElasticSearch, Kibana) [P024]




* Parts

- Raspberry Pi 

- AWS Lambda Java Project in Ecplise


* Contents

1. Upload sensor data to json file using the Amazon S3 Rest API

Pi4J + Amazon S3 REST API + Amazon Athena [P019]



2. Amazon Lambda S3 Trigger (node.js) : S3toDynamoDB

- source code : https://github.com/rdiot/rdiot-p022/blob/master/S3toDynamoDB/index.js

'use strict';

console.log('RDIoT S3toDynamoDB Loading post function');

var AWS = require('aws-sdk'); 

var s3 = new AWS.S3();

var dynamo = new AWS.DynamoDB.DocumentClient();


exports.handler = function(event, context, callback) {

    console.log('Received event:', JSON.stringify(event, null, 2));

    var bucket = event.Records[0]['s3']['bucket']['name'];

    var en = event.Records[0]['eventName'];

    var et = event.Records[0]['eventTime'];

    var key = event.Records[0]['s3']['object']['key'];

    var sip = event.Records[0]['requestParameters']['sourceIPAddress'];


    var params1 = {Bucket: bucket, Key: key}; 

    s3.getObject( params1, function(err,data) {

        if(err) {

            console.log(err.stack);

            callback(err);

        } else {

            var value = data.Body.toString('ascii')

            console.log(data);

            console.log("Raw text:\n" + value);


            var params2 = {

                TableName: "pi-sensor",

                Item:{

                    "id": et,

                    "event" : en,

                    "bucket": bucket,

                    "key" : key,

                    "sip": sip,

                    "value" : value

                }

            };

            console.log("Gettings IoT device details...");


            //S3 to DynamoDB

            dynamo.put(params2, function(err, data) {

                if(err) {

                    console.error("Unable to post devices. Error JSON:", JSON.stringify(err, null, 2));

                    context.fail();

                } else {

                    console.log("keepet data:", JSON.stringify(data, null, 2));

                    context.succeed('success post');

                }


            });

        }

    });

}


3. AWS DynamoDB Monitoring




4. Create IAM Role

- It must have basic Amazon ES, DynamoDB, and Lambda execution permissions

- create custom role name : LambdaDynamoDBtoES


{

  "Version": "2012-10-17",

  "Statement": [

    {

      "Effect": "Allow",

      "Action": [

        "es:ESHttpPost",

        "es:ESHttpPut",

        "dynamodb:DescribeStream",

        "dynamodb:GetRecords",

        "dynamodb:GetShardIterator",

        "dynamodb:ListStreams",

        "logs:CreateLogGroup",

        "logs:CreateLogStream",

        "logs:PutLogEvents"

      ],

      "Resource": "*"

    }

  ]

}


5. Setup Amazon ES (Elasticsearch + Kibana)

- Define domain

- Configure cluster

- Set up access

- Review

- Service dashboard

6. Amazon Lambda DynamoDB Stream Trigger (java) : DynamoDBtoES




7. Develop AWS Lambda Java Project in Eclipse

- New Project : AWS Lambda Java Project


- New AWS Maven Project : awsLambdaDynamoDBStreamtoAmazonES

 : Class Name: LambdaFunctionHandler

 : Input Type : Dynamodb Event


- Check the items of DynamoDB when running pi4j


- Check the insert event message in the Cloud Watch Log 

 : public Integer handleRequest(DynamodbEvent event, Context context) {

   context.getLogger().log("Received event: " + event);

{ApproximateCreationDateTime: Mon Aug 13 12:07:00 UTC 2018,Keys: {id={S: 2018-08-13T12:07:12.024Z,}},NewImage: {bucket={S: rdiot-test,}, id={S: 2018-08-13T12:07:12.024Z,}, sip={S: 211.245.211.252,}, event={S: ObjectCreated:Put,}, value={S: {"name":"temperature","value":33.0},}, key={S: pi_dht11_2018813079.json,}},SequenceNumber: 187352800000000003101012958,SizeBytes: 175,StreamViewType: NEW_AND_OLD_IMAGES}



- Key Code
for (DynamodbStreamRecord record : event.getRecords()) {
            context.getLogger().log(record.getEventID());
            context.getLogger().log(record.getEventName());
            context.getLogger().log(record.getDynamodb().toString());
            switch (record.getEventName()) {
            case "INSERT":
            case "MODIFY":
            // insert, update code area forAmazon  Elastic Search 
            break;
            case "REMOVE":
            // none
            break;
           
            }
    }

- Reference
https://docs.aws.amazon.com/ko_kr/amazondynamodb/latest/developerguide/Streams.Lambda.html
https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/ES.html
https://github.com/elastic/elasticsearch-js

https://docs.amazonaws.cn/en_us/elasticsearch-service/latest/developerguide/es-aws-integrations.html#es-aws-integrations-dynamodb-es

https://docs.aws.amazon.com/ko_kr/elasticsearch-service/latest/developerguide/aes-dg.pdf

Posted by RDIoT
|

MQTT + Kafka + Amazon ElasticSearch Service [P015]

 

 




https://www.youtube.com/watch?v=-8B8Yp52XMY


* GitHubhttps://github.com/rdiot/rdiot-p015


* Parts

- Arduino UNO

Ethernet W5100 Shield (W5100) [B004]

LCD1602 I2C (LCD1602) [D016]

Photo Resistor Module (KY-018) [S002]

Temperature and humidity DHT22 (DHT22) [S063]

TPM-300 Air Quality Module (TPM-300) [S092]

Samsung ARTIK 5 (ARTIK5-V0.5) [B023]

Raspberry Pi 3 B Model (RASPBERRY-PI-3-B) [B088] x 3ea : for Kafka Cluster

- USB Power Supply

- Ethernet Hub

 

* Contents

1. Amazon ElasticSearch Service Setup

 - Define domain

- Configure cluster


- Set up access

- Review


Amazon Elasticsearch Service dashboard


2. ES Information example 

- ElasticSearch Endpoint : https://search-rdiot-aws-es-2yolnmmxbjghreiywdbh4yrsay.ap-northeast-2.es.amazonaws.com

- Kibana URL : https://search-rdiot-aws-es-2yolnmmxbjghreiywdbh4yrsay.ap-northeast-2.es.amazonaws.com/_plugin/kibana/



3. MQTT - Kafka - Bridge (json)

- source : https://github.com/rdiot/rdiot-p015/blob/master/MqttToKafkaReConn.java

- result 

{"time": 1511884443315,"value":724}

{"time": 1511884444405,"value":724}

{"time": 1511884445488,"value":723}

{"time": 1511884446579,"value":724}

{"time": 1511884447657,"value":726}


4. install logstash : tested logstash-2.4.1 

$ cd /data1/logstash

$ wget https://download.elastic.co/logstash/logstash/logstash-2.4.1.tar.gz

$ tar zxvf logstash-2.4.1.tar.gz

$ ln -s logstash-2.4.1 logstash


$ sudo apt-get install ant texinfo openjdk-8-jdk build-essential

$ git clone https://github.com/jnr/jffi.git

$ export JAVA_HOME="/usr/lib/jvm/java-8-openjdk-armhf"

$ cd jffi

$ ant jar

$ sudo cp build/jni/libjffi-1.2.so /data1/logstash/logstash/vendor/jruby/lib/jni/arm-Linux


5. logstash config : tested logstash-2.4.1 

- source : https://github.com/rdiot/rdiot-p015/blob/master/logstash-kafka-aws-es.conf

input {

   kafka {

   zk_connect => "kafka-pi-01:2181"

   group_id => "logstash"

   topic_id => "cds"

   consumer_threads => 2

   decorate_events => true

   

  }

}


filter {

  json {

    source => "message"

  }

  date {

    match => [ "time", "UNIX"]

    target => "time_new"


  }   

}


output {

  elasticsearch {

    index => "test-%{+YYYY.MM.dd}"

    hosts => ["https://search-rdiot-aws-es-2yolnmmxbjghreiywdbh4yrsay.ap-northeast-2.es.amazonaws.com"]

    codec => json

  }

  stdout {

    codec => "rubydebug"

  }

}


6. Amazon ElasticSearch Query Sample



7. Kibana : Search & Chart 


 



Posted by RDIoT
|