UploadDataFile

class openmsistream.UploadDataFile(filepath, to_upload=True, rootdir=None, filename_append=None, **kwargs)

Bases: DataFile, Runnable

Represents a data file whose messages will be uploaded to a topic.

Used as part of a DataFileUploadDirectory, but can also be run standalone to upload a single file.

Parameters:
  • filepath (pathlib.Path) – The path to the file on disk

  • to_upload (bool, optional) – True (default) if the file should be uploaded. Set to False to mark pre-existing files as “already uploaded” when running a DataFileUploadDirectory.

  • rootdir (None or pathlib.Path, optional) – path to the “root” directory that this file is in. Any path components below this directory are added to each DataFileChunk so that the file is reconstructed inside the corresponding subdirectory

  • filename_append (None or str, optional) – a string that should be appended to the end of the filename stem to distinguish the file that’s produced from its original file on disk
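As a rough illustration of how rootdir and filename_append could combine, the sketch below computes the relative path under which a file might be reconstructed. The helper `reconstructed_relpath` is hypothetical and is not part of openmsistream; it only mirrors the parameter descriptions above, assuming the appended string goes between the filename stem and the suffix.

```python
from pathlib import PurePosixPath

def reconstructed_relpath(filepath, rootdir=None, filename_append=None):
    """Hypothetical helper: relative path a reconstructed file might get."""
    filepath = PurePosixPath(filepath)
    # Everything between rootdir and the file itself becomes the subdirectory.
    if rootdir is not None:
        subdir = filepath.parent.relative_to(rootdir)
    else:
        subdir = PurePosixPath("")
    name = filepath.name
    if filename_append:
        # Assumption: the string is inserted after the stem, before the suffix.
        name = f"{filepath.stem}{filename_append}{filepath.suffix}"
    return subdir / name

example = reconstructed_relpath("/data/root/a/b/file.txt", "/data/root", "_v2")
```

With these inputs, `example` is the relative path `a/b/file_v2.txt`; without a rootdir, only the (possibly renamed) filename remains.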

ARGUMENT_PARSER_TYPE

alias of OpenMSIStreamArgumentParser

upload_whole_file(config_path, topic_name, n_threads=2, chunk_size=524288)

Chunk and upload an entire file on disk to a broker’s topic.

Parameters:
  • config_path (pathlib.Path) – Path to the config file to use in defining the Broker connection and Producers

  • topic_name (str) – Name of the topic to which the file’s messages should be produced

  • n_threads (int, optional) – The number of threads/Producers to run at once during uploading

  • chunk_size (int, optional) – The size of the file chunk in each message in bytes
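To give a feel for what chunk_size means in practice, the following sketch splits a file of a given size into consecutive byte ranges. The function `chunk_ranges` is a hypothetical stand-in, not the library's chunking code, and it assumes the final chunk is simply shorter when the file size is not a multiple of chunk_size.

```python
import math

DEFAULT_CHUNK_SIZE = 524288  # bytes; the default shown in the signature above

def chunk_ranges(file_size, chunk_size=DEFAULT_CHUNK_SIZE):
    """Return (start, stop) byte offsets, stop exclusive, for each chunk."""
    n_chunks = math.ceil(file_size / chunk_size)
    return [
        (i * chunk_size, min((i + 1) * chunk_size, file_size))
        for i in range(n_chunks)
    ]

# A file of 2 MiB plus one byte needs one extra, one-byte chunk.
ranges = chunk_ranges(2 * 1024 * 1024 + 1)
```

Under these assumptions, each range would correspond to one message produced to the topic.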

add_chunks_to_upload(chunks_to_add=None, chunk_size=524288)

Add chunks from this file to the internal list of chunks to upload, possibly with some selection defined by select_bytes

Parameters:
  • chunks_to_add (None or list[int], optional) – a list of chunk indices to add to the list to be uploaded (Default=None adds all chunks)

  • chunk_size (int, optional) – The size of the file chunk in each message in bytes
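The effect of chunks_to_add can be pictured as an index filter over the full list of chunks. The sketch below is an assumption-laden illustration: `select_chunks` and its `first_index` parameter are hypothetical, and the text above does not say whether chunk indices start at 0 or 1, so the base is left configurable.

```python
def select_chunks(all_chunks, chunks_to_add=None, first_index=0):
    """Hypothetical filter: keep only the chunks whose index was requested."""
    if chunks_to_add is None:
        # Default=None keeps every chunk.
        return list(all_chunks)
    wanted = set(chunks_to_add)
    return [
        chunk
        for index, chunk in enumerate(all_chunks, start=first_index)
        if index in wanted
    ]

chunks = ["a", "b", "c", "d"]
everything = select_chunks(chunks)
subset = select_chunks(chunks, [0, 2])
```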

enqueue_chunks_for_upload(queue, n_threads=None, chunk_size=524288, queue_full_timeout=0.001)

Add some chunks of this file from the internal list to a given upload queue (the internal list will be created if add_chunks_to_upload() hasn’t already been called).

When the entire internal list of file chunks has been added to the queue, the file will be marked as fully enqueued.

If the given queue is full, this function waits for queue_full_timeout and then returns without adding further chunks.

Parameters:
  • queue (queue.Queue) – the Queue to which chunks should be added

  • n_threads (None or int, optional) – the number of threads running during uploading; at most 5*this number of chunks will be added per call to this method. If this argument isn’t given, every chunk in the internal list will be added.

  • chunk_size (int, optional) – The size of the file chunk in each message in bytes

  • queue_full_timeout (float, optional) – amount of time to wait before returning if the queue is full and new messages can’t be added
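The behavior described above (a per-call cap of 5*n_threads chunks, and an early return when the queue is full) can be sketched with the standard library alone. `enqueue_some` is a hypothetical function written for illustration, not the library's implementation; `pending` stands in for the internal list of chunks, and the timeout is in seconds because that is what `queue.Queue.put` expects.

```python
import queue

def enqueue_some(pending, upload_queue, n_threads=None, queue_full_timeout=0.001):
    """Move chunks from `pending` to `upload_queue`.

    Returns True once `pending` is empty (the "fully enqueued" state).
    """
    # With no n_threads given, try to add every pending chunk in one call.
    limit = len(pending) if n_threads is None else 5 * n_threads
    added = 0
    while pending and added < limit:
        try:
            # Blocks for at most queue_full_timeout seconds if the queue is full.
            upload_queue.put(pending[0], timeout=queue_full_timeout)
        except queue.Full:
            break  # give up for now; the caller can call again later
        pending.pop(0)
        added += 1
    return not pending

pending = list(range(25))
upload_queue = queue.Queue()
done = enqueue_some(pending, upload_queue, n_threads=2)
```

Calling the function repeatedly until it returns True mimics a caller that polls a file until it is fully enqueued.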

classmethod get_command_line_arguments()

Return the names of arguments for the logger stream and file levels.

classmethod get_init_args_kwargs(parsed_args)

Get the list of init arguments and the dictionary of init keyword arguments for this class given a namespace of, for example, parsed arguments.

Parameters:

parsed_args (argparse.Namespace) – A namespace containing entries needed to determine the init args and kwargs for this class

Returns:
  • A list of init args

  • A dictionary of init kwargs
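One way to picture the split between init args and init kwargs is to map a namespace onto the constructor signature shown at the top of this page. The sketch below is only an assumption about how that mapping could look: `init_args_kwargs` is hypothetical, and which namespace entries the real method reads is not stated here.

```python
import argparse
from pathlib import Path

# Keyword parameters taken from the constructor signature documented above.
INIT_KWARG_NAMES = ("to_upload", "rootdir", "filename_append")

def init_args_kwargs(parsed_args):
    """Hypothetical mapping from a namespace to (init args, init kwargs)."""
    args = [parsed_args.filepath]  # the one positional parameter
    kwargs = {
        name: getattr(parsed_args, name)
        for name in INIT_KWARG_NAMES
        if hasattr(parsed_args, name)
    }
    return args, kwargs

namespace = argparse.Namespace(
    filepath=Path("data.txt"), rootdir=None, topic_name="demo"
)
args, kwargs = init_args_kwargs(namespace)
```

Entries that do not belong to the constructor, such as `topic_name` in this example, are left in the namespace for other consumers.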

classmethod run_from_command_line(args=None)

Run an UploadDataFile directly from the command line.

Calls upload_whole_file() on an UploadDataFile defined by command line (or given) arguments.

Parameters:

args (None or list, optional) – the list of arguments to send to the parser instead of getting them from sys.argv. If None (default), the arguments are read from sys.argv.

property select_bytes

In child classes, this property can be a list of tuples of (start_byte,stop_byte) in the file that will be the only ranges of bytes added when creating the list of chunks. The empty list in the base class will cause all bytes of the file to be uploaded.
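To illustrate how a list of (start_byte, stop_byte) tuples might restrict the chunks that get built, the sketch below chunks only the selected ranges and falls back to the whole file when the list is empty. `selected_chunk_ranges` is hypothetical; it assumes stop_byte is exclusive and that chunking restarts at the beginning of each selected range, neither of which is stated above.

```python
def selected_chunk_ranges(file_size, chunk_size, select_bytes=()):
    """Return (start, stop) chunk ranges restricted to `select_bytes`."""
    # An empty selection means the whole file is chunked.
    spans = list(select_bytes) or [(0, file_size)]
    ranges = []
    for start, stop in sorted(spans):
        stop = min(stop, file_size)  # never read past the end of the file
        while start < stop:
            end = min(start + chunk_size, stop)
            ranges.append((start, end))
            start = end
    return ranges

whole_file = selected_chunk_ranges(100, 40)
partial = selected_chunk_ranges(100, 40, [(10, 60), (90, 100)])
```

Here `whole_file` covers bytes 0 to 100 in three chunks, while `partial` covers only the two selected ranges.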

property to_upload

Whether or not this file will be considered when uploading a group of data files

property fully_enqueued

True if this file has had all of its chunks added to an upload queue

property fully_produced

True if this file has had all of its chunks successfully sent to the broker (only used when run as part of a DataFileUploadDirectory)

property waiting_to_upload

True if this file is waiting for its upload to begin

property upload_in_progress

True if this file is in the process of being enqueued to be uploaded

property upload_status_msg

A message stating the file’s name and its status with respect to being enqueued for upload

property chunked_at_timestamp

A datetime object representing a timestamp of when the list of chunks was built