Anton
BAN USERThere is one more mechanism which wasn't mentioned in the comments.
java.util.concurrent.Semaphore
Maintains multiple permits. The number of permits is determined by a constructor parameter.
Semaphore s = new Semaphore(permits)
When the parameter is set to 1 than Semaphore behaves as Lock.
- Anton December 26, 2014According to the javadocs and numerous comments in the Internet the only possible limit for BufferedStreams is just the size of the allocated heap.
So the only way is to write a custom implementation of limiting by overriding all read methods to check limit validity in advance.
thulasi.nari, it doesn't work this way. To make it clear try to run the test below:
public class BufferLimitTest {
@Test
public void testBufferedReaderLimit() throws IOException {
byte[] arr = {1,2,3,4,5,6};
InputStream is = new ByteArrayInputStream(arr);
InputStreamReader in = new InputStreamReader(is);
BufferedReader reader = new BufferedReader(in, 1);
int size = 0;
while(reader.read() != -1) {
size++;
}
assertThat(size, is(equalTo(1)));
}
}
It fails, because during every reading operation BufferedReader invokes its private method fill() which increases the buffer size if its current value isn't enough for reading one more element.
The second parameter of the constructor is just optimization which gives an opportunity not to copy array in future if you know which size will be enough beforehand.
There is one more solution with CountDownLatch.
public class ThreadJoinTest {
@Test
public void testThreadJoin() {
ThreadJoin t = new ThreadJoin(new Runnable() {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(3);
} catch(InterruptedException e) {
System.err.println(e.getMessage());
}
System.out.println("Task has been finished");
}
});
t.start();
t.threadJoin();
assertThat(t.getState(), is(equalTo(Thread.State.TERMINATED)));
}
}
public class ThreadJoin extends Thread {
private CountDownLatch latch = new CountDownLatch(1);
public ThreadJoin(Runnable runnable) {
super(runnable);
}
@Override
public void run() {
super.run();
latch.countDown();
}
public void threadJoin() {
try {
latch.await();
} catch (InterruptedException e) {
System.err.println("Exception occured while joining");
e.printStackTrace();
}
}
}
What do you think about using DelayQueue?
Please, look at the code below.
public class ClientServerTest {
private DelayQueue<Job> queue = new DelayQueue<>();
private Random rand = new Random(System.nanoTime());
@Test
public void testMultipleClientProcessTasks() {
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
exec.submit(new Runnable() {
@Override
public void run() {
Client client = new Client(queue);
client.addTask(new Runnable() {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
System.err.println(Thread.currentThread()
.getId() + ": " + e.getMessage());
}
System.out.println(Thread.currentThread().getId());
}
}, rand.nextInt(500), TimeUnit.MILLISECONDS);
}
});
}
exec.submit(new Runnable() {
@Override
public void run() {
Server server = new Server(queue);
server.processTasks();
}
});
try {
TimeUnit.SECONDS.sleep(8);
} catch (InterruptedException e) {
}
assertThat(queue.isEmpty(), is(true));
exec.shutdownNow();
}
}
public class Client {
private DelayQueue<Job> queue;
public Client(DelayQueue<Job> queue) {
this.queue = queue;
}
public void addTask(Runnable job, long delay, TimeUnit timeUnit) {
queue.put(new Job(job, delay, timeUnit));
}
}
public class Server {
private DelayQueue<Job> queue;
public Server(DelayQueue<Job> queue) {
this.queue = queue;
}
public void processTasks() {
while(!Thread.interrupted()) {
try {
queue.take().job().run();
} catch (InterruptedException e) {
System.err.println(e.getMessage());
}
}
}
}
public class Job implements Delayed {
private Runnable job;
private TimeUnit timeUnit;
private long delay;
private long trigger;
public Job(Runnable job, long delay, TimeUnit timeUnit) {
this.job = job;
this.delay = delay;
this.timeUnit = timeUnit;
this.trigger = System.nanoTime() + TimeUnit.NANOSECONDS.convert(delay, timeUnit);
}
@Override
public int compareTo(Delayed o) {
Job that = (Job)o;
if (this.trigger - that.trigger < 0) {
return -1;
} else if (this.trigger - that.trigger > 0) {
return 1;
} else {
return 0;
}
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(trigger - System.nanoTime(), TimeUnit.NANOSECONDS);
}
public Runnable job() {
return job;
}
}
Here is my solution:
- Anton December 26, 2014