diff --git a/blokk-api/src/main/kotlin/space/blokk/net/DisconnectedCancellationException.kt b/blokk-api/src/main/kotlin/space/blokk/net/DisconnectedCancellationException.kt index 1357ccc..951cdaf 100644 --- a/blokk-api/src/main/kotlin/space/blokk/net/DisconnectedCancellationException.kt +++ b/blokk-api/src/main/kotlin/space/blokk/net/DisconnectedCancellationException.kt @@ -2,4 +2,4 @@ package space.blokk.net import java.util.concurrent.CancellationException -class DisconnectedCancellationException(val reason: String? = null) : CancellationException("The connection was closed") +class DisconnectedCancellationException : CancellationException("The player disconnected") diff --git a/blokk-api/src/main/kotlin/space/blokk/net/Session.kt b/blokk-api/src/main/kotlin/space/blokk/net/Session.kt index 87571aa..40aa932 100644 --- a/blokk-api/src/main/kotlin/space/blokk/net/Session.kt +++ b/blokk-api/src/main/kotlin/space/blokk/net/Session.kt @@ -1,10 +1,8 @@ package space.blokk.net import io.netty.buffer.ByteBuf -import kotlinx.coroutines.CoroutineScope import space.blokk.chat.TextComponent import space.blokk.event.EventBusWrapper -import space.blokk.net.event.SessionEvent import space.blokk.net.packet.OutgoingPacket import space.blokk.net.packet.Protocol import space.blokk.player.GameMode @@ -83,7 +81,7 @@ abstract class Session { class Joining(override val player: Player) : State(), WithPlayer class Playing(override val player: Player) : State(), WithPlayer - class Disconnected(val reason: String?) : State() + object Disconnected : State() inline fun getOrFail(): T = if (this is T) this @@ -99,12 +97,11 @@ abstract class Session { * Closes the connection with the client. * * @param reason The message shown to the player. Only used if [currentProtocol] is `LOGIN` or `PLAY`. - * @param loggableReason A short, loggable representation of [reason]. Not shown to the player, except - * in development mode. + * @param internalReason A short, loggable representation of [reason] only shown to the player in development mode. */ abstract suspend fun disconnect( reason: TextComponent = TextComponent of "Disconnected.", - loggableReason: String = reason.text + internalReason: String? = null ) /** diff --git a/blokk-api/src/main/kotlin/space/blokk/util/CoroutineContextSupervisorChild.kt b/blokk-api/src/main/kotlin/space/blokk/util/CoroutineContextSupervisorChild.kt new file mode 100644 index 0000000..d7b55fe --- /dev/null +++ b/blokk-api/src/main/kotlin/space/blokk/util/CoroutineContextSupervisorChild.kt @@ -0,0 +1,10 @@ +package space.blokk.util + +import kotlinx.coroutines.CoroutineName +import kotlinx.coroutines.Job +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.job +import space.blokk.Blokk +import kotlin.coroutines.CoroutineContext + +fun CoroutineContext.supervisorChild(name: String) = this + CoroutineName(name) + SupervisorJob(this.job) diff --git a/blokk-api/src/main/kotlin/space/blokk/world/Chunk.kt b/blokk-api/src/main/kotlin/space/blokk/world/Chunk.kt index 2f2729e..05bb94c 100644 --- a/blokk-api/src/main/kotlin/space/blokk/world/Chunk.kt +++ b/blokk-api/src/main/kotlin/space/blokk/world/Chunk.kt @@ -5,6 +5,7 @@ import kotlinx.coroutines.Job import kotlinx.coroutines.SupervisorJob import space.blokk.Blokk import space.blokk.player.Player +import space.blokk.util.supervisorChild import kotlin.coroutines.CoroutineContext import kotlin.math.floor @@ -41,8 +42,7 @@ abstract class Chunk( * * Is cancelled when the chunk is unloaded. */ - val coroutineContext: CoroutineContext = - world.coroutineContext + CoroutineName(identifier) + SupervisorJob(world.coroutineContext[Job]) + val coroutineContext: CoroutineContext = world.coroutineContext.supervisorChild(identifier) /** * A list of all players who have locked this chunk. diff --git a/blokk-api/src/main/kotlin/space/blokk/world/World.kt b/blokk-api/src/main/kotlin/space/blokk/world/World.kt index d10e271..0d56a0f 100644 --- a/blokk-api/src/main/kotlin/space/blokk/world/World.kt +++ b/blokk-api/src/main/kotlin/space/blokk/world/World.kt @@ -1,8 +1,10 @@ package space.blokk.world import kotlinx.coroutines.* +import space.blokk.Blokk import space.blokk.entity.Entity import space.blokk.util.newSingleThreadDispatcher +import space.blokk.util.supervisorChild import java.util.* import kotlin.coroutines.CoroutineContext @@ -18,8 +20,7 @@ abstract class World(val uuid: UUID) { * * Is cancelled when the world is unloaded. */ - val coroutineContext: CoroutineContext = - CoroutineName(identifier) + SupervisorJob() + threadExecutor + val coroutineContext: CoroutineContext = Blokk.coroutineContext.supervisorChild(identifier) + threadExecutor abstract val dimension: Dimension abstract val loadedChunks: Map diff --git a/blokk-packet-codecs/src/main/kotlin/space/blokk/net/packet/play/PlayProtocol.kt b/blokk-packet-codecs/src/main/kotlin/space/blokk/net/packet/play/PlayProtocol.kt index 978fde7..102b141 100644 --- a/blokk-packet-codecs/src/main/kotlin/space/blokk/net/packet/play/PlayProtocol.kt +++ b/blokk-packet-codecs/src/main/kotlin/space/blokk/net/packet/play/PlayProtocol.kt @@ -19,6 +19,7 @@ object PlayProtocol : Protocol( PlayerInfoPacketCodec, PlayerPositionAndLookPacketCodec, ServerDifficultyPacketCodec, + SetCompassTargetPacketCodec, SetSelectedHotbarSlotPacketCodec, TagsPacketCodec, UpdateViewPositionPacketCodec diff --git a/blokk-server/src/main/kotlin/space/blokk/BlokkServer.kt b/blokk-server/src/main/kotlin/space/blokk/BlokkServer.kt index 9e57f22..b02969d 100644 --- a/blokk-server/src/main/kotlin/space/blokk/BlokkServer.kt +++ b/blokk-server/src/main/kotlin/space/blokk/BlokkServer.kt @@ -3,10 +3,7 @@ package space.blokk import com.sksamuel.hoplite.ConfigFilePropertySource import com.sksamuel.hoplite.ConfigLoader import com.sksamuel.hoplite.ConfigSource -import kotlinx.coroutines.CoroutineName -import kotlinx.coroutines.SupervisorJob -import kotlinx.coroutines.asCoroutineDispatcher -import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.* import space.blokk.command.Command import space.blokk.config.BlokkConfig import space.blokk.event.BlokkEventBus @@ -46,7 +43,14 @@ class BlokkServer internal constructor() : Server { val x509EncodedPublicKey: ByteArray = EncryptionUtils.generateX509Key(keyPair.public).encoded override val coroutineContext: CoroutineContext = - CoroutineName("Server") + Executors.newSingleThreadExecutor().asCoroutineDispatcher() + SupervisorJob() + CoroutineName("Server") + SupervisorJob() + + Executors.newSingleThreadExecutor { r -> Thread(r, "server") }.asCoroutineDispatcher() + + init { + coroutineContext.job.invokeOnCompletion { + println(it) + } + } override val sessions by socketServer::sessions override val players get() = sessions.mapNotNull { it.player } diff --git a/blokk-server/src/main/kotlin/space/blokk/config/BlokkConfig.kt b/blokk-server/src/main/kotlin/space/blokk/config/BlokkConfig.kt index e360969..fc59111 100644 --- a/blokk-server/src/main/kotlin/space/blokk/config/BlokkConfig.kt +++ b/blokk-server/src/main/kotlin/space/blokk/config/BlokkConfig.kt @@ -11,5 +11,6 @@ data class BlokkConfig( val developmentMode: Boolean, val minLogLevel: Logger.Level, val packetCompressionThreshold: Int, - val timeout: Duration + val timeout: Duration, + val packetsBufferSize: Int ) diff --git a/blokk-server/src/main/kotlin/space/blokk/event/BlokkEventBus.kt b/blokk-server/src/main/kotlin/space/blokk/event/BlokkEventBus.kt index 9f93199..f893335 100644 --- a/blokk-server/src/main/kotlin/space/blokk/event/BlokkEventBus.kt +++ b/blokk-server/src/main/kotlin/space/blokk/event/BlokkEventBus.kt @@ -11,7 +11,7 @@ import kotlin.reflect.KClass import kotlin.system.measureTimeMillis class BlokkEventBus(private val developmentMode: Boolean) : EventBus() { - private val logger = Logger("EventBus", false) + private val logger = Logger("EventBus") /** * Invokes all previously registered event handlers sorted by their handlerPosition. diff --git a/blokk-server/src/main/kotlin/space/blokk/net/BlokkChannelInitializer.kt b/blokk-server/src/main/kotlin/space/blokk/net/BlokkChannelInitializer.kt index 9dba5a9..d24e4ee 100644 --- a/blokk-server/src/main/kotlin/space/blokk/net/BlokkChannelInitializer.kt +++ b/blokk-server/src/main/kotlin/space/blokk/net/BlokkChannelInitializer.kt @@ -1,31 +1,22 @@ package space.blokk.net -import io.netty.channel.Channel -import io.netty.channel.ChannelException -import io.netty.channel.ChannelInitializer -import io.netty.channel.ChannelOption +import io.netty.channel.* import io.netty.handler.timeout.IdleStateHandler +import space.blokk.BlokkServer -class BlokkChannelInitializer(private val blokkSocketServer: BlokkSocketServer) : ChannelInitializer() { +class BlokkChannelInitializer(private val server: BlokkServer) : ChannelInitializer() { override fun initChannel(channel: Channel) { if (!ipTOSFailed) try { channel.config().setOption(ChannelOption.IP_TOS, 0x18) } catch (e: ChannelException) { ipTOSFailed = true - blokkSocketServer.server.logger warn "Your OS does not support IP type of service" + server.logger warn "Your OS does not support IP type of service" } - val session = BlokkSession(channel, blokkSocketServer.server) - - channel.pipeline() - .addLast("idle", IdleStateHandler(20, 15, 0)) - .addLast("framing", FramingCodec()) - .addLast("packets", PacketMessageCodec(session)) - .addLast("handler", PacketMessageHandler(session)) + channel.pipeline().addLast("framing", FramingCodec(server)) } companion object { - @Volatile private var ipTOSFailed = false } } diff --git a/blokk-server/src/main/kotlin/space/blokk/net/BlokkSession.kt b/blokk-server/src/main/kotlin/space/blokk/net/BlokkSession.kt index acb7595..ad0e7c7 100644 --- a/blokk-server/src/main/kotlin/space/blokk/net/BlokkSession.kt +++ b/blokk-server/src/main/kotlin/space/blokk/net/BlokkSession.kt @@ -1,47 +1,43 @@ package space.blokk.net import io.netty.buffer.ByteBuf -import io.netty.channel.Channel import kotlinx.coroutines.* +import kotlinx.coroutines.channels.Channel import space.blokk.BlokkServer import space.blokk.chat.ChatColor import space.blokk.chat.ChatComponent import space.blokk.chat.TextComponent import space.blokk.event.* import space.blokk.logging.Logger -import space.blokk.net.Session.State -import space.blokk.net.event.ClientBrandReceivedEvent +import space.blokk.net.MinecraftProtocolDataTypes.readVarInt import space.blokk.net.event.PacketReceivedEvent import space.blokk.net.event.PacketSendEvent -import space.blokk.net.event.SessionEvent import space.blokk.net.packet.IncomingPacket import space.blokk.net.packet.OutgoingPacket -import space.blokk.net.packet.Packet import space.blokk.net.packet.Protocol import space.blokk.net.packet.handshaking.HandshakingProtocol import space.blokk.net.packet.login.LoginProtocol -import space.blokk.net.packet.play.IncomingKeepAlivePacket import space.blokk.net.packet.play.OutgoingKeepAlivePacket import space.blokk.net.packet.play.OutgoingPluginMessagePacket import space.blokk.net.packet.play.PlayProtocol import space.blokk.net.packet.status.StatusProtocol import space.blokk.server.event.SessionInitializedEvent import space.blokk.util.awaitSuspending +import space.blokk.util.supervisorChild +import java.io.IOException import java.net.InetAddress import java.net.InetSocketAddress import javax.crypto.SecretKey import kotlin.coroutines.CoroutineContext import kotlin.properties.Delegates -class BlokkSession(private val channel: Channel, val server: BlokkServer) : Session() { +class BlokkSession(private val channel: io.netty.channel.Channel, val server: BlokkServer) : Session() { override val address: InetAddress = (channel.remoteAddress() as InetSocketAddress).address - private val identifier = "BlokkSession(${address.hostAddress})" + private val identifier = "BlokkSession(${address.hostAddress})" + System.identityHashCode(this) val logger = Logger(identifier) - override val coroutineContext: CoroutineContext = - CoroutineName(identifier) + Dispatchers.Unconfined + SupervisorJob() - + override val coroutineContext: CoroutineContext = server.coroutineContext.supervisorChild(identifier) val scope = CoroutineScope(coroutineContext) override var brand: String? = null @@ -67,40 +63,96 @@ class BlokkSession(private val channel: Channel, val server: BlokkServer) : Sess is State.Disconnected -> null } - private var disconnectReason: String? = null + private var packetsChannel: Channel? = null + private val packetDataChannel: Channel = Channel(Channel.UNLIMITED) - override suspend fun disconnect(reason: TextComponent, loggableReason: String) { - fun getReason() = - if (server.config.developmentMode) { + fun launchPacketsChannelWorker() { + val channel = Channel(Channel.UNLIMITED) + packetsChannel = channel + + scope.launch { + for (packet in channel) { + handlePacket(packet) + } + } + } + + private suspend fun handlePacket(packet: IncomingPacket) { + logger.trace { "Packet received: $packet" } + + server.eventBus.emit(PacketReceivedEvent(this@BlokkSession, packet)).ifNotCancelled { + SessionPacketReceivedEventHandler.handle(this@BlokkSession, packet) + } + } + + init { + logger trace "Created" + + scope.launch { + for (data in packetDataChannel) { + val packetID = data.readVarInt() + val codec = currentProtocol!!.incomingPacketCodecsByID[packetID] + + if (codec == null) { + val message = "Client sent an unknown packet (ID: $packetID)" + + if (server.config.developmentMode) { + logger warn "$message. This will cause the client to disconnect in production mode." + continue + } else throw IOException(message) + } + + val packet = codec.decode(data) + + if (packetsChannel != null) { + // The protocol will not change anymore, so we can safely decode all packets with the current one + if (!packetsChannel!!.offer(packet)) { + logger.warn("The packet buffer is full (size: ${server.config.packetsBufferSize})") + disconnect(internalReason = "You sent more packets than the packet buffer can handle") + } + } else { + // The protocol could change because of this packet, so we must wait + handlePacket(packet) + } + } + } + } + + override suspend fun disconnect(reason: TextComponent, internalReason: String?) { + if (currentProtocol == LoginProtocol || currentProtocol == PlayProtocol) { + logger info "Disconnected." + (internalReason?.let { " Internal reason: $it" } ?: "") + + val finalReason = if (server.config.developmentMode && internalReason != null) { val additional = listOf( TextComponent( "\n\nLogged reason (only shown in development mode): \n", color = ChatColor.GRAY ), - TextComponent.of(loggableReason) + TextComponent of internalReason ) reason.copy(extra = reason.extra + additional) } else reason - disconnectReason = loggableReason - - when (currentProtocol) { - LoginProtocol -> send(space.blokk.net.packet.login.DisconnectPacket(getReason())) - PlayProtocol -> send(space.blokk.net.packet.play.DisconnectPacket(getReason())) + when (currentProtocol) { + LoginProtocol -> send(space.blokk.net.packet.login.DisconnectPacket(finalReason)) + PlayProtocol -> send(space.blokk.net.packet.play.DisconnectPacket(finalReason)) + } } + onDisconnect(true) channel.close() } var lastKeepAlivePacketTimestamp by Delegates.notNull() lateinit var keepAliveDisconnectJob: Job - fun scheduleKeepAlivePacket() { - val timeSinceLastPacket = (System.currentTimeMillis() - lastKeepAlivePacketTimestamp).toInt() - + fun scheduleKeepAlivePacket(isFirst: Boolean = false) { scope.launch { - delay(KEEP_ALIVE_PACKET_INTERVAL.toLong() - timeSinceLastPacket) + if (!isFirst) { + val timeSinceLastPacket = (System.currentTimeMillis() - lastKeepAlivePacketTimestamp).toInt() + delay(KEEP_ALIVE_PACKET_INTERVAL.toLong() - timeSinceLastPacket) + } lastKeepAlivePacketTimestamp = System.currentTimeMillis() send(OutgoingKeepAlivePacket(lastKeepAlivePacketTimestamp)) @@ -120,41 +172,35 @@ class BlokkSession(private val channel: Channel, val server: BlokkServer) : Sess fun onConnect() = scope.launch { logger trace "Connected" + state = State.WaitingForHandshake if (server.eventBus.emit(SessionInitializedEvent(this@BlokkSession)).cancelled) channel.close() else server.sessions.add(this@BlokkSession) } - fun onDisconnect() { - if (state is State.Disconnected) return - val reason = disconnectReason + fun onDisconnect(expected: Boolean) { + if (state == State.Disconnected) return + if (!expected && currentProtocol != HandshakingProtocol && currentProtocol != StatusProtocol) + logger trace "The client disconnected unexpectedly" - logger.trace { - val message = "Disconnected." - if (reason != null) "$message Reason: $reason" - else message - } - - coroutineContext.cancel(DisconnectedCancellationException(reason)) - state = State.Disconnected(reason) + val closeException = DisconnectedCancellationException() + state = State.Disconnected server.sessions.remove(this) + + packetDataChannel.close(closeException) + packetsChannel?.close(closeException) + + coroutineContext.cancel(closeException) } - fun onPacketReceived(packet: IncomingPacket) { - logger.trace { "Packet received: $packet" } - - scope.launch { - server.eventBus.emit(PacketReceivedEvent(this@BlokkSession, packet)).ifNotCancelled { - SessionPacketReceivedEventHandler.handle(this@BlokkSession, packet) - } - } + fun onPacketDataReceived(packetData: ByteBuf) { + packetDataChannel.offer(packetData) } fun failBecauseOfClient(message: String) { - val messageGetter = { "A connection error caused by the client occurred: $message" } + val messageGetter = { "The client caused a connection error: $message" } if (server.config.silentNonServerErrors) logger debug messageGetter else logger error messageGetter - channel.close() - onDisconnect() + onDisconnect(true) } fun enableEncryptionCodec(key: SecretKey) { @@ -166,6 +212,7 @@ class BlokkSession(private val channel: Channel, val server: BlokkServer) : Sess fun enableCompressionCodec() { logger trace "Enabling compression codec" if (channel.pipeline().get("compression") != null) return + channel.pipeline() .addAfter("framing", "compression", CompressionCodec(server.config.packetCompressionThreshold)) } @@ -174,9 +221,9 @@ class BlokkSession(private val channel: Channel, val server: BlokkServer) : Sess if (state is State.Disconnected) throw IllegalStateException("The session is not active anymore") logger.trace { "Sending packet: $packet" } - server.eventBus.emit(PacketSendEvent(this@BlokkSession, packet)).ifNotCancelled { + server.eventBus.emit(PacketSendEvent(this, packet)).ifNotCancelled { try { - channel.writeAndFlush(OutgoingPacketMessage(this@BlokkSession, it.packet)).awaitSuspending() + channel.writeAndFlush(it.packet).awaitSuspending() } catch (t: Throwable) { if (!channel.isActive) return logger.error("Sending packet failed", t) diff --git a/blokk-server/src/main/kotlin/space/blokk/net/BlokkSocketServer.kt b/blokk-server/src/main/kotlin/space/blokk/net/BlokkSocketServer.kt index d8d0801..cd6f86d 100644 --- a/blokk-server/src/main/kotlin/space/blokk/net/BlokkSocketServer.kt +++ b/blokk-server/src/main/kotlin/space/blokk/net/BlokkSocketServer.kt @@ -14,9 +14,7 @@ import space.blokk.BlokkServer import space.blokk.logging.Logger import java.util.concurrent.CopyOnWriteArraySet -class BlokkSocketServer(val server: BlokkServer) { - private val logger = Logger("BlokkSocketServer") - +class BlokkSocketServer(private val server: BlokkServer) { private val bossGroup = createEventLoopGroup() private val workerGroup = createEventLoopGroup() private val bootstrap: ServerBootstrap = ServerBootstrap() @@ -30,7 +28,7 @@ class BlokkSocketServer(val server: BlokkServer) { ) .childOption(ChannelOption.SO_KEEPALIVE, true) .childOption(ChannelOption.TCP_NODELAY, true) - .childHandler(BlokkChannelInitializer(this)) + .childHandler(BlokkChannelInitializer(server)) internal val sessions: MutableSet = CopyOnWriteArraySet() diff --git a/blokk-server/src/main/kotlin/space/blokk/net/FramingCodec.kt b/blokk-server/src/main/kotlin/space/blokk/net/FramingCodec.kt index 3ed65a8..9f6585f 100644 --- a/blokk-server/src/main/kotlin/space/blokk/net/FramingCodec.kt +++ b/blokk-server/src/main/kotlin/space/blokk/net/FramingCodec.kt @@ -3,13 +3,23 @@ package space.blokk.net import io.netty.buffer.ByteBuf import io.netty.channel.ChannelHandlerContext import io.netty.handler.codec.ByteToMessageCodec +import space.blokk.BlokkServer import space.blokk.net.MinecraftProtocolDataTypes.readVarInt import space.blokk.net.MinecraftProtocolDataTypes.varIntReadable import space.blokk.net.MinecraftProtocolDataTypes.writeVarInt -class FramingCodec : ByteToMessageCodec() { +class FramingCodec(private val server: BlokkServer) : ByteToMessageCodec() { private var currentLength: Int? = null + override fun channelActive(ctx: ChannelHandlerContext) { + val session = BlokkSession(ctx.channel(), server) + + ctx.channel().pipeline() + .addLast("packets", PacketMessageDuplexHandler(session)) + + ctx.fireChannelActive() + } + override fun encode(ctx: ChannelHandlerContext, msg: ByteBuf, out: ByteBuf) { out.writeVarInt(msg.readableBytes()) msg.readerIndex(0) diff --git a/blokk-server/src/main/kotlin/space/blokk/net/LoginAndJoinProcedure.kt b/blokk-server/src/main/kotlin/space/blokk/net/LoginAndJoinProcedure.kt index 8ed2da2..0d4cc95 100644 --- a/blokk-server/src/main/kotlin/space/blokk/net/LoginAndJoinProcedure.kt +++ b/blokk-server/src/main/kotlin/space/blokk/net/LoginAndJoinProcedure.kt @@ -35,6 +35,7 @@ class LoginAndJoinProcedure(val session: BlokkSession) { val uuid = UUID.nameUUIDFromBytes("OfflinePlayer:${packet.username}".toByteArray()) session.send(LoginSuccessPacket(uuid, packet.username)) session.state = Session.State.LoginSucceeded(packet.username, uuid) + session.launchPacketsChannelWorker() afterLogin() } @@ -86,7 +87,7 @@ class LoginAndJoinProcedure(val session: BlokkSession) { event.cancelled -> session.disconnect() initialWorldAndLocation == null -> { session.logger warn "Could not join because no spawn location was set" - session.disconnect(loggableReason = "No spawn location set") + session.disconnect(internalReason = "No spawn location set") } else -> { // TODO: Spawn the player entity @@ -145,7 +146,7 @@ class LoginAndJoinProcedure(val session: BlokkSession) { val event = session.server.eventBus.emit(PlayerInitializationEvent(session, settings)) event.ifCancelled { - session.disconnect(loggableReason = "PlayerInitializationEvent was cancelled") + session.disconnect(internalReason = "PlayerInitializationEvent was cancelled") return } @@ -200,9 +201,7 @@ class LoginAndJoinProcedure(val session: BlokkSession) { session.send(UpdateViewPositionPacket(Chunk.Key.from(player.location.toVoxelLocation()))) - session.lastKeepAlivePacketTimestamp = System.currentTimeMillis() session.scheduleKeepAlivePacket() - player.sendChunksAndLight() // WorldBorder diff --git a/blokk-server/src/main/kotlin/space/blokk/net/OutgoingPacketMessage.kt b/blokk-server/src/main/kotlin/space/blokk/net/OutgoingPacketMessage.kt deleted file mode 100644 index 2db6c1c..0000000 --- a/blokk-server/src/main/kotlin/space/blokk/net/OutgoingPacketMessage.kt +++ /dev/null @@ -1,17 +0,0 @@ -package space.blokk.net - -import io.netty.buffer.ByteBuf -import space.blokk.net.packet.IncomingPacket -import space.blokk.net.packet.OutgoingPacket -import space.blokk.net.packet.OutgoingPacketCodec -import space.blokk.net.packet.Packet - -abstract class PacketMessage(val session: BlokkSession, val packet: T) - -class OutgoingPacketMessage(session: BlokkSession, packet: T) : PacketMessage(session, packet) { - @Suppress("UNCHECKED_CAST") - val packetCodec = session.currentProtocol!!.getCodecByType(packet::class) as OutgoingPacketCodec - fun encodePacket(dst: ByteBuf) = packetCodec.encode(packet, dst) -} - -class IncomingPacketMessage(session: BlokkSession, packet: T) : PacketMessage(session, packet) diff --git a/blokk-server/src/main/kotlin/space/blokk/net/PacketMessageCodec.kt b/blokk-server/src/main/kotlin/space/blokk/net/PacketMessageCodec.kt deleted file mode 100644 index 18cf40a..0000000 --- a/blokk-server/src/main/kotlin/space/blokk/net/PacketMessageCodec.kt +++ /dev/null @@ -1,44 +0,0 @@ -package space.blokk.net - -import io.netty.buffer.ByteBuf -import io.netty.channel.ChannelHandlerContext -import io.netty.handler.codec.MessageToMessageCodec -import space.blokk.net.MinecraftProtocolDataTypes.readVarInt -import space.blokk.net.MinecraftProtocolDataTypes.writeVarInt -import java.io.IOException - -class PacketMessageCodec(private val session: BlokkSession) : MessageToMessageCodec>() { - override fun channelActive(ctx: ChannelHandlerContext) { - session.onConnect() - } - - override fun channelInactive(ctx: ChannelHandlerContext) { - session.onDisconnect() - } - - override fun encode(ctx: ChannelHandlerContext, msg: PacketMessage<*>, out: MutableList) { - if (msg !is OutgoingPacketMessage<*>) throw UnsupportedOperationException("Only outgoing packets can get encoded") - - val dst = ctx.alloc().buffer() - dst.writeVarInt(msg.packetCodec.id) - msg.encodePacket(dst) - out.add(dst) - } - - override fun decode(ctx: ChannelHandlerContext, msg: ByteBuf, out: MutableList) { - val packetID = msg.readVarInt() - - val codec = session.currentProtocol!!.incomingPacketCodecsByID[packetID] - - if (codec == null) { - val message = "Client sent an unknown packet (ID: $packetID)" - if (session.server.config.developmentMode) - session.logger warn "$message. This will cause the client to disconnect in production mode." - else throw IOException(message) - } else out.add(IncomingPacketMessage(session, codec.decode(msg))) - } - - override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) { - session.failBecauseOfClient("${cause::class.java.name}: ${cause.message}") - } -} diff --git a/blokk-server/src/main/kotlin/space/blokk/net/PacketMessageDuplexHandler.kt b/blokk-server/src/main/kotlin/space/blokk/net/PacketMessageDuplexHandler.kt new file mode 100644 index 0000000..377a92c --- /dev/null +++ b/blokk-server/src/main/kotlin/space/blokk/net/PacketMessageDuplexHandler.kt @@ -0,0 +1,44 @@ +package space.blokk.net + +import io.netty.buffer.ByteBuf +import io.netty.channel.ChannelDuplexHandler +import io.netty.channel.ChannelHandlerContext +import io.netty.channel.ChannelPromise +import io.netty.handler.codec.MessageToMessageCodec +import space.blokk.net.MinecraftProtocolDataTypes.readVarInt +import space.blokk.net.MinecraftProtocolDataTypes.writeVarInt +import space.blokk.net.packet.OutgoingPacket +import space.blokk.net.packet.OutgoingPacketCodec +import java.io.IOException + +class PacketMessageDuplexHandler(private val session: BlokkSession) : ChannelDuplexHandler() { + override fun channelActive(ctx: ChannelHandlerContext) { + session.onConnect() + } + + override fun channelInactive(ctx: ChannelHandlerContext) { + session.onDisconnect(false) + } + + override fun write(ctx: ChannelHandlerContext, msg: Any, promise: ChannelPromise) { + val packet = msg as OutgoingPacket + + @Suppress("UNCHECKED_CAST") + val codec = + session.currentProtocol!!.getCodecByType(packet::class) as OutgoingPacketCodec + + val dst = ctx.alloc().buffer() + dst.writeVarInt(codec.id) + codec.encode(packet, dst) + ctx.write(dst, promise) + } + + override fun channelRead(ctx: ChannelHandlerContext, msg: Any) { + // TODO: Check if the ByteBuf needs to be released + session.onPacketDataReceived(msg as ByteBuf) + } + + override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) { + session.failBecauseOfClient("${cause::class.java.name}: ${cause.message}") + } +} diff --git a/blokk-server/src/main/kotlin/space/blokk/net/PacketMessageHandler.kt b/blokk-server/src/main/kotlin/space/blokk/net/PacketMessageHandler.kt deleted file mode 100644 index c971239..0000000 --- a/blokk-server/src/main/kotlin/space/blokk/net/PacketMessageHandler.kt +++ /dev/null @@ -1,13 +0,0 @@ -package space.blokk.net - -import io.netty.channel.ChannelHandlerContext -import io.netty.channel.SimpleChannelInboundHandler -import kotlinx.coroutines.launch -import space.blokk.net.event.PacketReceivedEvent - -class PacketMessageHandler(private val session: BlokkSession) : - SimpleChannelInboundHandler>() { - override fun channelRead0(ctx: ChannelHandlerContext, msg: IncomingPacketMessage<*>) { - session.onPacketReceived(msg.packet) - } -} diff --git a/blokk-server/src/main/kotlin/space/blokk/net/packet/play/IncomingKeepAlivePacketHandler.kt b/blokk-server/src/main/kotlin/space/blokk/net/packet/play/IncomingKeepAlivePacketHandler.kt index b2cbd32..3961a09 100644 --- a/blokk-server/src/main/kotlin/space/blokk/net/packet/play/IncomingKeepAlivePacketHandler.kt +++ b/blokk-server/src/main/kotlin/space/blokk/net/packet/play/IncomingKeepAlivePacketHandler.kt @@ -10,7 +10,7 @@ object IncomingKeepAlivePacketHandler : PacketReceivedEventHandler session.disconnect(loggableReason = "The ServerListInfoRequestEvent was cancelled.") + event.cancelled -> session.disconnect(internalReason = "The ServerListInfoRequestEvent was cancelled.") response == null -> - session.disconnect(loggableReason = "The response property of the ServerListInfoRequestEvent is null.") + session.disconnect(internalReason = "The response property of the ServerListInfoRequestEvent is null.") else -> session.send(ResponsePacket(response)) } }, PingPacket::class to PacketReceivedEventHandler.of { session, packet -> session.send(PongPacket(packet.payload)) + session.disconnect() } )) diff --git a/blokk-server/src/main/resources/default-config.yml b/blokk-server/src/main/resources/default-config.yml index 9dfb5bd..460b1d7 100644 --- a/blokk-server/src/main/resources/default-config.yml +++ b/blokk-server/src/main/resources/default-config.yml @@ -6,3 +6,4 @@ developmentMode: false minLogLevel: INFO packetCompressionThreshold: 256 timeout: 30s +packetsBufferSize: 40 diff --git a/test-plugin/src/main/kotlin/space/blokk/testplugin/TestPlugin.kt b/test-plugin/src/main/kotlin/space/blokk/testplugin/TestPlugin.kt index 26a9803..a907fb2 100644 --- a/test-plugin/src/main/kotlin/space/blokk/testplugin/TestPlugin.kt +++ b/test-plugin/src/main/kotlin/space/blokk/testplugin/TestPlugin.kt @@ -1,6 +1,9 @@ package space.blokk.testplugin import space.blokk.Blokk +import space.blokk.chat.TextComponent +import space.blokk.net.ServerListInfo +import space.blokk.net.event.ServerListInfoRequestEvent import space.blokk.net.event.SessionAfterLoginEvent import space.blokk.player.GameMode import space.blokk.plugin.Plugin @@ -30,6 +33,10 @@ class TestPlugin: Plugin("Test", "1.0.0") { world.getVoxel(VoxelLocation.of(-1, 2, -1)).block = CraftingTable() + Blokk.eventBus.on { event -> + event.response = ServerListInfo("1.16.4", 754, TextComponent of "Test", 10, 0, emptyList()) + } + Blokk.eventBus.on { event -> event.initialWorldAndLocation = WorldAndLocationWithRotation( world,