본문 바로가기

spark - python - R

[Spark] spark Dataset<Row>를 이용한 HDFS to Mysql Save

336x280(권장), 300x250(권장), 250x250, 200x200 크기의 광고 코드만 넣을 수 있습니다.




□ SAVE MODES 설정






Mysql 예제 정보


▶ 테이블 명 : T_TEST


▶ 컬럼 정보 : String a, String b, String c


▶ HDFS 데이터를 이미 생성되어 있는 테이블에 저장할 것임, 이에   write().mode(SaveMode.Append)

 




DTO 생성


public class TestDto implements Serializable {

    private String a;

    private String b;

    private String c;
   
    public String getA() {

        return a;

    }


    public void setA(String a) {

        this.a = a;

    }


    public String getB() {

        return b;

    }


    public void setB(String b) {

        this.b = b;

    }


    public String getC() {

        return c;

    }


    public void setC(String c) {

        this.c = c;

    }

}





HDFS 예제 데이터 정보


▶ MongoDB 정보를 bson 형태로 HDFS에 저장한 데이터


▶ Key : Value

 




테스트


        final String MYSQL_USERNAME = "아이디";

        final String MYSQL_PWD = "패스워드";


        final SQLContext sqlContext = new SQLContext(StaticSparkContext.scontext);



        JavaRDD<TestDto> data = StaticSparkContext.scontext.newAPIHadoopFile(

                "hdfs://192.168.0.XXX:9000/user/위치/파일명",

                BSONFileInputFormat.class,

                Object.class, BSONObject.class,

                bsonDataConfig

                ).map(new Function<Tuple2<Object, BSONObject>, TestDto>() {
                   

                    private static final long serialVersionUID = 1L;

 

                    public TestDto call(Tuple2<Object, BSONObject> doc) throws Exception {


                        TestDto dto = new TestDto();

                        dto.setA((String) doc._2.get("a"));

                        dto.setB((String) doc._2.get("b"));

                        dto.setC((String) doc._2.get("c"));


                        return dto;
                    }
                });



         Dataset<Row> schemadata = sqlContext.createDataFrame(one, TestDto.class);
         schemadata.printSchema();



         Properties prop = new java.util.Properties();
         prop.put("user", MYSQL_USERNAME);
         prop.put("password", MYSQL_PWD);



         schemadata.

                    write().

                    mode(SaveMode.Append).

                    jdbc("jdbc:mysql://192.168.0.XXX:3306/데이터베이스명", "테이블명", prop);