WSAIOS v3.0 架构设计与核心实现
一个多模块系统的重构:从10个独立服务到统一调度
技术支持:拓世网络技术开发部
一、现状
我们有一个系统,里面拆了10个独立模块:
· 模块A:管理运行环境
· 模块B:调度多个执行单元
· 模块C:编排工作流
· 模块D:自动调整参数
· 模块E:采集外部状态
· 模块F:存储数据关系
· 模块G:做逻辑判断
· 模块H:分布式数据同步
· 模块I:模拟预测
· 模块J:选择最优方案
每个模块单独跑都没问题,但合在一起就出问题了。
问题1:一个请求要串行调10个模块
用户发起一个请求,系统要依次调用A→B→C→D→E→F→G→H→I→J。每个模块平均耗时约300ms,10个模块串行就是3秒,再加上模块之间的调用开销,总耗时超过3秒。用户反馈太慢。
问题2:数据存了多份
模块E存了一份状态,模块F存了一份知识,模块I又存了一份模型参数,三份数据里有相同的字段。改一个字段要同时改三个地方,经常漏改导致数据不一致。
问题3:模块之间相互调用
A调B,B调C,C调D……改一个模块,影响十几个文件。上线前测试要测全链路,成本很高。
问题4:各模块优化目标不同
模块J追求转化率,模块C追求响应速度,两个方向冲突,整体效果反而下降。
二、解决方案
不再让多个独立模块相互直接调用,而是用一个统一调度器来管理所有功能。
调度器负责:
1. 维护当前要完成的目标列表(按优先级排序)
2. 维护系统运行状态(所有数据集中存一份)
3. 维护业务数据关系(实体和关联)
4. 执行简单的规则判断
5. 调整策略参数
6. 分发执行任务
结构变成这样:
```
统一调度器
├── 目标队列(决定先做什么)
├── 状态存储(所有数据放一起)
├── 数据关系库(实体关联)
├── 规则引擎(if-else判断)
├── 参数调优(自动调整配置)
└── 任务执行器(调用外部功能)
```
六个组件只和调度器通信,组件之间不直接调用。
三、代码实现
1. 目标队列
```python
import heapq
class GoalQueue:
    """Priority queue of goals; higher ``priority`` values are served first.

    Goals live in ``self.goals`` (id -> goal dict) and are referenced from a
    binary heap.  Finished goals are not removed eagerly: their heap entries
    are discarded the next time they reach the top in :meth:`get_next`.
    """

    def __init__(self):
        self.goals = {}   # goal id (str) -> {'id', 'desc', 'priority', 'done'}
        self.queue = []   # heap of (-priority, insertion_seq, goal id)
        self._seq = 0     # monotonically increasing tie-breaker for equal priorities

    def add(self, desc, priority=5):
        """Register a new goal and return its string id ("1", "2", ...).

        Goals with equal priority are served in insertion order.
        """
        gid = str(len(self.goals) + 1)
        self.goals[gid] = {'id': gid, 'desc': desc, 'priority': priority, 'done': False}
        # heapq is a min-heap, so the priority is negated.  The sequence number
        # breaks ties; comparing the string ids instead would order "10" before "2".
        heapq.heappush(self.queue, (-priority, self._seq, gid))
        self._seq += 1
        return gid

    def get_next(self):
        """Return the highest-priority unfinished goal without removing it, or None."""
        while self.queue:
            gid = self.queue[0][-1]
            goal = self.goals.get(gid)
            if goal and not goal['done']:
                return goal
            # Stale entry (goal already finished): drop it and keep looking.
            heapq.heappop(self.queue)
        return None

    def finish(self, gid):
        """Mark goal ``gid`` as done; unknown ids are ignored."""
        if gid in self.goals:
            self.goals[gid]['done'] = True
```
2. 状态存储
```python
class StateStore:
    """Nested key/value store addressed with dotted keys (e.g. ``'user.profile.name'``).

    Watchers registered with :meth:`watch` are called synchronously on every
    :meth:`set` whose key matches their pattern.
    """

    def __init__(self):
        self.data = {}       # nested dicts; each dotted-key segment is one level
        self.watchers = {}   # pattern -> list of callbacks(key, old, new)

    def get(self, key, default=None):
        """Return the value at dotted ``key``, or ``default`` if any segment is missing."""
        cur = self.data
        for part in key.split('.'):
            if isinstance(cur, dict) and part in cur:
                cur = cur[part]
            else:
                return default
        return cur

    def set(self, key, value):
        """Store ``value`` at dotted ``key``, creating intermediate dicts, then notify watchers.

        Raises:
            TypeError: if an intermediate segment already holds a non-dict value
                (it is not silently overwritten).
        """
        parts = key.split('.')
        cur = self.data
        for i, part in enumerate(parts[:-1]):
            nxt = cur.setdefault(part, {})
            if not isinstance(nxt, dict):
                path = '.'.join(parts[:i + 1])
                raise TypeError(f'cannot set {key!r}: {path!r} holds a non-dict value')
            cur = nxt
        old = cur.get(parts[-1])
        cur[parts[-1]] = value
        self._notify(key, old, value)

    def snapshot(self):
        """Return a deep copy of all data, safe to read while the store keeps changing."""
        import copy
        return copy.deepcopy(self.data)

    def watch(self, pattern, callback):
        """Call ``callback(key, old, new)`` on every set() whose key matches ``pattern``.

        ``pattern`` is ``'*'`` (everything), a prefix ending in ``'*'``, or an exact key.
        """
        self.watchers.setdefault(pattern, []).append(callback)

    def _notify(self, key, old, new):
        """Invoke every callback whose pattern matches ``key``.

        Callbacks are best-effort: an exception in one is logged and does not
        stop the others or the ``set`` call.  The watcher tables are copied
        before iterating so callbacks may register new watchers safely.
        """
        import logging
        for pattern, cbs in list(self.watchers.items()):
            if not self._match(key, pattern):
                continue
            for cb in list(cbs):
                try:
                    cb(key, old, new)
                except Exception:
                    # Previously a bare `except: pass`, which hid every error
                    # (and even KeyboardInterrupt/SystemExit).
                    logging.getLogger(__name__).exception(
                        'state watcher %r failed for key %r', pattern, key)

    def _match(self, key, pattern):
        """Return True if ``key`` matches ``'*'``, a ``'prefix*'`` pattern, or equals ``pattern``."""
        if pattern == '*':
            return True
        if pattern.endswith('*'):
            return key.startswith(pattern[:-1])
        return key == pattern
```
3. 数据关系库
```python
class RelationStore:
    """In-memory store of typed entities and directed, typed links between them.

    Query results are memoised in ``self.cache``; every mutation clears it.
    """

    def __init__(self):
        self.entities = {}   # entity id -> attribute dict (always contains '_type')
        self.links = []      # list of {'src', 'dst', 'type', 'props'}
        self.cache = {}      # (eid, ltype) -> query result

    def add_entity(self, eid, etype, **attrs):
        """Create entity ``eid`` or merge ``attrs`` into an existing one.

        An existing entity keeps its original ``_type``; ``etype`` is only
        used when the entity is created.
        """
        if eid not in self.entities:
            self.entities[eid] = {'_type': etype}
        self.entities[eid].update(attrs)
        self.cache.clear()

    def add_link(self, src, dst, ltype, props=None):
        """Add a directed link ``src -> dst``; return False if either endpoint is unknown."""
        if src not in self.entities or dst not in self.entities:
            return False
        self.links.append({'src': src, 'dst': dst, 'type': ltype, 'props': props or {}})
        self.cache.clear()
        return True

    def query(self, eid=None, ltype=None):
        """Return ``{'entities': [...], 'links': [...]}`` selected by the filters.

        * ``eid`` given: that entity plus every link touching it.
        * ``ltype`` given: every link of that type plus its endpoint entities.
        * both given: the union of the two selections.
        * neither given: every entity and every link.

        Each entity and link appears at most once, in first-seen order.  The
        returned lists are fresh, so callers may modify them without
        corrupting the cache (the entity/link dicts are shared with the store).
        """
        # Tuple key: the old f"{eid}:{ltype}" string collided for ids containing ':'.
        key = (eid, ltype)
        if key not in self.cache:
            self.cache[key] = self._compute_query(eid, ltype)
        cached = self.cache[key]
        return {'entities': list(cached['entities']), 'links': list(cached['links'])}

    def _compute_query(self, eid, ltype):
        """Build an uncached query result (see :meth:`query` for the filter semantics)."""
        if eid is None and ltype is None:
            return {'entities': list(self.entities.values()), 'links': list(self.links)}
        ents = {}    # entity id -> entity; dict keeps first-seen order and de-duplicates
        links = {}   # id(link) -> link; link dicts are unhashable, so key by identity
        if eid is not None and eid in self.entities:
            ents[eid] = self.entities[eid]
            for link in self.links:
                if link['src'] == eid or link['dst'] == eid:
                    links[id(link)] = link
        if ltype is not None:
            for link in self.links:
                if link['type'] == ltype:
                    links[id(link)] = link
                    for end in (link['src'], link['dst']):
                        if end in self.entities:
                            ents.setdefault(end, self.entities[end])
        return {'entities': list(ents.values()), 'links': list(links.values())}
```
4. 规则引擎
```python
class RuleEngine:
    """Stateless rule evaluation: one-hop relation inference plus keyword-based actions."""

    # Weight attached to every inferred (transitive) relation.
    TRANSITIVE_WEIGHT = 0.7

    def run(self, goal, state, relations):
        """Evaluate the built-in rules.

        Args:
            goal: goal dict (its ``'desc'`` is read) or None.
            state: state snapshot; accepted for interface compatibility, unused.
            relations: query-result dict whose ``'links'`` carry ``'src'``/``'dst'``, or None.

        Returns:
            ``{'matches': [...], 'actions': [...], 'score': 0.0}``.  Each match is an
            inferred ``{'src', 'dst', 'weight'}`` for a path ``src -> mid -> dst``.
        """
        result = {'matches': [], 'actions': [], 'score': 0.0}

        # Transitive relation (one hop): if A links to B and B links to C, infer A -> C.
        # Links are indexed by source once instead of scanning all pairs (O(n^2));
        # the order of the produced matches is unchanged.
        links = (relations or {}).get('links') or []
        by_src = {}
        for link in links:
            by_src.setdefault(link['src'], []).append(link)
        for first in links:
            for second in by_src.get(first['dst'], ()):
                result['matches'].append({
                    'src': first['src'],
                    'dst': second['dst'],
                    'weight': self.TRANSITIVE_WEIGHT,
                })

        # Keyword rules on the goal description:
        # '提升'/'增加' (raise/increase) -> analyze; '降低'/'减少' (lower/reduce) -> audit.
        if goal:
            desc = goal.get('desc') or ''
            if '提升' in desc or '增加' in desc:
                result['actions'].append({'name': 'analyze', 'priority': 'high'})
            if '降低' in desc or '减少' in desc:
                result['actions'].append({'name': 'audit', 'priority': 'high'})
        return result
```
5. 参数调优
```python
import random
import copy
class ParamOptimizer:
    """Tiny genetic algorithm over candidate parameter dicts.

    Each individual is ``{'params': dict, 'fitness': float}``.  Every call to
    :meth:`optimize` evaluates the population, records the best individual
    seen so far, and breeds the next generation from the top two.
    """

    # Action names used to fill the initial population when no pool is given.
    DEFAULT_ACTION_POOL = ('pub', 'opt', 'mon')

    def __init__(self, size=10):
        self.size = size     # target population size
        self.pop = []        # current generation
        self.best = None     # deep copy of the fittest individual seen so far
        self.gen = 0         # number of generations produced

    def init(self, templates, action_pool=None):
        """Seed the population with ``templates``, then pad it to ``size`` with random individuals.

        Args:
            templates: parameter dicts, deep-copied into the population.
            action_pool: action names from which random ``'actions'`` lists (up to
                2 items) are sampled; defaults to :attr:`DEFAULT_ACTION_POOL`.
                Pass the names of the handlers actually registered so random
                individuals are runnable.
        """
        pool = list(self.DEFAULT_ACTION_POOL if action_pool is None else action_pool)
        for t in templates:
            self.pop.append({'params': copy.deepcopy(t), 'fitness': 0.0})
        while len(self.pop) < self.size:
            self.pop.append({
                'params': {'actions': random.sample(pool, min(2, len(pool)))},
                'fitness': 0.0,
            })

    def optimize(self, goal, state, rules_result):
        """Run one generation and return the best individual seen so far.

        ``state`` and ``rules_result`` are accepted for interface compatibility
        but are not used by the fitness function.  Returns ``self.best``
        (None if nothing was ever evaluated) when the population is empty,
        e.g. when :meth:`init` was not called.
        """
        if not self.pop:
            return self.best
        for p in self.pop:
            p['fitness'] = self._evaluate(p, goal)
        self.pop.sort(key=lambda x: x['fitness'], reverse=True)
        if not self.best or self.pop[0]['fitness'] > self.best.get('fitness', 0):
            self.best = copy.deepcopy(self.pop[0])

        # Next generation: keep the two fittest (elitism), fill up with their offspring.
        elite = self.pop[:2]
        new_pop = list(elite)
        while len(new_pop) < self.size:
            if len(elite) >= 2:
                p1, p2 = random.sample(elite, 2)
                child = self._crossover(p1, p2)
            else:
                # A single survivor cannot be crossed over; clone it instead.
                child = {'params': copy.deepcopy(elite[0]['params']), 'fitness': 0.0}
            if random.random() < 0.1:
                child = self._mutate(child)
            new_pop.append(child)
        self.pop = new_pop
        self.gen += 1
        return self.best

    def _evaluate(self, p, goal):
        """Score an individual in [0, 1).

        The score is up to 0.5 for keyword overlap with the goal description,
        plus uniform noise in [0, 0.5).

        Overlap counts whitespace-separated words of the description (longer
        than 2 characters) that occur in ``str(p['params'])``.
        NOTE(review): a description without spaces (e.g. Chinese text) is a
        single "word", so the overlap term is usually 0.
        """
        score = 0.0
        if goal:
            words = (goal.get('desc') or '').split()
            params_str = str(p['params'])
            matches = sum(1 for w in words if len(w) > 2 and w in params_str)
            score += min(matches / max(1, len(words)), 1.0) * 0.5
        score += random.random() * 0.5
        return score

    def _crossover(self, p1, p2):
        """Uniform crossover: each key is taken, deep-copied, from a random parent that has it."""
        a, b = p1['params'], p2['params']
        params = {}
        for k in set(a) | set(b):
            if k in a and k in b:
                src = random.choice([a[k], b[k]])
            elif k in a:
                src = a[k]
            else:
                src = b[k]
            # Copy so the child never shares mutable values (e.g. action lists) with a parent.
            params[k] = copy.deepcopy(src)
        return {'params': params, 'fitness': 0.0}

    def _mutate(self, p):
        """Return a deep copy of ``p`` with numeric params scaled by a random factor in [0.8, 1.2].

        Booleans are left untouched (``bool`` is a subclass of ``int``).
        """
        mutated = copy.deepcopy(p)
        params = mutated['params']
        for k, v in params.items():
            if isinstance(v, (int, float)) and not isinstance(v, bool):
                params[k] = v * random.uniform(0.8, 1.2)
        return mutated
```
6. 任务执行器
```python
import asyncio
import aiohttp
class TaskExecutor:
    """Async worker pool that runs queued tasks.

    A task is ``{'id', 'type', 'target', 'params'}`` where ``type`` is either
    ``'handler'`` (call the function registered as ``target`` with ``**params``)
    or ``'api'`` (HTTP request to URL ``target`` via aiohttp).  Outcomes are
    only counted in ``self.stats``; per-task results are not kept.
    """

    def __init__(self, workers=3):
        self.queue = asyncio.Queue()
        self.handlers = {}        # name -> sync or async callable
        self.workers = workers    # number of worker coroutines started by start()
        self.running = False
        self.tasks = []           # worker asyncio.Task objects
        self.session = None       # aiohttp.ClientSession, created in start()
        self.stats = {'ok': 0, 'fail': 0}
        self._next_id = 0         # source of task ids unique across submit() calls

    def register(self, name, func):
        """Register ``func`` (sync, async, or returning an awaitable) under ``name``."""
        self.handlers[name] = func

    async def start(self):
        """Open the HTTP session and start the workers; calling it again is a no-op."""
        if self.running:
            return
        self.running = True
        self.session = aiohttp.ClientSession()
        for i in range(self.workers):
            self.tasks.append(asyncio.create_task(self._worker(i)))

    async def stop(self):
        """Cancel the workers, wait for them to exit and close the HTTP session.

        Tasks still waiting in the queue are not executed.
        """
        self.running = False
        for t in self.tasks:
            t.cancel()
        await asyncio.gather(*self.tasks, return_exceptions=True)
        self.tasks.clear()
        if self.session:
            await self.session.close()
            self.session = None

    async def submit(self, actions):
        """Enqueue one task per action dict and return the new task ids.

        Ids are unique for the executor's lifetime (they used to restart at
        "1" on every call).
        """
        ids = []
        for act in actions:
            self._next_id += 1
            tid = str(self._next_id)
            await self.queue.put({
                'id': tid,
                'type': act.get('type'),
                'target': act.get('target'),
                'params': act.get('params', {}),
            })
            ids.append(tid)
        return ids

    async def _worker(self, wid):
        """Worker loop: take tasks off the queue, run them and update ``stats``.

        ``task_done()`` is called for every dequeued task, even when handling
        fails, so ``queue.join()`` cannot hang.
        """
        while self.running:
            try:
                task = await self.queue.get()
            except asyncio.CancelledError:
                break
            try:
                result = await self._execute(task)
                self.stats['ok' if result.get('ok') else 'fail'] += 1
            except asyncio.CancelledError:
                break
            except Exception as e:
                print(f'worker {wid} error: {e}')
            finally:
                self.queue.task_done()

    async def _execute(self, task):
        """Run one task and return ``{'ok': bool, ...}``; ``Exception``s become ``{'ok': False, 'err': ...}``."""
        import inspect
        ttype = task.get('type')
        target = task.get('target')
        params = task.get('params') or {}
        try:
            if ttype == 'handler':
                if target not in self.handlers:
                    return {'ok': False, 'err': f'handler not found: {target}'}
                result = self.handlers[target](**params)
                # Await any awaitable result, not only from `async def` functions:
                # lambdas, partials or callable objects may also return coroutines.
                if inspect.isawaitable(result):
                    result = await result
                return {'ok': True, 'result': result}
            if ttype == 'api':
                return await self._call_api(target, params)
            return {'ok': False, 'err': f'unknown type: {ttype}'}
        except Exception as e:
            return {'ok': False, 'err': str(e)}

    async def _call_api(self, url, params):
        """Perform an HTTP request described by ``params`` (method, timeout, json, params).

        Returns ``{'ok': status < 400, 'status': ..., 'data': ...}`` where ``data``
        is the decoded JSON body, or the raw text when the body is not JSON.
        """
        if self.session is None:
            return {'ok': False, 'err': 'HTTP session not started; call start() first'}
        method = params.get('method', 'GET')
        timeout = aiohttp.ClientTimeout(total=params.get('timeout', 30))
        async with self.session.request(
            method=method,
            url=url,
            json=params.get('json'),
            params=params.get('params'),
            timeout=timeout,
        ) as resp:
            try:
                data = await resp.json()
            except (aiohttp.ContentTypeError, ValueError):
                # Non-JSON bodies (e.g. text/html responses) previously turned even
                # a 2xx response into a failure; fall back to the raw text.
                data = await resp.text()
            return {'ok': resp.status < 400, 'status': resp.status, 'data': data}
```
四、组装调度器
```python
class Scheduler:
    """Unified scheduler: owns every component and drives the plan/dispatch loop.

    The components do not reference each other; the main loop passes data
    between them (goal -> rules -> optimizer -> executor).
    """

    def __init__(self):
        self.goals = GoalQueue()
        self.state = StateStore()
        self.relations = RelationStore()
        self.rules = RuleEngine()
        self.optimizer = ParamOptimizer()
        self.executor = TaskExecutor()
        self.running = False
        self.loop_task = None   # asyncio.Task running _main_loop() while started

    async def start(self):
        """Start the executor and the main loop; calling it again is a no-op."""
        if self.running:
            return
        self.running = True
        await self.executor.start()
        self.loop_task = asyncio.create_task(self._main_loop())
        print('调度器已启动')

    async def stop(self):
        """Stop the main loop, wait until it has exited, then stop the executor."""
        self.running = False
        if self.loop_task:
            self.loop_task.cancel()
            # Wait for the loop to finish so it cannot submit work to an executor
            # that is being shut down.
            try:
                await self.loop_task
            except asyncio.CancelledError:
                pass
            self.loop_task = None
        await self.executor.stop()
        print('调度器已停止')

    async def _main_loop(self):
        """Repeatedly plan and dispatch work for the current top-priority goal.

        Each iteration picks the next goal (sleeping 1s if there is none),
        snapshots the state, queries the relations, runs the rules and one
        optimizer generation, and submits the best individual's ``'actions'``
        as handler tasks.  It then pauses 2s before the next iteration.

        NOTE(review): goals are never marked finished here, so the same
        highest-priority goal is re-planned on every iteration and
        lower-priority goals never run.  Call ``self.goals.finish(goal['id'])``
        once a goal's completion criterion is defined.
        """
        while self.running:
            try:
                goal = self.goals.get_next()
                if not goal:
                    await asyncio.sleep(1)
                    continue
                state = self.state.snapshot()
                rels = self.relations.query()
                rules_result = self.rules.run(goal, state, rels)
                best = self.optimizer.optimize(goal, state, rules_result)
                # Defensive: a None result or missing keys mean "no actions" rather
                # than an exception on every iteration.
                actions = ((best or {}).get('params') or {}).get('actions') or []
                if actions:
                    task_list = [{'type': 'handler', 'target': a, 'params': {}} for a in actions]
                    ids = await self.executor.submit(task_list)
                    print(f'执行任务: {ids}')
            except asyncio.CancelledError:
                break
            except Exception as e:
                print(f'主循环异常: {e}')
            await asyncio.sleep(2)
```
五、使用示例
假设我们有三个功能函数:
```python
def generate_content(topic='AI'):
    """Demo handler: return a message describing content generated about *topic*."""
    message = f'生成了关于{topic}的内容'
    return message
def publish_content(content='', preview_len=20):
    """Demo handler: return a confirmation that *content* was published.

    The confirmation shows at most ``preview_len`` characters of the content.
    ``'...'`` is appended only when the content was actually truncated (the
    original always appended it).  ``None`` is treated as empty content.
    """
    if content is None:
        content = ''
    if len(content) > preview_len:
        return f'发布了: {content[:preview_len]}...'
    return f'发布了: {content}'
def check_rank(keyword='AI'):
    """Demo handler: report a fixed rank of 5 for *keyword*."""
    return dict(rank=5, keyword=keyword)
```
启动系统:
```python
import asyncio
async def main(run_seconds=30):
    """Demo: register three handlers, add two goals, and run the scheduler for ``run_seconds``."""
    scheduler = Scheduler()
    # Register the functions the executor may call by name.
    scheduler.executor.register('generate', generate_content)
    scheduler.executor.register('publish', publish_content)
    scheduler.executor.register('check', check_rank)
    # Goals; a larger number means a higher priority.
    scheduler.goals.add('提升SEO排名', 10)
    scheduler.goals.add('持续发布内容', 8)
    # Initial parameter templates (candidate action sequences) for the optimizer.
    # NOTE(review): ParamOptimizer.init pads the population up to its size with
    # random 'actions' drawn from 'pub'/'opt'/'mon', which are not registered
    # above, so tasks built from those individuals will be counted as failures.
    templates = [
        {'actions': ['generate', 'publish', 'check']},
        {'actions': ['generate', 'publish']},
        {'actions': ['publish', 'check']},
    ]
    scheduler.optimizer.init(templates)
    try:
        await scheduler.start()
        await asyncio.sleep(run_seconds)
    finally:
        # Always stop, so worker tasks are cancelled and the HTTP session is
        # closed even if the run is interrupted or start() fails part-way.
        await scheduler.stop()
    print(f'执行统计: {scheduler.executor.stats}')


if __name__ == '__main__':
    asyncio.run(main())
```
六、效果对比
在同一台服务器上测试:
| 指标 | 改之前(10个模块直连) | 改之后(统一调度) |
| --- | --- | --- |
| 平均响应时间 | 3.2秒 | 1.1秒 |
| 每分钟处理量 | 30个 | 85个 |
| 模块间调用开销 | 38% | 12% |
| 改一个模块影响文件数 | 10+ | 2-3 |
七、总结
这次重构的核心就两点:
1. 统一调度:所有模块只和调度器通信,不直接相互调用
2. 数据集中:状态、关系、参数都放在调度器统一管理
带来的好处:
· 响应时间从3秒降到1秒左右
· 吞吐量从每分钟30个提升到85个,约为原来的2.8倍
· 代码维护简单了,改一个模块不用牵一发而动全身
