在K8S环境中构建基于python flask架构的websocket

业务上需要搭建一个能推送消息的架构,现有的服务端是基于python-flask构建的。

代码部分

服务端

服务端python需要集成socketio,参考这里https://github.com/miguelgrinberg/python-socketio/blob/main/examples/server/wsgi/app.py
但是需要记得async_mode改成gevent。

前端

前端测试的代码, 需要替换一下Server IP。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
<!DOCTYPE HTML>
<html>
<head>
<title>Flask-SocketIO Test</title>
<script type="text/javascript" src="//code.jquery.com/jquery-2.1.4.min.js"></script>
<script type="text/javascript" src="//cdnjs.cloudflare.com/ajax/libs/socket.io/3.0.3/socket.io.min.js"></script>
<script type="text/javascript" charset="utf-8">
$(document).ready(function(){
var socket = io.connect("http://<Server IP>:5000/");

socket.on('connect', function() {
socket.emit('my_event', {data: 'I\'m connected!'});
});
socket.on('disconnect', function() {
$('#log').append('<br>Disconnected');
});
socket.on('nuke_response', function(msg) {
$('#log').append('<br>Received: ' + msg.data);
});

// event handler for server sent data
// the data is displayed in the "Received" section of the page
// handlers for the different forms in the page
// these send data to the server in a variety of ways
$('form#emit').submit(function(event) {
socket.emit('my_event', {data: $('#emit_data').val()});
return false;
});
$('form#broadcast').submit(function(event) {
socket.emit('my_broadcast_event', {data: $('#broadcast_data').val()});
return false;
});
$('form#join').submit(function(event) {
socket.emit('join', {room: $('#join_room').val()});
return false;
});
$('form#leave').submit(function(event) {
socket.emit('leave', {room: $('#leave_room').val()});
return false;
});
$('form#send_room').submit(function(event) {
socket.emit('my_room_event', {room: $('#room_name').val(), data: $('#room_data').val()});
return false;
});
$('form#close').submit(function(event) {
socket.emit('close_room', {room: $('#close_room').val()});
return false;
});
$('form#disconnect').submit(function(event) {
socket.emit('disconnect_request');
return false;
});
});
</script>
</head>
<body>
<h1>Flask-SocketIO Test</h1>
<h2>Send:</h2>
<form id="emit" method="POST" action='#'>
<input type="text" name="emit_data" id="emit_data" placeholder="Message">
<input type="submit" value="Echo">
</form>
<form id="broadcast" method="POST" action='#'>
<input type="text" name="broadcast_data" id="broadcast_data" placeholder="Message">
<input type="submit" value="Broadcast">
</form>
<form id="join" method="POST" action='#'>
<input type="text" name="join_room" id="join_room" placeholder="Room Name">
<input type="submit" value="Join Room">
</form>
<form id="leave" method="POST" action='#'>
<input type="text" name="leave_room" id="leave_room" placeholder="Room Name">
<input type="submit" value="Leave Room">
</form>
<form id="send_room" method="POST" action='#'>
<input type="text" name="room_name" id="room_name" placeholder="Room Name">
<input type="text" name="room_data" id="room_data" placeholder="Message">
<input type="submit" value="Send to Room">
</form>
<form id="close" method="POST" action="#">
<input type="text" name="close_room" id="close_room" placeholder="Room Name">
<input type="submit" value="Close Room">
</form>
<form id="disconnect" method="POST" action="#">
<input type="submit" value="Disconnect">
</form>
<h2>Receive:</h2>
<div><p id="log"></p></div>
</body>
</html>

这样就基本完成了正常环境下的websocket下的代码部分的功能。

架构部分

架构图

arch

如图所示,几条重点:

  1. 系统发布在一个K8S环境中,一个application的deployment里包含三个pod和一个service。
  2. K8S Sevice用NodePort的方式进行服务暴漏。
  3. K8S系统外面是用一个HAProxy进行代理,域名解析到HAProxy所在的虚拟机的IP上。
  4. HAProxy和K8S service形成了两层的LoadBalance。

目的是让客户端Client A, B, C访问到一个POD后,以后就一直绑定到这个pod上。
需要改动的配置:

  1. HAProxy的balance策略改成source
    这样就让HAProxy进行转发的时候根据客户端ip进行选择目的IP。
  2. K8S Service的配置里增加sessionAffinity: ClientIP
    K8S Service进行转发的时候根据客户端ip进行选择目的IP
  3. K8S Service的配置里增加externalTrafficPolicy: Local
    如果不加这一条,那么所有的ClientIP都会被认为是来自HAProxy的那个IP,加上这个配置后,会根据请求的header里的X-Forwarded-For里的客户端IP进行判断。

消息队列

正常情况下,客户端A和B链接到第一个pod,客户端C链接到第三个pod,如果有一个时间发生在第二个POD,或者Job Pod上,是无法直接发送消息给所有的pod上的。
所以需要一个消息总线,可以选择Redis/MQ/kube-event来实现,三个API pod侦听消息队列的某个时间,所有需要发送给客户的消息直接发送给消息队列,然后消息队列转发给三个API pod,
API pod收到消息队列的提醒后,然后推送给自己链接的客户端,这样就完成了一个整体回路。

Notice: 正常情况下,这里会有一个基于utteranc.es的留言系统,如果看不到,可能需要科学上网方式。