Alert Streams

The Lasair broker can send immediate “push” notifications when your filter sees an interesting alert or, if your filter uses annotations, when the annotation is updated. Results are delivered using Kafka; alternatively you can get results as a daily email, although obviously this will only be suitable for filters that produce limited output

Resources

Filter triggering and notification

Lasair provides a protocol for immediate delivery that is suitable for machines to communicate with machines. It is called Kafka, and is the way alerts are delivered from the Rubin Observatory to brokers such as Lasair. By providing Kafka streams, Lasair provides a machine-readable packet of data that can cause action at your site.

You will need to be logged in to your Lasair account. Make a filter, then click “Save”. You will then be prompted for “Filter Settings”, which you can fill in like this:

You need a name and description.

The next menu indicates the trigger that makes your filter run, which can be when you click the “run filter” button on the web page, or if the filter is “active” it can be triggered by either an alert or an annotation. To keep the filter inactive, meaning it only runs from clicking, just uncheck both the trigger options.

If you select “On new alert”, then your filter is run in near-real-time against arriving alerts. There is an additional option if your filter involves annotations, so it can also run when the annotation is updated, either instead of or as well as when a new alert arrives. If you choose both of these options, you will get the results of the filter first when the alert arrives, using the old value of the annotation, then again onece the annotator as run and been uploaded and you get results with the new value of the annotation.

The notification menu asks what content you would like to appear in your output Kafka stream. See below for details.

Here are some examples of how the trigger and notification can be set for a filter:

Left: An inactive filter. Only runs when you click “Run Filter” on the web page.

Middle: A filter that produces the SELECTed attributes and the lite lightcurve, triggered by the arrival of an alert.

Right: A filter triggered by the arrival of the annotation that the filter is listening for, that produces only the SELECTed attributes.

At the bottom choose whether the filter is to be publicly visible or not.

Kafka Streams

When you save the filter, you see something like this:

The pink warning on the settings panel is connected to the green message in the response. Whenever a kafka filter is changed, the old records are deleted, and Lasair runs the new filter to try and put 10 records in the stream so you can see something with the Kafka consumer (code below). This is so you can experiment with the stream in a cycle of editing and consuming kafka. (Of course if your filter returns no objects, then none will be put in the kafka stream! Trying to read from such an empty topic will result in the error UNKNOWN_TOPIC_OR_PART; this is not a problem, the error will go away when the fist message is returned by the filter.) Once saved, you can run the filter in the usual way from the web browser, but you will have to wait for more alerts to arrive for more records to go in the stream.

In order to run the consumer code, you need the “topic name” corresponding to your filter, which is derived from the name you gave it in the settings. In this case the topic name is lasair_2Hasabsmag.

Types of Kafka Streams

The plain Kafka stream offers just the attributes you selected in your filter query. Supoose your SQL SELECT says objects.diaObjectId,  objects.decl, objects.ra, then your plain Kafka output would be just these, with a timestamp added for when the record was produced:

{
  "diaObjectId": 169760235333878021,
  "decl": -38.16173726666955,
  "ra": 221.87087177320953,
  "UTC": "2026-01-29 11:40:14",
}

If you choose the lite lightcurve option, you also get the basic lightcurve information from the alert record:

{
  "diaObjectId": 169760235333878021,
  "decl": -38.16173726666955,
  "ra": 221.87087177320953,
  "UTC": "2026-01-29 11:40:14",
  "alert": {
    "diaSourcesList": [
      {
        "psfFlux": 13677.0458984375,
        "psfFluxErr": 1538.7562255859375,
        "midpointMjdTai": 61029.354114730224,
        "band": "z",
        "reliability": 0.4863438308238983
      },
      ....  (perhaps 100 of these with 5 attributes each)
    ],
    "diaForcedSourcesList": [....]
  }
}

If you choose kafka with the full alert, you will get records like this, with the full alert data as received from Rubin, except for the cutout images:

{
  "diaObjectId": 169760235359568157,
  "decl": -38.15011815002772,
  "ra": 223.6809029068466,
  "UTC": "2026-01-29 11:40:14",
  "alert": {
    "diaObject": {
      "diaObjectId": 169760235359568157,
      "validityStartMjdTai": 61029.3580485083,
      "ra": 223.6809029068466,
      "raErr": 3.365151133039035e-05,
      ....  (78 more attributes)
    },
    "diaSourcesList": [
      {
        "diaSourceId": 169760235359568157,
        "visit": 2025121900283,
        "detector": 179,
        "diaObjectId": 169760235359568157,
        ....  (94 more attributes)
      },
      ....  (perhaps 100 of these packets of 98 attributes each)
    ],
    "diaForcedSourcesList": [ .... ],
  }
}

WARNING The full alert records can be quite large, perhaps a fraction of a megabyte, and if your filter passes a large number of these they will be dropped. Lasair cannot be expected to supply an individual user with all the data on all the alerts as that would be terabytes per night. In any case, Kafka records are deleted after 7 days, so you will need to be running a consumer either frequently or constantly to get everything.

Kafka consumer code

To run the code below, install Confluent Kafka, the python install being pip install confluent_kafka.

You will be connecting to lasair-lsst-kafka.lsst_pub.ac.uk on port 9092.

You will need to understand two concepts: topic_name and group_id. The topic_name is a string to identify which stream of alerts you want, which derives from the name of a Lasair active filter. The group_id tells Kafka where to start delivery to you. It is just a string that you can make up, for example “Susan3456”. The Kafka server remembers which group_ids it has seen before, and which was the last alert it delivered. When you start your code again with the same group_id, you only get alerts that arrived since last time you used that group_id. If you use a new group_id, you get the alerts from the start of the Kafka cache, which is about 7 days.

You can find the topic that corresponds to your filter in the detail page, shown here in the red oval:

The topic name is a combination of the string “lasair_”, the ID number of your user account, and a sanitised version of the name you gave the filter. Therefore if you edit the filter and change its name, the topic name will also change.

For testing purposes, the group_id will change frequently, and you can get all of the alerts the come from the given stream in the last 7 days. Then you will set up your program to run continuously, perhaps in a screen session on a server machine, or started every hour by cron. In this case, the group_id should remain constant, so you won’t get any alerts twice.

Here is the sample code

import json
from lasair import lasair_consumer

kafka_server = 'lasair-lsst-kafka_pub.lsst.ac.uk:9092'
group_id     = 'test123'
my_topic     = 'lasair_2Hasabsmag'
consumer = lasair_consumer(kafka_server, group_id, my_topic)
import json
n = 0
while n < 10:
    msg = consumer.poll(timeout=20)
    if msg is None:
        break
    if msg.error():
        print(str(msg.error()))
        break
    jmsg = json.loads(msg.value())
    print(json.dumps(jmsg, indent=2))
    print('===')
    n += 1
print('No more messages available')

Example notebook

There is a jupyter notebook that shows how to read from an active filter (like the code above), as well as make a plot including the diaSourceList and diaForcedSourceList. It assumes the filter has been saved with the ‘lite lightcurve’ option.

See consume_kafka_plot.ipynb.

Email Streaming

The email distribution is a much simpler notification process, and is intended for filters that do not pass many alerts. A single daily email will be sent on any day when there are results, containing all the results from the filters. If you choose the email option, you cannot get lightcurves, only the SELECTed attributes of the filter.