본문 바로가기

spark - python - R

[SPARK] Spark Streaming - Transformations on DStreams

336x280(권장), 300x250(권장), 250x250, 200x200 크기의 광고 코드만 넣을 수 있습니다.







[Spark Streaming DStreams]



RDD의 경우와 마찬가지로 변환을 통한 DStream의 데이터를 수정이 가능하며 제공되는 기능은 아래와 같다.










이제 예제를 통해(그냥 개인적으로 만든거라 안되면 문의) 사용해 보자.


될 경우 Spark 버을 확인하고 Scala 버전 확인 후 Dependency 체크


그리고 버전에 따른 변경된 문법을 확인 (1.6 -> 2.1로 변경하면서 살짝쿵 틀린 부분 있었음)







▶예제 요약


  -  데이터를 받아서 기본 연산을 편하게 하기 위해 변환 후 연산 결과를 DB에 저장하는 예제를 진행


  -  ( 흠.... 그냥 많이 사용 되는거 같아서 ?..... )






▶예제 내용

 

(1)커넥션 이후 데이터 받고(JavaReceiverInputDStream) 이를, N개로 분리(JavaDStream)


(2)데이터 필터링


(3)평균, 합, 카운트 등의 연산을 쉽게하기 위한 Key-Value 로 변환 (JavaDStream => JavaPairDStream)


(4)groupByKey를 이용한 동일한 Key 값에 대한 Value 그룹핑


(5)mapValues를 이용한 Key 값별 개수


(6)mapValues를 이용한 Value 값만 변경


(7)foreachRDD를 이용한 개별 데이터 처리


(8)foreachRDD로 MongoDB 저장 (JavaDStream)






(1) flatMap


           사용예 : 띄워쓰기 콤마 등의 구분자로 Text를 개별 문자로 분리

                       Array 데이터 하나로 분리
                       1개의 데이터를 N개로 분리 등


            JavaReceiverInputDStream<String> data = 커넥션


            JavaDStream<String> flatMap_sample = data.flatMap(new FlatMapFunction<String, String>() {
                private static final long serialVersionUID = 1L;
                public Iterator<String> call(String line) {                       
                    List<String> data = new ArrayList<String>();   
                   
                    //처리

                    return data.iterator();
                }
            });






(2) filter


 

             사용예 : 개별 데이터를 조건으로 특정 데이터만 선택 또는 제외


             JavaDStream<String> filter_sample = flatMap_sample.filter(new Function<String, Boolean>() {
                private static final long serialVersionUID = 1L;
                @Override
                public Boolean call(String dump) throws Exception {     
                   
                    //처리

                    (예) 데이터에 문자 A가 있으면 제외시켜라

                    if(  dump.contains("A") ){

                            return true;

                    }

                    else{

                            return false;

                    }

                }
            });






(3) mapToPair


            사용예 : 개별 데이터에서 Key, Value 값으로 설정할 정보 추출


            JavaPairDStream<String, Integer> map_example = filter_sample.mapToPair(
                    new PairFunction<String, String, Integer>() {
                        private static final long serialVersionUID = 1L;
                        @Override
                        public Tuple2<String, Integer> call(String line) {
                       
                          //처리
                          String line 에서 Key(String), Value(Int) 추출


                          return new Tuple2<String, Integer>(키, 값);   
                      }
                    }
            );

 





(4) groupByKey


            사용예 : 동일한 Key 값에 대한 Value 그룹핑


            JavaPairDStream<String, Iterable<Integer>> group_example =  map_example.groupByKey();





[데이터 예]

A : 1     A : 2    A : 8    B : 4    C : 9  B : 12  ....


[변환 결과]

A : [ 1, 2, 8, 12 .. ]    B : [ 4, .. ]    C : [ 9, .. ]






(5) countByValue


            사용예 : Key 별 갯수


            JavaPairDStream<String, Long> count_example = map_example.countByValue();




[데이터 예]

A : 1     A : 2    B : 8    B : 4    C : 9 


[변환 결과]

A : 2.0개

B : 2.0개

C : 1.0개






(6) mapValues


            사용예 : Value 값만 변경


            JavaPairDStream<String, String> average =

                                                     group_data.mapValues(new Function<Iterable<Integer>, String>() {


                private static final long serialVersionUID = 1L;
                public String call(Iterable<Integer> v1) throws Exception {
                  
                    return 변경될 값;
                }
            });

 





(7) foreachRDD



            사용예 : 가장 일반적인 출력 연산, 데이터 DB 저장, 개별 데이터 처리 등


            foreach_example.foreachRDD(new VoidFunction<JavaPairRDD<String, String>>() {          
                private static final long serialVersionUID = 1L;
                public void call(JavaPairRDD<String, String> rdd) throws Exception {
                     rdd.foreach(
                                new VoidFunction<Tuple2<String,String>>() {
                                    private static final long serialVersionUID = 1L;
                                    public void call(Tuple2<String,String> data) {
                                        //처리
                                    }
                                }
                            );
                    }
            });

 





(8) foreachRDD 추가 (MongoDB)



            사용예 : JavaDStream<String> 데이터 MongoDB로 저장


            mongo_example.foreachRDD(new VoidFunction<JavaRDD<String>>() {
                private static final long serialVersionUID = 1L;
                Mongo mongo = new Mongo(아이피주소, 포트번호);
                DB db = mongo.getDB("데이터 데이스명");

                @Override
                public void call(JavaRDD<String> data) throws Exception {
                          DBCollection collection = db.getCollection("컬렉션 이름");

                          DBObject dbObject = (DBObject) JSON.parse(temp.toString());
                          collection.insert(dbObject);
                }
            });