From 32103d695a8278d8dff8b017df9c43183ff12e45 Mon Sep 17 00:00:00 2001 From: Moritz Ruth Date: Sat, 2 Jan 2021 22:31:31 +0100 Subject: [PATCH] Update Gradle, fix a ByteBuf leak and kick the client when a decoding error occurs --- .../main/kotlin/space/blokk/logging/Logger.kt | 9 +- .../blokk/net/packet/play/ChunkDataPacket.kt | 10 +- .../main/kotlin/space/blokk/BlokkServer.kt | 6 - .../kotlin/space/blokk/config/BlokkConfig.kt | 2 +- .../logging/BlokkLoggingOutputProvider.kt | 2 +- .../kotlin/space/blokk/net/BlokkSession.kt | 190 ++++++++++-------- .../space/blokk/net/LoginAndJoinProcedure.kt | 4 +- .../blokk/net/PacketMessageDuplexHandler.kt | 8 +- .../login/EncryptionResponsePacketHandler.kt | 2 +- .../packet/login/LoginStartPacketHandler.kt | 2 +- .../play/ClientSettingsPacketHandler.kt | 2 +- .../IncomingPluginMessagePacketHandler.kt | 2 +- .../kotlin/space/blokk/player/BlokkPlayer.kt | 6 + .../src/main/resources/default-config.yml | 4 +- gradle/wrapper/gradle-wrapper.properties | 2 +- 15 files changed, 130 insertions(+), 121 deletions(-) diff --git a/blokk-api/src/main/kotlin/space/blokk/logging/Logger.kt b/blokk-api/src/main/kotlin/space/blokk/logging/Logger.kt index b8829e0..6a56706 100644 --- a/blokk-api/src/main/kotlin/space/blokk/logging/Logger.kt +++ b/blokk-api/src/main/kotlin/space/blokk/logging/Logger.kt @@ -3,15 +3,10 @@ package space.blokk.logging import space.blokk.Blokk class Logger(val name: String, private val printThreadName: Boolean = true) { - fun log(level: Level, message: String, throwable: Throwable? = null) = + private fun log(level: Level, message: String, throwable: Throwable? = null) = Blokk.loggingOutputProvider.log(printThreadName, name, level, message, throwable) - fun error(msg: String, t: Throwable) { - if (Level.ERROR.isEnabled) { - error(msg) - t.printStackTrace() - } - } + fun error(message: String, throwable: Throwable) = log(Level.ERROR, message, throwable) infix fun error(message: String) = log(Level.ERROR, message, null) infix fun info(message: String) = log(Level.INFO, message, null) diff --git a/blokk-packets/src/main/kotlin/space/blokk/net/packet/play/ChunkDataPacket.kt b/blokk-packets/src/main/kotlin/space/blokk/net/packet/play/ChunkDataPacket.kt index 9cb9d80..019e542 100644 --- a/blokk-packets/src/main/kotlin/space/blokk/net/packet/play/ChunkDataPacket.kt +++ b/blokk-packets/src/main/kotlin/space/blokk/net/packet/play/ChunkDataPacket.kt @@ -8,12 +8,4 @@ import space.blokk.world.ChunkData /** * Sent to inform the player about blocks and biomes in a chunk. */ -data class ChunkDataPacket(val key: Chunk.Key, val data: ChunkData) : OutgoingPacket() { - init { - if (key.x == 0 && key.z == 0) { - data.sections[0]?.forEach { - print(it) - } - } - } -} +data class ChunkDataPacket(val key: Chunk.Key, val data: ChunkData) : OutgoingPacket() diff --git a/blokk-server/src/main/kotlin/space/blokk/BlokkServer.kt b/blokk-server/src/main/kotlin/space/blokk/BlokkServer.kt index b02969d..3d673a2 100644 --- a/blokk-server/src/main/kotlin/space/blokk/BlokkServer.kt +++ b/blokk-server/src/main/kotlin/space/blokk/BlokkServer.kt @@ -46,12 +46,6 @@ class BlokkServer internal constructor() : Server { CoroutineName("Server") + SupervisorJob() + Executors.newSingleThreadExecutor { r -> Thread(r, "server") }.asCoroutineDispatcher() - init { - coroutineContext.job.invokeOnCompletion { - println(it) - } - } - override val sessions by socketServer::sessions override val players get() = sessions.mapNotNull { it.player } diff --git a/blokk-server/src/main/kotlin/space/blokk/config/BlokkConfig.kt b/blokk-server/src/main/kotlin/space/blokk/config/BlokkConfig.kt index fc59111..00983fe 100644 --- a/blokk-server/src/main/kotlin/space/blokk/config/BlokkConfig.kt +++ b/blokk-server/src/main/kotlin/space/blokk/config/BlokkConfig.kt @@ -6,7 +6,7 @@ import java.time.Duration data class BlokkConfig( val port: Int, val host: String, - val silentNonServerErrors: Boolean, + val ignoreClientCausedErrors: Boolean, val authenticateAndEncrypt: Boolean, val developmentMode: Boolean, val minLogLevel: Logger.Level, diff --git a/blokk-server/src/main/kotlin/space/blokk/logging/BlokkLoggingOutputProvider.kt b/blokk-server/src/main/kotlin/space/blokk/logging/BlokkLoggingOutputProvider.kt index 6944379..f51f678 100644 --- a/blokk-server/src/main/kotlin/space/blokk/logging/BlokkLoggingOutputProvider.kt +++ b/blokk-server/src/main/kotlin/space/blokk/logging/BlokkLoggingOutputProvider.kt @@ -16,7 +16,6 @@ object BlokkLoggingOutputProvider : LoggingOutputProvider { throwable: Throwable? ) { if (!level.isEnabled) return - val time = Date() val stream: PrintStream = if (level.isGreaterOrEqualThan(Logger.Level.ERROR)) System.err else System.out @@ -40,5 +39,6 @@ object BlokkLoggingOutputProvider : LoggingOutputProvider { parts.add(message) stream.println(parts.joinToString(" ")) + throwable?.printStackTrace(stream) } } diff --git a/blokk-server/src/main/kotlin/space/blokk/net/BlokkSession.kt b/blokk-server/src/main/kotlin/space/blokk/net/BlokkSession.kt index 1a9851e..238cd72 100644 --- a/blokk-server/src/main/kotlin/space/blokk/net/BlokkSession.kt +++ b/blokk-server/src/main/kotlin/space/blokk/net/BlokkSession.kt @@ -1,6 +1,7 @@ package space.blokk.net import io.netty.buffer.ByteBuf +import io.netty.util.ReferenceCountUtil import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel import space.blokk.BlokkServer @@ -24,7 +25,6 @@ import space.blokk.net.packet.status.StatusProtocol import space.blokk.server.event.SessionInitializedEvent import space.blokk.util.awaitSuspending import space.blokk.util.supervisorChild -import java.io.IOException import java.net.InetAddress import java.net.InetSocketAddress import javax.crypto.SecretKey @@ -63,64 +63,11 @@ class BlokkSession(private val channel: io.netty.channel.Channel, val server: Bl is State.Disconnected -> null } - private var packetsChannel: Channel? = null - private val packetDataChannel: Channel = Channel(Channel.UNLIMITED) - - fun launchPacketsChannelWorker() { - val channel = Channel(Channel.UNLIMITED) - packetsChannel = channel - - scope.launch { - for (packet in channel) { - handlePacket(packet) - } - } - } - - private suspend fun handlePacket(packet: IncomingPacket) { - logger.trace { "Packet received: $packet" } - - server.eventBus.emit(PacketReceivedEvent(this@BlokkSession, packet)).ifNotCancelled { - SessionPacketReceivedEventHandler.handle(this@BlokkSession, packet) - } - } - - init { - logger trace "Created" - - scope.launch { - for (data in packetDataChannel) { - val packetID = data.readVarInt() - val codec = currentProtocol!!.incomingPacketCodecsByID[packetID] - - if (codec == null) { - val message = "Client sent an unknown packet (ID: $packetID)" - - if (server.config.developmentMode) { - logger warn "$message. This will cause the client to disconnect in production mode." - continue - } else throw IOException(message) - } - - val packet = codec.decode(data) - - if (packetsChannel != null) { - // The protocol will not change anymore, so we can safely decode all packets with the current one - if (!packetsChannel!!.offer(packet)) { - logger.warn("The packet buffer is full (size: ${server.config.packetsBufferSize})") - disconnect(internalReason = "You sent more packets than the packet buffer can handle") - } - } else { - // The protocol could change because of this packet, so we must wait - handlePacket(packet) - } - } - } - } - override suspend fun disconnect(reason: TextComponent, internalReason: String?) { + stopProcessingIncomingPackets() + if (currentProtocol == LoginProtocol || currentProtocol == PlayProtocol) { - logger info "Disconnected." + (internalReason?.let { " Internal reason: $it" } ?: "") + logger info "Disconnected" + (internalReason?.let { ". Internal reason: $it" } ?: "") val finalReason = if (server.config.developmentMode && internalReason != null) { val additional = listOf( @@ -138,12 +85,82 @@ class BlokkSession(private val channel: io.netty.channel.Channel, val server: Bl LoginProtocol -> send(space.blokk.net.packet.login.DisconnectPacket(finalReason)) PlayProtocol -> send(space.blokk.net.packet.play.DisconnectPacket(finalReason)) } - } + } else logger trace "Disconnected" onDisconnect(true) channel.close() } + private var packetsChannel: Channel? = null + private val packetDataChannel: Channel = Channel(Channel.UNLIMITED) + + fun launchPacketsChannelConsumer() { + val channel = Channel(server.config.packetsBufferSize) + packetsChannel = channel + + scope.launch { + for (packet in channel) { + handlePacket(packet) + } + } + } + + private fun launchPacketDataChannelConsumer() { + scope.launch { + for (data in packetDataChannel) { + try { + handlePacketData(data) + } catch (e: Exception) { + failAndDisconnectBecauseOfClient(e) + } + } + } + } + + private fun stopProcessingIncomingPackets() { + packetDataChannel.close() + packetsChannel?.close() + } + + private suspend fun handlePacket(packet: IncomingPacket) { + logger.trace { "Packet received: $packet" } + + server.eventBus.emit(PacketReceivedEvent(this@BlokkSession, packet)).ifNotCancelled { + SessionPacketReceivedEventHandler.handle(this@BlokkSession, packet) + } + } + + private suspend fun handlePacketData(data: ByteBuf) { + val packetID = data.readVarInt() + val codec = currentProtocol!!.incomingPacketCodecsByID[packetID] + + if (codec == null) { + val message = "Received an unknown packet (ID: $packetID)" + + if (server.config.developmentMode) + logger warn "$message. This will cause the client to disconnect in production mode." + else failAndDisconnectBecauseOfClient(message) + + return + } + + val packet = codec.decode(data) + ReferenceCountUtil.release(data) + + if (packetsChannel != null) { + // The protocol will not change anymore, so we can safely decode all pending packets with the current one + if (!packetsChannel!!.offer(packet)) { + logger.warn("The packet buffer is full (size: ${server.config.packetsBufferSize})") + disconnect(internalReason = "You sent more packets than the packet buffer can handle") + } + } else { + // The protocol could change because of this packet, so we must wait + handlePacket(packet) + } + } + + val joinProcedure = LoginAndJoinProcedure(this) + var lastKeepAlivePacketTimestamp by Delegates.notNull() lateinit var keepAliveDisconnectJob: Job @@ -168,39 +185,52 @@ class BlokkSession(private val channel: io.netty.channel.Channel, val server: Bl } } - val joinProcedure = LoginAndJoinProcedure(this) - fun onConnect() = scope.launch { logger trace "Connected" + launchPacketDataChannelConsumer() + state = State.WaitingForHandshake if (server.eventBus.emit(SessionInitializedEvent(this@BlokkSession)).cancelled) channel.close() else server.sessions.add(this@BlokkSession) } fun onDisconnect(expected: Boolean) { - if (state == State.Disconnected) return - if (!expected && currentProtocol != HandshakingProtocol && currentProtocol != StatusProtocol) - logger trace "The client disconnected unexpectedly" + scope.launch { + if (state == State.Disconnected) return@launch + if (!expected && currentProtocol != HandshakingProtocol && currentProtocol != StatusProtocol) + logger trace "The client disconnected unexpectedly" - val closeException = DisconnectedCancellationException() - - state = State.Disconnected - server.sessions.remove(this) - - packetDataChannel.close(closeException) - packetsChannel?.close(closeException) - - coroutineContext.cancel(closeException) + stopProcessingIncomingPackets() + coroutineContext.cancel(DisconnectedCancellationException()) + state = State.Disconnected + server.sessions.remove(this@BlokkSession) + } } fun onPacketDataReceived(packetData: ByteBuf) { packetDataChannel.offer(packetData) } - fun failBecauseOfClient(message: String) { - val messageGetter = { "The client caused a connection error: $message" } - if (server.config.silentNonServerErrors) logger debug messageGetter else logger error messageGetter - onDisconnect(true) + internal fun failAndDisconnectBecauseOfClient(throwable: Throwable) = + failAndDisconnectBecauseOfClient("${throwable::class.java.name}: ${throwable.message}", throwable) + + internal fun failAndDisconnectBecauseOfClient(message: String, throwable: Throwable? = null) { + val start = "The client caused a connection error" + + if (server.config.ignoreClientCausedErrors) { + logger debug "The client caused a connection error: $message" + // Does not send the Disconnect packet + channel.close() + onDisconnect(true) + } else { + if (throwable == null) logger.error("$start: $message") + else logger.error(start, throwable) + + scope.launch { + // Sends the Disconnect packet + disconnect(internalReason = message) + } + } } fun enableEncryptionCodec(key: SecretKey) { @@ -217,15 +247,13 @@ class BlokkSession(private val channel: io.netty.channel.Channel, val server: Bl .addAfter("framing", "compression", CompressionCodec(server.config.packetCompressionThreshold)) } - override suspend fun send(packet: OutgoingPacket) { - if (state is State.Disconnected) throw IllegalStateException("The session is not active anymore") - + override suspend fun send(packet: OutgoingPacket) = withContext(coroutineContext) { logger.trace { "Sending packet: $packet" } - server.eventBus.emit(PacketSendEvent(this, packet)).ifNotCancelled { + server.eventBus.emit(PacketSendEvent(this@BlokkSession, packet)).ifNotCancelled { try { channel.writeAndFlush(it.packet).awaitSuspending() } catch (t: Throwable) { - if (!channel.isActive) return + if (!channel.isActive) return@withContext logger.error("Sending packet failed", t) } } diff --git a/blokk-server/src/main/kotlin/space/blokk/net/LoginAndJoinProcedure.kt b/blokk-server/src/main/kotlin/space/blokk/net/LoginAndJoinProcedure.kt index c15a814..1e34dd3 100644 --- a/blokk-server/src/main/kotlin/space/blokk/net/LoginAndJoinProcedure.kt +++ b/blokk-server/src/main/kotlin/space/blokk/net/LoginAndJoinProcedure.kt @@ -35,8 +35,6 @@ class LoginAndJoinProcedure(val session: BlokkSession) { val uuid = UUID.nameUUIDFromBytes("OfflinePlayer:${packet.username}".toByteArray()) session.send(LoginSuccessPacket(uuid, packet.username)) session.state = Session.State.LoginSucceeded(packet.username, uuid) - session.launchPacketsChannelWorker() - afterLogin() } } @@ -73,12 +71,12 @@ class LoginAndJoinProcedure(val session: BlokkSession) { session.send(LoginSuccessPacket(result.uuid, result.username)) session.state = Session.State.LoginSucceeded(result.username, result.uuid) - afterLogin() } private suspend fun afterLogin() { val state: Session.State.LoginSucceeded = session.state.getOrFail() + session.launchPacketsChannelConsumer() val event = session.server.eventBus.emit(SessionAfterLoginEvent(session)) val initialWorldAndLocation = event.initialWorldAndLocation diff --git a/blokk-server/src/main/kotlin/space/blokk/net/PacketMessageDuplexHandler.kt b/blokk-server/src/main/kotlin/space/blokk/net/PacketMessageDuplexHandler.kt index 377a92c..40afb14 100644 --- a/blokk-server/src/main/kotlin/space/blokk/net/PacketMessageDuplexHandler.kt +++ b/blokk-server/src/main/kotlin/space/blokk/net/PacketMessageDuplexHandler.kt @@ -4,12 +4,9 @@ import io.netty.buffer.ByteBuf import io.netty.channel.ChannelDuplexHandler import io.netty.channel.ChannelHandlerContext import io.netty.channel.ChannelPromise -import io.netty.handler.codec.MessageToMessageCodec -import space.blokk.net.MinecraftProtocolDataTypes.readVarInt import space.blokk.net.MinecraftProtocolDataTypes.writeVarInt import space.blokk.net.packet.OutgoingPacket import space.blokk.net.packet.OutgoingPacketCodec -import java.io.IOException class PacketMessageDuplexHandler(private val session: BlokkSession) : ChannelDuplexHandler() { override fun channelActive(ctx: ChannelHandlerContext) { @@ -34,11 +31,10 @@ class PacketMessageDuplexHandler(private val session: BlokkSession) : ChannelDup } override fun channelRead(ctx: ChannelHandlerContext, msg: Any) { - // TODO: Check if the ByteBuf needs to be released - session.onPacketDataReceived(msg as ByteBuf) + session.onPacketDataReceived(msg as ByteBuf) // ByteBuf must be released by the session } override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) { - session.failBecauseOfClient("${cause::class.java.name}: ${cause.message}") + session.logger.error("An uncaught exception occurred", Exception(cause)) } } diff --git a/blokk-server/src/main/kotlin/space/blokk/net/packet/login/EncryptionResponsePacketHandler.kt b/blokk-server/src/main/kotlin/space/blokk/net/packet/login/EncryptionResponsePacketHandler.kt index 713c8c7..df82acd 100644 --- a/blokk-server/src/main/kotlin/space/blokk/net/packet/login/EncryptionResponsePacketHandler.kt +++ b/blokk-server/src/main/kotlin/space/blokk/net/packet/login/EncryptionResponsePacketHandler.kt @@ -8,7 +8,7 @@ object EncryptionResponsePacketHandler : PacketReceivedEventHandler() try { session.joinProcedure.start(packet) } catch (e: IllegalStateException) { - session.failBecauseOfClient("Client sent LoginStartPacket when it was not allowed") + session.failAndDisconnectBecauseOfClient("Client sent LoginStartPacket when it was not allowed") } } } diff --git a/blokk-server/src/main/kotlin/space/blokk/net/packet/play/ClientSettingsPacketHandler.kt b/blokk-server/src/main/kotlin/space/blokk/net/packet/play/ClientSettingsPacketHandler.kt index 6c753ab..54dff5d 100644 --- a/blokk-server/src/main/kotlin/space/blokk/net/packet/play/ClientSettingsPacketHandler.kt +++ b/blokk-server/src/main/kotlin/space/blokk/net/packet/play/ClientSettingsPacketHandler.kt @@ -13,7 +13,7 @@ object ClientSettingsPacketHandler : PacketReceivedEventHandler