-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathbiliLiveBroadcaster.py
230 lines (162 loc) · 5.53 KB
/
biliLiveBroadcaster.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
import websocket
import brotli
import time
import rel
from threading import Thread
from threading import Lock
import json
import requests
from functools import partial
import timeit
def _getRealRoomId(roomId):
    """Look up the real room id that belongs to ``roomId``.

    Sends a GET request to the live API ``room_init`` endpoint and returns
    the ``data.room_id`` field of the JSON reply.
    """
    url = "https://api.live.bilibili.com/room/v1/Room/room_init?id=" + str(roomId)
    reply = requests.get(url)
    return reply.json()["data"]["room_id"]
def _getKey(realRoomId):
    """Fetch the key (token) for the room identified by ``realRoomId``.

    Sends a GET request to the live API ``getConf`` endpoint and returns
    the ``data.token`` field of the JSON reply.
    """
    url = (
        "https://api.live.bilibili.com/room/v1/Danmu/getConf?room_id="
        + str(realRoomId)
        + "&platform=pc&player=web"
    )
    return requests.get(url).json()["data"]["token"]
def _raw2Json(raw):
    """Convert a byte string into a list of decoded JSON objects.

    ``raw`` holds one or more top-level JSON objects; consecutive objects
    are separated by 16 bytes of non-JSON data, which are skipped after
    every object that is extracted.

    Braces are only counted when they are structural: a ``{`` or ``}``
    inside a JSON string value (including after an escaped quote) is
    ignored, and outside of an object only an opening ``{`` is recognised.

    Returns the decoded objects in the order they appear.
    Raises ``ValueError`` (via ``json.loads``) if a delimited span is not
    valid JSON.
    """
    QUOTE, BACKSLASH, OPEN, CLOSE = 0x22, 0x5C, 0x7B, 0x7D

    jsonList = []
    depth = 0          # nesting level of the object currently being scanned
    start = 0          # index of the "{" that opened the current object
    inString = False   # True while inside a JSON string literal
    escaped = False    # True right after a backslash inside a string
    n = 0
    size = len(raw)
    while n < size:
        byte = raw[n]
        if depth == 0:
            # Between objects: anything but an opening brace is noise.
            if byte == OPEN:
                start = n
                depth = 1
        elif inString:
            if escaped:
                escaped = False
            elif byte == BACKSLASH:
                escaped = True
            elif byte == QUOTE:
                inString = False
        elif byte == QUOTE:
            inString = True
        elif byte == OPEN:
            depth += 1
        elif byte == CLOSE:
            depth -= 1
            if depth == 0:
                # End of a top-level object: decode the whole span.
                jsonList.append(json.loads(raw[start:n + 1]))
                n += 16  # skip the non-JSON bytes between two objects
        n += 1
    return jsonList
def _interpreteJson(data, onReceiveDanmu, onAudienceEnter, giftStat):
    """Dispatch each decoded message in ``data`` to the matching handler.

    Handled ``cmd`` values:

    * ``"DANMU_MSG"`` (chat message): calls
      ``onReceiveDanmu(speaker, content)``.
    * ``"SEND_GIFT"`` (gift): calls
      ``giftStat.add(sender, giftName, quantity)``.
    * ``"INTERACT_WORD"`` (audience entering the room): calls
      ``onAudienceEnter(audience)``.

    Messages with any other ``cmd``, or that lack the expected fields, are
    skipped. Only the field extraction is guarded: an exception raised by
    a handler itself is not swallowed and propagates to the caller.
    """
    for info in data:
        try:
            cmd = info["cmd"]
            if cmd == "DANMU_MSG":
                handler = onReceiveDanmu
                args = (info["info"][2][1], info["info"][1])
            elif cmd == "SEND_GIFT":
                payload = info["data"]
                handler = giftStat.add
                args = (payload["uname"], payload["giftName"], payload["num"])
            elif cmd == "INTERACT_WORD":
                handler = onAudienceEnter
                args = (info["data"]["uname"],)
            else:
                continue  # unrelated message type
        except (KeyError, IndexError, TypeError):
            continue  # message does not have the expected shape
        handler(*args)
def _onMessage(onReceiveDanmu, onAudienceEnter, giftStat, ws, message):
    """Handle one frame received from the websocket.

    Byte 7 of ``message`` selects how the payload after the first 16 bytes
    is read:

    * ``3``: the payload is brotli-compressed; it is decompressed and the
      first 16 bytes of the result are dropped.
    * ``0``: the payload is used as is.
    * anything else: the frame is unrelated to room content and ignored.

    The payload is then split into JSON messages and dispatched to the
    handlers. If the payload cannot be parsed it is printed and the frame
    is dropped, instead of continuing with an unbound result.
    """
    encoding = message[7]
    if encoding == 3:  # brotli-compressed
        rawData = brotli.decompress(message[16:])[16:]
    elif encoding == 0:  # no decompression needed
        rawData = message[16:]
    else:  # unrelated to room content
        return

    try:
        data = _raw2Json(rawData)
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
        print(rawData)
        return

    _interpreteJson(data, onReceiveDanmu, onAudienceEnter, giftStat)
def _onError(ws, error):
    """Report an error by printing a notice line followed by ``error``."""
    print("出现错误", error, sep="\n")
def _onClose(ws, close_status_code, close_msg):
    """Print a notice that the connection has been closed."""
    notice = "已断开连接"
    print(notice)
def _sendHeartBeat(ws):
    """Send the heartbeat packet over ``ws`` every 30 seconds, forever.

    The packet is a fixed 31-byte sequence; a fresh ``bytearray`` is built
    for every send. This function never returns on its own.
    """
    heartBeatHex = "0000001f0010000100000002000000015b6f626a656374204f626a6563745d"
    while True:
        time.sleep(30)  # wait first, then send
        packet = bytearray.fromhex(heartBeatHex)
        ws.send(packet)
def _collectGiftReceived(giftStat, onReceiveGift):
    """Poll ``giftStat`` once per second and report the gifts it returns.

    Every record returned by ``giftStat.extractData()`` is passed to the
    user callback as ``onReceiveGift(record[0], record[2], record[1])``.
    This function never returns on its own.
    """
    while True:
        time.sleep(1)
        for record in giftStat.extractData():
            first, second, third = record[0], record[1], record[2]
            onReceiveGift(first, third, second)  # user-defined callback
def _onOpen(realRoomId, key, giftStat, onReceiveGift, ws):
    """Run once the websocket is connected: authenticate and start workers.

    Sends the verification packet for ``realRoomId`` / ``key``, then starts
    two background threads: one that sends heartbeats over ``ws`` and one
    that periodically reports collected gifts through ``onReceiveGift``.

    Both threads loop forever, so they are started as daemon threads;
    otherwise they would keep the process alive after the main thread has
    finished.
    """
    # Build the verification payload.
    verification = (
        b'{"uid":0,"roomid":'
        + bytes(str(realRoomId), "utf-8")
        + b',"protover":3,"platform":"web","type":2,"key":"'
        + bytes(key, "utf-8")
        + b'"}'
    )
    # 4-byte big-endian total length (payload + 16), 12 fixed header bytes,
    # then the payload itself.
    dataToSend = (
        (len(verification) + 16).to_bytes(4, "big")
        + bytearray.fromhex("001000010000000700000001")
        + verification
    )
    ws.send(dataToSend)

    # Heartbeat timer.
    Thread(target=_sendHeartBeat, args=(ws,), daemon=True).start()
    # Periodic gift statistics.
    Thread(
        target=_collectGiftReceived,
        args=(giftStat, onReceiveGift),
        daemon=True,
    ).start()
    print("已连接")
class _giftInfoArray:
    """Accumulator for received gifts, merged per (sender, gift name).

    Each entry is a list ``[sender, giftName, quantity, lastUpdate]`` where
    ``lastUpdate`` is a ``timeit.default_timer()`` reading. Access to the
    entries is guarded by a lock so that ``add`` and ``extractData`` can be
    called from different threads.
    """

    def __init__(self):
        self.__data = []      # all gifts received and not yet extracted
        self.__lock = Lock()  # guards self.__data

    def add(self, sender, giftName, quantity):
        """Record ``quantity`` of ``giftName`` sent by ``sender``.

        If the same sender already has a pending entry for the same gift,
        the quantity is added to it and its timestamp is refreshed;
        otherwise a new entry is appended.
        """
        now = timeit.default_timer()
        with self.__lock:
            for entry in self.__data:
                if entry[0] == sender and entry[1] == giftName:
                    entry[2] += quantity
                    entry[3] = now
                    return
            self.__data.append([sender, giftName, quantity, now])

    def extractData(self):
        """Remove and return every entry not updated for more than 3 seconds.

        Entries are returned in insertion order. The list is rebuilt rather
        than mutated during iteration, so consecutive expired entries are
        all returned in the same call.
        """
        currentTime = timeit.default_timer()
        expired = []
        pending = []
        with self.__lock:
            for entry in self.__data:
                # More than 3 seconds without the same gift: combo finished.
                if currentTime - entry[3] > 3:
                    expired.append(entry)
                else:
                    pending.append(entry)
            self.__data = pending
        return expired
class biliLiveBroadcaster:
    """Connects to a live room and forwards its events to user callbacks.

    Parameters
    ----------
    roomId
        Room id as given by the user; the real room id is looked up when
        broadcasting starts.
    onReceiveDanmu, onReceiveGift, onAudienceEnter
        User callbacks for chat messages, gifts and audience entering.
    """

    def __init__(self, roomId, onReceiveDanmu, onReceiveGift, onAudienceEnter):
        self.__roomId = roomId
        self.__onReceiveDanmu = onReceiveDanmu
        self.__onReceiveGift = onReceiveGift
        self.__onAudienceEnter = onAudienceEnter
        self.__giftStat = _giftInfoArray()

    def startBroadcasting(self):
        """Resolve the room, open the websocket and run the event loop.

        Blocks in ``rel.dispatch()``; signal 2 is bound to ``rel.abort``.
        """
        print("连接中")

        # Fetch the real room id and the key.
        self.__realRoomId = _getRealRoomId(self.__roomId)
        self.__key = _getKey(self.__realRoomId)

        # The websocket callbacks take ``ws`` (and ``message``) last, so the
        # leading arguments are bound ahead of time.
        openHandler = partial(
            _onOpen,
            self.__realRoomId,
            self.__key,
            self.__giftStat,
            self.__onReceiveGift,
        )
        messageHandler = partial(
            _onMessage,
            self.__onReceiveDanmu,
            self.__onAudienceEnter,
            self.__giftStat,
        )

        # Open the connection.
        websocket.enableTrace(False)
        app = websocket.WebSocketApp(
            "wss://tx-sh-live-comet-01.chat.bilibili.com/sub",
            on_open=openHandler,
            on_message=messageHandler,
            on_error=_onError,
            on_close=_onClose,
        )
        app.run_forever(dispatcher=rel)
        rel.signal(2, rel.abort)
        rel.dispatch()