Python source code examples: joblib.Parallel()
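
All of the examples below follow the same basic call pattern: construct a Parallel executor with n_jobs workers and pass it a generator of delayed(...) calls. As a minimal, self-contained sketch before the collected examples (the slow_square function here is purely illustrative and does not come from any of them):

from joblib import Parallel, delayed

def slow_square(x):
    # Stand-in for any expensive per-item computation.
    return x * x

# Run slow_square over 10 inputs using 4 worker processes; the results
# come back as a list in the same order as the inputs.
results = Parallel(n_jobs=4)(delayed(slow_square)(i) for i in range(10))
print(results)  # [0, 1, 4, 9, ...]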

Example 1
def get_graph_stats(graph_obj_handle, prop='degrees'):
    # if prop == 'degrees':
    num_cores = multiprocessing.cpu_count()
    inputs = [int(i*len(graph_obj_handle)/num_cores) for i in range(num_cores)] + [len(graph_obj_handle)]
    res = Parallel(n_jobs=num_cores)(delayed(get_values)(graph_obj_handle, inputs[i], inputs[i+1], prop) for i in range(num_cores))

    stat_dict = {}

    if 'degrees' in prop:
        stat_dict['degrees'] = list(set([d for core_res in res for file_res in core_res for d in file_res['degrees']]))
    if 'edge_labels' in prop:
        stat_dict['edge_labels'] = list(set([d for core_res in res for file_res in core_res for d in file_res['edge_labels']]))
    if 'target_mean' in prop or 'target_std' in prop:
        param = np.array([file_res['params'] for core_res in res for file_res in core_res])
    if 'target_mean' in prop:
        stat_dict['target_mean'] = np.mean(param, axis=0)
    if 'target_std' in prop:
        stat_dict['target_std'] = np.std(param, axis=0)

    return stat_dict 
Example 2
def build_save_containers(platforms, registry, load_cache) -> int:
    """
    Entry point to build and upload all built Docker images in parallel
    :param platforms: List of platforms
    :param registry: Docker registry name
    :param load_cache: Load cache before building
    :return: 1 if error occurred, 0 otherwise
    """
    from joblib import Parallel, delayed
    if len(platforms) == 0:
        return 0

    platform_results = Parallel(n_jobs=len(platforms), backend="multiprocessing")(
        delayed(_build_save_container)(platform, registry, load_cache)
        for platform in platforms)

    is_error = False
    for platform_result in platform_results:
        if platform_result is not None:
            logging.error('Failed to generate %s', platform_result)
            is_error = True

    return 1 if is_error else 0 
Example 3
def __init__(self, path, split, tokenizer, bucket_size, ascending=False):
        # Setup
        self.path = path
        self.bucket_size = bucket_size

        # List all wave files
        file_list = []
        for s in split:
            split_list = list(Path(join(path, s)).rglob("*.flac"))
            assert len(split_list) > 0, "No data found @ {}".format(join(path,s))
            file_list += split_list
        # Read text
        text = Parallel(n_jobs=READ_FILE_THREADS)(
            delayed(read_text)(str(f)) for f in file_list)
        #text = Parallel(n_jobs=-1)(delayed(tokenizer.encode)(txt) for txt in text)
        text = [tokenizer.encode(txt) for txt in text]

        # Sort dataset by text length
        #file_len = Parallel(n_jobs=READ_FILE_THREADS)(delayed(getsize)(f) for f in file_list)
        self.file_list, self.text = zip(*[(f_name, txt)
                                          for f_name, txt in sorted(zip(file_list, text), reverse=not ascending, key=lambda x:len(x[1]))]) 
Example 4
def next_minibatch(self):

        image_filenames_minibatch = self.image_filenames[self.current_index: self.current_index + self.minibatch_size]
        label_filenames_minibatch = self.label_filenames[self.current_index: self.current_index + self.minibatch_size]
        self.current_index += self.minibatch_size
        if self.current_index >= self.dataset_size:
            self.current_index = 0

        # Multithread image processing
        # Reference: https://www.kaggle.com/inoryy/fast-image-pre-process-in-parallel

        results = Parallel(n_jobs=self.num_jobs)(delayed(self.process_func)(image_filename, label_filename) for image_filename, label_filename in zip(image_filenames_minibatch, label_filenames_minibatch))
        images, labels = zip(*results)

        images = np.asarray(images)
        labels = np.asarray(labels)

        return images, labels 
Example 5
def wrapper_compute_average_precision(self):
        """Computes average precision for each class in the subset.
        """
        ap = np.zeros((len(self.tiou_thresholds), len(self.activity_index)))
        recall = np.zeros((len(self.tiou_thresholds), len(self.activity_index)))
        precision = np.zeros((len(self.tiou_thresholds), len(self.activity_index)))
        matched_gt_id = np.zeros((len(self.tiou_thresholds), len(self.prediction)))

        results = Parallel(n_jobs=len(self.activity_index))(
                    delayed(compute_average_precision_detection)(
                        ground_truth=self.ground_truth.loc[self.ground_truth['label'] == cidx].reset_index(drop=True),
                        prediction=self.prediction.loc[self.prediction['label'] == cidx].reset_index(drop=True),
                        tiou_thresholds=self.tiou_thresholds,
                        normalize_ap=self.normalize_ap, 
                        average_num_instance_per_class=self.average_num_instance_per_class,
                        minimum_normalized_precision_threshold_for_detection=self.minimum_normalized_precision_threshold_for_detection,
                    ) for cidx in self.activity_index.values())
        
        for i, cidx in enumerate(self.activity_index.values()):
            ap[:,cidx], matched_this_cls_gt_id, this_cls_prediction_ids, recall[:,cidx], precision[:,cidx] = results[i]
            matched_gt_id[:,this_cls_prediction_ids] = matched_this_cls_gt_id

        return ap, matched_gt_id, recall, precision 
Example 6
def wrapper_analyze_fp_error_types(self):
        self.fp_error_types_legned = {'True Positive': 0,
                                      'Double Detection Err': 1,
                                      'Wrong Label Err': 2,
                                      'Localization Err': 3,
                                      'Confusion Err': 4,
                                      'Background Err': 5}

        self.fp_error_types_inverse_legned = dict([(v, k) for k, v in self.fp_error_types_legned.items()])

        fp_error_types = Parallel(n_jobs=len(self.tiou_thresholds))(
                            delayed(analyze_fp_error_types)(
                                prediction=self.prediction,
                                ground_truth=self.ground_truth,
                                tiou_thr=tiou_thr,
                                matched_gt_id_col_name=matched_gt_id_col_name,
                                min_tiou_thr=self.min_tiou_thr,
                                fp_error_types_legned=self.fp_error_types_legned,
                            ) for tiou_thr, matched_gt_id_col_name in zip(self.tiou_thresholds, self.matched_gt_id_cols))
        
        return fp_error_types 
Example 7
def _parallel_predict(self, contexts: np.ndarray, is_predict: bool):

        # Total number of contexts to predict
        n_contexts = len(contexts)

        # Partition contexts by job
        n_jobs, n_contexts, starts = self._partition_contexts(n_contexts)
        total_contexts = sum(n_contexts)

        # Get seed value for each context
        seeds = self.rng.randint(np.iinfo(np.int32).max, size=total_contexts)

        # Perform parallel predictions
        predictions = Parallel(n_jobs=n_jobs, backend=self.backend)(
                          delayed(self._predict_contexts)(
                              contexts[starts[i]:starts[i + 1]],
                              is_predict,
                              seeds[starts[i]:starts[i + 1]],
                              starts[i])
                          for i in range(n_jobs))

        # Reduce
        predictions = list(chain.from_iterable(t for t in predictions))

        return predictions if len(predictions) > 1 else predictions[0] 
Example 8
def fit(self,X):
        def func(ss):
            length = len(ss.unique())
            if length <= 1:
                return True
            else:
                return False
            
        df = X.data
        todo_cols = X.cat_cols + X.multi_cat_cols + X.num_cols + X.time_cols + X.binary_cols
        res = Parallel(n_jobs=CONSTANT.JOBS,require='sharedmem')(delayed(func)(df[col]) for col in todo_cols)
        
        drop_cols = []
        for col,unique in zip(todo_cols,res):
            if unique:
                drop_cols.append(col)
        
        self.drop_cols = drop_cols 
Example 9
def fit(self,X):
        def func(ss):
            length = len(ss.unique())
            if length >= len(ss)-10:
                return True
            else:  
                return False
        
        df = X.data
        todo_cols = X.cat_cols
        res = Parallel(n_jobs=CONSTANT.JOBS,require='sharedmem')(delayed(func)(df[col]) for col in todo_cols)
        
        drop_cols = []
        for col,all_diff in zip(todo_cols,res):
            if all_diff:
                drop_cols.append(col)
        
        self.drop_cols = drop_cols 
Example 10
def recognize_binary_col(self,data,cat_cols):
        def func(ss):
            ss = ss.unique()
            if len(ss) == 3:
                if pd.isna(ss).sum() == 1:
                    return True
            if len(ss) == 2:
                return True
            return False
        
        binary_cols = []
        
        res = Parallel(n_jobs=CONSTANT.JOBS,require='sharedmem')(delayed(func)(data[col]) for col in cat_cols)
        
        for col,is_binary in zip(cat_cols,res):
            if is_binary:
                binary_cols.append(col)
        
        return binary_cols 
Example 11
def simulate_walks(self, num_walks, walk_length, stay_prob=0.3, workers=1, verbose=0):

        layers_adj = pd.read_pickle(self.temp_path+'layers_adj.pkl')
        layers_alias = pd.read_pickle(self.temp_path+'layers_alias.pkl')
        layers_accept = pd.read_pickle(self.temp_path+'layers_accept.pkl')
        gamma = pd.read_pickle(self.temp_path+'gamma.pkl')
        walks = []
        initialLayer = 0

        nodes = self.idx  # list(self.g.nodes())

        results = Parallel(n_jobs=workers, verbose=verbose, )(
            delayed(self._simulate_walks)(nodes, num, walk_length, stay_prob, layers_adj, layers_accept, layers_alias, gamma) for num in
            partition_num(num_walks, workers))

        walks = list(itertools.chain(*results))
        return walks 
Example 12
def prefer_parallel_execution(functions_to_be_called):  # pragma: no cover
    try:
        import joblib
        import multiprocessing
    except ImportError:
        print('Joblib not installed, switching to serial execution')
        [run_function(fn) for fn in functions_to_be_called]
    else:
        try:
            import tqdm
        except ImportError:
            inputs = functions_to_be_called
        else:
            inputs = tqdm.tqdm(functions_to_be_called)
        n_jobs = multiprocessing.cpu_count()
        print('Parallelizing execution using Joblib')
        joblib.Parallel(n_jobs=n_jobs)(
                joblib.delayed(run_function)(fn) for fn in inputs) 
Example 13
def parallelize(bucket, only, _except, fn, args=(), versions=False):
    bucket = s3().Bucket(bucket)

    # use prefix for performance
    prefix = None
    if only:
        # get the first prefix before wildcard
        prefix = '/'.join(only.split('*')[0].split('/')[:-1])
        if prefix:
            prefix = prefix + '/'

    if versions:
        object_versions = bucket.object_versions.filter(Prefix=prefix) if prefix else bucket.object_versions.all()
        # delete markers have no size
        return Parallel(n_jobs=24)(delayed(fn)(bucket.name, ov.object_key, ov.id, *args) for ov in object_versions if object_matches(ov.object_key, only, _except) and not ov.is_latest and ov.size is not None)
    else:
        objects = bucket.objects.filter(Prefix=prefix) if prefix else bucket.objects.all()

        if only and '*' not in only:
            objects = [s3().Object(bucket, only)]

        return Parallel(n_jobs=24)(delayed(fn)(bucket.name, os.key, *args) for os in objects if object_matches(os.key, only, _except)) 
Example 14
def recompute_factors_batched(Y, S, lambda_reg, W=None, X=None,
                              dtype='float32', batch_size=10000, n_jobs=4):
    m = S.shape[0]  # m = number of users
    f = Y.shape[1]  # f = number of factors

    YTY = np.dot(Y.T, Y)  # precompute this
    YTYpR = YTY + lambda_reg * np.eye(f)
    if W is not None:
        WX = lambda_reg * (X.dot(W)).T
    else:
        WX = None
    X_new = np.zeros((m, f), dtype=dtype)

    num_batches = int(np.ceil(m / float(batch_size)))

    res = Parallel(n_jobs=n_jobs)(delayed(solve_batch)(b, S, Y, WX, YTYpR,
                                                       batch_size, m, f, dtype)
                                  for b in range(num_batches))
    X_new = np.concatenate(res, axis=0)

    return X_new 
Example 15
def _joblib_resample_A_given_W(self, data):
        """
        Resample A given W. This must be immediately followed by an
        update of z | A, W. This  version uses joblib to parallelize
        over columns of A.
        :return:
        """
        # Use the module trick to avoid copying globals
        import pyhawkes.internals.parallel_adjacency_resampling as par
        par.model = self.model
        par.data = data
        par.K = self.model.K

        if len(data) == 0:
            self.A = np.random.rand(self.K, self.K) < self.network.P
            return

        # We can naively parallelize over receiving neurons, k2
        # To avoid serializing and copying the data object, we
        # manually extract the required arrays Sk, Fk, etc.
        A_cols = Parallel(n_jobs=-1, backend="multiprocessing")(
            delayed(par._resample_column_of_A)(k2) for k2 in range(self.K))
        self.A = np.array(A_cols).T 
Example 16
def _joblib_resample_A_given_W(self, data):
        """
        Resample A given W. This must be immediately followed by an
        update of z | A, W. This  version uses joblib to parallelize
        over columns of A.
        :return:
        """
        # Use the module trick to avoid copying globals
        import pyhawkes.internals.parallel_adjacency_resampling as par
        par.model = self.model
        par.data = data
        par.lambda_irs = [par._compute_weighted_impulses_at_events(d) for d in data]

        if len(data) == 0:
            self.A = np.random.rand(self.K, self.K) < self.network.P
            return

        # We can naively parallelize over receiving neurons, k2
        # To avoid serializing and copying the data object, we
        # manually extract the required arrays Sk, Fk, etc.
        A_cols = Parallel(n_jobs=-1, backend="multiprocessing")(
            delayed(par._ct_resample_column_of_A)(k2) for k2 in range(self.K))
        self.A = np.array(A_cols).T 
Example 17
def build_strain_specific_models(self, joblib=False, cores=1, force_rerun=False):
        """Wrapper function for _build_strain_specific_model"""
        if len(self.df_orthology_matrix) == 0:
            raise RuntimeError('Empty orthology matrix, please calculate first!')
        ref_functional_genes = [g.id for g in self.reference_gempro.functional_genes]
        log.info('Building strain specific models...')
        if joblib:
            result = DictList(Parallel(n_jobs=cores)(delayed(self._build_strain_specific_model)(s, ref_functional_genes, self.df_orthology_matrix, force_rerun=force_rerun) for s in self.strain_ids))
        # if sc:
        #     strains_rdd = sc.parallelize(self.strain_ids)
        #     result = strains_rdd.map(self._build_strain_specific_model).collect()
        else:
            result = []
            for s in tqdm(self.strain_ids):
                result.append(self._build_strain_specific_model(s, ref_functional_genes, self.df_orthology_matrix, force_rerun=force_rerun))

        for strain_id, gp_noseqs_path in result:
            self.strain_infodict[strain_id]['gp_noseqs_path'] = gp_noseqs_path 
Example 18
def transform(self, waveforms):
        #~ print('ici', waveforms.shape, self.ind_peak)
        features = waveforms[:, self.ind_peak, : ].copy()
        return features



#~ Parallel(n_jobs=n_jobs)(delayed(count_match_spikes)(sorting1.get_unit_spike_train(u1),
                                                                                  #~ s2_spiketrains, delta_frames) for
                                                      #~ i1, u1 in enumerate(unit1_ids))

#~ def get_pca_one_channel(wf_chan, chan, thresh, n_left, n_components_by_channel, params):
    #~ print(chan)
    #~ pca = sklearn.decomposition.IncrementalPCA(n_components=n_components_by_channel, **params)
    #~ wf_chan = waveforms[:,:,chan]
    #~ print(wf_chan.shape)
    #~ print(wf_chan[:, -n_left].shape)
    #~ keep = np.any((wf_chan>thresh) | (wf_chan<-thresh))
    #~ keep = (wf_chan[:, -n_left]>thresh) | (wf_chan[:, -n_left]<-thresh)

    #~ if keep.sum() >=n_components_by_channel:
        #~ pca.fit(wf_chan[keep, :])
        #~ return pca
    #~ else:
        #~ return None 
Example 19
def write_data_csv(fname, frames, preproc):
    """Write data to csv file"""
    fdata = open(fname, "w")
    dr = Parallel()(delayed(get_data)(lst, preproc) for lst in frames)
    data, result = zip(*dr)
    for entry in data:
        fdata.write(','.join(entry) + '\r\n')
    print("All finished, %d slices in total" % len(data))
    fdata.close()
    result = np.ravel(result)
    return result
Example 20
def _build_save_container(platform, registry, load_cache) -> Optional[str]:
    """
    Build image for passed platform and upload the cache to the specified S3 bucket
    :param platform: Platform
    :param registry: Docker registry name
    :param load_cache: Load cache before building
    :return: Platform if failed, None otherwise
    """
    docker_tag = build_util.get_docker_tag(platform=platform, registry=registry)

    # Preload cache
    if load_cache:
        load_docker_cache(registry=registry, docker_tag=docker_tag)

    # Start building
    logging.debug('Building %s as %s', platform, docker_tag)
    try:
        # Increase the number of retries for building the cache.
        image_id = build_util.build_docker(docker_binary='docker', platform=platform, registry=registry, num_retries=10, use_cache=True)
        logging.info('Built %s as %s', docker_tag, image_id)

        # Push cache to registry
        _upload_image(registry=registry, docker_tag=docker_tag, image_id=image_id)
        return None
    except Exception:
        logging.exception('Unexpected exception during build of %s', docker_tag)
        return platform
        # Error handling is done by returning the erroneous platform name. This is necessary due to
        # Parallel being unable to handle exceptions 
Example 21
def exec(self):
        ''' Testing End-to-end ASR system '''
        for s, ds in zip(['dev', 'test'], [self.dv_set, self.tt_set]):
            # Setup output
            self.cur_output_path = self.output_file.format(s, 'output')
            with open(self.cur_output_path, 'w') as f:
                f.write('idx\thyp\ttruth\n')

            if self.greedy:
                # Greedy decode
                self.verbose(
                    'Performing batch-wise greedy decoding on {} set, num of batch = {}.'.format(s, len(ds)))
                self.verbose('Results will be stored at {}'.format(
                    self.cur_output_path))
            else:
                # Additional output to store all beams
                self.cur_beam_path = self.output_file.format(s, 'beam')
                with open(self.cur_beam_path, 'w') as f:
                    f.write('idx\tbeam\thyp\ttruth\n')
                self.verbose(
                    'Performing instance-wise beam decoding on {} set. (NOTE: use --njobs to speedup)'.format(s))
                # Minimal function to pickle
                beam_decode_func = partial(beam_decode, model=copy.deepcopy(
                    self.decoder), device=self.device)
                # Parallel beam decode
                results = Parallel(n_jobs=self.paras.njobs)(
                    delayed(beam_decode_func)(data) for data in tqdm(ds))
                self.verbose(
                    'Results/Beams will be stored at {} / {}.'.format(self.cur_output_path, self.cur_beam_path))
                self.write_hyp(results, self.cur_output_path,
                               self.cur_beam_path)
        self.verbose('All done !') 
Example 22
def __init__(self, path, split, tokenizer, bucket_size):
        # Setup
        self.path = path
        self.bucket_size = bucket_size
        self.encode_on_fly = False
        read_txt_src = []

        # List all wave files
        file_list, all_sent = [], []

        for s in split:
            if s in OFFICIAL_TXT_SRC:
                self.encode_on_fly = True
                with open(join(path, s), 'r') as f:
                    all_sent += f.readlines()
            file_list += list(Path(join(path, s)).rglob("*.flac"))
        assert (len(file_list) > 0) or (len(all_sent)
                                        > 0), "No data found @ {}".format(path)

        # Read text
        text = Parallel(n_jobs=READ_FILE_THREADS)(
            delayed(read_text)(str(f)) for f in file_list)
        all_sent.extend(text)
        del text

        # Encode text
        if self.encode_on_fly:
            self.tokenizer = tokenizer
            self.text = all_sent
        else:
            self.text = [tokenizer.encode(txt) for txt in tqdm(all_sent)]
        del all_sent

        # Read file size and sort dataset by file size (Note: feature len. may be different)
        self.text = sorted(self.text, reverse=True, key=lambda x: len(x))
        if self.encode_on_fly:
            del self.text[:REMOVE_TOP_N_TXT] 
Example 23
def _compute_parallel(self, pairs, x, x_link=None, n_jobs=1):

        df_chunks = index_split(pairs, n_jobs)
        result_chunks = Parallel(n_jobs=n_jobs)(
            delayed(_parallel_compare_helper)(self, chunk, x, x_link)
            for chunk in df_chunks
        )

        result = pandas.concat(result_chunks)
        return result 
Example 24
def test_fnn(func):
    parameters = [(6, 4, 0.25, 0.25, 0.25, 0.001, 1.0)]
    results = Parallel(n_jobs=n_jobs)(delayed(test_helper)(func, exp, layer, lr, 
                       dof, dom, dol, alpha)
                              for exp, layer, dof, dom, dol, lr, alpha in parameters)

    results = sorted(results, key = lambda x: x[0], reverse=True)
    for result in results:
        print(result) 
Example 25
def test_cnn(func):
    # CNN parameters
    parameters = [(0.2, 0.001, 6, 5, 6, 3, 3)]

    results = Parallel(n_jobs=n_jobs)(delayed(test_helper_cnn)(func, dropout,
                       lr, exponent, exp_filter_ir, exp_filter_swo, conv_ir,
                       conv_swo)
      for dropout,lr,exponent,exp_filter_ir,exp_filter_swo,conv_ir,conv_swo in parameters)
    results = sorted(results, key=lambda x: x[0], reverse=True)
    for result in results:
        print(result) 
Example 26
def walk_graph(csr_matrix, labels, walk_length=40, num_walks=1, n_jobs=1):
    """Perform random walks on adjacency matrix.

    Args:
        csr_matrix: adjacency matrix.
        labels: list of node labels where index align with CSR matrix
        walk_length: maximum length of random walk (default=40)
        num_walks: number of walks to do for each node
        n_jobs: number of cores to use (default=1)

    Returns:
        np.ndarray: list of random walks
    """
    normalized = normalize_csr_matrix(csr_matrix)

    results = (Parallel(n_jobs=n_jobs, max_nbytes=None)
               (delayed(walk_random, has_shareable_memory)
                (normalized, labels, walk_length)
                for _ in range(num_walks)))

    walks, freqs = zip(*results)

    random_walks = np.concatenate(walks)
    word_freqs = np.sum(freqs, axis=0)

    return random_walks, dict(zip(labels, word_freqs)) 
Example 27
def benchmark(sizes=SIZES, stars=STARS, noises=NOISES,
              comb_number=10, seed=None, repeats=REPEATS, n_jobs=-1):

    grid = get_parameters(
        sizes=sizes, stars=stars, noises=noises,
        comb_number=comb_number, seed=seed, repeats=repeats)

    with joblib.Parallel(n_jobs=n_jobs) as parallel:
        results = parallel(
            joblib.delayed(_test)(**params) for params in tqdm.tqdm(grid))

    df = pd.DataFrame(results)
    return df 
Example 28
def benchmark(min_size=min(SIZES), max_size=max(SIZES), step_size=STEP,
              stars=STARS, noise=NOISE, seed=None, repeats=REPEATS,
              n_jobs=-1, comb_number=COMB_NUMBER):

    grid = get_parameters(
        min_size=min_size, max_size=max_size, step_size=step_size,
        repeats=repeats, stars=stars, noise=noise, seed=seed,
        comb_number=comb_number)

    with joblib.Parallel(n_jobs=n_jobs) as parallel:
        results = parallel(
            joblib.delayed(_test)(**params) for params in tqdm.tqdm(grid))

    df = pd.DataFrame(results)
    return df 
Example 29
def branches(self):
        """
        Returns a data frame of all branches in origin.  The DataFrame will have the columns:

         * repository
         * local
         * branch

        :returns: DataFrame
        """

        df = pd.DataFrame(columns=['repository', 'local', 'branch'])

        if _has_joblib:
            ds = Parallel(n_jobs=-1, backend='threading', verbose=0)(
                delayed(_branches_func)
                (x) for x in self.repos
            )
            for d in ds:
                df = df.append(d)
        else:
            for repo in self.repos:
                try:
                    df = df.append(_branches_func(repo))
                except GitCommandError:
                    print('Warning! Repo: %s couldn\'t be inspected' % (repo, ))

        df.reset_index()

        return df 
Example 30
def tags(self):
        """
        Returns a data frame of all tags in origin.  The DataFrame will have the columns:

         * repository
         * tag

        :returns: DataFrame
        """

        df = pd.DataFrame(columns=['repository', 'tag'])

        if _has_joblib:
            ds = Parallel(n_jobs=-1, backend='threading', verbose=0)(
                delayed(_tags_func)
                (x) for x in self.repos
            )
            for d in ds:
                df = df.append(d)
        else:
            for repo in self.repos:
                try:
                    df = df.append(repo.tags())
                except GitCommandError:
                    print('Warning! Repo: %s couldn\'t be inspected' % (repo, ))

        df.reset_index()

        return df
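
Beyond n_jobs, several of the examples above also pass an execution hint to Parallel: backend="multiprocessing" (Examples 2, 15, 16), backend='threading' (Examples 29, 30), and require='sharedmem' (Examples 8-10). A minimal sketch of what these options change, assuming a recent joblib where the default backend is loky (the square function is illustrative only):

from joblib import Parallel, delayed

def square(x):
    return x * x

# Default (loky) backend: separate worker processes; arguments and
# results are pickled and sent between processes.
print(Parallel(n_jobs=2)(delayed(square)(i) for i in range(4)))

# Thread-based backend: no pickling; useful for I/O-bound work or for
# functions that release the GIL.
print(Parallel(n_jobs=2, backend="threading")(delayed(square)(i) for i in range(4)))

# require='sharedmem' restricts joblib to a backend whose workers share
# memory with the caller (i.e. threads), as in Examples 8-10 above.
print(Parallel(n_jobs=2, require="sharedmem")(delayed(square)(i) for i in range(4)))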