Flipkart Interview Question
SDE-2sCountry: India
Interview Type: In-Person
Use a ConcurrentLinkedQueue or even a ConcurrentLinkedDeque (from java.util.concurrent package or another equivalent one if you prefer to write or from a different library/language) data structure as the Message container in the server side. You can then vary pub-sub implementations around this be it one-way publisher, one pub many consumer etc. Notice that you may actually want to augment the Queue data structure a bit to provide a queue name and have publishers/subscribers to communicate using that named queue.
// Bit simplified version of the same thing...let me know what you guys think
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
class Consumer implements Runnable{
//ds
Exchange exchange;
String consumerName;
String routingQueue;
Thread consumerThread;
Consumer(String consumerName, Exchange e, String rq){
this.consumerName = consumerName;
exchange = e;
routingQueue = rq;
consumerThread = (new Thread(this));
consumerThread.start();
}
@Override
public void run() {
while(true){
try {
consumerThread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Object queueMessage = exchange.consumeMessage(routingQueue);
if(queueMessage != null)
System.out.println(consumerName +" reading "+queueMessage);
}
}
}
//producer job
class Producer implements Runnable{
//ds
Exchange exchange;
Thread producerThread;
Producer(Exchange e){
exchange = e;
producerThread = (new Thread(this));
producerThread.start();
}
@Override
public void run() {
//hasSent = true;
int i = 0;
while(i < 100){
if(i%2 == 0)
exchange.addAndRouteQueueMessage(i,"even");
else
exchange.addAndRouteQueueMessage(i,"odd");
System.out.println("prod .. "+ i);
i++;
}
System.out.println("prod done ");
}
}
class Exchange{
Map<String, Queue> allQueues = new HashMap<>();
void addAndRouteQueueMessage(Object payload, String routingKeys){
String [] rKeys = routingKeys.split(",");
for(String key :rKeys){
if(!allQueues.containsKey(key)){
System.out.println("Routing queue not found : " + key);
continue;
}
Queue currentQueue = allQueues.get(key);
currentQueue.add(payload);
}
}
public void addNewQueue(Queue q, String queueRoutingKey) {
allQueues.put(queueRoutingKey, q);
}
public Object consumeMessage(String routingQueue) {
Queue currentQueue = allQueues.get(routingQueue);
if(currentQueue.isEmpty())
return null;
return currentQueue.remove();
}
}
class Broker{
//ds
Exchange exchange;
Broker(Exchange e){
this.exchange = e;
}
void addQueueInExchange(Queue q, String queueRoutingKey){
exchange.addNewQueue(q,queueRoutingKey);
}
}
public class TestingPC {
public static void main(String[] args) {
Exchange e = new Exchange();
Broker messageBroker = new Broker(e);
Queue odd = new ConcurrentLinkedQueue();
messageBroker.addQueueInExchange(odd, "odd");
Queue even = new ConcurrentLinkedQueue();
messageBroker.addQueueInExchange(even, "even");
Producer p = new Producer(e);
Consumer C1 = new Consumer("Odd consumer", e, "odd");
Consumer C2 = new Consumer("Even consumer", e, "odd");
}
}
Assume you have k processors/cores.
We will now create k queues of Messages, one for each processor. The idea is now that each core will process Messages in its respective queue.
Now, when publisher sends a message to the broker, the broker will put the message in the queue with the least number of Messages currently in it, which would be in O(k) time. If k is a small number, like 4 or 8, this would be trivial. However to scale up with many different processors, you could use a min heap to keep track of the queues that have the least number of messages.
If we assume that the MessageBroker is what contains the mapping between publisher and subscriber, then simply keep a hashtable that holds this mapping. Because processors would only read the hashtable for mappings, without changing it, then this would be threadsafe and there would be no need for mutex in the general case. However, if subscribers needs to be updated by the broker, processing can be stalled to do updates with the mutex.
If we want to scale up, the hashtable could be split, so that updates in one partition of the hashtable wouldn't affect performance of reading other partitions.
Simple one producer binds to exchange, exchange has n queues each queue can have a consumer. I'm using three Queues one for odd one for even for for just number(both odd even).Each queue has one consumer feeding off it.
- sharmanalin59 March 15, 2018