Java源码示例:org.apache.mina.core.write.WriteRequestQueue
示例1
/**
 * {@inheritDoc}
 * <p>
 * The request is always appended to the session's write request queue; the
 * session is then flushed right away unless writing is currently suspended
 * for it.
 */
public void write(VmPipeSession session, WriteRequest writeRequest) {
    session.getWriteRequestQueue().offer(session, writeRequest);

    if (session.isWriteSuspended()) {
        // Leave the request queued; nothing is flushed while suspended.
        return;
    }

    flush(session);
}
示例2
/**
 * {@inheritDoc}
 *
 * @return the write request queue, never {@code null}
 * @throws IllegalStateException if the write request queue is still {@code null}
 *         when this method is called
 */
public final WriteRequestQueue getWriteRequestQueue() {
    if (writeRequestQueue == null) {
        // Fail with an explicit message so the cause is visible in stack traces.
        throw new IllegalStateException("The write request queue has not been initialized");
    }

    return writeRequestQueue;
}
示例3
/**
 * {@inheritDoc}
 * <p>
 * The request is first offered to the session's write request queue. If
 * writing is not suspended for the session, a flush is triggered
 * immediately afterwards.
 */
public void write(S session, WriteRequest writeRequest) {
    session.getWriteRequestQueue().offer(session, writeRequest);

    if (session.isWriteSuspended()) {
        // The request stays in the queue until a later flush.
        return;
    }

    flush(session);
}
示例4
/**
 * Updates the session's scheduled-write counters for the given request and
 * then hands the request to the session's processor or write request queue.
 * <ul>
 * <li>Writing suspended: the request is only queued.</li>
 * <li>Writing allowed and queue empty: the request is written directly
 * through the processor.</li>
 * <li>Writing allowed and queue not empty: the request is queued and the
 * processor is asked to flush the session.</li>
 * </ul>
 *
 * @param nextFilter not used by this implementation
 * @param session the target session; it is cast to {@code AbstractIoSession}
 * @param writeRequest the request to schedule
 */
@SuppressWarnings("unchecked")
@Override
public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
    final AbstractIoSession ioSession = (AbstractIoSession) session;
    final Object message = writeRequest.getMessage();

    // Maintain counters. Anything that is not a non-empty IoBuffer counts as
    // one scheduled message; a zero-sized buffer is the internal message
    // delimiter.
    int pendingBytes = 0;
    if (message instanceof IoBuffer) {
        final IoBuffer buffer = (IoBuffer) message;
        // The I/O processor implementation will call buffer.reset() after
        // the write operation is finished, because the same buffer is
        // passed along with the messageSent event.
        buffer.mark();
        pendingBytes = buffer.remaining();
    }

    if (pendingBytes == 0) {
        ioSession.increaseScheduledWriteMessages();
    } else {
        ioSession.increaseScheduledWriteBytes(pendingBytes);
    }

    final WriteRequestQueue queue = ioSession.getWriteRequestQueue();

    if (ioSession.isWriteSuspended()) {
        queue.offer(ioSession, writeRequest);
        return;
    }

    if (queue.size() == 0) {
        // Nothing is waiting ahead of this request: write it directly.
        ioSession.getProcessor().write(ioSession, writeRequest);
        return;
    }

    queue.offer(ioSession, writeRequest);
    ioSession.getProcessor().flush(ioSession);
}
示例5
/**
 * Creates a new write request queue backed by an {@link ArrayDeque} whose
 * operations are all guarded by the queue instance's own monitor. Every call
 * also increments the {@code _wrqCount} counter.
 *
 * @param session not used by this implementation
 * @return a fresh, empty queue
 */
@Override
public WriteRequestQueue getWriteRequestQueue(IoSession session)
{
    _wrqCount.getAndIncrement();

    return new WriteRequestQueue()
    {
        private final ArrayDeque<WriteRequest> pending = new ArrayDeque<>();

        /** Appends the request at the tail; message must be IoBuffer or FileRegion. */
        @Override
        public boolean offer(WriteRequest writeRequest)
        {
            synchronized (this)
            {
                pending.addLast(writeRequest);
            }
            return true;
        }

        /** Returns the head request without removing it, or null when empty. */
        @Override
        public WriteRequest peek()
        {
            synchronized (this)
            {
                return pending.peekFirst();
            }
        }

        /** Removes and returns the head request, or null when empty. */
        @Override
        public WriteRequest poll()
        {
            synchronized (this)
            {
                return pending.pollFirst();
            }
        }

        @Override
        public String toString()
        {
            synchronized (this)
            {
                return pending.toString();
            }
        }
    };
}
示例6
/**
 * Returns whatever the superclass implementation of
 * {@code getWriteRequestQueue()} returns, bypassing any override in this class.
 */
WriteRequestQueue getWriteRequestQueue0() {
    final WriteRequestQueue inherited = super.getWriteRequestQueue();
    return inherited;
}
示例7
/**
 * Creates a new instance that keeps a reference to the given write request queue.
 *
 * @param queue the write request queue stored in this instance's {@code queue} field
 */
public CloseAwareWriteQueue(WriteRequestQueue queue) {
this.queue = queue;
}
示例8
/**
 * Creates a new {@code DefaultWriteRequestQueue} on every call.
 *
 * @param session not used by this implementation
 * @return a newly created queue
 */
public WriteRequestQueue getWriteRequestQueue(IoSession session) throws Exception {
    final DefaultWriteRequestQueue created = new DefaultWriteRequestQueue();
    return created;
}
示例9
/**
 * Sends the pending write requests of the given session until the queue is
 * empty, nothing more can be sent, or the fairness limit is reached.
 * <p>
 * The number of bytes sent during this call is always reported through
 * {@code session.increaseWrittenBytes}, even when the method exits early or
 * with an exception.
 *
 * @param session the session whose pending write requests are sent
 * @param currentTime the timestamp passed on to {@code increaseWrittenBytes}
 * @return {@code true} if the write request queue was drained, {@code false}
 *         if the method stopped early and registered interest in writing
 * @throws Exception if sending, event firing or interest registration fails
 */
private boolean flush( S session, long currentTime ) throws Exception
{
    final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
    // Fairness limit: 1.5 times the configured maximum read buffer size.
    final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize()
        + ( session.getConfig().getMaxReadBufferSize() >>> 1 );

    int writtenBytes = 0;

    try
    {
        for ( ;; )
        {
            // Resume the request left over from a previous flush, if any.
            WriteRequest req = session.getCurrentWriteRequest();

            if ( req == null )
            {
                req = writeRequestQueue.poll( session );

                if ( req == null )
                {
                    // Nothing left to write.
                    setInterestedInWrite( session, false );
                    break;
                }

                session.setCurrentWriteRequest( req );
            }

            IoBuffer buf = ( IoBuffer ) req.getMessage();

            if ( buf.remaining() == 0 )
            {
                // Clear and fire event
                session.setCurrentWriteRequest( null );
                buf.reset();
                session.getFilterChain().fireMessageSent( req );
                continue;
            }

            // Fall back to the session's remote address when the request
            // does not carry its own destination.
            SocketAddress destination = req.getDestination();

            if ( destination == null )
            {
                destination = session.getRemoteAddress();
            }

            int localWrittenBytes = send( session, buf, destination );

            if ( ( localWrittenBytes == 0 ) || ( writtenBytes >= maxWrittenBytes ) )
            {
                // Kernel buffer is full or wrote too much.
                // When the limit trips after a successful send, those bytes
                // have gone out and must still be counted, otherwise the
                // written-bytes statistic under-reports. Adds 0 when nothing
                // was sent. The request stays current and is picked up again
                // by the next flush.
                writtenBytes += localWrittenBytes;
                setInterestedInWrite( session, true );

                return false;
            }
            else
            {
                setInterestedInWrite( session, false );

                // Clear and fire event
                session.setCurrentWriteRequest( null );
                writtenBytes += localWrittenBytes;
                buf.reset();
                session.getFilterChain().fireMessageSent( req );
            }
        }
    }
    finally
    {
        session.increaseWrittenBytes( writtenBytes, currentTime );
    }

    return true;
}
示例10
/**
 * Writes the pending requests of the given session until the queue is empty,
 * the channel stops accepting data, or the fairness limit is reached.
 * <p>
 * Any exception raised while writing is not propagated: it is set on the
 * future of the request being processed (if any) and fired on the session's
 * filter chain as an exceptionCaught event.
 *
 * @param session the session to flush
 * @param currentTime the timestamp passed on to the write helpers
 * @return {@code true} if the loop ended because no request was left;
 *         {@code false} if the session is not connected, the write was
 *         interrupted (partial write, full kernel buffer, fairness limit)
 *         or an exception occurred
 */
private boolean flushNow(S session, long currentTime) {
// A disconnected session cannot be written to: schedule its removal instead.
if (!session.isConnected()) {
scheduleRemove(session);
return false;
}
final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();
final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
// Set limitation for the number of written bytes for read-write
// fairness. I used maxReadBufferSize * 3 / 2, which yields best
// performance in my experience while not breaking fairness much.
final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize()
+ (session.getConfig().getMaxReadBufferSize() >>> 1);
int writtenBytes = 0;
// Declared outside the try block so the catch block can reach the failing request.
WriteRequest req = null;
try {
// Clear OP_WRITE
setInterestedInWrite(session, false);
do {
// Check for pending writes: resume the current request if there is one,
// otherwise take the next one from the queue.
req = session.getCurrentWriteRequest();
if (req == null) {
req = writeRequestQueue.poll(session);
if (req == null) {
// Queue drained: leave the loop and return true.
break;
}
session.setCurrentWriteRequest(req);
}
int localWrittenBytes = 0;
Object message = req.getMessage();
if (message instanceof IoBuffer) {
// The remaining byte budget for this flush is passed to the helper.
localWrittenBytes = writeBuffer(session, req, hasFragmentation, maxWrittenBytes - writtenBytes,
currentTime);
if ((localWrittenBytes > 0) && ((IoBuffer) message).hasRemaining()) {
// the buffer isn't empty, we re-interest it in writing
// NOTE(review): writtenBytes is a local that is not read again before this return.
writtenBytes += localWrittenBytes;
setInterestedInWrite(session, true);
return false;
}
} else if (message instanceof FileRegion) {
localWrittenBytes = writeFile(session, req, hasFragmentation, maxWrittenBytes - writtenBytes,
currentTime);
// Fix for Java bug on Linux
// http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5103988
// If there's still data to be written in the FileRegion,
// stop here (return false) and register write interest so that
// writing resumes later.
if ((localWrittenBytes > 0) && (((FileRegion) message).getRemainingBytes() > 0)) {
writtenBytes += localWrittenBytes;
setInterestedInWrite(session, true);
return false;
}
} else {
// Unsupported message type; the exception is handled by the catch block below.
throw new IllegalStateException("Don't know how to handle message of type '"
+ message.getClass().getName() + "'. Are you missing a protocol encoder?");
}
if (localWrittenBytes == 0) {
// Kernel buffer is full.
setInterestedInWrite(session, true);
return false;
}
writtenBytes += localWrittenBytes;
if (writtenBytes >= maxWrittenBytes) {
// Wrote too much
scheduleFlush(session);
return false;
}
} while (writtenBytes < maxWrittenBytes);
} catch (Exception e) {
// Report the failure on the request's future (when a request was in progress)
// and on the filter chain instead of rethrowing.
if (req != null) {
req.getFuture().setException(e);
}
IoFilterChain filterChain = session.getFilterChain();
filterChain.fireExceptionCaught(e);
return false;
}
return true;
}
示例11
/**
 * Always returns {@code null}; this implementation supplies no write request queue.
 * NOTE(review): callers presumably must tolerate a null queue here — confirm.
 *
 * @return {@code null}
 */
@Override
public WriteRequestQueue getWriteRequestQueue() {
return null;
}
示例12
/**
 * Unimplemented stub: always returns {@code null}.
 *
 * @return {@code null}
 */
@Override
public WriteRequestQueue getWriteRequestQueue() {
// TODO Auto-generated method stub: no write request queue is provided yet.
return null;
}
示例13
/**
 * Writes the queued requests of the given session to its channel, in queue order.
 * <p>
 * A request is removed from the queue only after it has been handled
 * completely. If the channel does not take all data of a request, write
 * interest is switched on and the method returns with that request still at
 * the head of the queue. The special close and shutdown requests end the
 * loop without being removed. Any exception closes the session and removes
 * it with that exception as the cause.
 *
 * @param session the session to flush; nothing is done if it is closing
 */
void flushNow(NioSession session) {
    if (session.isClosing()) {
        return;
    }

    try {
        final WriteRequestQueue pending = session.getWriteRequestQueue();
        WriteRequest head = pending.peek();

        while (head != null) {
            final Object payload = head.writeRequestMessage();

            if (payload instanceof IoBuffer) {
                final IoBuffer data = (IoBuffer) payload;
                if (data.hasRemaining()) {
                    session.getChannel().write(data.buf());
                    if (data.hasRemaining()) {
                        // Partial write: wait until the channel is writable again.
                        session.setInterestedInWrite(true);
                        return;
                    }
                }
                head.writeRequestFuture().setWritten();
                data.free();
            } else if (payload instanceof FileRegion) {
                final FileRegion fileRegion = (FileRegion) payload;
                final long remaining = fileRegion.getRemainingBytes();
                if (remaining > 0) {
                    final long transferred = fileRegion.getFileChannel()
                            .transferTo(fileRegion.getPosition(), remaining, session.getChannel());
                    fileRegion.update(transferred);
                    if (fileRegion.getRemainingBytes() > 0) {
                        // Partial transfer: wait until the channel is writable again.
                        session.setInterestedInWrite(true);
                        return;
                    }
                }
                head.writeRequestFuture().setWritten();
            } else if (head == NioSession.CLOSE_REQUEST) {
                session.closeNow();
                break;
            } else if (head == NioSession.SHUTDOWN_REQUEST) {
                session.getChannel().shutdownOutput();
                break;
            } else {
                throw new IllegalStateException("unknown message type for writting: " + payload.getClass().getName() + ": " + payload);
            }

            // The head request is fully handled: drop it and look at the next one.
            pending.poll();
            head = pending.peek();
        }

        session.setInterestedInWrite(false);
    } catch (Exception e) {
        session.closeNow();
        session.removeNow(e);
    }
}
示例14
/**
 * Returns the current {@code writeRequestQueue} reference as-is, without any
 * null check or copy.
 *
 * @return the value of {@code writeRequestQueue}
 */
@Override
public WriteRequestQueue getWriteRequestQueue() {
return writeRequestQueue;
}
示例15
/**
 * Creates a new {@code DefaultWriteRequestQueue} on every call.
 *
 * @param session not used by this implementation
 * @return a newly created queue
 */
@Override
public WriteRequestQueue getWriteRequestQueue(IoSession session) {
    final DefaultWriteRequestQueue created = new DefaultWriteRequestQueue();
    return created;
}
示例16
/**
 * Returns the {@link WriteRequestQueue} of this object.
 * NOTE(review): presumably the queue holding the write requests that are
 * still waiting to be written — confirm against the implementations.
 *
 * @return the write request queue
 */
WriteRequestQueue getWriteRequestQueue();
示例17
/**
 * Sets the write request queue of this object. The given queue is not stored
 * directly; it is wrapped in a new {@code CloseAwareWriteQueue} first.
 *
 * @param writeRequestQueue
 *            The write request queue to wrap
 */
public final void setWriteRequestQueue(WriteRequestQueue writeRequestQueue) {
    final CloseAwareWriteQueue closeAware = new CloseAwareWriteQueue(writeRequestQueue);
    this.writeRequestQueue = closeAware;
}
示例18
/**
 * Returns the {@link WriteRequestQueue} which is going to be associated with
 * the specified <tt>session</tt>. Please note that the returned
 * implementation must be thread-safe and robust enough to deal
 * with various message types (even ones you didn't expect at all),
 * especially when you are going to implement a priority queue which
 * involves {@link Comparator}.
 *
 * @param session the session the returned queue is going to be associated with
 * @return the write request queue for the specified session
 * @throws Exception declared by the signature; NOTE(review): the failure
 *         conditions depend on the implementation — confirm
 */
WriteRequestQueue getWriteRequestQueue(IoSession session) throws Exception;
示例19
/**
 * Gets the queue that contains the messages waiting to be written.
 * As the reader might not be ready, it frequently happens that messages are not written
 * completely, or that some older messages are still waiting to be written when a new
 * message arrives. This queue is used to manage that backlog of messages.
 *
 * @return The queue containing the pending messages.
 */
WriteRequestQueue getWriteRequestQueue();
示例20
/**
 * Provides the write request queue for a session.
 * Please note that the returned implementation must be thread-safe and robust enough to deal
 * with various message types (even ones you didn't expect at all),
 * especially when you are going to implement a priority queue which involves {@link Comparator}.
 *
 * @param session The session for which we want the {@link WriteRequestQueue}
 * @return a {@link WriteRequestQueue} which is going to be associated with the specified
 *         <tt>session</tt>
 */
WriteRequestQueue getWriteRequestQueue(IoSession session);