Archived
1
0
Fork 0

Update Gradle, fix a ByteBuf leak and kick the client when a decoding error occurs

This commit is contained in:
Moritz Ruth 2021-01-02 22:31:31 +01:00
parent 44ff61bf03
commit 32103d695a
15 changed files with 130 additions and 121 deletions

View file

@ -3,15 +3,10 @@ package space.blokk.logging
import space.blokk.Blokk
class Logger(val name: String, private val printThreadName: Boolean = true) {
fun log(level: Level, message: String, throwable: Throwable? = null) =
private fun log(level: Level, message: String, throwable: Throwable? = null) =
Blokk.loggingOutputProvider.log(printThreadName, name, level, message, throwable)
fun error(msg: String, t: Throwable) {
if (Level.ERROR.isEnabled) {
error(msg)
t.printStackTrace()
}
}
fun error(message: String, throwable: Throwable) = log(Level.ERROR, message, throwable)
infix fun error(message: String) = log(Level.ERROR, message, null)
infix fun info(message: String) = log(Level.INFO, message, null)

View file

@ -8,12 +8,4 @@ import space.blokk.world.ChunkData
/**
* Sent to inform the player about blocks and biomes in a chunk.
*/
data class ChunkDataPacket(val key: Chunk.Key, val data: ChunkData) : OutgoingPacket() {
init {
if (key.x == 0 && key.z == 0) {
data.sections[0]?.forEach {
print(it)
}
}
}
}
data class ChunkDataPacket(val key: Chunk.Key, val data: ChunkData) : OutgoingPacket()

View file

@ -46,12 +46,6 @@ class BlokkServer internal constructor() : Server {
CoroutineName("Server") + SupervisorJob() +
Executors.newSingleThreadExecutor { r -> Thread(r, "server") }.asCoroutineDispatcher()
init {
coroutineContext.job.invokeOnCompletion {
println(it)
}
}
override val sessions by socketServer::sessions
override val players get() = sessions.mapNotNull { it.player }

View file

@ -6,7 +6,7 @@ import java.time.Duration
data class BlokkConfig(
val port: Int,
val host: String,
val silentNonServerErrors: Boolean,
val ignoreClientCausedErrors: Boolean,
val authenticateAndEncrypt: Boolean,
val developmentMode: Boolean,
val minLogLevel: Logger.Level,

View file

@ -16,7 +16,6 @@ object BlokkLoggingOutputProvider : LoggingOutputProvider {
throwable: Throwable?
) {
if (!level.isEnabled) return
val time = Date()
val stream: PrintStream = if (level.isGreaterOrEqualThan(Logger.Level.ERROR)) System.err else System.out
@ -40,5 +39,6 @@ object BlokkLoggingOutputProvider : LoggingOutputProvider {
parts.add(message)
stream.println(parts.joinToString(" "))
throwable?.printStackTrace(stream)
}
}

View file

@ -1,6 +1,7 @@
package space.blokk.net
import io.netty.buffer.ByteBuf
import io.netty.util.ReferenceCountUtil
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import space.blokk.BlokkServer
@ -24,7 +25,6 @@ import space.blokk.net.packet.status.StatusProtocol
import space.blokk.server.event.SessionInitializedEvent
import space.blokk.util.awaitSuspending
import space.blokk.util.supervisorChild
import java.io.IOException
import java.net.InetAddress
import java.net.InetSocketAddress
import javax.crypto.SecretKey
@ -63,64 +63,11 @@ class BlokkSession(private val channel: io.netty.channel.Channel, val server: Bl
is State.Disconnected -> null
}
private var packetsChannel: Channel<IncomingPacket>? = null
private val packetDataChannel: Channel<ByteBuf> = Channel(Channel.UNLIMITED)
fun launchPacketsChannelWorker() {
val channel = Channel<IncomingPacket>(Channel.UNLIMITED)
packetsChannel = channel
scope.launch {
for (packet in channel) {
handlePacket(packet)
}
}
}
private suspend fun handlePacket(packet: IncomingPacket) {
logger.trace { "Packet received: $packet" }
server.eventBus.emit(PacketReceivedEvent(this@BlokkSession, packet)).ifNotCancelled {
SessionPacketReceivedEventHandler.handle(this@BlokkSession, packet)
}
}
init {
logger trace "Created"
scope.launch {
for (data in packetDataChannel) {
val packetID = data.readVarInt()
val codec = currentProtocol!!.incomingPacketCodecsByID[packetID]
if (codec == null) {
val message = "Client sent an unknown packet (ID: $packetID)"
if (server.config.developmentMode) {
logger warn "$message. This will cause the client to disconnect in production mode."
continue
} else throw IOException(message)
}
val packet = codec.decode(data)
if (packetsChannel != null) {
// The protocol will not change anymore, so we can safely decode all packets with the current one
if (!packetsChannel!!.offer(packet)) {
logger.warn("The packet buffer is full (size: ${server.config.packetsBufferSize})")
disconnect(internalReason = "You sent more packets than the packet buffer can handle")
}
} else {
// The protocol could change because of this packet, so we must wait
handlePacket(packet)
}
}
}
}
override suspend fun disconnect(reason: TextComponent, internalReason: String?) {
stopProcessingIncomingPackets()
if (currentProtocol == LoginProtocol || currentProtocol == PlayProtocol) {
logger info "Disconnected." + (internalReason?.let { " Internal reason: $it" } ?: "")
logger info "Disconnected" + (internalReason?.let { ". Internal reason: $it" } ?: "")
val finalReason = if (server.config.developmentMode && internalReason != null) {
val additional = listOf<ChatComponent>(
@ -138,12 +85,82 @@ class BlokkSession(private val channel: io.netty.channel.Channel, val server: Bl
LoginProtocol -> send(space.blokk.net.packet.login.DisconnectPacket(finalReason))
PlayProtocol -> send(space.blokk.net.packet.play.DisconnectPacket(finalReason))
}
}
} else logger trace "Disconnected"
onDisconnect(true)
channel.close()
}
private var packetsChannel: Channel<IncomingPacket>? = null
private val packetDataChannel: Channel<ByteBuf> = Channel(Channel.UNLIMITED)
fun launchPacketsChannelConsumer() {
val channel = Channel<IncomingPacket>(server.config.packetsBufferSize)
packetsChannel = channel
scope.launch {
for (packet in channel) {
handlePacket(packet)
}
}
}
private fun launchPacketDataChannelConsumer() {
scope.launch {
for (data in packetDataChannel) {
try {
handlePacketData(data)
} catch (e: Exception) {
failAndDisconnectBecauseOfClient(e)
}
}
}
}
private fun stopProcessingIncomingPackets() {
packetDataChannel.close()
packetsChannel?.close()
}
private suspend fun handlePacket(packet: IncomingPacket) {
logger.trace { "Packet received: $packet" }
server.eventBus.emit(PacketReceivedEvent(this@BlokkSession, packet)).ifNotCancelled {
SessionPacketReceivedEventHandler.handle(this@BlokkSession, packet)
}
}
private suspend fun handlePacketData(data: ByteBuf) {
val packetID = data.readVarInt()
val codec = currentProtocol!!.incomingPacketCodecsByID[packetID]
if (codec == null) {
val message = "Received an unknown packet (ID: $packetID)"
if (server.config.developmentMode)
logger warn "$message. This will cause the client to disconnect in production mode."
else failAndDisconnectBecauseOfClient(message)
return
}
val packet = codec.decode(data)
ReferenceCountUtil.release(data)
if (packetsChannel != null) {
// The protocol will not change anymore, so we can safely decode all pending packets with the current one
if (!packetsChannel!!.offer(packet)) {
logger.warn("The packet buffer is full (size: ${server.config.packetsBufferSize})")
disconnect(internalReason = "You sent more packets than the packet buffer can handle")
}
} else {
// The protocol could change because of this packet, so we must wait
handlePacket(packet)
}
}
val joinProcedure = LoginAndJoinProcedure(this)
var lastKeepAlivePacketTimestamp by Delegates.notNull<Long>()
lateinit var keepAliveDisconnectJob: Job
@ -168,39 +185,52 @@ class BlokkSession(private val channel: io.netty.channel.Channel, val server: Bl
}
}
val joinProcedure = LoginAndJoinProcedure(this)
fun onConnect() = scope.launch {
logger trace "Connected"
launchPacketDataChannelConsumer()
state = State.WaitingForHandshake
if (server.eventBus.emit(SessionInitializedEvent(this@BlokkSession)).cancelled) channel.close()
else server.sessions.add(this@BlokkSession)
}
fun onDisconnect(expected: Boolean) {
if (state == State.Disconnected) return
scope.launch {
if (state == State.Disconnected) return@launch
if (!expected && currentProtocol != HandshakingProtocol && currentProtocol != StatusProtocol)
logger trace "The client disconnected unexpectedly"
val closeException = DisconnectedCancellationException()
stopProcessingIncomingPackets()
coroutineContext.cancel(DisconnectedCancellationException())
state = State.Disconnected
server.sessions.remove(this)
packetDataChannel.close(closeException)
packetsChannel?.close(closeException)
coroutineContext.cancel(closeException)
server.sessions.remove(this@BlokkSession)
}
}
fun onPacketDataReceived(packetData: ByteBuf) {
packetDataChannel.offer(packetData)
}
fun failBecauseOfClient(message: String) {
val messageGetter = { "The client caused a connection error: $message" }
if (server.config.silentNonServerErrors) logger debug messageGetter else logger error messageGetter
internal fun failAndDisconnectBecauseOfClient(throwable: Throwable) =
failAndDisconnectBecauseOfClient("${throwable::class.java.name}: ${throwable.message}", throwable)
internal fun failAndDisconnectBecauseOfClient(message: String, throwable: Throwable? = null) {
val start = "The client caused a connection error"
if (server.config.ignoreClientCausedErrors) {
logger debug "The client caused a connection error: $message"
// Does not send the Disconnect packet
channel.close()
onDisconnect(true)
} else {
if (throwable == null) logger.error("$start: $message")
else logger.error(start, throwable)
scope.launch {
// Sends the Disconnect packet
disconnect(internalReason = message)
}
}
}
fun enableEncryptionCodec(key: SecretKey) {
@ -217,15 +247,13 @@ class BlokkSession(private val channel: io.netty.channel.Channel, val server: Bl
.addAfter("framing", "compression", CompressionCodec(server.config.packetCompressionThreshold))
}
override suspend fun send(packet: OutgoingPacket) {
if (state is State.Disconnected) throw IllegalStateException("The session is not active anymore")
override suspend fun send(packet: OutgoingPacket) = withContext<Unit>(coroutineContext) {
logger.trace { "Sending packet: $packet" }
server.eventBus.emit(PacketSendEvent(this, packet)).ifNotCancelled {
server.eventBus.emit(PacketSendEvent(this@BlokkSession, packet)).ifNotCancelled {
try {
channel.writeAndFlush(it.packet).awaitSuspending()
} catch (t: Throwable) {
if (!channel.isActive) return
if (!channel.isActive) return@withContext
logger.error("Sending packet failed", t)
}
}

View file

@ -35,8 +35,6 @@ class LoginAndJoinProcedure(val session: BlokkSession) {
val uuid = UUID.nameUUIDFromBytes("OfflinePlayer:${packet.username}".toByteArray())
session.send(LoginSuccessPacket(uuid, packet.username))
session.state = Session.State.LoginSucceeded(packet.username, uuid)
session.launchPacketsChannelWorker()
afterLogin()
}
}
@ -73,12 +71,12 @@ class LoginAndJoinProcedure(val session: BlokkSession) {
session.send(LoginSuccessPacket(result.uuid, result.username))
session.state = Session.State.LoginSucceeded(result.username, result.uuid)
afterLogin()
}
private suspend fun afterLogin() {
val state: Session.State.LoginSucceeded = session.state.getOrFail()
session.launchPacketsChannelConsumer()
val event = session.server.eventBus.emit(SessionAfterLoginEvent(session))
val initialWorldAndLocation = event.initialWorldAndLocation

View file

@ -4,12 +4,9 @@ import io.netty.buffer.ByteBuf
import io.netty.channel.ChannelDuplexHandler
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelPromise
import io.netty.handler.codec.MessageToMessageCodec
import space.blokk.net.MinecraftProtocolDataTypes.readVarInt
import space.blokk.net.MinecraftProtocolDataTypes.writeVarInt
import space.blokk.net.packet.OutgoingPacket
import space.blokk.net.packet.OutgoingPacketCodec
import java.io.IOException
class PacketMessageDuplexHandler(private val session: BlokkSession) : ChannelDuplexHandler() {
override fun channelActive(ctx: ChannelHandlerContext) {
@ -34,11 +31,10 @@ class PacketMessageDuplexHandler(private val session: BlokkSession) : ChannelDup
}
override fun channelRead(ctx: ChannelHandlerContext, msg: Any) {
// TODO: Check if the ByteBuf needs to be released
session.onPacketDataReceived(msg as ByteBuf)
session.onPacketDataReceived(msg as ByteBuf) // ByteBuf must be released by the session
}
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
session.failBecauseOfClient("${cause::class.java.name}: ${cause.message}")
session.logger.error("An uncaught exception occurred", Exception(cause))
}
}

View file

@ -8,7 +8,7 @@ object EncryptionResponsePacketHandler : PacketReceivedEventHandler<EncryptionRe
try {
session.joinProcedure.onEncryptionResponse(packet)
} catch (e: IllegalStateException) {
session.failBecauseOfClient("Client sent EncryptionResponsePacket when it was not allowed.")
session.failAndDisconnectBecauseOfClient("Client sent EncryptionResponsePacket when it was not allowed.")
}
}
}

View file

@ -8,7 +8,7 @@ object LoginStartPacketHandler : PacketReceivedEventHandler<LoginStartPacket>()
try {
session.joinProcedure.start(packet)
} catch (e: IllegalStateException) {
session.failBecauseOfClient("Client sent LoginStartPacket when it was not allowed")
session.failAndDisconnectBecauseOfClient("Client sent LoginStartPacket when it was not allowed")
}
}
}

View file

@ -13,7 +13,7 @@ object ClientSettingsPacketHandler : PacketReceivedEventHandler<ClientSettingsPa
try {
session.joinProcedure.onClientSettingsReceived(packet)
} catch (e: IllegalStateException) {
session.failBecauseOfClient("Client sent ClientSettingsPacket when it was not allowed")
session.failAndDisconnectBecauseOfClient("Client sent ClientSettingsPacket when it was not allowed")
}
} else {
(player as BlokkPlayer).settings =

View file

@ -13,7 +13,7 @@ object IncomingPluginMessagePacketHandler : PacketReceivedEventHandler<IncomingP
// TODO: Add API for registering plugin channels and using them
if (packet.channel == "minecraft:brand") {
if (session.brand != null) session.failBecauseOfClient("The client sent his brand name more than once")
if (session.brand != null) session.failAndDisconnectBecauseOfClient("The client sent his brand name more than once")
else {
val buffer: ByteBuf = Unpooled.copiedBuffer(packet.data)
val brand = buffer.readString()

View file

@ -1,5 +1,9 @@
package space.blokk.player
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.yield
import space.blokk.chat.TextComponent
import space.blokk.net.Session
import space.blokk.net.packet.play.ChunkDataPacket
@ -9,6 +13,7 @@ import space.blokk.world.LocationWithRotation
import space.blokk.world.VoxelLocation
import space.blokk.world.World
import java.util.*
import kotlin.coroutines.CoroutineContext
import kotlin.math.abs
class BlokkPlayer(
@ -66,6 +71,7 @@ class BlokkPlayer(
suspend fun sendChunksAndLight() {
val chunks = currentlyViewedChunks.sortedBy { abs(it.key.x) + abs(it.key.z) }
chunks.forEach { session.send(ChunkLightDataPacket(it.key, it.getLightData(this))) }
chunks.forEach { session.send(ChunkDataPacket(it.key, it.getData(this))) }
}
}

View file

@ -1,9 +1,9 @@
host: 0.0.0.0
port: 25565
silentNonServerErrors: true
ignoreClientCausedErrors: false
authenticateAndEncrypt: true
developmentMode: false
minLogLevel: INFO
packetCompressionThreshold: 256
timeout: 30s
packetsBufferSize: 40
packetsBufferSize: 1

View file

@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-6.3-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-6.7.1-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists