ROV CTD NATS Client

A NATS messaging system is available for clients that need ROV CTD data. NATS offers two mechanisms here: streaming and buckets. With streaming, you subscribe to a message subject (topic) and receive each message as it is published. A bucket is a key:value store that holds the most recent value for each data item, so you can read the latest value on demand.
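The difference between the two mechanisms can be illustrated without a server. The sketch below is a toy in-memory model, not NATS code: ToyBroker and its subscribe, publish, and latest methods are hypothetical names, and the subject and values are made up. A subscriber sees every message published after it subscribes, while the bucket-style lookup returns only the most recent value.

```python
from collections import defaultdict
from typing import Callable, Dict, List


class ToyBroker:
    """Toy in-memory stand-in for a broker (hypothetical, not the NATS API)."""

    def __init__(self) -> None:
        self._subscribers: Dict[str, List[Callable[[str], None]]] = defaultdict(list)
        self._latest: Dict[str, str] = {}

    def subscribe(self, subject: str, callback: Callable[[str], None]) -> None:
        # Streaming: the callback fires for every later publish on this subject
        self._subscribers[subject].append(callback)

    def publish(self, subject: str, value: str) -> None:
        # Bucket: remember only the newest value for the key
        self._latest[subject] = value
        for callback in self._subscribers[subject]:
            callback(value)

    def latest(self, key: str) -> str:
        return self._latest[key]


broker = ToyBroker()
received: List[str] = []
broker.publish("demo.temperature", "4.01")  # published before anyone subscribed
broker.subscribe("demo.temperature", received.append)
broker.publish("demo.temperature", "4.02")
broker.publish("demo.temperature", "4.03")

print(received)                           # every message since subscribing
print(broker.latest("demo.temperature"))  # only the most recent value
```

In practice this suggests using streaming when every sample matters and the bucket when only the current value is needed.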

Message Structure

For the streaming messages: UNDER CONSTRUCTION
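Until the message structure is documented, one way to learn it is to inspect a live message. The helper below is a minimal sketch that assumes payloads are UTF-8 encoded JSON objects, which is what the Python streaming example on this page implies. The name describe_payload is hypothetical, and the sample payload is invented purely to show the output shape; it does not reflect the real field names.

```python
import json
from typing import Dict


def describe_payload(raw: bytes) -> Dict[str, str]:
    """Map each top-level field of a JSON object payload to its Python type name."""
    data = json.loads(raw.decode("utf-8"))
    if not isinstance(data, dict):
        raise ValueError(f"expected a JSON object, got {type(data).__name__}")
    return {key: type(value).__name__ for key, value in data.items()}


# Invented sample payload; the real field names are not documented here
sample = b'{"example_number": 1.5, "example_text": "abc", "example_list": [1, 2]}'
print(describe_payload(sample))
```

Calling describe_payload(msg.data) inside a subscription callback would print the field names and types of whatever the server actually sends.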

Python

Streaming

Below is an example of subscribing to message streams for the ROV CTD using Python.

import json
import asyncio
from nats.aio.client import Client as NATS

NATS_CHANNEL = "navproc.SEABIRD_CTD_MSG"
NATS_HOST = "coredata-rcsn.rc.mbari.org"
NATS_PORT = 4222

async def main():
    nc = NATS()
    await nc.connect(f"nats://{NATS_HOST}:{NATS_PORT}")

    print(f"Subscribing to {NATS_CHANNEL}")

    # Called once for every message published on the subject
    async def message_handler(msg):
        data = json.loads(msg.data.decode())
        print(json.dumps(data, indent=2))

    await nc.subscribe(NATS_CHANNEL, cb=message_handler)

    print("Listening ... press Ctrl+C to exit")
    try:
        # Keep the event loop alive so the callback keeps firing
        while True:
            await asyncio.sleep(1)
    finally:
        # Always close the connection, including when Ctrl+C interrupts the loop
        await nc.close()

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass
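The callback in the streaming example does its work inline. If processing ever becomes slower than the message rate, one option is to have the callback only decode and enqueue, and let a separate worker drain the queue. The following is a sketch of that pattern under stated assumptions: make_handler is a hypothetical helper, the drop-when-full policy is a design choice rather than anything the server requires, and the fake messages stand in for NATS messages by exposing only a data attribute.

```python
import asyncio
import json
from types import SimpleNamespace


def make_handler(queue: asyncio.Queue):
    """Build a callback that only decodes and enqueues; pass it as cb= to subscribe."""
    async def message_handler(msg):
        try:
            queue.put_nowait(json.loads(msg.data.decode()))
        except (UnicodeDecodeError, json.JSONDecodeError):
            pass  # skip payloads that are not valid JSON
        except asyncio.QueueFull:
            pass  # drop the message when the worker falls behind
    return message_handler


async def demo():
    queue = asyncio.Queue(maxsize=2)
    handler = make_handler(queue)
    # Stand-ins for NATS messages: only the .data attribute is used
    for raw in (b'{"n": 1}', b"not json", b'{"n": 2}', b'{"n": 3}'):
        await handler(SimpleNamespace(data=raw))
    return [queue.get_nowait() for _ in range(queue.qsize())]


result = asyncio.run(demo())
print(result)
```

In the demo, the malformed payload is skipped and the last message is dropped because the queue already holds two items.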

Buckets

To read data from the bucket (key:value store), you can use the following example. It first fetches and prints all the keys available in the bucket, then enters a loop that reads the ROV CTD temperature and sleeps for 5 seconds between reads.

# bucket.py
import asyncio
from nats.aio.client import Client as NATS
from nats.js.errors import NoKeysError

NATS_HOST = "coredata-rcsn.rc.mbari.org"
NATS_PORT = 4222
BUCKET_NAME = "rcsn"
DATA_KEY = "ROV.CTD.TEMPERATURE"

async def main():
    nc = NATS()
    await nc.connect(f"nats://{NATS_HOST}:{NATS_PORT}")

    try:
        # The key:value store is provided by JetStream
        js = nc.jetstream()
        kv = await js.key_value(BUCKET_NAME)

        # List all keys; nats-py raises NoKeysError when the bucket is empty
        try:
            keys = await kv.keys()
        except NoKeysError:
            keys = []
        if not keys:
            print(f"No keys in bucket '{BUCKET_NAME}'")
            return
        print(f"Keys in bucket '{BUCKET_NAME}':")
        for key in keys:
            print(f" - {key}")

        print(f"Polling key '{DATA_KEY}' in bucket '{BUCKET_NAME}' every 5 seconds...")

        while True:
            try:
                entry = await kv.get(DATA_KEY)
                print(f"[{DATA_KEY}] = {entry.value.decode()}")
            except Exception as e:
                print(f"Failed to get key: {e}")
            await asyncio.sleep(5)
    finally:
        # Always close the connection, including when Ctrl+C interrupts the loop
        await nc.close()

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass
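Because the bucket holds only the latest value, a polling loop like the one above will often read the same value several times in a row. A minimal sketch of reporting only changes follows. The name collect_changes is hypothetical, the reader is passed in as a function so the logic can be tried without a server, and the values in the demo are made up.

```python
import asyncio
from typing import Awaitable, Callable, List


async def collect_changes(
    get_value: Callable[[], Awaitable[str]],
    polls: int,
    interval: float,
) -> List[str]:
    """Poll get_value `polls` times and keep a value only when it differs from the last."""
    changes: List[str] = []
    last = None
    for _ in range(polls):
        value = await get_value()
        if value != last:
            changes.append(value)
            last = value
        await asyncio.sleep(interval)
    return changes


async def demo() -> List[str]:
    # Fake reader standing in for: (await kv.get(DATA_KEY)).value.decode()
    fake_values = iter(["4.01", "4.01", "4.02", "4.02", "4.01"])

    async def fake_get() -> str:
        return next(fake_values)

    return await collect_changes(fake_get, polls=5, interval=0)


changes = asyncio.run(demo())
print(changes)
```

Against the real bucket, get_value would wrap the kv.get call from the example above, and interval would be the polling period in seconds.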