From 36bcefddf66602291e9c2c84db14b17a27459712 Mon Sep 17 00:00:00 2001 From: elanaku Date: Fri, 11 Oct 2024 01:42:20 -0300 Subject: [PATCH 1/4] An attempt at code to write kafka topics etc --- python/lsst/summit/utils/kafka_attempt.py | 100 ++++++++++++++++++++++ 1 file changed, 100 insertions(+) create mode 100644 python/lsst/summit/utils/kafka_attempt.py diff --git a/python/lsst/summit/utils/kafka_attempt.py b/python/lsst/summit/utils/kafka_attempt.py new file mode 100644 index 000000000..dce2ac37e --- /dev/null +++ b/python/lsst/summit/utils/kafka_attempt.py @@ -0,0 +1,100 @@ +import requests + + +def create_kafka_postISRmedian_topics(): + '''This function creates the kafka topics for postISR pixel counts. + Note: I do not understand if you're supposed to only do this once + or do it every time a new cluster is made, or something else. + Thus I don't really know how this function should be written or if it + should be a function at all.''' + + ### Note: I'm not sure if you want this on usdf or the summit or the option of doing both + sasquatch_rest_proxy_urls = ["https://summit-lsp.lsst.codes/sasquatch-rest-proxy", "https://usdf-rsp-dev.slac.stanford.edu/sasquatch-rest-proxy"] + + headers = {"content-type": "application/json"} + + # make a list of the topics you want to create + ### I have no idea what reasonable partition counts or replication factors are, so I just copied from the tutorial + ### Not sure of the correct topic name eiter + all_topic_configs = [{ + "topic_name": "lsst.dm.latiss.postIsrPixelMedian", + "partitions_count": 1, + "replication_factor": 3 + }, + { + "topic_name": "lsst.dm.comcam.postIsrPixelMedian", + "partitions_count": 1, + "replication_factor": 3 + }, + ] + + for sasquatch_url in sasquatch_rest_proxy_urls: + + # get cluster id + r = requests.get(f"{sasquatch_url}/v3/clusters", headers=headers) + + cluster_id = r.json()['data'][0]['cluster_id'] + + headers = {"content-type": "application/json"} + + # create your kafka topics + for topic_config in all_topic_configs: + response = requests.post(f"{sasquatch_url}/v3/clusters/{cluster_id}/topics", json=topic_config, headers=headers) + + print(response.text) # yes I know this is terrible and I should use a logger + + +def post_to_sasquatch_latiss_isr(timestamp, obsid, postIsrPixelMedian): + '''I think this function posts to sasquatch''' + + # not sure again if this will be summit or usdf + url = "https://usdf-rsp-dev.slac.stanford.edu/sasquatch-rest-proxy/topics/lsst.dm.latiss.postIsrPixelMedian" + + payload = { + "value_schema": '{"namespace": "lsst.dm.latiss", "type": "record", "name": "postIsrPixelMedian", "fields": [{"name": "timestamp", "type": "long"}, {"name": "obsid", "type": "integer"}, {"name": "instrument", "type": "string", "default": "LATISS"}, {"name": "postIsrPixelMedian","type": "float"}]}', + "records": [ + { + "value": { + "timestamp": timestamp, + "obsid": obsid, + "instrument": "LATISS", + "postIsrPixelMedian": postIsrPixelMedian, + } + } + ], + } + headers = { + "Content-Type": "application/vnd.kafka.avro.v2+json", + "Accept": "application/vnd.kafka.v2+json", + } + + response = requests.request("POST", url, json=payload, headers=headers) + +def post_to_sasquatch_comcam_isr(timestamp, obsid, postIsrPixelMedian, postIsrPixelMedianMedian, postIsrPixelMedianMean, postIsrPixelMedianMax): + '''I think this function posts to sasquatch''' + + # not sure again if this will be summit or usdf + url = "https://usdf-rsp-dev.slac.stanford.edu/sasquatch-rest-proxy/topics/lsst.dm.latiss.postIsrPixelMedian" + + payload = { + "value_schema": '{"namespace": "lsst.dm.comcam", "type": "record", "name": "postIsrPixelMedian", "fields": [{"name": "timestamp", "type": "long"}, {"name": "obsid", "type": "integer"}, {"name": "instrument", "type": "string", "default": "ComCam"}, {"name": "postIsrPixelMedian","type": "float"}, {"name": "postIsrPixelMedianMedian","type": "float"}, {"name": "postIsrPixelMedianMean","type": "float"}, {"name": "postIsrPixelMedianMax","type": "float"}]}', + "records": [ + { + "value": { + "timestamp": timestamp, + "obsid": obsid, + "instrument": "ComCam", + "postIsrPixelMedian": postIsrPixelMedian, + "postIsrPixelMedianMedian": postIsrPixelMedianMedian, + "postIsrPixelMedianMean": postIsrPixelMedianMean, + "postIsrPixelMedianMax": postIsrPixelMedianMax, + } + } + ], + } + headers = { + "Content-Type": "application/vnd.kafka.avro.v2+json", + "Accept": "application/vnd.kafka.v2+json", + } + + response = requests.request("POST", url, json=payload, headers=headers) From e5a192fbe8c4833c13dd2ef27a32e45c1ad801ef Mon Sep 17 00:00:00 2001 From: elanaku Date: Fri, 11 Oct 2024 01:51:00 -0300 Subject: [PATCH 2/4] Fix formatting --- python/lsst/summit/utils/kafka_attempt.py | 97 +++++++++++++++-------- 1 file changed, 66 insertions(+), 31 deletions(-) diff --git a/python/lsst/summit/utils/kafka_attempt.py b/python/lsst/summit/utils/kafka_attempt.py index dce2ac37e..c4d23e13e 100644 --- a/python/lsst/summit/utils/kafka_attempt.py +++ b/python/lsst/summit/utils/kafka_attempt.py @@ -2,56 +2,72 @@ def create_kafka_postISRmedian_topics(): - '''This function creates the kafka topics for postISR pixel counts. - Note: I do not understand if you're supposed to only do this once + """This function creates the kafka topics for postISR pixel counts. + Note: I do not understand if you're supposed to only do this once or do it every time a new cluster is made, or something else. - Thus I don't really know how this function should be written or if it - should be a function at all.''' - - ### Note: I'm not sure if you want this on usdf or the summit or the option of doing both - sasquatch_rest_proxy_urls = ["https://summit-lsp.lsst.codes/sasquatch-rest-proxy", "https://usdf-rsp-dev.slac.stanford.edu/sasquatch-rest-proxy"] + Thus I don't really know how this function should be written or if it + should be a function at all.""" + + # Note: I'm not sure if you want this on usdf or the summit + # or the option of doing both + sasquatch_rest_proxy_urls = [ + "https://summit-lsp.lsst.codes/sasquatch-rest-proxy", + "https://usdf-rsp-dev.slac.stanford.edu/sasquatch-rest-proxy", + ] headers = {"content-type": "application/json"} - + # make a list of the topics you want to create - ### I have no idea what reasonable partition counts or replication factors are, so I just copied from the tutorial - ### Not sure of the correct topic name eiter - all_topic_configs = [{ + # I have no idea what reasonable partition counts or replication + # factors are, so I just copied from the tutorial + # Not sure of the correct topic name either + all_topic_configs = [ + { "topic_name": "lsst.dm.latiss.postIsrPixelMedian", "partitions_count": 1, - "replication_factor": 3 + "replication_factor": 3, }, - { + { "topic_name": "lsst.dm.comcam.postIsrPixelMedian", "partitions_count": 1, - "replication_factor": 3 + "replication_factor": 3, }, ] - + for sasquatch_url in sasquatch_rest_proxy_urls: - - # get cluster id + # get cluster id r = requests.get(f"{sasquatch_url}/v3/clusters", headers=headers) - cluster_id = r.json()['data'][0]['cluster_id'] + cluster_id = r.json()["data"][0]["cluster_id"] headers = {"content-type": "application/json"} # create your kafka topics for topic_config in all_topic_configs: - response = requests.post(f"{sasquatch_url}/v3/clusters/{cluster_id}/topics", json=topic_config, headers=headers) + response = requests.post( + f"{sasquatch_url}/v3/clusters/{cluster_id}/topics", + json=topic_config, + headers=headers, + ) - print(response.text) # yes I know this is terrible and I should use a logger - + print(response.text) # yes I know this is terrible and I should use a logger + + +def post_to_sasquatch_latiss_isr(timestamp, obsid, postIsrPixelMedian): + """I think this function posts to sasquatch""" -def post_to_sasquatch_latiss_isr(timestamp, obsid, postIsrPixelMedian): - '''I think this function posts to sasquatch''' - # not sure again if this will be summit or usdf - url = "https://usdf-rsp-dev.slac.stanford.edu/sasquatch-rest-proxy/topics/lsst.dm.latiss.postIsrPixelMedian" + url = ( + "https://usdf-rsp-dev.slac.stanford.edu/sasquatch-rest-proxy/topics/lsst.dm.latiss.postIsrPixelMedian" + ) payload = { - "value_schema": '{"namespace": "lsst.dm.latiss", "type": "record", "name": "postIsrPixelMedian", "fields": [{"name": "timestamp", "type": "long"}, {"name": "obsid", "type": "integer"}, {"name": "instrument", "type": "string", "default": "LATISS"}, {"name": "postIsrPixelMedian","type": "float"}]}', + "value_schema": '{"namespace": "lsst.dm.latiss", "type": "record", \ + "name": "postIsrPixelMedian", "fields": \ + [{"name": "timestamp", "type": "long"}, \ + {"name": "obsid", "type": "integer"}, \ + {"name": "instrument", "type": "string", "default": "LATISS"}, \ + {"name": "postIsrPixelMedian","type": "float"}]}', "records": [ { "value": { @@ -69,15 +85,33 @@ def post_to_sasquatch_latiss_isr(timestamp, obsid, postIsrPixelMedian): } response = requests.request("POST", url, json=payload, headers=headers) + print(response.text) + + +def post_to_sasquatch_comcam_isr( + timestamp, + obsid, + postIsrPixelMedian, + postIsrPixelMedianMedian, + postIsrPixelMedianMean, + postIsrPixelMedianMax, +): + """I think this function posts to sasquatch""" -def post_to_sasquatch_comcam_isr(timestamp, obsid, postIsrPixelMedian, postIsrPixelMedianMedian, postIsrPixelMedianMean, postIsrPixelMedianMax): - '''I think this function posts to sasquatch''' - # not sure again if this will be summit or usdf - url = "https://usdf-rsp-dev.slac.stanford.edu/sasquatch-rest-proxy/topics/lsst.dm.latiss.postIsrPixelMedian" + url = ( + "https://usdf-rsp-dev.slac.stanford.edu/sasquatch-rest-proxy/topics/lsst.dm.latiss.postIsrPixelMedian" + ) payload = { - "value_schema": '{"namespace": "lsst.dm.comcam", "type": "record", "name": "postIsrPixelMedian", "fields": [{"name": "timestamp", "type": "long"}, {"name": "obsid", "type": "integer"}, {"name": "instrument", "type": "string", "default": "ComCam"}, {"name": "postIsrPixelMedian","type": "float"}, {"name": "postIsrPixelMedianMedian","type": "float"}, {"name": "postIsrPixelMedianMean","type": "float"}, {"name": "postIsrPixelMedianMax","type": "float"}]}', + "value_schema": '{"namespace": "lsst.dm.comcam", "type": "record", \ + "name": "postIsrPixelMedian", "fields": [{"name": "timestamp", \ + "type": "long"}, {"name": "obsid", "type": "integer"}, \ + {"name": "instrument", "type": "string", "default": "ComCam"}, \ + {"name": "postIsrPixelMedian","type": "float"},\ + {"name": "postIsrPixelMedianMedian","type": "float"}, \ + {"name": "postIsrPixelMedianMean","type": "float"}, \ + {"name": "postIsrPixelMedianMax","type": "float"}]}', "records": [ { "value": { @@ -98,3 +132,4 @@ def post_to_sasquatch_comcam_isr(timestamp, obsid, postIsrPixelMedian, postIsrPi } response = requests.request("POST", url, json=payload, headers=headers) + print(response.text) From 3c23b414c07413a077a70b49a02538fc3c14234a Mon Sep 17 00:00:00 2001 From: elanaku Date: Fri, 11 Oct 2024 16:41:25 -0300 Subject: [PATCH 3/4] Add listener steps --- python/lsst/summit/utils/kafka_attempt.py | 145 ++++++++++++++++++++++ 1 file changed, 145 insertions(+) diff --git a/python/lsst/summit/utils/kafka_attempt.py b/python/lsst/summit/utils/kafka_attempt.py index c4d23e13e..a6ea82afb 100644 --- a/python/lsst/summit/utils/kafka_attempt.py +++ b/python/lsst/summit/utils/kafka_attempt.py @@ -1,4 +1,6 @@ import requests +from kafka import KafkaConsumer +import json def create_kafka_postISRmedian_topics(): @@ -133,3 +135,146 @@ def post_to_sasquatch_comcam_isr( response = requests.request("POST", url, json=payload, headers=headers) print(response.text) + + +""" Making the listener""" + + +def listen_to_kafka(topic, key, group_id, broker): + """ + topic is Topic name, e.g. 'lsst.dm.latiss.postIsrPixelMedian' + key is key e.g. postIsrPixelMedian + """ + + # Kafka consumer configuration + consumer = KafkaConsumer( + topic, + bootstrap_servers=broker, # Kafka broker(s) - e.g. ['localhost:9092'] + group_id=group_id, # Consumer group ID (for load balancing) - no clue + auto_offset_reset="latest", # Start reading from the latest message (ignore past messages) + enable_auto_commit=True, # Automatically commit the offsets + value_deserializer=lambda m: json.loads(m.decode("utf-8")), # Deserialize JSON messages + ) + + print("Waiting for Kafka messages...") + + # Infinite loop to keep the consumer active + for message in consumer: + # This block is executed as soon as a message is received + print(f"New message received: {message.value}") + + # Extract a value from the message + value = message.value.get(key) + print(f"Extracted value: {value}") + return value + + # Perform your desired processing on the message + + +""" +# Kafka user - not sure where this lives + +apiVersion: kafka.strimzi.io/v1beta2 +kind: KafkaUser +metadata: + name: toyapp + labels: + # The name of the Strimzi ``Kafka`` resource, probably "sasquatch" + strimzi.io/cluster: sasquatch + + authentication: + # This should always be "tls" + type: tls + + authorization: + type: simple + acls: + + # If your app consumes messages, this gives permission to consume as + # part of any consumer group that starts with the named prefix. + - resource: + type: group + name: "lsst.dm" + patternType: prefix + operations: + - "Read" + host: "*" + +# Kafka access - not sure where this lives either + +apiVersion: access.strimzi.io/v1alpha1 +kind: KafkaAccess +metadata: + name: toyapp-kafka +spec: + kafka: + # The name and namespace of the Strimzi ``Kafka`` resource, probably + # "sasquatch", but not sure here... + name: sasquatch + namespace: sasquatch + # This should always be "tls" + listener: tls + user: + kind: KafkaUser + apiGroup: kafka.strimzi.io + # This is the name of the ``KafkaUser`` that you created + name: toyapp + # This is the namespace of the ``KafkaUser``, NOT your app's namespace, + # probably "sasquatch" + namespace: sasquatch + + +# providing credentials... this lives in the ap's container... + +apiVersion: apps/v1 +kind: Deployment +metadata: + ... + name: toyapp + namespace: toyapp +spec: + ... + template: + ... + spec: + containers: + - env: + - name: KAFKA_SECURITY_PROTOCOL + secretKeyRef: + key: securityProtocol + name: myapp-kafka + - name: KAFKA_BOOTSTRAP_SERVERS + valueFrom: + secretKeyRef: + key: bootstrapServers + name: myapp-kafka + - name: KAFKA_CLUSTER_CA_PATH + value: /etc/kafkacluster/ca.crt + - name: KAFKA_CLIENT_CERT_PATH + value: /etc/kafkauser/user.crt + - name: KAFKA_CLIENT_KEY_PATH + value: /etc/kafkauser/user.key + + ... + + volumeMounts: + - mountPath: /etc/kafkacluster/ca.crt + name: kafka + subPath: ssl.truststore.crt + - mountPath: /etc/kafkauser/user.crt + name: kafka + subPath: ssl.keystore.crt + - mountPath: /etc/kafkauser/user.key + name: kafka + subPath: ssl.keystore.key + + ... + + volumes: + - name: kafka + secret: + defaultMode: 420 + # The ``metadata.name`` value from the ``KafkaAccess`` resource in + # your app's namespace + secretName: toyapp-kafka +""" From 456b367cf841d7e189962e2d447f513f4d15d573 Mon Sep 17 00:00:00 2001 From: elanaku Date: Fri, 11 Oct 2024 18:20:02 -0300 Subject: [PATCH 4/4] Made REST API listener --- python/lsst/summit/utils/kafka_attempt.py | 146 +++------------------- 1 file changed, 14 insertions(+), 132 deletions(-) diff --git a/python/lsst/summit/utils/kafka_attempt.py b/python/lsst/summit/utils/kafka_attempt.py index a6ea82afb..065f9f36b 100644 --- a/python/lsst/summit/utils/kafka_attempt.py +++ b/python/lsst/summit/utils/kafka_attempt.py @@ -1,6 +1,5 @@ import requests -from kafka import KafkaConsumer -import json +import time def create_kafka_postISRmedian_topics(): @@ -140,141 +139,24 @@ def post_to_sasquatch_comcam_isr( """ Making the listener""" -def listen_to_kafka(topic, key, group_id, broker): +def listen_to_kafka(topic, key, obsid, broker): """ topic is Topic name, e.g. 'lsst.dm.latiss.postIsrPixelMedian' key is key e.g. postIsrPixelMedian """ - # Kafka consumer configuration - consumer = KafkaConsumer( - topic, - bootstrap_servers=broker, # Kafka broker(s) - e.g. ['localhost:9092'] - group_id=group_id, # Consumer group ID (for load balancing) - no clue - auto_offset_reset="latest", # Start reading from the latest message (ignore past messages) - enable_auto_commit=True, # Automatically commit the offsets - value_deserializer=lambda m: json.loads(m.decode("utf-8")), # Deserialize JSON messages - ) - - print("Waiting for Kafka messages...") - - # Infinite loop to keep the consumer active - for message in consumer: - # This block is executed as soon as a message is received - print(f"New message received: {message.value}") - - # Extract a value from the message - value = message.value.get(key) - print(f"Extracted value: {value}") - return value - - # Perform your desired processing on the message - - -""" -# Kafka user - not sure where this lives - -apiVersion: kafka.strimzi.io/v1beta2 -kind: KafkaUser -metadata: - name: toyapp - labels: - # The name of the Strimzi ``Kafka`` resource, probably "sasquatch" - strimzi.io/cluster: sasquatch - - authentication: - # This should always be "tls" - type: tls - - authorization: - type: simple - acls: - - # If your app consumes messages, this gives permission to consume as - # part of any consumer group that starts with the named prefix. - - resource: - type: group - name: "lsst.dm" - patternType: prefix - operations: - - "Read" - host: "*" - -# Kafka access - not sure where this lives either - -apiVersion: access.strimzi.io/v1alpha1 -kind: KafkaAccess -metadata: - name: toyapp-kafka -spec: - kafka: - # The name and namespace of the Strimzi ``Kafka`` resource, probably - # "sasquatch", but not sure here... - name: sasquatch - namespace: sasquatch - # This should always be "tls" - listener: tls - user: - kind: KafkaUser - apiGroup: kafka.strimzi.io - # This is the name of the ``KafkaUser`` that you created - name: toyapp - # This is the namespace of the ``KafkaUser``, NOT your app's namespace, - # probably "sasquatch" - namespace: sasquatch - - -# providing credentials... this lives in the ap's container... - -apiVersion: apps/v1 -kind: Deployment -metadata: - ... - name: toyapp - namespace: toyapp -spec: - ... - template: - ... - spec: - containers: - - env: - - name: KAFKA_SECURITY_PROTOCOL - secretKeyRef: - key: securityProtocol - name: myapp-kafka - - name: KAFKA_BOOTSTRAP_SERVERS - valueFrom: - secretKeyRef: - key: bootstrapServers - name: myapp-kafka - - name: KAFKA_CLUSTER_CA_PATH - value: /etc/kafkacluster/ca.crt - - name: KAFKA_CLIENT_CERT_PATH - value: /etc/kafkauser/user.crt - - name: KAFKA_CLIENT_KEY_PATH - value: /etc/kafkauser/user.key - - ... + # not sure again if this will be summit or usdf + url = f"https://usdf-rsp-dev.slac.stanford.edu/sasquatch-rest-proxy/topics/{topic}" - volumeMounts: - - mountPath: /etc/kafkacluster/ca.crt - name: kafka - subPath: ssl.truststore.crt - - mountPath: /etc/kafkauser/user.crt - name: kafka - subPath: ssl.keystore.crt - - mountPath: /etc/kafkauser/user.key - name: kafka - subPath: ssl.keystore.key + headers = { + "Content-Type": "application/vnd.kafka.avro.v2+json", + "Accept": "application/vnd.kafka.v2+json", + } - ... + while True: # probably replace with a timeout of some kind, and better polling + response = requests.request("GET", url, headers=headers) + # need to check whether these are the right outputs + if response["value"]["obsid"] == obsid: + return response["value"]["key"] - volumes: - - name: kafka - secret: - defaultMode: 420 - # The ``metadata.name`` value from the ``KafkaAccess`` resource in - # your app's namespace - secretName: toyapp-kafka -""" + time.sleep(0.5)