MetadataJSONReproducer

class openmsistream.metadata_extraction.MetadataJSONReproducer(config_file, consumer_topic_name, producer_topic_name, **kwargs)

Bases: DataFileStreamReproducer, ABC

Processes a stream of DataFileChunks, extracts metadata from fully reconstructed files, and produces those metadata fields as JSON to a different topic.

This is an abstract base class that cannot be instantiated on its own; subclasses must implement _get_metadata_dict_for_file().
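A minimal sketch of what "cannot be instantiated on its own" means in practice, using Python's abc module. MetadataReproducerSketch and ByteCountReproducer are hypothetical stand-ins, not openmsistream classes. The bytestring attribute on the fake datafile is an assumption about how a reconstructed file's contents might be exposed, and the real class's constructor arguments are omitted.

```python
from abc import ABC, abstractmethod
from types import SimpleNamespace


class MetadataReproducerSketch(ABC):
    """Hypothetical stand-in for the abstract part of MetadataJSONReproducer."""

    @abstractmethod
    def _get_metadata_dict_for_file(self, datafile):
        """Return the metadata of one fully reconstructed file as a dict."""


class ByteCountReproducer(MetadataReproducerSketch):
    """Concrete subclass: it overrides the abstract method, so it can be created."""

    def _get_metadata_dict_for_file(self, datafile):
        # `bytestring` is an ASSUMED attribute holding the file's contents.
        return {"n_bytes": len(datafile.bytestring)}


try:
    MetadataReproducerSketch()  # abstract method not implemented: raises TypeError
except TypeError as exc:
    print(f"cannot instantiate base class: {exc}")

reproducer = ByteCountReproducer()
# Hypothetical stand-in for a DownloadDataFileToMemory object.
fake_file = SimpleNamespace(bytestring=b"hello")
print(reproducer._get_metadata_dict_for_file(fake_file))  # {'n_bytes': 5}
```

Python refuses to create any class that still has an unimplemented @abstractmethod, which is why only the subclass can be instantiated.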

Parameters:
  • config_file (pathlib.Path) – Path to the config file to use in defining the Broker connection, Consumers, and Producers

  • consumer_topic_name (str) – Name of the topic to which the Consumer(s) should be subscribed

  • producer_topic_name (str) – Name of the topic to which the Producer(s) should produce the processing results

  • output_dir (pathlib.Path, optional) – Path to the directory where the log and CSV registry files should be kept (if None, a default will be created in the current directory)

  • datafile_type (DownloadDataFileToMemory, optional) – The type of data file that recognized files should be reconstructed as (must be a subclass of DownloadDataFileToMemory)

  • n_producer_threads (int, optional) – The number of producers to run. The total number of producer/consumer threads started is max(n_consumer_threads, n_producer_threads).

  • n_consumer_threads (int, optional) – The number of consumers to run. The total number of producer/consumer threads started is max(n_consumer_threads, n_producer_threads).

  • consumer_group_id (str, optional) – The group ID under which each consumer should be created

  • filepath_regex – If given, only messages associated with files whose paths match this regex will be consumed
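A hedged sketch of how two of the parameters above interact, assuming plain Python semantics. total_threads and is_consumed are hypothetical helpers, not library functions. The thread count follows the documented max() rule; for filepath_regex, the choice of re.match (anchored at the start of the path) rather than re.search is an assumption, since the documentation only says paths must "match".

```python
import re


def total_threads(n_consumer_threads, n_producer_threads):
    # Documented rule: threads started = max(n_consumer_threads, n_producer_threads)
    return max(n_consumer_threads, n_producer_threads)


def is_consumed(filepath, filepath_regex=None):
    # No regex given: messages for every file are consumed.
    if filepath_regex is None:
        return True
    # ASSUMPTION: the pattern is applied with re.match to the path string.
    return re.match(filepath_regex, filepath) is not None


print(total_threads(2, 5))  # 5
paths = ["run1/sample.tif", "run1/notes.txt", "run2/scan.tif"]
print([p for p in paths if is_consumed(p, r".*\.tif$")])
```

With the pattern shown, only the two .tif paths are kept; the .txt file's messages would be skipped.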

Raises:

ValueError – if datafile_type is not a subclass of DownloadDataFileToMemory
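A minimal sketch of the documented ValueError check, assuming an issubclass() test. DownloadDataFileToMemoryStandIn, MyDataFile and check_datafile_type are hypothetical names, and the error message is illustrative rather than the library's actual text.

```python
class DownloadDataFileToMemoryStandIn:
    """Hypothetical stand-in for DownloadDataFileToMemory."""


class MyDataFile(DownloadDataFileToMemoryStandIn):
    """A valid datafile_type: a subclass of the stand-in base."""


def check_datafile_type(datafile_type):
    # issubclass() itself raises TypeError for non-classes, so check isinstance first.
    is_valid = isinstance(datafile_type, type) and issubclass(
        datafile_type, DownloadDataFileToMemoryStandIn
    )
    if not is_valid:
        raise ValueError(
            f"datafile_type {datafile_type!r} is not a subclass of DownloadDataFileToMemory"
        )
    return datafile_type


print(check_datafile_type(MyDataFile).__name__)  # MyDataFile
try:
    check_datafile_type(dict)
except ValueError as exc:
    print(exc)
```

Passing the class itself (not an instance) is what the check expects; an instance or an unrelated class is rejected.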

abstract _get_metadata_dict_for_file(datafile)

Given a DownloadDataFileToMemory, extract and return its metadata as a dictionary. The returned dictionary will be serialized to JSON and produced to the destination topic.

This function may raise exceptions, which will be logged, if metadata extraction does not proceed as expected.

Not implemented in the base class.

Parameters:

datafile (DownloadDataFileToMemory) – A DownloadDataFileToMemory object that has received all of its messages from the topic

Returns:

metadata keys/values

Return type:

dict
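A hedged sketch of the contract described above: the returned dict is serialized to JSON, so its keys and values must be JSON-serializable, and a failed extraction raises an exception that gets logged. get_metadata_dict_for_file and metadata_json_or_none are hypothetical standalone functions playing the roles of the abstract method and its caller, and the "key=value" file format is invented for illustration.

```python
import json
import logging

logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger("metadata_sketch")


def get_metadata_dict_for_file(datafile_bytes):
    # Hypothetical extractor: expects "key=value" lines and raises on anything else.
    metadata = {}
    for line in datafile_bytes.decode("utf-8").splitlines():
        key, sep, value = line.partition("=")
        if not sep:
            raise ValueError(f"malformed metadata line: {line!r}")
        metadata[key.strip()] = value.strip()
    return metadata


def metadata_json_or_none(datafile_bytes):
    # Serialize the returned dict; log the error and skip the file if anything fails
    # (this also covers json.dumps raising TypeError on non-serializable values).
    try:
        return json.dumps(get_metadata_dict_for_file(datafile_bytes))
    except Exception as exc:
        logger.error("metadata extraction failed: %s", exc)
        return None


print(metadata_json_or_none(b"sample = A12\noperator = jdoe"))
print(metadata_json_or_none(b"not metadata"))  # logs an error, then prints None
```

Returning only str, int, float, bool, None, list and dict values keeps json.dumps from failing on the extracted metadata.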

classmethod run_from_command_line(args=None)

Run produce_processing_results_for_files_as_read() to continually extract metadata from consumed files and produce it as JSON to another topic.
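A toy, in-memory model of the consume, reconstruct, extract, produce cycle, assuming chunks arrive as (filename, index, n_total, data) tuples. This tuple format, the reproduce_metadata function, and the list standing in for the destination topic are all hypothetical; the real class works with DataFileChunk messages from a broker and is not reproduced here.

```python
import json
from collections import defaultdict


def reproduce_metadata(chunks, get_metadata_dict):
    """Toy model; chunks are hypothetical (filename, index, n_total, data) tuples."""
    received = defaultdict(dict)  # filename -> {chunk index: bytes}
    produced = []  # stands in for the destination topic
    for filename, index, n_total, data in chunks:
        received[filename][index] = data  # a duplicate chunk just overwrites
        if len(received[filename]) == n_total:
            # All chunks are in: rebuild the file in order, then extract metadata.
            parts = received.pop(filename)
            content = b"".join(parts[i] for i in range(n_total))
            produced.append(json.dumps(get_metadata_dict(filename, content)))
    return produced


stream = [
    ("a.txt", 1, 2, b"world"),
    ("b.txt", 0, 1, b"xyz"),
    ("a.txt", 0, 2, b"hello "),
]
messages = reproduce_metadata(
    stream, lambda name, content: {"file": name, "n_bytes": len(content)}
)
print(messages)
```

Metadata is produced in the order files become complete, not the order their first chunks arrived, so b.txt is emitted before a.txt here.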