Skip to main content

Reconnect

The following simple example uses the SDK's event callback to detect a disconnection and automatically reconnect the trading socket:

from fubon_neo.sdk import FubonSDK
import threading

# === Add Your Login Information ===
# Placeholder values: replace each one before running. All four are passed,
# in this order, to sdk.login() both at start-up and on every re-login.
USER_ID = "ID"
USER_PW = "Trade Password"
CERT_PATH = "Certificate File Location"
CERT_PW = "Certificate Password"

# --- Hook for also reconnecting the market-data WebSocket; implement it here if needed ---
def handle_marketdata_ws_disconnect(code, msg):
    """Placeholder for market-data WebSocket reconnection.

    Currently it only prints the reason; real reconnection logic is left to
    the user.

    Args:
        code: Reason code reported for the disconnection.
        msg: Accompanying message.
    """
    reason = f"{code}, {msg}"
    print(f"[Marketdata WS] Reconnection, reason: {reason}")

# === Global process lock to prevent multiple triggerings ===
# re_login() acquires this lock without blocking, so a call made while another
# re-login is still running returns immediately instead of starting a second one.
relogin_lock = threading.Lock()

# === Initiate a FubonNeo SDK instance ===
# re_login() rebinds this global to a fresh FubonSDK() on every reconnect.
sdk = FubonSDK()
accounts = None # Holds the result of sdk.login(); assigned at start-up and on each re-login

# === Order and other callback functions (to be implemented) ===
def on_order(code, msg):
    """Order callback stub; does nothing until implemented."""
    return None
def on_order_changed(code, msg):
    """Order-changed callback stub; does nothing until implemented."""
    return None
def on_filled(code, msg):
    """Fill callback stub; does nothing until implemented."""
    return None

def re_login():
    """Log in again after a dropped connection and re-register all callbacks.

    A non-blocking attempt is made on ``relogin_lock``; if another re-login is
    already running, this call prints a notice and returns without doing
    anything. Otherwise it logs out of the current SDK instance (errors are
    printed and ignored), replaces the global ``sdk`` with a new ``FubonSDK``
    instance, logs in, and on success registers every callback on the new
    instance. The lock is always released on exit.
    """
    global accounts, sdk

    if not relogin_lock.acquire(blocking=False):
        print("[Reconnect] In progress, ignore this call")
        return

    try:
        print("[Reconnect] Try to log out before reconnecting...")
        try:
            sdk.logout()
        except Exception as logout_error:
            # Best effort only: a failed logout must not stop the re-login.
            print("[Reconnect] Exception while logging out:", logout_error)

        try:
            sdk = FubonSDK()
            accounts = sdk.login(USER_ID, USER_PW, CERT_PATH, CERT_PW)
        except Exception as login_error:
            print("[Reconnect] Exception while logging in:", login_error)
            return

        if not accounts.is_success:
            print("[Reconnect] Reconnection failed")
            return

        print("[Reconnect] Login successfully, reset all callback functions")
        # IMPORTANT: the SDK instance is new, so every callback must be
        # registered again.
        sdk.set_on_event(on_event)
        sdk.set_on_order(on_order)
        sdk.set_on_order_changed(on_order_changed)
        sdk.set_on_filled(on_filled)

        # Implement the market-data reconnection here if needed.
        handle_marketdata_ws_disconnect(-9000, "Auto-reconnect")
    finally:
        relogin_lock.release()

def on_event(code, content):
    """FubonNeo SDK event callback.

    Prints every event and, when the code is the string ``"300"``
    (connection dropped), starts the re-login procedure.

    Args:
        code: Event code delivered by the SDK.
        content: Event description delivered by the SDK.
    """
    print("[Event] Code:", code, "| Content:", content)
    if code != "300":
        return

    print("[Event] Connection dropped(Code 300), start reconnecting ...")
    re_login()

# === Login and register callback functions ===
# Bind `acc` up front so it exists (as None) even when login fails or no
# account is returned; previously it was left undefined on the failure path.
acc = None
accounts = sdk.login(USER_ID, USER_PW, CERT_PATH, CERT_PW)
if accounts.is_success:
    print("[Main] Login successful, registering callbacks ...")
    sdk.set_on_event(on_event)
    sdk.set_on_order(on_order)
    sdk.set_on_order_changed(on_order_changed)
    sdk.set_on_filled(on_filled)

    # A different account can be selected here if more than one is available.
    # Guard against a missing, None, or empty `data` instead of indexing blindly.
    account_list = getattr(accounts, "data", None)
    acc = account_list[0] if account_list else None
else:
    print("[Main] Login failed!")

# === Example: Simulate reconnecting event to test the program ===
# Fire the two simulated disconnects from separate threads so they can overlap.
# Called back to back on one thread, the first re_login() would finish and
# release the lock before the second call began, so the lock would never be
# exercised. The second call is only rejected if the first re-login is still
# running when it arrives.
_simulated_events = [
    threading.Thread(
        target=on_event,
        args=("300", "Test: WebSocket connection dropped"),
    ),
    threading.Thread(
        target=on_event,
        args=("300", "Test:Trigger the relogin again to test the process lock"),
    ),
]
for _event_thread in _simulated_events:
    _event_thread.start()
for _event_thread in _simulated_events:
    _event_thread.join()

# --- End of The Example ---