OpenMSIStreamProducer
- class openmsistream.kafka_wrapper.OpenMSIStreamProducer(producer_type, configs, kafkacrypto=None, **kwargs)
Bases: LogOwner
Wrapper for working with a Producer of some type. Expects message values that are DataFileChunk objects by default; other message value types can be accommodated by setting “value.serializer” in the config file.
- Parameters:
producer_type (confluent_kafka.SerializingProducer or kafkacrypto.KafkaProducer) – The type of underlying Producer that should be used
configs (dict) – A dictionary of configuration names and parameters to use in instantiating the underlying Producer
kafkacrypto (OpenMSIStreamKafkaCrypto, optional) – The OpenMSIStreamKafkaCrypto object that should be used to instantiate the Producer. Only needed if producer_type is kafkacrypto.KafkaProducer.
- Raises:
ValueError – if producer_type is not confluent_kafka.SerializingProducer or kafkacrypto.KafkaProducer
ValueError – if producer_type is kafkacrypto.KafkaProducer and kafkacrypto is None
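The two ValueError conditions above can be sketched as a small argument check. This is not the library's code: SerializingProducer and KafkaProducer below are hypothetical empty stand-ins for confluent_kafka.SerializingProducer and kafkacrypto.KafkaProducer, and check_producer_args is a hypothetical helper name, so the sketch runs without either package installed.

```python
class SerializingProducer:
    """Hypothetical stand-in for confluent_kafka.SerializingProducer."""


class KafkaProducer:
    """Hypothetical stand-in for kafkacrypto.KafkaProducer."""


def check_producer_args(producer_type, kafkacrypto=None):
    """Mirror the two documented ValueError conditions (sketch only)."""
    # producer_type is a class, not an instance, so compare the types directly
    if producer_type not in (SerializingProducer, KafkaProducer):
        raise ValueError(f"Unrecognized producer type {producer_type!r}")
    # Encrypted producers cannot be built without a KafkaCrypto object
    if producer_type is KafkaProducer and kafkacrypto is None:
        raise ValueError("kafkacrypto is required when producer_type is KafkaProducer")


check_producer_args(SerializingProducer)  # passes: no KafkaCrypto object needed
```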
- static get_producer_args_kwargs(config_file_path, logger=None, kafkacrypto=None, **kwargs)
Return the list of arguments and dictionary of keyword arguments that should be used to instantiate OpenMSIStreamProducer objects based on the given config file. Used to share a single OpenMSIStreamKafkaCrypto instance across several Producers.
- Parameters:
config_file_path (pathlib.Path) – Path to the config file to use in defining Producers
logger (Logger) – The Logger object to use for each of the OpenMSIStreamProducer objects
kafkacrypto (OpenMSIStreamKafkaCrypto, optional) – The OpenMSIStreamKafkaCrypto object that should be used to instantiate Producers. Only needed if a single specific OpenMSIStreamKafkaCrypto instance should be shared.
kwargs (dict) – Any extra keyword arguments are added to the configuration dict for the Producers, with underscores in their names replaced by dots
- Returns:
ret_args, the list of arguments to create new OpenMSIStreamProducer objects based on the config file and arguments to this method
- Return type:
list
- Returns:
ret_kwargs, the dictionary of keyword arguments to create new OpenMSIStreamProducer objects based on the config file and arguments to this method
- Return type:
dict
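A minimal sketch of how the extra keyword arguments might be folded into the Producer configuration, with underscores turned into dots so that Python-friendly names like linger_ms become Kafka-style keys like linger.ms. merge_extra_kwargs is a hypothetical helper, and letting keyword arguments override values already read from the config file is an assumption; the description above does not say which one wins.

```python
def merge_extra_kwargs(configs, **kwargs):
    """Return a copy of configs with extra kwargs added, underscores -> dots (sketch)."""
    merged = dict(configs)  # leave the caller's dict untouched
    for key, value in kwargs.items():
        # Assumption: a keyword argument overrides the same key from the file
        merged[key.replace("_", ".")] = value
    return merged


base = {"bootstrap.servers": "localhost:9092"}
print(merge_extra_kwargs(base, linger_ms=5, client_id="uploader"))
# {'bootstrap.servers': 'localhost:9092', 'linger.ms': 5, 'client.id': 'uploader'}
```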
- classmethod from_file(*args, **kwargs)
Wrapper around get_producer_args_kwargs() and the OpenMSIStreamProducer constructor that returns a single OpenMSIStreamProducer from a given config file/arguments. Arguments are the same as get_producer_args_kwargs().
- Returns:
An OpenMSIStreamProducer object based on the given config file/arguments
- Return type:
OpenMSIStreamProducer
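from_file follows a common "build the arguments, then call the constructor" pattern. The sketch below assumes a simplified, hypothetical ProducerSketch class whose get_producer_args_kwargs returns hard-coded configs instead of parsing a real config file, uses a placeholder string for the producer type, and omits the logger argument.

```python
class ProducerSketch:
    """Hypothetical, simplified stand-in for OpenMSIStreamProducer."""

    def __init__(self, producer_type, configs, kafkacrypto=None):
        self.producer_type = producer_type
        self.configs = configs
        self.kafkacrypto = kafkacrypto

    @staticmethod
    def get_producer_args_kwargs(config_file_path, kafkacrypto=None, **kwargs):
        # Hypothetical: a real implementation would read config_file_path here
        configs = {"bootstrap.servers": "localhost:9092"}
        configs.update({k.replace("_", "."): v for k, v in kwargs.items()})
        ret_args = ["SerializingProducer", configs]  # placeholder producer type
        ret_kwargs = {"kafkacrypto": kafkacrypto}
        return ret_args, ret_kwargs

    @classmethod
    def from_file(cls, *args, **kwargs):
        # Same arguments as get_producer_args_kwargs; build one instance from them
        ret_args, ret_kwargs = cls.get_producer_args_kwargs(*args, **kwargs)
        return cls(*ret_args, **ret_kwargs)


producer = ProducerSketch.from_file("producer.config", linger_ms=5)
print(producer.configs)  # {'bootstrap.servers': 'localhost:9092', 'linger.ms': 5}
```

Splitting the work this way lets several Producers reuse one (args, kwargs) pair, and so one shared KafkaCrypto object, while from_file stays a one-line convenience for the single-Producer case.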
- produce_from_queue(queue, topic_name, **kwargs)
Get a Producible object from a given Queue and produce it to the given topic. Does nothing if the queue is empty, and does not block waiting for items from the queue. Meant to be run in multiple threads in parallel.
- Parameters:
queue (queue.Queue) – the queue.Queue holding objects that should be produced
topic_name (str) – the name of the topic to produce to
kwargs (dict) – other keyword arguments are passed to produce_object()
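The non-blocking behavior maps naturally onto queue.Queue.get_nowait(), which raises queue.Empty instead of waiting. In this sketch produce_object is passed in as a hypothetical callable rather than called as a method, so the example runs on its own; the queue parameter is renamed q to avoid shadowing the queue module.

```python
import queue


def produce_from_queue(q, topic_name, produce_object, **kwargs):
    """Produce at most one object from q without blocking (sketch only)."""
    try:
        obj = q.get_nowait()  # raises queue.Empty at once if nothing is waiting
    except queue.Empty:
        return  # empty queue: do nothing
    produce_object(obj, topic_name, **kwargs)


sent = []
q = queue.Queue()
q.put("chunk-1")
for _ in range(2):  # the second call finds the queue empty and returns
    produce_from_queue(q, "my_topic", lambda obj, topic: sent.append((topic, obj)))
print(sent)  # [('my_topic', 'chunk-1')]
```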
- produce_from_queue_looped(queue, topic_name, **kwargs)
Get Producible objects from a given Queue and produce them to the given topic. Blocks while waiting for items to appear in the queue. Runs until None is pulled from the queue. Meant to be run in multiple threads in parallel.
- Parameters:
queue (queue.Queue) – the queue.Queue holding objects that should be produced
topic_name (str) – the name of the topic to produce to
kwargs (dict) – other keyword arguments are passed to produce_object()
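The looped variant pairs a blocking queue.Queue.get() with a None sentinel. Because each thread stops after pulling a single None, the sketch assumes the caller enqueues one None per worker thread; record is a hypothetical stand-in for produce_object.

```python
import queue
import threading


def produce_from_queue_looped(q, topic_name, produce_object, **kwargs):
    """Block on q and produce each object until None is pulled (sketch only)."""
    while True:
        obj = q.get()  # blocks until an item is available
        if obj is None:
            break  # sentinel: this worker stops
        produce_object(obj, topic_name, **kwargs)


produced = []
lock = threading.Lock()


def record(obj, topic_name):
    # Hypothetical stand-in for produce_object; the lock guards the shared list
    with lock:
        produced.append((topic_name, obj))


q = queue.Queue()
n_threads = 3
workers = [
    threading.Thread(target=produce_from_queue_looped, args=(q, "my_topic", record))
    for _ in range(n_threads)
]
for w in workers:
    w.start()
for i in range(10):
    q.put(f"chunk-{i}")
for _ in range(n_threads):
    q.put(None)  # one sentinel per worker so every thread exits
for w in workers:
    w.join()
print(len(produced))  # 10
```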
- produce_object(obj, topic_name, callback=None, print_every=1000, timeout=60, retry_sleep=5)
Produce a given Producible object to a given topic, with some handling for BufferErrors, calling poll() automatically, and using callbacks constructed on the fly.
- Parameters:
obj (Producible) – the object to produce
topic_name (str) – the name of the topic to produce to
callback (producer callback function (takes at least "err" and "msg" arguments), optional) – a function that should be called for each message upon recognition by the broker. Will be wrapped in a lambda for each call to produce(). Arguments to the callback function are determined by the particular type of Producible object used
print_every (int, optional) – print/log progress every (this many) messages
timeout (float, optional) – max time (seconds) to wait for the message to be produced in the event of (repeated) BufferError(s)
retry_sleep (float, optional) – how long (seconds) to wait between produce attempts if one fails with a BufferError
- Returns:
True if the call to produce is successful, False otherwise.
- Return type:
bool
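The BufferError handling can be approximated by a retry loop bounded by timeout and paced by retry_sleep. This is a sketch under assumptions, not the library's implementation: produce_with_retry and FlakyProducer are hypothetical, print_every is not modeled, and binding the message key into the callback is only an example of per-message context (the real callback arguments depend on the Producible type). FlakyProducer imitates confluent_kafka's behavior of raising BufferError when its local queue is full and firing on_delivery callbacks from poll().

```python
import time


def produce_with_retry(producer, topic, key, value, callback=None,
                       timeout=60, retry_sleep=5):
    """Retry produce() on BufferError until success or timeout (sketch only)."""
    on_delivery = None
    if callback is not None:
        # Build a fresh lambda per call so this message's key is bound into it
        on_delivery = lambda err, msg: callback(err, msg, key=key)  # noqa: E731
    start = time.monotonic()
    while True:
        try:
            producer.produce(topic, key=key, value=value, on_delivery=on_delivery)
        except BufferError:
            # Local queue is full: give up once we have waited too long
            if time.monotonic() - start >= timeout:
                return False
            producer.poll(0)  # serve callbacks so queued messages can drain
            time.sleep(retry_sleep)
        else:
            producer.poll(0)  # serve any pending delivery callbacks
            return True


class FlakyProducer:
    """Fake producer whose local queue is 'full' for the first `failures` calls."""

    def __init__(self, failures):
        self.failures = failures
        self.sent = []
        self._pending = []

    def produce(self, topic, key=None, value=None, on_delivery=None):
        if self.failures > 0:
            self.failures -= 1
            raise BufferError("Local: Queue full")
        self.sent.append((topic, key, value))
        if on_delivery is not None:
            self._pending.append(on_delivery)

    def poll(self, timeout=None):
        # Fire queued delivery callbacks, as a real client's poll() would
        fired = len(self._pending)
        for cb in self._pending:
            cb(None, "fake-msg")
        self._pending.clear()
        return fired


acks = []
fake = FlakyProducer(failures=2)
ok = produce_with_retry(fake, "my_topic", "file.dat_0", b"chunk bytes",
                        callback=lambda err, msg, key: acks.append(key),
                        timeout=1, retry_sleep=0.01)
print(ok, fake.sent, acks)
# True [('my_topic', 'file.dat_0', b'chunk bytes')] ['file.dat_0']
```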
- produce(topic, key, value, **kwargs)
Produce a message to a topic. Other kwargs are passed through to the underlying producer’s produce() function.
- Parameters:
topic (str) – the name of the topic to produce to
key (depends on serialization settings) – the key of the message
value (depends on the serialization settings) – the value of the message
- poll(*args, **kwargs)
Wrapper around Producer.poll()
- flush(*args, **kwargs)
Wrapper around Producer.flush()
- close()
Wrapper around kafkacrypto.KafkaCrypto.close(). It’s important to call this on shutdown if the Producer is producing encrypted messages.
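Because close() matters for encrypted Producers, a shutdown path that always runs is a reasonable pattern: flush buffered messages, then close. ProducerStub is a hypothetical stand-in that records call order; its flush() returning the number of messages still queued mirrors confluent_kafka's Producer.flush(), which this sketch assumes the flush() wrapper passes through unchanged.

```python
class ProducerStub:
    """Hypothetical stand-in that records the order of calls."""

    def __init__(self):
        self.calls = []

    def produce(self, topic, key, value):
        self.calls.append("produce")

    def flush(self, timeout=None):
        self.calls.append("flush")
        return 0  # number of messages still queued after flushing

    def close(self):
        self.calls.append("close")


producer = ProducerStub()
try:
    producer.produce("my_topic", "key", b"value")
finally:
    # Deliver anything still buffered, then release KafkaCrypto resources
    remaining = producer.flush(timeout=30)
    if remaining:
        print(f"{remaining} message(s) were not delivered")
    producer.close()
print(producer.calls)  # ['produce', 'flush', 'close']
```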