在K8S环境中构建基于python flask架构的websocket
业务上需要搭建一个能推送消息的架构,现有的服务端是基于python-flask构建的。
代码部分
服务端
服务端python需要集成socketio,参考这里https://github.com/miguelgrinberg/python-socketio/blob/main/examples/server/wsgi/app.py
但是需要记得async_mode改成gevent。
前端
前端测试的代码, 需要替换一下Server IP。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
| <!DOCTYPE HTML> <html> <head> <title>Flask-SocketIO Test</title> <script type="text/javascript" src="//code.jquery.com/jquery-2.1.4.min.js"></script> <script type="text/javascript" src="//cdnjs.cloudflare.com/ajax/libs/socket.io/3.0.3/socket.io.min.js"></script> <script type="text/javascript" charset="utf-8"> $(document).ready(function(){ var socket = io.connect("http://<Server IP>:5000/");
socket.on('connect', function() { socket.emit('my_event', {data: 'I\'m connected!'}); }); socket.on('disconnect', function() { $('#log').append('<br>Disconnected'); }); socket.on('nuke_response', function(msg) { $('#log').append('<br>Received: ' + msg.data); });
$('form#emit').submit(function(event) { socket.emit('my_event', {data: $('#emit_data').val()}); return false; }); $('form#broadcast').submit(function(event) { socket.emit('my_broadcast_event', {data: $('#broadcast_data').val()}); return false; }); $('form#join').submit(function(event) { socket.emit('join', {room: $('#join_room').val()}); return false; }); $('form#leave').submit(function(event) { socket.emit('leave', {room: $('#leave_room').val()}); return false; }); $('form#send_room').submit(function(event) { socket.emit('my_room_event', {room: $('#room_name').val(), data: $('#room_data').val()}); return false; }); $('form#close').submit(function(event) { socket.emit('close_room', {room: $('#close_room').val()}); return false; }); $('form#disconnect').submit(function(event) { socket.emit('disconnect_request'); return false; }); }); </script> </head> <body> <h1>Flask-SocketIO Test</h1> <h2>Send:</h2> <form id="emit" method="POST" action='#'> <input type="text" name="emit_data" id="emit_data" placeholder="Message"> <input type="submit" value="Echo"> </form> <form id="broadcast" method="POST" action='#'> <input type="text" name="broadcast_data" id="broadcast_data" placeholder="Message"> <input type="submit" value="Broadcast"> </form> <form id="join" method="POST" action='#'> <input type="text" name="join_room" id="join_room" placeholder="Room Name"> <input type="submit" value="Join Room"> </form> <form id="leave" method="POST" action='#'> <input type="text" name="leave_room" id="leave_room" placeholder="Room Name"> <input type="submit" value="Leave Room"> </form> <form id="send_room" method="POST" action='#'> <input type="text" name="room_name" id="room_name" placeholder="Room Name"> <input type="text" name="room_data" id="room_data" placeholder="Message"> <input type="submit" value="Send to Room"> </form> <form id="close" method="POST" action="#"> <input type="text" name="close_room" id="close_room" placeholder="Room Name"> <input type="submit" value="Close Room"> </form> <form id="disconnect" method="POST" action="#"> <input type="submit" value="Disconnect"> </form> <h2>Receive:</h2> <div><p id="log"></p></div> </body> </html>
|
这样就基本完成了正常环境下的websocket下的代码部分的功能。
架构部分
架构图
如图所示,几条重点:
- 系统发布在一个K8S环境中,一个application的deployment里包含三个pod和一个service。
- K8S Sevice用NodePort的方式进行服务暴漏。
- K8S系统外面是用一个HAProxy进行代理,域名解析到HAProxy所在的虚拟机的IP上。
- HAProxy和K8S service形成了两层的LoadBalance。
目的是让客户端Client A, B, C访问到一个POD后,以后就一直绑定到这个pod上。
需要改动的配置:
- HAProxy的balance策略改成source
这样就让HAProxy进行转发的时候根据客户端ip进行选择目的IP。
- K8S Service的配置里增加sessionAffinity: ClientIP
K8S Service进行转发的时候根据客户端ip进行选择目的IP
- K8S Service的配置里增加externalTrafficPolicy: Local
如果不加这一条,那么所有的ClientIP都会被认为是来自HAProxy的那个IP,加上这个配置后,会根据请求的header里的X-Forwarded-For里的客户端IP进行判断。
消息队列
正常情况下,客户端A和B链接到第一个pod,客户端C链接到第三个pod,如果有一个时间发生在第二个POD,或者Job Pod上,是无法直接发送消息给所有的pod上的。
所以需要一个消息总线,可以选择Redis/MQ/kube-event来实现,三个API pod侦听消息队列的某个时间,所有需要发送给客户的消息直接发送给消息队列,然后消息队列转发给三个API pod,
API pod收到消息队列的提醒后,然后推送给自己链接的客户端,这样就完成了一个整体回路。