본문 바로가기

spark - python - R

[SPARK] flatMap - JavaReceiverInputDStream

336x280(권장), 300x250(권장), 250x250, 200x200 크기의 광고 코드만 넣을 수 있습니다.




데이터를 전송할 때


  공통적인 부분과 개별 데이터 2종류로 나뉜다.

 
  개별 데이터의 경우 향후 데이터 처리하는 서버와의 정해진 인터페이스 정의서에 맞게 반복적으로 보내는 경우가 많으며


  또한 이러한 데이터들은 묶여서 보내지는 경우가 많다.





Json Format이 많이 사용되고 있으며


  Array, List 등의 데이터를 JavaDStream<..> 로 변환하는데 flatMap이 사용된다.


  이에 Spark Streaming Data를 개별로 쉽게 분리하는 방법을 실험 해보자.





샘플데이터


Topic에서 분리한 Json Array String 데이터


  [{"A":"AA","B":"BB","C":"CC"}, {"A":"AA","B":"BB","C":"CC"}, {"A":"AA","B":"BB","C":"CC"} .. N개 }]





테스트


JavaReceiverInputDStream<String> data = 스트리밍 연결 (Kafka, RabbitMQ 등)


//JSON Format Validation Check
public static boolean isJSONValid(String test) {
        try {
            new JSONObject(test);
        } catch (JSONException ex) {
            try {
                new JSONArray(test);
            } catch (JSONException ex1) {
                return false;
            }
        }
        return true;
    }



JavaDStream<String> line_data = data.flatMap(new FlatMapFunction<String, String>() {
                private static final long serialVersionUID = 1L;
                public Iterator<String> call(String line) {   
                   
                    List<String> data = new ArrayList<String>();   
                    if(isJSONValid(line) == true){               
                        JsonArray jsonArray;
                        jsonArray = new JsonParser().parse(line.toString()).getAsJsonArray();
                       
                        for (int count = 0; count < jsonArray.size(); count++) {
                                JsonObject temp = new JsonParser().parse(jsonArray.get(count).toString()).getAsJsonObject();
                                data.add(temp.toString());
                        }                           
                    }
                    else{
                        System.out.println("validation error");
                    }
                    return data.iterator();
              }
     });



line_data.print();

 




결과

Json Array -> JsonObject String


{"A":"AA","B":"BB","C":"CC"}
{"A":"AA","B":"BB","C":"CC"}
{"A":"AA","B":"BB","C":"CC"}

..

..