当前位置: 首页 > news >正文

社区公告推送小程序,输入公告信息,自动推送给小区用户,支持点赞和评论,解决社区公告传达不及时的问题。

我将为您创建一个完整的社区公告推送小程序。以下是完整的项目结构:

项目结构

community_announcement_system/

├── main.py # 主程序入口

├── models.py # 数据模型模块

├── announcement_manager.py # 公告管理核心模块

├── user_manager.py # 用户管理模块

├── notification_service.py # 推送通知服务模块

├── database.py # 数据库操作模块

├── utils.py # 工具函数模块

├── config.py # 配置文件

├── templates.py # HTML模板模块

├── static/

│ ├── style.css # 样式文件

│ └── script.js # JavaScript文件

├── data/

│ └── community_data.json # 数据存储文件

├── README.md # 使用说明

├── requirements.txt # 依赖包列表

└── demo.py # 演示程序

1. requirements.txt

flask==2.3.3

flask-sqlalchemy==3.0.5

flask-login==0.6.3

werkzeug==2.3.7

jinja2==3.1.2

wtforms==3.0.1

email-validator==2.0.0

python-dateutil==2.8.2

sqlite3

2. config.py

"""

配置文件 - 存储应用程序的配置参数

基于创新创业理论:标准化配置提升系统稳定性

"""

import os

from datetime import timedelta

class Config:

# 基础配置

SECRET_KEY = os.environ.get('SECRET_KEY') or 'dev-secret-key-change-in-production'

# 数据库配置

DATABASE_URL = os.environ.get('DATABASE_URL') or 'sqlite:///community.db'

# 应用配置

APP_NAME = "智慧社区公告系统"

APP_DESCRIPTION = "高效、便捷的社区信息推送平台"

# 推送配置

NOTIFICATION_ENABLED = True

PUSH_DELAY_SECONDS = 2 # 推送延迟时间

# 分页配置

ANNOUNCEMENTS_PER_PAGE = 10

COMMENTS_PER_PAGE = 5

# 权限配置

MAX_TITLE_LENGTH = 100

MAX_CONTENT_LENGTH = 2000

MAX_COMMENT_LENGTH = 500

# 时间配置

ANNOUNCEMENT_EXPIRY_DAYS = 30 # 公告过期天数

SESSION_TIMEOUT = timedelta(hours=24)

# 用户角色

ROLES = {

'admin': '管理员',

'resident': '居民',

'committee': '业委会成员'

}

# 公告类型

ANNOUNCEMENT_TYPES = {

'notice': '通知公告',

'activity': '活动通知',

'emergency': '紧急通知',

'service': '物业服务',

'other': '其他'

}

# 优先级等级

PRIORITY_LEVELS = {

'low': '普通',

'medium': '重要',

'high': '紧急',

'urgent': '特急'

}

class DevelopmentConfig(Config):

DEBUG = True

TESTING = True

class ProductionConfig(Config):

DEBUG = False

TESTING = False

# 配置映射

config = {

'development': DevelopmentConfig,

'production': ProductionConfig,

'default': DevelopmentConfig

}

3. models.py

"""

数据模型模块 - 定义系统的核心数据结构和关系

基于创新创业理论:数据模型是产品的骨架

"""

from flask_sqlalchemy import SQLAlchemy

from flask_login import UserMixin

from werkzeug.security import generate_password_hash, check_password_hash

from datetime import datetime

import json

db = SQLAlchemy()

class User(UserMixin, db.Model):

"""用户模型 - 存储社区居民信息"""

__tablename__ = 'users'

id = db.Column(db.Integer, primary_key=True)

username = db.Column(db.String(80), unique=True, nullable=False, comment='用户名')

email = db.Column(db.String(120), unique=True, nullable=False, comment='邮箱')

password_hash = db.Column(db.String(128), nullable=False, comment='密码哈希')

real_name = db.Column(db.String(50), nullable=False, comment='真实姓名')

phone = db.Column(db.String(20), comment='联系电话')

room_number = db.Column(db.String(20), comment='房间号')

role = db.Column(db.String(20), default='resident', comment='用户角色')

# 关联关系

announcements = db.relationship('Announcement', backref='author', lazy=True)

comments = db.relationship('Comment', backref='user', lazy=True)

likes = db.relationship('Like', backref='user', lazy=True)

# 时间戳

created_at = db.Column(db.DateTime, default=datetime.utcnow, comment='创建时间')

updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, comment='更新时间')

def set_password(self, password):

"""设置密码"""

self.password_hash = generate_password_hash(password)

def check_password(self, password):

"""验证密码"""

return check_password_hash(self.password_hash, password)

def get_avatar_url(self):

"""获取头像URL"""

return f"/static/avatars/{self.role}.png"

def to_dict(self, include_private=False):

"""转换为字典格式"""

data = {

'id': self.id,

'username': self.username,

'real_name': self.real_name,

'role': self.role,

'room_number': self.room_number,

'created_at': self.created_at.isoformat() if self.created_at else None

}

if include_private:

data.update({

'email': self.email,

'phone': self.phone

})

return data

def __repr__(self):

return f'<User {self.username}>'

class Announcement(db.Model):

"""公告模型 - 存储社区公告信息"""

__tablename__ = 'announcements'

id = db.Column(db.Integer, primary_key=True)

title = db.Column(db.String(200), nullable=False, comment='标题')

content = db.Column(db.Text, nullable=False, comment='内容')

type = db.Column(db.String(20), default='notice', comment='类型')

priority = db.Column(db.String(20), default='medium', comment='优先级')

# 作者信息

author_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)

# 推送状态

is_published = db.Column(db.Boolean, default=False, comment='是否已发布')

is_pinned = db.Column(db.Boolean, default=False, comment='是否置顶')

push_status = db.Column(db.String(20), default='draft', comment='推送状态')

# 统计数据

view_count = db.Column(db.Integer, default=0, comment='浏览次数')

like_count = db.Column(db.Integer, default=0, comment='点赞数')

comment_count = db.Column(db.Integer, default=0, comment='评论数')

# 时间信息

publish_time = db.Column(db.DateTime, comment='发布时间')

expiry_date = db.Column(db.DateTime, comment='过期时间')

created_at = db.Column(db.DateTime, default=datetime.utcnow, comment='创建时间')

updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, comment='更新时间')

# 关联关系

comments = db.relationship('Comment', backref='announcement', lazy=True, cascade='all, delete-orphan')

likes = db.relationship('Like', backref='announcement', lazy=True, cascade='all, delete-orphan')

def get_type_display(self):

"""获取类型显示名称"""

from config import Config

return Config.ANNOUNCEMENT_TYPES.get(self.type, '未知')

def get_priority_display(self):

"""获取优先级显示名称"""

from config import Config

return Config.PRIORITY_LEVELS.get(self.priority, '普通')

def get_author_name(self):

"""获取作者姓名"""

return self.author.real_name if self.author else '未知'

def is_expired(self):

"""判断是否过期"""

if not self.expiry_date:

return False

return datetime.utcnow() > self.expiry_date

def increment_view_count(self):

"""增加浏览次数"""

self.view_count += 1

db.session.commit()

def to_dict(self, include_content=True):

"""转换为字典格式"""

data = {

'id': self.id,

'title': self.title,

'type': self.type,

'type_display': self.get_type_display(),

'priority': self.priority,

'priority_display': self.get_priority_display(),

'author_name': self.get_author_name(),

'is_published': self.is_published,

'is_pinned': self.is_pinned,

'push_status': self.push_status,

'view_count': self.view_count,

'like_count': self.like_count,

'comment_count': self.comment_count,

'publish_time': self.publish_time.isoformat() if self.publish_time else None,

'expiry_date': self.expiry_date.isoformat() if self.expiry_date else None,

'created_at': self.created_at.isoformat() if self.created_at else None

}

if include_content:

data['content'] = self.content

return data

def __repr__(self):

return f'<Announcement {self.title}>'

class Comment(db.Model):

"""评论模型 - 存储用户对公告的评论"""

__tablename__ = 'comments'

id = db.Column(db.Integer, primary_key=True)

content = db.Column(db.Text, nullable=False, comment='评论内容')

is_reply = db.Column(db.Boolean, default=False, comment='是否为回复')

parent_id = db.Column(db.Integer, db.ForeignKey('comments.id'), comment='父评论ID')

# 关联信息

announcement_id = db.Column(db.Integer, db.ForeignKey('announcements.id'), nullable=False)

user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)

# 时间信息

created_at = db.Column(db.DateTime, default=datetime.utcnow, comment='创建时间')

updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, comment='更新时间')

# 自关联关系

replies = db.relationship('Comment', backref=db.backref('parent', remote_side=[id]), lazy=True)

def get_user_name(self):

"""获取用户姓名"""

return self.user.real_name if self.user else '匿名用户'

def to_dict(self):

"""转换为字典格式"""

return {

'id': self.id,

'content': self.content,

'is_reply': self.is_reply,

'parent_id': self.parent_id,

'user_name': self.get_user_name(),

'user_id': self.user_id,

'announcement_id': self.announcement_id,

'created_at': self.created_at.isoformat() if self.created_at else None,

'replies': [reply.to_dict() for reply in self.replies] if self.replies else []

}

def __repr__(self):

return f'<Comment {self.content[:20]}>'

class Like(db.Model):

"""点赞模型 - 存储用户对公告的点赞记录"""

__tablename__ = 'likes'

id = db.Column(db.Integer, primary_key=True)

user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)

announcement_id = db.Column(db.Integer, db.ForeignKey('announcements.id'), nullable=False)

# 时间信息

created_at = db.Column(db.DateTime, default=datetime.utcnow, comment='创建时间')

# 唯一约束

__table_args__ = (db.UniqueConstraint('user_id', 'announcement_id', name='unique_user_announcement_like'),)

def to_dict(self):

"""转换为字典格式"""

return {

'id': self.id,

'user_id': self.user_id,

'announcement_id': self.announcement_id,

'created_at': self.created_at.isoformat() if self.created_at else None

}

def __repr__(self):

return f'<Like user:{self.user_id} announcement:{self.announcement_id}>'

class PushRecord(db.Model):

"""推送记录模型 - 记录公告推送历史"""

__tablename__ = 'push_records'

id = db.Column(db.Integer, primary_key=True)

announcement_id = db.Column(db.Integer, db.ForeignKey('announcements.id'), nullable=False)

recipient_count = db.Column(db.Integer, default=0, comment='接收人数')

success_count = db.Column(db.Integer, default=0, comment='成功推送数')

failure_count = db.Column(db.Integer, default=0, comment='推送失败数')

# 时间信息

push_time = db.Column(db.DateTime, default=datetime.utcnow, comment='推送时间')

created_at = db.Column(db.DateTime, default=datetime.utcnow, comment='创建时间')

# 关联关系

announcement = db.relationship('Announcement', backref='push_records')

def to_dict(self):

"""转换为字典格式"""

return {

'id': self.id,

'announcement_id': self.announcement_id,

'recipient_count': self.recipient_count,

'success_count': self.success_count,

'failure_count': self.failure_count,

'push_time': self.push_time.isoformat() if self.push_time else None,

'created_at': self.created_at.isoformat() if self.created_at else None

}

def __repr__(self):

return f'<PushRecord announcement:{self.announcement_id}>'

4. database.py

"""

数据库操作模块 - 封装所有数据库相关的操作

基于创新创业理论:数据层抽象提升系统可维护性

"""

from sqlalchemy import create_engine, desc, asc, func, and_, or_

from sqlalchemy.orm import sessionmaker, scoped_session

from sqlalchemy.exc import SQLAlchemyError

import logging

from datetime import datetime, timedelta

import json

from models import db, User, Announcement, Comment, Like, PushRecord

from config import Config

class DatabaseManager:

"""数据库管理器 - 统一管理所有数据库操作"""

def __init__(self, app=None):

self.app = app

self.engine = None

self.Session = None

if app:

self.init_app(app)

def init_app(self, app):

"""初始化数据库连接"""

self.app = app

self.engine = create_engine(

app.config['SQLALCHEMY_DATABASE_URI'],

echo=app.config.get('SQLALCHEMY_ECHO', False)

)

self.Session = scoped_session(sessionmaker(bind=self.engine))

db.init_app(app)

def get_session(self):

"""获取数据库会话"""

return self.Session()

def close_session(self, session):

"""关闭数据库会话"""

if session:

session.close()

# 用户相关操作

def create_user(self, user_data):

"""创建新用户"""

session = self.get_session()

try:

# 检查用户名和邮箱是否已存在

existing_user = session.query(User).filter(

or_(

User.username == user_data['username'],

User.email == user_data['email']

)

).first()

if existing_user:

return None, "用户名或邮箱已存在"

user = User(

username=user_data['username'],

email=user_data['email'],

real_name=user_data['real_name'],

phone=user_data.get('phone', ''),

room_number=user_data.get('room_number', ''),

role=user_data.get('role', 'resident')

)

user.set_password(user_data['password'])

session.add(user)

session.commit()

return user, "用户创建成功"

except SQLAlchemyError as e:

session.rollback()

logging.error(f"创建用户失败: {e}")

return None, f"数据库错误: {str(e)}"

finally:

self.close_session(session)

def get_user_by_username(self, username):

"""根据用户名获取用户"""

session = self.get_session()

try:

return session.query(User).filter(User.username == username).first()

except SQLAlchemyError as e:

logging.error(f"查询用户失败: {e}")

return None

finally:

self.close_session(session)

def get_user_by_id(self, user_id):

"""根据用户ID获取用户"""

session = self.get_session()

try:

return session.query(User).filter(User.id == user_id).first()

except SQLAlchemyError as e:

logging.error(f"查询用户失败: {e}")

return None

finally:

self.close_session(session)

def get_users_paginated(self, page=1, per_page=20, filters=None):

"""分页获取用户列表"""

session = self.get_session()

try:

query = session.query(User)

# 应用过滤条件

if filters:

if filters.get('role'):

query = query.filter(User.role == filters['role'])

if filters.get('keyword'):

keyword = f"%{filters['keyword']}%"

query = query.filter(

or_(

User.username.like(keyword),

User.real_name.like(keyword),

User.room_number.like(keyword)

)

)

# 排序和分页

users = query.order_by(desc(User.created_at)).paginate(

page=page, per_page=per_page, error_out=False

)

return users

except SQLAlchemyError as e:

logging.error(f"获取用户列表失败: {e}")

return None

finally:

self.close_session(session)

# 公告相关操作

def create_announcement(self, announcement_data, author_id):

"""创建新公告"""

session = self.get_session()

try:

# 计算过期时间

expiry_days = announcement_data.get('expiry_days', 30)

expiry_date = datetime.utcnow() + timedelta(days=expiry_days)

announcement = Announcement(

title=announcement_data['title'],

content=announcement_data['content'],

type=announcement_data.get('type', 'notice'),

priority=announcement_data.get('priority', 'medium'),

author_id=author_id,

is_pinned=announcement_data.get('is_pinned', False),

expiry_date=expiry_date

)

session.add(announcement)

session.commit()

return announcement, "公告创建成功"

except SQLAlchemyError as e:

session.rollback()

logging.error(f"创建公告失败: {e}")

return None, f"数据库错误: {str(e)}"

finally:

self.close_session(session)

def get_announcement_by_id(self, announcement_id):

"""根据ID获取公告"""

session = self.get_session()

try:

return session.query(Announcement).filter(

Announcement.id == announcement_id

).first()

except SQLAlchemyError as e:

logging.error(f"查询公告失败: {e}")

return None

finally:

self.close_session(session)

def get_announcements_paginated(self, page=1, per_page=10, filters=None, user_id=None):

"""分页获取公告列表"""

session = self.get_session()

try:

query = session.query(Announcement).filter(

Announcement.is_published == True

)

# 排除过期的公告

query = query.filter(

or_(

Announcement.expiry_date.is_(None),

Announcement.expiry_date > datetime.utcnow()

)

)

# 应用过滤条件

if filters:

if filters.get('type'):

query = query.filter(Announcement.type == filters['type'])

if filters.get('priority'):

query = query.filter(Announcement.priority == filters['priority'])

if filters.get('author_id'):

query = query.filter(Announcement.author_id == filters['author_id'])

if filters.get('keyword'):

keyword = f"%{filters['keyword']}%"

query = query.filter(

or_(

Announcement.title.like(keyword),

Announcement.content.like(keyword)

)

)

# 排序:置顶的在前,然后按发布时间倒序

query = query.order_by(

desc(Announcement.is_pinned),

desc(Announcement.publish_time)

)

announcements = query.paginate(

page=page, per_page=per_page, error_out=False

)

# 如果用户已登录,标记已读状态

if user_id:

for announcement in announcements.items:

# 这里可以添加已读状态的逻辑

pass

return announcements

except SQLAlchemyError as e:

logging.error(f"获取公告列表失败: {e}")

return None

finally:

self.close_session(session)

def publish_announcement(self, announcement_id):

"""发布公告"""

session = self.get_session()

try:

announcement = session.query(Announcement).filter(

Announcement.id == announcement_id

).first()

if not announcement:

return None, "公告不存在"

announcement.is_published = True

announcement.publish_time = datetime.utcnow()

http://www.jsqmd.com/news/165769/

相关文章:

  • 从基础到实践:信息系统安全风险防范的十大常用技术剖析
  • 彻底搞懂大语言模型(LLM)与Transformer架构的原理与应用!
  • “网络安全”到底是做什么的?揭秘主要岗位职责与入门成长路线图
  • 系统性打败碎片化:Linux运维核心技能体系深度梳理
  • 毕业论文 “无痛通关”:从选题到成稿,藏在 paperzz 毕业论文里的 4 步 “躺平式” 写作法
  • 【独家揭秘】最新研究出炉:大模型LLM如何在关键时刻实现深度思考?
  • 驶入黄金赛道:网络安全主要岗位详解及行业前景洞察
  • leetcode 816. Ambiguous Coordinates 模糊坐标
  • 【AI×实时Linux:极速实战宝典】硬件分区 - 使用 NVIDIA MIG (Multi-Instance GPU) 在物理层隔离不同 AI 任务
  • leetcode 817. Linked List Components 链表组件-耗时100%
  • 驾驭无形风险:网络安全前沿趋势分析与企业防御战略指南
  • GESP认证C++编程真题解析 | B4451 [GESP202512 四级] 建造
  • 软/硬中断计数、softnet_stat、socket 队列、吞吐量、CPU 使用率、offload状态
  • 知网AIGC爆红怎么办?2025最新论文降AI全攻略(附免费手改+工具实测)
  • 【AI×实时Linux:极速实战宝典】显存池 - 编写自定义 Allocator 预分配全量显存,杜绝运行时的 cudaMalloc 开销
  • Elastic 即代码:自动化的不只是基础设施
  • 2025年焊管厂家推荐榜:镍基合金/厚壁/不锈钢/特材/大口径/钛合金/复合不锈钢材料/直缝焊管源头厂家精选 - 品牌推荐官
  • 2025年母线槽生产厂家实力推荐:江苏祥丰电器有限公司,专注耐火/密集/高压/铝合金/封闭式母线槽源头厂家精选 - 品牌推荐官
  • 普源数字万用表DM858E接地电阻测量技巧
  • 2025年口碑好的铝合金地垫制造企业推荐,高品质铝合金地垫生产厂家全解析 - 工业品牌热点
  • 检测CVE-2025–66478/CVE-2025–55182:React/Next.jsRSC反序列化漏洞实战指南
  • 基于Spring Boot框架的文学名著分享系统的设计与实现
  • 2025-2026双曲面搅拌机三大优质厂家权威榜单:技术领先者揭晓 - 品牌推荐大师
  • 同惠TH2840LCR测试仪:电路板故障检测的精准“诊断师“
  • 2025工业废盐焚烧炉厂商TOP5权威推荐:废盐焚烧炉资深厂商甄选指南 - 工业推荐榜
  • 收藏!构建高质量AI智能体的10条核心法则(实战总结,小白/程序员必看)
  • 2026年雅思/托福机构评测榜:经济复苏驱动留学热,多次元教育以98.6分领跑行业 - 速递信息
  • 工业互联网在电池拆解中的智能化升级路径
  • 2025年靠谱变压器厂家排行榜,变压器制造商推荐与供应商精选测评 - mypinpai
  • 手把手教你Java文件断点下载