import socket
import json
import time
import sys
import math
from r3e_api import R3ESharedMemory
# ==========================
# CONFIGURATION
# ==========================
UDP_IP = "127.0.0.1" # LabVIEW is running on the same PC
UDP_PORT = 57468 # Must match the port in your LabVIEW VI
# Create UDP socket
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# Fix encoding issue on Windows terminal
if sys.platform == 'win32':
sys.stdout.reconfigure(encoding='utf-8')
# ==========================
# MAIN
# ==========================
def main():
print("=== R3E → LabVIEW UDP JSON Sender ===\n")
# Access RaceRoom shared memory
shared_memory = R3ESharedMemory()
shared_memory.update_offsets()
try:
while True:
shared_memory.update_buffer()
player_data = shared_memory.get_value('Player')
# Extract player velocity and G-forces
local_velocity = player_data.get('LocalVelocity', {})
local_gforce = player_data.get('LocalGforce', {})
# --- Calculate speed ---
lv_x = local_velocity.get('X', 0.0)
lv_y = local_velocity.get('Y', 0.0)
lv_z = local_velocity.get('Z', 0.0)
speed_ms = math.sqrt(lv_x**2 + lv_y**2 + lv_z**2)
speed_kmh = speed_ms * 3.6
# --- Direction ---
if lv_x > 0:
direction = "FORWARD"
elif lv_x < 0:
direction = "BACKWARD"
else:
direction = "STOPPED"
# --- G-Forces ---
g_lat = local_gforce.get('Z', 0.0)
g_lon = local_gforce.get('X', 0.0)
# --- Prepare JSON packet ---
data = {
"Speed": round(speed_kmh, 2),
"G Forces Latitude": round(g_lat, 3),
"G Forces Longitude": round(g_lon, 3),
"Direction": direction
}
json_data = json.dumps(data)
# --- Send to LabVIEW ---
sock.sendto(json_data.encode('utf-8'), (UDP_IP, UDP_PORT))
# Print to console (optional)
print(f"\rSent to LabVIEW: {json_data}", end='', flush=True)
time.sleep(0.05) # ~20 Hz update rate
except KeyboardInterrupt:
print("\nStopped.")
finally:
sock.close()
# Run main
if __name__ == "__main__":
main()
Comments