项目结构:

# encoding: utf-8
# 版权所有 2026 ©涂聚文有限公司™ ®
# 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎
# 描述:Broadcast Pattern 广播模式
# Author : geovindu,Geovin Du 涂聚文.
# IDE : PyCharm 2024.3.6 python 3.11
# os : windows 10
# database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j
# Datetime : 2026/6/7 23:11
# User : geovindu
# Product : PyCharm
# Project : pydesginpattern
# File : exceptions.pyclass BroadcastError(Exception):"""广播基础异常"""passclass SubscriberError(BroadcastError):"""订阅者异常"""pass# encoding: utf-8
# 版权所有 2026 ©涂聚文有限公司™ ®
# 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎
# 描述:Broadcast Pattern 广播模式
# Author : geovindu,Geovin Du 涂聚文.
# IDE : PyCharm 2024.3.6 python 3.11
# os : windows 10
# database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j
# Datetime : 2026/6/7 23:15
# User : geovindu
# Product : PyCharm
# Project : pydesginpattern
# File : broadcast.pyfrom typing import List
from BroadcastPattern.interfaces.subscriber import Subscriber
from BroadcastPattern.message.models import JewelryMessage
from BroadcastPattern.core.exceptions import SubscriberErrorclass BroadcastEngine(object):"""广播核心引擎(单例、线程安全)"""_instance = Nonedef __new__(cls):# 单例模式:全局唯一广播中心if cls._instance is None:cls._instance = super().__new__(cls)cls._instance._subscribers: List[Subscriber] = []return cls._instancedef subscribe(self, subscriber: Subscriber) -> None:""":param subscriber::return:"""if not subscriber:raise SubscriberError("订阅者不能为空")if subscriber not in self._subscribers:self._subscribers.append(subscriber)def unsubscribe(self, subscriber: Subscriber) -> None:""":param subscriber::return:"""if subscriber in self._subscribers:self._subscribers.remove(subscriber)def broadcast(self, message: JewelryMessage) -> None:"""向所有订阅者同时广播消息":param message::return:"""print(f"\n📢 【广播引擎】开始全局广播:{message.title}\n")for sub in self._subscribers:try:sub.on_receive(message)except Exception as e:print(f"⚠️ {sub.name} 处理消息失败:{str(e)}")
# encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Broadcast Pattern 广播模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/7 23:13 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : models.py from dataclasses import dataclass@dataclass(frozen=True) class JewelryMessage:"""珠宝行业标准广播消息(不可变、类型安全) 实体"""title: strcontent: strproduct: strmaterial: strbatch: strstandard: strwarehouse_location: strmarketing_content: str# encoding: utf-8 # 版权所有 2026 ©涂聚文有限公司™ ® # 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎 # 描述:Broadcast Pattern 广播模式 # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2024.3.6 python 3.11 # os : windows 10 # database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j # Datetime : 2026/6/7 23:12 # User : geovindu # Product : PyCharm # Project : pydesginpattern # File : subscriber.pyfrom abc import ABC, abstractmethod from BroadcastPattern.message.models import JewelryMessageclass Subscriber(ABC):"""订阅者接口(所有业务系统必须实现)"""@property@abstractmethoddef name(self) -> str:"""系统名称:return:"""pass@abstractmethoddef on_receive(self, message: JewelryMessage) -> None:"""接收广播消息:param message::return:"""pass
# encoding: utf-8
# 版权所有 2026 ©涂聚文有限公司™ ®
# 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎
# 描述:Broadcast Pattern 广播模式
# Author : geovindu,Geovin Du 涂聚文.
# IDE : PyCharm 2024.3.6 python 3.11
# os : windows 10
# database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j
# Datetime : 2026/6/7 23:17
# User : geovindu
# Product : PyCharm
# Project : pydesginpattern
# File : procurement.py
from BroadcastPattern.interfaces.subscriber import Subscriber
from BroadcastPattern.message.models import JewelryMessageclass ProcurementSystem(Subscriber):"""采购系统"""@propertydef name(self) -> str:return "原料采购系统"def on_receive(self, message: JewelryMessage) -> None:print(f"📦【{self.name}】已同步原料溯源:{message.material}")# encoding: utf-8
# 版权所有 2026 ©涂聚文有限公司™ ®
# 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎
# 描述:Broadcast Pattern 广播模式
# Author : geovindu,Geovin Du 涂聚文.
# IDE : PyCharm 2024.3.6 python 3.11
# os : windows 10
# database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j
# Datetime : 2026/6/7 23:18
# User : geovindu
# Product : PyCharm
# Project : pydesginpattern
# File : production.py
from BroadcastPattern.interfaces.subscriber import Subscriber
from BroadcastPattern.message.models import JewelryMessageclass ProductionSystem(Subscriber):"""生产系统"""@propertydef name(self) -> str:return "生产加工系统"def on_receive(self, message: JewelryMessage) -> None:print(f"⚙️【{self.name}】已排产:{message.product} 批次 {message.batch}")# encoding: utf-8
# 版权所有 2026 ©涂聚文有限公司™ ®
# 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎
# 描述:
# Author : geovindu,Geovin Du 涂聚文.
# IDE : PyCharm 2024.3.6 python 3.11
# os : windows 10
# database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j
# Datetime : 2026/6/7 23:19
# User : geovindu
# Product : PyCharm
# Project : pydesginpattern
# File : quality.pyfrom BroadcastPattern.interfaces.subscriber import Subscriber
from BroadcastPattern.message.models import JewelryMessageclass QCSystem(Subscriber):"""质检系统"""@propertydef name(self) -> str:return "质量检测系统"def on_receive(self, message: JewelryMessage) -> None:print(f"🔍【{self.name}】已加载质检标准:{message.standard}")# encoding: utf-8
# 版权所有 2026 ©涂聚文有限公司™ ®
# 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎
# 描述:Broadcast Pattern 广播模式
# Author : geovindu,Geovin Du 涂聚文.
# IDE : PyCharm 2024.3.6 python 3.11
# os : windows 10
# database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j
# Datetime : 2026/6/7 23:20
# User : geovindu
# Product : PyCharm
# Project : pydesginpattern
# File : warehouse.pyfrom BroadcastPattern.interfaces.subscriber import Subscriber
from BroadcastPattern.message.models import JewelryMessageclass WarehouseSystem(Subscriber):"""仓储系统"""@propertydef name(self) -> str:return "仓储管理系统"def on_receive(self, message: JewelryMessage) -> None:print(f"📦【{self.name}】已预留仓位:{message.warehouse_location}")# encoding: utf-8
# 版权所有 2026 ©涂聚文有限公司™ ®
# 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎
# 描述:Broadcast Pattern 广播模式
# Author : geovindu,Geovin Du 涂聚文.
# IDE : PyCharm 2024.3.6 python 3.11
# os : windows 10
# database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j
# Datetime : 2026/6/7 23:21
# User : geovindu
# Product : PyCharm
# Project : pydesginpattern
# File : sales.pyfrom BroadcastPattern.interfaces.subscriber import Subscriber
from BroadcastPattern.message.models import JewelryMessageclass StoreSalesSystem(Subscriber):"""门店销售系统"""@propertydef name(self) -> str:return "全国门店销售系统"def on_receive(self, message: JewelryMessage) -> None:print(f"🏬【{self.name}】已上架新品:{message.product}")# encoding: utf-8
# 版权所有 2026 ©涂聚文有限公司™ ®
# 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎
# 描述:Broadcast Pattern 广播模式
# Author : geovindu,Geovin Du 涂聚文.
# IDE : PyCharm 2024.3.6 python 3.11
# os : windows 10
# database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j
# Datetime : 2026/6/7 23:22
# User : geovindu
# Product : PyCharm
# Project : pydesginpattern
# File : marketing.pyfrom BroadcastPattern.interfaces.subscriber import Subscriber
from BroadcastPattern.message.models import JewelryMessageclass MemberMarketingSystem(Subscriber):"""会员营销系统"""@propertydef name(self) -> str:return "会员营销系统"def on_receive(self, message: JewelryMessage) -> None:print(f"🎯【{self.name}】已推送:{message.marketing_content}")
调用:
# encoding: utf-8
# 版权所有 2026 ©涂聚文有限公司™ ®
# 许可信息查看:言語成了邀功盡責的功臣,還需要行爲每日來值班嗎
# 描述:Broadcast Pattern 广播模式
# Author : geovindu,Geovin Du 涂聚文.
# IDE : PyCharm 2024.3.6 python 3.11
# os : windows 10
# database : mysql 9.0 sql server 2019, postgreSQL 17.0 Oracle 21c Neo4j
# Datetime : 2026/6/7 23:23
# User : geovindu
# Product : PyCharm
# Project : pydesginpattern
# File : BroadcastBll.py
from BroadcastPattern.core.broadcast import BroadcastEngine
from BroadcastPattern.message.models import JewelryMessage
from BroadcastPattern.business.procurement import ProcurementSystem
from BroadcastPattern.business.production import ProductionSystem
from BroadcastPattern.business.quality import QCSystem
from BroadcastPattern.business.warehouse import WarehouseSystem
from BroadcastPattern.business.sales import StoreSalesSystem
from BroadcastPattern.business.marketing import MemberMarketingSystemclass BroadcastBll(object):""""""def demo(self):""":return:"""# 1. 初始化全局广播引擎engine = BroadcastEngine()# 2. 初始化所有珠宝业务系统systems = [ProcurementSystem(),ProductionSystem(),QCSystem(),WarehouseSystem(),StoreSalesSystem(),MemberMarketingSystem()]# 3. 全部订阅广播for sys in systems:engine.subscribe(sys)print(f"✅ 已订阅:{sys.name}")# 4. 构造行业标准消息message = JewelryMessage(title="2025春季冰种翡翠手镯全国上市",content="天然A货翡翠,统一标准、统一定价、同步发售",product="冰种翡翠手镯",material="缅甸天然翡翠",batch="JC20250415-001",standard="GB/T 16552-2017 珠宝玉石鉴定",warehouse_location="广州总部仓-A03-07",marketing_content="VIP会员专享9折+免费刻字")# 5. 执行广播engine.broadcast(message)print("\n🎉 企业级广播完成:全业务链同步成功")
输出:

