commit 38e371fb1db8ae332b65bd0bd69ea68977189433 Author: zhaoxi Date: Wed Nov 12 14:55:52 2025 +0000 缓存模块和采集模块 diff --git a/agent_core/__init__.py b/agent_core/__init__.py new file mode 100644 index 0000000..1169c34 --- /dev/null +++ b/agent_core/__init__.py @@ -0,0 +1 @@ +from dispatcher import Dispatcher \ No newline at end of file diff --git a/agent_core/dispatcher.py b/agent_core/dispatcher.py new file mode 100644 index 0000000..d672c03 --- /dev/null +++ b/agent_core/dispatcher.py @@ -0,0 +1,49 @@ +import importlib +from multiprocessing import Process, Queue + +class Dispatcher: + def __init__(self, config): + self.config = config + self.plugins = config['server']['plugins'] + self.collector_to_storage_queue = Queue() + self.manage_queue = Queue() + self.get_queue = Queue() + + def start_plugins(self): + self.processes = [] + + # storage模块 + storage_config = self.config['modules']['storage'][0] + storage_instance = self.get_module_instance(storage_config) + storage_instance.set_input_queue(self.collector_to_storage_queue) + storage_instance.set_manage_queue(self.manage_queue) + storage_instance.set_get_queue(self.get_queue) + storage_process = Process(target=storage_instance.run, name=storage_config['name']) + self.processes.append(storage_process) + storage_process.start() + + # collector模块 + for collector_config in self.config['modules']['collector']: + collector_instance = self.get_module_instance(collector_config) + collector_instance.set_output_queue(self.collector_to_storage_queue) + collector_process = Process(target=collector_instance.run, name=collector_config['name']) + self.processes.append(collector_process) + collector_process.start() + + # sender模块 + sender_config = self.config['modules']['sender'][0] + sender_instance = self.get_module_instance(sender_config) + sender_instance.set_manage_queue(self.manage_queue) + sender_process = Process(target=sender_instance.run, name=sender_config['name']) + self.processes.append(sender_process) + sender_process.start() + + def get_module_instance(self, module_config): + module = importlib.import_module(module_config['path']) + ModuleClass = getattr(module, module_config['name']) + return ModuleClass(module_config['params']) + + def run(self): + self.start_module_processes() + for p in self.processes: + p.join() diff --git a/config.yml b/config.yml new file mode 100644 index 0000000..dff1f5b --- /dev/null +++ b/config.yml @@ -0,0 +1,25 @@ +server: + version: 1.0 + core-ip: 0:0:0:0 + interval: 10 + plugins: + - collector + - storage + - sender + +plugins: + collector: + - "SystemCollector": + name: SystemCollector + path: plugins.collector.systemcollector + params: + + storage: + name: + path: + params: + + sender: + name: + path: + params: diff --git a/main.py b/main.py new file mode 100644 index 0000000..75ffbb5 --- /dev/null +++ b/main.py @@ -0,0 +1,14 @@ +import yaml +from agent_core import Dispatcher +def main(): + with open('config.yml', 'r') as f: + config = yaml.safe_load(f.read()) + dispatcher = Dispatcher(config) + dispatcher.run() + +def test(): + with open('config.yml', 'r') as f: + config = yaml.safe_load(f.read()) + +if __name__ == '__main__': + test() \ No newline at end of file diff --git a/plugins/__init__.py b/plugins/__init__.py new file mode 100644 index 0000000..94d6024 --- /dev/null +++ b/plugins/__init__.py @@ -0,0 +1 @@ +from base import BaseModule \ No newline at end of file diff --git a/plugins/base.py b/plugins/base.py new file mode 100644 index 0000000..5933e25 --- /dev/null +++ b/plugins/base.py @@ -0,0 +1,10 @@ +class BaseModule: + def __init__(self, params): + self.is_running = False + self.config = params + + def run(self): + pass + + def stop(self): + self.is_running = False \ No newline at end of file diff --git a/plugins/collector/__init__.py b/plugins/collector/__init__.py new file mode 100644 index 0000000..9920804 --- /dev/null +++ b/plugins/collector/__init__.py @@ -0,0 +1 @@ +from systemcollector import SystemMetricsCollector \ No newline at end of file diff --git a/plugins/collector/systemcollector.py b/plugins/collector/systemcollector.py new file mode 100644 index 0000000..4c22f41 --- /dev/null +++ b/plugins/collector/systemcollector.py @@ -0,0 +1,49 @@ +from plugins.base import BaseModule +import psutil +import time + + +class SystemMetricsCollector(BaseModule): + def __init__(self, params): + super().__init__(params) + self.output_queue = None + self.host_tag = params.get('host_tag', 'unknown') + self.interval = params.get('interval', 1.0) + + def set_output_queue(self, queue): + self.output_queue = queue + + def collect_metrics(self): + return { + 'cpu_percent': psutil.cpu_percent(interval=None), + 'mem_available_gb': psutil.virtual_memory().available / (1024 ** 3), + 'disk_usage_percent': psutil.disk_usage('/').percent, + 'timestamp': int(time.time() * 1000000000) + } + + def run(self): + self.is_running = True + while self.is_running: + raw_metrics = self.collect_metrics() + line_protocol_string = self.format_to_line_protocol(raw_metrics) + + if self.output_queue and line_protocol_string: + # 放入 Queue,传递给 Storage 进程 + self.output_queue.put(line_protocol_string) + + time.sleep(self.interval) + + def format_to_line_protocol(self, metrics): + tag_set = f"host={self.host_tag}" + field_set_parts = [] + for key, value in metrics.items(): + if key == 'timestamp': + continue + if isinstance(value, int): + field_set_parts.append(f"{key}={value}i") + else: + field_set_parts.append(f"{key}={value}") + field_set = ",".join(field_set_parts) + timestamp = metrics.get('timestamp') + line = f"system_metrics,{tag_set} {field_set} {timestamp}" + return line \ No newline at end of file diff --git a/plugins/sender/__init__.py b/plugins/sender/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/plugins/sender/sender.py b/plugins/sender/sender.py new file mode 100644 index 0000000..a3b7f70 --- /dev/null +++ b/plugins/sender/sender.py @@ -0,0 +1,6 @@ +from plugins import BaseModule + +class sender(BaseModule): + def __init__(self, config): + super.__init__(config) + self.delect_queue = None diff --git a/plugins/storage/__init__.py b/plugins/storage/__init__.py new file mode 100644 index 0000000..8c02854 --- /dev/null +++ b/plugins/storage/__init__.py @@ -0,0 +1 @@ +from storage import storage \ No newline at end of file diff --git a/plugins/storage/storage.py b/plugins/storage/storage.py new file mode 100644 index 0000000..11cce99 --- /dev/null +++ b/plugins/storage/storage.py @@ -0,0 +1,103 @@ +import sqlite3 +from plugins import BaseModule +import time + +class Storage(BaseModule): + def __init__(self, params): + super().__init__(params) + self.db_path = None #数据库地址 + self.input_queue = None #输入队列 + self.manage_queue = None #删除任务队列 + self.data_queue = None + self.commit_interval = params.get('commit_interval', 1.0) #提交间隔 + self.batch_size = params.get('batch_size', 100) #缓存最大长度 + self.conn = None #连接 + self.cursor = None #游标 + + def set_input_queue(self, queue): #设置输入队列的外部接口 + self.input_queue = queue + def set_manage_queue(self,queue): + self.manage_queue = queue + def set_data_queue(self,queue): + self.data_queue = queue + + + def setup_database(self): #设置数据库 + self.conn = sqlite3.connect(self.db_path) + self.cursor = self.conn.cursor() + self.cursor.execute(""" + CREATE TABLE IF NOT EXISTS queue_data ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp INTEGER NOT NULL, + data TEXT NOT NULL + ); + """) + self.conn.commit() + self.cursor.execute("CREATE INDEX IF NOT EXISTS idx_timestamp ON queue_data (timestamp);") + self.conn.commit() + + def run(self): #主程序 + self.setup_database() + self.is_running = True + data_buffer = [] + last_commit_time = time.time() + while self.is_running: + try: + line = self.input_queue.get(timeout=0.1) + timestamp_ns = int(line.split(' ')[-1]) + data_buffer.append((timestamp_ns, line)) + except Exception: + pass + try: + manage = self.manage_queue.get(timeout=0) + command,data = manage + if command == 'delete': + self.delete_batch(data) + elif command == 'get': + self.get_batch(data) + except Exception: + pass + + current_time = time.time() + if data_buffer and (len(data_buffer) >= self.batch_size or + current_time - last_commit_time >= self.commit_interval): + self.commit_batch(data_buffer) + data_buffer = [] + last_commit_time = current_time + + if data_buffer: + self.commit_batch(data_buffer) + self.conn.close() + + def commit_batch(self, batch_data): #提交事务 + try: + self.conn.execute("BEGIN TRANSACTION") + sql = "INSERT INTO queue_data (timestamp, data) VALUES (?, ?)" + self.cursor.executemany(sql, batch_data) + self.conn.commit() + except sqlite3.Error as e: + self.conn.rollback() + + def delete_batch(self, ids): #提交删除事务 + if not ids: + return + placeholders = ','.join(['?'] * len(ids)) + sql = f"DELETE FROM queue_data WHERE id IN ({placeholders})" + try: + self.conn.execute("BEGIN TRANSACTION") + self.cursor.execute(sql, ids) + self.conn.commit() + except sqlite3.Error as e: + self.conn.rollback() + + def get_batch(self, batch_size): + try: + self.cursor.execute(""" + SELECT id, data + FROM queue_data + ORDER BY id ASC LIMIT ? + """, (batch_size,)) + results = self.cursor.fetchall() + self.data_queue.put(results) + except sqlite3.Error as e: + pass \ No newline at end of file