Flipkart Interview Question for SDE-2s


Country: India
Interview Type: In-Person





A simple design: one producer publishes to an exchange, the exchange holds n queues, and each queue can have a consumer. I'm using three queues: one for odd numbers, one for even numbers, and one for every number (both odd and even). Each queue has one consumer feeding off it.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
 * 
 * @author Nalin.Sharma
 *
 */
/*
 * Concurrent linked Queue
 * simple one producer one exchange multiple queues, one q to one consumer example
 */
class Sleep{
	static void sleep(){
		try {
			Thread.sleep(1000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}
//consumer job
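class Consumer implements Runnable{
	//ds
	volatile Queue q; //assigned by MyQueue after this thread has already started
	String name;
	boolean hasReceived = false;
	
	Consumer(String n){
		name = n;
		(new Thread(this)).start();
	}
	@Override
	public void run() {
		hasReceived = true;
		while(true){
			Sleep.sleep();
			Queue current = q;
			if(current == null)
				continue; //not bound to a queue yet
			Object msg = current.poll(); //null when the queue is empty
			if(msg != null)
				System.out.println(name+" reading "+msg);
		}
	}
}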
//producer job
class Producer implements Runnable{
	//ds
	Exchange exchange;
	//Object payload;
	String routingKey;
	//boolean hasSent = true;
	
	Producer(Exchange e){
		exchange = e;
		(new Thread(this)).start();
	}
	@Override
	public void run() {
		//hasSent = true;
		int i = 0;
		Sleep.sleep(); //delays routing by one second; main() binds the queues in the meantime
		while(i < 100){
			if(i%2 == 0)
				exchange.addAndRoute(i,"even,number");
			else
				exchange.addAndRoute(i,"odd,number");
			System.out.println("prod .. "+ i);
			i++;
		}
		System.out.println("prod done ");
	}
}
class MyQueue{
	//ds
	String routingKey;
	Queue q;
	Consumer c; //you can count number of consumers of a type and spawn multiple threads
	
	public MyQueue(String routingKey, Queue q, Consumer c) {
		this.routingKey = routingKey;
		this.q = q;
		this.c = c;
		c.q = q; //one to one
	}
}

class Exchange{
	//ds
	String name;
	Producer p; //one producer
	Map<String,List<MyQueue>> qs = new java.util.concurrent.ConcurrentHashMap<>(); //routing key -> list of queues (each having one consumer); written by main, read by the producer thread
	
	Exchange(String n){
		name = n;
	}
	void bindProducer(Producer p){
		this.p = p;
	}
	void bindConsumer(MyQueue q){
		//create the list for this routing key on first use, then register the queue
		qs.computeIfAbsent(q.routingKey, k -> new ArrayList<>()).add(q);
	}
	void addAndRoute(Object payload, String routingKeys){
		String [] rKeys = routingKeys.split(",");
		for(String key :rKeys){
			List<MyQueue> bound = qs.get(key);
			if(bound == null)
				continue; //no queue bound to this routing key yet
			for (int i = 0; i < bound.size(); i++) {
				bound.get(i).q.add(payload);
			}
		}
	}
}

class Broker{
	//ds
	Exchange exchange;
	
	Broker(Exchange e, Producer p){
		this.exchange = e;
		this.exchange.p = p;
	}
	void bindToExchange(Queue q, Consumer c, String rq){
		MyQueue mq = new MyQueue(rq, q, c);
		exchange.bindConsumer(mq);
	}
}
public class MessageBrokerDesign {

	
	public static void main(String[] args) {
		Exchange e = new Exchange("e1");
		Producer p = new Producer(e);
		Broker mb = new Broker(e, p);
		
		Queue odd = new ConcurrentLinkedQueue();
		Consumer oddC1 = new Consumer("odd");
		mb.bindToExchange(odd, oddC1, "odd");

		Queue even = new ConcurrentLinkedQueue();
		Consumer evenC1 = new Consumer("even");
		mb.bindToExchange(even, evenC1, "even");
		
		Queue number = new ConcurrentLinkedQueue();
		Consumer numberC = new Consumer("number");
		mb.bindToExchange(number, numberC, "number");
	}
}

- sharmanalin59 March 15, 2018

Use a ConcurrentLinkedQueue or a ConcurrentLinkedDeque (from the java.util.concurrent package, or an equivalent that you write yourself or take from another library or language) as the message container on the server side. You can then build different pub-sub variants around it: a one-way publisher, one publisher with many consumers, and so on. You may also want to augment the queue with a name, so that publishers and subscribers communicate through that named queue.
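
A minimal sketch of the named-queue idea, assuming messages are plain strings. The class name NamedQueueRegistry and its methods are made up for illustration; only ConcurrentHashMap and ConcurrentLinkedQueue come from java.util.concurrent.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;

// Hypothetical registry: maps a queue name to its message container.
class NamedQueueRegistry {
    private final ConcurrentMap<String, Queue<String>> queues = new ConcurrentHashMap<>();

    // Returns the queue registered under the name, creating it on first use.
    Queue<String> queue(String name) {
        return queues.computeIfAbsent(name, k -> new ConcurrentLinkedQueue<>());
    }

    // Publisher side: append a message to the named queue.
    void publish(String name, String message) {
        queue(name).add(message);
    }

    // Subscriber side: returns the next message, or null when the queue is empty.
    String poll(String name) {
        return queue(name).poll();
    }
}
```

Publishers and subscribers only need to agree on the queue name. Each message here goes to exactly one poller, so a one-to-many variant would need one queue per subscriber.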

- rkalyankumar April 06, 2015

// A slightly simplified version of the same thing. Let me know what you think.
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

class Consumer implements Runnable{
    //ds
    Exchange exchange;
    String consumerName;
    String routingQueue;
    Thread consumerThread;

    Consumer(String consumerName, Exchange e, String rq){
        this.consumerName = consumerName;
        exchange = e;
        routingQueue = rq;

        consumerThread = (new Thread(this));
        consumerThread.start();
    }
    @Override
    public void run() {
        while(true){
            try {
                Thread.sleep(1000); // sleep is static; it always pauses the current thread
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            Object queueMessage = exchange.consumeMessage(routingQueue);
            if(queueMessage != null)
                System.out.println(consumerName +" reading "+queueMessage);
        }
    }
}
//producer job
class Producer implements Runnable{
    //ds
    Exchange exchange;
    Thread producerThread;

    Producer(Exchange e){
        exchange = e;
        producerThread = (new Thread(this));
        producerThread.start();
    }
    @Override
    public void run() {
        //hasSent = true;
        int i = 0;

        while(i < 100){
            if(i%2 == 0)
                exchange.addAndRouteQueueMessage(i,"even");
            else
                exchange.addAndRouteQueueMessage(i,"odd");
            System.out.println("prod .. "+ i);
            i++;
        }
        System.out.println("prod done ");
    }
}


class Exchange{
    
    Map<String, Queue> allQueues = new HashMap<>();
    
    void addAndRouteQueueMessage(Object payload, String routingKeys){
        String [] rKeys = routingKeys.split(",");
        for(String key :rKeys){
            if(!allQueues.containsKey(key)){
                System.out.println("Routing queue not found : " + key);
                continue;
            }
            Queue currentQueue = allQueues.get(key);
            currentQueue.add(payload);
        }
    }

    public void addNewQueue(Queue q, String queueRoutingKey) {
        allQueues.put(queueRoutingKey, q);
    }

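    public Object consumeMessage(String routingQueue) {
        Queue currentQueue = allQueues.get(routingQueue);
        if(currentQueue == null)
            return null; // no queue registered under this key
        // poll() returns null on an empty queue; isEmpty() followed by remove()
        // can throw when two consumers share the same queue
        return currentQueue.poll();
    }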
}

class Broker{
    //ds
    Exchange exchange;

    Broker(Exchange e){
        this.exchange = e;
    }

    void addQueueInExchange(Queue q, String queueRoutingKey){
        exchange.addNewQueue(q,queueRoutingKey);
    }

}
public class TestingPC {

    public static void main(String[] args) {
        Exchange e = new Exchange();
        Broker messageBroker = new Broker(e);

        Queue odd = new ConcurrentLinkedQueue();
        messageBroker.addQueueInExchange(odd, "odd");

        Queue even = new ConcurrentLinkedQueue();
        messageBroker.addQueueInExchange(even, "even");
        
        Producer p = new Producer(e);
        Consumer C1 = new Consumer("Odd consumer", e, "odd");
        Consumer C2 = new Consumer("Even consumer", e, "even");
    }
}

- James December 29, 2019

Assume you have k processors/cores.
We create k queues of messages, one per processor, so that each core processes the messages in its own queue.

When a publisher sends a message to the broker, the broker puts it in the queue that currently holds the fewest messages. Finding that queue by scanning takes O(k) time, which is trivial when k is small, such as 4 or 8. To scale to many processors, you could use a min-heap keyed on queue length to keep track of the least-loaded queue.
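
The O(k) scan could look roughly like this. It is only a sketch: LeastLoadedDispatcher, dispatch and take are hypothetical names, messages are assumed to be strings, and the per-queue counters are approximate when producers and workers run concurrently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical dispatcher: k queues, one per worker, plus a size counter per queue.
class LeastLoadedDispatcher {
    private final List<Queue<String>> queues = new ArrayList<>();
    private final List<AtomicInteger> sizes = new ArrayList<>();

    LeastLoadedDispatcher(int k) {
        for (int i = 0; i < k; i++) {
            queues.add(new ConcurrentLinkedQueue<>());
            sizes.add(new AtomicInteger());
        }
    }

    // O(k) scan for the smallest counter; ties go to the lowest index.
    // Returns the index of the queue that received the message.
    int dispatch(String message) {
        int best = 0;
        for (int i = 1; i < sizes.size(); i++) {
            if (sizes.get(i).get() < sizes.get(best).get()) {
                best = i;
            }
        }
        sizes.get(best).incrementAndGet(); // count first so the counter never undershoots
        queues.get(best).add(message);
        return best;
    }

    // Called by worker i; returns null when its queue is empty.
    String take(int i) {
        String message = queues.get(i).poll();
        if (message != null) {
            sizes.get(i).decrementAndGet();
        }
        return message;
    }
}
```

Separate counters are kept because ConcurrentLinkedQueue.size() walks the whole queue. For large k the scan could be replaced by a heap, at the cost of reordering it on every enqueue and dequeue.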

If we assume that the MessageBroker holds the mapping between publishers and subscribers, simply keep that mapping in a hashtable. As long as processors only read the hashtable and never change it, this is thread-safe and needs no mutex in the general case. When the broker does need to update subscribers, processing can be stalled while the update is made under a mutex.

To scale up, the hashtable could be partitioned so that updates to one partition don't affect the performance of reads on other partitions.
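
In Java, one way to approximate this without hand-written partitions is to lean on ConcurrentHashMap, whose reads do not take a lock. This swaps in a library structure rather than the explicit mutex described above, so treat it as a sketch. SubscriptionTable and its method names are assumptions, and publishers and subscribers are identified by plain strings.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical publisher -> subscribers table for a read-mostly workload.
class SubscriptionTable {
    private final ConcurrentMap<String, List<String>> subscribersByPublisher =
            new ConcurrentHashMap<>();

    // Rare write path: register a subscriber for a publisher.
    void subscribe(String publisher, String subscriber) {
        subscribersByPublisher
                .computeIfAbsent(publisher, k -> new CopyOnWriteArrayList<>())
                .add(subscriber);
    }

    // Rare write path: remove a subscriber if it is registered.
    void unsubscribe(String publisher, String subscriber) {
        List<String> subs = subscribersByPublisher.get(publisher);
        if (subs != null) {
            subs.remove(subscriber);
        }
    }

    // Hot read path: a read-only view of the current subscribers.
    List<String> subscribersOf(String publisher) {
        List<String> subs = subscribersByPublisher.get(publisher);
        if (subs == null) {
            return Collections.<String>emptyList();
        }
        return Collections.unmodifiableList(subs);
    }
}
```

CopyOnWriteArrayList copies the list on every write, which only pays off if subscriptions change far less often than messages are delivered.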

- Skor March 15, 2015

I don't think you should consider the number of processors when designing this. It should work seamlessly on both single-processor and multi-processor machines. The idea of keeping a hashmap of pubs and subs is sound, but I do not see the point of keeping k queues.

- Vib March 20, 2015