diff --git a/uranos-api/src/main/kotlin/space/uranos/net/Session.kt b/uranos-api/src/main/kotlin/space/uranos/net/Session.kt index f633486..f5b2499 100644 --- a/uranos-api/src/main/kotlin/space/uranos/net/Session.kt +++ b/uranos-api/src/main/kotlin/space/uranos/net/Session.kt @@ -6,7 +6,10 @@ package space.uranos.net import io.netty.buffer.ByteBuf +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.SupervisorJob import space.uranos.Position +import space.uranos.Uranos import space.uranos.chat.TextComponent import space.uranos.event.EventBusWrapper import space.uranos.net.packet.OutgoingPacket @@ -16,21 +19,20 @@ import space.uranos.player.Player import space.uranos.world.World import java.net.InetAddress import java.util.* -import kotlin.coroutines.CoroutineContext abstract class Session { val events by lazy { EventBusWrapper(this) } + /** + * A coroutine scope that is cancelled when the session disconnected. + */ + val scope = CoroutineScope(SupervisorJob() + Uranos.dispatcher) + /** * The IP address of this session */ abstract val address: InetAddress - /** - * Unconfined [CoroutineContext] which is cancelled when the session is disconnected. - */ - abstract val coroutineContext: CoroutineContext - /** * The brand name the client optionally sent during the login procedure. * [ClientBrandReceivedEvent][space.uranos.net.event.ClientBrandReceivedEvent] is emitted when this value changes. @@ -48,7 +50,6 @@ abstract class Session { * The player corresponding to this session. */ val player: Player? get() = (state as? State.Playing)?.player -// val player: Player? get() = (state as? State.WithPlayer)?.player /** * The current state of this session. diff --git a/uranos-api/src/main/kotlin/space/uranos/player/Player.kt b/uranos-api/src/main/kotlin/space/uranos/player/Player.kt index d24c0d2..aaedb4b 100644 --- a/uranos-api/src/main/kotlin/space/uranos/player/Player.kt +++ b/uranos-api/src/main/kotlin/space/uranos/player/Player.kt @@ -11,10 +11,9 @@ import space.uranos.net.Session import space.uranos.world.Chunk import space.uranos.world.VoxelLocation import java.util.* -import kotlin.coroutines.CoroutineContext interface Player { - val coroutineContext: CoroutineContext get() = session.coroutineContext + val scope get() = session.scope /** * The session of this player. diff --git a/uranos-api/src/main/kotlin/space/uranos/server/Server.kt b/uranos-api/src/main/kotlin/space/uranos/server/Server.kt index 5aa19f6..a3606f1 100644 --- a/uranos-api/src/main/kotlin/space/uranos/server/Server.kt +++ b/uranos-api/src/main/kotlin/space/uranos/server/Server.kt @@ -5,7 +5,9 @@ package space.uranos.server -import space.uranos.CalledFromWrongThread +import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.SupervisorJob import space.uranos.Registry import space.uranos.Scheduler import space.uranos.command.Command @@ -18,25 +20,26 @@ import space.uranos.net.Session import space.uranos.player.Player import space.uranos.plugin.PluginManager import space.uranos.recipe.Recipe +import space.uranos.util.newSingleThreadDispatcher import space.uranos.world.BiomeRegistry import space.uranos.world.Dimension import java.io.File import java.util.* import java.util.concurrent.atomic.AtomicInteger -import kotlin.coroutines.CoroutineContext abstract class Server { abstract val eventBus: EventBus abstract val eventHandlerPositions: EventHandlerPositionManager - protected abstract val serverThread: Thread + /** + * A coroutine dispatcher that is confined to the server thread. + */ + val dispatcher: CoroutineDispatcher = newSingleThreadDispatcher("Server") /** - * [CoroutineContext] confined to the server thread. - * - * Is cancelled when the server is shutting down. + * A coroutine scope that is cancelled when the server is shutdown. */ - abstract val coroutineContext: CoroutineContext + val scope = CoroutineScope(SupervisorJob() + dispatcher) /** * All sessions connected to the server. @@ -68,13 +71,6 @@ abstract class Server { */ abstract fun shutdown() - /** - * Throws [CalledFromWrongThread] when called from a thread which is not the server thread. - */ - fun ensureServerThread(errorMessage: String) { - if (Thread.currentThread() != serverThread) throw CalledFromWrongThread(errorMessage) - } - /** * Set of all existing [Entity] instances. * diff --git a/uranos-api/src/main/kotlin/space/uranos/util/runInServerThread.kt b/uranos-api/src/main/kotlin/space/uranos/util/runInServerThread.kt index b6687e9..92afaa7 100644 --- a/uranos-api/src/main/kotlin/space/uranos/util/runInServerThread.kt +++ b/uranos-api/src/main/kotlin/space/uranos/util/runInServerThread.kt @@ -8,4 +8,4 @@ package space.uranos.util import kotlinx.coroutines.withContext import space.uranos.Uranos -suspend fun runInServerThread(block: suspend () -> T): T = withContext(Uranos.coroutineContext) { block() } +suspend fun runInServerThread(block: suspend () -> T): T = withContext(Uranos.dispatcher) { block() } diff --git a/uranos-api/src/main/kotlin/space/uranos/world/Chunk.kt b/uranos-api/src/main/kotlin/space/uranos/world/Chunk.kt index e2ef6c6..92af9c2 100644 --- a/uranos-api/src/main/kotlin/space/uranos/world/Chunk.kt +++ b/uranos-api/src/main/kotlin/space/uranos/world/Chunk.kt @@ -5,10 +5,10 @@ package space.uranos.world +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.SupervisorJob import space.uranos.Uranos import space.uranos.player.Player -import space.uranos.util.supervisorChild -import kotlin.coroutines.CoroutineContext import kotlin.math.floor abstract class Chunk( @@ -40,11 +40,9 @@ abstract class Chunk( } /** - * [CoroutineContext] confined to the world thread. - * - * Is cancelled when the chunk is unloaded. + * A coroutine scope confined to the world's dispatcher that is cancelled when the chunk is unloaded. */ - val coroutineContext: CoroutineContext = world.coroutineContext.supervisorChild(identifier) + val scope = CoroutineScope(SupervisorJob() + world.dispatcher) /** * A list of all players who have locked this chunk. diff --git a/uranos-api/src/main/kotlin/space/uranos/world/World.kt b/uranos-api/src/main/kotlin/space/uranos/world/World.kt index 1a64adb..83f2748 100644 --- a/uranos-api/src/main/kotlin/space/uranos/world/World.kt +++ b/uranos-api/src/main/kotlin/space/uranos/world/World.kt @@ -5,33 +5,31 @@ package space.uranos.world -import kotlinx.coroutines.cancel -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.launch -import kotlinx.coroutines.withContext -import space.uranos.Uranos +import kotlinx.coroutines.* import space.uranos.Vector import space.uranos.entity.Entity import space.uranos.util.newSingleThreadDispatcher -import space.uranos.util.supervisorChild import space.uranos.util.untilPossiblyLower import java.util.* import java.util.concurrent.CopyOnWriteArraySet -import kotlin.coroutines.CoroutineContext /** * A Minecraft world. */ abstract class World(val uuid: UUID) { private val identifier = "World($uuid)" - private val threadExecutor = newSingleThreadDispatcher(identifier) + + private val internalDispatcher = newSingleThreadDispatcher(identifier) /** - * [CoroutineContext] confined to the world thread. - * - * Is cancelled when the world is unloaded. + * A coroutine dispatcher that is confined to the server thread. */ - val coroutineContext: CoroutineContext = Uranos.coroutineContext.supervisorChild(identifier) + threadExecutor + val dispatcher: CoroutineDispatcher = internalDispatcher + + /** + * A coroutine scope confined to [dispatcher] that is cancelled when the world is destroyed. + */ + val scope = CoroutineScope(SupervisorJob() + dispatcher) abstract val dimension: Dimension abstract val loadedChunks: Map @@ -104,14 +102,14 @@ abstract class World(val uuid: UUID) { } } - suspend fun destroy() = withContext(coroutineContext) { + suspend fun destroy() { // TODO: Move or kick players - coroutineContext.cancel() + scope.cancel() coroutineScope { loadedChunks.values.forEach { launch { (it as? Chunk.Unloadable)?.unload() } } } - threadExecutor.close() + internalDispatcher.close() } } diff --git a/uranos-server/src/main/kotlin/space/uranos/UranosServer.kt b/uranos-server/src/main/kotlin/space/uranos/UranosServer.kt index 2ce2b6d..d5da1dd 100644 --- a/uranos-server/src/main/kotlin/space/uranos/UranosServer.kt +++ b/uranos-server/src/main/kotlin/space/uranos/UranosServer.kt @@ -8,9 +8,6 @@ package space.uranos import com.sksamuel.hoplite.ConfigFilePropertySource import com.sksamuel.hoplite.ConfigLoader import com.sksamuel.hoplite.ConfigSource -import kotlinx.coroutines.CoroutineName -import kotlinx.coroutines.SupervisorJob -import kotlinx.coroutines.asCoroutineDispatcher import kotlinx.coroutines.runBlocking import space.uranos.command.Command import space.uranos.config.UranosConfig @@ -26,13 +23,11 @@ import space.uranos.recipe.Recipe import space.uranos.server.Server import space.uranos.util.EncryptionUtils import space.uranos.util.msToTicks +import space.uranos.util.runInServerThread import space.uranos.world.BiomeRegistry import space.uranos.world.Dimension import java.io.File import java.security.KeyPair -import java.util.concurrent.ExecutorService -import java.util.concurrent.Executors -import kotlin.coroutines.CoroutineContext import kotlin.system.exitProcess // TODO: Consider using DI because this improves testability @@ -50,13 +45,6 @@ class UranosServer internal constructor() : Server() { val x509EncodedPublicKey: ByteArray = EncryptionUtils.generateX509Key(keyPair.public).encoded - override lateinit var serverThread: Thread - private val scheduledExecutorService: ExecutorService = - Executors.newSingleThreadExecutor { r -> Thread(r, "Server").also { serverThread = it } } - - override val coroutineContext: CoroutineContext = - CoroutineName("Server") + SupervisorJob() + scheduledExecutorService.asCoroutineDispatcher() - override val sessions by socketServer::sessions override val players get() = sessions.mapNotNull { it.player as UranosPlayer? } @@ -124,14 +112,16 @@ class UranosServer internal constructor() : Server() { private fun startTicking() { scheduler.executeRepeating(1, 0) { - players.forEach { it.container.tick() } + runInServerThread { + players.forEach { it.container.tick() } - entities.forEach { - @Suppress("DEPRECATION_ERROR") - it.tick() + entities.forEach { + @Suppress("DEPRECATION_ERROR") + it.tick() + } + + sessions.forEach { it.packetsAdapter.tick() } } - - sessions.forEach { it.packetsAdapter.tick() } } } diff --git a/uranos-server/src/main/kotlin/space/uranos/net/PacketsAdapter.kt b/uranos-server/src/main/kotlin/space/uranos/net/PacketsAdapter.kt index b0d24a7..47ff18b 100644 --- a/uranos-server/src/main/kotlin/space/uranos/net/PacketsAdapter.kt +++ b/uranos-server/src/main/kotlin/space/uranos/net/PacketsAdapter.kt @@ -9,7 +9,6 @@ import io.netty.buffer.ByteBuf import io.netty.util.ReferenceCountUtil import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.launch -import kotlinx.coroutines.withContext import space.uranos.event.ifNotCancelled import space.uranos.net.MinecraftProtocolDataTypes.readVarInt import space.uranos.net.event.PacketReceivedEvent @@ -22,7 +21,7 @@ import space.uranos.util.awaitSuspending class PacketsAdapter(val session: UranosSession) { private val packetsForNextTick = ArrayList() - suspend fun tick() = withContext(session.coroutineContext) { + suspend fun tick() { packetsForNextTick.forEach { send(it) } packetsForNextTick.clear() } @@ -44,14 +43,14 @@ class PacketsAdapter(val session: UranosSession) { } } - suspend fun sendNextTick(packet: OutgoingPacket) = withContext(session.coroutineContext) { + fun sendNextTick(packet: OutgoingPacket) { if (packet is Mergeable) { for (i in packetsForNextTick.indices.reversed()) { val merged = packet.mergeWith(packetsForNextTick[i]) if (merged != null) { packetsForNextTick[i] = merged - return@withContext + return } } } @@ -59,7 +58,7 @@ class PacketsAdapter(val session: UranosSession) { packetsForNextTick.add(packet) } - suspend fun send(packet: OutgoingPacket): Unit = withContext(session.coroutineContext) { + suspend fun send(packet: OutgoingPacket) { if (session.server.config.logging.shouldLog(packet)) session.logger.trace { "Sending packet: $packet" } session.server.eventBus.emit(PacketSendEvent(session, packet)).ifNotCancelled { diff --git a/uranos-server/src/main/kotlin/space/uranos/net/UranosSession.kt b/uranos-server/src/main/kotlin/space/uranos/net/UranosSession.kt index 2ca4630..184e804 100644 --- a/uranos-server/src/main/kotlin/space/uranos/net/UranosSession.kt +++ b/uranos-server/src/main/kotlin/space/uranos/net/UranosSession.kt @@ -23,11 +23,9 @@ import space.uranos.net.packet.play.PlayProtocol import space.uranos.net.packet.play.PlayerInfoPacket import space.uranos.net.packet.status.StatusProtocol import space.uranos.server.event.SessionInitializedEvent -import space.uranos.util.supervisorChild import java.net.InetAddress import java.net.InetSocketAddress import javax.crypto.SecretKey -import kotlin.coroutines.CoroutineContext import kotlin.properties.Delegates class UranosSession(val channel: io.netty.channel.Channel, val server: UranosServer) : Session() { @@ -36,9 +34,6 @@ class UranosSession(val channel: io.netty.channel.Channel, val server: UranosSer private val identifier = "UranosSession(${address.hostAddress})" val logger = Logger(identifier) - override val coroutineContext: CoroutineContext = server.coroutineContext.supervisorChild(identifier) - val scope = CoroutineScope(coroutineContext) - override var brand: String? = null override var ping: Int = -1 @@ -75,9 +70,7 @@ class UranosSession(val channel: io.netty.channel.Channel, val server: UranosSer val packetsAdapter = PacketsAdapter(this) override suspend fun send(packet: OutgoingPacket) = packetsAdapter.send(packet) - override fun sendNextTick(packet: OutgoingPacket) { - scope.launch { packetsAdapter.sendNextTick(packet) } - } + override fun sendNextTick(packet: OutgoingPacket) = packetsAdapter.sendNextTick(packet) override suspend fun sendPluginMessage(channel: String, data: ByteBuf) { if (this.currentProtocol != PlayProtocol) throw IllegalStateException("The session is not using the PLAY protocol")