128 lines
4.6 KiB
Kotlin
128 lines
4.6 KiB
Kotlin
/*
|
|
* Copyright 2020-2021 Moritz Ruth and Uranos contributors
|
|
* Use of this source code is governed by the Apache 2.0 license that can be found in the LICENSE file
|
|
*/
|
|
|
|
package space.uranos.net
|
|
|
|
import io.netty.buffer.ByteBuf
|
|
import io.netty.util.ReferenceCountUtil
|
|
import kotlinx.coroutines.channels.Channel
|
|
import kotlinx.coroutines.launch
|
|
import kotlinx.coroutines.withContext
|
|
import space.uranos.event.ifNotCancelled
|
|
import space.uranos.net.MinecraftProtocolDataTypes.readVarInt
|
|
import space.uranos.net.event.PacketReceivedEvent
|
|
import space.uranos.net.event.PacketSendEvent
|
|
import space.uranos.net.packet.IncomingPacket
|
|
import space.uranos.net.packet.Mergeable
|
|
import space.uranos.net.packet.OutgoingPacket
|
|
import space.uranos.util.awaitSuspending
|
|
|
|
class PacketsAdapter(val session: UranosSession) {
|
|
private val packetsForNextTick = ArrayList<OutgoingPacket>()
|
|
|
|
suspend fun tick() {
|
|
packetsForNextTick.forEach { send(it) } // TODO: Fix ConcurrentModificationException
|
|
packetsForNextTick.clear()
|
|
}
|
|
|
|
fun stopProcessingIncomingPackets() {
|
|
packetDataChannel.close()
|
|
packetsChannel?.close()
|
|
}
|
|
|
|
fun onPacketReceived(data: ByteBuf) {
|
|
packetDataChannel.offer(data)
|
|
}
|
|
|
|
private suspend fun handlePacket(packet: IncomingPacket) {
|
|
if (session.server.config.logging.shouldLog(packet)) session.logger.trace { "Packet received: $packet" }
|
|
|
|
session.server.eventBus.emit(PacketReceivedEvent(session, packet)).ifNotCancelled {
|
|
SessionPacketReceivedEventHandler.handle(session, packet)
|
|
}
|
|
}
|
|
|
|
suspend fun sendNextTick(packet: OutgoingPacket) = withContext(session.coroutineContext) {
|
|
if (packet is Mergeable) {
|
|
for (i in packetsForNextTick.indices.reversed()) {
|
|
val merged = packet.mergeWith(packetsForNextTick[i])
|
|
|
|
if (merged != null) {
|
|
packetsForNextTick[i] = merged
|
|
return@withContext
|
|
}
|
|
}
|
|
}
|
|
|
|
packetsForNextTick.add(packet)
|
|
}
|
|
|
|
suspend fun send(packet: OutgoingPacket): Unit = withContext(session.coroutineContext) {
|
|
if (session.server.config.logging.shouldLog(packet)) session.logger.trace { "Sending packet: $packet" }
|
|
|
|
session.server.eventBus.emit(PacketSendEvent(session, packet)).ifNotCancelled {
|
|
try {
|
|
session.channel.writeAndFlush(it.packet).awaitSuspending()
|
|
} catch (t: Throwable) {
|
|
if (session.channel.isActive) session.logger.error("Sending packet failed", t)
|
|
}
|
|
}
|
|
}
|
|
|
|
private var packetsChannel: Channel<IncomingPacket>? = null
|
|
private val packetDataChannel: Channel<ByteBuf> = Channel(Channel.UNLIMITED)
|
|
|
|
fun launchPacketsChannelConsumer() {
|
|
val channel = Channel<IncomingPacket>(session.server.config.packetsBufferSize)
|
|
packetsChannel = channel
|
|
|
|
session.scope.launch {
|
|
for (packet in channel) {
|
|
handlePacket(packet)
|
|
}
|
|
}
|
|
}
|
|
|
|
fun launchPacketDataChannelConsumer() {
|
|
session.scope.launch {
|
|
for (data in packetDataChannel) {
|
|
try {
|
|
handlePacketData(data)
|
|
} catch (e: Exception) {
|
|
session.failAndDisconnectBecauseOfClient(e)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
private suspend fun handlePacketData(data: ByteBuf) {
|
|
val packetID = data.readVarInt()
|
|
val codec = session.currentProtocol!!.incomingPacketCodecsByID[packetID]
|
|
|
|
if (codec == null) {
|
|
val message = "Received an unknown packet (ID: 0x${packetID.toString(16).padStart(2, '0')})"
|
|
|
|
if (session.server.config.developmentMode)
|
|
session.logger warn "$message. This will cause the client to disconnect in production mode."
|
|
else session.failAndDisconnectBecauseOfClient(message)
|
|
|
|
return
|
|
}
|
|
|
|
val packet = codec.decode(data)
|
|
ReferenceCountUtil.release(data)
|
|
|
|
if (packetsChannel != null) {
|
|
// The protocol will not change anymore, so we can safely decode all pending packets with the current one
|
|
if (!packetsChannel!!.offer(packet)) {
|
|
session.logger.warn("The packet buffer is full (size: ${session.server.config.packetsBufferSize})")
|
|
session.disconnect(internalReason = "You sent more packets than the packet buffer can handle")
|
|
}
|
|
} else {
|
|
// The protocol could change because of this packet, so we must wait
|
|
handlePacket(packet)
|
|
}
|
|
}
|
|
}
|