## Facebook Interview Question

**Position:** Software Engineer / Developer **Country:** United States

What if the median is not on the master machine? I think this algorithm doesn't work for that case.

@szilard, the chances of that happening are small, but that is a valid point. I missed that one.

If the median is not on the master server, then we will detect that when the working set becomes empty on the master server, in which case we will have to reassign the master (a server with an IP addr just greater than that) and start over again.

However, if we could possibly find which one server has the median, we could get over this road block, but as in any other distributed system I think it would be as difficult as finding the median itself.

Here's a solution I came up with that has a complexity of O(X log X + X * N), where X is the number of servers (10,000) and N is the size of each array (1 billion). It is not ideal, so if anybody has an improvement over this, I would be thankful.

The main idea is that if we have a list of numbers and we remove M numbers that are less than the median and M numbers that are greater than the median, the median of the new list will remain the same.

Step 1. We compute the median of each server's array using the classical selection algorithm.

Step 2. We find the minimum of the medians A and the maximum of the medians B.

Step 3. We know for sure that the median of all the numbers will be in the interval [A, B]. We can then eliminate all the numbers < A on the server that contains A (there are N / 2 numbers with this property), and all the numbers > B on the server that contains B (there are N / 2 numbers with this property). We eliminated exactly N numbers.

Step 4. We can now merge the 2 lists, find the new median and go back to step 2.

We will repeat steps 2-4 until we eliminate all the numbers. We can imagine some optimizations for Step 2, such as placing all the medians in a heap, so that finding the minimum and maximum of the medians costs O(log X) instead of O(X). But at each of the X steps we have to merge and find the median of a list containing N numbers, which gives the O(X * N) term; that is what should be improved.
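The invariant behind the main idea above can be sanity-checked in a few lines of Python. This sketch uses the rank-based lower median (`statistics.median_low`); with the averaged even-count median, which particular elements get removed would start to matter:

```python
import statistics

def remove_balanced(nums, m):
    """Drop m elements strictly below the (lower) median and m strictly
    above it. The lower median of what remains is unchanged, because the
    median element keeps exactly m fewer elements on each side."""
    med = statistics.median_low(nums)
    below = sorted(x for x in nums if x < med)
    above = sorted(x for x in nums if x > med)
    equal = [x for x in nums if x == med]
    if len(below) < m or len(above) < m:
        raise ValueError("not enough elements on one side")
    # which m elements we drop on each side does not affect the rank argument
    return below[m:] + equal + above[:len(above) - m]
```

For example, `remove_balanced([1, 2, 3, 4, 5, 6, 7], 2)` leaves `[3, 4, 5]`, whose lower median is still 4.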

Probably you forgot to consider the time to sort the array before applying the selection algorithm.

I came up with another solution based on the similar idea to the above.

Let's assume that each server provides the following operation.

* get_median(lb, ub) for which the server returns the median among values [lb, ub], not among all values which it has. If there are no values in the range, return NoValues exception

1. initialize lb = the minimum possible value and ub = the maximum possible value, such as Long.MIN_VALUE and Long.MAX_VALUE

2. request get_median(lb, ub) from every server, ignoring NoValues exceptions, and gather the responses -> { mi | i = 0 ... 9999 }

3. new_lb = min(mi), new_ub = max(mi)

4. if new_lb == new_ub, then new_lb is median

or if new_lb == lb and new_ub == ub, then (new_lb + new_ub) * 0.5 is median.

otherwise, lb = new_lb, ub = new_ub, repeat 2 ~ 4

I think that, probabilistically, it will lose over a billion numbers (N) at each iteration. The number of messages is bounded by O(X^2), where X is the number of servers, if we ignore the processing time inside each server (reasonable, because we are talking about the performance of the whole distributed system, not a single machine). In the worst case each iteration loses N numbers, each iteration needs X response messages, and X iterations are required to lose all the numbers.

If each server maintains a sorted list, the get_median operation can be done in O(log N) time, because the lower_bound and upper_bound algorithms run in O(log N), and once those two indices are found it is trivial to find the median in the range.
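Assuming each server keeps its values in a sorted list, get_median(lb, ub) is just two binary searches plus an index computation; a minimal sketch using Python's bisect:

```python
import bisect

def get_median(sorted_vals, lb, ub):
    """Median of the values lying in [lb, ub] within one server's sorted
    list. Raises ValueError for the NoValues case."""
    lo = bisect.bisect_left(sorted_vals, lb)    # first index >= lb
    hi = bisect.bisect_right(sorted_vals, ub)   # first index > ub
    if lo == hi:
        raise ValueError("NoValues")
    return sorted_vals[(lo + hi - 1) // 2]      # lower median of the slice
```

For `[1, 3, 3, 5, 8, 13]`, `get_median(vals, 3, 8)` looks at the slice `[3, 3, 5, 8]` and returns 3.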

I guess the main issue is that a single server cannot store that many integers.

So my solution is based on that assumption and requires communications between servers.

1. A server selects a median (say m). Then it asks each of the other servers how many bigger integers it has. If the summed number of bigger integers is exactly half of the total number of integers, then m is the median.

2. If not, and the number of bigger integers > half of the total integers, then the selected median is too small. Find the server whose number of bigger integers is largest, and use its median to try again. Keep trying until the selected median is too big.

3. Then use Newton interpolation method to narrow down and finally find the median.

Do it the MapReduce way.

1) On each server, calculate the count of each distinct integer using a hash map, in parallel.

2) Merge the counts across all servers onto one server. The reduced hash map has all distinct integers as keys and their combined counts across all servers as values, and is sorted by key into a tree map. In the worst case the tree map has one key per distinct integer, and each key can have a combined count of up to 10,000 billion, so TreeMap<Long, Long> should be used as the data structure; the map can then be hosted entirely in memory.

3) Scan the tree map to find the total count of integers; the maximum is 10,000 billion, so it is still within the range of Long. There will be no arithmetic overflow.

4) Scan the tree map again until we reach half of the total count. The key of the bucket that makes up half of the total count gives the median.

3) and 4) can be done with a little optimization. When we drop an entry/bucket from the beginning of the tree map, we drop one or more buckets from the end of the tree map that make up the same count as the head entry. If we cannot make the head count and the tail count even, we keep the difference in the tail. Say our map is (1->10), (2->5), (3->7), (4->9); one step can reduce it (by dropping buckets 1 and 4) to (2->5), (3->6), and further to (3->1). Our median in this case is 3. This way we only need to scan the tree map once, and we don't have to worry about overflow.
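The head/tail cancellation described above can be sketched with two pointers over the sorted histogram. This is a sketch for odd total counts only; the even-total tie case would need the averaging refinement mentioned in the post:

```python
def histogram_median(hist):
    """hist: list of (value, count) pairs sorted by value, odd total count.
    Repeatedly cancel equal counts off both ends; the surviving value is
    the median."""
    items = [[v, c] for v, c in hist]
    i, j = 0, len(items) - 1
    while i < j:
        drop = min(items[i][1], items[j][1])  # cancel head against tail
        items[i][1] -= drop
        items[j][1] -= drop
        if items[i][1] == 0:
            i += 1
        if items[j][1] == 0:
            j -= 1
    return items[i][0]
```

On the example above, `histogram_median([(1, 10), (2, 5), (3, 7), (4, 9)])` returns 3, matching the post.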

I have posted an N log N solution for this problem above, but another simple O(N^2) solution is as follows:

Step1: Consider first 2 servers with billion integers each.

Use the merge procedure of merge sort. Keep track of a count while comparing elements of the two arrays. If the count reaches a billion (for the 2 billion elements on the 2 servers together), we have reached the median.

Take the average of the elements at the billionth and (billion - 1)th positions to get the exact median value.

Step2: Now take the billion elements from machine 3. We currently have 1 billion elements from machines 1 and 2 together, and 1 billion elements from machine 3.

So we follow step1 again to merge the array and stop at billion th location to get the median.

Step3: Repeat the procedure for 10K servers.

Total time taken:

Step3 times Step1: O(N^2)

I forgot to mention a sorting step in this solution.

But the better solution still O(N^2) is as follows:

Step1: Sort the billion numbers individually on each server. Since they are fixed-size 32-bit integers, we can use LSD radix sort to sort in O(N) time.

Step2: Instead of considering only first 2 machines as given above, consider all the machines together.

Use the merge procedure of merge sort. Keep track of a count while comparing elements of the 10K arrays. If the count reaches 10K * billion / 2 (half of the 10K * billion numbers), we have reached the median.

Since this number is huge, we don't need to store all those elements on a single machine: keep storing elements in a new array and break the array as soon as memory is full.

Now take the average of the elements at positions 10K * billion / 2 and 10K * billion / 2 - 1 to get the exact median value.

The time complexity is still O(N^2)

That makes sense to me, but frankly I'm interested in hearing your O(NlgN) solution now.

The solution by Saurabh seems appropriate, except for a minor improvement: use quicksort to sort each server. The number of integers is huge, and merge sort uses extra space, whereas quicksort can sort in place with almost equal efficiency. When finding the medians of these 10,000 servers we need 10,000 pointers, one into each sorted array; we need to keep the count; and we need to identify and mark which server has what count.

This is a distributed computing problem and you have to pay attention to how much data you transfer over the network (someone called it communication complexity).

The solutions given are poor (except the median of medians, which gives an approximate median but does not answer the question correctly, and is written in a way that shows a lack of understanding of the problem).

Very easy to modify quickselect. Each server should support two queries: (1) given an integer, return its sorted position; (2) return the integer at a given sorted position.

Selecting the initial pivot is easy. Query (1) on the rest of the servers and recalculate the new pivot.

The reply above mine sounds correct, but I think it needs a little more verbosity. Here is what I think it meant:

1. Sort each server.

2. Call (2) to get the middle element of each server.

3. The needed number is between min and max of the elements returned at step 2. So do a binary search between min and max. For each number you test call (1) on each server to get how many numbers lower, equal and higher than the test are on the server. Calculate 3 sums: number lower, number equal and number higher.

4. If nLower + nEqual >= nHigher && nLower <= nEqual + nHigher, then we are done and this is the median.

5. If nLower + nEqual < nHigher, then we should try a higher number.

6. Else we should try a lower number.

Let N be the number of integers per server (a billion), and M the number of servers (10,000). Then the complexity of sorting is N log N, and the binary search takes log(range) iterations, each costing M log N in count queries. So the total complexity is dominated by N log N.
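Steps 3-6 above can be simulated with each "server" as a sorted Python list and query (1) implemented with bisect. This is a single-process sketch of the idea, not a wire protocol; only counts would cross the network in the real system:

```python
import bisect

def count_parts(server, x):
    """Query (1): how many values on this server are lower / equal / higher
    than x, via two binary searches on the sorted list."""
    lo = bisect.bisect_left(server, x)
    hi = bisect.bisect_right(server, x)
    return lo, hi - lo, len(server) - hi

def distributed_median(servers):
    """Binary-search the value range between the global min and max,
    asking each server only for counts at each probe."""
    lo = min(s[0] for s in servers if s)
    hi = max(s[-1] for s in servers if s)
    while lo <= hi:
        mid = (lo + hi) // 2
        n_lower = sum(count_parts(s, mid)[0] for s in servers)
        n_equal = sum(count_parts(s, mid)[1] for s in servers)
        n_higher = sum(count_parts(s, mid)[2] for s in servers)
        if n_lower + n_equal < n_higher:     # step 5: try a higher number
            lo = mid + 1
        elif n_higher + n_equal < n_lower:   # step 6: try a lower number
            hi = mid - 1
        else:                                # step 4: this is the median
            return mid
    return None
```

On the two-machine example from later in the thread, `distributed_median([[2, 3, 4, 5, 6], [6, 6, 6, 6, 6]])` correctly returns 6.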

This is the typical sort-merge technique used in the map-reduce paradigm. It also uses the external sorting technique.

- Sort the individual set of integers within each server (using standard sorting techniques). This can happen in parallel on each of the N servers.

- Once we have sorted buckets of data on each server, the shuffle stage (essentially the sort-merge stage) brings in data from all the N servers by doing a multi-way merge sort of these sorted buckets.

- In the final reducer (or machine, or on disk) the full sorted list is available. Assume we have also calculated the length of this list during the map-reduce while sorting: each mapper emits the length of its individual list, and the reducer merges these individual counts to form the total list size.

- After this we can calculate the median by the following simple rule. Let's say the list size is 'X'.

- If X is odd, median is list[(X+1)/2]

- If X is even, median is (list[X/2] + list[(X/2)+1])/2

Please refer to some map-reduce techniques or external sorting techniques which pretty much forms the basis of most distributed computing.

Same idea as mine. Yet we don't have to sort the integers, just the histogram of the integers.

Sabz's suggestion is great, so I think we can, first of all, use the map/reduce technique to create a histogram of the integers. We will have something like "1:10000(count), 14:2344, 10:234, 5:898..."; then we sort all the histogram entries by key. After this, we just need to find out which bucket the median sits in.

M is 10K servers, N is 1 billion integers each server has. The complexity to find the median is O(N*M*log(M)/2) = O(N*M*log(M)). The memory usage is O(M).

1. Sort each server takes O(Nlog(N))

2. Use an array A (size M) to store the first elements from each server.

3.

```
count = 1
ret = 0
while (count < N*M/2) {
    3.1 find the minimum element m (at position index) in array A -- O(log M) with a heap
    3.2 ret = m
    3.3 A[index] = getNextElement(index) from the index-th server -- O(1)
        // getNextElement(index) returns the next element from the index-th
        // server; if that server is exhausted, it returns INF
    3.4 increase count by 1
}
```

4. ret is result we want
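The loop in step 3 is a k-way merge; a compact sketch with Python's heapq, where indexing into `servers[i]` stands in for getNextElement(i) and the loop stops at the rank of the lower median:

```python
import heapq

def merge_median(servers):
    """k-way merge across sorted lists, stopping at the middle element.
    Returns the lower median of the combined data."""
    total = sum(len(s) for s in servers)
    target = (total + 1) // 2                  # rank of the lower median
    heap = [(s[0], i, 0) for i, s in enumerate(servers) if s]
    heapq.heapify(heap)                        # array A as a min heap
    count = 0
    while heap:
        val, i, j = heapq.heappop(heap)        # step 3.1
        count += 1                             # step 3.4
        if count == target:
            return val                         # ret
        if j + 1 < len(servers[i]):            # getNextElement(i), step 3.3
            heapq.heappush(heap, (servers[i][j + 1], i, j + 1))
```

For example, `merge_median([[2, 3, 4, 5, 6], [6, 6, 6, 6, 6]])` merges up to rank 5 and returns 6.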


You can sum the numbers locally; then we have 2 options:

1 - correctness should be as high as possible. In this case we send the full sum to some aggregate server, which sums all the sums and then divides by 10K * 1 billion.

2 - we can divide the sum locally and send the mean; then on the aggregate server we just weight it by 1 billion, combine it with the other sums, and divide by 10K * 1 billion.

From example above:

Machine 1: 2, 3, 4, 5, 6. Sum is 20

Machine 2: 6, 6, 6, 6, 6. Sum is 30

Then Machine 1 receives 30, and (20 + 30) / 10 = 5.

Here sum is a map operation and sum of sums is reduce operation

You could get numeric overflow, but creating a custom type that stores the value in a few buckets is efficient enough, and transfer between nodes will not take long.

Since we have integers, we could create an array counts[MAXINT]; counts[i] equals the number of times the integer value i is present in the whole system. So each server could pass its array to a processing server holding the aggregate array. The processing server's code is

`counts[i]++;`

Processing can be easily paralleled since counts arrays may be aggregated.

Finding the median is then as easy as summing up counts until the total reaches half of the total number of integers in the whole system.

Correct me if it is not O(n) time.
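A single-process sketch of this counting idea, with each server's numbers reduced to a Counter and merged (assuming the aggregate histogram fits on one machine, as the post does):

```python
from collections import Counter

def median_from_counts(servers):
    """Aggregate per-server histograms, then scan cumulative counts up to
    the middle rank (lower median)."""
    counts = Counter()
    for s in servers:              # each server would send its own Counter
        counts.update(s)
    total = sum(counts.values())
    target = (total + 1) // 2      # rank of the lower median
    running = 0
    for value in sorted(counts):   # scan values in increasing order
        running += counts[value]
        if running >= target:
            return value
```

`median_from_counts([[2, 3, 4, 5, 6], [6, 6, 6, 6, 6]])` returns 6, the correct combined median from the counterexample later in the thread.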

1- Find median in each server

2- Find median of the medians which ends up with two numbers since count is even

3- Find median of numbers within range of the above two number in each server

4- Repeat steps 2 and 3 until each server has no more than two numbers within the range; those two numbers are the final median.

(1) create an empty file (File A) of 4GB in each server.

(2) each server reads the billion numbers and update the occurrence count in File A.

(3) finally, this File A in 10000 servers is merged in one server.

(4) in this merged file, assume the number at the 2GB offset is the median; sum up the left side and the right side separately. By comparing left and right, move the median pointer left or right, adjusting the median correspondingly.

1. Sort the 1 billion integers individually.

2. We know the total length, i.e. 10,000 * 1 billion.

3. Half of that is the median position.

4. Use a min heap of size 10,000.

5. In every iteration, pop the min element and insert a new element into the min heap. The new element has to come from the same server as the popped element.

6. Keep a count to track the index. Once index == totalSize/2, return the median.

If each integer is 32 bits, here is an O(n) solution:

Create an array of type long (64 bits) to hold all possible numbers from -2,147,483,648 to 2,147,483,647.

A[num] = #count of num.

Example: 1,4,4,5,1,2 --> A[1]=2, A[2]=1, A[4]=2, A[5]=1

Memory requirement is 4G entries * 8 bytes = 32 GB. A modern server should have this kind of memory.

Step 1: Loop through all numbers on all servers to count the occurrence of each number.

This would take O(n).

Step 2: We know there are 10 trillion numbers. From the array, just find where the cumulative count reaches 5 trillion. This takes at most about 4 billion operations, which is much smaller than n = 10 trillion.

```
long curCount = 0;
int median = 0;
// A is indexed by (num - Integer.MIN_VALUE)
for (long num = Integer.MIN_VALUE; num <= Integer.MAX_VALUE; num++) {
    long c = A[(int) (num - Integer.MIN_VALUE)];
    if (c != 0) {
        curCount += c;
        if (curCount >= 5_000_000_000_000L) {
            // this is the (lower) median number
            median = (int) num;
            break;
        }
    }
}
```


I think I found a better solution. N - server count, M - array length, L - max number. Total complexity is O(log L * N * log M) with O(N log L) data transfer.

So logic is following:

1. Select a master server.

2. Sort the array on each server: O(M log M).

3. Calculate the median on each server and send it to the master server.

4. The master server takes the min and max of the received medians: O(N).

5. Calculate mid = (max + min) / 2.

6. Ask each server how many elements are less than mid: O(N * log M) (each server finds its count using binary search).

7. Sum the results; if sum < M * N / 2 then min = mid, else max = mid; go to step 5.

So, in summary: we sort all the data, O(M log M); transfer one number from each server to the master, O(N) data; then find the median via binary search between min and max, O(log L) iterations, performing a binary search on each server (O(log M), since the arrays are sorted) and transferring one count from each server after each probe, for O(N * log L) data transfer.

O(N log k) solution, where k is the number of servers, plus the time to request and pass n/2 numbers between the servers.

Inspired by Saurabh's second solution.

1. Sort the billion numbers individually on each server. Since they are fixed-size 32-bit integers, we can use radix sort to sort in O(N) time.

2. Let's introduce min heap on one of the machines. Heap size would be equal to 10000 (k).

3. Ask each machine for its min value and insert these values into the heap (keeping track of which server each value came from)

4. Let's introduce counter to keep track of how many values we have considered.

5. Get min value from the heap, ask server-origin of this value for next min value and insert it into the heap. Increment the counter (O(logk) + query time)

6. Do step 5 till we get through half of the values. Compute median by picking exact value (if we can have one) or finding average of two values at the center.

One standard solution could be:

1) Each server sorts its own numbers (radix sort seems a rational approach).

2) If memory is an issue (which the problem does not state), write the result to a file.

3) Have one machine at a time merge all the files until its memory is full, then the next continues where the first left off, and so on, until reaching half of the total count of numbers; that is the exact median.

O(n) as radix is O(n) and merge is O(n/2) so => O(n)

A more sophisticated solution, which "approximates" the median, would be to use a hashing algorithm that guarantees even distribution and minimal collisions of the numbers across the machines, like MD5 for example.

So all the machines will contain the same numbers, or close to that; sort each one, and the (n/2)-th number approximates the median.

1) Each machine calculates (hashCode + state) % 10,000 and sends each number over to the resulting machine (0-9,999).

2) Calculate the (n/2)-th number on each server and average these values, and you get a very approximate median.

Solution 1 O(n)

10 trillion 32 bit integers

Each machine creates an integer array of size 2^16 to count the frequency of each of the 2^16 value intervals. The array only takes 2^16 * 4 = 256 KB of memory. Total size is 2.5 GB.

One master machine creates a 64 bit integer array of size 2^16 to merge the counts.

Master knows the ith number in the jth bucket is the median.

In the second pass, each machine counts the frequency of each number in the j-th bucket and passes the counts to the master.

The master figures out that the i-th number is the median. If the 2.5 GB data transfer between servers is the bottleneck, then we can do a three-level partition: 2^11, 2^11, and 2^10.
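Solution 1's two-pass bucket scheme can be sketched single-process as follows (assuming non-negative 32-bit values for simplicity; the real version would run pass 1 on each machine and merge the 2^16 counts at the master):

```python
def bucket_median(servers):
    """Two passes: first count by the high 16 bits to locate the bucket
    holding the median rank, then count exactly inside that bucket."""
    total = sum(len(s) for s in servers)
    target = (total + 1) // 2               # rank of the lower median
    buckets = [0] * (1 << 16)
    for s in servers:                       # pass 1 (per machine, merged)
        for x in s:
            buckets[x >> 16] += 1
    running = 0
    for b, c in enumerate(buckets):         # find the bucket with the median
        if running + c >= target:
            break
        running += c
    fine = [0] * (1 << 16)                  # pass 2: only bucket b
    for s in servers:
        for x in s:
            if x >> 16 == b:
                fine[x & 0xFFFF] += 1
    for low, c in enumerate(fine):          # locate the exact value
        running += c
        if running >= target:
            return (b << 16) | low
```

For small inputs everything lands in bucket 0, e.g. `bucket_median([[2, 3, 4, 5, 6], [6, 6, 6, 6, 6]])` returns 6; values above 65535 exercise the second pass.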

Solution 2 map reduce

The merge step can also be done in parallel, which hints at map reduce.

Use map reduce to count the frequencies of each number.

The output contains at most 4B (integer, frequency) pairs, sorted by integer.

We can scan the file and find the median.

If scanning the output file becomes the bottleneck, we can do a multi-level map reduce similar to solution 1.

Here is how we can get the median of the billion numbers on each of 10K servers.

I am going to assume that since it's a server, it's going to have 32GB of RAM to perform calculations on those billion integers at once. If the servers have less RAM, the logic still works; we just have to break the billion integers into groups, say of 2, and then perform the same logic.

Logic:

We will find medians of those billion numbers separately on each of the 10K servers.

[Step1] Get the median of the billion numbers on all 10K machines.

Getting the median of the billion numbers is a 2-step process:

a. Sort the numbers. Since they are 32-bit integers (in Java), we can sort them in O(N) time using LSD radix sort.

b. Pick the middle element (or the average of the two middle elements).

[Step2] Now consider machine 1 and 2, We have 1 array each of billion numbers say arr1 and arr2 and we have 1 median each say m1 and m2.

Now we will compare those medians as follows

Case 1: If m1 and m2 are the same, we have the median of the 2 servers: it's m1.

Case 2: If m1 is greater than m2, then the median is present in one of the two subarrays below.

a) From first element of arr1 to m1

b) From m2 to last element of arr2

Case 3: If m2 is greater than m1, then the median is present in one of the two subarrays below.

a) From m1 to last element of arr1

b) From first element of arr2 to m2

[Step3] Repeat Step2 until the size of both subarrays becomes 2. If the size of the two arrays is 2, then we get the median as

Median = (max(arr1[0], arr2[0]) + min(arr1[1], arr2[1]))/2

[Step4] Now that we have found the median on 2 machines, to include machine 3 in the median calculation,

we calculate median of machine 3 billion numbers say m3.

We now follow the exact same 3 steps as in Step2. If m3 (the median of machine 3) is larger than m12 (the median found in Step3 for machines 1 and 2), then we create arr1 from all the elements of machines 1 and 2 greater than the median m12.

This way we accommodate 1 machine at a time to calculate the median of all 10K machines.
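Step2's case analysis is essentially the classic median-of-two-sorted-arrays reduction; here is a sketch for equal-length arrays (halves are discarded by slicing, where a real system would just move index bounds):

```python
def median_two_sorted(a, b):
    """Median of two equal-length sorted arrays (len >= 2) by repeatedly
    discarding halves that cannot contain the combined median."""
    assert len(a) == len(b) >= 2
    while len(a) > 2:
        n = len(a)
        m1 = (a[n // 2 - 1] + a[n // 2]) / 2 if n % 2 == 0 else a[n // 2]
        m2 = (b[n // 2 - 1] + b[n // 2]) / 2 if n % 2 == 0 else b[n // 2]
        if m1 == m2:                 # Case 1: both medians agree
            return m1
        k = (n - 1) // 2             # safe number to discard from each array
        if m1 < m2:                  # Case 3: drop a's low half, b's high half
            a, b = a[k:], b[:n - k]
        else:                        # Case 2: drop a's high half, b's low half
            a, b = a[:n - k], b[k:]
    # Step3 base case, exactly the formula above
    return (max(a[0], b[0]) + min(a[1], b[1])) / 2
```

For example, `median_two_sorted([1, 2, 3, 4], [5, 6, 7, 8])` returns 4.5, the median of 1..8.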

I will post the program as soon as possible.

Total Time complexity:

Step 1: sorting using LSD radix sort : O(N)

Step2: Divide and Conquer Step O(N logN)

Step3: Calculating median, constant time O(1)

Step4: Repeating step for 10K -2 servers O(N)

Total Time Complexity: O(N logN)

Correction: the time complexity for Step2 is O(log N), and repeating this process for the 10K servers gives O(N) * O(log N).

Total Time Complexity: O(N logN)

What is the communication complexity? How many numbers would you have to transfer over the network? Are you even getting the right answer? What are you optimizing for? Time? What about the cost of running the machines: why not optimize for that?

Basically this question needs discussion. It is good to have this question out here, and people can try thinking of approaches which optimize different things, like time, cost etc, maybe even question the original use case, and change the whole design (like partition data differently etc).

You don't need to sort the billion integers, you can use the 'median of medians' algorithm to find the median in an unsorted array of integers in O(n).

First sort the integers in each of the servers by storing them in a hash table, database or sorted array (if necessary, the integers on the servers can be broken down into many arrays/hash tables and the following process can be performed recursively).

Next, query each table/array to find its median. You know that the overall median must be between the smallest and the largest of these medians, so we can throw away all elements above the largest and below the smallest.

When we just have one or two elements in each of the arrays, we can combine them all to make a larger array, and then take the median of that.

For example,

`[1,2,3,4],[2],[7,8,9],[1,1,1]`

here we would throw away

`[1,4],[],[7,9],[1,1]`

and end up with

`[2,3],[2],[8],[1]`

Then, we could form a new array with the remaining elements:

`[1,2,2,3,8]`

and the median is 2.

I doubt you can take the median of a smaller list and apply it to a bigger list. Your approach doesn't work.

Consider the below example:

```
[5,6,9,1] ==> Sorted ==> [1,5,6,9] ==> Median is 5,6
[5,2,2,2,2,2,3] ==> Sorted ==> [2,2,2,2,2,3,5] ==> Median is 2
[9,6,8] ==> Sorted ==> [6,8,9] ==> Median is 8
Resulting list after removing all items greater than & less than the median
[5,6]
[2]
[8]
Merging & Sorting them ==> [2,5,6,8]
Median (as per your algorithm) is (5+6)/2 ==> 5.5
Actual Median Algorithm:
[1,2,2,2,2,2,3,5,5,6,6,8,9,9] ==> 14 elements
7th element is '3'
8th element is '5'
Median is (3+5)/2 ==> 4
```

I have an idea: use a selection algorithm.

A selection algorithm runs in O(n) worst-case time. So on each server we can use a selection algorithm to find the (billion/2)-th largest element, and then we get 10K integers that are the medians of each server. Then we use the same procedure to find the (10K/2)-th largest element, which is the final result. The total time is linear, O(n).
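The per-server selection step can be sketched with randomized quickselect (expected O(n), no sort). Note that, as the counterexample in the next reply shows, the median of these local medians is not the global median, so this only solves the per-server half of the problem:

```python
import random

def quickselect(nums, k):
    """k-th smallest element (0-based) of an unsorted list, expected O(n)."""
    pivot = random.choice(nums)
    lt = [x for x in nums if x < pivot]   # three-way partition around pivot
    eq = [x for x in nums if x == pivot]
    gt = [x for x in nums if x > pivot]
    if k < len(lt):
        return quickselect(lt, k)
    if k < len(lt) + len(eq):
        return pivot
    return quickselect(gt, k - len(lt) - len(eq))

def local_median(nums):
    """Lower median of one server's array, found without sorting."""
    return quickselect(nums, (len(nums) - 1) // 2)
```

For example, `local_median([5, 2, 2, 2, 2, 2, 3])` returns 2, matching the sorted-median example earlier in the thread.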

The median of all the medians is not necessarily the median of the entire combined dataset. Consider an example with just two machines:

Machine 1: 2, 3, 4, 5, 6. Median is 4.

Machine 2: 6, 6, 6, 6, 6. Median is 6.

The correct answer for the median of the combined data would be 6, but here you would get 5.

Finding the median of each of the 10,000 servers and then finding the median among them will work, as it is stated that each server has a billion integers (an equal number of numbers).

Problem breaks down into

1) finding median on each server

2) finding medians among the medians

As there are 1 billion integers, the (1 billion / 2)-th (or +1) largest number will be the median. This becomes the k-th largest number problem, which has a complexity of O(N).

Reference: search for finding the k-th largest element in an unsorted array on Stack Overflow.

Store these in an array of 10,000 and again find the 5,000th largest number in this array, which is O(M).

So total complexity = O(N)

A min-max heap is supposed to return the exact median value, not an average. I have only read a paper on that and haven't tried implementing it. Let me work on that.

Hi,

The median is not the average of the min and max elements of the array, so a min-max heap is not going to work.

Median is the middle element of the sorted array if the total elements are odd, or the average of the middle 2 elements if the element count is even.

So for the elements {2,13,17,30,45} and {120,150,260,380} the median is 45.

With your logic, for first array min = 2, max = 45 so average = (45+2) / 2 = 23.5

for second array min=120, max=380 so average = 500/2 = 250

Now the avg of 250 and 23.5 is not 45.

@Algo,

So in the example of {1,2,3} and {4,5}, would you return 2, 4, or 5? Still incorrect.

Clearly, there are two kinds of costs involved here: 1. the cost of computation happening within a server, 2. the cost of communicating between servers.

- anantpushkar009 October 28, 2014

Also worth noting is that the second cost is going to be the bottleneck here, so we had better focus on saving bandwidth and, at the same time, try not to overstrain each server individually. Inspired by the classic quickselect algorithm, I propose something that we could call a distributed quickselect algorithm:

1. Select a control/master server. Probably the one with the smallest IP addr. Standard protocol, not a big deal [something similar to the spanning tree protocol would do].

2. The control/master server selects a pivot from its own array and broadcasts the value to the rest.

3. Each slave server partitions its own array based on this pivot value and sends the left , central and right count to the master server.

4. Based on the total number of elements in each slave server and those that the master itself has, master selects one of the left, or right partitions to work on . Each of the slave servers will now be working on this partition.

5. If total_left_count >= n/2: work on the left partitions; repeat 2 to 4.

Else if total_left_count + total_central_count >= n/2: the pivot is the median.

Else: work on the right partition (reducing the target rank by the discarded left and central counts); repeat 2 to 4.

Total internal computation :Linear

Total number of network communications: O(n log n) [we will have to run an average of log n cycles, each cycle taking a linear number of communications]

The second cost would improve if we used a multicast network.
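The protocol above can be simulated in-process, with lists standing in for servers; only the pivot and the three partition counts would cross the network in each round:

```python
import random

def distributed_quickselect(servers, k=None):
    """Find the k-th smallest (1-based) element across all servers; by
    default the lower median. Each round: broadcast a pivot, gather
    (left, central, right) counts, and narrow every server to one side."""
    parts = [list(s) for s in servers]
    if k is None:
        k = (sum(len(p) for p in parts) + 1) // 2   # lower-median rank
    while True:
        # the master picks a pivot from the first non-empty partition
        pivot = random.choice(next(p for p in parts if p))
        left = sum(1 for p in parts for x in p if x < pivot)
        central = sum(1 for p in parts for x in p if x == pivot)
        if k <= left:                               # recurse on left partitions
            parts = [[x for x in p if x < pivot] for p in parts]
        elif k <= left + central:                   # pivot is the answer
            return pivot
        else:                                       # recurse on right partitions
            k -= left + central
            parts = [[x for x in p if x > pivot] for p in parts]
```

On the thread's counterexample, `distributed_quickselect([[2, 3, 4, 5, 6], [6, 6, 6, 6, 6]])` returns 6, the correct combined median.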