import json
import urllib.parse
from ai.deep_speek import maxai
from collections import deque
# Application key handed to the AI client below as its `auth` argument.
# NOTE(review): this credential is hard-coded in source control; consider
# loading it from configuration or the environment instead.
apkey = 'application-24f27892e5f9da86d669d842c1c68254'
# apkey = 'application-80adf7e3384c1c9bf8b7a50e645dfc67'  # test AI
# Single module-level AI client, created at import time; the GET handler
# calls c1.webchat(...) on it for every dequeued message.
c1 = maxai(new_chat=True, auth=apkey)
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer,SimpleHTTPRequestHandler
import sqlite3
import sqlite3
def create_table(db_name):
    """Create the ``my_table`` queue table if it does not exist yet.

    The table has an auto-incrementing integer ``id`` primary key and a
    non-null text ``value`` column.

    Args:
        db_name: Path of the SQLite database file.

    Raises:
        sqlite3.Error: If the database cannot be opened or the statement
            fails. The connection is closed before the error propagates.
    """
    conn = sqlite3.connect(db_name)
    try:
        cursor = conn.cursor()
        try:
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS my_table (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    value TEXT NOT NULL
                )
            ''')
            conn.commit()
        finally:
            cursor.close()
    finally:
        # Release the database handle even when the DDL or commit fails.
        conn.close()
def add_record(db_name, value):
    """Insert ``value`` as a new row of ``my_table``.

    SQLite errors are reported on stdout instead of being raised, so the
    function always returns ``None``.

    Args:
        db_name: Path of the SQLite database file.
        value: Text stored in the row's ``value`` column.
    """
    connection = sqlite3.connect(db_name)
    cur = connection.cursor()
    try:
        cur.execute("INSERT INTO my_table (value) VALUES (?)", (value,))
        connection.commit()  # make the insert durable
    except sqlite3.Error as exc:
        print(f"An error occurred while adding record: {exc}")
    else:
        print(f"Record added: {value}")
    finally:
        cur.close()
        connection.close()
def get_and_delete_min_id_value(db_name):
    """Pop the oldest record (smallest ``id``) from ``my_table``.

    The read and the delete run inside one ``BEGIN IMMEDIATE`` transaction,
    so two concurrent callers can never be handed the same record.

    Args:
        db_name: Path of the SQLite database file.

    Returns:
        The ``value`` of the removed row, or ``None`` when the table is
        empty or an SQLite error occurred (the error is printed).
    """
    # isolation_level=None turns off the module's implicit transaction
    # handling so the explicit BEGIN/COMMIT below are the only ones issued.
    conn = sqlite3.connect(db_name, isolation_level=None)
    try:
        cursor = conn.cursor()
        try:
            # Take the write lock before reading; otherwise another
            # connection could select the same row before it is deleted.
            cursor.execute("BEGIN IMMEDIATE")
            cursor.execute("SELECT id, value FROM my_table ORDER BY id ASC LIMIT 1")
            row = cursor.fetchone()
            if row is None:
                cursor.execute("COMMIT")
                return None
            min_id, value = row
            print(f"Retrieved value: {value}")
            # Remove the record that was just read.
            cursor.execute("DELETE FROM my_table WHERE id = ?", (min_id,))
            cursor.execute("COMMIT")
            return value
        except sqlite3.Error as e:
            # Any still-open transaction is rolled back when the connection
            # is closed below.
            print(f"An error occurred: {e}")
            return None
        finally:
            cursor.close()
    finally:
        conn.close()
class SSEHandler(BaseHTTPRequestHandler):
    """Queue POSTed messages and stream the AI's reply to them on GET.

    POST stores the URL-decoded request body as a new record through
    ``add_record``. GET removes the oldest record through
    ``get_and_delete_min_id_value``, sends it to ``c1.webchat`` and relays
    the reply text to the client piece by piece.

    The handler reads the module-level names ``db_name`` and ``c1``.
    """

    def log_message(self, format, *args):
        """Disable the base class's per-request logging."""

    def do_POST(self):
        """Store the URL-decoded request body and answer with an empty 200.

        Responds 411 when ``Content-Length`` is missing and 400 when it is
        not a non-negative integer or the body is not valid UTF-8.
        """
        print("Received POST request")
        raw_length = self.headers.get('Content-Length')
        if raw_length is None:
            self.send_error(411, "Content-Length header is required")
            return
        try:
            content_length = int(raw_length)
        except ValueError:
            content_length = -1
        if content_length < 0:
            # A negative length would make rfile.read() wait for EOF.
            self.send_error(400, "Invalid Content-Length header")
            return
        post_data = self.rfile.read(content_length)
        try:
            decoded_str = urllib.parse.unquote(post_data.decode('utf-8'))
        except UnicodeDecodeError:
            self.send_error(400, "Request body must be UTF-8 encoded")
            return
        # Commented-out experiment in the original: prepend a fixed
        # "test case design:" prompt prefix to the stored message.
        add_record(db_name, decoded_str)
        print("Received message:", decoded_str)
        self.send_response(200)
        self.send_header('Content-type', 'text/plain')
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Cache-Control', 'no-cache')
        self.end_headers()
        # The response body is intentionally empty.

    def do_GET(self):
        """Pop the oldest queued message and stream the AI reply for it.

        The status is always 200. The body stays empty when the queue is
        empty or the AI service does not answer with status 200.
        """
        self.send_response(200)
        self.send_header("Content-type", "text/html; charset=utf-8")
        self.send_header('Cache-Control', 'no-cache')
        self.send_header('Access-Control-Allow-Origin', '*')
        self.end_headers()
        value = get_and_delete_min_id_value(db_name)
        if not value:
            return
        response = c1.webchat(value)
        if response.status_code != 200:
            # The message has already been dequeued; at least leave a trace.
            print(f"AI request failed with status {response.status_code}")
            return
        # Force UTF-8 decoding of the upstream stream.
        response.encoding = 'utf-8'
        # Relay the reply line by line as it arrives.
        for line in response.iter_lines(decode_unicode=True):
            text = self._extract_text(line)
            if not text:
                continue
            print(text, end='', flush=True)
            try:
                self.wfile.write(text.encode('utf-8'))
            except (BrokenPipeError, ConnectionResetError):
                # The client went away; nobody is left to stream to.
                return

    @staticmethod
    def _extract_text(line):
        """Return the reply text carried by one upstream line, else None.

        Lines that are empty, do not contain a JSON object after the first
        five characters, are flagged ``is_end``, or carry no non-empty
        string ``content`` yield ``None``.
        """
        if not line:
            return None
        try:
            # The first five characters are dropped before parsing --
            # presumably the SSE "data:" field name; confirm against the
            # upstream format.
            event_data = json.loads(line[5:])
        except ValueError:
            return None
        if not isinstance(event_data, dict) or event_data.get('is_end'):
            return None
        text = event_data.get('content')
        if isinstance(text, str) and text:
            return text
        return None
def run(server_class=ThreadingHTTPServer, handler_class=SSEHandler, port=8000):
    """Start the HTTP server and block until it is interrupted.

    Args:
        server_class: Server type, instantiated with ``(address, handler)``.
        handler_class: Request handler class passed to the server.
        port: TCP port to listen on; the host part of the address is ``''``.
    """
    server_address = ('', port)
    httpd = server_class(server_address, handler_class)
    print(f'Starting httpd server on port {port}...')
    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        # Ctrl+C is the normal way to stop the server; exit quietly.
        print('Stopping httpd server...')
    finally:
        # Always release the listening socket.
        httpd.server_close()
# SQLite database file used as the message queue. Defined at module level
# (not only under the __main__ guard) because SSEHandler reads this global,
# which would otherwise be undefined when the module is imported and run()
# is called from elsewhere.
db_name = 'ai'

if __name__ == '__main__':
    # Make sure the queue table exists before serving requests.
    create_table(db_name)
    run()