Archived
1
0
Fork 0

Introduce DataStorage

This commit is contained in:
Moritz Ruth 2021-01-06 22:09:27 +01:00
parent 4e34dbc1e2
commit 818939267a
No known key found for this signature in database
GPG key ID: AFD57E23E753841B
18 changed files with 220 additions and 45 deletions

View file

@ -5,8 +5,8 @@
package space.uranos.testplugin
import space.uranos.Position
import space.uranos.Uranos
import space.uranos.chat.ChatColor
import space.uranos.chat.TextComponent
import space.uranos.net.ServerListInfo
import space.uranos.net.event.ServerListInfoRequestEvent
@ -14,6 +14,7 @@ import space.uranos.net.event.SessionAfterLoginEvent
import space.uranos.player.GameMode
import space.uranos.plugin.Plugin
import space.uranos.testplugin.anvil.AnvilWorld
import space.uranos.util.secondsToTicks
import space.uranos.world.Dimension
import space.uranos.world.VoxelLocation
import space.uranos.world.block.CraftingTableBlock
@ -51,6 +52,10 @@ class TestPlugin: Plugin("Test", "1.0.0") {
.atTopCenter()
.withRotation(0f, 0f)
.inside(world)
Uranos.scheduler.executeAfter(secondsToTicks(10)) {
Uranos.players.first().playerListName = TextComponent("Test", true, color = ChatColor.BLUE)
}
}
}
}

View file

@ -14,14 +14,14 @@ interface Scheduler {
*
* If you need the return value of [block], use [runAfter].
*/
fun executeAfter(delay: Int, block: suspend () -> Unit): Task
fun executeAfter(delay: Long, block: suspend () -> Unit): Task
/**
* Executes [block] every [interval] ticks.
*
* @param delay The ticks to pass before the first execution.
*/
fun executeRepeating(interval: Int, delay: Int = interval, block: suspend () -> Any): Task
fun executeRepeating(interval: Long, delay: Long = interval, block: suspend () -> Unit): Task
/**
* Executes [block] when the scheduler (and therefore the server) is shutdown.
@ -35,13 +35,13 @@ interface Scheduler {
*
* If you do not need the return value of [block], you should use [executeAfter] instead.
*/
suspend fun <R : Any> runAfter(delay: Int, inServerThread: Boolean = false, block: suspend () -> R): R
suspend fun <R> runAfter(delay: Long, block: suspend () -> R): R
/**
* Like [runAfter], but the task is *not* cancelled when the current coroutine scope is cancelled.
*/
suspend fun <R : Any> runDetachedAfter(delay: Int, inServerThread: Boolean = false, block: suspend () -> R): R =
withContext(NonCancellable) { runAfter(delay, inServerThread, block) }
suspend fun <R> runDetachedAfter(delay: Long, block: suspend () -> R): R =
withContext(NonCancellable) { runAfter(delay, block) }
interface Task {
fun cancel()

View file

@ -25,7 +25,7 @@ abstract class Entity internal constructor() {
abstract val type: EntityType
private val worldMutex = Mutex()
var world: World? = null; private set
var world: World? = null; protected set
suspend fun setWorld(world: World?) {
if (world == null && this is PlayerEntity)

View file

@ -47,7 +47,8 @@ abstract class Session {
/**
* The player corresponding to this session.
*/
val player: Player? get() = (state as? State.WithPlayer)?.player
val player: Player? get() = (state as? State.Playing)?.player
// val player: Player? get() = (state as? State.WithPlayer)?.player
/**
* The current state of this session.

View file

@ -8,10 +8,12 @@ package space.uranos.util
import kotlin.reflect.KClass
class KClassToInstanceMap<K : Any> : MutableMap<KClass<out K>, K> by HashMap() {
val instances get() = this.values
fun <T : K> getInstance(key: Key<T>): T? = getInstance(key.actualKey)
@Suppress("UNCHECKED_CAST")
fun <T : K> getInstance(key: KClass<T>): T? = get(key) as T
fun <T : K> getInstance(key: KClass<T>): T? = get(key) as T?
fun <T : K> putInstance(key: Key<T>, value: T) = this.putInstance(key.actualKey, value)

View file

@ -13,7 +13,7 @@ import space.uranos.server.Server
/**
* Suspends for [ticks].
*/
suspend inline fun delayTicks(ticks: Int) {
suspend inline fun delayTicks(ticks: Long) {
Uranos.scheduler.runAfter(ticks) {}
}

View file

@ -0,0 +1,11 @@
/*
* Copyright 2020-2021 Moritz Ruth and Uranos contributors
* Use of this source code is governed by the Apache 2.0 license that can be found in the LICENSE file
*/
package space.uranos.util
import kotlinx.coroutines.withContext
import space.uranos.Uranos
suspend fun <T> runInServerThread(block: suspend () -> T): T = withContext(Uranos.coroutineContext) { block() }

View file

@ -12,7 +12,7 @@ import space.uranos.net.packet.IncomingPacketCodec
object IncomingPlayerPositionPacketCodec :
IncomingPacketCodec<IncomingPlayerPositionPacket>(0x13, IncomingPlayerPositionPacket::class) {
override fun decode(msg: ByteBuf): IncomingPlayerPositionPacket = IncomingPlayerPositionPacket(
Position(msg.readDouble(), msg.readDouble(), msg.readDouble(), 360 - msg.readFloat() % 360, msg.readFloat()),
Position(msg.readDouble(), msg.readDouble(), msg.readDouble(), (360 - msg.readFloat()) % 360, msg.readFloat()),
msg.readBoolean()
)
}

View file

@ -5,21 +5,23 @@
package space.uranos
import kotlinx.coroutines.*
import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.suspendCancellableCoroutine
import space.uranos.logging.Logger
import space.uranos.server.Server
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit
import java.util.concurrent.*
import kotlin.collections.LinkedHashSet
import kotlin.coroutines.resume
/**
* Basically ExecutorService but for coroutines and with ticks.
*/
class UranosScheduler(private val executor: ScheduledExecutorService) : Scheduler {
class UranosScheduler : Scheduler {
private val executor: ScheduledExecutorService =
Executors.newSingleThreadScheduledExecutor { r -> Thread(r, "Scheduler") }
private val tasks = ConcurrentHashMap.newKeySet<Task<out Any>>()
private val shutdownTasks = Collections.synchronizedSet(LinkedHashSet<suspend () -> Unit>())
@ -27,8 +29,8 @@ class UranosScheduler(private val executor: ScheduledExecutorService) : Schedule
inner class Task<R : Any>(
val fn: suspend () -> R,
val interval: Int?,
var ticksUntilExecution: Int
val interval: Long?,
var ticksUntilExecution: Long
) : Scheduler.Task {
@Volatile
var cancelled: Boolean = false
@ -85,13 +87,13 @@ class UranosScheduler(private val executor: ScheduledExecutorService) : Schedule
shutdownTasks.forEach { it.invoke() }
}
override fun executeAfter(delay: Int, block: suspend () -> Unit): Scheduler.Task {
override fun executeAfter(delay: Long, block: suspend () -> Unit): Scheduler.Task {
val task = Task(block, null, delay)
tasks.add(task)
return task
}
override fun executeRepeating(interval: Int, delay: Int, block: suspend () -> Any): Scheduler.Task {
override fun executeRepeating(interval: Long, delay: Long, block: suspend () -> Unit): Scheduler.Task {
val task = Task(block, interval, delay)
tasks.add(task)
return task
@ -107,15 +109,10 @@ class UranosScheduler(private val executor: ScheduledExecutorService) : Schedule
}
}
// TODO: Use the current coroutine context for the task execution
override suspend fun <R : Any> runAfter(delay: Int, inServerThread: Boolean, block: suspend () -> R): R {
override suspend fun <R> runAfter(delay: Long, block: suspend () -> R): R {
lateinit var continuation: CancellableContinuation<R>
val context = currentCoroutineContext()
val fn =
if (inServerThread) suspend { continuation.resume(block()) }
else suspend { withContext(context) { continuation.resume(block()) } }
val fn = suspend { continuation.resume(block()) }
val task = Task(fn, null, delay)
return suspendCancellableCoroutine {

View file

@ -19,17 +19,19 @@ import space.uranos.event.UranosEventHandlerPositionManager
import space.uranos.logging.Logger
import space.uranos.logging.UranosLoggingOutputProvider
import space.uranos.net.UranosSocketServer
import space.uranos.net.packet.play.PlayerInfoPacket
import space.uranos.player.UranosPlayer
import space.uranos.plugin.UranosPluginManager
import space.uranos.recipe.Recipe
import space.uranos.server.Server
import space.uranos.util.EncryptionUtils
import space.uranos.util.msToTicks
import space.uranos.world.BiomeRegistry
import space.uranos.world.Dimension
import java.io.File
import java.security.KeyPair
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import kotlin.coroutines.CoroutineContext
import kotlin.system.exitProcess
@ -49,8 +51,8 @@ class UranosServer internal constructor() : Server() {
val x509EncodedPublicKey: ByteArray = EncryptionUtils.generateX509Key(keyPair.public).encoded
override lateinit var serverThread: Thread
private val scheduledExecutorService: ScheduledExecutorService =
Executors.newSingleThreadScheduledExecutor { r -> Thread(r, "server").also { serverThread = it } }
private val scheduledExecutorService: ExecutorService =
Executors.newSingleThreadExecutor { r -> Thread(r, "Server").also { serverThread = it } }
override val coroutineContext: CoroutineContext =
CoroutineName("Server") + SupervisorJob() + scheduledExecutorService.asCoroutineDispatcher()
@ -72,7 +74,7 @@ class UranosServer internal constructor() : Server() {
override val biomeRegistry = BiomeRegistry()
override val loggingOutputProvider = UranosLoggingOutputProvider
override val scheduler = UranosScheduler(scheduledExecutorService)
override val scheduler = UranosScheduler()
val config = ConfigLoader.Builder()
.addPropertySource(
@ -116,6 +118,24 @@ class UranosServer internal constructor() : Server() {
logger info "Listening on ${config.host}:${config.port}"
scheduler.start()
startDataStorageTicking()
startPingSync()
}
private fun startDataStorageTicking() {
scheduler.executeRepeating(1, 0) {
players.forEach { it.dataStorage.tick() }
}
}
private fun startPingSync() {
scheduler.executeRepeating(msToTicks(config.pingUpdateInterval.toMillis()), 0) {
val packet = PlayerInfoPacket(
PlayerInfoPacket.Action.UpdateLatency(players.map { it.uuid to it.session.ping }.toMap())
)
players.forEach { it.session.send(packet) }
}
}
companion object {

View file

@ -17,5 +17,6 @@ data class UranosConfig(
val minLogLevel: Logger.Level,
val packetCompressionThreshold: Int,
val timeout: Duration,
val packetsBufferSize: Int
val packetsBufferSize: Int,
val pingUpdateInterval: Duration
)

View file

@ -0,0 +1,61 @@
/*
* Copyright 2020-2021 Moritz Ruth and Uranos contributors
* Use of this source code is governed by the Apache 2.0 license that can be found in the LICENSE file
*/
package space.uranos.data
import java.lang.ref.WeakReference
class DataStorage<ContextT : Any>(val context: ContextT) {
private val map = HashMap<DataStorageKey<ContextT, *>, Entry<*>>()
inner class Entry<V>(private val key: DataStorageKey<ContextT, V>, value: V) {
private var oldValueRef = WeakReference(value)
private var changed = true
var value: V = value
set(value) {
val oldValue = oldValueRef.get()
if (oldValue == value) return
field = value
changed = true
}
suspend fun tick(): DataStorageCombinableAction<ContextT, *>? {
var action = key.tick(context, value, changed)
if (changed) action = key.tickIfChanged(context, value)
oldValueRef = WeakReference(value)
changed = false
return action
}
}
fun <V> set(key: DataStorageKey<ContextT, V>, value: V) {
@Suppress("UNCHECKED_CAST")
val entry = map[key] as Entry<V>?
println(key)
println(value)
if (entry == null) map[key] = Entry(key, value)
else entry.value = value
}
@Suppress("UNCHECKED_CAST")
fun <V> get(key: DataStorageKey<ContextT, V>) = map[key] as V
suspend fun tick() {
val actions = map.values.mapNotNull { it.tick() }
actions.groupBy { it.key }.forEach { (key, values) ->
@Suppress("UNCHECKED_CAST")
key as DataStorageCombinableActionKey<ContextT, Any>
key.tick(context, values)
}
}
}

View file

@ -0,0 +1,12 @@
/*
* Copyright 2020-2021 Moritz Ruth and Uranos contributors
* Use of this source code is governed by the Apache 2.0 license that can be found in the LICENSE file
*/
package space.uranos.data
data class DataStorageCombinableAction<ContextT, V>(val key: DataStorageCombinableActionKey<ContextT, V>, val value: V)
abstract class DataStorageCombinableActionKey<ContextT, V> {
open suspend fun tick(context: ContextT, values: List<V>) {}
}

View file

@ -0,0 +1,25 @@
/*
* Copyright 2020-2021 Moritz Ruth and Uranos contributors
* Use of this source code is governed by the Apache 2.0 license that can be found in the LICENSE file
*/
package space.uranos.data
abstract class DataStorageKey<ContextT, V>(val name: String) {
open suspend fun tick(context: ContextT, value: V, changed: Boolean): DataStorageCombinableAction<ContextT, *>? =
null
open suspend fun tickIfChanged(context: ContextT, value: V): DataStorageCombinableAction<ContextT, *>? = null
override fun toString(): String = "DataStorageKey:$name"
}
fun <ContextT : Any, V> createDataStorageKey(
name: String,
tick: suspend (context: ContextT, value: V, changed: Boolean) -> DataStorageCombinableAction<ContextT, *>? = { _, _, _ -> null },
tickIfChanged: suspend (context: ContextT, value: V) -> DataStorageCombinableAction<ContextT, *>? = { _, _ -> null }
) =
object : DataStorageKey<ContextT, V>(name) {
override suspend fun tick(context: ContextT, value: V, changed: Boolean) = tick(context, value, changed)
override suspend fun tickIfChanged(context: ContextT, value: V) = tickIfChanged(context, value)
}

View file

@ -93,7 +93,6 @@ class LoginAndJoinProcedure(val session: UranosSession) {
session.disconnect(internalReason = "No spawn location set")
}
else -> {
// TODO: Spawn the player entity
session.send(
JoinGamePacket(
0,
@ -198,8 +197,7 @@ class LoginAndJoinProcedure(val session: UranosSession) {
session.send(
PlayerInfoPacket(
PlayerInfoPacket.Action.UpdateLatency(
session.server.players
.map { it.uuid to it.session.ping }.toMap()
session.server.players.map { it.uuid to it.session.ping }.toMap()
)
)
)
@ -207,6 +205,8 @@ class LoginAndJoinProcedure(val session: UranosSession) {
session.send(UpdateViewPositionPacket(Chunk.Key.from(player.entity.position.toVoxelLocation())))
session.scheduleKeepAlivePacket(true)
player.spawnInitially(state.world)
player.sendChunksAndLight()
// WorldBorder
@ -214,5 +214,6 @@ class LoginAndJoinProcedure(val session: UranosSession) {
session.send(OutgoingPlayerPositionPacket(state.position))
// TODO: Wait for ClientStatus(action=0) packet
session.state = Session.State.Playing(player)
}
}

View file

@ -26,6 +26,7 @@ import space.uranos.net.packet.login.LoginProtocol
import space.uranos.net.packet.play.OutgoingKeepAlivePacket
import space.uranos.net.packet.play.OutgoingPluginMessagePacket
import space.uranos.net.packet.play.PlayProtocol
import space.uranos.net.packet.play.PlayerInfoPacket
import space.uranos.net.packet.status.StatusProtocol
import space.uranos.server.event.SessionInitializedEvent
import space.uranos.util.awaitSuspending
@ -43,10 +44,19 @@ class UranosSession(private val channel: io.netty.channel.Channel, val server: U
val logger = Logger(identifier)
override val coroutineContext: CoroutineContext = server.coroutineContext.supervisorChild(identifier)
val scope = CoroutineScope(coroutineContext)
private val scope = CoroutineScope(coroutineContext)
override var brand: String? = null
override var ping: Int = -1
set(value) {
if (field == -1) {
val packet = PlayerInfoPacket(PlayerInfoPacket.Action.UpdateLatency(mapOf(player!!.uuid to value)))
scope.launch { server.players.forEach { it.session.send(packet) } }
}
field = value
}
override var state: State = State.WaitingForHandshake

View file

@ -7,18 +7,23 @@ package space.uranos.player
import space.uranos.Position
import space.uranos.chat.TextComponent
import space.uranos.data.DataStorage
import space.uranos.data.createDataStorageKey
import space.uranos.entity.PlayerEntity
import space.uranos.net.Session
import space.uranos.net.UranosSession
import space.uranos.net.packet.play.ChunkDataPacket
import space.uranos.net.packet.play.ChunkLightDataPacket
import space.uranos.net.packet.play.PlayerInfoPacket
import space.uranos.net.packet.play.SelectedHotbarSlotPacket
import space.uranos.util.clampArgument
import space.uranos.world.Chunk
import space.uranos.world.VoxelLocation
import space.uranos.world.World
import java.util.*
import kotlin.math.abs
class UranosPlayer(
override val session: Session,
override val session: UranosSession,
override val name: String,
override val uuid: UUID,
override var gameMode: GameMode,
@ -34,25 +39,48 @@ class UranosPlayer(
override var compassTarget: VoxelLocation,
selectedHotbarSlot: Int
) : Player {
override var selectedHotbarSlot = 0
val dataStorage = DataStorage(this)
object DataStorageKeys {
val selectedHotbarSlot = createDataStorageKey("selectedHotbarSlot") { player: UranosPlayer, value: Int ->
player.session.send(SelectedHotbarSlotPacket(value))
null
}
val playerListName = createDataStorageKey("playerListName") { player: UranosPlayer, value: TextComponent? ->
player.session.server.players.forEach {
// TODO: Make packets like PlayerInfoPacket mergeable
it.session.send(PlayerInfoPacket(PlayerInfoPacket.Action.UpdateDisplayName(mapOf(player.uuid to value))))
}
null
}
}
override var selectedHotbarSlot
get() = dataStorage.get(DataStorageKeys.selectedHotbarSlot)
set(value) {
clampArgument("selectedHotbarSlot", 0..8, value)
field = value
dataStorage.set(DataStorageKeys.selectedHotbarSlot, value)
}
override var playerListName
get() = dataStorage.get(DataStorageKeys.playerListName)
set(value) = dataStorage.set(DataStorageKeys.playerListName, value)
init {
this.selectedHotbarSlot = selectedHotbarSlot
}
override var playerListName: TextComponent? = null
override var currentlyViewedChunks = emptyList<Chunk>()
init {
override val entity: PlayerEntity = PlayerEntity(position, this, headPitch)
suspend fun spawnInitially(world: World) {
entity.setWorld(world)
updateCurrentlyViewedChunks()
}
override val entity: PlayerEntity = PlayerEntity(position, this, headPitch)
/**
* Sets [currentlyViewedChunks] to all chunks in the view distance.
*/

View file

@ -7,3 +7,4 @@ minLogLevel: INFO
packetCompressionThreshold: 256
timeout: 30s
packetsBufferSize: 20
pingUpdateInterval: 10s