S3 Lambda Trigger + Amazon SQS + SQSReceiver + SparkStreaming [P023]
VIDEO : https://www.youtube.com/watch?v=37gl-tkUa3M
GitHub : https://github.com/rdiot/rdiot-p023
* Parts
- Pi4J + Amazon S3 REST API + Amazon Athena [P019]
- Raspberry Pi 2
- Temperature and humidity DHT11 sensor module (KY-015) [S008]
* Contents
1. Create custom role : AWS Lambda requires access to your resources
- Create a new IAM Role
- Role Name : LambdaSQSRole
- Attach policy : AmazonSQSFullAccess
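For reference, the same role can also be created with the AWS SDK for Java; a minimal sketch (the class name and inline trust-policy string are illustrative, not project code):

import com.amazonaws.services.identitymanagement.AmazonIdentityManagement;
import com.amazonaws.services.identitymanagement.AmazonIdentityManagementClientBuilder;
import com.amazonaws.services.identitymanagement.model.AttachRolePolicyRequest;
import com.amazonaws.services.identitymanagement.model.CreateRoleRequest;

public class CreateLambdaSQSRole {
    public static void main(String[] args) {
        AmazonIdentityManagement iam = AmazonIdentityManagementClientBuilder.defaultClient();

        // Trust policy that lets the Lambda service assume this role
        String trustPolicy = "{\"Version\":\"2012-10-17\",\"Statement\":[{"
                + "\"Effect\":\"Allow\","
                + "\"Principal\":{\"Service\":\"lambda.amazonaws.com\"},"
                + "\"Action\":\"sts:AssumeRole\"}]}";

        iam.createRole(new CreateRoleRequest()
                .withRoleName("LambdaSQSRole")
                .withAssumeRolePolicyDocument(trustPolicy));

        // Attach the managed policy used by this project
        iam.attachRolePolicy(new AttachRolePolicyRequest()
                .withRoleName("LambdaSQSRole")
                .withPolicyArn("arn:aws:iam::aws:policy/AmazonSQSFullAccess"));
    }
}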
2. S3 Lambda Trigger : Create function
- Function Name : S3toSQS
- Runtime : Node.js 6.10
- Designer & Configure Triggers : Add triggers from the list on the left : S3
: Bucket : s3 bucket name
: Event type : PUT
: Enable trigger
- S3toSQS Function Code
: Source : https://github.com/rdiot/rdiot-p023/blob/master/S3toSQS/index.js
var AWS = require("aws-sdk");
var sqs = new AWS.SQS();
var QUEUE_URL = 'https://sqs.ap-northeast-2.amazonaws.com/996409770277/jobQueue';

exports.handler = (event, context, callback) => {
    // Forward the raw S3 event to the SQS queue as the message body
    var sqsParams = {
        MessageBody: JSON.stringify(event),
        QueueUrl: QUEUE_URL
    };
    console.log(sqsParams);

    sqs.sendMessage(sqsParams, function(err, data) {
        if (err) {
            console.log('ERR', err);
            return callback(err);
        }
        console.log(data);
        console.log('message sent');
        callback(null, 'Exit');
    });
};
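For reference, the console's "Add trigger" step can also be done programmatically. A hedged sketch with the AWS SDK for Java (the function ARN and account ID are placeholders; note that S3 must also hold lambda:InvokeFunction permission on S3toSQS, which the console trigger setup grants automatically):

import java.util.EnumSet;

import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.BucketNotificationConfiguration;
import com.amazonaws.services.s3.model.LambdaConfiguration;
import com.amazonaws.services.s3.model.S3Event;

public class WireS3Trigger {
    public static void main(String[] args) {
        AmazonS3 s3 = AmazonS3ClientBuilder.standard()
                .withRegion(Regions.AP_NORTHEAST_2).build();

        // Placeholder ARN for the S3toSQS function
        String functionArn = "arn:aws:lambda:ap-northeast-2:123456789012:function:S3toSQS";

        // Fire the Lambda function on s3:ObjectCreated:Put, matching the console setup above
        BucketNotificationConfiguration config = new BucketNotificationConfiguration()
                .addConfiguration("S3toSQS-trigger",
                        new LambdaConfiguration(functionArn, EnumSet.of(S3Event.ObjectCreatedByPut)));

        s3.setBucketNotificationConfiguration("rdiot-test", config);
    }
}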
3. Create New Queue (Amazon SQS)
- Queue Name : jobQueue
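The queue can be created in the SQS console or, equivalently, with the AWS SDK for Java (minimal sketch; the class name is illustrative):

import com.amazonaws.regions.Regions;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;

public class CreateJobQueue {
    public static void main(String[] args) {
        AmazonSQS sqs = AmazonSQSClientBuilder.standard()
                .withRegion(Regions.AP_NORTHEAST_2).build();

        // Create the queue the Lambda function publishes to, and print its URL
        String queueUrl = sqs.createQueue("jobQueue").getQueueUrl();
        System.out.println("Queue URL: " + queueUrl);
    }
}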
4. Run Pi4J Java : the [P019] client uploads a DHT11 reading as JSON to the S3 bucket
5. Load into Amazon SQS via AWS Lambda : the S3 PUT event fires S3toSQS, which forwards the event to jobQueue
- full body
{"Records":[{"eventVersion":"2.0","eventSource":"aws:s3","awsRegion":"ap-northeast-2","eventTime":"2018-07-12T15:34:52.533Z","eventName":"ObjectCreated:Put","userIdentity":{"principalId":"AWS:AIDAIQKJB3IKQS4YDOI2Y"},"requestParameters":{"sourceIPAddress":"211.245.211.252"},"responseElements":{"x-amz-request-id":"1ED3B3657F2A08F6","x-amz-id-2":"MzvKf2QuE9OImRCUfc7dwc4YXAgCTQDfhs06Jbd6FQCsZ499R/lhip4j9uSBGebAVwh3Ev7f8RM="},"s3":{"s3SchemaVersion":"1.0","configurationId":"1cbdc078-b185-4621-81de-91c92188a0fa","bucket":{"name":"rdiot-test","ownerIdentity":{"principalId":"A3PLNTCS5VC05F"},"arn":"arn:aws:s3:::rdiot-test"},"object":{"key":"pi_dht11_20187132475.json","size":35,"eTag":"8ff2b7914053e1c8ef3c01323ee35b46","sequencer":"005B47751C774BCE79"}}}]}
6. Spark Streaming with Amazon SQS : Spark-SQS-Receiver (streaming custom receiver)
- SQS Receiver : https://github.com/rdiot/rdiot-p023/blob/master/sparkReceiver/src/main/java/com/rdiot/sparkReceiver/receiver/SQSReceiver.java (a minimal sketch of this receiver appears after the word-count source below)
- SQS Receiver with Delete : https://github.com/rdiot/rdiot-p023/blob/master/sparkReceiver/src/main/java/com/rdiot/sparkReceiver/SQSWithDelete.java
- SQS Receiver without Delete : https://github.com/rdiot/rdiot-p023/blob/master/sparkReceiver/src/main/java/com/rdiot/sparkReceiver/SQSWithoutDelete.java
- s3 event word count source : https://github.com/rdiot/rdiot-p023/blob/master/sparkReceiver/src/main/java/com/rdiot/sparkReceiver/sample/SQSWordCount.java
package com.rdiot.sparkReceiver.sample;

import java.util.Arrays;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import com.amazonaws.regions.Regions;
import com.rdiot.sparkReceiver.receiver.SQSReceiver;

import scala.Tuple2;

public class SQSWordCount {
    private static Logger logger = Logger.getLogger(SQSWordCount.class);

    final static String appName = "sparkSQSReceiver";
    final static String master = "local[2]";
    final static String queueName = "jobQueue";

    final static Duration batchDuration = Durations.seconds(5);  // Batch Duration
    final static Duration windowDuration = Durations.seconds(5); // TBD
    final static Duration slideDuration = Durations.seconds(3);  // TBD

    public static void main(String[] args) throws InterruptedException {
        Logger.getLogger("org").setLevel(Level.OFF);

        // Spark Config
        SparkConf conf = new SparkConf().setMaster(master).setAppName(appName);
        conf.set("spark.testing.memory", "2147480000"); // if you face any memory issues

        try (JavaStreamingContext jssc = new JavaStreamingContext(conf, batchDuration)) {
            SQSReceiver javaSQSReceiver = new SQSReceiver(queueName) // deletes messages from the queue immediately
                    .with(Regions.AP_NORTHEAST_2);

            System.out.println("# Spark Streaming Start");

            JavaReceiverInputDStream<String> input = jssc.receiverStream(javaSQSReceiver);

            // Print the raw SQS messages
            /*
            input.foreachRDD(rdd -> {
                rdd.foreach(w -> {
                    System.out.println(w);
                });
            });
            */

            // Word Count
            JavaDStream<String> words = input.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
            JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
            JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);
            wordCounts.print();

            jssc.start();
            jssc.awaitTermination();
        } finally {
            logger.info("Exiting the Application");
        }
    }
}
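The linked SQSReceiver.java itself is not reproduced here. As a rough idea of what such a Spark Streaming custom receiver looks like, here is a minimal sketch; only the constructor and with(Regions) signatures are taken from the usage above, the rest is an assumption rather than the project's actual source:

import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;

import com.amazonaws.regions.Regions;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;

public class SQSReceiver extends Receiver<String> {
    private final String queueName;
    private Regions region = Regions.AP_NORTHEAST_2;

    public SQSReceiver(String queueName) {
        super(StorageLevel.MEMORY_AND_DISK_2());
        this.queueName = queueName;
    }

    public SQSReceiver with(Regions region) {
        this.region = region;
        return this;
    }

    @Override
    public void onStart() {
        // Receive on a separate thread so onStart() returns immediately
        new Thread(this::receive).start();
    }

    @Override
    public void onStop() {
        // Nothing to clean up: receive() checks isStopped() and exits
    }

    private void receive() {
        try {
            AmazonSQS sqs = AmazonSQSClientBuilder.standard().withRegion(region).build();
            String queueUrl = sqs.getQueueUrl(queueName).getQueueUrl();
            ReceiveMessageRequest request = new ReceiveMessageRequest(queueUrl)
                    .withMaxNumberOfMessages(10)
                    .withWaitTimeSeconds(20); // long polling
            while (!isStopped()) {
                for (Message m : sqs.receiveMessage(request).getMessages()) {
                    store(m.getBody()); // hand the message body to Spark
                    sqs.deleteMessage(queueUrl, m.getReceiptHandle()); // the "with delete" behavior
                }
            }
        } catch (Throwable t) {
            restart("Error receiving from SQS", t);
        }
    }
}

The "with delete" / "without delete" variants linked above presumably differ in whether deleteMessage is called after store(); skipping the delete leaves the message in the queue for redelivery once its visibility timeout expires.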
7. Result
- SQS Message : the S3 event JSON arrives as the message body
- Word Count : token counts from the event message, split on ':'
* Reference
- AWS Lambda Permissions Model : https://docs.aws.amazon.com/lambda/latest/dg/intro-permission-model.html
- AWS SQS JS : https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/SQS.html
- Spark Streaming Custom Receiver : http://spark.apache.org/docs/latest/streaming-custom-receivers.html