Test procedure
1. Collect data in real time
2. Group the data by gateway, device, and sensor type
3. Compute the average for each group
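The three steps can be tried without a cluster. The following is a minimal plain-Java sketch, not the Spark job itself: the class `PipelineSketch`, the `Reading` type, the `|`-joined key, and the sample values are all assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical in-memory stand-in for the streaming job; no Spark involved.
class PipelineSketch {

    // One flattened sensor reading (step 1 would deliver these in real time).
    static final class Reading {
        final String gatewayMac;
        final String deviceMac;
        final String sensor;
        final int value;

        Reading(String gatewayMac, String deviceMac, String sensor, int value) {
            this.gatewayMac = gatewayMac;
            this.deviceMac = deviceMac;
            this.sensor = sensor;
            this.value = value;
        }

        // Step 2: the grouping key is gateway + device + sensor type.
        String key() {
            return gatewayMac + "|" + deviceMac + "|" + sensor;
        }
    }

    // Steps 2 and 3: group the readings by key, then average each group.
    static Map<String, Double> averageByKey(List<Reading> readings) {
        return readings.stream().collect(
                Collectors.groupingBy(Reading::key, TreeMap::new,
                        Collectors.averagingInt(r -> r.value)));
    }

    public static void main(String[] args) {
        // Made-up values for illustration only.
        List<Reading> batch = Arrays.asList(
                new Reading("00:40:9d:98:aa:79", "d0:09:cb:d1:08:da", "9012", 874),
                new Reading("00:40:9d:98:aa:79", "d0:09:cb:d1:08:da", "9012", 820),
                new Reading("00:40:9d:98:aa:79", "d0:09:cb:d1:08:da", "9013", 26));
        averageByKey(batch).forEach((k, v) -> System.out.println(k + " -> " + v));
    }
}
```

Unlike the streaming code, this sketch keeps the fractional part of each average because it returns `Double` values.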
Sample of raw sensor data
{"device":"gateway","mac":"00:40:9d:98:aa:79","timezone":"KST","data":[{"mac":"d0:09:cb:d1:08:da","rssi":"-36","time":"2017-02-03 17:44:20.710","sensor":"9012","value":"874"}]}
Parsing the JSON sensor data that arrives in real time in Spark Streaming
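import java.util.ArrayList;
import java.util.List;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.streaming.api.java.JavaDStream;

// Flatten each gateway message into one JSON string per sensor reading.
// `data` is the incoming JavaDStream<String>; `memoryDB` is defined elsewhere in the project.
JavaDStream<String> line_data = data.flatMap(new FlatMapFunction<String, String>() {
    private static final long serialVersionUID = 1L;

    // Spark 1.6 signature; from Spark 2.0 on, call() returns Iterator<String> instead.
    public Iterable<String> call(String line) {
        List<String> records = new ArrayList<String>();
        try {
            // Parse inside the try block so that malformed JSON is skipped as well.
            JsonObject jsonObject = new JsonParser().parse(line).getAsJsonObject();
            String device = jsonObject.get("device").getAsString();
            String mac = jsonObject.get("mac").getAsString();
            String timezone = jsonObject.get("timezone").getAsString();

            // Only process messages from registered gateways.
            if (memoryDB.gateway_check(mac)) {
                JsonArray jsonArray = jsonObject.getAsJsonArray("data");
                for (int count = 0; count < jsonArray.size(); count++) {
                    JsonObject temp2 = jsonArray.get(count).getAsJsonObject();
                    // Copy the gateway fields into every reading.
                    JsonObject temp = new JsonObject();
                    temp.addProperty("device", device);
                    temp.addProperty("mac", mac);
                    temp.addProperty("timezone", timezone);
                    temp.addProperty("smac", temp2.get("smac").getAsString());
                    temp.addProperty("rssi", temp2.get("rssi").getAsString());
                    temp.addProperty("time", temp2.get("time").getAsString());
                    temp.addProperty("value", temp2.get("value").getAsString());
                    temp.addProperty("sensor", temp2.get("sensor").getAsString());
                    records.add(temp.toString());
                }
            }
        } catch (Exception e) {
            // A missing field or invalid JSON ends processing of this message.
            System.out.println("validation error");
        }
        return records;
    }
});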
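The flattening logic can be checked in isolation with plain collections. The sketch below is an illustration under assumptions, not the code above: `FlattenSketch`, the `KNOWN_GATEWAYS` allow-list (a stand-in for `memoryDB.gateway_check`), and the choice to skip only the incomplete reading rather than stop at the first error are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Plain-Java illustration of the flattening step; no Spark or Gson involved.
class FlattenSketch {

    // Hypothetical allow-list standing in for memoryDB.gateway_check(mac).
    static final Set<String> KNOWN_GATEWAYS =
            new HashSet<>(Arrays.asList("00:40:9d:98:aa:79"));

    // Fields every reading must carry before it is emitted.
    static final List<String> REQUIRED =
            Arrays.asList("smac", "rssi", "time", "value", "sensor");

    // Turns one gateway message (header + readings) into one record per reading.
    static List<Map<String, String>> flatten(Map<String, String> header,
                                             List<Map<String, String>> readings) {
        List<Map<String, String>> records = new ArrayList<>();
        if (!KNOWN_GATEWAYS.contains(header.get("mac"))) {
            return records; // unknown gateway: emit nothing
        }
        for (Map<String, String> reading : readings) {
            if (!reading.keySet().containsAll(REQUIRED)) {
                continue; // assumption: skip an incomplete reading, keep the rest
            }
            Map<String, String> record = new LinkedHashMap<>();
            // Gateway fields are repeated on every record.
            record.put("device", header.get("device"));
            record.put("mac", header.get("mac"));
            record.put("timezone", header.get("timezone"));
            for (String field : REQUIRED) {
                record.put(field, reading.get(field));
            }
            records.add(record);
        }
        return records;
    }
}
```

Each output record carries eight fields: three copied from the gateway header and five from the reading.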
Converting each record to a unique key [device info] and value [sensor value] pair
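import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import scala.Tuple2;

// Key: JSON string of smac, mac, device, and sensor. Value: the reading as an int.
JavaPairDStream<String, Integer> mappedStream = line_data.mapToPair(
    new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String line) {
            JsonObject jsonObject = new JsonParser().parse(line).getAsJsonObject();

            // The properties are always added in the same order, so equal devices yield equal keys.
            JsonObject dump = new JsonObject();
            dump.add("smac", jsonObject.get("smac"));
            dump.add("mac", jsonObject.get("mac"));
            dump.add("device", jsonObject.get("device"));
            dump.add("sensor", jsonObject.get("sensor"));

            // getAsString() returns the text without quotes, so no replace() is needed.
            int value = Integer.parseInt(jsonObject.get("value").getAsString());
            return new Tuple2<String, Integer>(dump.toString(), value);
        }
    });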
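A JSON string works as a key as long as the fields are always written in the same order. One alternative, shown here only as a sketch, is a small typed key with `equals` and `hashCode`. The `DeviceKeySketch` and `DeviceKey` names are hypothetical. The class implements `Serializable` on the assumption that it would be shuffled between Spark executors.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

class DeviceKeySketch {

    // Hypothetical typed key: device MAC (smac), gateway MAC, device type, sensor code.
    static final class DeviceKey implements Serializable {
        private static final long serialVersionUID = 1L;
        final String smac;
        final String mac;
        final String device;
        final String sensor;

        DeviceKey(String smac, String mac, String device, String sensor) {
            this.smac = smac;
            this.mac = mac;
            this.device = device;
            this.sensor = sensor;
        }

        // Two keys are equal when all four fields match.
        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof DeviceKey)) return false;
            DeviceKey k = (DeviceKey) o;
            return Objects.equals(smac, k.smac) && Objects.equals(mac, k.mac)
                    && Objects.equals(device, k.device) && Objects.equals(sensor, k.sensor);
        }

        // Must agree with equals() so hash-based grouping works.
        @Override
        public int hashCode() {
            return Objects.hash(smac, mac, device, sensor);
        }
    }

    public static void main(String[] args) {
        Map<DeviceKey, Integer> latest = new HashMap<>();
        latest.put(new DeviceKey("d0:09:cb:d1:08:da", "00:40:9d:98:aa:79", "gateway", "9012"), 874);
        // A second instance with the same fields finds the same entry.
        System.out.println(latest.get(
                new DeviceKey("d0:09:cb:d1:08:da", "00:40:9d:98:aa:79", "gateway", "9012")));
    }
}
```

The trade-off is a little more code in exchange for a key that does not depend on string formatting.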
Per-group average
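import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaPairDStream;

// Collect all values that share a key, then average them.
JavaPairDStream<String, Iterable<Integer>> grouped = mappedStream.groupByKey();
JavaPairDStream<String, Integer> average = grouped.mapValues(
    new Function<Iterable<Integer>, Integer>() {
        public Integer call(Iterable<Integer> v1) throws Exception {
            int count = 0;
            int sum = 0;
            for (int f : v1) {
                count++;
                sum += f;
            }
            // Integer division: the fractional part of the average is discarded.
            return sum / count;
        }
    });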
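Grouping buffers every value of a key before averaging. A common alternative is to carry a (sum, count) pair per key and merge pairs as they arrive, which is the shape that would fit `reduceByKey` in Spark. The sketch below shows only the merge logic in plain Java; `RunningAverageSketch` and `SumCount` are hypothetical names and the values are made up.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RunningAverageSketch {

    // Partial aggregate; merging is associative, so the merge order does not matter.
    static final class SumCount {
        final long sum;
        final long count;

        SumCount(long sum, long count) {
            this.sum = sum;
            this.count = count;
        }

        SumCount merge(SumCount other) {
            return new SumCount(sum + other.sum, count + other.count);
        }

        // Floating-point division keeps the fractional part.
        double average() {
            return (double) sum / count;
        }
    }

    // Folds (key, value) pairs into one SumCount per key.
    static Map<String, SumCount> aggregate(List<Map.Entry<String, Integer>> pairs) {
        Map<String, SumCount> acc = new HashMap<>();
        for (Map.Entry<String, Integer> p : pairs) {
            acc.merge(p.getKey(), new SumCount(p.getValue(), 1), SumCount::merge);
        }
        return acc;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> pairs = Arrays.asList(
                new SimpleEntry<>("9012", 874),
                new SimpleEntry<>("9012", 821),
                new SimpleEntry<>("9013", 26));
        aggregate(pairs).forEach((k, v) -> System.out.println(k + " -> " + v.average()));
    }
}
```

For the two `9012` values this yields 847.5, whereas integer division of the same sum would give 847.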
Checking the per-sensor averages on the console
Sensor averages for device 1
({"smac":"d0:09:cb:d1:08:da","mac":"00:40:9d:98:ac:d1","device":"gateway","sensor":"9014"},2306)
({"smac":"d0:09:cb:d1:08:da","mac":"00:40:9d:98:ac:d1","device":"gateway","sensor":"9010"},316)
({"smac":"d0:09:cb:d1:08:da","mac":"00:40:9d:98:ac:d1","device":"gateway","sensor":"9011"},313)
({"smac":"d0:09:cb:d1:08:da","mac":"00:40:9d:98:ac:d1","device":"gateway","sensor":"9009"},16)
({"smac":"d0:09:cb:d1:08:da","mac":"00:40:9d:98:ac:d1","device":"gateway","sensor":"9012"},847)
({"smac":"d0:09:cb:d1:08:da","mac":"00:40:9d:98:ac:d1","device":"gateway","sensor":"9013"},26)

Sensor averages for device 2
({"smac":"cf:e6:e2:fe:e2:d6","mac":"00:40:9d:98:ac:d1","device":"gateway","sensor":"9011"},466)
({"smac":"cf:e6:e2:fe:e2:d6","mac":"00:40:9d:98:ac:d1","device":"gateway","sensor":"9009"},32)
({"smac":"cf:e6:e2:fe:e2:d6","mac":"00:40:9d:98:ac:d1","device":"gateway","sensor":"9012"},475)
({"smac":"cf:e6:e2:fe:e2:d6","mac":"00:40:9d:98:ac:d1","device":"gateway","sensor":"9013"},24)
({"smac":"cf:e6:e2:fe:e2:d6","mac":"00:40:9d:98:ac:d1","device":"gateway","sensor":"9010"},463)

Sensor averages for device 3
({"smac":"f4:5b:29:3e:07:81","mac":"00:40:9d:98:ac:d1","device":"gateway","sensor":"9010"},472)
({"smac":"f4:5b:29:3e:07:81","mac":"00:40:9d:98:ac:d1","device":"gateway","sensor":"9014"},2625)
({"smac":"f4:5b:29:3e:07:81","mac":"00:40:9d:98:ac:d1","device":"gateway","sensor":"9011"},308)
({"smac":"f4:5b:29:3e:07:81","mac":"00:40:9d:98:ac:d1","device":"gateway","sensor":"9009"},41)
({"smac":"f4:5b:29:3e:07:81","mac":"00:40:9d:98:ac:d1","device":"gateway","sensor":"9012"},895)
({"smac":"f4:5b:29:3e:07:81","mac":"00:40:9d:98:ac:d1","device":"gateway","sensor":"9013"},22)

Sensor averages for device 4
({"smac":"cf:62:8f:ad:78:f7","mac":"00:40:9d:98:ac:d1","device":"gateway","sensor":"9012"},850)
({"smac":"cf:62:8f:ad:78:f7","mac":"00:40:9d:98:ac:d1","device":"gateway","sensor":"9009"},71)
({"smac":"cf:62:8f:ad:78:f7","mac":"00:40:9d:98:ac:d1","device":"gateway","sensor":"9013"},24)
({"smac":"cf:62:8f:ad:78:f7","mac":"00:40:9d:98:ac:d1","device":"gateway","sensor":"9014"},2505)
({"smac":"cf:62:8f:ad:78:f7","mac":"00:40:9d:98:ac:d1","device":"gateway","sensor":"9010"},472)
({"smac":"cf:62:8f:ad:78:f7","mac":"00:40:9d:98:ac:d1","device":"gateway","sensor":"9011"},468)