UploadDataFile
- class openmsistream.UploadDataFile(filepath, to_upload=True, rootdir=None, filename_append=None, **kwargs)
Bases: DataFile, Runnable
Class to represent a data file whose messages will be uploaded to a topic.
Used as part of a DataFileUploadDirectory, but can also be run standalone to upload a single file.
- Parameters:
filepath (pathlib.Path) – The path to the file on disk
to_upload (bool, optional) – True (default) if the file should be uploaded. Used to set the existing files as “already uploaded” when running a DataFileUploadDirectory.
rootdir (None or pathlib.Path, optional) – path to the “root” directory that this file is in. Anything in the path beyond it will be added to the DataFileChunk so that it will be reconstructed inside a subdirectory
filename_append (None or str, optional) – a string that should be appended to the end of the filename stem to distinguish the file that’s produced from its original file on disk
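The effect of rootdir and filename_append on the reconstructed file's location can be illustrated with plain pathlib. This is a minimal sketch, not OpenMSIStream's own code: the helper name `reconstructed_relative_path` is hypothetical, and it only mirrors the behavior described in the parameter list above (path components beyond rootdir become a subdirectory, and the appended string goes on the end of the filename stem).

```python
from pathlib import Path


def reconstructed_relative_path(filepath, rootdir=None, filename_append=None):
    """Hypothetical helper: relative path a reconstructed file might end up at."""
    filepath = Path(filepath)
    # Anything in the path beyond rootdir becomes the subdirectory part
    subdir = filepath.parent.relative_to(rootdir) if rootdir is not None else Path()
    # filename_append is added to the end of the stem, before the suffix
    name = filepath.stem + (filename_append or "") + filepath.suffix
    return subdir / name


print(
    reconstructed_relative_path(
        Path("data/run1/scan/img.tif"), rootdir=Path("data"), filename_append="_copy"
    )
)
```

With no rootdir the sketch returns just the (possibly renamed) filename, which matches the idea that no subdirectory information is carried along.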
- ARGUMENT_PARSER_TYPE
alias of OpenMSIStreamArgumentParser
- upload_whole_file(config_path, topic_name, n_threads=2, chunk_size=524288)
Chunk and upload an entire file on disk to a broker’s topic.
- Parameters:
config_path (pathlib.Path) – Path to the config file to use in defining the Broker connection and Producers
topic_name (str) – Name of the topic to which the file’s messages should be produced
n_threads (int, optional) – The number of threads/Producers to run at once during uploading
chunk_size (int, optional) – The size of the file chunk in each message in bytes
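To get a feel for the default chunk_size, the number of messages a file would produce can be estimated with integer arithmetic. This is a rough sketch that assumes one message per fixed-size chunk with a possibly shorter final chunk; the function name `n_chunks` is hypothetical and not part of the library.

```python
DEFAULT_CHUNK_SIZE = 524288  # bytes (512 KiB), the default shown in the signature


def n_chunks(file_size_bytes, chunk_size=DEFAULT_CHUNK_SIZE):
    """Hypothetical helper: chunks needed to cover a file of the given size."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    # Ceiling division without going through floats
    return (file_size_bytes + chunk_size - 1) // chunk_size


# A 10 MiB file split into default-sized chunks
print(n_chunks(10 * 1024 * 1024))
```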
- add_chunks_to_upload(chunks_to_add=None, chunk_size=524288)
Add chunks from this file to the internal list of chunks to upload, possibly with some selection defined by select_bytes.
- enqueue_chunks_for_upload(queue, n_threads=None, chunk_size=524288, queue_full_timeout=0.001)
Add some chunks of this file from the internal list to a given upload queue (the internal list will be created if add_chunks_to_upload() hasn’t already been called).
When the entire internal list of file chunks has been added to the queue, the file will be marked as fully enqueued.
If the given queue is full, this function will wait a bit and then return.
- Parameters:
queue (queue.Queue) – the Queue to which chunks should be added
n_threads (None or int, optional) – the number of threads running during uploading; at most 5*this number of chunks will be added per call to this method. If this argument isn’t given, every chunk in the internal list will be added.
chunk_size (int, optional) – The size of the file chunk in each message in bytes
queue_full_timeout (float, optional) – amount of time to wait before returning if the queue is full and new messages can’t be added
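The batching behavior described above (at most 5*n_threads chunks per call, and an early return when the queue is full) can be sketched with the standard library alone. This is an illustration under assumptions, not the library's implementation: `enqueue_some` is a hypothetical stand-in, the chunks are plain Python objects, and waiting on a full queue is modeled with the timeout argument of `queue.Queue.put`.

```python
import queue


def enqueue_some(chunks, upload_queue, n_threads=None, queue_full_timeout=0.001):
    """Hypothetical stand-in: move chunks from the front of a list onto a queue.

    Returns True once the list is empty (the "fully enqueued" state).
    """
    # With n_threads given, add at most 5*n_threads chunks; otherwise add them all
    limit = 5 * n_threads if n_threads is not None else len(chunks)
    for _ in range(min(limit, len(chunks))):
        try:
            upload_queue.put(chunks[0], timeout=queue_full_timeout)
        except queue.Full:
            # The queue stayed full for the whole timeout, so give up for this call
            break
        chunks.pop(0)
    return not chunks


upload_queue = queue.Queue(maxsize=12)
pending = list(range(30))
done = enqueue_some(pending, upload_queue, n_threads=2)
print(done, upload_queue.qsize(), len(pending))
```

A caller would typically invoke a function like this repeatedly, in between consuming from the queue, until it reports that everything has been enqueued.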
- classmethod get_command_line_arguments()
Return the names of arguments for the logger stream and file levels.
- classmethod get_init_args_kwargs(parsed_args)
Get the list of init arguments and the dictionary of init keyword arguments for this class given a namespace of, for example, parsed arguments.
- Parameters:
parsed_args (argparse.Namespace) – A namespace containing entries needed to determine the init args and kwargs for this class
- Returns:
A list of init args
- Returns:
A dictionary of init kwargs
- classmethod run_from_command_line(args=None)
Run an UploadDataFile directly from the command line.
Calls upload_whole_file() on an UploadDataFile defined by command line (or given) arguments.
- Parameters:
args (list) – the list of arguments to send to the parser instead of getting them from sys.argv
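Passing an explicit list in place of sys.argv follows the usual argparse convention, which the standard library example below demonstrates. The parser and its argument names (`input_file`, `--label`) are purely hypothetical placeholders and do not reflect the actual command line options of UploadDataFile.

```python
import argparse


def parse_example_args(args=None):
    """Hypothetical parser: args=None falls back to sys.argv[1:], as in argparse."""
    parser = argparse.ArgumentParser()
    parser.add_argument("input_file")  # placeholder positional argument
    parser.add_argument("--label", default="none")  # placeholder option
    return parser.parse_args(args)


# Supplying the list directly is convenient in scripts and tests
parsed = parse_example_args(["some_file.dat", "--label", "demo"])
print(parsed.input_file, parsed.label)
```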
- property select_bytes
In child classes, this property can be a list of tuples of (start_byte,stop_byte) in the file that will be the only ranges of bytes added when creating the list of chunks. The empty list in the base class will cause all bytes of the file to be uploaded.
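How a list of (start_byte, stop_byte) tuples could narrow the set of chunks can be sketched as follows. This is an assumption-laden illustration rather than the library's chunking code: the function `chunk_byte_ranges` is hypothetical, stop_byte is treated as exclusive, and the selected ranges are assumed not to overlap.

```python
def chunk_byte_ranges(file_size, chunk_size=524288, select_bytes=()):
    """Hypothetical helper: (start, stop) offsets of the chunks that would be built."""
    # An empty selection means every byte of the file is included
    ranges = list(select_bytes) or [(0, file_size)]
    chunks = []
    for start, stop in ranges:
        stop = min(stop, file_size)  # never read past the end of the file
        while start < stop:
            end = min(start + chunk_size, stop)
            chunks.append((start, end))
            start = end
    return chunks


# Whole file versus two selected ranges, using a tiny chunk size for readability
print(chunk_byte_ranges(10, chunk_size=4))
print(chunk_byte_ranges(100, chunk_size=4, select_bytes=[(0, 6), (50, 53)]))
```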
- property to_upload
whether or not this file will be considered when uploading some group of data files
- property fully_enqueued
True if this file has had all of its chunks added to an upload queue
- property fully_produced
True if this file has had all of its chunks successfully sent to the broker (only used when run as part of a DataFileUploadDirectory)
- property waiting_to_upload
True if this file is waiting for its upload to begin
- property upload_in_progress
True if this file is in the process of being enqueued to be uploaded
- property upload_status_msg
A message stating the file’s name and status w.r.t. being enqueued to be uploaded
- property chunked_at_timestamp
A datetime object representing a timestamp of when the list of chunks was built