
S3 Lambda Trigger + Amazon SQS + SQSReceiver + SparkStreaming [P023]




Demo : https://www.youtube.com/watch?v=37gl-tkUa3M


GitHub : https://github.com/rdiot/rdiot-p023



* Parts

- Pi4J + Amazon S3 REST API + Amazon Athena [P019]

- Raspberry Pi 2

- Temperature and humidity DHT11 sensor module (KY-015) [S008]


* Contents

1. Create custom role : AWS Lambda requires access to your resources (see the SDK sketch after this list)

- Create a new IAM Role

- Role Name : LambdaSQSRole

- Attach policy : AmazonSQSFullAccess 
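
If you prefer to script this step instead of clicking through the console, the same role can be created with the AWS SDK for Java v1. A minimal sketch, assuming the standard Lambda service principal in the trust policy (the class name and inline policy document are illustrative, not part of this project):

import com.amazonaws.services.identitymanagement.AmazonIdentityManagement;
import com.amazonaws.services.identitymanagement.AmazonIdentityManagementClientBuilder;
import com.amazonaws.services.identitymanagement.model.AttachRolePolicyRequest;
import com.amazonaws.services.identitymanagement.model.CreateRoleRequest;

public class CreateLambdaSqsRole {
    public static void main(String[] args) {
        AmazonIdentityManagement iam = AmazonIdentityManagementClientBuilder.defaultClient();

        // Trust policy that lets the Lambda service assume this role
        String trustPolicy = "{"
                + "\"Version\":\"2012-10-17\","
                + "\"Statement\":[{\"Effect\":\"Allow\","
                + "\"Principal\":{\"Service\":\"lambda.amazonaws.com\"},"
                + "\"Action\":\"sts:AssumeRole\"}]}";

        iam.createRole(new CreateRoleRequest()
                .withRoleName("LambdaSQSRole")
                .withAssumeRolePolicyDocument(trustPolicy));

        // Attach the managed policy used in this post
        iam.attachRolePolicy(new AttachRolePolicyRequest()
                .withRoleName("LambdaSQSRole")
                .withPolicyArn("arn:aws:iam::aws:policy/AmazonSQSFullAccess"));
    }
}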

2. S3 Lambda Trigger : Create function

- Function Name : S3toSQS

- Runtime : Node.js 6.10


- Designer & Configure Triggers
 : Add triggers from the list on the left : S3
 : Bucket : s3 bucket name
 : Event type : PUT
 : Enable trigger
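
The console wires this trigger up for you. As a rough sketch of the same notification configuration done through the AWS SDK for Java v1 (the function ARN is a placeholder, and note the console additionally grants S3 the lambda:InvokeFunction permission, which this sketch omits):

import java.util.EnumSet;

import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.BucketNotificationConfiguration;
import com.amazonaws.services.s3.model.LambdaConfiguration;
import com.amazonaws.services.s3.model.S3Event;

public class ConfigureS3Trigger {
    public static void main(String[] args) {
        AmazonS3 s3 = AmazonS3ClientBuilder.standard()
                .withRegion(Regions.AP_NORTHEAST_2).build();

        // PUT events on the bucket invoke the S3toSQS function
        BucketNotificationConfiguration config = new BucketNotificationConfiguration()
                .addConfiguration("S3toSQSTrigger",
                        new LambdaConfiguration(
                                "arn:aws:lambda:ap-northeast-2:<account-id>:function:S3toSQS",
                                EnumSet.of(S3Event.ObjectCreatedByPut)));

        s3.setBucketNotificationConfiguration("rdiot-test", config);
    }
}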



- S3toSQS Function Code
 : Source : https://github.com/rdiot/rdiot-p023/blob/master/S3toSQS/index.js


var AWS = require("aws-sdk");
var sqs = new AWS.SQS();
var QUEUE_URL = 'https://sqs.ap-northeast-2.amazonaws.com/996409770277/jobQueue';

exports.handler = (event, context, callback) => {
    // Forward the raw S3 event to SQS as the message body
    var sqsParams = {
        MessageBody: JSON.stringify(event),
        QueueUrl: QUEUE_URL
    };
    console.log(sqsParams);

    sqs.sendMessage(sqsParams, function(err, data) {
        if (err) {
            console.log('ERR', err);
            return callback(err);
        }
        console.log(data);
        callback(null, 'message sent');
    });
};

3. Create New Queue (Amazon SQS)
- Queue Name : jobQueue
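
The queue can also be created with the AWS SDK for Java v1. A minimal sketch (the class name is illustrative):

import com.amazonaws.regions.Regions;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;

public class CreateJobQueue {
    public static void main(String[] args) {
        AmazonSQS sqs = AmazonSQSClientBuilder.standard()
                .withRegion(Regions.AP_NORTHEAST_2).build();
        // Creates the queue if it does not exist; returns its URL either way
        String queueUrl = sqs.createQueue("jobQueue").getQueueUrl();
        System.out.println("Queue URL: " + queueUrl);
    }
}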


4. Run the Pi4J Java client to upload sensor readings to S3 (a minimal upload sketch follows)
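
The upload side of the Pi4J client might look like the following sketch. The bucket name and key pattern are taken from the event body in step 5; the sensor values are placeholders for the real DHT11 readings:

import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;

public class UploadSensorReading {
    public static void main(String[] args) {
        AmazonS3 s3 = AmazonS3ClientBuilder.standard()
                .withRegion(Regions.AP_NORTHEAST_2).build();

        // Placeholder values; the real ones come from the DHT11 via Pi4J
        String json = "{\"temperature\":22.0,\"humidity\":45.0}";
        String key = "pi_dht11_" + System.currentTimeMillis() + ".json";

        // Each PUT fires the S3 trigger, which forwards the event to jobQueue
        s3.putObject("rdiot-test", key, json);
    }
}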


 
5. Load into Amazon SQS via AWS Lambda

- Full event body delivered to the queue:

{
  "Records": [
    {
      "eventVersion": "2.0",
      "eventSource": "aws:s3",
      "awsRegion": "ap-northeast-2",
      "eventTime": "2018-07-12T15:34:52.533Z",
      "eventName": "ObjectCreated:Put",
      "userIdentity": { "principalId": "AWS:AIDAIQKJB3IKQS4YDOI2Y" },
      "requestParameters": { "sourceIPAddress": "211.245.211.252" },
      "responseElements": {
        "x-amz-request-id": "1ED3B3657F2A08F6",
        "x-amz-id-2": "MzvKf2QuE9OImRCUfc7dwc4YXAgCTQDfhs06Jbd6FQCsZ499R/lhip4j9uSBGebAVwh3Ev7f8RM="
      },
      "s3": {
        "s3SchemaVersion": "1.0",
        "configurationId": "1cbdc078-b185-4621-81de-91c92188a0fa",
        "bucket": {
          "name": "rdiot-test",
          "ownerIdentity": { "principalId": "A3PLNTCS5VC05F" },
          "arn": "arn:aws:s3:::rdiot-test"
        },
        "object": {
          "key": "pi_dht11_20187132475.json",
          "size": 35,
          "eTag": "8ff2b7914053e1c8ef3c01323ee35b46",
          "sequencer": "005B47751C774BCE79"
        }
      }
    }
  ]
}
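
Each SQS message body is exactly this S3 event JSON, so a consumer can pull the bucket and object key out of Records[0]. A minimal parsing sketch using Jackson (the class and method names are illustrative, not part of this project):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class S3EventParser {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Returns "bucket/key" for the first record of an S3 event JSON body
    public static String objectOf(String messageBody) throws Exception {
        JsonNode record = MAPPER.readTree(messageBody).path("Records").path(0);
        String bucket = record.path("s3").path("bucket").path("name").asText();
        String key = record.path("s3").path("object").path("key").asText();
        return bucket + "/" + key;
    }

    public static void main(String[] args) throws Exception {
        // Trimmed-down version of the event body shown above
        String body = "{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"rdiot-test\"},"
                + "\"object\":{\"key\":\"pi_dht11_20187132475.json\"}}}]}";
        System.out.println(objectOf(body)); // rdiot-test/pi_dht11_20187132475.json
    }
}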

6. Spark Streaming with Amazon SQS : Spark-SQS-Receiver (streaming custom receiver)

- SQS Receiver : https://github.com/rdiot/rdiot-p023/blob/master/sparkReceiver/src/main/java/com/rdiot/sparkReceiver/receiver/SQSReceiver.java

- SQS Receiver with Delete : https://github.com/rdiot/rdiot-p023/blob/master/sparkReceiver/src/main/java/com/rdiot/sparkReceiver/SQSWithDelete.java

- SQS Receiver without Delete : https://github.com/rdiot/rdiot-p023/blob/master/sparkReceiver/src/main/java/com/rdiot/sparkReceiver/SQSWithoutDelete.java
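
For the gist of the linked receiver without opening the repo: a Spark Streaming custom receiver extends Receiver, starts a polling thread in onStart(), and hands each SQS message body to store(). The sketch below is a simplified assumption of that shape (SDK v1, no delete logic), not the linked SQSReceiver itself:

import com.amazonaws.regions.Regions;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;

public class SimpleSQSReceiver extends Receiver<String> {

    private final String queueName;

    public SimpleSQSReceiver(String queueName) {
        super(StorageLevel.MEMORY_AND_DISK_2());
        this.queueName = queueName;
    }

    @Override
    public void onStart() {
        // Spark expects onStart() to return quickly, so poll on a new thread
        new Thread(this::poll, "sqs-poller").start();
    }

    @Override
    public void onStop() {
        // The polling loop checks isStopped(), so nothing to clean up here
    }

    private void poll() {
        // Build the client on the worker, not in the driver-side constructor
        AmazonSQS sqs = AmazonSQSClientBuilder.standard()
                .withRegion(Regions.AP_NORTHEAST_2).build();
        String queueUrl = sqs.getQueueUrl(queueName).getQueueUrl();
        ReceiveMessageRequest request = new ReceiveMessageRequest(queueUrl)
                .withMaxNumberOfMessages(10)
                .withWaitTimeSeconds(20); // long polling
        while (!isStopped()) {
            for (Message m : sqs.receiveMessage(request).getMessages()) {
                store(m.getBody()); // hand the body to Spark Streaming
            }
        }
    }
}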


- s3 event word count source : https://github.com/rdiot/rdiot-p023/blob/master/sparkReceiver/src/main/java/com/rdiot/sparkReceiver/sample/SQSWordCount.java


package com.rdiot.sparkReceiver.sample;

import java.util.Arrays;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import com.amazonaws.regions.Regions;
import com.rdiot.sparkReceiver.receiver.SQSReceiver;

import scala.Tuple2;

public class SQSWordCount {

    private static Logger logger = Logger.getLogger(SQSWordCount.class);

    final static String appName = "sparkSQSReceiver";
    final static String master = "local[2]";
    final static String queueName = "jobQueue";

    final static Duration batchDuration = Durations.seconds(5); // batch duration
    final static Duration windowDuration = Durations.seconds(5); // TBD
    final static Duration slideDuration = Durations.seconds(3);  // TBD

    public static void main(String[] args) throws InterruptedException {
        Logger.getLogger("org").setLevel(Level.OFF);

        // Spark config
        SparkConf conf = new SparkConf().setMaster(master).setAppName(appName);
        conf.set("spark.testing.memory", "2147480000"); // if you face any memory issues

        try (JavaStreamingContext jssc = new JavaStreamingContext(conf, batchDuration)) {
            // Deletes messages from the queue immediately after they are read
            SQSReceiver javaSQSReceiver = new SQSReceiver(queueName)
                    .with(Regions.AP_NORTHEAST_2);

            System.out.println("# Spark Streaming Start");

            JavaReceiverInputDStream<String> input = jssc.receiverStream(javaSQSReceiver);

            // Dump raw SQS messages (uncomment to inspect the event bodies)
            /*
            input.foreachRDD(rdd -> {
                rdd.foreach(w -> System.out.println(w));
            });
            */

            // Word count over each batch
            JavaDStream<String> words = input.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
            JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
            JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);
            wordCounts.print();

            jssc.start();
            jssc.awaitTermination();
        } finally {
            logger.info("Exiting the Application");
        }
    }
}


7. Result

- SQS Message

- Word Count : split ':'


* Reference

- AWS Lambda Permissions Model : https://docs.aws.amazon.com/lambda/latest/dg/intro-permission-model.html

- AWS SQS JS : https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/SQS.html

- Spark Streaming Custom Receiver : http://spark.apache.org/docs/latest/streaming-custom-receivers.html

Posted by RDIoT