import sqlite3 from plugins import BaseModule import time class Storage(BaseModule): def __init__(self, params): super().__init__(params) self.db_path = None #数据库地址 self.input_queue = None #输入队列 self.manage_queue = None #删除任务队列 self.data_queue = None self.commit_interval = params.get('commit_interval', 1.0) #提交间隔 self.batch_size = params.get('batch_size', 100) #缓存最大长度 self.conn = None #连接 self.cursor = None #游标 def set_input_queue(self, queue): #设置输入队列的外部接口 self.input_queue = queue def set_manage_queue(self,queue): self.manage_queue = queue def set_data_queue(self,queue): self.data_queue = queue def setup_database(self): #设置数据库 self.conn = sqlite3.connect(self.db_path) self.cursor = self.conn.cursor() self.cursor.execute(""" CREATE TABLE IF NOT EXISTS queue_data ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp INTEGER NOT NULL, data TEXT NOT NULL ); """) self.conn.commit() self.cursor.execute("CREATE INDEX IF NOT EXISTS idx_timestamp ON queue_data (timestamp);") self.conn.commit() def run(self): #主程序 self.setup_database() self.is_running = True data_buffer = [] last_commit_time = time.time() while self.is_running: try: line = self.input_queue.get(timeout=0.1) timestamp_ns = int(line.split(' ')[-1]) data_buffer.append((timestamp_ns, line)) except Exception: pass try: manage = self.manage_queue.get(timeout=0) command,data = manage if command == 'delete': self.delete_batch(data) elif command == 'get': self.get_batch(data) except Exception: pass current_time = time.time() if data_buffer and (len(data_buffer) >= self.batch_size or current_time - last_commit_time >= self.commit_interval): self.commit_batch(data_buffer) data_buffer = [] last_commit_time = current_time if data_buffer: self.commit_batch(data_buffer) self.conn.close() def commit_batch(self, batch_data): #提交事务 try: self.conn.execute("BEGIN TRANSACTION") sql = "INSERT INTO queue_data (timestamp, data) VALUES (?, ?)" self.cursor.executemany(sql, batch_data) self.conn.commit() except sqlite3.Error as e: self.conn.rollback() def delete_batch(self, ids): #提交删除事务 if not ids: return placeholders = ','.join(['?'] * len(ids)) sql = f"DELETE FROM queue_data WHERE id IN ({placeholders})" try: self.conn.execute("BEGIN TRANSACTION") self.cursor.execute(sql, ids) self.conn.commit() except sqlite3.Error as e: self.conn.rollback() def get_batch(self, batch_size): try: self.cursor.execute(""" SELECT id, data FROM queue_data ORDER BY id ASC LIMIT ? """, (batch_size,)) results = self.cursor.fetchall() self.data_queue.put(results) except sqlite3.Error as e: pass