본문 바로가기

spark - python - R

[SPARK] window operation- spark streaming

336x280(권장), 300x250(권장), 250x250, 200x200 크기의 광고 코드만 넣을 수 있습니다.


[Spark Streaming]


window operation


▶ spark의 window 연산은 여러 배치 배치들의 결과를 합쳐서 StreamingContext의 배치 간격보다 훨씬 긴 시간 간격에 대한 결과를 계산한다.


아래 두 가지 값은 StreamingContext의 배치 간격의 배수여야 한다.



1.window duraton


 - 지나간 배치를 몇 개나 사용 할지를 제어하며 개수 만큼의 최근 배치들을 사용

 - 만약 배치 간격 10초의 DStream을 갖고 있고 최근 30초간의 슬라이딩 윈도를 만들고 싶다면 1을 30초로






2.sliding duration


 - 기본값이 배치 간격과 동일하며 얼마나 자주 새로운 DStream이 결과를 계산할지를 결정

 - 만약 배치 간격 10초의 DStream을 갖고 있고 두 번의 배치마다 연산을 하고  싶다면 슬라이딩 간격을 20초로 설정한다.









※샘플 데이터


▶환경 센서 데이터 (RabbitMQ MQTT Data)









테스트 소스

30초의 기본 배치 설정 후 1분 30초마다 window 데이터 확인


 

    public SparkStreaming() {
        streaming_setting();
    }
   
    public static void streaming_setting(){
        StreamsparkConf = new SparkConf().setAppName("streaming").setMaster("local[2]")

                                            .set("spark.driver.allowMultipleContexts", "true")

                                            .set("spark.ui.port", "4046");
     
        global.jsc = new JavaSparkContext(StreamsparkConf);
        global.jsc.setLogLevel("ERROR");
        ssc= new JavaStreamingContext(global.jsc, Durations.seconds(30));  30초로 설정
       
        // Define MQTT url and topic
        brokerUrl = SettingValue.getBrokerurl();
        topic = SettingValue.getTopic();
        password = SettingValue.getPassword();
        user = SettingValue.getUser();
    }

    public void init(){
        memoryDB = new MemoryDB();
        mysqlProcess = new MysqlProcess();
        memoryDB.table_Create();
        memoryDB.mysqlTOmemory();
                       
        eventProcess = new EventProcess(mysqlProcess);
    }

    public static void stop() {       
        ssc.stop();
        data = null;
    }

    public static void reconnect() throws InterruptedException {
        logger.error("spark streaming reconnection");
        stop();
        TimeUnit.SECONDS.sleep(10);
        streaming_setting();
        connect();
    }
   
    public static void connect() throws InterruptedException {
        try {                       
            data = MQTTUtils.createStream(ssc, brokerUrl, topic, "streaming_test", user, password, true);
            JavaDStream<String> windowedWordCounts =

                               data.window(Durations.seconds(60), Durations.seconds(60));
       
            data.print();
            windowedWordCounts.print();
               
             








콘솔 결과 확인



window 데이터 = 첫번째 + 두번째 + 세번째 데이터



●첫번째 데이터

-------------------------------------------
Time: 1496034300000 ms
-------------------------------------------
[{"service_name":"bzt","device_type":"","device_mac":"00:40:9d:98:ac:d1","time_zone":"KST","sensor_code":"", ~~ 생략


●두번째 데이터

-------------------------------------------
Time: 1496034330000 ms
-------------------------------------------
[{"service_name":"bzt","device_type":"","device_mac":"00:40:9d:98:ac:d1","time_zone":"KST","sensor_code":"", ~~

[{"service_name":"env","device_type":"","device_mac":"00:40:9d:98:ac:d1","time_zone":"KST","sensor_code":9014, ~~

[{"service_name":"bzt","device_type":"","device_mac":"00:40:9d:98:ac:d1","time_zone":"KST","sensor_code":"", ~~

[{"service_name":"bzt","device_type":"","device_mac":"00:40:9d:98:ac:d1","time_zone":"KST","sensor_code":"", ~~


●세번째 데이터

-------------------------------------------
Time: 1496034360000 ms
-------------------------------------------
[{"service_name":"bzt","device_type":"","device_mac":"00:40:9d:98:ac:d1","time_zone":"KST","sensor_code":"", ~~

[{"service_name":"env","device_type":"","device_mac":"00:40:9d:98:ac:d1","time_zone":"KST","sensor_code":9009, ~~

[{"service_name":"bzt","device_type":"","device_mac":"00:40:9d:98:ac:d1","time_zone":"KST","sensor_code":"", ~~

[{"service_name":"bzt","device_type":"","device_mac":"00:40:9d:98:ac:d1","time_zone":"KST","sensor_code":"", ~~


●Window 데이터

-------------------------------------------
Time: 1496034360000 ms
-------------------------------------------

[{"service_name":"bzt","device_type":"","device_mac":"00:40:9d:98:ac:d1","time_zone":"KST","sensor_code":"", ~~

[{"service_name":"bzt","device_type":"","device_mac":"00:40:9d:98:ac:d1","time_zone":"KST","sensor_code":"", ~~

[{"service_name":"env","device_type":"","device_mac":"00:40:9d:98:ac:d1","time_zone":"KST","sensor_code":9014, ~~

[{"service_name":"bzt","device_type":"","device_mac":"00:40:9d:98:ac:d1","time_zone":"KST","sensor_code":"", ~~

[{"service_name":"bzt","device_type":"","device_mac":"00:40:9d:98:ac:d1","time_zone":"KST","sensor_code":"", ~~

[{"service_name":"bzt","device_type":"","device_mac":"00:40:9d:98:ac:d1","time_zone":"KST","sensor_code":"", ~~

[{"service_name":"env","device_type":"","device_mac":"00:40:9d:98:ac:d1","time_zone":"KST","sensor_code":9009, ~~

[{"service_name":"bzt","device_type":"","device_mac":"00:40:9d:98:ac:d1","time_zone":"KST","sensor_code":"", ~~

[{"service_name":"bzt","device_type":"","device_mac":"00:40:9d:98:ac:d1","time_zone":"KST","sensor_code":"", ~~