DispatcherCache.DispatcherCache
Dispatcher.run!
DispatcherCache.__hash
DispatcherCache.add_hash_cache!
DispatcherCache.arg_hash
DispatcherCache.dep_hash
DispatcherCache.get_compressor
DispatcherCache.get_node
DispatcherCache.load_hashchain
DispatcherCache.node_hash
DispatcherCache.root_nodes
DispatcherCache.source_hash
DispatcherCache.store_hashchain
DispatcherCache.wrap_to_load!
DispatcherCache.wrap_to_store!
DispatcherCache.DispatcherCache
— Module.DispatcherCache.jl is a hash-chain
optimizer for Dispatcher delayed execution graphs. It employs a hashing mechanism to check whether the state associated with a node in the DispatchGraph that is to be executed has already been hashed (and hence an output is available) or is new or changed. Depending on the current state (the source code of the called function, its input arguments and its other input node dependencies), the current task becomes a load-from-disk or an execute-and-store-to-disk operation. This is done in such a manner that the minimum number of load/execute operations is performed, minimizing both persistence and computational demands.
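The load-or-execute decision described above can be illustrated with a minimal sketch. It does not use the package itself: `cached_call` and `slow_add` are hypothetical names, the state is approximated by the function name and its arguments only, and the standard-library `Serialization` module stands in for the package's own storage format.

```julia
using Serialization  # standard library, used here in place of the package's storage format

# Hypothetical helper: run `f(args...)` only if no cached result exists for
# the current state, otherwise load the stored result from `cachedir`.
function cached_call(f, args...; cachedir::AbstractString)
    # Assumption: the state is approximated by the function name and arguments;
    # the package also takes source code and node dependencies into account.
    key = string(hash((string(nameof(f)), args)), base=16)
    path = joinpath(cachedir, key * ".bin")
    if isfile(path)
        return deserialize(path), :loaded   # state already seen: load from disk
    end
    result = f(args...)                     # new or changed state: execute
    mkpath(cachedir)
    serialize(path, result)                 # and store the output to disk
    return result, :executed
end

slow_add(x, y) = (sleep(0.1); x + y)

cachedir = mktempdir()
first_run = cached_call(slow_add, 1, 2; cachedir=cachedir)    # executes and stores
second_run = cached_call(slow_add, 1, 2; cachedir=cachedir)   # loads from disk
changed_run = cached_call(slow_add, 1, 3; cachedir=cachedir)  # new arguments: executes again
```

Each call returns the result together with a symbol that records which of the two paths was taken.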
DispatcherCache.add_hash_cache!
— Function.add_hash_cache!(graph, endpoints=[], uncacheable=[]
[; compression=DEFAULT_COMPRESSION, cachedir=DEFAULT_CACHE_DIR])
Optimizes a delayed execution graph graph::DispatchGraph by wrapping individual nodes in load-from-disk or execute-and-store wrappers, depending on the state of the disk cache and of the graph. The function modifies the input graph in place and should be called on the same unmodified graph after each execution, not on the modified graph.
Once the original graph is modified, calling run! on it will, if the cache is already present, load the topmost consistent key or alternatively re-run and store the outputs of nodes that have new state.
Arguments
- graph::DispatchGraph: input dispatch graph
- endpoints::AbstractVector: leaf nodes for which caching will occur; nodes that depend on these will not be cached. The nodes can be specified either by label or by the node object itself
- uncacheable::AbstractVector: nodes that will never be cached and will always be executed (these nodes are still hashed and their hashes influence upstream node hashes as well)
Keyword arguments
- compression::String: enables compression of the node outputs. Available options are "none" for no compression, "bz2" or "bzip2" for BZIP compression, and "gz" or "gzip" for GZIP compression
- cachedir::String: the cache directory
Note: This function should be used with care as it modifies the input dispatch graph. One way to handle this is to write a function that generates the dispatch graph and to call add_hash_cache! each time on a distinct, functionally identical graph.
Dispatcher.run!
— Function.run!(exec, graph, endpoints, uncacheable=[]
[;compression=DEFAULT_COMPRESSION, cachedir=DEFAULT_CACHE_DIR])
Runs the graph::DispatchGraph and loads, or executes and stores, the outputs of the nodes in the subgraph whose leaf nodes are given by endpoints. Nodes in uncacheable are not locally cached.
Arguments
- exec::Executor: the Dispatcher.jl executor
- graph::DispatchGraph: input dispatch graph
- endpoints::AbstractVector: leaf nodes for which caching will occur; nodes that depend on these will not be cached. The nodes can be specified either by label or by the node object itself
- uncacheable::AbstractVector: nodes that will never be cached and will always be executed (these nodes are still hashed and their hashes influence upstream node hashes as well)
Keyword arguments
- compression::String: enables compression of the node outputs. Available options are "none" for no compression, "bz2" or "bzip2" for BZIP compression, and "gz" or "gzip" for GZIP compression
- cachedir::String: the cache directory
Examples
julia> using Dispatcher
using DispatcherCache
# Some functions
foo(x) = begin sleep(1); x end
bar(x) = begin sleep(1); x+1 end
baz(x,y) = begin sleep(1); x-y end
# Make a dispatch graph out of some operations
op1 = @op foo(1)
op2 = @op bar(2)
op3 = @op baz(op1, op2)
D = DispatchGraph(op3)
# DispatchGraph({3, 2} directed simple Int64 graph,
# NodeSet(DispatchNode[
# Op(DeferredFuture at (1,1,241),baz,"baz"),
# Op(DeferredFuture at (1,1,239),foo,"foo"),
# Op(DeferredFuture at (1,1,240),bar,"bar")]))
julia> # First run, writes results to disk (lasts 2 seconds)
result_node = [op3] # the node for which we want results
cachedir = "./__cache__" # directory does not exist
@time r = run!(AsyncExecutor(), D,
result_node, cachedir=cachedir)
println("result (first run) = $(fetch(r[1].result.value))")
# [info | Dispatcher]: Executing 3 graph nodes.
# [info | Dispatcher]: Node 1 (Op<baz, Op<foo>, Op<bar>>): running.
# [info | Dispatcher]: Node 2 (Op<foo, Int64>): running.
# [info | Dispatcher]: Node 3 (Op<bar, Int64>): running.
# [info | Dispatcher]: Node 2 (Op<foo, Int64>): complete.
# [info | Dispatcher]: Node 3 (Op<bar, Int64>): complete.
# [info | Dispatcher]: Node 1 (Op<baz, Op<foo>, Op<bar>>): complete.
# [info | Dispatcher]: All 3 nodes executed.
# 2.029992 seconds (11.53 k allocations: 1.534 MiB)
# result (first run) = -2
julia> # Second run, loads the result directly from ./__cache__
@time r = run!(AsyncExecutor(), D,
[op3], cachedir=cachedir)
println("result (second run) = $(fetch(r[1].result.value))")
# [info | Dispatcher]: Executing 1 graph nodes.
# [info | Dispatcher]: Node 1 (Op<baz>): running.
# [info | Dispatcher]: Node 1 (Op<baz>): complete.
# [info | Dispatcher]: All 1 nodes executed.
# 0.005257 seconds (2.57 k allocations: 478.359 KiB)
# result (second run) = -2
julia> readdir(cachedir)
# 2-element Array{String,1}:
# "cache"
# "hashchain.json"
DispatcherCache.__hash
— Method.__hash(something)
Return a hexadecimal string corresponding to the hash of the sum of the hashes of the value and type of something.
Examples
julia> using DispatcherCache: __hash
__hash([1,2,3])
# "f00429a0d65eb7cb"
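A minimal sketch of the idea, assuming the value hash and the type hash are simply added and the sum is hashed again. `value_type_hash` is a hypothetical name, not the package function, and the digests it produces are not claimed to match those of `__hash`.

```julia
# Hypothetical stand-in for `__hash`: combine the hash of a value with the
# hash of its type, hash the sum, and print the result in hexadecimal.
# Unsigned addition wraps around on overflow, so the sum never throws.
value_type_hash(x) = string(hash(hash(x) + hash(typeof(x))), base=16)

h_int = value_type_hash(1)
h_float = value_type_hash(1.0)  # isequal(1, 1.0) holds, so only the type hash differs
```

Including the type in the digest separates values that compare equal but have different types, such as `1` and `1.0`.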
DispatcherCache.arg_hash
— Method.arg_hash(node)
Hash the data arguments (in certain cases configuration fields) of the dispatch node.
Examples
julia> using DispatcherCache: arg_hash, __hash
f(x) = println("$x")
arg = "argument"
node = @op f(arg)
arg_hash(node)
# "d482b7b1b5357c33"
julia> arg_hash(node) == __hash(hash(nothing) + hash(arg) + hash(typeof(arg)))
# true
DispatcherCache.dep_hash
— Method.dep_hash(node, key2hash)
Hash the dispatch node dependencies of node, using their existing hashes if possible.
DispatcherCache.get_compressor
— Method.get_compressor(compression, action)
Return a TranscodingStreams-compatible compressor or decompressor based on the values of compression and action.
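As an illustration of how the documented `compression` options might be normalized, the sketch below maps each accepted name to a symbol and a file extension. `normalize_compression`, the returned symbols and the extensions are all assumptions; the package function returns TranscodingStreams codecs, which this sketch avoids so that it runs without extra packages.

```julia
# Hypothetical normalization of the documented `compression` options.
# Returns a named tuple instead of a TranscodingStreams codec.
function normalize_compression(compression::AbstractString)
    if compression == "none"
        return (kind=:none, extension="bin")     # extension is an assumption
    elseif compression in ("bz2", "bzip2")
        return (kind=:bzip2, extension="bz2")
    elseif compression in ("gz", "gzip")
        return (kind=:gzip, extension="gz")
    else
        throw(ArgumentError("unknown compression option: $compression"))
    end
end

gzip_settings = normalize_compression("gzip")
```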
DispatcherCache.get_node
— Method.get_node(graph, label)
Returns the node corresponding to label.
DispatcherCache.load_hashchain
— Function.load_hashchain(cachedir [; compression=DEFAULT_COMPRESSION])
Loads the hashchain file found in the directory cachedir. Before loading, the compression value is checked against the one stored in the hashchain file (both have to match). If the file does not exist, it is created.
DispatcherCache.node_hash
— Method.node_hash(node, key2hash)
Calculates and returns the hash corresponding to a Dispatcher task graph node, i.e. a DispatchNode, using the hashes of its dependencies, its input arguments and the source code of the function associated with the node. Any available hashes are taken from key2hash.
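The chaining of hashes through the graph can be sketched on a toy structure. `ToyNode`, `hexhash` and `toy_node_hash` are hypothetical and only mimic the description above: a node hash is derived from its source, its arguments and the hashes of its dependencies, and hashes already present in `key2hash` are reused.

```julia
# Toy node type, a hypothetical stand-in for a Dispatcher node.
struct ToyNode
    label::String
    source::String         # stands in for the function source code
    args::Vector{Any}      # plain data arguments
    deps::Vector{ToyNode}  # nodes whose outputs this node consumes
end

hexhash(x) = string(hash(x), base=16)

# Compute the hash of `node`, reusing any hash already stored in `key2hash`.
function toy_node_hash(node::ToyNode, key2hash::Dict{String,String})
    haskey(key2hash, node.label) && return key2hash[node.label]
    dep_hashes = [toy_node_hash(dep, key2hash) for dep in node.deps]
    h = hexhash((node.source, node.args, dep_hashes))
    key2hash[node.label] = h
    return h
end

foo = ToyNode("foo", "x", Any[1], ToyNode[])
bar = ToyNode("bar", "x + 1", Any[2], ToyNode[])
baz = ToyNode("baz", "x - y", Any[], [foo, bar])

key2hash = Dict{String,String}()
h1 = toy_node_hash(baz, key2hash)

# Changing an argument of `bar` changes the hash of the dependent node `baz`.
bar2 = ToyNode("bar", "x + 1", Any[3], ToyNode[])
baz2 = ToyNode("baz", "x - y", Any[], [foo, bar2])
h2 = toy_node_hash(baz2, Dict{String,String}())
```

Because each hash folds in the hashes of its dependencies, a change anywhere in the inputs of a node propagates to every node that consumes its output.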
DispatcherCache.root_nodes
— Method.root_nodes(graph::DispatchGraph) ->
Return an iterable of all nodes in the graph with no input edges.
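A minimal sketch of the same idea on a plain edge list, assuming each pair `src => dst` means that `dst` takes the output of `src` as input. `toy_root_nodes` is a hypothetical helper and does not operate on a DispatchGraph.

```julia
# Hypothetical helper: return the nodes that never appear as the target of an edge.
function toy_root_nodes(nodes::Vector{String}, edges::Vector{Pair{String,String}})
    has_input = Set(last(edge) for edge in edges)
    return [node for node in nodes if !(node in has_input)]
end

roots = toy_root_nodes(["foo", "bar", "baz"], ["foo" => "baz", "bar" => "baz"])
```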
DispatcherCache.source_hash
— Method.source_hash(node)
Hashes the lowered representation of the source code of the function associated with node. Useful for Op nodes; the other node types do not have any associated source code.
Examples
julia> using DispatcherCache: source_hash
f(x) = x + 1
g(x) = begin
#comment
x + 1
end
node_f = @op f(1)
node_g = @op g(10)
# Test
source_hash(node_f) == source_hash(node_g)
# true
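One way to hash a lowered representation is sketched below with `code_lowered` from Base. `lowered_hash` is a hypothetical helper that hashes the printed statements of the first matching method; whether the package proceeds in exactly this way is an assumption.

```julia
# Hypothetical helper: hash the printed lowered code of `f` for the given
# argument types. Assumes at least one method matches `argtypes`.
function lowered_hash(f, argtypes::Tuple)
    codeinfo = first(code_lowered(f, argtypes))
    text = join(string.(codeinfo.code), "\n")
    return string(hash(text), base=16)
end

inc(x) = x + 1
add_two(x) = x + 2

h_inc = lowered_hash(inc, (Int,))
h_add_two = lowered_hash(add_two, (Int,))  # different body, so the lowered code differs
```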
DispatcherCache.store_hashchain
— Function.store_hashchain(hashchain, cachedir=DEFAULT_CACHE_DIR [; compression=DEFAULT_COMPRESSION, version=1])
Stores the hashchain object in a file named DEFAULT_HASHCHAIN_FILENAME, in the directory cachedir. The values of compression and version are stored in the file as well.
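The store and load behaviour described for the hashchain file can be sketched with the standard library alone. Everything here is hypothetical: the `toy_` functions, the file name `hashchain.toml`, the use of TOML instead of the package's JSON file, and the choice to raise an error when the compression values do not match.

```julia
using TOML  # standard library; stands in for the JSON file used by the package

# Hypothetical file location inside the cache directory.
toy_hashchain_path(cachedir) = joinpath(cachedir, "hashchain.toml")

# Store the hash chain together with the compression and version values.
function toy_store_hashchain(hashchain::Dict{String,String}, cachedir::AbstractString;
                             compression::AbstractString="none", version::Integer=1)
    mkpath(cachedir)
    data = Dict{String,Any}("version" => version,
                            "compression" => compression,
                            "hashchain" => hashchain)
    open(toy_hashchain_path(cachedir), "w") do io
        TOML.print(io, data)
    end
    return nothing
end

# Load the hash chain, creating an empty file if none exists and rejecting
# a file that was written with a different compression value.
function toy_load_hashchain(cachedir::AbstractString; compression::AbstractString="none")
    path = toy_hashchain_path(cachedir)
    if !isfile(path)
        toy_store_hashchain(Dict{String,String}(), cachedir; compression=compression)
    end
    data = TOML.parsefile(path)
    stored = data["compression"]
    stored == compression ||
        error("compression mismatch: file has $stored, requested $compression")
    return Dict{String,String}(get(data, "hashchain", Dict{String,Any}()))
end
```

Keeping the compression value next to the hashes lets a later load detect that the cached files were written with a different setting.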
DispatcherCache.wrap_to_load!
— Method.wrap_to_load!(updates, node, nodehash;
cachedir=DEFAULT_CACHE_DIR,
compression=DEFAULT_COMPRESSION)
Generates a new dispatch node that corresponds to node::DispatchNode and which loads a file from the cachedir cache directory whose name and extension depend on nodehash and compression and whose contents are the output of node. The generated node is added to updates, which maps node to the generated node.
DispatcherCache.wrap_to_store!
— Method.wrap_to_store!(graph, node, nodehash;
cachedir=DEFAULT_CACHE_DIR,
compression=DEFAULT_COMPRESSION,
skipcache=false)
Generates a new Op node that corresponds to node::DispatchNode and which stores the output of the execution of node in a file whose name and extension depend on nodehash and compression. The generated node is added to updates, which maps node to the generated node. The node output is stored in cachedir. The caching is skipped if skipcache is true.
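The two wrappers can be pictured as closures around the original computation. The sketch below is not the package implementation: `toy_cache_path`, `toy_wrap_to_store` and `toy_wrap_to_load` are hypothetical, they work on plain functions rather than dispatch nodes, and they use `Serialization` without any compression.

```julia
using Serialization  # standard library; no compression is applied in this sketch

# Hypothetical cache file path derived from the node hash.
toy_cache_path(cachedir, nodehash) = joinpath(cachedir, nodehash * ".bin")

# Wrap `f` so that its output is also written to the cache, unless `skipcache` is true.
function toy_wrap_to_store(f, nodehash; cachedir, skipcache=false)
    return function (args...)
        result = f(args...)
        if !skipcache
            mkpath(cachedir)
            serialize(toy_cache_path(cachedir, nodehash), result)
        end
        return result
    end
end

# Build a zero-argument function that loads the stored output instead of recomputing it.
toy_wrap_to_load(nodehash; cachedir) = () -> deserialize(toy_cache_path(cachedir, nodehash))

cachedir = mktempdir()
store_baz = toy_wrap_to_store((x, y) -> x - y, "abc123"; cachedir=cachedir)
stored = store_baz(1, 3)    # executes and writes the output to disk
load_baz = toy_wrap_to_load("abc123"; cachedir=cachedir)
loaded = load_baz()         # reads the output back without executing
```

The load wrapper takes no arguments because the stored output already reflects the inputs that were hashed into the file name.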