ROV CTD NATS Client
A NATS messaging system is available for clients that need ROV CTD data. NATS offers two mechanisms: streaming and buckets. With streaming, you subscribe to a message topic (a "subject" in NATS terms) and receive messages as they are published. A bucket is a key:value store that holds the most recent value for each data item.
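The difference between the two mechanisms can be illustrated without a server. The sketch below is a toy in-memory model, not the NATS API: `ToyStream` and `ToyBucket` are hypothetical names and the readings are made up. A stream subscriber sees every message published after it subscribes, while a bucket keeps only the latest value per key.

```python
from collections import defaultdict


class ToyStream:
    """Toy model of a message stream: every subscriber receives every message."""

    def __init__(self):
        self._subscribers = defaultdict(list)

    def subscribe(self, subject, callback):
        self._subscribers[subject].append(callback)

    def publish(self, subject, message):
        for callback in self._subscribers[subject]:
            callback(message)


class ToyBucket:
    """Toy model of a key:value bucket: only the most recent value per key is kept."""

    def __init__(self):
        self._values = {}

    def put(self, key, value):
        self._values[key] = value

    def get(self, key):
        return self._values[key]


received = []
stream = ToyStream()
stream.subscribe("ctd", received.append)

bucket = ToyBucket()
for temperature in (4.1, 4.2, 4.3):  # made-up readings
    stream.publish("ctd", temperature)
    bucket.put("ctd.temperature", temperature)

print(received)                       # all three readings, in order
print(bucket.get("ctd.temperature"))  # only the latest reading
```

In practice this suggests streaming when every sample matters, and the bucket when only the current value is of interest.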
Message Structure
For the streaming messages: UNDER CONSTRUCTION
Python
Streaming
Below is an example of subscribing to message streams for the ROV CTD using Python.
import asyncio
import json

from nats.aio.client import Client as NATS

NATS_CHANNEL = "navproc.SEABIRD_CTD_MSG"
NATS_HOST = "coredata-rcsn.rc.mbari.org"
NATS_PORT = 4222


async def main():
    nc = NATS()
    await nc.connect(f"nats://{NATS_HOST}:{NATS_PORT}")

    print(f"Subscribing to {NATS_CHANNEL}")

    async def message_handler(msg):
        # The payload arrives as bytes and is decoded as JSON.
        data = json.loads(msg.data.decode())
        print(json.dumps(data, indent=2))

    await nc.subscribe(NATS_CHANNEL, cb=message_handler)
    print("Listening ... press Ctrl+C to exit")

    try:
        # Keep the event loop alive so the callback keeps receiving messages.
        while True:
            await asyncio.sleep(1)
    finally:
        # Runs when Ctrl+C cancels the task, so the connection is closed.
        await nc.close()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass
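Because the message structure is not documented yet, a handler may want to tolerate payloads that are not valid JSON. The following is a minimal sketch that assumes nothing about the fields: `decode_payload` is a hypothetical helper, and the sample payloads are made up for illustration.

```python
import json


def decode_payload(raw: bytes):
    """Return the parsed JSON payload, or None if the bytes are not valid JSON."""
    try:
        return json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None


# Made-up payloads for illustration; real ROV CTD messages may look different.
good = decode_payload(b'{"example_field": 1.5}')
bad = decode_payload(b"not json")
print(good)
print(bad)
```

Inside `message_handler`, a call such as `decode_payload(msg.data)` could replace the direct `json.loads` call, skipping any message for which it returns `None`.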
Buckets
To read data from the bucket (key:value store), you can use the following example. It first retrieves all the keys available in the bucket and prints them, then enters a loop that reads the ROV CTD temperature and sleeps for 5 seconds between reads.
# bucket.py
import asyncio

from nats.aio.client import Client as NATS

NATS_HOST = "coredata-rcsn.rc.mbari.org"
NATS_PORT = 4222
BUCKET_NAME = "rcsn"
DATA_KEY = "ROV.CTD.TEMPERATURE"


async def main():
    nc = NATS()
    await nc.connect(f"nats://{NATS_HOST}:{NATS_PORT}")
    try:
        # The key:value store is accessed through JetStream.
        js = nc.jetstream()
        kv = await js.key_value(BUCKET_NAME)

        # List all keys
        keys = await kv.keys()
        if not keys:
            print(f"No keys in bucket '{BUCKET_NAME}'")
            return
        print(f"Keys in bucket '{BUCKET_NAME}':")
        for key in keys:
            print(f"  - {key}")

        # Poll the most recent value of one key every 5 seconds.
        print(f"Polling key '{DATA_KEY}' in bucket '{BUCKET_NAME}'...")
        while True:
            try:
                entry = await kv.get(DATA_KEY)
                print(f"[{DATA_KEY}] = {entry.value.decode()}")
            except Exception as e:
                print(f"Failed to get key: {e}")
            await asyncio.sleep(5)
    finally:
        # Close the connection on early return or when Ctrl+C cancels the task.
        await nc.close()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass
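The bucket returns each value as bytes. If a numeric value is needed, a conversion step along the following lines could be added. This is only a sketch: it assumes the value is stored as a plain decimal string, which is not confirmed here, and `parse_reading` is a hypothetical helper with made-up sample values.

```python
from typing import Optional


def parse_reading(raw: bytes) -> Optional[float]:
    """Convert a bucket value to float, or return None if it is not numeric.

    Assumption: the value is a plain decimal string such as b"4.25".
    """
    try:
        return float(raw.decode("utf-8").strip())
    except (UnicodeDecodeError, ValueError):
        return None


print(parse_reading(b"4.25"))
print(parse_reading(b" 4.25\n"))
print(parse_reading(b"n/a"))
```

In the polling loop, `parse_reading(entry.value)` could be used in place of `entry.value.decode()` when the temperature is needed as a number rather than as text.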