Archived
1
0
Fork 0

Fix weird session bugs

This commit is contained in:
Moritz Ruth 2021-01-02 00:24:17 +01:00
parent 1dbf679659
commit c090b1461a
No known key found for this signature in database
GPG key ID: AFD57E23E753841B
22 changed files with 205 additions and 167 deletions

View file

@ -2,4 +2,4 @@ package space.blokk.net
import java.util.concurrent.CancellationException import java.util.concurrent.CancellationException
class DisconnectedCancellationException(val reason: String? = null) : CancellationException("The connection was closed") class DisconnectedCancellationException : CancellationException("The player disconnected")

View file

@ -1,10 +1,8 @@
package space.blokk.net package space.blokk.net
import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBuf
import kotlinx.coroutines.CoroutineScope
import space.blokk.chat.TextComponent import space.blokk.chat.TextComponent
import space.blokk.event.EventBusWrapper import space.blokk.event.EventBusWrapper
import space.blokk.net.event.SessionEvent
import space.blokk.net.packet.OutgoingPacket import space.blokk.net.packet.OutgoingPacket
import space.blokk.net.packet.Protocol import space.blokk.net.packet.Protocol
import space.blokk.player.GameMode import space.blokk.player.GameMode
@ -83,7 +81,7 @@ abstract class Session {
class Joining(override val player: Player) : State(), WithPlayer class Joining(override val player: Player) : State(), WithPlayer
class Playing(override val player: Player) : State(), WithPlayer class Playing(override val player: Player) : State(), WithPlayer
class Disconnected(val reason: String?) : State() object Disconnected : State()
inline fun <reified T : State> getOrFail(): T = inline fun <reified T : State> getOrFail(): T =
if (this is T) this if (this is T) this
@ -99,12 +97,11 @@ abstract class Session {
* Closes the connection with the client. * Closes the connection with the client.
* *
* @param reason The message shown to the player. Only used if [currentProtocol] is `LOGIN` or `PLAY`. * @param reason The message shown to the player. Only used if [currentProtocol] is `LOGIN` or `PLAY`.
* @param loggableReason A short, loggable representation of [reason]. Not shown to the player, except * @param internalReason A short, loggable representation of [reason] only shown to the player in development mode.
* in development mode.
*/ */
abstract suspend fun disconnect( abstract suspend fun disconnect(
reason: TextComponent = TextComponent of "Disconnected.", reason: TextComponent = TextComponent of "Disconnected.",
loggableReason: String = reason.text internalReason: String? = null
) )
/** /**

View file

@ -0,0 +1,10 @@
package space.blokk.util
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.job
import space.blokk.Blokk
import kotlin.coroutines.CoroutineContext
fun CoroutineContext.supervisorChild(name: String) = this + CoroutineName(name) + SupervisorJob(this.job)

View file

@ -5,6 +5,7 @@ import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.SupervisorJob
import space.blokk.Blokk import space.blokk.Blokk
import space.blokk.player.Player import space.blokk.player.Player
import space.blokk.util.supervisorChild
import kotlin.coroutines.CoroutineContext import kotlin.coroutines.CoroutineContext
import kotlin.math.floor import kotlin.math.floor
@ -41,8 +42,7 @@ abstract class Chunk(
* *
* Is cancelled when the chunk is unloaded. * Is cancelled when the chunk is unloaded.
*/ */
val coroutineContext: CoroutineContext = val coroutineContext: CoroutineContext = world.coroutineContext.supervisorChild(identifier)
world.coroutineContext + CoroutineName(identifier) + SupervisorJob(world.coroutineContext[Job])
/** /**
* A list of all players who have locked this chunk. * A list of all players who have locked this chunk.

View file

@ -1,8 +1,10 @@
package space.blokk.world package space.blokk.world
import kotlinx.coroutines.* import kotlinx.coroutines.*
import space.blokk.Blokk
import space.blokk.entity.Entity import space.blokk.entity.Entity
import space.blokk.util.newSingleThreadDispatcher import space.blokk.util.newSingleThreadDispatcher
import space.blokk.util.supervisorChild
import java.util.* import java.util.*
import kotlin.coroutines.CoroutineContext import kotlin.coroutines.CoroutineContext
@ -18,8 +20,7 @@ abstract class World(val uuid: UUID) {
* *
* Is cancelled when the world is unloaded. * Is cancelled when the world is unloaded.
*/ */
val coroutineContext: CoroutineContext = val coroutineContext: CoroutineContext = Blokk.coroutineContext.supervisorChild(identifier) + threadExecutor
CoroutineName(identifier) + SupervisorJob() + threadExecutor
abstract val dimension: Dimension abstract val dimension: Dimension
abstract val loadedChunks: Map<Chunk.Key, Chunk> abstract val loadedChunks: Map<Chunk.Key, Chunk>

View file

@ -19,6 +19,7 @@ object PlayProtocol : Protocol(
PlayerInfoPacketCodec, PlayerInfoPacketCodec,
PlayerPositionAndLookPacketCodec, PlayerPositionAndLookPacketCodec,
ServerDifficultyPacketCodec, ServerDifficultyPacketCodec,
SetCompassTargetPacketCodec,
SetSelectedHotbarSlotPacketCodec, SetSelectedHotbarSlotPacketCodec,
TagsPacketCodec, TagsPacketCodec,
UpdateViewPositionPacketCodec UpdateViewPositionPacketCodec

View file

@ -3,10 +3,7 @@ package space.blokk
import com.sksamuel.hoplite.ConfigFilePropertySource import com.sksamuel.hoplite.ConfigFilePropertySource
import com.sksamuel.hoplite.ConfigLoader import com.sksamuel.hoplite.ConfigLoader
import com.sksamuel.hoplite.ConfigSource import com.sksamuel.hoplite.ConfigSource
import kotlinx.coroutines.CoroutineName import kotlinx.coroutines.*
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.runBlocking
import space.blokk.command.Command import space.blokk.command.Command
import space.blokk.config.BlokkConfig import space.blokk.config.BlokkConfig
import space.blokk.event.BlokkEventBus import space.blokk.event.BlokkEventBus
@ -46,7 +43,14 @@ class BlokkServer internal constructor() : Server {
val x509EncodedPublicKey: ByteArray = EncryptionUtils.generateX509Key(keyPair.public).encoded val x509EncodedPublicKey: ByteArray = EncryptionUtils.generateX509Key(keyPair.public).encoded
override val coroutineContext: CoroutineContext = override val coroutineContext: CoroutineContext =
CoroutineName("Server") + Executors.newSingleThreadExecutor().asCoroutineDispatcher() + SupervisorJob() CoroutineName("Server") + SupervisorJob() +
Executors.newSingleThreadExecutor { r -> Thread(r, "server") }.asCoroutineDispatcher()
init {
coroutineContext.job.invokeOnCompletion {
println(it)
}
}
override val sessions by socketServer::sessions override val sessions by socketServer::sessions
override val players get() = sessions.mapNotNull { it.player } override val players get() = sessions.mapNotNull { it.player }

View file

@ -11,5 +11,6 @@ data class BlokkConfig(
val developmentMode: Boolean, val developmentMode: Boolean,
val minLogLevel: Logger.Level, val minLogLevel: Logger.Level,
val packetCompressionThreshold: Int, val packetCompressionThreshold: Int,
val timeout: Duration val timeout: Duration,
val packetsBufferSize: Int
) )

View file

@ -11,7 +11,7 @@ import kotlin.reflect.KClass
import kotlin.system.measureTimeMillis import kotlin.system.measureTimeMillis
class BlokkEventBus(private val developmentMode: Boolean) : EventBus() { class BlokkEventBus(private val developmentMode: Boolean) : EventBus() {
private val logger = Logger("EventBus", false) private val logger = Logger("EventBus")
/** /**
* Invokes all previously registered event handlers sorted by their handlerPosition. * Invokes all previously registered event handlers sorted by their handlerPosition.

View file

@ -1,31 +1,22 @@
package space.blokk.net package space.blokk.net
import io.netty.channel.Channel import io.netty.channel.*
import io.netty.channel.ChannelException
import io.netty.channel.ChannelInitializer
import io.netty.channel.ChannelOption
import io.netty.handler.timeout.IdleStateHandler import io.netty.handler.timeout.IdleStateHandler
import space.blokk.BlokkServer
class BlokkChannelInitializer(private val blokkSocketServer: BlokkSocketServer) : ChannelInitializer<Channel>() { class BlokkChannelInitializer(private val server: BlokkServer) : ChannelInitializer<Channel>() {
override fun initChannel(channel: Channel) { override fun initChannel(channel: Channel) {
if (!ipTOSFailed) try { if (!ipTOSFailed) try {
channel.config().setOption(ChannelOption.IP_TOS, 0x18) channel.config().setOption(ChannelOption.IP_TOS, 0x18)
} catch (e: ChannelException) { } catch (e: ChannelException) {
ipTOSFailed = true ipTOSFailed = true
blokkSocketServer.server.logger warn "Your OS does not support IP type of service" server.logger warn "Your OS does not support IP type of service"
} }
val session = BlokkSession(channel, blokkSocketServer.server) channel.pipeline().addLast("framing", FramingCodec(server))
channel.pipeline()
.addLast("idle", IdleStateHandler(20, 15, 0))
.addLast("framing", FramingCodec())
.addLast("packets", PacketMessageCodec(session))
.addLast("handler", PacketMessageHandler(session))
} }
companion object { companion object {
@Volatile
private var ipTOSFailed = false private var ipTOSFailed = false
} }
} }

View file

@ -1,47 +1,43 @@
package space.blokk.net package space.blokk.net
import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBuf
import io.netty.channel.Channel
import kotlinx.coroutines.* import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import space.blokk.BlokkServer import space.blokk.BlokkServer
import space.blokk.chat.ChatColor import space.blokk.chat.ChatColor
import space.blokk.chat.ChatComponent import space.blokk.chat.ChatComponent
import space.blokk.chat.TextComponent import space.blokk.chat.TextComponent
import space.blokk.event.* import space.blokk.event.*
import space.blokk.logging.Logger import space.blokk.logging.Logger
import space.blokk.net.Session.State import space.blokk.net.MinecraftProtocolDataTypes.readVarInt
import space.blokk.net.event.ClientBrandReceivedEvent
import space.blokk.net.event.PacketReceivedEvent import space.blokk.net.event.PacketReceivedEvent
import space.blokk.net.event.PacketSendEvent import space.blokk.net.event.PacketSendEvent
import space.blokk.net.event.SessionEvent
import space.blokk.net.packet.IncomingPacket import space.blokk.net.packet.IncomingPacket
import space.blokk.net.packet.OutgoingPacket import space.blokk.net.packet.OutgoingPacket
import space.blokk.net.packet.Packet
import space.blokk.net.packet.Protocol import space.blokk.net.packet.Protocol
import space.blokk.net.packet.handshaking.HandshakingProtocol import space.blokk.net.packet.handshaking.HandshakingProtocol
import space.blokk.net.packet.login.LoginProtocol import space.blokk.net.packet.login.LoginProtocol
import space.blokk.net.packet.play.IncomingKeepAlivePacket
import space.blokk.net.packet.play.OutgoingKeepAlivePacket import space.blokk.net.packet.play.OutgoingKeepAlivePacket
import space.blokk.net.packet.play.OutgoingPluginMessagePacket import space.blokk.net.packet.play.OutgoingPluginMessagePacket
import space.blokk.net.packet.play.PlayProtocol import space.blokk.net.packet.play.PlayProtocol
import space.blokk.net.packet.status.StatusProtocol import space.blokk.net.packet.status.StatusProtocol
import space.blokk.server.event.SessionInitializedEvent import space.blokk.server.event.SessionInitializedEvent
import space.blokk.util.awaitSuspending import space.blokk.util.awaitSuspending
import space.blokk.util.supervisorChild
import java.io.IOException
import java.net.InetAddress import java.net.InetAddress
import java.net.InetSocketAddress import java.net.InetSocketAddress
import javax.crypto.SecretKey import javax.crypto.SecretKey
import kotlin.coroutines.CoroutineContext import kotlin.coroutines.CoroutineContext
import kotlin.properties.Delegates import kotlin.properties.Delegates
class BlokkSession(private val channel: Channel, val server: BlokkServer) : Session() { class BlokkSession(private val channel: io.netty.channel.Channel, val server: BlokkServer) : Session() {
override val address: InetAddress = (channel.remoteAddress() as InetSocketAddress).address override val address: InetAddress = (channel.remoteAddress() as InetSocketAddress).address
private val identifier = "BlokkSession(${address.hostAddress})" private val identifier = "BlokkSession(${address.hostAddress})" + System.identityHashCode(this)
val logger = Logger(identifier) val logger = Logger(identifier)
override val coroutineContext: CoroutineContext = override val coroutineContext: CoroutineContext = server.coroutineContext.supervisorChild(identifier)
CoroutineName(identifier) + Dispatchers.Unconfined + SupervisorJob()
val scope = CoroutineScope(coroutineContext) val scope = CoroutineScope(coroutineContext)
override var brand: String? = null override var brand: String? = null
@ -67,40 +63,96 @@ class BlokkSession(private val channel: Channel, val server: BlokkServer) : Sess
is State.Disconnected -> null is State.Disconnected -> null
} }
private var disconnectReason: String? = null private var packetsChannel: Channel<IncomingPacket>? = null
private val packetDataChannel: Channel<ByteBuf> = Channel(Channel.UNLIMITED)
override suspend fun disconnect(reason: TextComponent, loggableReason: String) { fun launchPacketsChannelWorker() {
fun getReason() = val channel = Channel<IncomingPacket>(Channel.UNLIMITED)
if (server.config.developmentMode) { packetsChannel = channel
scope.launch {
for (packet in channel) {
handlePacket(packet)
}
}
}
private suspend fun handlePacket(packet: IncomingPacket) {
logger.trace { "Packet received: $packet" }
server.eventBus.emit(PacketReceivedEvent(this@BlokkSession, packet)).ifNotCancelled {
SessionPacketReceivedEventHandler.handle(this@BlokkSession, packet)
}
}
init {
logger trace "Created"
scope.launch {
for (data in packetDataChannel) {
val packetID = data.readVarInt()
val codec = currentProtocol!!.incomingPacketCodecsByID[packetID]
if (codec == null) {
val message = "Client sent an unknown packet (ID: $packetID)"
if (server.config.developmentMode) {
logger warn "$message. This will cause the client to disconnect in production mode."
continue
} else throw IOException(message)
}
val packet = codec.decode(data)
if (packetsChannel != null) {
// The protocol will not change anymore, so we can safely decode all packets with the current one
if (!packetsChannel!!.offer(packet)) {
logger.warn("The packet buffer is full (size: ${server.config.packetsBufferSize})")
disconnect(internalReason = "You sent more packets than the packet buffer can handle")
}
} else {
// The protocol could change because of this packet, so we must wait
handlePacket(packet)
}
}
}
}
override suspend fun disconnect(reason: TextComponent, internalReason: String?) {
if (currentProtocol == LoginProtocol || currentProtocol == PlayProtocol) {
logger info "Disconnected." + (internalReason?.let { " Internal reason: $it" } ?: "")
val finalReason = if (server.config.developmentMode && internalReason != null) {
val additional = listOf<ChatComponent>( val additional = listOf<ChatComponent>(
TextComponent( TextComponent(
"\n\nLogged reason (only shown in development mode): \n", "\n\nLogged reason (only shown in development mode): \n",
color = ChatColor.GRAY color = ChatColor.GRAY
), ),
TextComponent.of(loggableReason) TextComponent of internalReason
) )
reason.copy(extra = reason.extra + additional) reason.copy(extra = reason.extra + additional)
} else reason } else reason
disconnectReason = loggableReason when (currentProtocol) {
LoginProtocol -> send(space.blokk.net.packet.login.DisconnectPacket(finalReason))
when (currentProtocol) { PlayProtocol -> send(space.blokk.net.packet.play.DisconnectPacket(finalReason))
LoginProtocol -> send(space.blokk.net.packet.login.DisconnectPacket(getReason())) }
PlayProtocol -> send(space.blokk.net.packet.play.DisconnectPacket(getReason()))
} }
onDisconnect(true)
channel.close() channel.close()
} }
var lastKeepAlivePacketTimestamp by Delegates.notNull<Long>() var lastKeepAlivePacketTimestamp by Delegates.notNull<Long>()
lateinit var keepAliveDisconnectJob: Job lateinit var keepAliveDisconnectJob: Job
fun scheduleKeepAlivePacket() { fun scheduleKeepAlivePacket(isFirst: Boolean = false) {
val timeSinceLastPacket = (System.currentTimeMillis() - lastKeepAlivePacketTimestamp).toInt()
scope.launch { scope.launch {
delay(KEEP_ALIVE_PACKET_INTERVAL.toLong() - timeSinceLastPacket) if (!isFirst) {
val timeSinceLastPacket = (System.currentTimeMillis() - lastKeepAlivePacketTimestamp).toInt()
delay(KEEP_ALIVE_PACKET_INTERVAL.toLong() - timeSinceLastPacket)
}
lastKeepAlivePacketTimestamp = System.currentTimeMillis() lastKeepAlivePacketTimestamp = System.currentTimeMillis()
send(OutgoingKeepAlivePacket(lastKeepAlivePacketTimestamp)) send(OutgoingKeepAlivePacket(lastKeepAlivePacketTimestamp))
@ -120,41 +172,35 @@ class BlokkSession(private val channel: Channel, val server: BlokkServer) : Sess
fun onConnect() = scope.launch { fun onConnect() = scope.launch {
logger trace "Connected" logger trace "Connected"
state = State.WaitingForHandshake
if (server.eventBus.emit(SessionInitializedEvent(this@BlokkSession)).cancelled) channel.close() if (server.eventBus.emit(SessionInitializedEvent(this@BlokkSession)).cancelled) channel.close()
else server.sessions.add(this@BlokkSession) else server.sessions.add(this@BlokkSession)
} }
fun onDisconnect() { fun onDisconnect(expected: Boolean) {
if (state is State.Disconnected) return if (state == State.Disconnected) return
val reason = disconnectReason if (!expected && currentProtocol != HandshakingProtocol && currentProtocol != StatusProtocol)
logger trace "The client disconnected unexpectedly"
logger.trace { val closeException = DisconnectedCancellationException()
val message = "Disconnected."
if (reason != null) "$message Reason: $reason"
else message
}
coroutineContext.cancel(DisconnectedCancellationException(reason))
state = State.Disconnected(reason)
state = State.Disconnected
server.sessions.remove(this) server.sessions.remove(this)
packetDataChannel.close(closeException)
packetsChannel?.close(closeException)
coroutineContext.cancel(closeException)
} }
fun onPacketReceived(packet: IncomingPacket) { fun onPacketDataReceived(packetData: ByteBuf) {
logger.trace { "Packet received: $packet" } packetDataChannel.offer(packetData)
scope.launch {
server.eventBus.emit(PacketReceivedEvent(this@BlokkSession, packet)).ifNotCancelled {
SessionPacketReceivedEventHandler.handle(this@BlokkSession, packet)
}
}
} }
fun failBecauseOfClient(message: String) { fun failBecauseOfClient(message: String) {
val messageGetter = { "A connection error caused by the client occurred: $message" } val messageGetter = { "The client caused a connection error: $message" }
if (server.config.silentNonServerErrors) logger debug messageGetter else logger error messageGetter if (server.config.silentNonServerErrors) logger debug messageGetter else logger error messageGetter
channel.close() onDisconnect(true)
onDisconnect()
} }
fun enableEncryptionCodec(key: SecretKey) { fun enableEncryptionCodec(key: SecretKey) {
@ -166,6 +212,7 @@ class BlokkSession(private val channel: Channel, val server: BlokkServer) : Sess
fun enableCompressionCodec() { fun enableCompressionCodec() {
logger trace "Enabling compression codec" logger trace "Enabling compression codec"
if (channel.pipeline().get("compression") != null) return if (channel.pipeline().get("compression") != null) return
channel.pipeline() channel.pipeline()
.addAfter("framing", "compression", CompressionCodec(server.config.packetCompressionThreshold)) .addAfter("framing", "compression", CompressionCodec(server.config.packetCompressionThreshold))
} }
@ -174,9 +221,9 @@ class BlokkSession(private val channel: Channel, val server: BlokkServer) : Sess
if (state is State.Disconnected) throw IllegalStateException("The session is not active anymore") if (state is State.Disconnected) throw IllegalStateException("The session is not active anymore")
logger.trace { "Sending packet: $packet" } logger.trace { "Sending packet: $packet" }
server.eventBus.emit(PacketSendEvent(this@BlokkSession, packet)).ifNotCancelled { server.eventBus.emit(PacketSendEvent(this, packet)).ifNotCancelled {
try { try {
channel.writeAndFlush(OutgoingPacketMessage(this@BlokkSession, it.packet)).awaitSuspending() channel.writeAndFlush(it.packet).awaitSuspending()
} catch (t: Throwable) { } catch (t: Throwable) {
if (!channel.isActive) return if (!channel.isActive) return
logger.error("Sending packet failed", t) logger.error("Sending packet failed", t)

View file

@ -14,9 +14,7 @@ import space.blokk.BlokkServer
import space.blokk.logging.Logger import space.blokk.logging.Logger
import java.util.concurrent.CopyOnWriteArraySet import java.util.concurrent.CopyOnWriteArraySet
class BlokkSocketServer(val server: BlokkServer) { class BlokkSocketServer(private val server: BlokkServer) {
private val logger = Logger("BlokkSocketServer")
private val bossGroup = createEventLoopGroup() private val bossGroup = createEventLoopGroup()
private val workerGroup = createEventLoopGroup() private val workerGroup = createEventLoopGroup()
private val bootstrap: ServerBootstrap = ServerBootstrap() private val bootstrap: ServerBootstrap = ServerBootstrap()
@ -30,7 +28,7 @@ class BlokkSocketServer(val server: BlokkServer) {
) )
.childOption(ChannelOption.SO_KEEPALIVE, true) .childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(BlokkChannelInitializer(this)) .childHandler(BlokkChannelInitializer(server))
internal val sessions: MutableSet<BlokkSession> = CopyOnWriteArraySet() internal val sessions: MutableSet<BlokkSession> = CopyOnWriteArraySet()

View file

@ -3,13 +3,23 @@ package space.blokk.net
import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBuf
import io.netty.channel.ChannelHandlerContext import io.netty.channel.ChannelHandlerContext
import io.netty.handler.codec.ByteToMessageCodec import io.netty.handler.codec.ByteToMessageCodec
import space.blokk.BlokkServer
import space.blokk.net.MinecraftProtocolDataTypes.readVarInt import space.blokk.net.MinecraftProtocolDataTypes.readVarInt
import space.blokk.net.MinecraftProtocolDataTypes.varIntReadable import space.blokk.net.MinecraftProtocolDataTypes.varIntReadable
import space.blokk.net.MinecraftProtocolDataTypes.writeVarInt import space.blokk.net.MinecraftProtocolDataTypes.writeVarInt
class FramingCodec : ByteToMessageCodec<ByteBuf>() { class FramingCodec(private val server: BlokkServer) : ByteToMessageCodec<ByteBuf>() {
private var currentLength: Int? = null private var currentLength: Int? = null
override fun channelActive(ctx: ChannelHandlerContext) {
val session = BlokkSession(ctx.channel(), server)
ctx.channel().pipeline()
.addLast("packets", PacketMessageDuplexHandler(session))
ctx.fireChannelActive()
}
override fun encode(ctx: ChannelHandlerContext, msg: ByteBuf, out: ByteBuf) { override fun encode(ctx: ChannelHandlerContext, msg: ByteBuf, out: ByteBuf) {
out.writeVarInt(msg.readableBytes()) out.writeVarInt(msg.readableBytes())
msg.readerIndex(0) msg.readerIndex(0)

View file

@ -35,6 +35,7 @@ class LoginAndJoinProcedure(val session: BlokkSession) {
val uuid = UUID.nameUUIDFromBytes("OfflinePlayer:${packet.username}".toByteArray()) val uuid = UUID.nameUUIDFromBytes("OfflinePlayer:${packet.username}".toByteArray())
session.send(LoginSuccessPacket(uuid, packet.username)) session.send(LoginSuccessPacket(uuid, packet.username))
session.state = Session.State.LoginSucceeded(packet.username, uuid) session.state = Session.State.LoginSucceeded(packet.username, uuid)
session.launchPacketsChannelWorker()
afterLogin() afterLogin()
} }
@ -86,7 +87,7 @@ class LoginAndJoinProcedure(val session: BlokkSession) {
event.cancelled -> session.disconnect() event.cancelled -> session.disconnect()
initialWorldAndLocation == null -> { initialWorldAndLocation == null -> {
session.logger warn "Could not join because no spawn location was set" session.logger warn "Could not join because no spawn location was set"
session.disconnect(loggableReason = "No spawn location set") session.disconnect(internalReason = "No spawn location set")
} }
else -> { else -> {
// TODO: Spawn the player entity // TODO: Spawn the player entity
@ -145,7 +146,7 @@ class LoginAndJoinProcedure(val session: BlokkSession) {
val event = session.server.eventBus.emit(PlayerInitializationEvent(session, settings)) val event = session.server.eventBus.emit(PlayerInitializationEvent(session, settings))
event.ifCancelled { event.ifCancelled {
session.disconnect(loggableReason = "PlayerInitializationEvent was cancelled") session.disconnect(internalReason = "PlayerInitializationEvent was cancelled")
return return
} }
@ -200,9 +201,7 @@ class LoginAndJoinProcedure(val session: BlokkSession) {
session.send(UpdateViewPositionPacket(Chunk.Key.from(player.location.toVoxelLocation()))) session.send(UpdateViewPositionPacket(Chunk.Key.from(player.location.toVoxelLocation())))
session.lastKeepAlivePacketTimestamp = System.currentTimeMillis()
session.scheduleKeepAlivePacket() session.scheduleKeepAlivePacket()
player.sendChunksAndLight() player.sendChunksAndLight()
// WorldBorder // WorldBorder

View file

@ -1,17 +0,0 @@
package space.blokk.net
import io.netty.buffer.ByteBuf
import space.blokk.net.packet.IncomingPacket
import space.blokk.net.packet.OutgoingPacket
import space.blokk.net.packet.OutgoingPacketCodec
import space.blokk.net.packet.Packet
abstract class PacketMessage<T : Packet>(val session: BlokkSession, val packet: T)
class OutgoingPacketMessage<T : OutgoingPacket>(session: BlokkSession, packet: T) : PacketMessage<T>(session, packet) {
@Suppress("UNCHECKED_CAST")
val packetCodec = session.currentProtocol!!.getCodecByType(packet::class) as OutgoingPacketCodec<T>
fun encodePacket(dst: ByteBuf) = packetCodec.encode(packet, dst)
}
class IncomingPacketMessage<T : IncomingPacket>(session: BlokkSession, packet: T) : PacketMessage<T>(session, packet)

View file

@ -1,44 +0,0 @@
package space.blokk.net
import io.netty.buffer.ByteBuf
import io.netty.channel.ChannelHandlerContext
import io.netty.handler.codec.MessageToMessageCodec
import space.blokk.net.MinecraftProtocolDataTypes.readVarInt
import space.blokk.net.MinecraftProtocolDataTypes.writeVarInt
import java.io.IOException
class PacketMessageCodec(private val session: BlokkSession) : MessageToMessageCodec<ByteBuf, PacketMessage<*>>() {
override fun channelActive(ctx: ChannelHandlerContext) {
session.onConnect()
}
override fun channelInactive(ctx: ChannelHandlerContext) {
session.onDisconnect()
}
override fun encode(ctx: ChannelHandlerContext, msg: PacketMessage<*>, out: MutableList<Any>) {
if (msg !is OutgoingPacketMessage<*>) throw UnsupportedOperationException("Only outgoing packets can get encoded")
val dst = ctx.alloc().buffer()
dst.writeVarInt(msg.packetCodec.id)
msg.encodePacket(dst)
out.add(dst)
}
override fun decode(ctx: ChannelHandlerContext, msg: ByteBuf, out: MutableList<Any>) {
val packetID = msg.readVarInt()
val codec = session.currentProtocol!!.incomingPacketCodecsByID[packetID]
if (codec == null) {
val message = "Client sent an unknown packet (ID: $packetID)"
if (session.server.config.developmentMode)
session.logger warn "$message. This will cause the client to disconnect in production mode."
else throw IOException(message)
} else out.add(IncomingPacketMessage(session, codec.decode(msg)))
}
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
session.failBecauseOfClient("${cause::class.java.name}: ${cause.message}")
}
}

View file

@ -0,0 +1,44 @@
package space.blokk.net
import io.netty.buffer.ByteBuf
import io.netty.channel.ChannelDuplexHandler
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelPromise
import io.netty.handler.codec.MessageToMessageCodec
import space.blokk.net.MinecraftProtocolDataTypes.readVarInt
import space.blokk.net.MinecraftProtocolDataTypes.writeVarInt
import space.blokk.net.packet.OutgoingPacket
import space.blokk.net.packet.OutgoingPacketCodec
import java.io.IOException
class PacketMessageDuplexHandler(private val session: BlokkSession) : ChannelDuplexHandler() {
override fun channelActive(ctx: ChannelHandlerContext) {
session.onConnect()
}
override fun channelInactive(ctx: ChannelHandlerContext) {
session.onDisconnect(false)
}
override fun write(ctx: ChannelHandlerContext, msg: Any, promise: ChannelPromise) {
val packet = msg as OutgoingPacket
@Suppress("UNCHECKED_CAST")
val codec =
session.currentProtocol!!.getCodecByType(packet::class) as OutgoingPacketCodec<OutgoingPacket>
val dst = ctx.alloc().buffer()
dst.writeVarInt(codec.id)
codec.encode(packet, dst)
ctx.write(dst, promise)
}
override fun channelRead(ctx: ChannelHandlerContext, msg: Any) {
// TODO: Check if the ByteBuf needs to be released
session.onPacketDataReceived(msg as ByteBuf)
}
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
session.failBecauseOfClient("${cause::class.java.name}: ${cause.message}")
}
}

View file

@ -1,13 +0,0 @@
package space.blokk.net
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.SimpleChannelInboundHandler
import kotlinx.coroutines.launch
import space.blokk.net.event.PacketReceivedEvent
class PacketMessageHandler(private val session: BlokkSession) :
SimpleChannelInboundHandler<IncomingPacketMessage<*>>() {
override fun channelRead0(ctx: ChannelHandlerContext, msg: IncomingPacketMessage<*>) {
session.onPacketReceived(msg.packet)
}
}

View file

@ -10,7 +10,7 @@ object IncomingKeepAlivePacketHandler : PacketReceivedEventHandler<IncomingKeepA
session.keepAliveDisconnectJob.cancel() session.keepAliveDisconnectJob.cancel()
session.scheduleKeepAlivePacket() session.scheduleKeepAlivePacket()
} else { } else {
session.disconnect(loggableReason = "The ID of the last IncomingKeepAlive packet does not match the expected one.") session.disconnect(internalReason = "The ID of the last IncomingKeepAlive packet does not match the expected one.")
} }
} }
} }

View file

@ -10,14 +10,15 @@ object StatusProtocolHandler : ProtocolPacketReceivedEventHandler(mapOf(
val response = event.response val response = event.response
when { when {
event.cancelled -> session.disconnect(loggableReason = "The ServerListInfoRequestEvent was cancelled.") event.cancelled -> session.disconnect(internalReason = "The ServerListInfoRequestEvent was cancelled.")
response == null -> response == null ->
session.disconnect(loggableReason = "The response property of the ServerListInfoRequestEvent is null.") session.disconnect(internalReason = "The response property of the ServerListInfoRequestEvent is null.")
else -> session.send(ResponsePacket(response)) else -> session.send(ResponsePacket(response))
} }
}, },
PingPacket::class to PacketReceivedEventHandler.of<PingPacket> { session, packet -> PingPacket::class to PacketReceivedEventHandler.of<PingPacket> { session, packet ->
session.send(PongPacket(packet.payload)) session.send(PongPacket(packet.payload))
session.disconnect()
} }
)) ))

View file

@ -6,3 +6,4 @@ developmentMode: false
minLogLevel: INFO minLogLevel: INFO
packetCompressionThreshold: 256 packetCompressionThreshold: 256
timeout: 30s timeout: 30s
packetsBufferSize: 40

View file

@ -1,6 +1,9 @@
package space.blokk.testplugin package space.blokk.testplugin
import space.blokk.Blokk import space.blokk.Blokk
import space.blokk.chat.TextComponent
import space.blokk.net.ServerListInfo
import space.blokk.net.event.ServerListInfoRequestEvent
import space.blokk.net.event.SessionAfterLoginEvent import space.blokk.net.event.SessionAfterLoginEvent
import space.blokk.player.GameMode import space.blokk.player.GameMode
import space.blokk.plugin.Plugin import space.blokk.plugin.Plugin
@ -30,6 +33,10 @@ class TestPlugin: Plugin("Test", "1.0.0") {
world.getVoxel(VoxelLocation.of(-1, 2, -1)).block = CraftingTable() world.getVoxel(VoxelLocation.of(-1, 2, -1)).block = CraftingTable()
Blokk.eventBus.on<ServerListInfoRequestEvent> { event ->
event.response = ServerListInfo("1.16.4", 754, TextComponent of "Test", 10, 0, emptyList())
}
Blokk.eventBus.on<SessionAfterLoginEvent> { event -> Blokk.eventBus.on<SessionAfterLoginEvent> { event ->
event.initialWorldAndLocation = WorldAndLocationWithRotation( event.initialWorldAndLocation = WorldAndLocationWithRotation(
world, world,