데이터 스트림_2(gstreamer, socketio, rabbitmq, kafka)

이번 글에서는 지난 1년동안 웹기반 실시간 영상 프로젝트에 참여하며 배운점을 공유하면서

프로젝트에서 쓰인 

gstreamer라는 스트리밍 플랫폼, socketio, rabbitmq와

rabbitmq와 유사한 kafka와 같은 이벤트 기반 프레임워크와 비교하겠다.

이 비교과정을 통해서

스트리밍, 비동기 프로그래밍, callback 기반 코드, 이벤트 기반 프레임워크 등에 대해  

더 깊은 이해를 할 수 있었다.


위 제품들은 아주 큰 범위의 streaming에서만 일맥상통하고

메커니즘, 지향하는 바는 제각각이기 때문에

쓰이는 분야 또한 제각각이다.





지난 1년간, 프로젝트에서 gstreamer는 주로 미디어 처리 역할을 수행하게 되었다.

gstreamer

gstreamer는 c로 작성된 객체지향형 프레임워크인 gobject를 기반으로 구성된 프레임워크이다.

gstreamer 자체는 pipeline, bin, element을 구현하기 위한 코어 모듈들

콜백 시스템 구현을 위한 각종 시그널, 버스, 메시지 등의 모듈들로 구성되었다.

그 외에 각종 plug-in들은 

gstreamer 프레임워크를 준수하며 사용 목적에 맞게 개발자들이 제작한 것이다. 


나는 그중에서도 실시간 영상 스트리밍을 위해

rtspsrc, hlssink(multifilesink의 상속), avdec, h264parse, x264enc 과 같은

인코딩, 디코딩, 파싱 모듈을 사용했으며 이는 영상처리를 위한 플러그인들이다.

파이프라인에 흘러갈 소스가 동영상이 아닌 경우에는

다른 플러그인을 사용하면 된다.


rtsp src는 적절한 연결-수립 통신 이후에는 - rtp packet을 보내게되고

이 rtp-packet에서 변환 과정을 거쳐야 영상처리를 위한 데이터를 뽑을 수 있다.

rtsp packet

rtp packet -> h264 -> x-raw (이 부분부터 raw 파일을 조작할 수 있다.)


작업했던 프로젝트에서는

gstreamer에서는 rtspsrc element이후부터는 일절 네트워크 소켓을 이용하지 않았다.

python application과 callback을 함수 호출을 통해 데이터를 넘기고, 받고, 처리할 뿐이다.


실제 컨텐츠 전송은 HLS 프로토콜을 사용했으며

Nginx를 파일 서버로 배치하여 프론트엔드와 통신하게했다.


web 생태계로 들어가는 순간 

네이티브 html5에서 동영상을 보기 위해서는

타겟 브라우저가 해당 동영상 컨테이너 포맷을 지원하는지 확인해야한다.


실시간 영상을 보여주는 것처럼 hls 프로토콜을 사용하여 갱신되게 하려면

1. 프론트엔드 서버에서도 hls.js와 같은 패키지가 필요하다.

2.반드시 서버라는 엔드포인트를 하나 더 통하게 되어있기 때문에 

인코딩-디코딩 과정에서 지연이 생긴다.


web에서 공식적인 rtsp소스-web browser와의

직접적인 연결 지원은 하지 않고 있지만

플러그인 설치를 통한 방법이나, 오픈소스등의 방법이 있다. 


파이썬 애플리케이션 기능 사용을 위해

appsrc - appsink 등의 element를 사용했다.


gmainloop이라는 루프를 사용하면

gst.message 들이 발생할 때마다 캐치하여 핸들링할 수 있다.


파이프라인의 처음부터 끝까지 전송이 끝나고 완료됐다는 메시지를 받는

req/response 형식이 아니라 

내가 작성한 파이프라인의 경우에는

rtspsrc에 들어오는 데이터 자체가 스트리밍의 동력 역할을 했다.



appsink/appsrc에서 콜백 호출을 하면서 자주 block되는 일이 많았다.

native python queue를 사용하는 과정에서

queue의 timeout 속성을 잘 활용하지못하면

전체 파이프라인이 block되기 때문이다.

개발하면서 queue size가 문제가 된 적은 없었지만

프로듀서-컨슈머의 처리 속도에 따라 적절히 조절해주어야한다.


따라서, 이런 스트리밍 프레임워크에서는

프로듀서-컨슈머가 정의되고 난 후에 두 blocking 작업 지점에 대해 잘 파악하고 있어야한다.




socektio

socketio는 프론트엔드에 event 메시지를 발송할 때 사용했던 라이브리다.

어느 서버 프레임워크에서 socketio를 돌릴 것인가?

python 기준으로, 구현 서버로 flask를 선호하지 않는 이유는

flask는 synchronous framework이기 때문이다.

python-socektio doc에도 flask보다는 비동기 프레임워크를 사용할 것을 권장하고 있다.

socketio를 사용하기 좋은 use-case는 챗봇이나 채팅 서비스인데,

사용자가 몰리게 됐을 때, 비동기 프레임워크가 응답속도가 더 좋으며 

이 특징은 더 나은 사용자 경험을 제공한다.


FE-BE의 socketio 세팅

socketio는 websocket을 완벽하게 구현한것이 아니라서

websocket client- socketio server와 완벽하게 호환되지 않을 수 있다.

구현언어가 FE-BE 서로 다른 경우, 공식 홈페이지를 참고하여 버전을 맞춰줘야한다.

websocket connection은 사용가능할 때면 언제든지 전환이 가능하지만

그렇지 않은 경우에는 HTTP polling을 사용한다.

real-time use case, streaming data use case 

와 같은 시나리오에서 패킷 구조적으로 Websocket 이 더 효율적이다.


Python에서 socketio의 asynchronous 특성을 유지하려면

python-socketio의 공식 문서중 deploy 부분을 참고하는 것이 좋다.


https://python-socketio.readthedocs.io/en/stable/server.html#standard-threads

gevent나 eventlet이 설치되어있지 않은 환경이라면

자동적으로 threading 모드로 전환한다.


python-socketio 애플리케이션은 python standard, asyncio 2개 버전으로 구현할 수 있다.








rabbitmq

mqtt를 python binding을 사용할 때에는 pyho-mqtt를 사용하고
rabbitmq를 사용할 때에는 pika를 설치하여 사용한다.

rabbitmq는 mqtt 프로토콜도 포함했으나
amqp 메시지로 파싱하여 사용하고 있었다.

Native mqtt 업데이트 이후로는
성능 개선이 되어 amqp 메시지로의 파싱없이 사용할  수 있다고한다.

IoT 센서와 통신할 때 성능 향상을 기대해볼 수 있을 것이다.

rabbitmq는 erlang으로 작성되었고 배포를 위해서는
Erlang도 설치해줘야한다.


프로젝트에서는 Persist-message 기능을 통해
위험 이벤트 메시지를 모아서 DB에 기록하는 서비스로 쓰였다.

rabbitmq 브로커는 kafka 브로커와 달리 route, filter 역할만 수행한다.

장기적으로 저장을 위해서는 별도의 storage service가 필요하다는 것이
kafak의 차이라고 볼 수 있다.



kafka

3세대 데이터 스트리밍 시스템이다.
이전 세대의 로그 모델이 가지고 있던 신뢰성과RPC모델을 결합하여
중간계층을 구현한다.

순서 의미 체계를 버리면서 분산 시스템을 유지하고 신뢰성을 보장할 수 있다.
Kafka는 디스크에 로그를 쓰는 기능을 built-in 차원에서 지원한다.
Kafka 브로커를 통해 메시지는 로그로 확장되어 파티셔닝, 복제된 형태의 저장소에 저장된다.


retention 옵션에 따라 메시지가 저장될 기간을 지정할 수 있으며 사용자가 읽을 수도 있다.


( Kafka를 쓴다고 단순히 메시지들의 순서가 보장되지 않는 것이 아님.
추가적인 데이터와 구현을 통해 메시지를 다시 시간순으로 정렬할 수도 있고
Single partition을 쓰는 경우 순서를 보장함.

다만 용어에서 암시하듯이 병렬성이나 로드밸런싱 효과는 얻을 수 없음.
멀티 파티션과 trade off 관계임.

kafka에 대한 특성은 다른 포스팅에서 )







gstreamer, rabbitmq, kafka는 공통적으로
내부적으로 구현된 Loop, Queue, Event 객체를 사용하고 있었다.

큐 패러다임은 메시지 버스의 개발을 가능하게 했고
서로 다른 구성 요소에 느슨하게 결합되게했다.
단점으로 큐의 의미체계를 유지하고자 
큐에 삽입되는 순서를
소비자에게 전달하는 순서와 일치시켰다.

위 순서를 유지하는 것은 분산 시스템에서는 불필요한 작업이다.
분산 시스템에 도입한다면 
메시지에 마킹된 정보에 따라 적합한 처리기로 라우팅하기 때문이다.
(때문에 kafka가 순서 유지체계를 거의 버리다 시피했다.)

순서가 필요하다면 
클라이언트에서 seq number와 같은 정보를 참조하여
재배열하여 처리하는 방법을 많이 쓰고 있다.




socketio는 내부적으로 직접 Queue를 구현하여 쓰고있지 않았지만
" there is no message queue stored for a given ID on the server "
websocket으로 전환된 후에 buffer에 enqueue하는 메서드가 있다.
( 브라우저의 websocket용으로 할당된 buffer 메모리 공간을 쓰는 것이 아닐까 유추해본다 )


Loop에 해당하는 작업은 socketio가 올라간 서버에서 수행한다.
이 Loop에서 event가 발생했는지 검사한다.



socketio의 asynchronous feature를 완벽하게 사용하기 위해서는
socketio 가 작동하는 서버에서부터 asynchronous handling이 필요하다.







댓글

이 블로그의 인기 게시물

실무진 면접 경험으로 정리하는 백엔드 (1) : 에듀 테크 기업 면접

노마드코더 개발자북클럽 Clean code 완주, 독후감

Blogger 커스터마이징 : CSS 수정 (sticky-header)