Make packets mergeable
This commit is contained in:
parent
fb047b22a3
commit
91e902ecd1
17 changed files with 228 additions and 126 deletions
|
@ -12,6 +12,7 @@ repositories {
|
||||||
|
|
||||||
dependencies {
|
dependencies {
|
||||||
implementation(project(":uranos-api"))
|
implementation(project(":uranos-api"))
|
||||||
|
implementation(project(":uranos-packets"))
|
||||||
}
|
}
|
||||||
|
|
||||||
tasks {
|
tasks {
|
||||||
|
|
|
@ -53,8 +53,10 @@ class TestPlugin: Plugin("Test", "1.0.0") {
|
||||||
.withRotation(0f, 0f)
|
.withRotation(0f, 0f)
|
||||||
.inside(world)
|
.inside(world)
|
||||||
|
|
||||||
Uranos.scheduler.executeAfter(secondsToTicks(10)) {
|
Uranos.scheduler.executeRepeating(secondsToTicks(1)) {
|
||||||
Uranos.players.first().playerListName = TextComponent("Test", true, color = ChatColor.BLUE)
|
for (player in Uranos.players) {
|
||||||
|
player.playerListName = TextComponent(player.name, color = ChatColor.NAMED_COLORS.values.random())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,7 +24,6 @@ abstract class LivingEntity : Entity(), Mobile {
|
||||||
object DataStorageKeys {
|
object DataStorageKeys {
|
||||||
val position = createDataStorageKey("position") { entity: LivingEntity, value: Position ->
|
val position = createDataStorageKey("position") { entity: LivingEntity, value: Position ->
|
||||||
// TODO: Send the position to players
|
// TODO: Send the position to players
|
||||||
println(value)
|
|
||||||
null
|
null
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,6 +3,8 @@
|
||||||
* Use of this source code is governed by the Apache 2.0 license that can be found in the LICENSE file
|
* Use of this source code is governed by the Apache 2.0 license that can be found in the LICENSE file
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
@file:Suppress("LeakingThis")
|
||||||
|
|
||||||
package space.uranos.entity
|
package space.uranos.entity
|
||||||
|
|
||||||
import space.uranos.Position
|
import space.uranos.Position
|
||||||
|
@ -24,7 +26,6 @@ open class PlayerEntity(
|
||||||
final override val type: EntityType = Type
|
final override val type: EntityType = Type
|
||||||
override var velocity: Vector = Vector.ZERO
|
override var velocity: Vector = Vector.ZERO
|
||||||
|
|
||||||
@Suppress("LeakingThis")
|
|
||||||
override val dataStorage: DataStorage<PlayerEntity> = DataStorage(this)
|
override val dataStorage: DataStorage<PlayerEntity> = DataStorage(this)
|
||||||
|
|
||||||
init {
|
init {
|
||||||
|
|
|
@ -117,6 +117,11 @@ abstract class Session {
|
||||||
*/
|
*/
|
||||||
abstract suspend fun send(packet: OutgoingPacket)
|
abstract suspend fun send(packet: OutgoingPacket)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sends a packet the next tick.
|
||||||
|
*/
|
||||||
|
abstract fun sendNextTick(packet: OutgoingPacket)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sends a plugin message packet.
|
* Sends a plugin message packet.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -10,4 +10,14 @@ sealed class Packet {
|
||||||
}
|
}
|
||||||
|
|
||||||
abstract class IncomingPacket : Packet()
|
abstract class IncomingPacket : Packet()
|
||||||
|
|
||||||
|
interface Mergeable {
|
||||||
|
/**
|
||||||
|
* Merges this packet with [otherPacket] or returns null if the packets cannot be merged.
|
||||||
|
*
|
||||||
|
* If there are conflicts, the values from this packet should take precedence.
|
||||||
|
*/
|
||||||
|
fun mergeWith(otherPacket: OutgoingPacket): OutgoingPacket?
|
||||||
|
}
|
||||||
|
|
||||||
abstract class OutgoingPacket : Packet()
|
abstract class OutgoingPacket : Packet()
|
||||||
|
|
|
@ -6,6 +6,7 @@
|
||||||
package space.uranos.net.packet.play
|
package space.uranos.net.packet.play
|
||||||
|
|
||||||
import space.uranos.chat.TextComponent
|
import space.uranos.chat.TextComponent
|
||||||
|
import space.uranos.net.packet.Mergeable
|
||||||
import space.uranos.net.packet.OutgoingPacket
|
import space.uranos.net.packet.OutgoingPacket
|
||||||
import space.uranos.player.GameMode
|
import space.uranos.player.GameMode
|
||||||
import java.util.*
|
import java.util.*
|
||||||
|
@ -13,7 +14,7 @@ import java.util.*
|
||||||
/**
|
/**
|
||||||
* Informs the client about other players.
|
* Informs the client about other players.
|
||||||
*/
|
*/
|
||||||
data class PlayerInfoPacket(val action: Action<*>) : OutgoingPacket() {
|
data class PlayerInfoPacket(val action: Action<*>) : OutgoingPacket(), Mergeable {
|
||||||
sealed class Action<T> {
|
sealed class Action<T> {
|
||||||
abstract val entries: Map<UUID, T>
|
abstract val entries: Map<UUID, T>
|
||||||
|
|
||||||
|
@ -63,11 +64,29 @@ data class PlayerInfoPacket(val action: Action<*>) : OutgoingPacket() {
|
||||||
*/
|
*/
|
||||||
data class UpdateDisplayName(override val entries: Map<UUID, TextComponent?>) : Action<TextComponent?>()
|
data class UpdateDisplayName(override val entries: Map<UUID, TextComponent?>) : Action<TextComponent?>()
|
||||||
|
|
||||||
data class RemovePlayer(override val entries: Map<UUID, Unit>): Action<Unit>() {
|
data class RemovePlayer(override val entries: Map<UUID, Unit>) : Action<Unit>() {
|
||||||
constructor(entries: List<UUID>): this(entries.map { it to Unit }.toMap())
|
constructor(entries: List<UUID>) : this(entries.map { it to Unit }.toMap())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
override fun mergeWith(otherPacket: OutgoingPacket): OutgoingPacket? {
|
||||||
|
if (otherPacket !is PlayerInfoPacket) return null
|
||||||
|
|
||||||
|
val action = when (otherPacket.action) {
|
||||||
|
is Action.AddPlayer -> (action as? Action.AddPlayer)?.let { Action.AddPlayer(otherPacket.action.entries + it.entries) }
|
||||||
|
is Action.UpdateGameMode -> (action as? Action.UpdateGameMode)?.let { Action.UpdateGameMode(otherPacket.action.entries + it.entries) }
|
||||||
|
is Action.UpdateLatency -> (action as? Action.UpdateLatency)?.let { Action.UpdateLatency(otherPacket.action.entries + it.entries) }
|
||||||
|
is Action.UpdateDisplayName -> (action as? Action.UpdateDisplayName)?.let {
|
||||||
|
Action.UpdateDisplayName(
|
||||||
|
otherPacket.action.entries + it.entries
|
||||||
|
)
|
||||||
|
}
|
||||||
|
is Action.RemovePlayer -> (action as? Action.RemovePlayer)?.let { Action.RemovePlayer(otherPacket.action.entries + it.entries) }
|
||||||
|
}
|
||||||
|
|
||||||
|
return action?.let { PlayerInfoPacket(it) }
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Latency constants for use in the [AddPlayer][Action.AddPlayer] and [UpdateLatency][Action.UpdateLatency] actions.
|
* Latency constants for use in the [AddPlayer][Action.AddPlayer] and [UpdateLatency][Action.UpdateLatency] actions.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -57,9 +57,8 @@ class UranosScheduler : Scheduler {
|
||||||
try {
|
try {
|
||||||
task.fn()
|
task.fn()
|
||||||
|
|
||||||
if (task.interval != null) {
|
if (task.interval != null) task.ticksUntilExecution = task.interval
|
||||||
task.ticksUntilExecution = task.interval
|
else tasks.remove(task)
|
||||||
} else tasks.remove(task)
|
|
||||||
} catch (e: Exception) {
|
} catch (e: Exception) {
|
||||||
tasks.remove(task)
|
tasks.remove(task)
|
||||||
e.printStackTrace()
|
e.printStackTrace()
|
||||||
|
|
|
@ -90,10 +90,10 @@ class UranosServer internal constructor() : Server() {
|
||||||
)
|
)
|
||||||
.build().loadConfigOrThrow<UranosConfig>()
|
.build().loadConfigOrThrow<UranosConfig>()
|
||||||
|
|
||||||
override val minimumLogLevel = config.minLogLevel
|
override val minimumLogLevel = config.logging.minLevel
|
||||||
override val developmentMode: Boolean = config.developmentMode
|
override val developmentMode: Boolean = config.developmentMode
|
||||||
|
|
||||||
override val eventBus = UranosEventBus(developmentMode)
|
override val eventBus = UranosEventBus(config.logging.events)
|
||||||
override val eventHandlerPositions = UranosEventHandlerPositionManager()
|
override val eventHandlerPositions = UranosEventHandlerPositionManager()
|
||||||
|
|
||||||
override fun shutdown() {
|
override fun shutdown() {
|
||||||
|
@ -118,14 +118,15 @@ class UranosServer internal constructor() : Server() {
|
||||||
logger info "Listening on ${config.host}:${config.port}"
|
logger info "Listening on ${config.host}:${config.port}"
|
||||||
|
|
||||||
scheduler.start()
|
scheduler.start()
|
||||||
startDataStorageTicking()
|
startTicking()
|
||||||
startPingSync()
|
startPingSync()
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun startDataStorageTicking() {
|
private fun startTicking() {
|
||||||
scheduler.executeRepeating(1, 0) {
|
scheduler.executeRepeating(1, 0) {
|
||||||
players.forEach { it.dataStorage.tick() }
|
players.forEach { it.dataStorage.tick() }
|
||||||
entities.forEach { it.dataStorage.tick() }
|
entities.forEach { it.dataStorage.tick() }
|
||||||
|
sessions.forEach { it.tick() }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -6,6 +6,7 @@
|
||||||
package space.uranos.config
|
package space.uranos.config
|
||||||
|
|
||||||
import space.uranos.logging.Logger
|
import space.uranos.logging.Logger
|
||||||
|
import space.uranos.net.packet.Packet
|
||||||
import java.time.Duration
|
import java.time.Duration
|
||||||
|
|
||||||
data class UranosConfig(
|
data class UranosConfig(
|
||||||
|
@ -14,9 +15,21 @@ data class UranosConfig(
|
||||||
val ignoreClientCausedErrors: Boolean,
|
val ignoreClientCausedErrors: Boolean,
|
||||||
val authenticateAndEncrypt: Boolean,
|
val authenticateAndEncrypt: Boolean,
|
||||||
val developmentMode: Boolean,
|
val developmentMode: Boolean,
|
||||||
val minLogLevel: Logger.Level,
|
|
||||||
val packetCompressionThreshold: Int,
|
val packetCompressionThreshold: Int,
|
||||||
val timeout: Duration,
|
val timeout: Duration,
|
||||||
val packetsBufferSize: Int,
|
val packetsBufferSize: Int,
|
||||||
val pingUpdateInterval: Duration
|
val pingUpdateInterval: Duration,
|
||||||
)
|
val logging: LoggingConfig
|
||||||
|
) {
|
||||||
|
data class LoggingConfig(
|
||||||
|
val minLevel: Logger.Level,
|
||||||
|
val events: Boolean,
|
||||||
|
val packets: List<String>,
|
||||||
|
val isBlacklist: Boolean
|
||||||
|
) {
|
||||||
|
fun shouldLog(packet: Packet): Boolean {
|
||||||
|
val contained = packets.contains(packet::class.simpleName?.removeSuffix("Packet"))
|
||||||
|
return if (isBlacklist) !contained else contained
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -9,7 +9,7 @@ import space.uranos.logging.Logger
|
||||||
import space.uranos.util.pluralizeWithCount
|
import space.uranos.util.pluralizeWithCount
|
||||||
import kotlin.system.measureTimeMillis
|
import kotlin.system.measureTimeMillis
|
||||||
|
|
||||||
class UranosEventBus(private val developmentMode: Boolean) : EventBus() {
|
class UranosEventBus(private val logEvents: Boolean) : EventBus() {
|
||||||
private val logger = Logger("EventBus")
|
private val logger = Logger("EventBus")
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -20,7 +20,7 @@ class UranosEventBus(private val developmentMode: Boolean) : EventBus() {
|
||||||
* @return [event]
|
* @return [event]
|
||||||
*/
|
*/
|
||||||
override suspend fun <T : Event> emit(event: T): T {
|
override suspend fun <T : Event> emit(event: T): T {
|
||||||
if (developmentMode) {
|
if (logEvents) {
|
||||||
var count = 0
|
var count = 0
|
||||||
val time = measureTimeMillis {
|
val time = measureTimeMillis {
|
||||||
for (handler in handlers) {
|
for (handler in handlers) {
|
||||||
|
|
|
@ -81,7 +81,7 @@ class LoginAndJoinProcedure(val session: UranosSession) {
|
||||||
|
|
||||||
private suspend fun afterLogin() {
|
private suspend fun afterLogin() {
|
||||||
val state: Session.State.LoginSucceeded = session.state.getOrFail()
|
val state: Session.State.LoginSucceeded = session.state.getOrFail()
|
||||||
session.launchPacketsChannelConsumer()
|
session.packetsAdapter.launchPacketsChannelConsumer()
|
||||||
|
|
||||||
val event = session.server.eventBus.emit(SessionAfterLoginEvent(session))
|
val event = session.server.eventBus.emit(SessionAfterLoginEvent(session))
|
||||||
val initialWorldAndLocation = event.initialWorldAndLocation
|
val initialWorldAndLocation = event.initialWorldAndLocation
|
||||||
|
|
|
@ -36,7 +36,7 @@ class PacketMessageDuplexHandler(private val session: UranosSession) : ChannelDu
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun channelRead(ctx: ChannelHandlerContext, msg: Any) {
|
override fun channelRead(ctx: ChannelHandlerContext, msg: Any) {
|
||||||
session.onPacketDataReceived(msg as ByteBuf) // ByteBuf must be released by the session
|
session.packetsAdapter.onPacketReceived(msg as ByteBuf) // ByteBuf must be released by the session
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
|
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
|
||||||
|
|
128
uranos-server/src/main/kotlin/space/uranos/net/PacketsAdapter.kt
Normal file
128
uranos-server/src/main/kotlin/space/uranos/net/PacketsAdapter.kt
Normal file
|
@ -0,0 +1,128 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2020-2021 Moritz Ruth and Uranos contributors
|
||||||
|
* Use of this source code is governed by the Apache 2.0 license that can be found in the LICENSE file
|
||||||
|
*/
|
||||||
|
|
||||||
|
package space.uranos.net
|
||||||
|
|
||||||
|
import io.netty.buffer.ByteBuf
|
||||||
|
import io.netty.util.ReferenceCountUtil
|
||||||
|
import kotlinx.coroutines.channels.Channel
|
||||||
|
import kotlinx.coroutines.launch
|
||||||
|
import kotlinx.coroutines.withContext
|
||||||
|
import space.uranos.event.ifNotCancelled
|
||||||
|
import space.uranos.net.MinecraftProtocolDataTypes.readVarInt
|
||||||
|
import space.uranos.net.event.PacketReceivedEvent
|
||||||
|
import space.uranos.net.event.PacketSendEvent
|
||||||
|
import space.uranos.net.packet.IncomingPacket
|
||||||
|
import space.uranos.net.packet.Mergeable
|
||||||
|
import space.uranos.net.packet.OutgoingPacket
|
||||||
|
import space.uranos.util.awaitSuspending
|
||||||
|
|
||||||
|
class PacketsAdapter(val session: UranosSession) {
|
||||||
|
private val packetsForNextTick = ArrayList<OutgoingPacket>()
|
||||||
|
|
||||||
|
suspend fun tick() {
|
||||||
|
packetsForNextTick.forEach { send(it) }
|
||||||
|
packetsForNextTick.clear()
|
||||||
|
}
|
||||||
|
|
||||||
|
fun stopProcessingIncomingPackets() {
|
||||||
|
packetDataChannel.close()
|
||||||
|
packetsChannel?.close()
|
||||||
|
}
|
||||||
|
|
||||||
|
fun onPacketReceived(data: ByteBuf) {
|
||||||
|
packetDataChannel.offer(data)
|
||||||
|
}
|
||||||
|
|
||||||
|
private suspend fun handlePacket(packet: IncomingPacket) {
|
||||||
|
if (session.server.config.logging.shouldLog(packet)) session.logger.trace { "Packet received: $packet" }
|
||||||
|
|
||||||
|
session.server.eventBus.emit(PacketReceivedEvent(session, packet)).ifNotCancelled {
|
||||||
|
SessionPacketReceivedEventHandler.handle(session, packet)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
suspend fun sendNextTick(packet: OutgoingPacket) = withContext(session.coroutineContext) {
|
||||||
|
if (packet is Mergeable) {
|
||||||
|
for (i in packetsForNextTick.indices.reversed()) {
|
||||||
|
val merged = packet.mergeWith(packetsForNextTick[i])
|
||||||
|
|
||||||
|
if (merged != null) {
|
||||||
|
packetsForNextTick[i] = merged
|
||||||
|
return@withContext
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
packetsForNextTick.add(packet)
|
||||||
|
}
|
||||||
|
|
||||||
|
suspend fun send(packet: OutgoingPacket): Unit = withContext(session.coroutineContext) {
|
||||||
|
if (session.server.config.logging.shouldLog(packet)) session.logger.trace { "Sending packet: $packet" }
|
||||||
|
|
||||||
|
session.server.eventBus.emit(PacketSendEvent(session, packet)).ifNotCancelled {
|
||||||
|
try {
|
||||||
|
session.channel.writeAndFlush(it.packet).awaitSuspending()
|
||||||
|
} catch (t: Throwable) {
|
||||||
|
if (session.channel.isActive) session.logger.error("Sending packet failed", t)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private var packetsChannel: Channel<IncomingPacket>? = null
|
||||||
|
private val packetDataChannel: Channel<ByteBuf> = Channel(Channel.UNLIMITED)
|
||||||
|
|
||||||
|
fun launchPacketsChannelConsumer() {
|
||||||
|
val channel = Channel<IncomingPacket>(session.server.config.packetsBufferSize)
|
||||||
|
packetsChannel = channel
|
||||||
|
|
||||||
|
session.scope.launch {
|
||||||
|
for (packet in channel) {
|
||||||
|
handlePacket(packet)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fun launchPacketDataChannelConsumer() {
|
||||||
|
session.scope.launch {
|
||||||
|
for (data in packetDataChannel) {
|
||||||
|
try {
|
||||||
|
handlePacketData(data)
|
||||||
|
} catch (e: Exception) {
|
||||||
|
session.failAndDisconnectBecauseOfClient(e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private suspend fun handlePacketData(data: ByteBuf) {
|
||||||
|
val packetID = data.readVarInt()
|
||||||
|
val codec = session.currentProtocol!!.incomingPacketCodecsByID[packetID]
|
||||||
|
|
||||||
|
if (codec == null) {
|
||||||
|
val message = "Received an unknown packet (ID: 0x${packetID.toString(16).padStart(2, '0')})"
|
||||||
|
|
||||||
|
if (session.server.config.developmentMode)
|
||||||
|
session.logger warn "$message. This will cause the client to disconnect in production mode."
|
||||||
|
else session.failAndDisconnectBecauseOfClient(message)
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
val packet = codec.decode(data)
|
||||||
|
ReferenceCountUtil.release(data)
|
||||||
|
|
||||||
|
if (packetsChannel != null) {
|
||||||
|
// The protocol will not change anymore, so we can safely decode all pending packets with the current one
|
||||||
|
if (!packetsChannel!!.offer(packet)) {
|
||||||
|
session.logger.warn("The packet buffer is full (size: ${session.server.config.packetsBufferSize})")
|
||||||
|
session.disconnect(internalReason = "You sent more packets than the packet buffer can handle")
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// The protocol could change because of this packet, so we must wait
|
||||||
|
handlePacket(packet)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -6,19 +6,13 @@
|
||||||
package space.uranos.net
|
package space.uranos.net
|
||||||
|
|
||||||
import io.netty.buffer.ByteBuf
|
import io.netty.buffer.ByteBuf
|
||||||
import io.netty.util.ReferenceCountUtil
|
|
||||||
import kotlinx.coroutines.*
|
import kotlinx.coroutines.*
|
||||||
import kotlinx.coroutines.channels.Channel
|
|
||||||
import space.uranos.UranosServer
|
import space.uranos.UranosServer
|
||||||
import space.uranos.chat.ChatColor
|
import space.uranos.chat.ChatColor
|
||||||
import space.uranos.chat.ChatComponent
|
import space.uranos.chat.ChatComponent
|
||||||
import space.uranos.chat.TextComponent
|
import space.uranos.chat.TextComponent
|
||||||
import space.uranos.event.*
|
import space.uranos.event.*
|
||||||
import space.uranos.logging.Logger
|
import space.uranos.logging.Logger
|
||||||
import space.uranos.net.MinecraftProtocolDataTypes.readVarInt
|
|
||||||
import space.uranos.net.event.PacketReceivedEvent
|
|
||||||
import space.uranos.net.event.PacketSendEvent
|
|
||||||
import space.uranos.net.packet.IncomingPacket
|
|
||||||
import space.uranos.net.packet.OutgoingPacket
|
import space.uranos.net.packet.OutgoingPacket
|
||||||
import space.uranos.net.packet.Protocol
|
import space.uranos.net.packet.Protocol
|
||||||
import space.uranos.net.packet.handshaking.HandshakingProtocol
|
import space.uranos.net.packet.handshaking.HandshakingProtocol
|
||||||
|
@ -29,7 +23,6 @@ import space.uranos.net.packet.play.PlayProtocol
|
||||||
import space.uranos.net.packet.play.PlayerInfoPacket
|
import space.uranos.net.packet.play.PlayerInfoPacket
|
||||||
import space.uranos.net.packet.status.StatusProtocol
|
import space.uranos.net.packet.status.StatusProtocol
|
||||||
import space.uranos.server.event.SessionInitializedEvent
|
import space.uranos.server.event.SessionInitializedEvent
|
||||||
import space.uranos.util.awaitSuspending
|
|
||||||
import space.uranos.util.supervisorChild
|
import space.uranos.util.supervisorChild
|
||||||
import java.net.InetAddress
|
import java.net.InetAddress
|
||||||
import java.net.InetSocketAddress
|
import java.net.InetSocketAddress
|
||||||
|
@ -37,14 +30,14 @@ import javax.crypto.SecretKey
|
||||||
import kotlin.coroutines.CoroutineContext
|
import kotlin.coroutines.CoroutineContext
|
||||||
import kotlin.properties.Delegates
|
import kotlin.properties.Delegates
|
||||||
|
|
||||||
class UranosSession(private val channel: io.netty.channel.Channel, val server: UranosServer) : Session() {
|
class UranosSession(val channel: io.netty.channel.Channel, val server: UranosServer) : Session() {
|
||||||
override val address: InetAddress = (channel.remoteAddress() as InetSocketAddress).address
|
override val address: InetAddress = (channel.remoteAddress() as InetSocketAddress).address
|
||||||
|
|
||||||
private val identifier = "UranosSession(${address.hostAddress})"
|
private val identifier = "UranosSession(${address.hostAddress})"
|
||||||
val logger = Logger(identifier)
|
val logger = Logger(identifier)
|
||||||
|
|
||||||
override val coroutineContext: CoroutineContext = server.coroutineContext.supervisorChild(identifier)
|
override val coroutineContext: CoroutineContext = server.coroutineContext.supervisorChild(identifier)
|
||||||
private val scope = CoroutineScope(coroutineContext)
|
val scope = CoroutineScope(coroutineContext)
|
||||||
|
|
||||||
override var brand: String? = null
|
override var brand: String? = null
|
||||||
|
|
||||||
|
@ -78,8 +71,25 @@ class UranosSession(private val channel: io.netty.channel.Channel, val server: U
|
||||||
is State.Disconnected -> null
|
is State.Disconnected -> null
|
||||||
}
|
}
|
||||||
|
|
||||||
|
val packetsAdapter = PacketsAdapter(this)
|
||||||
|
|
||||||
|
override suspend fun send(packet: OutgoingPacket) = packetsAdapter.send(packet)
|
||||||
|
override fun sendNextTick(packet: OutgoingPacket) {
|
||||||
|
scope.launch { packetsAdapter.sendNextTick(packet) }
|
||||||
|
}
|
||||||
|
|
||||||
|
suspend fun tick() {
|
||||||
|
packetsAdapter.tick()
|
||||||
|
}
|
||||||
|
|
||||||
|
override suspend fun sendPluginMessage(channel: String, data: ByteBuf) {
|
||||||
|
if (this.currentProtocol != PlayProtocol) throw IllegalStateException("The session is not using the PLAY protocol")
|
||||||
|
send(OutgoingPluginMessagePacket(channel, data))
|
||||||
|
data.release()
|
||||||
|
}
|
||||||
|
|
||||||
override suspend fun disconnect(reason: TextComponent, internalReason: String?) {
|
override suspend fun disconnect(reason: TextComponent, internalReason: String?) {
|
||||||
stopProcessingIncomingPackets()
|
packetsAdapter.stopProcessingIncomingPackets()
|
||||||
|
|
||||||
if (currentProtocol == LoginProtocol || currentProtocol == PlayProtocol) {
|
if (currentProtocol == LoginProtocol || currentProtocol == PlayProtocol) {
|
||||||
logger info "Disconnected" + (internalReason?.let { ". Internal reason: $it" } ?: "")
|
logger info "Disconnected" + (internalReason?.let { ". Internal reason: $it" } ?: "")
|
||||||
|
@ -106,74 +116,6 @@ class UranosSession(private val channel: io.netty.channel.Channel, val server: U
|
||||||
channel.close()
|
channel.close()
|
||||||
}
|
}
|
||||||
|
|
||||||
private var packetsChannel: Channel<IncomingPacket>? = null
|
|
||||||
private val packetDataChannel: Channel<ByteBuf> = Channel(Channel.UNLIMITED)
|
|
||||||
|
|
||||||
fun launchPacketsChannelConsumer() {
|
|
||||||
val channel = Channel<IncomingPacket>(server.config.packetsBufferSize)
|
|
||||||
packetsChannel = channel
|
|
||||||
|
|
||||||
scope.launch {
|
|
||||||
for (packet in channel) {
|
|
||||||
handlePacket(packet)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private fun launchPacketDataChannelConsumer() {
|
|
||||||
scope.launch {
|
|
||||||
for (data in packetDataChannel) {
|
|
||||||
try {
|
|
||||||
handlePacketData(data)
|
|
||||||
} catch (e: Exception) {
|
|
||||||
failAndDisconnectBecauseOfClient(e)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private fun stopProcessingIncomingPackets() {
|
|
||||||
packetDataChannel.close()
|
|
||||||
packetsChannel?.close()
|
|
||||||
}
|
|
||||||
|
|
||||||
private suspend fun handlePacket(packet: IncomingPacket) {
|
|
||||||
logger.trace { "Packet received: $packet" }
|
|
||||||
|
|
||||||
server.eventBus.emit(PacketReceivedEvent(this@UranosSession, packet)).ifNotCancelled {
|
|
||||||
SessionPacketReceivedEventHandler.handle(this@UranosSession, packet)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private suspend fun handlePacketData(data: ByteBuf) {
|
|
||||||
val packetID = data.readVarInt()
|
|
||||||
val codec = currentProtocol!!.incomingPacketCodecsByID[packetID]
|
|
||||||
|
|
||||||
if (codec == null) {
|
|
||||||
val message = "Received an unknown packet (ID: 0x${packetID.toString(16).padStart(2, '0')})"
|
|
||||||
|
|
||||||
if (server.config.developmentMode)
|
|
||||||
logger warn "$message. This will cause the client to disconnect in production mode."
|
|
||||||
else failAndDisconnectBecauseOfClient(message)
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
val packet = codec.decode(data)
|
|
||||||
ReferenceCountUtil.release(data)
|
|
||||||
|
|
||||||
if (packetsChannel != null) {
|
|
||||||
// The protocol will not change anymore, so we can safely decode all pending packets with the current one
|
|
||||||
if (!packetsChannel!!.offer(packet)) {
|
|
||||||
logger.warn("The packet buffer is full (size: ${server.config.packetsBufferSize})")
|
|
||||||
disconnect(internalReason = "You sent more packets than the packet buffer can handle")
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// The protocol could change because of this packet, so we must wait
|
|
||||||
handlePacket(packet)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
val joinProcedure = LoginAndJoinProcedure(this)
|
val joinProcedure = LoginAndJoinProcedure(this)
|
||||||
|
|
||||||
var lastKeepAlivePacketTimestamp by Delegates.notNull<Long>()
|
var lastKeepAlivePacketTimestamp by Delegates.notNull<Long>()
|
||||||
|
@ -202,7 +144,7 @@ class UranosSession(private val channel: io.netty.channel.Channel, val server: U
|
||||||
|
|
||||||
fun onConnect() = scope.launch {
|
fun onConnect() = scope.launch {
|
||||||
logger trace "Connected"
|
logger trace "Connected"
|
||||||
launchPacketDataChannelConsumer()
|
packetsAdapter.launchPacketDataChannelConsumer()
|
||||||
|
|
||||||
state = State.WaitingForHandshake
|
state = State.WaitingForHandshake
|
||||||
if (server.eventBus.emit(SessionInitializedEvent(this@UranosSession)).cancelled) channel.close()
|
if (server.eventBus.emit(SessionInitializedEvent(this@UranosSession)).cancelled) channel.close()
|
||||||
|
@ -215,18 +157,14 @@ class UranosSession(private val channel: io.netty.channel.Channel, val server: U
|
||||||
if (!expected && currentProtocol != HandshakingProtocol && currentProtocol != StatusProtocol)
|
if (!expected && currentProtocol != HandshakingProtocol && currentProtocol != StatusProtocol)
|
||||||
logger trace "The client disconnected unexpectedly"
|
logger trace "The client disconnected unexpectedly"
|
||||||
|
|
||||||
stopProcessingIncomingPackets()
|
packetsAdapter.stopProcessingIncomingPackets()
|
||||||
coroutineContext.cancel(DisconnectedCancellationException())
|
coroutineContext.cancel(DisconnectedCancellationException())
|
||||||
state = State.Disconnected
|
state = State.Disconnected
|
||||||
server.sessions.remove(this@UranosSession)
|
server.sessions.remove(this@UranosSession)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fun onPacketDataReceived(packetData: ByteBuf) {
|
fun failAndDisconnectBecauseOfClient(throwable: Throwable) =
|
||||||
packetDataChannel.offer(packetData)
|
|
||||||
}
|
|
||||||
|
|
||||||
internal fun failAndDisconnectBecauseOfClient(throwable: Throwable) =
|
|
||||||
failAndDisconnectBecauseOfClient("${throwable::class.java.name}: ${throwable.message}", throwable)
|
failAndDisconnectBecauseOfClient("${throwable::class.java.name}: ${throwable.message}", throwable)
|
||||||
|
|
||||||
internal fun failAndDisconnectBecauseOfClient(message: String, throwable: Throwable? = null) {
|
internal fun failAndDisconnectBecauseOfClient(message: String, throwable: Throwable? = null) {
|
||||||
|
@ -262,24 +200,6 @@ class UranosSession(private val channel: io.netty.channel.Channel, val server: U
|
||||||
.addAfter("framing", "compression", CompressionCodec(server.config.packetCompressionThreshold))
|
.addAfter("framing", "compression", CompressionCodec(server.config.packetCompressionThreshold))
|
||||||
}
|
}
|
||||||
|
|
||||||
override suspend fun send(packet: OutgoingPacket) = withContext<Unit>(coroutineContext) {
|
|
||||||
logger.trace { "Sending packet: $packet" }
|
|
||||||
server.eventBus.emit(PacketSendEvent(this@UranosSession, packet)).ifNotCancelled {
|
|
||||||
try {
|
|
||||||
channel.writeAndFlush(it.packet).awaitSuspending()
|
|
||||||
} catch (t: Throwable) {
|
|
||||||
if (!channel.isActive) return@withContext
|
|
||||||
logger.error("Sending packet failed", t)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
override suspend fun sendPluginMessage(channel: String, data: ByteBuf) {
|
|
||||||
if (this.currentProtocol != PlayProtocol) throw IllegalStateException("The session is not using the PLAY protocol")
|
|
||||||
send(OutgoingPluginMessagePacket(channel, data))
|
|
||||||
data.release()
|
|
||||||
}
|
|
||||||
|
|
||||||
companion object {
|
companion object {
|
||||||
const val KEEP_ALIVE_PACKET_INTERVAL = 1000
|
const val KEEP_ALIVE_PACKET_INTERVAL = 1000
|
||||||
}
|
}
|
||||||
|
|
|
@ -49,8 +49,7 @@ class UranosPlayer(
|
||||||
|
|
||||||
val playerListName = createDataStorageKey("playerListName") { player: UranosPlayer, value: TextComponent? ->
|
val playerListName = createDataStorageKey("playerListName") { player: UranosPlayer, value: TextComponent? ->
|
||||||
player.session.server.players.forEach {
|
player.session.server.players.forEach {
|
||||||
// TODO: Make packets like PlayerInfoPacket mergeable
|
it.session.sendNextTick(PlayerInfoPacket(PlayerInfoPacket.Action.UpdateDisplayName(mapOf(player.uuid to value))))
|
||||||
it.session.send(PlayerInfoPacket(PlayerInfoPacket.Action.UpdateDisplayName(mapOf(player.uuid to value))))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
null
|
null
|
||||||
|
|
|
@ -8,3 +8,8 @@ packetCompressionThreshold: 256
|
||||||
timeout: 30s
|
timeout: 30s
|
||||||
packetsBufferSize: 20
|
packetsBufferSize: 20
|
||||||
pingUpdateInterval: 10s
|
pingUpdateInterval: 10s
|
||||||
|
logging:
|
||||||
|
minLevel: INFO
|
||||||
|
events: false
|
||||||
|
packets: [ ]
|
||||||
|
isBlacklist: false
|
||||||
|
|
Reference in a new issue