OpenMSIStreamProducer

class openmsistream.kafka_wrapper.OpenMSIStreamProducer(producer_type, configs, kafkacrypto=None, **kwargs)

Bases: LogOwner

Wrapper around an underlying Producer of one of the supported types (see producer_type below). By default, message values are expected to be DataFileChunk objects; other message value types can be accommodated by setting “value.serializer” in the config file.
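As a sketch of a non-default value type, assume the underlying Producer is a confluent_kafka.SerializingProducer, which calls a “value.serializer” callable as serializer(obj, ctx) and expects bytes back. A JSON serializer for dict values could then look like the following. The function name json_value_serializer is hypothetical, and how an OpenMSIStream config file selects such a callable is not shown here.

```python
import json
from typing import Any, Optional


def json_value_serializer(obj: Any, ctx: Optional[object] = None) -> Optional[bytes]:
    """Hypothetical value serializer: encode a JSON-compatible object as UTF-8 bytes.

    The (obj, ctx) signature matches how confluent_kafka's SerializingProducer
    calls a "value.serializer"; ctx is not used here.
    """
    if obj is None:
        # Keep null values null instead of encoding the string "null"
        return None
    return json.dumps(obj, sort_keys=True).encode("utf-8")


# Hypothetical Producer configuration using the serializer for message values
configs = {
    "bootstrap.servers": "localhost:9092",
    "value.serializer": json_value_serializer,
}
```

Sorting keys makes the encoded bytes deterministic for equal dicts, which keeps messages comparable in tests.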

Parameters:
  • producer_type (confluent_kafka.SerializingProducer or kafkacrypto.KafkaProducer) – The type of underlying Producer that should be used

  • configs (dict) – A dictionary of configuration names and parameters to use in instantiating the underlying Producer

  • kafkacrypto (OpenMSIStreamKafkaCrypto, optional) – The OpenMSIStreamKafkaCrypto object that should be used to instantiate the Producer. Only needed if producer_type is kafkacrypto.KafkaProducer.

  • kwargs (dict) – Any extra keyword arguments (other than “logger”) are added to the configuration dict for the Producer, with underscores in their names replaced by dots

Raises:
  • ValueError – if producer_type is not confluent_kafka.SerializingProducer or kafkacrypto.KafkaProducer

  • ValueError – if producer_type is kafkacrypto.KafkaProducer and kafkacrypto is None
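The handling of extra keyword arguments can be sketched as follows, assuming that an extra argument overwrites any entry of the same (dotted) name already in configs. The helper name merge_producer_kwargs is hypothetical and not part of OpenMSIStream.

```python
from typing import Any, Dict


def merge_producer_kwargs(configs: Dict[str, Any], **kwargs: Any) -> Dict[str, Any]:
    """Hypothetical helper: fold extra keyword arguments into a Producer config dict.

    Underscores in argument names become dots (linger_ms -> "linger.ms"), and
    "logger" is skipped because it is not a Producer configuration.
    """
    merged = dict(configs)  # copy so the caller's dict is left unchanged
    for name, value in kwargs.items():
        if name == "logger":
            continue
        # Assumption: a keyword argument overrides an existing config entry
        merged[name.replace("_", ".")] = value
    return merged


merged = merge_producer_kwargs({"bootstrap.servers": "localhost:9092"}, linger_ms=5)
```

Here merged ends up as {"bootstrap.servers": "localhost:9092", "linger.ms": 5}.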

static get_producer_args_kwargs(config_file_path, logger=None, kafkacrypto=None, **kwargs)

Return the list of arguments and the dictionary of keyword arguments that should be used to instantiate OpenMSIStreamProducer objects based on the given config file. Used to share a single OpenMSIStreamKafkaCrypto instance across several Producers.

Parameters:
  • config_file_path (pathlib.Path) – Path to the config file to use in defining Producers

  • logger (Logger) – The Logger object to use for each of the OpenMSIStreamProducer objects

  • kafkacrypto (OpenMSIStreamKafkaCrypto, optional) – The OpenMSIStreamKafkaCrypto object that should be used to instantiate Producers. Only needed if a single specific OpenMSIStreamKafkaCrypto instance should be shared.

  • kwargs (dict) – Any extra keyword arguments are added to the configuration dict for the Producers, with underscores in their names replaced by dots

Returns:

  • ret_args (list) – the list of arguments to create new OpenMSIStreamProducer objects based on the config file and arguments to this method

  • ret_kwargs (dict) – the dictionary of keyword arguments to create new OpenMSIStreamProducer objects based on the config file and arguments to this method

Return type:

tuple(list, dict)
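The point of returning the arguments rather than a Producer is that they can be computed once and reused, so every Producer built from them receives the same crypto object. A self-contained sketch of that pattern follows. StubCrypto, StubProducer, and get_args_kwargs are hypothetical stand-ins for OpenMSIStreamKafkaCrypto, OpenMSIStreamProducer, and this method, and the exact contents of ret_args and ret_kwargs are assumptions.

```python
from typing import Any, Dict, List, Optional, Tuple


class StubCrypto:
    """Stand-in for an OpenMSIStreamKafkaCrypto instance (hypothetical)."""


class StubProducer:
    """Stand-in for OpenMSIStreamProducer that just records its arguments."""

    def __init__(self, producer_type: str, configs: Dict[str, Any],
                 kafkacrypto: Optional[StubCrypto] = None, logger: Any = None) -> None:
        self.producer_type = producer_type
        self.configs = configs
        self.kafkacrypto = kafkacrypto
        self.logger = logger


def get_args_kwargs(configs: Dict[str, Any],
                    kafkacrypto: Optional[StubCrypto] = None
                    ) -> Tuple[List[Any], Dict[str, Any]]:
    """Hypothetical analogue of get_producer_args_kwargs(): create the crypto
    object once (unless one is passed in) and return constructor arguments."""
    if kafkacrypto is None:
        kafkacrypto = StubCrypto()
    ret_args = ["stub-producer-type", configs]  # placeholder producer type
    ret_kwargs = {"kafkacrypto": kafkacrypto, "logger": None}
    return ret_args, ret_kwargs


# Compute the arguments once, then build several Producers from them
args, kwargs = get_args_kwargs({"bootstrap.servers": "localhost:9092"})
producers = [StubProducer(*args, **kwargs) for _ in range(3)]
```

Calling get_args_kwargs() separately for each Producer would instead create a new StubCrypto each time, which is what sharing the returned arguments avoids.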

classmethod from_file(*args, **kwargs)

Wrapper around get_producer_args_kwargs() and the OpenMSIStreamProducer constructor to return a single OpenMSIStreamProducer from a given config file/arguments. Arguments are the same as for get_producer_args_kwargs().

Returns:

An OpenMSIStreamProducer object based on the given config file/arguments

Return type:

OpenMSIStreamProducer
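The composition described for from_file() can be sketched with a hypothetical class, ConfigBackedProducer. Its get_args_kwargs() parses "key = value" text as a stand-in for reading a real config file; none of these names are part of OpenMSIStream.

```python
from typing import Any, Dict, List, Tuple


class ConfigBackedProducer:
    """Hypothetical class showing the from_file() pattern: a classmethod that
    delegates to a static argument builder and then calls the constructor."""

    def __init__(self, producer_type: str, configs: Dict[str, Any], logger: Any = None) -> None:
        self.producer_type = producer_type
        self.configs = configs
        self.logger = logger

    @staticmethod
    def get_args_kwargs(config_text: str, logger: Any = None) -> Tuple[List[Any], Dict[str, Any]]:
        # Stand-in for reading a config file: parse "key = value" lines
        configs: Dict[str, Any] = {}
        for line in config_text.splitlines():
            if "=" in line:
                key, value = line.split("=", 1)
                configs[key.strip()] = value.strip()
        return ["stub-producer-type", configs], {"logger": logger}

    @classmethod
    def from_file(cls, *args: Any, **kwargs: Any) -> "ConfigBackedProducer":
        # Accepts exactly the same arguments as get_args_kwargs()
        ret_args, ret_kwargs = cls.get_args_kwargs(*args, **kwargs)
        return cls(*ret_args, **ret_kwargs)


producer = ConfigBackedProducer.from_file("bootstrap.servers = localhost:9092\n")
```

Calling cls(...) rather than naming the class directly means a subclass's from_file() returns an instance of that subclass.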

produce_from_queue(queue, topic_name, **kwargs)

Get a Producible object from a given Queue and produce it to the given topic. Does nothing if the queue is empty, and does not block waiting for items from the queue.

Meant to be run in multiple threads in parallel.
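The non-blocking behaviour can be sketched with queue.Queue.get_nowait(), which raises queue.Empty instead of waiting. In this sketch produce_one_from_queue and its produce_object argument are hypothetical stand-ins, and returning a bool is an assumption made for illustration.

```python
import queue
from typing import Any, Callable


def produce_one_from_queue(q: "queue.Queue[Any]", topic_name: str,
                           produce_object: Callable[[Any, str], bool]) -> bool:
    """Hypothetical sketch of a single non-blocking produce-from-queue step.

    Returns True if an object was taken from the queue and produced successfully,
    False if the queue was empty or producing failed.
    """
    try:
        obj = q.get_nowait()  # never waits; raises queue.Empty instead
    except queue.Empty:
        return False  # empty queue: do nothing
    try:
        return produce_object(obj, topic_name)
    finally:
        q.task_done()  # lets q.join() track completion across threads
```

Because nothing blocks, several threads can call this repeatedly on a shared queue without any of them hanging when the queue runs dry.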

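Parameters:
  • queue (queue.Queue) – the Queue to take a Producible object from

  • topic_name (str) – the name of the topic to produce to
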
produce_from_queue_looped(queue, topic_name, **kwargs)

Get Producible objects from a given Queue and produce them to the given topic. Blocks while waiting for items to appear in the queue. Runs until “None” is pulled from the queue.

Meant to be run in multiple threads in parallel.
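Since each call stops after pulling a single None, several threads sharing one queue need one None each. A sketch of that shutdown pattern, where produce_from_queue_looped_sketch, run_workers, and the produce_object argument are hypothetical names:

```python
import queue
import threading
from typing import Any, Callable, List


def produce_from_queue_looped_sketch(q: "queue.Queue[Any]", topic_name: str,
                                     produce_object: Callable[[Any, str], bool]) -> None:
    """Hypothetical sketch: block on the queue and produce until None arrives."""
    while True:
        obj = q.get()  # blocks until an item is available
        if obj is None:
            break  # sentinel: this worker stops
        produce_object(obj, topic_name)


def run_workers(items: List[Any], n_threads: int, topic_name: str,
                produce_object: Callable[[Any, str], bool]) -> None:
    q: "queue.Queue[Any]" = queue.Queue()
    threads = [
        threading.Thread(target=produce_from_queue_looped_sketch,
                         args=(q, topic_name, produce_object))
        for _ in range(n_threads)
    ]
    for t in threads:
        t.start()
    for item in items:
        q.put(item)
    # Each worker consumes exactly one None, so enqueue one per thread
    for _ in threads:
        q.put(None)
    for t in threads:
        t.join()
```

Putting the Nones after all real items guarantees every item is taken before any worker sees its sentinel, since the queue is first-in, first-out.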

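Parameters:
  • queue (queue.Queue) – the Queue to take Producible objects from; putting None on it stops the loop

  • topic_name (str) – the name of the topic to produce to
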
produce_object(obj, topic_name, callback=None, print_every=1000, timeout=60, retry_sleep=5)

Produce a given Producible object to a given topic, retrying when a BufferError is raised, calling poll() automatically, and building the delivery callback on the fly for each message.

Parameters:
  • obj (Producible) – the object to produce

  • topic_name (str) – the name of the topic to produce to

  • callback (producer callback function (takes at least "err" and "msg" arguments), optional) – a function to be called for each message once the broker reports its delivery. It is wrapped in a lambda for each call to produce(), and the arguments passed to it depend on the particular type of Producible object used

  • print_every (int, optional) – print/log progress once every print_every messages

  • timeout (float, optional) – max time (seconds) to wait for the message to be produced in the event of (repeated) BufferError(s)

  • retry_sleep (float, optional) – how long (seconds) to wait between produce attempts if one fails with a BufferError

Returns:

True if the call to produce is successful, False otherwise.

Return type:

bool
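The retry behaviour described by timeout and retry_sleep can be sketched as below. confluent_kafka's Producer raises BufferError when its local queue is full and accepts an on_delivery callback in produce(); the function produce_with_retries, its callback_args parameter, and the exact "give up if the next retry would pass the deadline" rule are assumptions for illustration, not OpenMSIStream's code.

```python
import time
from typing import Any, Callable, Optional


def produce_with_retries(
    produce: Callable[..., None],
    poll: Callable[[float], int],
    topic: str,
    key: Any,
    value: Any,
    callback: Optional[Callable[..., None]] = None,
    callback_args: tuple = (),
    timeout: float = 60.0,
    retry_sleep: float = 5.0,
) -> bool:
    """Hypothetical sketch of retrying produce() on BufferError.

    Returns True once produce() succeeds, or False if it keeps raising
    BufferError and the next retry would exceed the timeout.
    """
    on_delivery = None
    if callback is not None:
        # Wrap the callback in a lambda so it also receives per-object arguments
        on_delivery = lambda err, msg: callback(err, msg, *callback_args)  # noqa: E731
    deadline = time.monotonic() + timeout
    while True:
        try:
            produce(topic, key=key, value=value, on_delivery=on_delivery)
        except BufferError:
            # Local queue is full: give up if out of time, otherwise wait and retry
            if time.monotonic() + retry_sleep >= deadline:
                return False
            poll(0)  # serve delivery reports while the queue drains
            time.sleep(retry_sleep)
            continue
        poll(0)  # serve pending delivery callbacks without blocking
        return True
```

With a real confluent_kafka Producer, produce and poll would be the bound methods producer.produce and producer.poll.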

produce(topic, key, value, **kwargs)

Produce a message to a topic. Other kwargs are passed through to the underlying producer’s produce() function.

Parameters:
  • topic (str) – the name of the topic to produce to

  • key (depends on the serialization settings) – the key of the message

  • value (depends on the serialization settings) – the value of the message
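The pass-through of extra keyword arguments can be sketched with a recording stand-in for the underlying producer. ProducerWrapper and RecordingProducer are hypothetical names; partition is a real keyword of confluent_kafka's produce(), used here only as an example of an argument that is forwarded unchanged.

```python
from typing import Any, Dict, List


class RecordingProducer:
    """Stand-in for an underlying producer; records each produce() call."""

    def __init__(self) -> None:
        self.calls: List[Dict[str, Any]] = []

    def produce(self, topic: str, key: Any = None, value: Any = None, **kwargs: Any) -> None:
        self.calls.append({"topic": topic, "key": key, "value": value, **kwargs})


class ProducerWrapper:
    """Hypothetical sketch of a produce() that forwards extra kwargs unchanged."""

    def __init__(self, producer: Any) -> None:
        self._producer = producer

    def produce(self, topic: str, key: Any, value: Any, **kwargs: Any) -> None:
        # Anything not named here (e.g. partition, headers) goes straight through
        self._producer.produce(topic=topic, key=key, value=value, **kwargs)


inner = RecordingProducer()
ProducerWrapper(inner).produce("my_topic", "key-1", b"chunk bytes", partition=0)
```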

poll(*args, **kwargs)

Wrapper around Producer.poll()

flush(*args, **kwargs)

Wrapper around Producer.flush()
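Because flush() wraps the underlying Producer's flush(), a caller can check whether everything was delivered before shutting down. This sketch assumes, as with confluent_kafka.Producer.flush(), that the return value is the number of messages still queued; flush_and_report and DrainingProducer are hypothetical names.

```python
import logging
from typing import Any

logger = logging.getLogger(__name__)


def flush_and_report(producer: Any, timeout: float = 10.0) -> int:
    """Hypothetical helper: flush, then warn about messages still undelivered.

    Assumes producer.flush(timeout) returns how many messages remain queued.
    """
    remaining = producer.flush(timeout)
    if remaining > 0:
        logger.warning("%d message(s) still queued after flushing for %s s", remaining, timeout)
    return remaining


class DrainingProducer:
    """Stand-in producer whose queue shrinks by a fixed amount on each flush."""

    def __init__(self, queued: int, drained_per_flush: int) -> None:
        self.queued = queued
        self.drained_per_flush = drained_per_flush

    def flush(self, timeout: float = -1) -> int:
        self.queued = max(0, self.queued - self.drained_per_flush)
        return self.queued
```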

close()

Wrapper around kafkacrypto.KafkaCrypto.close(). It’s important to call this on shutdown if the Producer is producing encrypted messages.
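Since calling close() on shutdown matters when messages are encrypted, one way to guarantee it runs is a try/finally block. The sketch below flushes before closing; that ordering and the helper name produce_all_then_close are assumptions, and producer can be any object with produce(), poll(), flush(), and close() methods.

```python
from typing import Any, Iterable


def produce_all_then_close(producer: Any, topic: str, items: Iterable[Any]) -> None:
    """Hypothetical shutdown pattern: always flush and close, even on errors."""
    try:
        for item in items:
            producer.produce(topic=topic, key=None, value=item)
            producer.poll(0)  # serve delivery callbacks as messages go out
    finally:
        producer.flush()  # wait for queued messages before shutting down
        producer.close()  # runs even if producing raised an exception
```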