본문 바로가기

MY어플리케이션

[자작] 실시간 이벤트 플랫폼 - Stationery-ink

실시간 이벤트에 SQL을 적용해보자.

 

소스: https://github.com/dk-stationery/stationery-ink

 

ETL 빅데이터를 처리하다보면, 정적인 데이터 이외에,

카프카등 큐에 들어오는 실시간성 이벤트를 이용하여, 통계 데이터를 만들어야하는 경우가 생기게 된다.

이경우, 요즘은 스파크같은 마이크로 배치가 지원되는 플랫폼을 사용을 많이 하지만,

내가 stationery-ink를 만들었을때(2010년초?!)는 아직 스파크가 초기버전이였다...

(따끈따끈한 아기 Apache Spark...)

 

그당시, 주류는 Apache Storm이라는 실시간 처리 플랫폼이 메인 스트림이였다.

이 플랫폼을 사용한다고 해도,

카프카에 담긴 데이터를 sum, avg등 어그리게이션을 하기위해서는 결국, 프로그래밍을 해야했다.

 

스트림에서 sum, avg등 어그리게이션을 하는것은 정말 까다롭다.

스트림은 지속적으로 흘러가는 이벤트고,

여기서 어느 기준(윈도우 Time, Length, Size등의 개념)으로 어떤 범위까지 어그리게이션을 해야할지를 코드로 구현하기는

엄청 힘든작업이기 때문이다.

 

이부분은 esper라는 cep플랫폼을 이용하여, 쉽게 할수있기때문에, Apache Storm + Esper조합으로 실시간 어그리게이션을 많이 했다.

 

여기서 아이디어가 나왔는데,

이런 어그리게이션작업을 코딩으로 구현하지 말고,

자체 SQL쿼리를 이용하여, 자동으로 이벤트를 어그리게이션해주는 플랫폼을 만들수 있지 않을까?? 라는 생각이 들었다.

 

SQL쿼리를 자체적으로 만들기위해 Antlr3를 이용해서 쿼리 visitor 클래스들을 만들어냈다.

해당 쿼리를 통해,

카프카,디비,레디스등에서 오는 스트림데이터를 esper를 통해 어그리게이션하는 과정을 자체 SQL쿼리 베이스로 동작하게 만들었다.

 

그리고 이런 쿼리는 자체 Jdbc드라이버를 구현하여, 쿼리툴을 이용해 쿼리를 실행/구동할수있게 만들어서 범용성을 높였다.

( 이부분은 정말 될까?라는 의심반으로 했는데, 잘동작하는것을 보고, 흐믓했던 기억이 있다. )

 

이 플랫폼을 만들고 제일 먼저 한게,

Apache Storm을 만든 개발자,

Nathan Marz에게 "내가 이런것을 만들었는데 어때??" 라고 메일을 보냈다.

하지만, 메일은 읽었지만, 답변은 없더라.... (야속해)

 

그리고 2년정도 지나고나서, Apache Storm에서는 내가 만든것과 유사한 쿼리 베이스의 어그리게이션 기능이 나왔다.

속으로 (후후후...이녀석...)이라고 생각했다.

그리고 최근에 우연히 본 오픈소스, KsqlDb를 봤는데, 내가 만든 플랫폼과 거의 90% 컨셉이 일치하였다..그때 충격이란..

(2019년에 나왔는데, 나보다 3년이나 늦게 나온건데...)

 

역시, 개발도 타이밍 같다...

 

이 플랫폼 만들어서, 사내에서 적용해서 많이 사용했다. 분명 그당시 센세이션한 놈이였다고 분명히 생각하지만,

알아주는 사람은 없긴했다.

 

하지만 이 플랫폼을 만들면서 배운 것들이 정말 많았다.

Antlr3, Hbase, Storm, Redis, Jdbc Driver, Phinex, Esper에 대해서 많은 배움이 있었기에 후회는 없다.

보람만이 있을뿐!

 

(아래는 간단한 SQL 소개)

 

쿼리 중에 가장 중요한게 create source 쿼리다 이건 스트림소스의 접속정보를 등록하는 쿼리이다.

create source kafka meta (
		CATALOG 'KAFKA'
		, URL '127.0.0.1:2181,127.0.0.2:2181,127.0.0.3:2181,127.0.0.4:2181');

이렇게 하면 kafka라는 접속정보를 통해 카프카에 커넥션을 해서 데이터를 받아오게된다.

 

 

create stream은 해당 접속 정보를 통해 실제 이벤트 스트림 (테이블)을 만드는 쿼리이다.

create stream testA (_PAYLOAD_ STRING) meta (TOPIC 'testA_topic', TYPE 'topic|queue');

 

실제 쿼리는 아래와 같은 쿼리를 사용가능하다.

select 
		DMP_LOG.host
		,DMP_LOG.path
		,DMP_LOG.payload.message
	from 
		[testA:kafka] as DMP_LOG 
	where 
		DMP_LOG.payload.message is not null;

카프카 이벤트에서 payload필드의 message가 not null인것을 select하는 쿼리이다.

 

set JOB_NAME='INK_TEST';
set WORKER_CNT='14';
set SPOUT_THREAD_CNT='9';
set ESPER_THREAD_CNT='9';
set LOOKUP_THREAD_CNT='9';
set OUTPUT_THREAD_CNT='18';
set COMMIT_INTERVAL='5';
set STORM_MAXSPOUTPENDING_NUM='9';
select 
    incom_date.substring(0, 10) as _DT
    ,account_id as ACCOUNTID
    ,(case when (indirect_unique_id <> null) then direct_unique_id else indirect_unique_id end) as UNIQUE_ID
    , dir_amount as DIRECTAMT
    , dir_cnt as DIRECTCNT
    , in_amount as INDIRECTAMT
    , in_cnt as INDIRECTCNT
from 
    [test:rabbitmq];

lookup 
    LOG_MKR as MKRSEQ
    , LOG_ATP as AREATYPE
from 
    [test_click:phoenix]
where
    PAYLOAD_CTSA = '[:ACCOUNTID]' AND PAYLOAD_CTSU = '[:UNIQUE_ID]';

upsert into [TEST_REPORT:phoenix](
    DT
    ,MKRSEQ
    ,AREATYPE
    ,DIRECTCNT
    ,DIRECTAMT
    ,INDIRECTCNT
    ,INDIRECTAMT
) 
increase values( 
    [:_DT]
    ,[:MKRSEQ] 
    ,'[:AREATYPE]'
    ,[:DIRECTCNT] 
    ,[:DIRECTAMT] 
    ,[:INDIRECTCNT] 
    ,[:INDIRECTAMT] 
);

 

이벤트를 조회해서, 이벤트를 DB에 저장하는 쿼리도 가능하다.