DynamoDB Stream + Amazon ES (ElasticSearch, Kibana) [P024]




* Parts

- Raspberry Pi 

- AWS Lambda Java Project in Eclipse


* Contents

1. Upload sensor data as a JSON file using the Amazon S3 REST API

Pi4J + Amazon S3 REST API + Amazon Athena [P019]



2. Amazon Lambda S3 Trigger (node.js) : S3toDynamoDB

- source code : https://github.com/rdiot/rdiot-p022/blob/master/S3toDynamoDB/index.js

'use strict';

console.log('RDIoT S3toDynamoDB Loading post function');

var AWS = require('aws-sdk'); 

var s3 = new AWS.S3();

var dynamo = new AWS.DynamoDB.DocumentClient();


exports.handler = function(event, context, callback) {

    console.log('Received event:', JSON.stringify(event, null, 2));

    var bucket = event.Records[0]['s3']['bucket']['name'];

    var en = event.Records[0]['eventName'];

    var et = event.Records[0]['eventTime'];

    var key = event.Records[0]['s3']['object']['key'];

    var sip = event.Records[0]['requestParameters']['sourceIPAddress'];


    var params1 = {Bucket: bucket, Key: key}; 

    s3.getObject( params1, function(err,data) {

        if(err) {

            console.log(err.stack);

            callback(err);

        } else {

            var value = data.Body.toString('ascii')

            console.log(data);

            console.log("Raw text:\n" + value);


            var params2 = {

                TableName: "pi-sensor",

                Item:{

                    "id": et,

                    "event" : en,

                    "bucket": bucket,

                    "key" : key,

                    "sip": sip,

                    "value" : value

                }

            };

            console.log("Gettings IoT device details...");


            //S3 to DynamoDB

            dynamo.put(params2, function(err, data) {

                if(err) {

                    console.error("Unable to post devices. Error JSON:", JSON.stringify(err, null, 2));

                    context.fail();

                } else {

                    console.log("keepet data:", JSON.stringify(data, null, 2));

                    context.succeed('success post');

                }


            });

        }

    });

}


3. AWS DynamoDB Monitoring




4. Create IAM Role

- It must have basic Amazon ES, DynamoDB, and Lambda execution permissions

- create custom role name : LambdaDynamoDBtoES


{

  "Version": "2012-10-17",

  "Statement": [

    {

      "Effect": "Allow",

      "Action": [

        "es:ESHttpPost",

        "es:ESHttpPut",

        "dynamodb:DescribeStream",

        "dynamodb:GetRecords",

        "dynamodb:GetShardIterator",

        "dynamodb:ListStreams",

        "logs:CreateLogGroup",

        "logs:CreateLogStream",

        "logs:PutLogEvents"

      ],

      "Resource": "*"

    }

  ]

}


5. Setup Amazon ES (Elasticsearch + Kibana)

- Define domain

- Configure cluster

- Set up access

- Review

- Service dashboard

6. Amazon Lambda DynamoDB Stream Trigger (java) : DynamoDBtoES




7. Develop AWS Lambda Java Project in Eclipse

- New Project : AWS Lambda Java Project


- New AWS Maven Project : awsLambdaDynamoDBStreamtoAmazonES

 : Class Name: LambdaFunctionHandler

 : Input Type : Dynamodb Event


- Check the items of DynamoDB when running pi4j


- Check the insert event message in the Cloud Watch Log 

 : public Integer handleRequest(DynamodbEvent event, Context context) {

   context.getLogger().log("Received event: " + event);

{ApproximateCreationDateTime: Mon Aug 13 12:07:00 UTC 2018,Keys: {id={S: 2018-08-13T12:07:12.024Z,}},NewImage: {bucket={S: rdiot-test,}, id={S: 2018-08-13T12:07:12.024Z,}, sip={S: 211.245.211.252,}, event={S: ObjectCreated:Put,}, value={S: {"name":"temperature","value":33.0},}, key={S: pi_dht11_2018813079.json,}},SequenceNumber: 187352800000000003101012958,SizeBytes: 175,StreamViewType: NEW_AND_OLD_IMAGES}



- Key Code
// Iterate over every stream record in the event, log its ID, event name and
// DynamoDB payload, then branch on the event name.
for (DynamodbStreamRecord record : event.getRecords()) {
            context.getLogger().log(record.getEventID());
            context.getLogger().log(record.getEventName());
            context.getLogger().log(record.getDynamodb().toString());
            switch (record.getEventName()) {
            case "INSERT":
            case "MODIFY":
            // Placeholder: the insert/update code for Amazon Elasticsearch goes here.
            // INSERT falls through to MODIFY, so both share this branch.
            break;
            case "REMOVE":
            // Placeholder: nothing is done for removed items.
            break;
           
            }
    }

- Reference
https://docs.aws.amazon.com/ko_kr/amazondynamodb/latest/developerguide/Streams.Lambda.html
https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/ES.html
https://github.com/elastic/elasticsearch-js

https://docs.amazonaws.cn/en_us/elasticsearch-service/latest/developerguide/es-aws-integrations.html#es-aws-integrations-dynamodb-es

https://docs.aws.amazon.com/ko_kr/elasticsearch-service/latest/developerguide/aes-dg.pdf

Posted by RDIoT
|

S3 Lambda Trigger + Amazon SQS + SQSReceiver + SparkStreaming [P023]




https://www.youtube.com/watch?v=37gl-tkUa3M


GitHub : https://github.com/rdiot/rdiot-p023



* Parts

Pi4J + Amazon S3 REST API + Amazon Athena [P019]

Raspberry Pi2

Temperature and humidity DHT11 sensor module (KY-015) [S008]


* Contents

1. Create custom role : AWS Lambda requires access to your resources

- Create a new IAM Role

- Role Name : LambdaSQSRole

- Attach policy : AmazonSQSFullAccess 

2. S3 Lambda Trigger : Create function

- Function Name : S3toSQS

- Runtime : Node.js 6.10


- Designer & Configure Triggers
 : Add triggers from the list on the left : S3
 : Bucket : s3 bucket name
 : Event type : PUT
 : Enable trigger



- S3toSQS Function Code
 : Source : https://github.com/rdiot/rdiot-p023/blob/master/S3toSQS/index.js


var AWS = require("aws-sdk");
var sqs = new AWS.SQS();
var QUEUE_URL = 'https://sqs.ap-northeast-2.amazonaws.com/996409770277/jobQueue';

exports.handler = (event, context, callback) => {

   var sqsParams = {
      MessageBody: JSON.stringify(event),
      QueueUrl: QUEUE_URL
    };
    console.log(sqsParams)

    var sqsdata = sqs.sendMessage(sqsParams, function(err, data) {
      if (err) {
        console.log('ERR', err);
      }
      console.log(data);
      context.succeed('Exit');

    });
    console.log('message sent')
};

3. Create New Queue (Amazon SQS)
- Queue Name : jobQueue


4. Run Pi4j Java 


 
5. Load into Amazon SQS via Amazon Lambda

- full body 

{"Records":[{"eventVersion":"2.0","eventSource":"aws:s3","awsRegion":"ap-northeast-2","eventTime":"2018-07-12T15:34:52.533Z","eventName":"ObjectCreated:Put","userIdentity":{"principalId":"AWS:AIDAIQKJB3IKQS4YDOI2Y"},"requestParameters":{"sourceIPAddress":"211.245.211.252"},"responseElements":{"x-amz-request-id":"1ED3B3657F2A08F6","x-amz-id-2":"MzvKf2QuE9OImRCUfc7dwc4YXAgCTQDfhs06Jbd6FQCsZ499R/lhip4j9uSBGebAVwh3Ev7f8RM="},"s3":{"s3SchemaVersion":"1.0","configurationId":"1cbdc078-b185-4621-81de-91c92188a0fa","bucket":{"name":"rdiot-test","ownerIdentity":{"principalId":"A3PLNTCS5VC05F"},"arn":"arn:aws:s3:::rdiot-test"},"object":{"key":"pi_dht11_20187132475.json","size":35,"eTag":"8ff2b7914053e1c8ef3c01323ee35b46","sequencer":"005B47751C774BCE79"}}}]}

6. Spark Streaming with Amazon SQS : Spark-SQS-Receiver (streaming custom receiver)

- SQS Receiver : https://github.com/rdiot/rdiot-p023/blob/master/sparkReceiver/src/main/java/com/rdiot/sparkReceiver/receiver/SQSReceiver.java

- SQS Receiver with Delete : https://github.com/rdiot/rdiot-p023/blob/master/sparkReceiver/src/main/java/com/rdiot/sparkReceiver/SQSWithDelete.java

- SQS Receiver without Delete : https://github.com/rdiot/rdiot-p023/blob/master/sparkReceiver/src/main/java/com/rdiot/sparkReceiver/SQSWithoutDelete.java


- s3 event word count source : https://github.com/rdiot/rdiot-p023/blob/master/sparkReceiver/src/main/java/com/rdiot/sparkReceiver/sample/SQSWordCount.java


package com.rdiot.sparkReceiver.sample;


import java.util.Arrays;


import org.apache.log4j.Level;

import org.apache.log4j.Logger;

import org.apache.spark.SparkConf;

import org.apache.spark.streaming.Duration;

import org.apache.spark.streaming.Durations;

import org.apache.spark.streaming.api.java.JavaDStream;

import org.apache.spark.streaming.api.java.JavaPairDStream;

import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;

import org.apache.spark.streaming.api.java.JavaStreamingContext;


import com.amazonaws.regions.Regions;

import com.rdiot.sparkReceiver.SQSWithoutDelete;

import com.rdiot.sparkReceiver.receiver.SQSReceiver;


import scala.Tuple2;


public class SQSWordCount {

  private static Logger logger = Logger.getLogger(SQSWithoutDelete.class);

    final static String appName = "sparkSQSReceiver";

    final static String master = "local[2]";

    final static String queueName = "jobQueue";

    

    final static Duration batchDuration = Durations.seconds(5); // Batch Duration 

    final static Duration windowDuration = Durations.seconds(5); // TBD

    final static Duration slideDuration = Durations.seconds(3); // TBD


public static void main(String[] args) throws InterruptedException {

    Logger.getLogger("org").setLevel(Level.OFF);

    //Spark Config 

        SparkConf conf = new SparkConf().setMaster(master).setAppName(appName);

        conf.set("spark.testing.memory", "2147480000");  // if you face any memory issues


try (JavaStreamingContext jssc = new JavaStreamingContext(conf, batchDuration)) {

SQSReceiver javaSQSReceiver = new SQSReceiver(queueName) // 메시지큐 즉시 삭제 

.with(Regions.AP_NORTHEAST_2);

        System.out.println("# Spark Streaming Start");

        

JavaReceiverInputDStream<String> input = jssc.receiverStream(javaSQSReceiver);

// SQS Messages

/*

        input.foreachRDD(rdd->{

            rdd.foreach(w->{

            System.out.println(w);

            });

        });

        */

        // Word Count

        JavaDStream<String> words = input.flatMap(x -> Arrays.asList(x.split(" ")).iterator());

        JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));

        JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);        

        wordCounts.print();


jssc.start();

jssc.awaitTermination();

} finally {

logger.info("Exiting the Application");

}

}


}


7. Result

- SQS Message

- Word Count : split ':'


* Reference

- AWS Lambda Permissions Model : https://docs.aws.amazon.com/lambda/latest/dg/intro-permission-model.html

- AWS SQS JS : https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/SQS.html

- Spark Streaming Custom Receiver : http://spark.apache.org/docs/latest/streaming-custom-receivers.html

Posted by RDIoT
|

Pi4J + Amazon S3 REST API + S3 Lambda Trigger + DynamoDB [P022]





https://www.youtube.com/watch?v=LHZBQmybbqc


GitHub : https://github.com/rdiot/rdiot-p022


* Parts

- Pi4J + Amazon S3 REST API + Amazon Athena [P019]

Raspberry Pi2

- Temperature and humidity DHT11 sensor module (KY-015) [S008]


* Contents

1. S3 Upload 

- Reference : Pi4J + Amazon S3 REST API + Amazon Athena [P019]


2. Run Java Application (pi4j)

- Source : 

java -Dpi4j.linking=dynamic -jar pi4j_s3rest-0.0.1-SNAPSHOT.jar




3.  S3 Storage Monitoring

   

4. AWS Lambda Monitoring



5. AWS DynamoDB Monitoring


6. AWS Lambda Source (node.js) : S3toDynamoDB

- source code : https://github.com/rdiot/rdiot-p022/blob/master/S3toDynamoDB/index.js

'use strict';

console.log('RDIoT S3toDynamoDB Loading post function');

var AWS = require('aws-sdk'); 

var s3 = new AWS.S3();

var dynamo = new AWS.DynamoDB.DocumentClient();


exports.handler = function(event, context, callback) {

    console.log('Received event:', JSON.stringify(event, null, 2));

    var bucket = event.Records[0]['s3']['bucket']['name'];

    var en = event.Records[0]['eventName'];

    var et = event.Records[0]['eventTime'];

    var key = event.Records[0]['s3']['object']['key'];

    var sip = event.Records[0]['requestParameters']['sourceIPAddress'];


    var params1 = {Bucket: bucket, Key: key}; 

    s3.getObject( params1, function(err,data) {

        if(err) {

            console.log(err.stack);

            callback(err);

        } else {

            var value = data.Body.toString('ascii')

            console.log(data);

            console.log("Raw text:\n" + value);


            var params2 = {

                TableName: "pi-sensor",

                Item:{

                    "id": et,

                    "event" : en,

                    "bucket": bucket,

                    "key" : key,

                    "sip": sip,

                    "value" : value

                }

            };

            console.log("Gettings IoT device details...");


            //S3 to DynamoDB

            dynamo.put(params2, function(err, data) {

                if(err) {

                    console.error("Unable to post devices. Error JSON:", JSON.stringify(err, null, 2));

                    context.fail();

                } else {

                    console.log("keepet data:", JSON.stringify(data, null, 2));

                    context.succeed('success post');

                }


            });

        }

    });

}



7. Lambda : s3 put event sample

{
    "Records": [
      {
        "awsRegion": "ap-northeast-2",
        "eventName": "ObjectCreated:Put",
        "eventSource": "aws:s3",
        "eventTime": "2018-06-30T12:04:47.253Z",
        "eventVersion": "2.0",
        "requestParameters": {
          "sourceIPAddress": "211.245.211.252"
        },
        "responseElements": {
          "x-amz-id-2": "PTHGAl4fTlMHW/2lurF0HaruzgLtTnG1JyUlWC9o9AM21jLLUQ5Dp3LH1Ur0uVltB6L7BqDs6Ts=",
          "x-amz-request-id": "31CCB31CF0807321"
        },
        "s3": {
          "bucket": {
            "arn": "arn:aws:s3:::rdiot-test",
            "name": "rdiot-test",
            "ownerIdentity": {
              "principalId": "A3PLNTCS5VC05F"
            }
          },
          "configurationId": "174810a9-e50b-484e-bfda-eafed1ff2ec3",
          "object": {
            "eTag": "5d81121100d6ec386ee75237c8eb3549",
            "key": "pi_dht11_20186300444.json",
            "sequencer": "005B3771DF3531F6C1",
            "size": 35
          },
          "s3SchemaVersion": "1.0"
        },
        "userIdentity": {
          "principalId": "AWS:AIDAIQKJB3IKQS4YDOI2Y"
        }
      }
    ]
  }


8. Lambda : Context Sample

{
    "awsRequestId": "45bca188-7c5e-11e8-9d80-cdcb57e017f4",
    "callbackWaitsForEmptyEventLoop": true,
    "functionName": "S3toDynamoDB",
    "functionVersion": "$LATEST",
    "invokedFunctionArn": "arn:aws:lambda:ap-northeast-2:996409770277:function:S3toDynamoDB",
    "invokeid": "45bca188-7c5e-11e8-9d80-cdcb57e017f4",
    "logGroupName": "/aws/lambda/S3toDynamoDB",
    "logStreamName": "2018/06/30/[$LATEST]16b94689af7d4257bcc75dffc45798db",
    "memoryLimitInMB": "128"
  }


9. Lambda : S3 Object GET Data Sample

2018-06-30T14:04:12.852Z 775a8d39-7c6e-11e8-bc88-e9ab4a76e0af { AcceptRanges: 'bytes',
  LastModified: 2018-06-30T12:04:48.000Z,
  ContentLength: 35,
  ETag: '"5d81121100d6ec386ee75237c8eb3549"',
  CacheControl: 'no-cache',
  ContentType: 'binary/octet-stream',
  Metadata: {},
  StorageClass: 'REDUCED_REDUNDANCY',
  Body: <Buffer 7b 22 6e 61 6d 65 22 3a 22 74 65 6d 70 65 72 61 74 75 72 65 22 2c 22 76 61 6c 75 65 22 3a 33 30 2e 30 7d> }
2018-06-30T14:04:12.853Z 775a8d39-7c6e-11e8-bc88-e9ab4a76e0af Raw text:
{"name":"temperature","value":30.0}


Posted by RDIoT
|

Pi4J + Amazon S3 REST API + Amazon Athena [P019]






https://www.youtube.com/watch?v=G-Ot7oh4_jk


GitHub : https://github.com/rdiot/rdiot-p019.git


* Parts

- Raspberry Pi2

Temperature and humidity DHT11 sensor module (KY-015) [S008]



* Contents

- Connect 

S - Signal GPIO3

middle - VCC

- - GND


1. Get the sensor value with Pi4J (Java I/O library for the Raspberry Pi) and then upload it to Amazon S3.

 : Run the java application

$ java -Dpi4j.linking=dynamic -jar pi4j_s3rest-0.0.1-SNAPSHOT.jar


2. Check the upload status in the Amazon S3 console.




3. Set up Amazon Athena.

- add database : pisensor

- add table : temperature


- DDL (add table)

CREATE EXTERNAL TABLE IF NOT EXISTS piSensor.temperature (

  `name` string,

  `value` float 

)

ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'

WITH SERDEPROPERTIES (

  'serialization.format' = '1'

) LOCATION 's3://rdkim-test/'

TBLPROPERTIES ('has_encrypted_data'='false');


4. SQL Query in the Amazon Athena

example : SELECT * FROM "pisensor"."temperature" limit 10;



- Pi4J Java Maven

<dependency>

    <groupId>com.pi4j</groupId>

    <artifactId>pi4j-core</artifactId>

   <version>1.1</version>

</dependency>


- project maven pom.xml 

 : https://github.com/rdiot/rdiot-p019/blob/master/pom.xml


- source : main : https://github.com/rdiot/rdiot-p019/blob/master/pi4j_s3rest/App.java

- source : pi4j : https://github.com/rdiot/rdiot-p019/blob/master/pi4j/dht11.java

- source : s3 rest api header aws sig4 : https://github.com/rdiot/rdiot-p019/blob/master/s3rest/auth/AWS4SignerForAuthorizationHeader.java


- Key Code

     // Create the dht11 reader object used to obtain the temperature.
     dht11 dht = new dht11();

   

        // Make at most 10 read attempts; stop after the first non-null reading.
        for (int i=0; i<10; i++) {

            // Pause 2000 ms before each attempt.
            try {

Thread.sleep(2000);

} catch (InterruptedException e) {

// The interruption is only printed; the loop then continues with the read.
e.printStackTrace();

}

            

            // objectContent is declared outside this snippet.
            objectContent = dht.getTemperature();

            

            // A null result is treated as "no reading yet" and the loop retries.
            if(objectContent != null) {

        System.out.println(objectContent);

        // Upload once, then leave the loop. putS3Object and its arguments are
        // defined outside this snippet; it presumably uploads objectContent
        // (TODO confirm).
        putS3Object(bucketName, regionName, awsAccessKey, awsSecretKey);

            break;

            }            

         }     



- Reference

 : Pi4J Project : http://pi4j.com

 : Pi4J GitHub : https://github.com/Pi4J/pi4j

 : get dht11 : https://stackoverflow.com/questions/28486159/read-temperature-from-dht11-using-pi4j

 : Amazon S3 Rest API : https://docs.aws.amazon.com/AmazonS3/latest/API/Welcome.html


Posted by RDIoT
|