103 lines
3.7 KiB
Python
103 lines
3.7 KiB
Python
import sqlite3
|
|
from plugins import BaseModule
|
|
import time
|
|
|
|
class Storage(BaseModule):
|
|
def __init__(self, params):
|
|
super().__init__(params)
|
|
self.db_path = None #数据库地址
|
|
self.input_queue = None #输入队列
|
|
self.manage_queue = None #删除任务队列
|
|
self.data_queue = None
|
|
self.commit_interval = params.get('commit_interval', 1.0) #提交间隔
|
|
self.batch_size = params.get('batch_size', 100) #缓存最大长度
|
|
self.conn = None #连接
|
|
self.cursor = None #游标
|
|
|
|
def set_input_queue(self, queue): #设置输入队列的外部接口
|
|
self.input_queue = queue
|
|
def set_manage_queue(self,queue):
|
|
self.manage_queue = queue
|
|
def set_data_queue(self,queue):
|
|
self.data_queue = queue
|
|
|
|
|
|
def setup_database(self): #设置数据库
|
|
self.conn = sqlite3.connect(self.db_path)
|
|
self.cursor = self.conn.cursor()
|
|
self.cursor.execute("""
|
|
CREATE TABLE IF NOT EXISTS queue_data (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
timestamp INTEGER NOT NULL,
|
|
data TEXT NOT NULL
|
|
);
|
|
""")
|
|
self.conn.commit()
|
|
self.cursor.execute("CREATE INDEX IF NOT EXISTS idx_timestamp ON queue_data (timestamp);")
|
|
self.conn.commit()
|
|
|
|
def run(self): #主程序
|
|
self.setup_database()
|
|
self.is_running = True
|
|
data_buffer = []
|
|
last_commit_time = time.time()
|
|
while self.is_running:
|
|
try:
|
|
line = self.input_queue.get(timeout=0.1)
|
|
timestamp_ns = int(line.split(' ')[-1])
|
|
data_buffer.append((timestamp_ns, line))
|
|
except Exception:
|
|
pass
|
|
try:
|
|
manage = self.manage_queue.get(timeout=0)
|
|
command,data = manage
|
|
if command == 'delete':
|
|
self.delete_batch(data)
|
|
elif command == 'get':
|
|
self.get_batch(data)
|
|
except Exception:
|
|
pass
|
|
|
|
current_time = time.time()
|
|
if data_buffer and (len(data_buffer) >= self.batch_size or
|
|
current_time - last_commit_time >= self.commit_interval):
|
|
self.commit_batch(data_buffer)
|
|
data_buffer = []
|
|
last_commit_time = current_time
|
|
|
|
if data_buffer:
|
|
self.commit_batch(data_buffer)
|
|
self.conn.close()
|
|
|
|
def commit_batch(self, batch_data): #提交事务
|
|
try:
|
|
self.conn.execute("BEGIN TRANSACTION")
|
|
sql = "INSERT INTO queue_data (timestamp, data) VALUES (?, ?)"
|
|
self.cursor.executemany(sql, batch_data)
|
|
self.conn.commit()
|
|
except sqlite3.Error as e:
|
|
self.conn.rollback()
|
|
|
|
def delete_batch(self, ids): #提交删除事务
|
|
if not ids:
|
|
return
|
|
placeholders = ','.join(['?'] * len(ids))
|
|
sql = f"DELETE FROM queue_data WHERE id IN ({placeholders})"
|
|
try:
|
|
self.conn.execute("BEGIN TRANSACTION")
|
|
self.cursor.execute(sql, ids)
|
|
self.conn.commit()
|
|
except sqlite3.Error as e:
|
|
self.conn.rollback()
|
|
|
|
def get_batch(self, batch_size):
|
|
try:
|
|
self.cursor.execute("""
|
|
SELECT id, data
|
|
FROM queue_data
|
|
ORDER BY id ASC LIMIT ?
|
|
""", (batch_size,))
|
|
results = self.cursor.fetchall()
|
|
self.data_queue.put(results)
|
|
except sqlite3.Error as e:
|
|
pass |