S3 Lambda Trigger + Amazon SQS + SQSReceiver + SparkStreaming [P023]
5) IoT_DataPlatform/Amazon AWS · 2018. 10. 17. 19:32
https://www.youtube.com/watch?v=37gl-tkUa3M
GitHub : https://github.com/rdiot/rdiot-p023
* Parts
- Pi4J + Amazon S3 REST API + Amazon Athena [P019]
- Raspberry Pi2
- Temperature and humidity DHT11 sensor module (KY-015) [S008]
* Contents
1. Create custom role : AWS Lambda requires access to your resources
- Create a new IAM Role
- Role Name : LambdaSQSRole
- Attach policy : AmazonSQSFullAccess
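Besides the attached `AmazonSQSFullAccess` policy, the role needs a trust relationship that lets the Lambda service assume it. The standard trust policy for a Lambda execution role looks like this:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": { "Service": "lambda.amazonaws.com" },
      "Action": "sts:AssumeRole"
    }
  ]
}
```

When you create the role through the Lambda console, this trust policy is generated automatically.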
2. S3 Lambda Trigger : Create function
- Function Name : S3toSQS
- Runtime : Node.js 6.10
- full body
- SQS Receiver : https://github.com/rdiot/rdiot-p023/blob/master/sparkReceiver/src/main/java/com/rdiot/sparkReceiver/receiver/SQSReceiver.java
- SQS Receiver with Delete : https://github.com/rdiot/rdiot-p023/blob/master/sparkReceiver/src/main/java/com/rdiot/sparkReceiver/SQSWithDelete.java
- SQS Receiver without Delete : https://github.com/rdiot/rdiot-p023/blob/master/sparkReceiver/src/main/java/com/rdiot/sparkReceiver/SQSWithoutDelete.java
- s3 event word count source : https://github.com/rdiot/rdiot-p023/blob/master/sparkReceiver/src/main/java/com/rdiot/sparkReceiver/sample/SQSWordCount.java
package com.rdiot.sparkReceiver.sample;

import java.util.Arrays;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import com.amazonaws.regions.Regions;
import com.rdiot.sparkReceiver.receiver.SQSReceiver;

import scala.Tuple2;

public class SQSWordCount {
    private static Logger logger = Logger.getLogger(SQSWordCount.class);

    final static String appName = "sparkSQSReceiver";
    final static String master = "local[2]";
    final static String queueName = "jobQueue";

    final static Duration batchDuration = Durations.seconds(5);  // Batch Duration
    final static Duration windowDuration = Durations.seconds(5); // TBD
    final static Duration slideDuration = Durations.seconds(3);  // TBD

    public static void main(String[] args) throws InterruptedException {
        Logger.getLogger("org").setLevel(Level.OFF);

        // Spark Config
        SparkConf conf = new SparkConf().setMaster(master).setAppName(appName);
        conf.set("spark.testing.memory", "2147480000"); // if you face any memory issues

        try (JavaStreamingContext jssc = new JavaStreamingContext(conf, batchDuration)) {
            SQSReceiver javaSQSReceiver = new SQSReceiver(queueName) // deletes messages from the queue immediately
                    .with(Regions.AP_NORTHEAST_2);

            System.out.println("# Spark Streaming Start");

            JavaReceiverInputDStream<String> input = jssc.receiverStream(javaSQSReceiver);

            // Print the raw SQS messages (disabled)
            /*
            input.foreachRDD(rdd -> {
                rdd.foreach(w -> {
                    System.out.println(w);
                });
            });
            */

            // Word Count
            JavaDStream<String> words = input.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
            JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
            JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);
            wordCounts.print();

            jssc.start();
            jssc.awaitTermination();
        } finally {
            logger.info("Exiting the Application");
        }
    }
}
7. Result
- SQS Message
- Word Count : split ':'
* Reference
- AWS Lambda Permissions Model : https://docs.aws.amazon.com/lambda/latest/dg/intro-permission-model.html
- AWS SQS JS : https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/SQS.html
- Spark Streaming Custom Receiver : http://spark.apache.org/docs/latest/streaming-custom-receivers.html