-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathvideo_scene_monitor.py
148 lines (136 loc) · 7.31 KB
/
video_scene_monitor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
import asyncio
import time
from typing import List
from PIL import Image
from io import BytesIO
import base64
from llm.qwen2_vl_http_server_cli import send_message_to_server
from llm.llm_base import Message, MessageRole, KEEP_SILENT_RESPONSE
import logging
from common.config import config
class VideoSceneMonitor:
    """Periodically ask a multimodal LLM about the most recent camera frames.

    The monitor keeps a short rolling buffer of frames (``add_frame``) and of
    recent conversation lines (``add_sentence``).  ``start_monitoring`` runs a
    loop that calls ``check_scene`` every ``interval`` seconds, or earlier when
    ``check_scene_immediately`` is called.  A model answer starting with
    ``talk:`` is forwarded to the user, anything else is forwarded as a scene
    description.
    """

    # Prefixes the model is asked to start its answer with.
    _TALK_PREFIX = "talk:"
    _DESCRIPTION_PREFIX = "description:"

    def __init__(self, interval, send_message_callback, send_to_server_callback, enable_self_reaction: bool = False, min_interval: float = 30):
        """Create a monitor.

        Args:
            interval: Maximum time in seconds the monitoring loop waits between
                two scene checks.
            send_message_callback: Awaitable callable invoked as
                ``callback(text, with_tts=..., [save_to_redis=True])`` to
                deliver a message.
            send_to_server_callback: Awaitable callable invoked as
                ``callback(messages, model=...)``; its result is used as the
                model's answer.
            enable_self_reaction: When true, the model is given the recent
                conversation and is asked to answer with either ``talk:`` or
                ``description:``.
            min_interval: Minimum time in seconds between two scene checks;
                checks requested earlier are skipped.
        """
        self.interval = interval
        self.min_interval = min_interval
        self.send_message_callback = send_message_callback
        self.send_to_server_callback = send_to_server_callback
        self.last_scene_description = ""
        self.last_check_time = 0
        self.monitoring = False
        self.imgs: "List[Image.Image]" = []
        self.max_imgs = 3
        self.has_new_img: bool = False
        self.last_sentences = []
        self.max_sentences = 6
        self.immediate_check_event = asyncio.Event()
        # NOTE: the attribute name is misspelled ("enalbe"); it is kept as is
        # because code outside this class may read it.
        self.enalbe_self_reaction = enable_self_reaction
        self.last_add_question = ''
        self._add_question = ''
        self.last_response_str = ''

    def add_sentence(self, sentence: str, from_user: bool = True, max_len: int = 256):
        """Remember a conversation line, truncated to ``max_len`` characters.

        Lines that do not come from the user are prefixed with ``Assistant: ``.
        Once more than ``max_sentences`` lines are stored the oldest is dropped.
        """
        sentence = sentence[:max_len]
        self.last_sentences.append(sentence if from_user else f"Assistant: {sentence}")
        if len(self.last_sentences) > self.max_sentences:
            self.last_sentences.pop(0)

    async def start_monitoring(self):
        """Run the monitoring loop until ``stop_monitoring`` is called.

        Each iteration waits for an immediate-check request or for
        ``interval`` seconds, whichever comes first, then runs ``check_scene``.
        A failing scene check is logged and does not end the loop.
        """
        self.monitoring = True
        # Drop a wake-up that an earlier stop_monitoring() may have left set.
        self.immediate_check_event.clear()
        logging.info("VideoSceneMonitor start monitoring")
        while self.monitoring:
            try:
                await asyncio.wait_for(self.immediate_check_event.wait(), timeout=self.interval)
            except asyncio.TimeoutError:
                # Periodic check: no extra question is attached.
                self._add_question = ''
            finally:
                self.immediate_check_event.clear()
            if not self.monitoring:
                # Stopped while waiting: do not run one more check.
                break
            try:
                await self.check_scene(self._add_question)
            except Exception:
                # Keep monitoring alive when a single check fails.
                logging.exception("VideoSceneMonitor scene check failed")
        logging.info("VideoSceneMonitor monitoring stopped")

    def check_scene_immediately(self, add_question: str = ''):
        """Ask the monitoring loop to check the scene now.

        ``add_question`` is stored for the next check.  The loop is only woken
        up when monitoring is active.
        """
        self._add_question = add_question
        if self.monitoring:
            logging.info("check scene immediately, add question: %s", add_question)
            self.immediate_check_event.set()

    def stop_monitoring(self):
        """Stop the monitoring loop and wake it up so it exits promptly."""
        self.monitoring = False
        self.immediate_check_event.set()

    async def check_scene(self, add_question: str = ''):
        """Analyze the scene once and forward the result if it is new.

        Nothing is sent when the previous check was less than ``min_interval``
        seconds ago, when the answer is missing or empty, when it contains
        ``KEEP_SILENT_RESPONSE``, or when it equals the previous answer.
        """
        current_time = time.time()
        if current_time - self.last_check_time < self.min_interval:
            logging.info(
                "Scene check too frequent, skip, last check time: %s, current time: %s, min interval: %s",
                self.last_check_time, current_time, self.min_interval,
            )
            return
        self.last_check_time = current_time

        scene_description = await self.analyze_scene(add_question)
        if scene_description is None:
            logging.error("Scene description is None")
            return
        if not scene_description:
            # e.g. no frame has been received yet; there is nothing to forward.
            logging.info("Empty scene description")
            return
        if KEEP_SILENT_RESPONSE in scene_description:
            logging.info("Silent response")
            return
        if scene_description == self.last_scene_description:
            logging.info("Same scene description")
            return
        self.last_scene_description = scene_description

        to_user, text = self._split_response(scene_description)
        if to_user:
            # An answer starting with "talk:" is sent straight to the user.
            logging.info("Scene changed or anomaly detected, talk to user")
            await self.send_message_callback(f"{text}", with_tts=True)
            return
        logging.info("Scene changed or anomaly detected, describe to other assistant")
        await self.send_message_callback(f"[其他assistant看到场景的描述]:({text})", with_tts=False, save_to_redis=True)

    @classmethod
    def _split_response(cls, response: str) -> "tuple[bool, str]":
        """Split a model answer into ``(is_for_user, text_without_prefix)``."""
        if response.startswith(cls._TALK_PREFIX):
            return True, response[len(cls._TALK_PREFIX):]
        if response.startswith(cls._DESCRIPTION_PREFIX):
            return False, response[len(cls._DESCRIPTION_PREFIX):]
        return False, response

    def _build_system_prompt(self) -> str:
        """Return the system prompt for the current ``enalbe_self_reaction`` mode."""
        if self.enalbe_self_reaction:
            parts = [
                "假设你是一个智能助理(Assistant),可以通过摄像头截图看到当前的场景,并用文字或语音的方式和用户、其他Assistant进行沟通。",
                "用户可能有多个人,请以[user_n]来区分说话的人。",
                "首先考虑根据对话内容给其他Assistant描述场景,以帮助其他Assistant理解当前的场景。",
                "如果发现任何异常的、紧急的、有趣的、有疑问的情况,请用简短的口语告知或询问用户;否则请描述和对话相关的场景。",
                "请注意回复格式必须为以下二种之一:\ntalk: 说给用户的话\ndescription: 说给其他Assistant的场景描述。",
            ]
        else:
            parts = [
                "假设你是一个智能助理(Assistant),可以通过摄像头截图看到当前的场景,请根据对话内容描述当前场景,以帮助其他Assistant理解当前的场景。",
                "用户可能有多个人,请以[user_n]来区分说话的人。",
                "请注意,你不需要直接回答问题,而是需要把你看到的和对话相关的内容描述给其他Assistant。",
            ]
        # NOTE: the prompt sentence that allowed the model to answer with
        # KEEP_SILENT_RESPONSE is currently disabled; check_scene still honours
        # that marker if it shows up in an answer.
        return "".join(parts)

    @staticmethod
    def _frame_to_data_url(img) -> str:
        """Encode a frame as a base64 JPEG ``data:`` URL."""
        buffered = BytesIO()
        try:
            img.save(buffered, format="JPEG")
        except OSError:
            # JPEG cannot store every image mode (e.g. RGBA); retry as RGB
            # with a fresh buffer in case the failed attempt wrote to it.
            buffered = BytesIO()
            img.convert("RGB").save(buffered, format="JPEG")
        img_base64 = base64.b64encode(buffered.getvalue()).decode('utf-8')
        return f"data:image/jpeg;base64,{img_base64}"

    async def analyze_scene(self, add_question: str = ''):
        """Send the buffered frames (and optional text) to the model.

        Args:
            add_question: Extra text appended to the user prompt.

        Returns:
            The model's answer.  The previous answer is returned without a new
            request when no frame was added since the last request and
            ``add_question`` is unchanged; ``""`` is returned when no frame is
            buffered.
        """
        if not self.has_new_img and add_question == self.last_add_question:
            return self.last_response_str
        if not self.imgs:
            return ""
        self.last_add_question = add_question

        system_prompt = self._build_system_prompt()
        system_message = Message(role=MessageRole.system, content=system_prompt)

        text_parts = []
        if self.enalbe_self_reaction:
            text_parts.extend(f"{sentence}\n" for sentence in self.last_sentences)
        if add_question:
            text_parts.append(f"{add_question}\n")

        image_contents = [
            {"type": "image_url", "image_url": {"url": self._frame_to_data_url(img)}}
            for img in self.imgs
        ]
        self.has_new_img = False

        if self.enalbe_self_reaction:
            text_parts.append("现在请以Assistant的身份,以talk或description开始回复:")
        user_prompt = "".join(text_parts)
        if user_prompt != "":
            image_contents.append({"type": "text", "text": user_prompt})

        logging.debug("Scene analysis start with system prompt: %s, user prompt: %s", system_prompt, user_prompt)
        user_message = Message(role=MessageRole.user, content=image_contents)
        try:
            response_str = await self.send_to_server_callback([system_message, user_message], model=config.llm.openai_custom_mm_model)
        except Exception:
            # The frames were not analyzed; make sure the next call retries
            # instead of returning the previous cached answer.
            self.has_new_img = True
            raise
        logging.info("Scene analysis: %s, message count: %s", response_str, len(image_contents))
        self.last_response_str = response_str
        return response_str

    def get_current_frame(self) -> "Image.Image | None":
        """Return the most recently added frame, or ``None`` if there is none."""
        if not self.imgs:
            return None
        return self.imgs[-1]

    def add_frame(self, img: "Image.Image"):
        """Append a frame and drop the oldest one once ``max_imgs`` is exceeded."""
        self.imgs.append(img)
        self.has_new_img = True
        if len(self.imgs) > self.max_imgs:
            self.imgs.pop(0)