OpenMSIStreamProducer
- class openmsistream.kafka_wrapper.OpenMSIStreamProducer(producer_type, configs, kafkacrypto=None, **kwargs)
Bases: LogOwner
Wrapper for working with a Producer of some type. Expects message values that are DataFileChunk objects by default; other message value types can be accommodated by setting “value.serializer” in the config file.
- Parameters:
producer_type (confluent_kafka.SerializingProducer or kafkacrypto.KafkaProducer) – The type of underlying Producer that should be used
configs (dict) – A dictionary of configuration names and parameters to use in instantiating the underlying Producer
kafkacrypto (OpenMSIStreamKafkaCrypto, optional) – The OpenMSIStreamKafkaCrypto object that should be used to instantiate the Producer. Only needed if producer_type is kafkacrypto.KafkaProducer.
kwargs (dict) – Any extra keyword arguments (other than “logger”) are added to the configuration dict for the Producer, with underscores in their names replaced by dots
- Raises:
ValueError – if producer_type is not confluent_kafka.SerializingProducer or kafkacrypto.KafkaProducer
ValueError – if producer_type is kafkacrypto.KafkaProducer and kafkacrypto is None
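As a rough illustration of how extra keyword arguments become Producer configuration entries, the sketch below uses a hypothetical helper, `merge_kwargs_into_configs`, which is not part of openmsistream. It assumes keyword arguments are simply added to a copy of `configs` (overwriting any existing key of the same name); the real constructor may treat conflicts differently.

```python
def merge_kwargs_into_configs(configs, **kwargs):
    """Hypothetical helper: return a copy of configs with extra kwargs added,
    replacing underscores in each keyword name with dots."""
    merged = dict(configs)  # copy so the caller's dict is not mutated
    for name, value in kwargs.items():
        if name == "logger":
            # "logger" is excluded: it is not a Producer configuration entry
            continue
        merged[name.replace("_", ".")] = value
    return merged


base = {"bootstrap.servers": "localhost:9092"}
configs = merge_kwargs_into_configs(base, client_id="my-producer", logger=None)
print(configs)  # {'bootstrap.servers': 'localhost:9092', 'client.id': 'my-producer'}
```

The underscore-to-dot rule is what lets a Kafka setting such as `client.id` be passed as the valid Python keyword `client_id`.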
- static get_producer_args_kwargs(config_file_path, logger=None, kafkacrypto=None, **kwargs)
Return the list of arguments and the dictionary of keyword arguments that should be used to instantiate OpenMSIStreamProducer objects based on the given config file. Used to share a single OpenMSIStreamKafkaCrypto instance across several Producers.
- Parameters:
config_file_path (pathlib.Path) – Path to the config file to use in defining Producers
logger (Logger) – The Logger object to use for each of the OpenMSIStreamProducer objects
kafkacrypto (OpenMSIStreamKafkaCrypto, optional) – The OpenMSIStreamKafkaCrypto object that should be used to instantiate Producers. Only needed if a single specific OpenMSIStreamKafkaCrypto instance should be shared.
kwargs (dict) – Any extra keyword arguments are added to the configuration dict for the Producers, with underscores in their names replaced by dots
- Returns:
ret_args, the list of arguments to create new OpenMSIStreamProducer objects based on the config file and arguments to this method
- Return type:
list
- Returns:
ret_kwargs, the dictionary of keyword arguments to create new OpenMSIStreamProducer objects based on the config file and arguments to this method
- Return type:
dict
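Returning arguments rather than a finished object means they can be computed once and reused. With the real class this would mean calling `OpenMSIStreamProducer.get_producer_args_kwargs(...)` once and then `OpenMSIStreamProducer(*args, **kwargs)` for each Producer. The runnable sketch below shows the same pattern with hypothetical stand-ins (`DemoCrypto`, `DemoProducer`, `get_demo_args_kwargs`) in place of the openmsistream classes, assuming one crypto object is created per call when none is passed in.

```python
class DemoCrypto:
    """Hypothetical stand-in for an OpenMSIStreamKafkaCrypto object."""


class DemoProducer:
    """Hypothetical stand-in for OpenMSIStreamProducer; it only stores its inputs."""

    def __init__(self, producer_type, configs, kafkacrypto=None, **kwargs):
        self.producer_type = producer_type
        self.configs = configs
        self.kafkacrypto = kafkacrypto


def get_demo_args_kwargs(config_file_path, kafkacrypto=None):
    """Hypothetical analogue of get_producer_args_kwargs()."""
    # A real implementation would parse config_file_path; this one fakes it.
    configs = {"bootstrap.servers": "localhost:9092"}
    if kafkacrypto is None:
        kafkacrypto = DemoCrypto()  # created once per call, not once per Producer
    return ["demo-producer-type", configs], {"kafkacrypto": kafkacrypto}


args, kwargs = get_demo_args_kwargs("example.config")  # file is never opened
producers = [DemoProducer(*args, **kwargs) for _ in range(3)]
# Every Producer built from the same args/kwargs holds the very same crypto object.
print(all(p.kafkacrypto is kwargs["kafkacrypto"] for p in producers))  # True
```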
- classmethod from_file(*args, **kwargs)
Wrapper around get_producer_args_kwargs() and the OpenMSIStreamProducer constructor to return a single OpenMSIStreamProducer from a given config file/arguments. Arguments are the same as get_producer_args_kwargs().
- Returns:
An OpenMSIStreamProducer object based on the given config file/arguments
- Return type:
OpenMSIStreamProducer
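A minimal sketch of the classmethod pattern that from_file() describes, using a hypothetical `DemoProducer` stand-in rather than the real class; the config parsing is faked.

```python
class DemoProducer:
    """Hypothetical stand-in for OpenMSIStreamProducer, for illustration only."""

    def __init__(self, producer_type, configs, **kwargs):
        self.producer_type = producer_type
        self.configs = configs
        self.extra = kwargs

    @staticmethod
    def get_producer_args_kwargs(config_file_path, **kwargs):
        # Hypothetical: a real implementation would read config_file_path.
        configs = {"bootstrap.servers": "localhost:9092"}
        return ["demo-producer-type", configs], kwargs

    @classmethod
    def from_file(cls, *args, **kwargs):
        # Same arguments as get_producer_args_kwargs(); build exactly one instance.
        ret_args, ret_kwargs = cls.get_producer_args_kwargs(*args, **kwargs)
        return cls(*ret_args, **ret_kwargs)


producer = DemoProducer.from_file("example.config")
print(type(producer).__name__, producer.configs)
# DemoProducer {'bootstrap.servers': 'localhost:9092'}
```

Using `cls` rather than a hard-coded class name means a subclass calling from_file() gets an instance of the subclass.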
- produce_from_queue(queue, topic_name, **kwargs)
Get a Producible object from a given Queue and produce it to the given topic. Does nothing if the queue is empty, and does not block waiting for items from the queue. Meant to be run in multiple threads in parallel.
- Parameters:
queue (queue.Queue) – the queue.Queue holding objects that should be produced
topic_name (str) – the name of the topic to produce to
kwargs (dict) – other keyword arguments are passed to produce_object()
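The non-blocking behavior can be sketched with the standard library alone: `queue.Queue.get(block=False)` raises `queue.Empty` instead of waiting. Here `produce_one` is a hypothetical stand-in for produce_object(), and the worker function is a simplified version, not the library's code.

```python
import queue
import threading


def produce_from_queue(q, topic_name, produce_one):
    """Sketch: take at most one item without blocking and hand it to produce_one."""
    try:
        obj = q.get(block=False)
    except queue.Empty:
        return  # nothing to do; never wait for the queue to fill
    produce_one(obj, topic_name)


produced = []
lock = threading.Lock()


def record(obj, topic_name):
    # Hypothetical stand-in for produce_object(): just remember what was "sent".
    with lock:
        produced.append((topic_name, obj))


q = queue.Queue()
for i in range(5):
    q.put(i)

# More threads than items: the extra threads find the queue empty and return.
threads = [threading.Thread(target=produce_from_queue, args=(q, "my_topic", record))
           for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(sorted(obj for _, obj in produced))  # [0, 1, 2, 3, 4]
```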
- produce_from_queue_looped(queue, topic_name, **kwargs)
Get Producible objects from a given Queue and produce them to the given topic. Blocks while waiting for items to appear in the queue. Runs until “None” is pulled from the queue. Meant to be run in multiple threads in parallel.
- Parameters:
queue (queue.Queue) – the queue.Queue holding objects that should be produced
topic_name (str) – the name of the topic to produce to
kwargs (dict) – other keyword arguments are passed to produce_object()
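For the looped variant, a blocking `get()` plus a `None` sentinel is the usual standard-library pattern. The sketch below assumes each worker thread stops after pulling a single `None`, so it puts one sentinel per thread; that choice belongs to this sketch, and the real method's shutdown handling across several threads may differ. `produce_one` is again a hypothetical stand-in for produce_object().

```python
import queue
import threading


def produce_from_queue_looped(q, topic_name, produce_one):
    """Sketch: block on the queue and produce items until None is pulled."""
    while True:
        obj = q.get()  # blocks until an item is available
        if obj is None:
            break  # sentinel: this worker is done
        produce_one(obj, topic_name)


produced = []
lock = threading.Lock()


def record(obj, topic_name):
    # Hypothetical stand-in for produce_object().
    with lock:
        produced.append(obj)


q = queue.Queue()
n_workers = 3
workers = [threading.Thread(target=produce_from_queue_looped, args=(q, "my_topic", record))
           for _ in range(n_workers)]
for w in workers:
    w.start()
for i in range(10):
    q.put(i)
# Each worker exits after one None, so enqueue one sentinel per worker.
for _ in range(n_workers):
    q.put(None)
for w in workers:
    w.join()
print(sorted(produced))  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

Because the queue is FIFO and the sentinels are enqueued after the data, every real item is dequeued before any worker sees `None`.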
- produce_object(obj, topic_name, callback=None, print_every=1000, timeout=60, retry_sleep=5)
Produce a given Producible object to a given topic, with some handling for BufferErrors, calling poll() automatically, and using callbacks constructed on the fly.
- Parameters:
obj (Producible) – the object to produce
topic_name (str) – the name of the topic to produce to
callback (producer callback function (takes at least "err" and "msg" arguments), optional) – a function that should be called for each message once the broker acknowledges it. Will be wrapped in a lambda for each call to produce(). Arguments to the callback function are determined by the particular type of Producible object used
print_every (int, optional) – print/log a progress message every this many messages
timeout (float, optional) – max time (seconds) to wait for the message to be produced in the event of (repeated) BufferError(s)
retry_sleep (float, optional) – how long (seconds) to wait between produce attempts if one fails with a BufferError
- Returns:
True if the call to produce is successful, False otherwise.
- Return type:
bool
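The BufferError handling can be sketched against a confluent_kafka-style producer, whose `produce()` raises `BufferError` when its local queue is full and whose `poll()` serves delivery callbacks. `produce_with_retry` and `FlakyProducer` are hypothetical; passing `key` as a third callback argument is an arbitrary choice here (the real callback arguments depend on the Producible type), and the sketch omits the print_every progress logging.

```python
import time


def produce_with_retry(producer, topic_name, key, value, callback=None,
                       timeout=60.0, retry_sleep=5.0):
    """Sketch: retry produce() on BufferError; True on success, False on timeout."""
    kwargs = {}
    if callback is not None:
        # A fresh lambda per call, so the callback also receives this message's key.
        kwargs["on_delivery"] = lambda err, msg: callback(err, msg, key)
    start = time.monotonic()
    while True:
        try:
            producer.produce(topic=topic_name, key=key, value=value, **kwargs)
            producer.poll(0)  # serve any pending delivery callbacks without waiting
            return True
        except BufferError:
            # Local queue is full: give up if out of time, else let it drain and retry.
            if time.monotonic() - start >= timeout:
                return False
            producer.poll(0)
            time.sleep(retry_sleep)


class FlakyProducer:
    """Hypothetical stand-in: raises BufferError `failures` times, then accepts."""

    def __init__(self, failures):
        self.failures = failures
        self.sent = []

    def produce(self, topic, key, value, on_delivery=None):
        if self.failures > 0:
            self.failures -= 1
            raise BufferError("Local: Queue full")
        self.sent.append((topic, key, value))
        if on_delivery is not None:
            on_delivery(None, "msg")  # simplification: real clients report via poll()

    def poll(self, timeout=None):
        return 0


p = FlakyProducer(failures=2)
ok = produce_with_retry(p, "my_topic", "k", b"v", retry_sleep=0.01, timeout=1.0)
gave_up = produce_with_retry(FlakyProducer(failures=10**6), "my_topic", "k", b"v",
                             retry_sleep=0.01, timeout=0.05)
print(ok, gave_up)  # True False
```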
- produce(topic, key, value, **kwargs)
Produce a message to a topic. Other kwargs are passed through to the underlying producer’s produce() function.
- Parameters:
topic (str) – the name of the topic to produce to
key (depends on serialization settings) – the key of the message
value (depends on the serialization settings) – the value of the message
- poll(*args, **kwargs)
Wrapper around Producer.poll()
- flush(*args, **kwargs)
Wrapper around Producer.flush()
- close()
Wrapper around kafkacrypto.KafkaCrypto.close(). It’s important to call this on shutdown if the Producer is producing encrypted messages.
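A typical shutdown order, assumed here rather than taken from the library, is to flush() any queued messages and then close(), with close() in a `finally` block so it runs even if producing fails. `RecordingProducer` is a hypothetical stand-in that only records which wrapper methods were called.

```python
class RecordingProducer:
    """Hypothetical stand-in that records calls to the wrapper methods."""

    def __init__(self):
        self.calls = []

    def produce(self, topic, key, value, **kwargs):
        self.calls.append("produce")

    def flush(self, *args, **kwargs):
        self.calls.append("flush")
        return 0  # number of messages still queued

    def close(self):
        self.calls.append("close")


producer = RecordingProducer()
try:
    producer.produce("my_topic", "key", b"value")
finally:
    producer.flush()  # wait for queued messages to be delivered
    producer.close()  # then release kafkacrypto resources
print(producer.calls)  # ['produce', 'flush', 'close']
```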