Skip to main content

斷線重連

以下將用簡單示範,利用callback偵測交易Socket,接收到斷線事件後,程式自動進行Socket重新連線:

from fubon_neo.sdk import FubonSDK
import threading

# === 請填入你的真實憑證資料 ===
USER_ID = "您的身分證字號"
USER_PW = "您的登入密碼"
CERT_PATH = "您的憑證位置"
CERT_PW = "您的憑證密碼"

# --- 若有行情 WebSocket 需要重連,可以在這裡實作 ---
def handle_marketdata_ws_disconnect(code, msg):
print(f"[行情WS] 重新建立連線,原因: {code}, {msg}")

# === 全域鎖定,避免同時重複觸發重連 ===
relogin_lock = threading.Lock()

# === 建立 FubonSDK 實例(全域使用,重複利用) ===
sdk = FubonSDK()
accounts = None # 登入後會儲存帳號資訊

# === 下單與成交等事件的處理函式(可自訂) ===
def on_order(code, msg): pass
def on_order_changed(code, msg): pass
def on_filled(code, msg): pass

def re_login():
"""
當連線異常時,自動重新登入並重設事件 callback。
用 lock 確保同時間只會有一個重連動作。
"""
if not relogin_lock.acquire(blocking=False):
print("[自動重連] 目前已有重連程序執行中,這次略過。")
return

try:
global accounts, sdk
print("[自動重連] 登出並重新登入...")
try:
sdk.logout()
except Exception as e:
print("[自動重連] 登出時發生例外:", e)

try:
sdk = FubonSDK()
accounts = sdk.login(USER_ID, USER_PW, CERT_PATH, CERT_PW)
except Exception as e:
print("[自動重連] 登入時發生例外:", e)
return

if accounts.is_success:
print("[自動重連] 重新登入成功,重新設定所有事件 callback。")
# 很重要:重新登入後一定要重設事件 callback!
sdk.set_on_event(on_event)
sdk.set_on_order(on_order)
sdk.set_on_order_changed(on_order_changed)
sdk.set_on_filled(on_filled)

# 若有行情 WS 也需要重連,可自行呼叫
handle_marketdata_ws_disconnect(-9000, "交易WS已重連")
else:
print("[自動重連] 重新登入失敗。")
finally:
relogin_lock.release()

def on_event(code, content):
"""
FubonSDK 的事件通知 callback。
"""
print("[事件通知] 代碼:", code, "| 內容:", content)
if code == "300":
print("[事件通知] 偵測到斷線(代碼300),啟動自動重連。")
re_login()

# === 一開始就要先登入並註冊各種 callback ===
accounts = sdk.login(USER_ID, USER_PW, CERT_PATH, CERT_PW)
if accounts.is_success:
print("[主程式] 登入成功,開始註冊事件 callback...")
sdk.set_on_event(on_event)
sdk.set_on_order(on_order)
sdk.set_on_order_changed(on_order_changed)
sdk.set_on_filled(on_filled)

# 範例:可以指定預設帳號(如果有多個)
acc = accounts.data[0] if hasattr(accounts, "data") else None
else:
print("[主程式] 登入失敗!")

# === 範例:手動模擬斷線事件,測試自動重連機制 ===
on_event("300", "測試:WebSocket 已斷線")
on_event("300", "測試:再觸發一次斷線,理論上 lock 會防止重複重連")

# --- 範例結束 ---