def equal_division(num_groups, data_x, data_y=None):
    """Partition data into multiple clients with (almost) equal quantity.

    Group sizes differ by at most 1: the first groups receive
    ``len(data_x) // num_groups`` elements and the remaining groups receive
    one extra element each.

    Args:
        num_groups (int): The number of groups to partition to.
        data_x (list[Object]): A list of elements to be divided.
        data_y (list[Object], optional): A list of data labels to be divided
            together with the data.

    Returns:
        list[list]: A list where each element is a list of data of a group/client.
        list[list]: A list where each element is a list of data labels of a
            group/client; empty when `data_y` is None.

    Example:
        >>> equal_division(3, list(range(9)))
        ([[0, 4, 2], [3, 1, 7], [6, 5, 8]], [])  # order depends on the shuffle
    """
    if data_y is not None:
        assert len(data_x) == len(data_y)
        # Shuffle data and labels together so they stay aligned.
        data_x, data_y = shuffle(data_x, data_y)
    else:
        np.random.shuffle(data_x)
    total = len(data_x)
    assert total > 0
    base = total // num_groups
    extra = total % num_groups
    # The first (num_groups - extra) groups hold `base` items, the rest base + 1.
    sizes = [base] * (num_groups - extra) + [base + 1] * extra
    splitted_data_x, splitted_data_y = [], []
    start = 0
    for size in sizes:
        splitted_data_x.append(data_x[start:start + size])
        if data_y is not None:
            splitted_data_y.append(data_y[start:start + size])
        start += size
    return splitted_data_x, splitted_data_y
def quantity_hetero(weights, data_x, data_y=None):
    """Partition data into multiple clients with different quantities.

    The number of groups equals the number of elements of `weights`, and the
    quantity of each group is proportional to the corresponding weight.

    Args:
        weights (list[float]): The targeted distribution of data quantities.
            The values should sum up to 1, e.g., [0.1, 0.2, 0.7].
        data_x (list[Object]): A list of elements to be divided.
        data_y (list[Object], optional): A list of data labels to be divided
            together with the data.

    Returns:
        list[list]: A list where each element is a list of data of a group/client.
        list[list]: A list where each element is a list of data labels of a
            group/client; empty when `data_y` is None.

    Example:
        >>> quantity_hetero([0.1, 0.2, 0.7], list(range(0, 10)))
        ([[4], [8, 9], [6, 0, 1, 7, 3, 2, 5]], [])  # order depends on the shuffle
    """
    # Float sums like 0.1 + 0.2 + 0.4 + 0.2 + 0.1 can yield 1.0000000000000002,
    # so compare against 1 after rounding.
    assert round(sum(weights), 3) == 1
    if data_y is not None:
        assert len(data_x) == len(data_y)
        data_x, data_y = shuffle(data_x, data_y)
    else:
        np.random.shuffle(data_x)
    total = len(data_x)
    splitted_data_x, splitted_data_y = [], []
    offset = 0
    for weight in weights:
        chunk = math.floor(total * weight)
        splitted_data_x.append(data_x[offset:offset + chunk])
        if data_y is not None:
            splitted_data_y.append(data_y[offset:offset + chunk])
        offset += chunk
    num_groups = len(weights)
    # Flooring can leave a few items unassigned; hand the tail elements out
    # round-robin (data_x[-k] goes to group k % num_groups).
    for neg in range(offset - total, 0):
        splitted_data_x[(-neg) % num_groups].append(data_x[neg])
        if data_y is not None:
            splitted_data_y[(-neg) % num_groups].append(data_y[neg])
    return splitted_data_x, splitted_data_y
def iid(data_x, data_y, num_of_clients, x_dtype, y_dtype, seed=0):
    """Partition dataset into multiple clients with equal data quantity
    (difference is less than 1) randomly.

    Args:
        data_x (list[Object]): A list of data.
        data_y (list[Object]): A list of dataset labels.
        num_of_clients (int): The number of clients to partition to.
        x_dtype (numpy.dtype): The type of data.
        y_dtype (numpy.dtype): The type of data label.
        seed (int): random seed for data split.

    Returns:
        list[str]: A list of client ids.
        dict: The partitioned data, key is client id, value is the client data.
            e.g., {'client_1': {'x': [data_x], 'y': [data_y]}}.
    """
    np.random.seed(seed)
    data_x, data_y = shuffle(data_x, data_y)
    x_parts, y_parts = equal_division(num_of_clients, data_x, data_y)
    clients = []
    federated_data = {}
    # equal_division returns one (x, y) part per client, in order.
    for idx, (part_x, part_y) in enumerate(zip(x_parts, y_parts)):
        client_id = "f%07.0f" % idx
        federated_data[client_id] = {
            'x': np.array(part_x).astype(x_dtype),
            'y': np.array(part_y).astype(y_dtype),
        }
        clients.append(client_id)
    return clients, federated_data
def non_iid_dirichlet(data_x, data_y, num_of_clients, alpha, min_size, x_dtype, y_dtype, seed=0):
    """Partition dataset into multiple clients following the Dirichlet process.

    Args:
        data_x (list[Object]): A list of data.
        data_y (list[Object]): A list of dataset labels.
        num_of_clients (int): The number of clients to partition to.
        alpha (float): The parameter for Dirichlet process simulation.
        min_size (int): The minimum number of data size of a client.
        x_dtype (numpy.dtype): The type of data.
        y_dtype (numpy.dtype): The type of data label.
        seed (int): random seed for data split.

    Returns:
        list[str]: A list of client ids.
        dict: The partitioned data, key is client id, value is the client data.
            e.g., {'client_1': {'x': [data_x], 'y': [data_y]}}.
    """
    # NOTE(review): the `.shape` access and fancy indexing below assume data_x
    # and data_y are numpy arrays with integer labels 0..num_class-1 — confirm
    # at call sites (data_simulation converts via np.array before calling).
    data_x, data_y = shuffle(data_x, data_y)
    n_train = data_x.shape[0]  # total sample count (currently unused below)
    current_min_size = 0
    num_class = np.amax(data_y) + 1
    data_size = data_y.shape[0]
    net_dataidx_map = {}
    np.random.seed(seed)
    # Re-sample the whole partition until the smallest client has at least
    # `min_size` samples.
    while current_min_size < min_size:
        idx_batch = [[] for _ in range(num_of_clients)]
        for k in range(num_class):
            # Indices of all samples with label k, split among clients by a
            # freshly drawn Dirichlet(alpha) proportion vector.
            idx_k = np.where(data_y == k)[0]
            proportions = np.random.dirichlet(np.repeat(alpha, num_of_clients))
            # using the proportions from dirichlet, only select those clients having data amount less than average
            proportions = np.array([p * (len(idx_j) < data_size / num_of_clients) for p, idx_j in zip(proportions, idx_batch)])
            # scale proportions
            proportions = proportions / proportions.sum()
            # Cumulative counts become the split points of idx_k across clients.
            proportions = (np.cumsum(proportions) * len(idx_k)).astype(int)[:-1]
            idx_batch = [idx_j + idx.tolist() for idx_j, idx in zip(idx_batch, np.split(idx_k, proportions))]
        current_min_size = min([len(idx_j) for idx_j in idx_batch])
    federated_data = {}
    clients = []
    for j in range(num_of_clients):
        # Shuffle within each client so samples are not ordered by label.
        np.random.shuffle(idx_batch[j])
        client_id = "f%07.0f" % j
        clients.append(client_id)
        temp = {}
        temp['x'] = np.array(data_x[idx_batch[j]]).astype(x_dtype)
        temp['y'] = np.array(data_y[idx_batch[j]]).astype(y_dtype)
        federated_data[client_id] = temp
        net_dataidx_map[client_id] = idx_batch[j]
    print_data_distribution(data_y, net_dataidx_map)
    return clients, federated_data
def non_iid_class(data_x, data_y, class_per_client, num_of_clients, x_dtype, y_dtype, stack_x=True, seed=0):
    """Partition dataset into multiple clients based on label classes.
    Each client contains [1, n] classes, where n is the number of classes of a dataset.

    Note: Each class is divided into `ceil(class_per_client * num_of_clients / num_class)` parts
        and each client chooses `class_per_client` parts from each class to construct its dataset.

    Args:
        data_x (list[Object]): A list of data.
        data_y (list[Object]): A list of dataset labels.
        class_per_client (int): The number of classes in each client.
        num_of_clients (int): The number of clients to partition to.
        x_dtype (numpy.dtype): The type of data.
        y_dtype (numpy.dtype): The type of data label.
        stack_x (bool, optional): A flag to indicate whether using np.vstack or append to construct dataset.
        seed (int): random seed for data split.

    Returns:
        list[str]: A list of client ids.
        dict: The partitioned data, key is client id, value is the client data.
            e.g., {'client_1': {'x': [data_x], 'y': [data_y]}}.
    """
    # NOTE(review): assumes data_y holds non-negative integer labels
    # 0..num_class-1 and data_x/data_y support numpy fancy indexing — confirm
    # at call sites.
    num_class = np.amax(data_y) + 1
    all_index = []
    clients = []
    data_index_map = {}
    for i in range(num_class):
        # get indexes for all data with current label i at index i in all_index
        all_index.append(np.where(data_y == i)[0].tolist())
    federated_data = {}
    # total no. of parts
    total_amount = class_per_client * num_of_clients
    # no. of parts each class should be divided into
    parts_per_class = math.ceil(total_amount / num_class)
    for i in range(num_of_clients):
        client_id = "f%07.0f" % (i)
        clients.append(client_id)
        data_index_map[client_id] = []
        data = {}
        data['x'] = np.array([])
        data['y'] = np.array([])
        federated_data[client_id] = data
    class_map = {}
    parts_consumed = []
    np.random.seed(seed)
    for i in range(num_class):
        # Randomly split the indices of class i into `parts_per_class` parts.
        class_map[i], _ = equal_division(parts_per_class, all_index[i])
        # Min-heap of (times_consumed, class_id): clients always draw a part
        # from the least-consumed class next.
        heapq.heappush(parts_consumed, (0, i))
    for i in clients:
        for j in range(class_per_client):
            class_chosen = heapq.heappop(parts_consumed)
            part_indexes = class_map[class_chosen[1]].pop(0)
            if len(federated_data[i]['x']) != 0:
                if stack_x:
                    federated_data[i]['x'] = np.vstack((federated_data[i]['x'], data_x[part_indexes])).astype(x_dtype)
                else:
                    federated_data[i]['x'] = np.append(federated_data[i]['x'], data_x[part_indexes]).astype(x_dtype)
                federated_data[i]['y'] = np.append(federated_data[i]['y'], data_y[part_indexes]).astype(y_dtype)
            else:
                # First part for this client: replace the empty placeholder.
                federated_data[i]['x'] = data_x[part_indexes].astype(x_dtype)
                federated_data[i]['y'] = data_y[part_indexes].astype(y_dtype)
            # Push the class back with an incremented consumption count.
            heapq.heappush(parts_consumed, (class_chosen[0] + 1, class_chosen[1]))
            data_index_map[i].extend(part_indexes)
    print_data_distribution(data_y, data_index_map)
    return clients, federated_data
def data_simulation(data_x, data_y, num_of_clients, data_distribution, weights=None, alpha=0.5, min_size=10, class_per_client=1, stack_x=True, seed=0):
    """Simulate federated learning datasets by partitioning a data into
    multiple clients using different strategies.

    Args:
        data_x (list[Object]): A list of data.
        data_y (list[Object]): A list of dataset labels.
        num_of_clients (int): The number of clients to partition to.
        data_distribution (str): The ways to partition the dataset, options:
            `iid`: Partition dataset into multiple clients with equal quantity
                (difference is less than 1) randomly.
            `dir`: Partition dataset into multiple clients following the
                Dirichlet process.
            `class`: Partition dataset into multiple clients based on classes.
        weights (list[float], optional): The targeted distribution of data
            quantities, used to simulate data quantity heterogeneity.
            The values should sum up to 1, e.g., [0.1, 0.2, 0.7].
            When `weights=None`, the data quantity of clients only depends on
            `data_distribution`.
            Note: `num_of_clients` should be divisible by `len(weights)`.
        alpha (float, optional): The parameter for Dirichlet process simulation.
            It is only applicable when data_distribution is `dir`.
        min_size (int, optional): The minimum number of data size of a client.
            It is only applicable when data_distribution is `dir`.
        class_per_client (int): The number of classes in each client.
            It is only applicable when data_distribution is `class`.
        stack_x (bool, optional): A flag to indicate whether using np.vstack or
            append to construct dataset.
            It is only applicable when data_distribution is `class`.
        seed (int): random seed for data split.

    Raises:
        ValueError: When the simulation method `data_distribution` is not supported.

    Returns:
        list[str]: A list of client ids.
        dict: The partitioned data, key is client id, value is the client data.
            e.g., {'client_1': {'x': [data_x], 'y': [data_y]}}.
    """
    data_x = np.array(data_x)
    data_y = np.array(data_y)
    x_dtype = data_x.dtype
    y_dtype = data_y.dtype
    if weights is not None:
        assert num_of_clients % len(weights) == 0
        # Each of the `num_of_clients // len(weights)` groups is later split
        # into len(weights) clients by quantity_hetero, restoring the total.
        num_of_clients = num_of_clients // len(weights)
    if data_distribution == SIMULATE_IID:
        group_client_list, group_federated_data = iid(data_x, data_y, num_of_clients, x_dtype, y_dtype, seed)
    elif data_distribution == SIMULATE_NIID_DIR:
        group_client_list, group_federated_data = non_iid_dirichlet(data_x, data_y, num_of_clients, alpha, min_size, x_dtype, y_dtype, seed)
    elif data_distribution == SIMULATE_NIID_CLASS:
        group_client_list, group_federated_data = non_iid_class(data_x, data_y, class_per_client, num_of_clients, x_dtype, y_dtype, stack_x=stack_x, seed=seed)
    else:
        raise ValueError("Simulation type not supported")
    if weights is None:
        return group_client_list, group_federated_data
    # Quantity-heterogeneity pass: split every group's data into len(weights)
    # clients with sizes proportional to `weights`, renumbering client ids.
    clients = []
    federated_data = {}
    cur_key = 0
    for i in group_client_list:
        current_client = group_federated_data[i]
        input_lists, label_lists = quantity_hetero(weights, current_client['x'], current_client['y'])
        for j in range(len(input_lists)):
            client_id = "f%07.0f" % (cur_key)
            temp_client = {}
            temp_client['x'] = np.array(input_lists[j]).astype(x_dtype)
            temp_client['y'] = np.array(label_lists[j]).astype(y_dtype)
            federated_data[client_id] = temp_client
            clients.append(client_id)
            cur_key += 1
    return clients, federated_data
def print_data_distribution(data_y, data_index_map):
    """Log the label distribution of client datasets.

    Args:
        data_y (numpy.ndarray): The dataset labels.
        data_index_map (dict): Mapping from client id to the list of sample
            indices owned by that client.

    Returns:
        dict: Mapping from client id to a {label: count} dict.
    """
    data_distribution = {}
    for index, dataidx in data_index_map.items():
        unique_values, counts = np.unique(data_y[dataidx], return_counts=True)
        data_distribution[index] = dict(zip(unique_values, counts))
    logger.info(data_distribution)
    return data_distribution


def split_labeled_and_unlabeled(data_x, data_y, semi_scenario, num_labels_per_class, num_of_clients):
    """Split a dataset into labeled and unlabeled parts for semi-supervised
    federated learning.

    For every class, the first `num_s` samples (in original order) go to the
    labeled split and the rest go to the unlabeled split, where `num_s` is
    `num_labels_per_class` for the 'label_in_server' scenario and
    `num_labels_per_class * num_of_clients` for 'label_in_client'.

    Args:
        data_x (numpy.ndarray): The dataset, indexable by sample.
        data_y (list[Object] or numpy.ndarray): The dataset labels.
        semi_scenario (str): Either 'label_in_server' or 'label_in_client'.
        num_labels_per_class (int): Number of labeled samples per class.
        num_of_clients (int): The number of clients.

    Returns:
        dict: Labeled data, {'x': ..., 'y': ...}.
        dict: Unlabeled data, {'x': ..., 'y': ...}.

    Raises:
        ValueError: If `semi_scenario` is not a supported scenario.
    """
    if semi_scenario == 'label_in_server':
        num_s = num_labels_per_class
    elif semi_scenario == 'label_in_client':
        num_s = num_labels_per_class * num_of_clients
    else:
        # Bug fix: an unknown scenario previously left `num_s` unbound and
        # crashed later with NameError; fail fast with a clear message.
        raise ValueError("Unsupported semi-supervised scenario: %s" % semi_scenario)
    labels = np.unique(data_y)
    data_y = np.array(data_y)
    # Group samples by label, preserving the original per-class order.
    data_by_label = {}
    for label in labels:
        idx = np.where(data_y[:] == label)[0]
        data_by_label[label] = {'x': data_x[idx], 'y': data_y[idx]}
    s_by_label, u_by_label = {}, {}
    for label, data in data_by_label.items():
        if len(s_by_label) == 0 or len(u_by_label) == 0:
            # First class: initialize the accumulators.
            s_by_label = {'x': data['x'][:num_s], 'y': data['y'][:num_s]}
            u_by_label = {'x': data['x'][num_s:], 'y': data['y'][num_s:]}
        else:
            # Subsequent classes: stack onto the accumulated arrays.
            s_by_label = {'x': np.vstack((s_by_label['x'], data['x'][:num_s])),
                          'y': np.hstack((s_by_label['y'], data['y'][:num_s]))}
            u_by_label = {'x': np.vstack((u_by_label['x'], data['x'][num_s:])),
                          'y': np.hstack((u_by_label['y'], data['y'][num_s:]))}
    return s_by_label, u_by_label