
RabbitMQ & Spark Streaming Test




Experiment Details



  ▶ Data collected from real environmental sensors is received by a RabbitMQ server, inspected with Spark Streaming, and stored in HDFS


  ▶ Brief experiment summary: sensor data → RabbitMQ (MQTT) → Spark Streaming → HDFS
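  ▶ Note: RabbitMQ does not speak MQTT out of the box. This test assumes the rabbitmq_mqtt plugin is enabled on the broker (rabbitmq-plugins enable rabbitmq_mqtt), which listens on port 1883 by default.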






Collected Data


  ▶ Format: JSON array

  ▶ Topic: test

  ▶ MQTT Data


     {
           "device":"gateway",
           "mac":"00:40:9d:98:ac:d1",
           "timezone":"KST",
           "data":
                  [
                     {
                           "smac":"cf:e6:e2:fe:e2:d6",
                           "rssi":"-68",
                           "time":"2017-02-17 11:49:37.439",
                           "sensor":"9014","value":"2529"
                     },
                     {
                           "smac":"f4:5b:29:3e:07:81",
                           "rssi":"-58",
                           "time":"2017-02-17 11:49:38.942",
                           "sensor":"9014","value":"2809"
                     },
                      ...
                      ...   (N entries in total)
                 ]
      }
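The parsing code itself is not shown in the post; below is a minimal sketch of how one payload could be unpacked, assuming Gson (the library choice and the dumpReadings helper name are assumptions, not the author's code). The console output further down prints jsonArray.size() in exactly this spirit.

  import com.google.gson.JsonArray;
  import com.google.gson.JsonObject;
  import com.google.gson.JsonParser;

  // msg: one MQTT payload string in the format shown above
  static void dumpReadings(String msg) {
      JsonObject root = new JsonParser().parse(msg).getAsJsonObject();
      JsonArray jsonArray = root.getAsJsonArray("data");
      System.out.println("jsonArray.size() : " + jsonArray.size());

      for (int i = 0; i < jsonArray.size(); i++) {
          JsonObject r = jsonArray.get(i).getAsJsonObject();
          // every field arrives as a string, including rssi and value
          System.out.println(r.get("smac").getAsString()
                  + " sensor=" + r.get("sensor").getAsString()
                  + " value=" + r.get("value").getAsString());
      }
  }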

 





Key Source Code


  ▶ Topic "#" is the MQTT wildcard, meaning data from every topic is received

  ▶ Only the "test" topic needs to be checked in this example, but all messages are received here for simplicity

 

  String brokerUrl = "rabbitmq server URI";   // e.g. tcp://<host>:1883 with the MQTT plugin enabled
  String topic = "#";


  // clientId "testid", username/password "rabbitmq", cleanSession = true
  JavaReceiverInputDStream<String> test5 = MQTTUtils.createStream(ssc, brokerUrl, topic,
                  "testid", "rabbitmq", "rabbitmq", true);

  ..

  ..

  // save each micro-batch under /user/test/ as text files
  data.dstream().saveAsTextFiles("hdfs://master.com:9000/user/test/", "test");
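Pulling the fragments together, a minimal end-to-end driver might look like the sketch below. This is an assumed reconstruction of the surrounding setup, not the post's actual code: the class name, app name, master URL, and 5-second batch interval are all made up for illustration.

  import org.apache.spark.SparkConf;
  import org.apache.spark.streaming.Durations;
  import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
  import org.apache.spark.streaming.api.java.JavaStreamingContext;
  import org.apache.spark.streaming.mqtt.MQTTUtils;

  public class MqttStreamingTest {
      public static void main(String[] args) throws InterruptedException {
          SparkConf conf = new SparkConf().setAppName("RabbitMQ-SparkStreaming-Test")
                                          .setMaster("local[2]");
          // 5-second micro-batches (interval assumed; the post does not state it)
          JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(5));

          String brokerUrl = "tcp://rabbitmq-host:1883";  // placeholder broker URI
          JavaReceiverInputDStream<String> data = MQTTUtils.createStream(
                  ssc, brokerUrl, "#", "testid", "rabbitmq", "rabbitmq", true);

          data.print();  // echo each batch to the console
          data.dstream().saveAsTextFiles("hdfs://master.com:9000/user/test/", "test");

          ssc.start();
          ssc.awaitTermination();
      }
  }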






Dependency


 <dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>spark-streaming-mqtt_2.11</artifactId>
    <version>2.0.1</version>
 </dependency>
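Bahir 2.0.1 is built against Spark 2.0.x on Scala 2.11, so the project presumably also declares the matching core streaming artifact, roughly as below (the version pairing is an assumption, not stated in the post):

 <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.0.1</version>
    <scope>provided</scope>
 </dependency>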






Console Output


jsonArray.size() : 17
{"device":"gateway","mac":"00:40:9d:98:ac:d1","timezone":"KST","data":[{"smac":"cf:e6:e2:fe:e2:d6","rssi":"-68","time":"2017-02-17 11:49:37.439","sensor":"9014","value":"2529"},{"smac":"f4:5b:29:3e:07:81","rssi":"-58","time":"2017-02-17 11:49:38.942","sensor":"9014","value":"2809"},{"smac":"cf:62:8f:ad:78:f7","rssi":"-72","time":"2017-02-17 11:49:39.098","sensor":"9009","value":"59"},{"smac":"f4:5b:29:3e:07:81","rssi":"-55","time":"2017-02-17 11:49:39.307","sensor":"9014","value":"2809"},{"smac":"cf:e6:e2:fe:e2:d6","rssi":"-79","time":"2017-02-17 11:49:39.802","sensor":"9009","value":"49"},{"smac":"cf:62:8f:ad:78:f7","rssi":"-76","time":"2017-02-17 11:49:40.104","sensor":"9010","value":"472"},{"smac":"d0:09:cb:d1:08:da","rssi":"-63","time":"2017-02-17 11:49:40.652","sensor":"9011","value":"314"},{"smac":"cf:e6:e2:fe:e2:d6","rssi":"-74","time":"2017-02-17 11:49:40.806","sensor":"9010","value":"476"},{"smac":"cf:62:8f:ad:78:f7","rssi":"-66","time":"2017-02-17 11:49:41.101","sensor":"9011","value":"470"},{"smac":"d0:09:cb:d1:08:da","rssi":"-64","time":"2017-02-17 11:49:41.322","sensor":"9012","value":"849"},{"smac":"cf:e6:e2:fe:e2:d6","rssi":"-84","time":"2017-02-17 11:49:41.453","sensor":"9011","value":"469"},{"smac":"f4:5b:29:3e:07:81","rssi":"-58","time":"2017-02-17 11:49:42.297","sensor":"9010","value":"482"},{"smac":"f4:5b:29:3e:07:81","rssi":"-56","time":"2017-02-17 11:49:42.957","sensor":"9011","value":"326"},{"smac":"cf:e6:e2:fe:e2:d6","rssi":"-86","time":"2017-02-17 11:49:43.450","sensor":"9013","value":"21"},{"smac":"f4:5b:29:3e:07:81","rssi":"-46","time":"2017-02-17 11:49:45.957","sensor":"9014","value":"2810"},{"smac":"f4:5b:29:3e:07:81","rssi":"-55","time":"2017-02-17 11:49:46.293","sensor":"9014","value":"2810"},{"smac":"cf:62:8f:ad:78:f7","rssi":"-72","time":"2017-02-17 11:49:47.126","sensor":"9010","value":"462"}]}
17/02/17 11:48:20 INFO FileOutputCommitter: Saved output of task 'attempt_201702171148_0018_m_000000_34' to hdfs://master.com:9000/user/test/-1487299700000.test/_temporary/0/task_201702171148_0018_m_000000
17/02/17 11:48:20 INFO SparkHadoopMapRedUtil: attempt_201702171148_0018_m_000000_34: Committed
17/02/17 11:48:20 INFO Executor: Finished task 0.0 in stage 18.0 (TID 34). 1864 bytes result sent to driver
17/02/17 11:48:20 INFO TaskSetManager: Starting task 1.0 in stage 18.0 (TID 35, localhost, partition 1,NODE_LOCAL, 2017 bytes)
17/02/17 11:48:20 INFO Executor: Running task 1.0 in stage 18.0 (TID 35)
17/02/17 11:48:20 INFO TaskSetManager: Finished task 0.0 in stage 18.0 (TID 34) in 16 ms on localhost (1/2)
17/02/17 11:48:20 INFO BlockManager: Found block input-0-1487299696200 locally
17/02/17 11:48:20 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
jsonArray.size() : 13
{"device":"gateway","mac":"00:40:9d:98:ac:d1","timezone":"KST","data":[{"smac":"f4:5b:29:3e:07:81","rssi":"-57","time":"2017-02-17 11:49:48.967","sensor":"9010","value":"475"},{"smac":"f4:5b:29:3e:07:81","rssi":"-58","time":"2017-02-17 11:49:49.303","sensor":"9010","value":"475"},{"smac":"f4:5b:29:3e:07:81","rssi":"-46","time":"2017-02-17 11:49:49.954","sensor":"9011","value":"326"},{"smac":"f4:5b:29:3e:07:81","rssi":"-41","time":"2017-02-17 11:49:50.320","sensor":"9011","value":"326"},{"smac":"f4:5b:29:3e:07:81","rssi":"-48","time":"2017-02-17 11:49:50.955","sensor":"9012","value":"897"},{"smac":"f4:5b:29:3e:07:81","rssi":"-55","time":"2017-02-17 11:49:51.947","sensor":"9013","value":"19"},{"smac":"f4:5b:29:3e:07:81","rssi":"-41","time":"2017-02-17 11:49:54.952","sensor":"9009","value":"2"},{"smac":"cf:62:8f:ad:78:f7","rssi":"-67","time":"2017-02-17 11:49:55.128","sensor":"9011","value":"463"},{"smac":"d0:09:cb:d1:08:da","rssi":"-70","time":"2017-02-17 11:49:56.310","sensor":"9013","value":"24"},{"smac":"f4:5b:29:3e:07:81","rssi":"-52","time":"2017-02-17 11:49:56.327","sensor":"9010","value":"473"},{"smac":"d0:09:cb:d1:08:da","rssi":"-70","time":"2017-02-17 11:49:56.654","sensor":"9013","value":"24"},{"smac":"cf:e6:e2:fe:e2:d6","rssi":"-81","time":"2017-02-17 11:49:56.811","sensor":"9012","value":"568"},{"smac":"cf:62:8f:ad:78:f7","rssi":"-66","time":"2017-02-17 11:49:57.122","sensor":"9013","value":"20"}]}
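Each batch prints jsonArray.size() (the number of sensor readings in the message) followed by the raw JSON payload, and the FileOutputCommitter / SparkHadoopMapRedUtil lines confirm that the batch output was committed to HDFS.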






HDFS Output
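saveAsTextFiles writes one directory per micro-batch, named <prefix>-<batch time in ms>.<suffix> (e.g. /user/test/-1487299700000.test/ in the log above), so the stored batches can be listed with hdfs dfs -ls hdfs://master.com:9000/user/test/.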


 



