Postgres LISTEN / NOTIFY: building a lightweight publish-subscribe system – Huajiu Hoe Field
Summary
When I first designed the messaging and caching modules for an internal system, Postgres was the only dependency. With so few users, there was no point in installing Redis as well, which would only add operations and maintenance overhead. Caching was easy to solve with an UNLOGGED table. While working out how to implement messaging on top of ordinary tables, I found that PostgreSQL ships two built-in commands, LISTEN and NOTIFY, for asynchronous communication between the database server and connected clients. This PostgreSQL-specific extension lets the database act as a lightweight message queue (MQ): an application can emit events from the database, and other clients can react to them in real time. It was exactly what we needed, so we started testing right away.
Core features
- Lightweight implementation: no extra message middleware is needed; PostgreSQL's built-in functionality is used directly
- Asynchronous communication: supports the publish-subscribe model, so components can communicate while staying decoupled
- Memory efficient: channels are purely in-memory objects and take up no disk space
- Zero configuration: channels do not need to be created or configured in advance; they come into existence on demand
Applicable scenarios
- Real-time dashboards: push updates in real time when data changes
- Cache invalidation: tell the cache layer to refresh when data is updated
- Data auditing: track important data-change events
- Task scheduling: build a simple distributed task queue
- Event-driven architecture: implement event communication between microservices
Channel mechanism
Key characteristics:
- Channels are purely in-memory objects and are created implicitly by the LISTEN command
- They are reclaimed automatically once every listening session has disconnected or run UNLISTEN
- Channels do not need to be created or dropped manually, and doing so is not supported
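Because channels are created implicitly, a typo or a case mismatch in the name quietly produces a different channel instead of an error. The sketch below is a hypothetical helper (`check_channel_name` is not part of PostgreSQL or any driver) that rejects names likely to cause trouble. It assumes the default identifier limit of 63 bytes, and relies on the fact that an unquoted identifier is folded to lower case while a quoted one keeps its case.

```python
MAX_IDENTIFIER_BYTES = 63  # assumption: default build (NAMEDATALEN - 1)


def check_channel_name(name: str) -> str:
    """Return the name unchanged if it behaves the same quoted and unquoted."""
    if not name:
        raise ValueError("channel name must not be empty")
    if len(name.encode("utf-8")) > MAX_IDENTIFIER_BYTES:
        raise ValueError("channel name is longer than 63 bytes")
    if name != name.lower():
        # LISTEN Task_Channel (unquoted) listens on task_channel,
        # while LISTEN "Task_Channel" (quoted) keeps the capitals.
        raise ValueError("use a lower-case channel name to avoid case folding")
    return name


channel = check_channel_name("task_channel")
```

Calling it once at startup, before the first LISTEN, is usually enough.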
Message delivery model
PostgreSQL NOTIFY follows a "discard if nobody is listening" (fire-and-forget) mechanism:
- When nobody is listening, the message is simply dropped
- It uses no disk space
- It holds no memory long-term
- Messages are delivered only when there is an active listener
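Basic usage
psql command-line example
-- Listen on a channel
LISTEN task_channel;
-- Send a message to the channel
NOTIFY task_channel, '123456';
-- Stop listening on all channels
UNLISTEN *;
-- List the channels the current session is listening on
SELECT pg_listening_channels();
-- Inspect client backend sessions (string literals take single quotes)
SELECT * FROM pg_stat_activity WHERE backend_type = 'client backend';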
Dynamic message generation
The standard NOTIFY command requires a literal payload and does not support dynamic string concatenation. The pg_notify() function, however, can be used to generate notifications dynamically:
-- pg_notify() accepts expressions, so the message can be built dynamically
SELECT pg_notify('my_channel', 'Hello, ' || 'World!');
-- A dynamic message built from session values
SELECT pg_notify('audit_channel', 'User ' || current_user || ' logged in at ' || now()::text);
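From application code, pg_notify() has a further practical advantage: both arguments are ordinary function arguments, so they can be passed as bound parameters rather than spliced into the SQL text. A minimal sketch, assuming a psycopg-style `%s` placeholder; `build_notify` is a hypothetical helper, not a library function.

```python
import json
from typing import Any


def build_notify(channel: str, payload: dict[str, Any]) -> tuple[str, tuple[str, str]]:
    """Build a parameterized pg_notify() call carrying a JSON payload."""
    body = json.dumps(payload, default=str)
    return "SELECT pg_notify(%s, %s)", (channel, body)


query, params = build_notify("task_channel", {"task_id": 42})
# With psycopg this pair would be executed as: await conn.execute(query, params)
```

The helper only builds the statement; executing it is left to whichever driver is in use.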
Python implementation example
Project structure
├── main.py              # core implementation: TaskWorker and TaskPublisher
├── conf/
│   └── config.toml      # configuration file
└── pkg/
    └── config/          # configuration management module
Install dependencies
uv add "psycopg[binary,pool]>=3.3.3"
Configuration management
First, set up the database connection and the channel through a configuration file:
# conf/config.toml
[database.postgres]
host = "127.0.0.1"
port = 5432
user = "username"
password = "password"
dbname = "database_name"
pool_min_size = 2
pool_max_size = 10
channel = "task_channel" # default channel name
Example code for the configuration module: pkg/config/config.py
import tomllib
from typing import Any, Dict


class BaseConfig:
    def __init__(self, cfg_file: str):
        self._cfg_file = cfg_file
        self._data: Dict[str, Any] = {}
        self._load_config()

    def _load_config(self) -> None:
        # Skip the file read if the configuration is already loaded
        if self._data:
            return
        try:
            with open(self._cfg_file, "rb") as f:
                self._data = tomllib.load(f)
        except FileNotFoundError:
            raise RuntimeError(f"Config file not found: {self._cfg_file}")
        except Exception as e:
            raise RuntimeError(f"Failed to load config file: {e}") from e


class PostgresConfigMixin(BaseConfig):
    def postgres_host(self) -> str:
        return self._data.get("database", {}).get("postgres", {}).get("host", "")

    def postgres_port(self) -> int:
        return self._data.get("database", {}).get("postgres", {}).get("port", 5432)

    def postgres_user(self) -> str:
        return self._data.get("database", {}).get("postgres", {}).get("user", "")

    def postgres_password(self) -> str:
        return self._data.get("database", {}).get("postgres", {}).get("password", "")

    def postgres_dbname(self) -> str:
        return self._data.get("database", {}).get("postgres", {}).get("dbname", "")

    def postgres_pool_min_size(self) -> int:
        return (
            self._data.get("database", {}).get("postgres", {}).get("pool_min_size", 2)
        )

    def postgres_pool_max_size(self) -> int:
        return (
            self._data.get("database", {}).get("postgres", {}).get("pool_max_size", 10)
        )

    def postgres_channel(self) -> str:
        return (
            self._data.get("database", {})
            .get("postgres", {})
            .get("channel", "default_channel")
        )

    def get_postgres_dsn(self, hide_password: bool = False) -> str:
        """Return the PostgreSQL connection DSN."""
        password = self.postgres_password()
        if hide_password and password:
            password = "***"
        # psycopg 3 accepts the standard PostgreSQL connection URI format
        return (
            f"postgresql://{self.postgres_user()}:{password}@"
            f"{self.postgres_host()}:{self.postgres_port()}/{self.postgres_dbname()}"
        )


class LLMConfigMixin(BaseConfig):
    """LLM configuration mixin."""

    def llm_model(self) -> str:
        return self._data.get("llm", {}).get("model", "")

    def llm_base_url(self) -> str:
        return self._data.get("llm", {}).get("base_url", "")

    def llm_api_key(self) -> str:
        return self._data.get("llm", {}).get("api_key", "")


class Config(PostgresConfigMixin, LLMConfigMixin):
    def __init__(self, cfg_file: str = "conf/config.toml"):
        super().__init__(cfg_file)

    def reload(self) -> None:
        """Reload the configuration from disk."""
        self._data = {}  # clear the cached data so _load_config() reads the file again
        self._load_config()
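One caveat with assembling the DSN by plain string interpolation: a password containing characters such as @, : or / would produce an invalid URI. The following is a hedged sketch of a variant that percent-encodes the credentials with the standard library; `build_dsn` is a hypothetical stand-alone function, not part of the module above.

```python
from urllib.parse import quote


def build_dsn(user: str, password: str, host: str, port: int, dbname: str) -> str:
    """Build a postgresql:// URI with percent-encoded user, password and dbname."""
    return (
        f"postgresql://{quote(user, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{quote(dbname, safe='')}"
    )


dsn = build_dsn("username", "p@ss:w/rd", "127.0.0.1", 5432, "database_name")
```

With the sample values, the reserved characters in the password are escaped and the rest of the URI is unchanged.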
Core component implementation
1. Task consumer (TaskWorker)
TaskWorker is responsible for listening on the assigned channel and processing the tasks it receives:
# main.py - core of the TaskWorker class
import asyncio
import json
import signal
from typing import Set

from psycopg import AsyncConnection, Notify, sql
from psycopg_pool import AsyncConnectionPool

from pkg.config import cfg

MAX_CONCURRENCY = 10


class TaskWorker:
    def __init__(self, dsn: str, channel: str):
        self._dsn = dsn
        self.channel = channel
        self.pool: AsyncConnectionPool | None = None
        self.listener_conn: AsyncConnection | None = None
        self.sem = asyncio.Semaphore(MAX_CONCURRENCY)
        self.active_tasks: Set[asyncio.Task] = set()

    async def start(self) -> None:
        self.pool = AsyncConnectionPool(
            self._dsn,
            min_size=cfg.postgres_pool_min_size(),
            max_size=cfg.postgres_pool_max_size(),
            open=False,  # open explicitly below instead of in the constructor
        )
        await self.pool.open()
        # Dedicated listener connection, so the LISTEN state is not lost
        # when a pooled connection is recycled
        self.listener_conn = await AsyncConnection.connect(self._dsn, autocommit=True)
        await self.listener_conn.execute(
            sql.SQL("LISTEN {}").format(sql.Identifier(self.channel))
        )
        print(f"Listening on channel: {self.channel}")
        try:
            async for notify in self.listener_conn.notifies():
                if notify.channel == self.channel and notify.payload:
                    await self._dispatch_task(notify)
        except asyncio.CancelledError:
            print("Listener cancelled")
        except Exception as e:
            print(f"Listener error: {e}")
        finally:
            await self.stop()

    async def _dispatch_task(self, notify: Notify) -> None:
        """Receive a notification and dispatch it as a task."""
        task_info = notify.payload.strip()
        try:
            task_data = json.loads(task_info)  # assume the payload is a JSON string
            print(f"Received task notification: {task_data}")
        except json.JSONDecodeError:
            task_data = {"task_id": task_info}  # not JSON: fall back to the raw string
            print(f"Received non-JSON task notification: {task_data}")
        except Exception:
            print(f"Invalid task ID received: {task_info}")
            return
        if not isinstance(task_data, dict) or "task_id" not in task_data:
            print(
                f"Missing task_id in notification: {task_data}, or task_data is not a dict"
            )
            return
        task = asyncio.create_task(self._process_task(task_data))
        # Keep a reference until the task finishes so it is not garbage-collected
        self.active_tasks.add(task)
        task.add_done_callback(self.active_tasks.discard)

    async def _process_task(self, task_data: dict) -> None:
        """Run one task under the concurrency limit."""
        if not self.pool:
            raise RuntimeError("Connection pool is not initialized")
        async with self.sem:
            async with self.pool.connection() as conn:
                try:
                    await self._execute_business(task_data)
                    print(f"<= Task {task_data['task_id']} completed successfully")
                except Exception as e:
                    print(f"Error processing task {task_data['task_id']}: {e}")
                    # Retry logic or error recording can be added here
                    await self._log_failure(conn, task_data["task_id"], str(e))

    async def _execute_business(self, task_data: dict) -> None:
        """Execute the business logic."""
        print(f"<= Processing task {task_data}...")
        await asyncio.sleep(5)  # simulate a slow operation
        print(f"<= Task {task_data} done.")

    async def _log_failure(self, conn: AsyncConnection, task_id: int, error_msg: str):
        """Record a failure."""
        try:
            # Write the failure to a separate table for later retries or alerting
            print(f"Logging failure for task {task_id}: {error_msg}")
        except Exception as e:
            print(f"Failed to log error for task {task_id}: {e}")

    async def stop(self) -> None:
        print("Stopping TaskWorker...")
        # Let in-flight tasks finish before closing connections
        if self.active_tasks:
            await asyncio.gather(*self.active_tasks, return_exceptions=True)
        if self.listener_conn:
            await self.listener_conn.close()
        if self.pool:
            await self.pool.close()
        print("TaskWorker stopped gracefully.")
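The concurrency control used by TaskWorker can be tried out without a database. The sketch below reproduces only the pattern: one asyncio.Task per notification, a semaphore acquired inside the task, and a set holding strong references. `run_demo` and the short sleep standing in for business logic are illustrative assumptions.

```python
import asyncio


async def run_demo(n_tasks: int, limit: int) -> int:
    """Run n_tasks fake jobs and return the highest concurrency observed."""
    sem = asyncio.Semaphore(limit)
    active: set[asyncio.Task] = set()
    running = 0
    peak = 0

    async def job() -> None:
        nonlocal running, peak
        async with sem:  # at most `limit` jobs get past this point at once
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)  # stand-in for the real work
            running -= 1

    for _ in range(n_tasks):
        task = asyncio.create_task(job())
        active.add(task)  # keep a reference so the task is not garbage-collected
        task.add_done_callback(active.discard)

    await asyncio.gather(*active)
    return peak


peak = asyncio.run(run_demo(n_tasks=20, limit=3))
```

Note that the semaphore bounds how many tasks run at once, not how many are created: a burst of notifications still produces one pending task each, which is worth keeping in mind under heavy load.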
2. Task publisher (TaskPublisher)
TaskPublisher is responsible for publishing task messages to a channel:
# main.py - TaskPublisher class
class TaskPublisher:
    def __init__(self, dsn: str):
        self._dsn = dsn
        self._pool: AsyncConnectionPool | None = None

    async def start(self):
        if not self._pool:
            self._pool = AsyncConnectionPool(
                self._dsn,
                min_size=cfg.postgres_pool_min_size(),
                max_size=cfg.postgres_pool_max_size(),
                open=False,  # open explicitly below instead of in the constructor
            )
            await self._pool.open()
        print("TaskPublisher started.")

    async def publish(self, channel: str, payload: dict) -> bool:
        if not self._pool:
            raise RuntimeError("Connection pool is not initialized")
        async with self._pool.connection() as conn:
            try:
                payload_str = json.dumps(
                    payload, default=str
                )  # serialize the dict to a JSON string
                # NOTIFY takes no bound parameters, so compose the statement client-side
                await conn.execute(
                    sql.SQL("NOTIFY {}, {}").format(
                        sql.Identifier(channel), sql.Literal(payload_str)
                    )
                )
                print(f"=> Published task to channel {channel}: {payload_str}")
                return True
            except Exception as e:
                print(f"Failed to publish task to channel {channel}: {e}")
                return False

    async def publish_batch(self, channel: str, payloads: list[dict]) -> int:
        if not self._pool:
            raise RuntimeError("Connection pool is not initialized")
        count = 0
        # All NOTIFYs share one transaction and are delivered when it commits;
        # a failed statement aborts that transaction, so nothing is delivered
        async with self._pool.connection() as conn:
            for payload in payloads:
                try:
                    payload_str = json.dumps(payload, default=str)
                    await conn.execute(
                        sql.SQL("NOTIFY {}, {}").format(
                            sql.Identifier(channel), sql.Literal(payload_str)
                        )
                    )
                    print(f"=> Published task to channel {channel}: {payload_str}")
                    count += 1
                except Exception as e:
                    print(f"Failed to publish batch tasks to channel {channel}: {e}")
        return count

    async def stop(self):
        if self._pool:
            await self._pool.close()
        print("TaskPublisher stopped.")

    async def __aenter__(self):
        await self.start()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.stop()
Calling the demo
Here is the demo section of main.py:
# main.py
async def run_worker():
    worker = TaskWorker(cfg.get_postgres_dsn(), cfg.postgres_channel())
    task = asyncio.create_task(worker.start())
    return task


async def run_publisher():
    async with TaskPublisher(cfg.get_postgres_dsn()) as publisher:
        for i in range(1, 11):
            payload = {"task_id": i, "data": f"Task data {i}"}
            await publisher.publish(cfg.postgres_channel(), payload)
            await asyncio.sleep(0.5)  # simulate an interval between publishes


async def main():
    # Alternative: run only the worker until SIGINT/SIGTERM arrives
    # worker = TaskWorker(cfg.get_postgres_dsn(), cfg.postgres_channel())
    # loop = asyncio.get_running_loop()
    # stop_evt = asyncio.Event()
    # for sig in (signal.SIGINT, signal.SIGTERM):
    #     loop.add_signal_handler(sig, stop_evt.set)
    # listen_task = asyncio.create_task(worker.start())
    # await stop_evt.wait()
    # print("Shutdown signal received, stopping worker...")
    # listen_task.cancel()
    # await listen_task
    worker_task = await run_worker()
    await asyncio.sleep(2)  # make sure the worker has started and is listening
    await run_publisher()  # publish the tasks
    print("All tasks published successfully.")
    await asyncio.sleep(30)  # wait for the worker to finish processing
    worker_task.cancel()  # stop the worker
    try:
        await worker_task
    except asyncio.CancelledError:
        pass


if __name__ == "__main__":
    asyncio.run(main())
Extended usage scenarios
Scenario 1: Real-time data synchronization
async def sync_data_change(self, table_name: str, record_id: str, operation: str):
    """Send a sync notification when data changes."""
    message = f"{table_name}:{record_id}:{operation}"
    await self.publish("data_sync_channel", message)
Scenario 2: Distributed lock notification
async def notify_lock_release(self, lock_name: str):
    """Notify waiters when a lock is released."""
    await self.publish("distributed_lock_channel", f"RELEASE:{lock_name}")
Scenario 3: Cache invalidation broadcast
async def invalidate_cache(self, cache_key: str):
    """Broadcast a notification when a cache entry becomes invalid."""
    await self.publish("cache_invalidation_channel", cache_key)
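Note that these scenario snippets pass plain strings, whereas the TaskWorker shown earlier only dispatches payloads that decode to a JSON object containing task_id. If the same worker is meant to consume such events, one option is to wrap them in a small envelope. The sketch below is an assumption about how that might look; the `event`, `table`, `record_id` and `operation` keys are illustrative, not something the original code defines.

```python
import json


def make_sync_event(table_name: str, record_id: str, operation: str) -> dict:
    """Wrap a data-change event in a dict that carries a task_id."""
    return {
        "task_id": f"{table_name}:{record_id}:{operation}",
        "event": "data_sync",
        "table": table_name,
        "record_id": record_id,
        "operation": operation,
    }


def decode_event(payload: str) -> dict:
    """Decode a payload: a JSON object as-is, anything else becomes the task_id."""
    try:
        data = json.loads(payload)
    except json.JSONDecodeError:
        return {"task_id": payload}
    return data if isinstance(data, dict) else {"task_id": payload}


event = make_sync_event("orders", "1001", "UPDATE")
roundtrip = decode_event(json.dumps(event))
```

Keeping the envelope small matters here, since the whole payload has to fit within the NOTIFY size limit.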
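Things to watch out for
Technical limitations
- Message size: in the default configuration a NOTIFY payload must be shorter than 8000 bytes
- No persistence: messages are not persisted; anything sent while no listener is connected, or pending across a restart, is lost
- No acknowledgement mechanism: the sender cannot tell whether a message was received
- Limited ordering guarantees: PostgreSQL delivers notifications in commit order, but a worker that processes them concurrently, as in the example above, may still finish them out of order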
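The size limit is easy to hit once payloads carry real data. A common workaround is to store the full record in a table and send only its key. The sketch below assumes the default limit and uses a hypothetical `save_large_payload` callback to stand in for that table write; the `payload_ref` key is likewise an illustrative choice.

```python
import json
from typing import Any, Callable

MAX_NOTIFY_BYTES = 7999  # assumption: default build, payload must be < 8000 bytes


def encode_payload(
    payload: dict[str, Any],
    save_large_payload: Callable[[str], str],
) -> str:
    """Return a NOTIFY-safe JSON string, storing oversized bodies elsewhere."""
    body = json.dumps(payload, default=str)
    if len(body.encode("utf-8")) <= MAX_NOTIFY_BYTES:
        return body
    # Too large: persist the body (hypothetical callback) and send a reference
    ref = save_large_payload(body)
    return json.dumps({"task_id": payload.get("task_id"), "payload_ref": ref})


stored: dict[str, str] = {}


def fake_save(body: str) -> str:
    """In-memory stand-in for an INSERT that returns the new row's key."""
    key = f"ref-{len(stored) + 1}"
    stored[key] = body
    return key


small = encode_payload({"task_id": 1}, fake_save)
large = encode_payload({"task_id": 2, "data": "x" * 10_000}, fake_save)
```

The size is measured in encoded bytes rather than characters, because non-ASCII text can take several bytes per character.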
Recommendations for production environments
- Monitoring and alerting: monitor the listening status of the channels
- Error handling: add thorough error handling and logging
- Fallback mechanism: important messages should also be persisted as a backup
- Performance testing: test how the system behaves under high load
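For the error-handling item, a failure worth planning for is the dedicated listener connection dropping: no notifications arrive until LISTEN is issued again on a new connection, and anything sent in between is lost. A minimal sketch of the retry schedule such a reconnect loop might use; `backoff_delays` and its default values are assumptions, not part of the original code.

```python
from typing import Iterator


def backoff_delays(
    base: float = 0.5, factor: float = 2.0, cap: float = 30.0
) -> Iterator[float]:
    """Yield exponentially growing sleep intervals, capped at `cap` seconds."""
    delay = base
    while True:
        yield min(delay, cap)
        delay *= factor


delays = backoff_delays()
first_eight = [next(delays) for _ in range(8)]
# A reconnect loop would sleep for next(delays) after each failed attempt,
# then re-create the connection, run LISTEN again, and start a fresh generator.
```

After a successful reconnect it is worth re-reading any persisted backlog, since the fallback mechanism above is the only record of what was missed.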
Addendum
Async version
asyncpg is a purely asynchronous Python driver for connecting to Postgres, with better performance.
Install
uv add asyncpg
Example code:
import asyncio
import signal
from typing import Set

import asyncpg

from pkg.config import cfg

MAX_CONCURRENT_TASKS = 5  # maximum number of concurrent tasks


class TaskWorker:
    def __init__(self, dsn: str, channel: str):
        self._dsn = dsn
        self._channel = channel
        self._pool: asyncpg.Pool | None = None
        self._listener_conn: asyncpg.Connection | None = None
        self._sem = asyncio.Semaphore(MAX_CONCURRENT_TASKS)
        self._active_tasks: Set[asyncio.Task] = set()

    async def start(self):
        """Start the listener."""
        if not self._pool:
            self._pool = await asyncpg.create_pool(
                dsn=self._dsn,
                min_size=cfg.postgres_pool_min_size(),
                max_size=cfg.postgres_pool_max_size(),
            )
        # Create a dedicated connection for listening
        if not self._listener_conn:
            self._listener_conn = await asyncpg.connect(dsn=self._dsn)
        await self._listener_conn.add_listener(self._channel, self._on_notify)
        print(f"Listening on channel: {self._channel}")
        try:
            # Block forever to keep listening
            await asyncio.Future()
        except asyncio.CancelledError:
            pass
        finally:
            await self.stop()

    async def _on_notify(
        self, conn: asyncpg.Connection, pid: int, channel: str, payload: str
    ):
        """Callback invoked when a NOTIFY arrives."""
        if not payload:
            return
        task_id = payload.strip()
        print(f"Received notification: {task_id} on channel: {channel}")
        # Hand the work to the event loop; concurrency is limited inside the task
        task = asyncio.create_task(self._handle_task(task_id))
        self._active_tasks.add(task)
        task.add_done_callback(self._active_tasks.discard)

    async def _handle_task(self, task_id: str):
        """Core task-handling logic."""
        if not self._pool:
            raise RuntimeError("Database connection pool is not initialized")
        async with self._sem:  # concurrency control
            async with self._pool.acquire() as conn:
                try:
                    # 1. Run the business logic
                    await self._execute_business_logic(task_id)
                    # 2. Update the task status (example)
                    print(f"Task {task_id} completed successfully.")
                except Exception as e:
                    print(f"Task {task_id} failed: {e}")
                    await self._log_failure(task_id, str(e))

    async def _execute_business_logic(self, task_id: str):
        """Simulate the business logic."""
        print(f"Processing task {task_id}...")
        await asyncio.sleep(5)  # simulate a slow operation
        print(f"Task {task_id} completed.")

    async def _log_failure(self, task_id: str, error: str):
        """Record a failure."""
        print(f"Task {task_id} failure logged: {error}")
        # In a real project this could be written to the database

    async def stop(self):
        """Shut down gracefully."""
        print("Shutting down gracefully...")
        # Wait for running tasks to finish
        if self._active_tasks:
            await asyncio.gather(*self._active_tasks, return_exceptions=True)
        # Release resources
        if self._listener_conn:
            await self._listener_conn.remove_listener(self._channel, self._on_notify)
            await self._listener_conn.close()
        if self._pool:
            await self._pool.close()
        print("Shutdown complete.")


async def main():
    worker = TaskWorker(cfg.get_postgres_dsn(), cfg.postgres_channel())
    loop = asyncio.get_running_loop()
    stop_evt = asyncio.Event()
    # Register signal handlers for a graceful shutdown
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, stop_evt.set)
    # Start the listener
    listen_task = asyncio.create_task(worker.start())
    # Wait for the stop event
    await stop_evt.wait()
    print("Shutdown signal received, stopping worker...")
    # Cancel the listener task and wait for it to finish
    listen_task.cancel()
    await listen_task


if __name__ == "__main__":
    asyncio.run(main())