
spark - python - R

[SPARK] Per-Sensor Averages & Spark Streaming


Test procedure

1. Collect data in real time

2. Group the data by gateway, device, and sensor type

3. Compute the average of each group





Raw sensor data sample


{"device":"gateway","mac":"00:40:9d:98:aa:79","timezone":"KST","data":[{"mac":"d0:09:cb:d1:08:da","rssi":"-36","time":"2017-02-03 17:44:20.710","sensor":"9012","value":"874"}]}

 





Parsing JSON sensor data that arrives in real time in Spark Streaming

            // Requires: com.google.gson.{JsonArray, JsonObject, JsonParser}, java.util.{ArrayList, List},
            //           org.apache.spark.api.java.function.FlatMapFunction,
            //           org.apache.spark.streaming.api.java.JavaDStream
            // Written against the Spark 1.x Java API. Since Spark 2.0, FlatMapFunction.call
            // must return an Iterator, so return data.iterator() there.
            JavaDStream<String> line_data = data.flatMap(new FlatMapFunction<String, String>() {
                private static final long serialVersionUID = 1L;

                @Override
                public Iterable<String> call(String line) {
                    // One flat JSON string per sensor reading in the gateway message.
                    List<String> data = new ArrayList<String>();
                    try {
                        // Parse inside the try block so that malformed JSON is skipped
                        // instead of failing the whole batch.
                        JsonObject jsonObject = new JsonParser().parse(line).getAsJsonObject();
                        String device = jsonObject.get("device").getAsString();
                        String mac = jsonObject.get("mac").getAsString();
                        String timezone = jsonObject.get("timezone").getAsString();

                        // Skip messages whose gateway MAC fails memoryDB.gateway_check.
                        if (memoryDB.gateway_check(mac)) {
                            JsonArray jsonArray = jsonObject.getAsJsonArray("data");
                            for (int count = 0; count < jsonArray.size(); count++) {
                                JsonObject temp = new JsonObject();
                                JsonObject temp2 = jsonArray.get(count).getAsJsonObject();

                                // Copy the gateway-level fields onto every reading.
                                temp.addProperty("device", device);
                                temp.addProperty("mac", mac);
                                temp.addProperty("timezone", timezone);
                                temp.addProperty("smac", temp2.get("smac").getAsString());
                                temp.addProperty("rssi", temp2.get("rssi").getAsString());
                                temp.addProperty("time", temp2.get("time").getAsString());
                                temp.addProperty("value", temp2.get("value").getAsString());
                                temp.addProperty("sensor", temp2.get("sensor").getAsString());
                                data.add(temp.toString());
                            }
                        }
                    } catch (Exception e) {
                        // A missing field or invalid JSON lands here; the message is dropped.
                        System.out.println("validation error");
                    }
                    return data;
                }
            });
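The flattening performed by the step above can be illustrated without Spark or Gson. The following is only a minimal sketch in plain Java: the `GatewayMessage` and `Reading` classes, the comma-separated output format, and the second reading in `main` are assumptions made for illustration and are not part of the original job.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration of the flattening step (no Spark or Gson involved).
class FlattenSketch {

    // Hypothetical holder for one entry of the "data" array.
    static final class Reading {
        final String smac;
        final String sensor;
        final String value;

        Reading(String smac, String sensor, String value) {
            this.smac = smac;
            this.sensor = sensor;
            this.value = value;
        }
    }

    // Hypothetical holder for one gateway message.
    static final class GatewayMessage {
        final String device;
        final String mac;
        final List<Reading> data;

        GatewayMessage(String device, String mac, List<Reading> data) {
            this.device = device;
            this.mac = mac;
            this.data = data;
        }
    }

    // Emits one flat "device,mac,smac,sensor,value" line per reading,
    // repeating the gateway-level fields on every line.
    static List<String> flatten(List<GatewayMessage> messages) {
        List<String> out = new ArrayList<String>();
        for (GatewayMessage m : messages) {
            for (Reading r : m.data) {
                out.add(String.join(",", m.device, m.mac, r.smac, r.sensor, r.value));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // The first reading mirrors the raw sample; the second is invented.
        GatewayMessage msg = new GatewayMessage("gateway", "00:40:9d:98:aa:79", Arrays.asList(
                new Reading("d0:09:cb:d1:08:da", "9012", "874"),
                new Reading("d0:09:cb:d1:08:da", "9013", "26")));
        for (String line : flatten(Arrays.asList(msg))) {
            System.out.println(line);
        }
    }
}
```

One message with two readings yields two output lines, which is the same one-to-many relationship that `flatMap` expresses on the stream.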


 




Converting to unique key [device info] - value [sensor value] pairs

 

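            // Requires: scala.Tuple2, org.apache.spark.api.java.function.PairFunction,
            //           org.apache.spark.streaming.api.java.JavaPairDStream
            JavaPairDStream<String, Integer> mappedStream = line_data.mapToPair(
                new PairFunction<String, String, Integer>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, Integer> call(String line) {
                        JsonObject jsonObject = new JsonParser().parse(line).getAsJsonObject();

                        // The key is a JSON string holding only the identifying fields,
                        // always added in the same order so that equal sensors give equal keys.
                        JsonObject dump = new JsonObject();
                        dump.add("smac", jsonObject.get("smac"));
                        dump.add("mac", jsonObject.get("mac"));
                        dump.add("device", jsonObject.get("device"));
                        dump.add("sensor", jsonObject.get("sensor"));

                        // "value" is stored as a JSON string; getAsInt() parses it as an integer.
                        int value = jsonObject.get("value").getAsInt();
                        return new Tuple2<String, Integer>(dump.toString(), value);
                    }
                });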
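The key construction above relies on the identifying fields always being serialized in the same order. A minimal plain-Java sketch of that idea follows; the `|` separator, the `KeyValueSketch` class, and the use of `SimpleEntry` as a stand-in for `Tuple2` are assumptions for illustration only, and the second value in `main` is invented.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

// Plain-Java illustration of turning a flat reading into a (key, value) pair.
class KeyValueSketch {

    // Builds the grouping key from the identifying fields in a fixed order,
    // so readings from the same sensor always produce the same string.
    static String key(String smac, String mac, String device, String sensor) {
        return smac + "|" + mac + "|" + device + "|" + sensor;
    }

    // Pairs the key with the numeric sensor value.
    // SimpleEntry stands in for Spark's Tuple2 here.
    static Map.Entry<String, Integer> toPair(String smac, String mac, String device,
                                             String sensor, String rawValue) {
        int value = Integer.parseInt(rawValue.trim());
        return new SimpleEntry<String, Integer>(key(smac, mac, device, sensor), value);
    }

    public static void main(String[] args) {
        Map.Entry<String, Integer> first =
                toPair("d0:09:cb:d1:08:da", "00:40:9d:98:aa:79", "gateway", "9012", "874");
        Map.Entry<String, Integer> second =
                toPair("d0:09:cb:d1:08:da", "00:40:9d:98:aa:79", "gateway", "9012", "821");
        // Both readings come from the same sensor, so their keys are equal.
        System.out.println(first.getKey().equals(second.getKey()));
    }
}
```

Fields such as `rssi` and `time` are deliberately left out of the key, since they differ between readings of the same sensor and would put every reading in its own group.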



          

         


Per-key average

     

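            // Requires: org.apache.spark.api.java.function.Function
            // Within each batch, collect every value that shares a key, then average each group.
            JavaPairDStream<String, Iterable<Integer>> grouped = mappedStream.groupByKey();
            JavaPairDStream<String, Integer> average = grouped.mapValues(new Function<Iterable<Integer>, Integer>() {
                private static final long serialVersionUID = 1L;

                @Override
                public Integer call(Iterable<Integer> v1) throws Exception {
                    int count = 0;
                    long sum = 0; // long, so that many large readings cannot overflow
                    for (int f : v1) {
                        count++;
                        sum += f;
                    }
                    // Integer division: the fractional part of the average is dropped.
                    return (int) (sum / count);
                }
            });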
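Grouping keeps every value for a key in memory before averaging. An alternative, shown here only as a plain-Java sketch and not as the method used in this post, is to keep a running (sum, count) per key and divide once at the end. In Spark the same idea is commonly expressed by mapping each value to a (sum, count) pair and combining the pairs with `reduceByKey`. The `RunningAverageSketch` class and the sample values are assumptions for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java illustration of averaging with per-key {sum, count} accumulators.
class RunningAverageSketch {

    // Folds (key, value) pairs into {sum, count} accumulators, then divides
    // once per key. Returning double keeps the fractional part of the average.
    static Map<String, Double> averageByKey(String[] keys, int[] values) {
        Map<String, long[]> acc = new LinkedHashMap<String, long[]>();
        for (int i = 0; i < keys.length; i++) {
            long[] sumCount = acc.get(keys[i]);
            if (sumCount == null) {
                sumCount = new long[2]; // [0] = sum, [1] = count
                acc.put(keys[i], sumCount);
            }
            sumCount[0] += values[i];
            sumCount[1] += 1;
        }
        Map<String, Double> averages = new LinkedHashMap<String, Double>();
        for (Map.Entry<String, long[]> e : acc.entrySet()) {
            averages.put(e.getKey(), (double) e.getValue()[0] / e.getValue()[1]);
        }
        return averages;
    }

    public static void main(String[] args) {
        // Invented readings: two for sensor 9012, one for sensor 9013.
        String[] keys = {"9012", "9012", "9013"};
        int[] values = {874, 821, 26};
        System.out.println(averageByKey(keys, values));
    }
}
```

For the two invented 9012 readings the sum is 1695, so the floating-point average is 847.5, whereas integer division would report 847.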



          

 


Checking the per-sensor averages on the console


Sensor averages for device 1

({"smac":"d0:09:cb:d1:08:da","mac":"00:40:9d:98:ac:d1","device":"gateway","sensor":"9014"},2306)

({"smac":"d0:09:cb:d1:08:da","mac":"00:40:9d:98:ac:d1","device":"gateway","sensor":"9010"},316)

({"smac":"d0:09:cb:d1:08:da","mac":"00:40:9d:98:ac:d1","device":"gateway","sensor":"9011"},313)

({"smac":"d0:09:cb:d1:08:da","mac":"00:40:9d:98:ac:d1","device":"gateway","sensor":"9009"},16)

({"smac":"d0:09:cb:d1:08:da","mac":"00:40:9d:98:ac:d1","device":"gateway","sensor":"9012"},847)

({"smac":"d0:09:cb:d1:08:da","mac":"00:40:9d:98:ac:d1","device":"gateway","sensor":"9013"},26)

Sensor averages for device 2

({"smac":"cf:e6:e2:fe:e2:d6","mac":"00:40:9d:98:ac:d1","device":"gateway","sensor":"9011"},466)

({"smac":"cf:e6:e2:fe:e2:d6","mac":"00:40:9d:98:ac:d1","device":"gateway","sensor":"9009"},32)

({"smac":"cf:e6:e2:fe:e2:d6","mac":"00:40:9d:98:ac:d1","device":"gateway","sensor":"9012"},475)

({"smac":"cf:e6:e2:fe:e2:d6","mac":"00:40:9d:98:ac:d1","device":"gateway","sensor":"9013"},24)

({"smac":"cf:e6:e2:fe:e2:d6","mac":"00:40:9d:98:ac:d1","device":"gateway","sensor":"9010"},463)

Sensor averages for device 3

({"smac":"f4:5b:29:3e:07:81","mac":"00:40:9d:98:ac:d1","device":"gateway","sensor":"9010"},472)

({"smac":"f4:5b:29:3e:07:81","mac":"00:40:9d:98:ac:d1","device":"gateway","sensor":"9014"},2625)

({"smac":"f4:5b:29:3e:07:81","mac":"00:40:9d:98:ac:d1","device":"gateway","sensor":"9011"},308)

({"smac":"f4:5b:29:3e:07:81","mac":"00:40:9d:98:ac:d1","device":"gateway","sensor":"9009"},41)

({"smac":"f4:5b:29:3e:07:81","mac":"00:40:9d:98:ac:d1","device":"gateway","sensor":"9012"},895)

({"smac":"f4:5b:29:3e:07:81","mac":"00:40:9d:98:ac:d1","device":"gateway","sensor":"9013"},22)

Sensor averages for device 4

({"smac":"cf:62:8f:ad:78:f7","mac":"00:40:9d:98:ac:d1","device":"gateway","sensor":"9012"},850)

({"smac":"cf:62:8f:ad:78:f7","mac":"00:40:9d:98:ac:d1","device":"gateway","sensor":"9009"},71)

({"smac":"cf:62:8f:ad:78:f7","mac":"00:40:9d:98:ac:d1","device":"gateway","sensor":"9013"},24)

({"smac":"cf:62:8f:ad:78:f7","mac":"00:40:9d:98:ac:d1","device":"gateway","sensor":"9014"},2505)

({"smac":"cf:62:8f:ad:78:f7","mac":"00:40:9d:98:ac:d1","device":"gateway","sensor":"9010"},472)

({"smac":"cf:62:8f:ad:78:f7","mac":"00:40:9d:98:ac:d1","device":"gateway","sensor":"9011"},468)