python: Fan-In Pattern Fan-In
项目结构:
Fan-In 模式本质:多数据源 → 统一接收器,完美匹配珠宝全流程「多环节、单数据中心」场景
架构设计
核心分层(5 层架构)
配置层(config):全局配置、常量
消息层(message):统一消息结构体、数据格式
业务层(process):各业务模块(单一职责)
核心层(core):Fan-In 队列、Sink 接收器、线程管理
启动层(main):项目入口、编排启动
# encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Fan-In Pattern Fan-In(扇入)模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/19 20:47 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : settings.py """ 全局配置文件 """ import queue # 队列最大容量 QUEUE_MAX_SIZE = 100 # 全局 Fan-In 队列(单例) FAN_IN_QUEUE = queue.Queue(maxsize=QUEUE_MAX_SIZE) # 业务处理模拟时间范围(秒) PROCESS_MIN_TIME = 0.3 PROCESS_MAX_TIME = 1.2 # 结束标识 END_SIGNAL = "SYSTEM_END" # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Fan-In Pattern Fan-In(扇入)模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/19 20:47 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : business_msg.py """ 统一业务消息结构体(企业级数据格式) """ from dataclasses import dataclass from typing import Any @dataclass class BusinessMessage: """ 珠宝全流程统一消息体 严格结构化:所有业务环节必须使用此格式发送数据 """ process_name: str # 业务环节名称 task_name: str # 任务名称 task_details: Any # 任务详情 timestamp: float # 时间戳 status: str = "success" # 状态 # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Fan-In Pattern Fan-In(扇入)模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/19 20:48 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : fan_in_queue.py """ Fan-In 核心队列:线程安全、单例、全局唯一 """ from FanInPattern.config.settings import FAN_IN_QUEUE, END_SIGNAL from FanInPattern.message.business_msg import BusinessMessage class FanInQueue: """ Fan-In 队列封装(企业级安全调用) """ @staticmethod def send_msg(msg: BusinessMessage) -> None: """ 发送业务消息 :param msg: :return: """ FAN_IN_QUEUE.put(msg) @staticmethod def get_msg() -> BusinessMessage | str: """ 获取消息(阻塞) :return: """ return FAN_IN_QUEUE.get() @staticmethod def send_end_signal() -> None: """ 发送结束信号 :return: """ FAN_IN_QUEUE.put(END_SIGNAL) @staticmethod def task_done() -> None: """ 任务完成标识 :return: """ FAN_IN_QUEUE.task_done() @staticmethod def join() -> None: """ 等待队列清空 :return: """ FAN_IN_QUEUE.join() # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Fan-In Pattern Fan-In(扇入)模式: # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/19 20:50 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : work_sink.py """ 工作接收器:数据中心(唯一汇聚点) """ import threading import time from FanInPattern.config.settings import END_SIGNAL from FanInPattern.core.fan_in_queue import FanInQueue from FanInPattern.message.business_msg import BusinessMessage class WorkSink: """ 珠宝业务数据中心(Fan-In Sink) """ def __init__(self): self.total_count = 0 self.process_stats = {} # 各环节统计 def _update_stats(self, process_name: str) -> None: """ 更新业务统计 :param process_name: :return: """ self.process_stats[process_name] = self.process_stats.get(process_name, 0) + 1 def run(self) -> None: """ 启动接收器 :return: """ print("=" * 90) print("🏢 【企业级】珠宝业务数据中心已启动 - Fan-In 工作接收器") print("=" * 90) while True: data = FanInQueue.get_msg() # 结束信号 if data == END_SIGNAL: self._print_final_report() FanInQueue.task_done() break # 处理业务消息 if isinstance(data, BusinessMessage): self._handle_message(data) FanInQueue.task_done() time.sleep(0.1) def _handle_message(self, msg: BusinessMessage) -> None: """ 处理单条消息 :param msg: :return: """ self.total_count += 1 self._update_stats(msg.process_name) print( f"📊 接收 | {msg.process_name:10s} | " f"任务:{msg.task_name:15s} | 状态:{msg.status}" ) def _print_final_report(self) -> None: """ 打印企业级汇总报告 :return: """ print("\n" + "=" * 90) print("📈 珠宝全流程业务汇总报告") print("=" * 90) print(f"✅ 总处理任务数:{self.total_count}") for process, count in self.process_stats.items(): print(f" 「{process}」:{count} 项") print("=" * 90) # 单例接收器(全局唯一) SINK_INSTANCE = WorkSink()# encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Fan-In Pattern Fan-In(扇入)模式: # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/19 20:52 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : base_process.py """ 业务基类:所有业务环节必须继承此类 """ import abc import time import random import threading from FanInPattern.config.settings import PROCESS_MIN_TIME, PROCESS_MAX_TIME from FanInPattern.core.fan_in_queue import FanInQueue from FanInPattern.message.business_msg import BusinessMessage class BaseProcess(abc.ABC): """ 抽象业务基类 职责:统一执行逻辑、发送消息、模拟耗时 """ def __init__(self, process_name: str): self.process_name = process_name self.task_list = [] @abc.abstractmethod def init_tasks(self) -> None: """ 初始化任务(子类必须实现) :return: """ pass def _simulate_process(self) -> None: """ 模拟业务处理耗时 :return: """ sleep_time = random.uniform(PROCESS_MIN_TIME, PROCESS_MAX_TIME) time.sleep(sleep_time) def _send_task_msg(self, task: str) -> None: """ 构造并发送消息 :param task: :return: """ msg = BusinessMessage( process_name=self.process_name, task_name=task, task_details=task, timestamp=time.time() ) FanInQueue.send_msg(msg) def execute(self) -> None: """ 执行业务流程 :return: """ self.init_tasks() for task in self.task_list: self._simulate_process() self._send_task_msg(task) def start_thread(self) -> threading.Thread: """ 启动独立线程执行 :return: """ thread = threading.Thread(target=self.execute, name=self.process_name) thread.start() return thread # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Fan-In Pattern Fan-In(扇入)模式: # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/19 21:09 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : design.py from FanInPattern.process.base_process import BaseProcess class DesignProcess(BaseProcess): """ 珠宝设计 """ def __init__(self): super().__init__("珠宝设计") def init_tasks(self): """ :return: """ self.task_list = ["钻戒款式设计", "项链3D建模", "手镯图纸审核"] # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Fan-In Pattern Fan-In(扇入)模式: # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/19 21:11 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : purchase.py from FanInPattern.process.base_process import BaseProcess class PurchaseProcess(BaseProcess): """ 原料采购 """ def __init__(self): super().__init__("原料采购") def init_tasks(self): """ :return: """ self.task_list = [ "采购1克拉南非钻石", "采购999足金500g", "采购红宝石10颗" ] # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述: # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/19 21:23 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : produce.py from FanInPattern.process.base_process import BaseProcess class ProduceProcess(BaseProcess): """ 生产加工 """ def __init__(self): super().__init__("生产加工") def init_tasks(self): """ :return: """ self.task_list = ["钻石镶嵌加工", "黄金手镯抛光", "金饰铸造成型"] # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Fan-In Pattern Fan-In(扇入)模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/19 21:26 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : quality.py from FanInPattern.process.base_process import BaseProcess class QualityProcess(BaseProcess): """ 质量检测 """ def __init__(self): super().__init__("质量检测") def init_tasks(self): self.task_list = ["黄金纯度检测", "钻石工艺检测", "珠宝鉴定证书出具"] # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Fan-In Pattern Fan-In(扇入)模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/19 21:28 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : warehouse.py from FanInPattern.process.base_process import BaseProcess class WarehouseProcess(BaseProcess): """ 仓储管理 """ def __init__(self): super().__init__("仓储管理") def init_tasks(self): self.task_list = ["钻戒入库登记", "黄金库存盘点", "宝石库存预警"] # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Fan-In Pattern Fan-In(扇入)模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/19 21:28 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : sale.py from FanInPattern.process.base_process import BaseProcess class SaleProcess(BaseProcess): """ 销售环节 """ def __init__(self): super().__init__("销售环节") def init_tasks(self): self.task_list = ["线上钻戒售出", "门店黄金手镯售出", "珠宝批发订单发货"] # encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Fan-In Pattern Fan-In(扇入)模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/19 21:29 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : after_sale.py from FanInPattern.process.base_process import BaseProcess class AfterSaleProcess(BaseProcess): """ 售后服务 """ def __init__(self): super().__init__("售后服务") def init_tasks(self): self.task_list = ["钻戒免费清洗", "项链维修", "黄金首饰保养"]调用:
# encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述: Fan-In Pattern Fan-In(扇入)模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/19 21:51 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : FanInBll.py """ 企业级编排 """ import threading from FanInPattern.core.fan_in_queue import FanInQueue from FanInPattern.core.work_sink import SINK_INSTANCE # 导入所有业务流程 from FanInPattern.process.purchase import PurchaseProcess from FanInPattern.process.design import DesignProcess from FanInPattern.process.produce import ProduceProcess from FanInPattern.process.quality import QualityProcess from FanInPattern.process.warehouse import WarehouseProcess from FanInPattern.process.sale import SaleProcess from FanInPattern.process.after_sale import AfterSaleProcess class JewelryBusinessOrchestration(object): """ 业务流程编排器 """ @staticmethod def get_all_processes() -> list: """ 获取所有业务环节(扩展只需在此添加) :return: """ return [ PurchaseProcess(), DesignProcess(), ProduceProcess(), QualityProcess(), WarehouseProcess(), SaleProcess(), AfterSaleProcess() ] @staticmethod def run(): """启动全流程""" # 1. 启动接收器线程 sink_thread = threading.Thread(target=SINK_INSTANCE.run, name="WorkSink") sink_thread.start() # 2. 启动所有业务线程 processes = JewelryBusinessOrchestration.get_all_processes() threads = [p.start_thread() for p in processes] # 3. 等待所有业务完成 for t in threads: t.join() # 4. 发送结束信号 FanInQueue.send_end_signal() # 5. 等待队列处理完成 FanInQueue.join() sink_thread.join() class FanInBll(object): """ """ def demo(self): """ :return: """ print("🚀 企业级珠宝 Fan-In 业务系统启动...\n") JewelryBusinessOrchestration.run() print("\n🎉 系统全部执行完成!")