본문 바로가기

spark - python - R

[Spark] RDD를 이용한 Mongo Collection Data to HDFS Save 및 연산처리

336x280(권장), 300x250(권장), 250x250, 200x200 크기의 광고 코드만 넣을 수 있습니다.




□ 내용


 ▶ MongoDB에 저장되어 있는 데이터를 Collection 단위로 HDFS에 저장


 ▶ 저장 시 BSON Format으로 저장되며 Read, Write 과정에서 데이터들을 Key / Value로 편리하게 처리 가능

 



□ 저장 테스트


        SparkConf sparkConf = new SparkConf().

                                                  setAppName("test").

                                                  setMaster("local[2]").
                                                  set("spark.ui.port", "4046");
     
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);


        Configuration conf = new Configuration();

        conf.set("mongo.job.input.format", "com.mongodb.hadoop.MongoInputFormat");

        conf.set("mongo.input.uri",

                "mongodb://아이디:패스워드@192.168.~~~.~~~:27017/데이터 베이스명" + "." + "컬렉션명");



        JavaPairRDD<Object, BSONObject> documents = jsc.newAPIHadoopRDD(

                conf, // Configuration

                com.mongodb.hadoop.MongoInputFormat.class, // InputFormat: read from a live cluster.

                Object.class, // Key class

                BSONObject.class // Value class

        );



        documents.saveAsNewAPIHadoopFile("hdfs://master.com:9000/user/저장할 위치/파일명",
                                               Object.class,
                                               BSONObject.class,
                                               BSONFileOutputFormat.class,
                                               new Configuration());

 



□ Mongo Sample


▶  몽고에 저장된 데이터 예는 아래와 같다고 한다면 위의 JavaPairRDD<Object, BSONObject> documents 처리는 Key / Value로 편리하게 처리 할 수 있다.


{ "_id" : ObjectId("593c09ac3c14878fba224f46"), "a" : "test" },

{ "_id" : ObjectId("593c09ac3c14878fba224f47"), "a" : "X" },

{ "_id" : ObjectId("593c09ac3c14878fba224f48"), "a" : "test" },

{ "_id" : ObjectId("593c09ac3c14878fba224f49"), "a" : "XX" },

{ "_id" : ObjectId("593c09ac3c14878fba224f50"), "a" : "XXX" },

{ "_id" : ObjectId("593c09ac3c14878fba224f51"), "a" : "test" }

...

...

 





Key / Value


아래 예는 JavaPairRDD<Object, BSONObject> documents의 데이터들 중에서 Key가 a인 것 중에서 Value가 test인 값 필터링 한다.


텍스트 변환, 단어 구분, 커스텀 클래스 생성, 변환을 위한 데이터 구분 등등의 추가적인 과정 없이 사용이 매우 쉽다.


아래 예 뿐만 아니라  다른 연산 처리에도 적용 가능



         documents.filter(new Function<Tuple2<Object, BSONObject>, Boolean>() {

                       

                        private static final long serialVersionUID = 1L;


                        public Boolean call(Tuple2<Object, BSONObject> arg0) throws Exception {

                            if(arg0._2.get("a").equals("test")){

                                return true;

                            }

                            else{

                                return false;

                            }

                        }

         });