[Spark Streaming DStreams]
※ RDD의 경우와 마찬가지로 변환을 통한 DStream의 데이터를 수정이 가능하며 제공되는 기능은 아래와 같다.
|
이제 예제를 통해(그냥 개인적으로 만든거라 안되면 문의) 사용해 보자.
안될 경우 Spark 버전을 확인하고 Scala 버전 확인 후 Dependency 체크
그리고 버전에 따른 변경된 문법을 확인 (1.6 -> 2.1로 변경하면서 살짝쿵 틀린 부분 있었음)
▶예제 요약
- 데이터를 받아서 기본 연산을 편하게 하기 위해 변환 후 연산 결과를 DB에 저장하는 예제를 진행 - ( 흠.... 그냥 많이 사용 되는거 같아서 ?..... ) |
▶예제 내용
(1)커넥션 이후 데이터 받고(JavaReceiverInputDStream) 이를, N개로 분리(JavaDStream) (2)데이터 필터링 (3)평균, 합, 카운트 등의 연산을 쉽게하기 위한 Key-Value 로 변환 (JavaDStream => JavaPairDStream) (4)groupByKey를 이용한 동일한 Key 값에 대한 Value 그룹핑 (5)mapValues를 이용한 Key 값별 개수 (6)mapValues를 이용한 Value 값만 변경 (7)foreachRDD를 이용한 개별 데이터 처리 (8)foreachRDD로 MongoDB 저장 (JavaDStream) |
(1) flatMap
사용예 : 띄워쓰기 콤마 등의 구분자로 Text를 개별 문자로 분리 Array 데이터 하나로 분리1개의 데이터를 N개로 분리 등
JavaReceiverInputDStream<String> data = 커넥션 JavaDStream<String> flatMap_sample = data.flatMap(new FlatMapFunction<String, String>() { |
(2) filter
사용예 : 개별 데이터를 조건으로 특정 데이터만 선택 또는 제외 JavaDStream<String> filter_sample = flatMap_sample.filter(new Function<String, Boolean>() { (예) 데이터에 문자 A가 있으면 제외시켜라 if( dump.contains("A") ){ return true; } else{ return false; } } |
(3) mapToPair
사용예 : 개별 데이터에서 Key, Value 값으로 설정할 정보 추출
JavaPairDStream<String, Integer> map_example = filter_sample.mapToPair( return new Tuple2<String, Integer>(키, 값);
|
(4) groupByKey
사용예 : 동일한 Key 값에 대한 Value 그룹핑 JavaPairDStream<String, Iterable<Integer>> group_example = map_example.groupByKey(); [데이터 예] A : 1 A : 2 A : 8 B : 4 C : 9 B : 12 .... [변환 결과] A : [ 1, 2, 8, 12 .. ] B : [ 4, .. ] C : [ 9, .. ] |
(5) countByValue
사용예 : Key 별 갯수 JavaPairDStream<String, Long> count_example = map_example.countByValue(); [데이터 예] A : 1 A : 2 B : 8 B : 4 C : 9 [변환 결과] A : 2.0개 B : 2.0개 C : 1.0개 |
(6) mapValues
사용예 : Value 값만 변경 JavaPairDStream<String, String> average = group_data.mapValues(new Function<Iterable<Integer>, String>() {
|
(7) foreachRDD
사용예 : 가장 일반적인 출력 연산, 데이터 DB 저장, 개별 데이터 처리 등 foreach_example.foreachRDD(new VoidFunction<JavaPairRDD<String, String>>() {
|
(8) foreachRDD 추가 (MongoDB)
사용예 : JavaDStream<String> 데이터 MongoDB로 저장 mongo_example.foreachRDD(new VoidFunction<JavaRDD<String>>() {
|
'spark - python - R' 카테고리의 다른 글
[Spark] RDD를 이용한 Mongo Collection Data to HDFS Save 및 연산처리 (0) | 2017.06.14 |
---|---|
[Spark] spark Dataset<Row>를 이용한 HDFS to Mysql Save (0) | 2017.06.14 |
[SPARK] flatMap - JavaReceiverInputDStream (0) | 2017.05.30 |
[SPARK] window operation- spark streaming (0) | 2017.05.29 |
[SPARK] Mongo Mysql foreachRDD (0) | 2017.04.20 |