S3 Lambda Trigger + Amazon SQS + SQSReceiver + SparkStreaming [P023]




https://www.youtube.com/watch?v=37gl-tkUa3M


GitHub : https://github.com/rdiot/rdiot-p023



* Parts

Pi4J + Amazon S3 REST API + Amazon Athena [P019]

Raspberry Pi2

Temperature and humidity DHT11 sensor module (KY-015) [S008]


* Contents

1. Create custom role : AWS Lambda requires access to your resources

- Create a new IAM Role

- Role Name : LambdaSQSRole

- Attach policy : AmazonSQSFullAccess 
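
AmazonSQSFullAccess allows every SQS action on every queue. As a narrower alternative, the sketch below builds a policy document that only allows sending to a single queue. The helper name buildSendOnlyPolicy is hypothetical, and the queue ARN is an assumption derived from the region, account ID and queue name in the queue URL used by the S3toSQS function; the role would still need CloudWatch Logs permissions for the function's console.log output to show up.

```javascript
// Hypothetical helper: builds an IAM policy document limited to sqs:SendMessage
// on one queue, instead of the broad AmazonSQSFullAccess managed policy.
function buildSendOnlyPolicy(queueArn) {
  return {
    Version: '2012-10-17',
    Statement: [
      {
        Effect: 'Allow',
        Action: ['sqs:SendMessage'],
        Resource: queueArn
      }
    ]
  };
}

// Assumed ARN, derived from the queue URL in the S3toSQS function code.
const policy = buildSendOnlyPolicy('arn:aws:sqs:ap-northeast-2:996409770277:jobQueue');
console.log(JSON.stringify(policy, null, 2));
```

The printed JSON could be pasted into an inline policy on LambdaSQSRole in place of the managed policy.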

2. S3 Lambda Trigger : Create function

- Function Name : S3toSQS

- Runtime : Node.js 6.10


- Designer & Configure Triggers
 : Add triggers from the list on the left : S3
 : Bucket : s3 bucket name
 : Event type : PUT
 : Enable trigger



- S3toSQS Function Code
 : Source : https://github.com/rdiot/rdiot-p023/blob/master/S3toSQS/index.js


var AWS = require("aws-sdk");
var sqs = new AWS.SQS();
var QUEUE_URL = 'https://sqs.ap-northeast-2.amazonaws.com/996409770277/jobQueue';

exports.handler = (event, context, callback) => {

    // Forward the whole S3 event as the SQS message body
    var sqsParams = {
      MessageBody: JSON.stringify(event),
      QueueUrl: QUEUE_URL
    };
    console.log(sqsParams);

    sqs.sendMessage(sqsParams, function(err, data) {
      if (err) {
        console.log('ERR', err);
        return callback(err); // report the failure instead of signalling success
      }
      // Log only after SQS has confirmed the send
      console.log('message sent', data);
      callback(null, 'Exit');
    });
};
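
The trigger set up in the Designer step (bucket, PUT event type, enabled) corresponds to an S3 bucket notification configuration. The sketch below only builds that configuration as a plain object, in the shape used by the AWS SDK for JavaScript putBucketNotificationConfiguration call; it does not call AWS. The helper name buildTriggerParams and the function ARN are assumptions, and the bucket name rdiot-test is taken from the event sample in this post.

```javascript
// Hypothetical helper: describes the "S3 PUT -> Lambda" trigger as a params object
// in the shape used by putBucketNotificationConfiguration.
function buildTriggerParams(bucket, functionArn) {
  return {
    Bucket: bucket,
    NotificationConfiguration: {
      LambdaFunctionConfigurations: [
        {
          LambdaFunctionArn: functionArn,
          Events: ['s3:ObjectCreated:Put'] // the "PUT" event type in the console
        }
      ]
    }
  };
}

const triggerParams = buildTriggerParams(
  'rdiot-test',
  'arn:aws:lambda:ap-northeast-2:996409770277:function:S3toSQS' // assumed ARN
);
console.log(JSON.stringify(triggerParams, null, 2));
```

When the trigger is added through the console, the permission that lets S3 invoke the function is created as well; with a scripted setup that permission would have to be added separately.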

3. Create New Queue (Amazon SQS)
- Queue Name : jobQueue
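
The queue name chosen here ends up as the last path segment of the QUEUE_URL constant in the Lambda code. A minimal sketch of that URL layout follows; the helper names buildQueueUrl and parseQueueUrl are hypothetical, and the pattern is only assumed to cover standard regional queue URLs like the one in this post.

```javascript
// Hypothetical helpers for the queue URL layout seen in the S3toSQS code:
// https://sqs.<region>.amazonaws.com/<account-id>/<queue-name>
function buildQueueUrl(region, accountId, queueName) {
  return `https://sqs.${region}.amazonaws.com/${accountId}/${queueName}`;
}

function parseQueueUrl(url) {
  const match = /^https:\/\/sqs\.([a-z0-9-]+)\.amazonaws\.com\/(\d+)\/([^/]+)$/.exec(url);
  if (!match) return null; // not a URL in the assumed layout
  return { region: match[1], accountId: match[2], queueName: match[3] };
}

const queueUrl = buildQueueUrl('ap-northeast-2', '996409770277', 'jobQueue');
const queueParts = parseQueueUrl(queueUrl);
console.log(queueUrl, queueParts);
```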


4. Run the Pi4J Java application


 
5. Load into Amazon SQS via Amazon Lambda

- full body 

{"Records":[{"eventVersion":"2.0","eventSource":"aws:s3","awsRegion":"ap-northeast-2","eventTime":"2018-07-12T15:34:52.533Z","eventName":"ObjectCreated:Put","userIdentity":{"principalId":"AWS:AIDAIQKJB3IKQS4YDOI2Y"},"requestParameters":{"sourceIPAddress":"211.245.211.252"},"responseElements":{"x-amz-request-id":"1ED3B3657F2A08F6","x-amz-id-2":"MzvKf2QuE9OImRCUfc7dwc4YXAgCTQDfhs06Jbd6FQCsZ499R/lhip4j9uSBGebAVwh3Ev7f8RM="},"s3":{"s3SchemaVersion":"1.0","configurationId":"1cbdc078-b185-4621-81de-91c92188a0fa","bucket":{"name":"rdiot-test","ownerIdentity":{"principalId":"A3PLNTCS5VC05F"},"arn":"arn:aws:s3:::rdiot-test"},"object":{"key":"pi_dht11_20187132475.json","size":35,"eTag":"8ff2b7914053e1c8ef3c01323ee35b46","sequencer":"005B47751C774BCE79"}}}]}
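
A consumer usually needs only a few fields from this body. The sketch below parses a message body and extracts the event name, bucket, key and size for each record. The helper name summarizeS3Event is hypothetical, and the sample is a trimmed copy of the body above with only the fields the helper reads.

```javascript
// Hypothetical helper: pulls the useful fields out of an SQS message body that
// contains a stringified S3 event, as sent by the S3toSQS function.
function summarizeS3Event(messageBody) {
  const event = JSON.parse(messageBody);
  return (event.Records || []).map(function (record) {
    return {
      eventName: record.eventName,
      eventTime: record.eventTime,
      bucket: record.s3.bucket.name,
      // S3 URL-encodes object keys in event notifications ('+' stands for a space)
      key: decodeURIComponent(record.s3.object.key.replace(/\+/g, ' ')),
      size: record.s3.object.size
    };
  });
}

// Trimmed copy of the full body shown above.
const sampleBody = JSON.stringify({
  Records: [{
    eventTime: '2018-07-12T15:34:52.533Z',
    eventName: 'ObjectCreated:Put',
    s3: {
      bucket: { name: 'rdiot-test' },
      object: { key: 'pi_dht11_20187132475.json', size: 35 }
    }
  }]
});

const summary = summarizeS3Event(sampleBody);
console.log(summary);
```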

6. Spark Streaming with Amazon SQS : Spark-SQS-Receiver (streaming custom receiver)

- SQS Receiver : https://github.com/rdiot/rdiot-p023/blob/master/sparkReceiver/src/main/java/com/rdiot/sparkReceiver/receiver/SQSReceiver.java

- SQS Receiver with Delete : https://github.com/rdiot/rdiot-p023/blob/master/sparkReceiver/src/main/java/com/rdiot/sparkReceiver/SQSWithDelete.java

- SQS Receiver without Delete : https://github.com/rdiot/rdiot-p023/blob/master/sparkReceiver/src/main/java/com/rdiot/sparkReceiver/SQSWithoutDelete.java
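
Judging by the class names, the two variants differ in whether a message is deleted after it is received. The sketch below is an in-memory model of that difference, not the real SQS service or SDK: FakeQueue, its visibility timeout in logical ticks, and the drain helper are all hypothetical. It illustrates why a message that is received but never deleted is delivered again once its visibility timeout expires.

```javascript
// In-memory model of an SQS-like queue (not the real service or SDK).
class FakeQueue {
  constructor(visibilityTimeout) {
    this.visibilityTimeout = visibilityTimeout; // in logical ticks
    this.now = 0;
    this.nextId = 1;
    this.messages = []; // entries: { id, body, visibleAt }
  }
  send(body) {
    this.messages.push({ id: this.nextId++, body: body, visibleAt: 0 });
  }
  // Returns the first visible message and hides it for visibilityTimeout ticks.
  receive() {
    const msg = this.messages.find(m => m.visibleAt <= this.now);
    if (!msg) return null;
    msg.visibleAt = this.now + this.visibilityTimeout;
    return { id: msg.id, body: msg.body };
  }
  delete(id) {
    this.messages = this.messages.filter(m => m.id !== id);
  }
  tick(n) {
    this.now += n;
  }
}

// Polls three times, letting the visibility timeout expire between polls.
function drain(queue, deleteAfterReceive) {
  const seen = [];
  for (let round = 0; round < 3; round++) {
    const msg = queue.receive();
    if (msg) {
      seen.push(msg.body);
      if (deleteAfterReceive) queue.delete(msg.id);
    }
    queue.tick(10);
  }
  return seen;
}

const queueWithDelete = new FakeQueue(5);
queueWithDelete.send('event-1');
const queueWithoutDelete = new FakeQueue(5);
queueWithoutDelete.send('event-1');

const seenWithDelete = drain(queueWithDelete, true);
const seenWithoutDelete = drain(queueWithoutDelete, false);
console.log(seenWithDelete, seenWithoutDelete);
```

In this model the deleting consumer sees the message once, while the non-deleting consumer sees it on every poll, which would inflate counts in a streaming job.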


- s3 event word count source : https://github.com/rdiot/rdiot-p023/blob/master/sparkReceiver/src/main/java/com/rdiot/sparkReceiver/sample/SQSWordCount.java


package com.rdiot.sparkReceiver.sample;

import java.util.Arrays;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import com.amazonaws.regions.Regions;
import com.rdiot.sparkReceiver.receiver.SQSReceiver;

import scala.Tuple2;

public class SQSWordCount {

    // Log under this class's own name
    private static Logger logger = Logger.getLogger(SQSWordCount.class);

    final static String appName = "sparkSQSReceiver";
    final static String master = "local[2]";
    final static String queueName = "jobQueue";

    final static Duration batchDuration = Durations.seconds(5); // Batch Duration
    final static Duration windowDuration = Durations.seconds(5); // TBD
    final static Duration slideDuration = Durations.seconds(3); // TBD

    public static void main(String[] args) throws InterruptedException {

        Logger.getLogger("org").setLevel(Level.OFF);

        // Spark Config
        SparkConf conf = new SparkConf().setMaster(master).setAppName(appName);
        conf.set("spark.testing.memory", "2147480000"); // if you face any memory issues

        try (JavaStreamingContext jssc = new JavaStreamingContext(conf, batchDuration)) {

            // Messages are deleted from the queue immediately
            SQSReceiver javaSQSReceiver = new SQSReceiver(queueName)
                    .with(Regions.AP_NORTHEAST_2);

            System.out.println("# Spark Streaming Start");

            JavaReceiverInputDStream<String> input = jssc.receiverStream(javaSQSReceiver);

            // SQS Messages
            /*
            input.foreachRDD(rdd->{
                rdd.foreach(w->{
                    System.out.println(w);
                });
            });
            */

            // Word Count
            JavaDStream<String> words = input.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
            JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
            JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);

            wordCounts.print();

            jssc.start();
            jssc.awaitTermination();

        } finally {
            logger.info("Exiting the Application");
        }
    }

}
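
To make the three stream operations easier to follow, here is a plain JavaScript mirror of what one batch goes through: flatMap splits each message into tokens, mapToPair turns each token into a (token, 1) pair, and reduceByKey sums the pairs per token. This is only an illustration, not Spark code; the function name wordCount and the sample messages are hypothetical, and the delimiter is a parameter so the same sketch works for a space or for ':'.

```javascript
// Plain JavaScript mirror of flatMap -> mapToPair -> reduceByKey for one batch.
function wordCount(messages, delimiter) {
  const counts = new Map();
  messages
    .flatMap(msg => msg.split(delimiter)) // flatMap: message -> tokens
    .map(token => [token, 1])             // mapToPair: token -> (token, 1)
    .forEach(([token, one]) => {          // reduceByKey: sum the 1s per token
      counts.set(token, (counts.get(token) || 0) + one);
    });
  return counts;
}

const spaceCounts = wordCount(['a b a', 'b c'], ' ');
const colonCounts = wordCount(['x:y', 'y:z'], ':');
console.log([...spaceCounts], [...colonCounts]);
```

Because the S3 event body is compact JSON with no spaces, splitting on a space leaves most of the message as one token; splitting on ':' breaks it into more pieces.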


7. Result

- SQS Message

- Word Count : split ':'


* Reference

- AWS Lambda Permissions Model : https://docs.aws.amazon.com/lambda/latest/dg/intro-permission-model.html

- AWS SQS JS : https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/SQS.html

- Spark Streaming Custom Receiver : http://spark.apache.org/docs/latest/streaming-custom-receivers.html

Posted by RD IoT RDIoT


TDA7492P Bluetooth 4.0 Amplifier Board 8-25V 50W (TDA7492P) [D091]


https://www.youtube.com/watch?v=bO7p7wcF7wA


* Specs

- Description

The board is built around the TDA7492P digital amplifier chip. The circuit layout is optimized for good sound quality and high output power.

The DIP switches make it easy to adjust the gain of the power amplifier stage directly, which helps when driving higher-impedance speakers.


- Specs

Net weight: approx. 33g 

Product size: approx. 80 x 50 x 15 mm (3.14 x 1.96 x 0.59 in)

Power: DC 8~25V power supply (2.1 interface)

Audio input: stereo input over Bluetooth, compatible with various Bluetooth versions; uses the CSR8635 Bluetooth chip (Bluetooth V4.0) with high software compatibility.

Output impedance: 8 ohm is best (can drive 4, 6, 8, or 16 ohm speakers)

Output power: with the 5532 preamp added, 2 x 50W into 6 ohm or 2 x 40W into 8 ohm (dual-channel, powered at 25V)




Pi4J + Amazon S3 REST API + S3 Lambda Trigger + DynamoDB [P022]





https://www.youtube.com/watch?v=LHZBQmybbqc


GitHub : https://github.com/rdiot/rdiot-p022


* Parts

- Pi4J + Amazon S3 REST API + Amazon Athena [P019]

Raspberry Pi2

- Temperature and humidity DHT11 sensor module (KY-015) [S008]


* Contents

1. S3 Upload 

- Reference : Pi4J + Amazon S3 REST API + Amazon Athena [P019]


2. Run Java Application (pi4j)

- Source : 

java -Dpi4j.linking=dynamic -jar pi4j_s3rest-0.0.1-SNAPSHOT.jar




3. S3 Storage Monitoring

   

4. AWS Lambda Monitoring



5. AWS DynamoDB Monitoring


6. AWS Lambda Source (node.js) : S3toDynamoDB

- source code : https://github.com/rdiot/rdiot-p022/blob/master/S3toDynamoDB/index.js

'use strict';

console.log('RDIoT S3toDynamoDB Loading post function');

var AWS = require('aws-sdk');
var s3 = new AWS.S3();
var dynamo = new AWS.DynamoDB.DocumentClient();

exports.handler = function(event, context, callback) {

    console.log('Received event:', JSON.stringify(event, null, 2));

    // Only the first record of the event is processed
    var record = event.Records[0];
    var bucket = record['s3']['bucket']['name'];
    var en = record['eventName'];
    var et = record['eventTime'];
    // Object keys arrive URL-encoded in S3 events ('+' stands for a space)
    var key = decodeURIComponent(record['s3']['object']['key'].replace(/\+/g, ' '));
    var sip = record['requestParameters']['sourceIPAddress'];

    var params1 = {Bucket: bucket, Key: key};

    s3.getObject(params1, function(err, data) {
        if (err) {
            console.log(err.stack);
            callback(err);
        } else {
            // The sensor payload is JSON text, so decode it as UTF-8
            var value = data.Body.toString('utf-8');
            console.log(data);
            console.log("Raw text:\n" + value);

            var params2 = {
                TableName: "pi-sensor",
                Item: {
                    "id": et,
                    "event": en,
                    "bucket": bucket,
                    "key": key,
                    "sip": sip,
                    "value": value
                }
            };
            console.log("Getting IoT device details...");

            // S3 to DynamoDB
            dynamo.put(params2, function(err, data) {
                if (err) {
                    console.error("Unable to post devices. Error JSON:", JSON.stringify(err, null, 2));
                    callback(err); // pass the error back to Lambda
                } else {
                    console.log("Put item succeeded:", JSON.stringify(data, null, 2));
                    callback(null, 'success post');
                }
            });
        }
    });
}



7. Lambda : s3 put event sample

{
    "Records": [
      {
        "awsRegion": "ap-northeast-2",
        "eventName": "ObjectCreated:Put",
        "eventSource": "aws:s3",
        "eventTime": "2018-06-30T12:04:47.253Z",
        "eventVersion": "2.0",
        "requestParameters": {
          "sourceIPAddress": "211.245.211.252"
        },
        "responseElements": {
          "x-amz-id-2": "PTHGAl4fTlMHW/2lurF0HaruzgLtTnG1JyUlWC9o9AM21jLLUQ5Dp3LH1Ur0uVltB6L7BqDs6Ts=",
          "x-amz-request-id": "31CCB31CF0807321"
        },
        "s3": {
          "bucket": {
            "arn": "arn:aws:s3:::rdiot-test",
            "name": "rdiot-test",
            "ownerIdentity": {
              "principalId": "A3PLNTCS5VC05F"
            }
          },
          "configurationId": "174810a9-e50b-484e-bfda-eafed1ff2ec3",
          "object": {
            "eTag": "5d81121100d6ec386ee75237c8eb3549",
            "key": "pi_dht11_20186300444.json",
            "sequencer": "005B3771DF3531F6C1",
            "size": 35
          },
          "s3SchemaVersion": "1.0"
        },
        "userIdentity": {
          "principalId": "AWS:AIDAIQKJB3IKQS4YDOI2Y"
        }
      }
    ]
  }
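
The mapping from this event to the DynamoDB item can be checked without AWS. The sketch below applies the same field selection as the S3toDynamoDB handler to a trimmed copy of the sample event; the helper name buildItem is hypothetical, and the value argument stands in for the object body that the handler reads from S3.

```javascript
// Hypothetical helper: maps one S3 event record plus the object body to the
// item layout used by S3toDynamoDB (id, event, bucket, key, sip, value).
function buildItem(record, value) {
  return {
    id: record.eventTime,
    event: record.eventName,
    bucket: record.s3.bucket.name,
    // S3 URL-encodes object keys in event notifications ('+' stands for a space)
    key: decodeURIComponent(record.s3.object.key.replace(/\+/g, ' ')),
    sip: record.requestParameters.sourceIPAddress,
    value: value
  };
}

// Trimmed copy of the sample event above (only the fields buildItem reads).
const sampleRecord = {
  eventName: 'ObjectCreated:Put',
  eventTime: '2018-06-30T12:04:47.253Z',
  requestParameters: { sourceIPAddress: '211.245.211.252' },
  s3: {
    bucket: { name: 'rdiot-test' },
    object: { key: 'pi_dht11_20186300444.json' }
  }
};

const item = buildItem(sampleRecord, '{"name":"temperature","value":30.0}');
console.log(JSON.stringify(item, null, 2));
```

Since the item id is the event time, two uploads recorded in the same millisecond would overwrite each other; a composite of event time and key might be a safer id if that matters.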


8. Lambda : Context Sample

{
    "awsRequestId": "45bca188-7c5e-11e8-9d80-cdcb57e017f4",
    "callbackWaitsForEmptyEventLoop": true,
    "functionName": "S3toDynamoDB",
    "functionVersion": "$LATEST",
    "invokedFunctionArn": "arn:aws:lambda:ap-northeast-2:996409770277:function:S3toDynamoDB",
    "invokeid": "45bca188-7c5e-11e8-9d80-cdcb57e017f4",
    "logGroupName": "/aws/lambda/S3toDynamoDB",
    "logStreamName": "2018/06/30/[$LATEST]16b94689af7d4257bcc75dffc45798db",
    "memoryLimitInMB": "128"
  }


9. Lambda : S3 Object GET Data Sample

2018-06-30T14:04:12.852Z 775a8d39-7c6e-11e8-bc88-e9ab4a76e0af { AcceptRanges: 'bytes',
  LastModified: 2018-06-30T12:04:48.000Z,
  ContentLength: 35,
  ETag: '"5d81121100d6ec386ee75237c8eb3549"',
  CacheControl: 'no-cache',
  ContentType: 'binary/octet-stream',
  Metadata: {},
  StorageClass: 'REDUCED_REDUNDANCY',
  Body: <Buffer 7b 22 6e 61 6d 65 22 3a 22 74 65 6d 70 65 72 61 74 75 72 65 22 2c 22 76 61 6c 75 65 22 3a 33 30 2e 30 7d> }
2018-06-30T14:04:12.853Z 775a8d39-7c6e-11e8-bc88-e9ab4a76e0af Raw text:
{"name":"temperature","value":30.0}
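
The Buffer dump and the raw text in this log are the same 35 bytes. As a quick check, the sketch below rebuilds the buffer from the hex values printed in the log, decodes it, and parses the sensor reading; the variable names are only for illustration.

```javascript
// Hex bytes copied from the Body field of the log output above.
const hex = '7b 22 6e 61 6d 65 22 3a 22 74 65 6d 70 65 72 61 74 75 72 65 ' +
            '22 2c 22 76 61 6c 75 65 22 3a 33 30 2e 30 7d';

// Rebuild the buffer, decode it as text, then parse the JSON payload.
const bodyBytes = Buffer.from(hex.split(' ').map(h => parseInt(h, 16)));
const rawText = bodyBytes.toString('utf8');
const reading = JSON.parse(rawText);

console.log(bodyBytes.length, rawText, reading.name, reading.value);
```

The byte count matches the ContentLength of 35 in the log and the object size in the S3 event.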

