import os
import re
import signal
import traceback
import numpy as np
from PIL import Image
from progressbar import ETA, Bar, Percentage, ProgressBar
from qubic.calfiles import PATH as cal_dir
from qubic.data import PATH as data_dir
from qubic.dicts import PATH as dicts_dir
_NLEVELS = 0
_MAX_NLEVELS = 0
[docs]
def join_toward_rank(comm, data, target_rank):
# print('enter', target_rank)
gathered_data = comm.gather(data, root=target_rank)
# print('bis')
if comm.Get_rank() == target_rank:
# print(' bis bis')
return np.concatenate(gathered_data) # [0]
else:
return
[docs]
def join_data(comm, data):
if comm is None:
pass
else:
data = comm.gather(data, root=0)
if comm.Get_rank() == 0:
data = np.concatenate(data)
data = comm.bcast(data)
return data
[docs]
def split_data(comm, theta):
if comm is None:
return theta
else:
return np.array_split(theta, comm.Get_size())[comm.Get_rank()]
class _ProgressBar(ProgressBar):
def __init__(self, poll=0.1, **keywords):
global _NLEVELS, _MAX_NLEVELS
self._nlevels = _NLEVELS
_NLEVELS += 1
_MAX_NLEVELS = max(_MAX_NLEVELS, _NLEVELS)
ProgressBar.__init__(self, poll=poll, **keywords)
if self._nlevels == 0:
self._signal_old = signal.signal(signal.SIGINT, self._int_handler)
print("\n\n\033[F", end="")
def update(self, n=None):
if n is not None:
ProgressBar.update(self, n)
return
ProgressBar.update(self, self.currval + 1)
if self.currval >= self.maxval:
self.finish()
def finish(self):
global _NLEVELS, _MAX_NLEVELS
ProgressBar.finish(self)
if self._nlevels == 0:
print((_MAX_NLEVELS - 1) * "\n", end="")
signal.signal(signal.SIGINT, self._signal_old)
_MAX_NLEVELS = 0
else:
print("\033[F\033[F", end="")
_NLEVELS -= 1
def _int_handler(self, signum, frame):
global _NLEVELS, _MAX_NLEVELS
_NLEVELS = 0
_MAX_NLEVELS = 0
signal.signal(signal.SIGINT, self._signal_old)
e = KeyboardInterrupt()
e.__traceback__ = traceback.extract_stack(frame)
raise e
[docs]
def progress_bar(n, info=""):
"""
Return a default progress bar.
Example
-------
>>> import time
>>> n = 10
>>> bar = progress_bar(n, 'LOOP')
>>> for i in range(n):
... time.sleep(1)
... bar.update()
"""
return _ProgressBar(widgets=[info, Percentage(), Bar("=", "[", "]"), ETA()], maxval=n).start()
def _compress_mask(mask):
mask = mask.ravel()
if len(mask) == 0:
return ""
output = ""
old = mask[0]
n = 1
for new in mask[1:]:
if new is not old:
if n > 2:
output += str(n)
elif n == 2:
output += "+" if old else "-"
output += "+" if old else "-"
n = 1
old = new
else:
n += 1
if n > 2:
output += str(n)
elif n == 2:
output += "+" if old else "-"
output += "+" if old else "-"
return output
def _uncompress_mask(mask):
i = 0
mask_list = []
nmask = len(mask)
while i < nmask:
val = mask[i]
if val == "+":
mask_list.append(True)
i += 1
elif val == "-":
mask_list.append(False)
i += 1
else:
j = i + 1
val = mask[j]
while val not in ("+", "-"):
j += 1
val = mask[j]
mask_list.extend(int(mask[i:j]) * (True if val == "+" else False,))
i = j + 1
return np.array(mask_list, bool)
[docs]
def create_folder_if_not_exists(folder_name):
# Check if the folder exists
if not os.path.exists(folder_name):
try:
# Create the folder if it doesn't exist
os.makedirs(folder_name)
print(f"The folder '{folder_name}' has been created.")
except OSError as e:
print(f"Error creating the folder '{folder_name}': {e}")
else:
pass
[docs]
def do_gif(input_folder, N, filename, output="animation.gif"):
nmaps = np.arange(1, N + 1, 1)
image_list = []
# for filename in sorted(os.listdir(input_folder)):
for n in nmaps:
image_path = os.path.join(input_folder, filename + f"{n}.png")
image = Image.open(image_path)
image_list.append(image)
output_gif_path = os.path.join(input_folder, output)
# output_gif_path = f"figures/{stk}/animation.gif"
image_list[0].save(output_gif_path, save_all=True, append_images=image_list[1:], duration=100, loop=0)
[docs]
def find_file(filename, verbosity=1):
"""
find the full path to the file given the filename
It could be a dictionary, or a data, or calibration file.
We look first of all for the name, as given, which could be an absolute path
We then look in the current working directory
We also look in directories that have been defined in BASH environment variables
Finally, we look in the package directory for dictionary, or data (including calfiles)
we return the path name of the file that was found
Otherwise we print a "not found" error, and return None
"""
if os.path.isfile(filename):
return filename
basename = os.path.basename(filename)
dir_list = ["."]
if "QUBIC_DICT" in os.environ.keys():
dir_list.append(os.environ["QUBIC_DICT"])
for var in os.environ.keys():
match = re.search("QUBIC_.*DIR", var)
if match:
dir_list.append(os.environ[var])
dir_list += [dicts_dir, cal_dir, data_dir]
for D in dir_list:
filename_fullpath = os.path.join(D, basename)
if os.path.isfile(filename_fullpath):
return filename_fullpath
# if we get this far, then we haven't found the file
if verbosity > 0:
print("ERROR! File not found: %s" % basename)
print(" I looked in the following directories:\n %s" % " \n".join(dir_list))
return None