Can I use ChannelTrafficShapingHandler to control the network read/write speed in shuffle?

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Can I use ChannelTrafficShapingHandler to control the network read/write speed in shuffle?

hustnn
Hi All:

I am trying to control the network read/write speed with ChannelTrafficShapingHandler provided by Netty.


In TransportContext.java

I modify it as below:

public TransportChannelHandler initializePipeline(
SocketChannel channel,
RpcHandler channelRpcHandler) {
try {
// added by zhaojie
logger.info("want to try control read bandwidth on host: " + host);
final ChannelTrafficShapingHandler channelShaping = new ChannelTrafficShapingHandler(50, 50, 1000);

TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);

channel.pipeline()
.addLast("encoder", ENCODER)
.addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder())
.addLast("decoder", DECODER)
.addLast("channelTrafficShaping", channelShaping)
.addLast("idleStateHandler", new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000))
// NOTE: Chunks are currently guaranteed to be returned in the order of request, but this
// would require more logic to guarantee if this were not part of the same event loop.
.addLast("handler", channelHandler);

I create a ChannelTrafficShapingHandler and register it into the pipeline of the channel. I set the write and read speed as 50kb/sec in the constructor. 
Except for it, what else do I need to do?

However, it does not work. Is this idea correct? Am I missing something?   
Is there any better way ?

Thanks.

--
Regards,
Zhaojie

Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: Can I use ChannelTrafficShapingHandler to control the network read/write speed in shuffle?

Shixiong(Ryan) Zhu
I took a look at ChannelTrafficShapingHandler. Looks like it's because it doesn't support FileRegion. Spark's messages use this interface. See org.apache.spark.network.protocol.MessageWithHeader.

On Tue, Jun 13, 2017 at 4:17 AM, Niu Zhaojie <[hidden email]> wrote:
Hi All:

I am trying to control the network read/write speed with ChannelTrafficShapingHandler provided by Netty.


In TransportContext.java

I modify it as below:

public TransportChannelHandler initializePipeline(
SocketChannel channel,
RpcHandler channelRpcHandler) {
try {
// added by zhaojie
logger.info("want to try control read bandwidth on host: " + host);
final ChannelTrafficShapingHandler channelShaping = new ChannelTrafficShapingHandler(50, 50, 1000);

TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);

channel.pipeline()
.addLast("encoder", ENCODER)
.addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder())
.addLast("decoder", DECODER)
.addLast("channelTrafficShaping", channelShaping)
.addLast("idleStateHandler", new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000))
// NOTE: Chunks are currently guaranteed to be returned in the order of request, but this
// would require more logic to guarantee if this were not part of the same event loop.
.addLast("handler", channelHandler);

I create a ChannelTrafficShapingHandler and register it into the pipeline of the channel. I set the write and read speed as 50kb/sec in the constructor. 
Except for it, what else do I need to do?

However, it does not work. Is this idea correct? Am I missing something?   
Is there any better way ?

Thanks.

--
Regards,
Zhaojie


Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: Can I use ChannelTrafficShapingHandler to control the network read/write speed in shuffle?

hustnn
Hi Shixiong:

Thanks for the reply. You are right. It seems it only supports the following two types.

I will retry by adding FileRegion type.

protected long calculateSize(Object msg) {
if (msg instanceof ByteBuf) {
return ((ByteBuf) msg).readableBytes();
}
if (msg instanceof ByteBufHolder) {
return ((ByteBufHolder) msg).content().readableBytes();
}
return -1;
}

2017-06-14 1:34 GMT+08:00 Shixiong(Ryan) Zhu <[hidden email]>:
I took a look at ChannelTrafficShapingHandler. Looks like it's because it doesn't support FileRegion. Spark's messages use this interface. See org.apache.spark.network.protocol.MessageWithHeader.

On Tue, Jun 13, 2017 at 4:17 AM, Niu Zhaojie <[hidden email]> wrote:
Hi All:

I am trying to control the network read/write speed with ChannelTrafficShapingHandler provided by Netty.


In TransportContext.java

I modify it as below:

public TransportChannelHandler initializePipeline(
SocketChannel channel,
RpcHandler channelRpcHandler) {
try {
// added by zhaojie
logger.info("want to try control read bandwidth on host: " + host);
final ChannelTrafficShapingHandler channelShaping = new ChannelTrafficShapingHandler(50, 50, 1000);

TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);

channel.pipeline()
.addLast("encoder", ENCODER)
.addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder())
.addLast("decoder", DECODER)
.addLast("channelTrafficShaping", channelShaping)
.addLast("idleStateHandler", new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000))
// NOTE: Chunks are currently guaranteed to be returned in the order of request, but this
// would require more logic to guarantee if this were not part of the same event loop.
.addLast("handler", channelHandler);

I create a ChannelTrafficShapingHandler and register it into the pipeline of the channel. I set the write and read speed as 50kb/sec in the constructor. 
Except for it, what else do I need to do?

However, it does not work. Is this idea correct? Am I missing something?   
Is there any better way ?

Thanks.

--
Regards,
Zhaojie





--
Regards,
Zhaojie

Loading...