API Reference

DispatcherCache.jl is a hash-chain optimizer for Dispatcher delayed execution graphs. It uses a hashing mechanism to check whether the state associated with a node in the DispatchGraph that is to be executed has already been hashed (and hence an output is available) or whether it is new or changed. Here, 'state' means the source code of the called function, its input arguments, and its other input node dependencies. Depending on that state, the current task becomes either a load-from-disk or an execute-and-store-to-disk operation. This is done in such a manner that the minimum number of load/execute operations is performed, minimizing both persistence and computational demands.
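The load-or-execute decision described above can be sketched without the package. The following is a minimal, hypothetical illustration: `plan!`, `deps`, `hashes` and `stored` are names invented here, node hashes are taken as given, and the rule of not visiting the dependencies of a node whose output is already stored is an assumption about how the number of operations is kept small.

```julia
# Hypothetical sketch: `deps` maps each node label to the labels it depends
# on, `hashes` holds one precomputed hash per node, and `stored` is the set
# of hashes that already have an output on disk.
function plan!(actions::Dict{String,Symbol}, label::String,
               deps::Dict{String,Vector{String}},
               hashes::Dict{String,String}, stored::Set{String})
    haskey(actions, label) && return actions
    if hashes[label] in stored
        # Output is available: load it and do not visit the dependencies.
        actions[label] = :load
    else
        # New or changed state: run the node, which needs its inputs first.
        actions[label] = :execute_and_store
        for dep in deps[label]
            plan!(actions, dep, deps, hashes, stored)
        end
    end
    return actions
end

deps = Dict("baz" => ["foo", "bar"], "foo" => String[], "bar" => String[])
hashes = Dict("baz" => "h3", "foo" => "h1", "bar" => "h2")

# Empty cache: all three nodes are executed and stored.
first_run = plan!(Dict{String,Symbol}(), "baz", deps, hashes, Set{String}())

# Fully populated cache: only the endpoint is loaded.
second_run = plan!(Dict{String,Symbol}(), "baz", deps, hashes,
                   Set(["h1", "h2", "h3"]))

# "bar" changed, so "bar" and "baz" carry new hashes; "foo" is still loaded.
new_hashes = Dict("baz" => "h3b", "foo" => "h1", "bar" => "h2b")
third_run = plan!(Dict{String,Symbol}(), "baz", deps, new_hashes,
                  Set(["h1", "h2", "h3"]))
```

In this sketch a change in one node reaches its dependents only because their hashes are assumed to change with it; how such chained hashes can be built is a separate concern.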

add_hash_cache!(graph, endpoints=[], uncacheable=[]
                [; compression=DEFAULT_COMPRESSION, cachedir=DEFAULT_CACHE_DIR])

Optimizes a delayed execution graph graph::DispatchGraph by wrapping individual nodes in load-from-disk or execute-and-store wrappers, depending on the state of the disk cache and of the graph. The function modifies the input graph in place; after each execution it should be called again on the same, unmodified graph and not on the already modified one.

Once the original graph is modified, calling run! on it will, if the cache is already present, load the topmost consistent key; otherwise it will re-run the nodes that have new state and store their outputs.

Arguments

  • graph::DispatchGraph input dispatch graph
  • endpoints::AbstractVector leaf nodes for which caching will occur; nodes that depend on these will not be cached. The nodes can be specified either by label or by the node object itself
  • uncacheable::AbstractVector nodes that will never be cached and will always be executed (these nodes are still hashed and their hashes influence upstream node hashes as well)

Keyword arguments

  • compression::String enables compression of the node outputs. Available options are "none" for no compression, "bz2" or "bzip2" for BZIP2 compression, and "gz" or "gzip" for GZIP compression
  • cachedir::String the cache directory.

Note: This function should be used with care as it modifies the input dispatch graph. One way to handle this is to write a function that generates the dispatch graph and to call add_hash_cache! each time on one of the distinct, functionally identical graphs it returns.

Dispatcher.run! Function.
run!(exec, graph, endpoints, uncacheable=[]
     [;compression=DEFAULT_COMPRESSION, cachedir=DEFAULT_CACHE_DIR])

Runs the graph::DispatchGraph and, for the nodes in the subgraph whose leaf nodes are given by endpoints, either loads their outputs from the cache or executes them and stores the outputs. Nodes in uncacheable are not locally cached.

Arguments

  • exec::Executor the Dispatcher.jl executor
  • graph::DispatchGraph input dispatch graph
  • endpoints::AbstractVector leaf nodes for which caching will occur; nodes that depend on these will not be cached. The nodes can be specified either by label or by the node object itself
  • uncacheable::AbstractVector nodes that will never be cached and will always be executed (these nodes are still hashed and their hashes influence upstream node hashes as well)

Keyword arguments

  • compression::String enables compression of the node outputs. Available options are "none" for no compression, "bz2" or "bzip2" for BZIP2 compression, and "gz" or "gzip" for GZIP compression
  • cachedir::String the cache directory.

Examples

julia> using Dispatcher
       using DispatcherCache

       # Some functions
       foo(x) = begin sleep(1); x end
       bar(x) = begin sleep(1); x+1 end
       baz(x,y) = begin sleep(1); x-y end

       # Make a dispatch graph out of some operations
       op1 = @op foo(1)
       op2 = @op bar(2)
       op3 = @op baz(op1, op2)
       D = DispatchGraph(op3)
# DispatchGraph({3, 2} directed simple Int64 graph,
# NodeSet(DispatchNode[
# Op(DeferredFuture at (1,1,241),baz,"baz"),
# Op(DeferredFuture at (1,1,239),foo,"foo"),
# Op(DeferredFuture at (1,1,240),bar,"bar")]))

julia> # First run, writes results to disk (takes about 2 seconds)
       result_node = [op3]  # the node for which we want results
       cachedir = "./__cache__"  # directory does not exist
       @time r = run!(AsyncExecutor(), D,
                      result_node, cachedir=cachedir)
       println("result (first run) = $(fetch(r[1].result.value))")
# [info | Dispatcher]: Executing 3 graph nodes.
# [info | Dispatcher]: Node 1 (Op<baz, Op<foo>, Op<bar>>): running.
# [info | Dispatcher]: Node 2 (Op<foo, Int64>): running.
# [info | Dispatcher]: Node 3 (Op<bar, Int64>): running.
# [info | Dispatcher]: Node 2 (Op<foo, Int64>): complete.
# [info | Dispatcher]: Node 3 (Op<bar, Int64>): complete.
# [info | Dispatcher]: Node 1 (Op<baz, Op<foo>, Op<bar>>): complete.
# [info | Dispatcher]: All 3 nodes executed.
#   2.029992 seconds (11.53 k allocations: 1.534 MiB)
# result (first run) = -2

julia> # Second run, loads the result directly from ./__cache__
       @time r = run!(AsyncExecutor(), D,
                      [op3], cachedir=cachedir)
       println("result (second run) = $(fetch(r[1].result.value))")
# [info | Dispatcher]: Executing 1 graph nodes.
# [info | Dispatcher]: Node 1 (Op<baz>): running.
# [info | Dispatcher]: Node 1 (Op<baz>): complete.
# [info | Dispatcher]: All 1 nodes executed.
#   0.005257 seconds (2.57 k allocations: 478.359 KiB)
# result (second run) = -2

julia> readdir(cachedir)
# 2-element Array{String,1}:
#  "cache"
#  "hashchain.json"

__hash(something)

Return a hexadecimal string corresponding to the hash of the sum of the hashes of the value and of the type of something.
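Read literally, the description amounts to a few lines of Base Julia. The helper `hex_hash` below is a hypothetical stand-in written for illustration, not the package's code, and its output is not expected to match that of __hash.

```julia
# Hypothetical stand-in: add the hash of the value to the hash of its type
# (UInt addition wraps around on overflow), hash that sum once more, and
# print the result in base 16.
hex_hash(x) = string(hash(hash(x) + hash(typeof(x))), base=16)

# 1 and 1.0 are `isequal`, so Base gives them the same value hash; adding
# the type hash is what keeps their keys apart.
same_value_hash = hash(1) == hash(1.0)
distinct_keys = hex_hash(1) != hex_hash(1.0)

# Equal values of the same type always map to the same key.
stable = hex_hash([1, 2, 3]) == hex_hash([1, 2, 3])
```

Including the type in the key means that a change such as passing 1.0 instead of 1 to a node is treated as new state, even though the two values compare equal.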

Examples

julia> using DispatcherCache: __hash
       __hash([1,2,3])
# "f00429a0d65eb7cb"

arg_hash(node)

Hash the data arguments (and, in certain cases, the configuration fields) of the dispatch node.

Examples

julia> using DispatcherCache: arg_hash, __hash
       f(x) = println("$x")
       arg = "argument"
       node = @op f(arg)
       arg_hash(node)
# "d482b7b1b5357c33"

julia> arg_hash(node) == __hash(hash(nothing) + hash(arg) + hash(typeof(arg)))
# true

dep_hash(node, key2hash)

Hash the dispatch node dependencies of node using their existing hashes if possible.

get_compressor(compression, action)

Return a TranscodingStreams compatible compressor or decompressor based on the values of compression and action.

get_node(graph, label)

Returns the node corresponding to label.

load_hashchain(cachedir [; compression=DEFAULT_COMPRESSION])

Loads the hashchain file found in the directory cachedir. Before loading, the compression value is checked against the one stored in the hashchain file (both have to match). If the file does not exist, it is created.
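The load-or-create behaviour and the compression check can be sketched with the standard library alone. Everything in the block is an assumption made for illustration: the names `sketch_store` and `sketch_load`, the plain-text file `hashchain.txt` (used here only to avoid a JSON dependency), and the choice to raise an error when the compression values differ.

```julia
# Hypothetical sketch. File layout assumed here: the first line holds the
# compression name, every further line holds one "key<TAB>hash" pair.
sketch_file(cachedir) = joinpath(cachedir, "hashchain.txt")

function sketch_store(chain::Dict{String,String}, cachedir::String;
                      compression::String="none")
    mkpath(cachedir)
    open(sketch_file(cachedir), "w") do io
        println(io, compression)
        for (key, value) in chain
            println(io, key, '\t', value)
        end
    end
    return chain
end

function sketch_load(cachedir::String; compression::String="none")
    path = sketch_file(cachedir)
    # A missing file is not an error: create an empty hash chain on disk.
    isfile(path) || return sketch_store(Dict{String,String}(), cachedir;
                                        compression=compression)
    lines = readlines(path)
    stored = first(lines)
    # Refuse to reuse a cache written with a different compression setting.
    stored == compression ||
        error("compression mismatch: stored $stored, requested $compression")
    chain = Dict{String,String}()
    for line in lines[2:end]
        key, value = split(line, '\t')
        chain[key] = value
    end
    return chain
end

cachedir = mktempdir()
empty_chain = sketch_load(cachedir)                  # creates the file
sketch_store(Dict("baz" => "abc123"), cachedir)
reloaded = sketch_load(cachedir)
mismatch = try
    sketch_load(cachedir; compression="gzip")
    false
catch
    true
end
```

Storing the compression value next to the hashes is what makes the check possible: cached outputs written with one codec cannot be read back with another, so a mismatch has to be detected before any node is wrapped to load from disk.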

node_hash(node, key2hash)

Calculates and returns the hash corresponding to a Dispatcher task graph node (i.e. a DispatchNode), using the hashes of its dependencies, its input arguments, and the source code of the function associated with the node. Any available hashes are taken from key2hash.
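A chained hash of this kind can be illustrated on plain data. The sketch below is hypothetical and independent of the package: `sketch_node_hash!` and the node description (source text, literal arguments, dependency labels) are invented for the example, and writing new hashes back into `key2hash` is an assumption.

```julia
# Hypothetical sketch. Each entry of `nodes` describes one node by its
# function source (as text), its literal arguments and the labels of the
# nodes it depends on.
function sketch_node_hash!(key2hash::Dict{String,String}, nodes::Dict,
                           label::String)
    # Reuse a hash that is already available.
    haskey(key2hash, label) && return key2hash[label]
    node = nodes[label]
    # Dependencies contribute through their own (possibly cached) hashes.
    dep_hashes = String[sketch_node_hash!(key2hash, nodes, d) for d in node.deps]
    h = string(hash((node.source, node.args, dep_hashes)), base=16)
    key2hash[label] = h
    return h
end

nodes = Dict{String,Any}(
    "foo" => (source="foo(x) = x", args=(1,), deps=String[]),
    "bar" => (source="bar(x) = x + 1", args=(2,), deps=String[]),
    "baz" => (source="baz(x, y) = x - y", args=(), deps=["foo", "bar"]),
)

key2hash = Dict{String,String}()
baz_hash = sketch_node_hash!(key2hash, nodes, "baz")  # also fills in foo, bar

# Changing an argument of "bar" changes the hashes of "bar" and "baz" only.
changed = copy(nodes)
changed["bar"] = (source="bar(x) = x + 1", args=(3,), deps=String[])
key2hash_changed = Dict{String,String}()
baz_hash_changed = sketch_node_hash!(key2hash_changed, changed, "baz")
```

Because each hash includes the hashes of the dependencies, a change in any node propagates to every node that consumes its output, while unrelated branches keep their hashes and therefore their cached outputs.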

root_nodes(graph::DispatchGraph) ->

Return an iterable of all nodes in the graph with no input edges.
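As an illustration of "no input edges", the same selection can be written for a plain edge list. The function `sketch_root_nodes` and the edge direction used here (from a dependency to the node that consumes it) are assumptions of this sketch and say nothing about how DispatchGraph stores its edges.

```julia
# Hypothetical sketch on a plain edge list; an edge (a, b) points from a to b.
function sketch_root_nodes(nodes::Vector{String},
                           edges::Vector{Tuple{String,String}})
    # Collect every node that appears as the target of some edge.
    has_input = Set(dst for (_, dst) in edges)
    # Root nodes are the ones that never appear as a target.
    return [n for n in nodes if !(n in has_input)]
end

nodes = ["foo", "bar", "baz"]
edges = [("foo", "baz"), ("bar", "baz")]
roots = sketch_root_nodes(nodes, edges)
```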

source_hash(node)

Hashes the lowered representation of the source code of the function associated with node. This is useful for Op nodes; the other node types do not have any associated source code.
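One way to obtain a hash that ignores comments and layout, using Base only, is to hash the statements returned by `code_lowered`. This is a sketch of the general idea; `lowered_hash` is a hypothetical helper and may differ from what source_hash does internally.

```julia
# Hypothetical helper: hash the printed statements of the lowered code of
# the method matching `types`. `debuginfo=:none` drops line information.
function lowered_hash(f, types)
    code_info = first(code_lowered(f, types; debuginfo=:none))
    return hash(string(code_info.code))
end

f(x) = x + 1
g(x) = begin
    # a comment and a different layout do not reach the lowered code
    x + 1
end
h(x) = x + 2

same_body = lowered_hash(f, (Int,)) == lowered_hash(g, (Int,))
other_body_differs = lowered_hash(f, (Int,)) != lowered_hash(h, (Int,))
```

Hashing lowered code rather than source text means that reformatting a function does not invalidate its cached output, while a change to what the function computes does.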

Examples

julia> using DispatcherCache: source_hash
       f(x) = x + 1
       g(x) = begin
               #comment
               x + 1
              end
       node_f = @op f(1)
       node_g = @op g(10)
       # Test
       source_hash(node_f) == source_hash(node_g)
# true

store_hashchain(hashchain, cachedir=DEFAULT_CACHE_DIR [; compression=DEFAULT_COMPRESSION, version=1])

Stores the hashchain object in a file named DEFAULT_HASHCHAIN_FILENAME, in the directory cachedir. The values of compression and version are stored as well in the file.

wrap_to_load!(updates, node, nodehash;
              cachedir=DEFAULT_CACHE_DIR,
              compression=DEFAULT_COMPRESSION)

Generates a new dispatch node that corresponds to node::DispatchNode and which loads a file from the cachedir cache directory whose name and extension depend on nodehash and compression and whose contents are the output of node. The generated node is added to updates, which maps node to the generated node.

wrap_to_store!(graph, node, nodehash;
               cachedir=DEFAULT_CACHE_DIR,
               compression=DEFAULT_COMPRESSION,
               skipcache=false)

Generates a new Op node that corresponds to node::DispatchNode and which stores the output of the execution of node in a file whose name and extension depend on nodehash and compression. The generated node is added to updates which maps node to the generated node. The node output is stored in cachedir. The caching is skipped if skipcache is true.
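The two kinds of wrappers can be pictured as closures around the original function. The sketch below is hypothetical and does not use Dispatcher nodes: `sketch_path`, `sketch_store_wrapper` and `sketch_load_wrapper` are invented names, the Serialization standard library stands in for whatever on-disk format the package uses, the `.bin` extension is arbitrary, and compression is left out.

```julia
using Serialization  # standard library; the package's format may differ

# Hypothetical helper: where the output for `nodehash` would live.
sketch_path(cachedir, nodehash) = joinpath(cachedir, "cache", nodehash * ".bin")

# Wrap `f` so that calling the wrapper runs `f` and, unless `skipcache` is
# set, also writes the result to disk.
function sketch_store_wrapper(f, nodehash; cachedir, skipcache=false)
    return function (args...)
        result = f(args...)
        if !skipcache
            path = sketch_path(cachedir, nodehash)
            mkpath(dirname(path))
            serialize(path, result)
        end
        return result
    end
end

# The load wrapper never calls the original function; it only reads the
# stored result.
sketch_load_wrapper(nodehash; cachedir) =
    () -> deserialize(sketch_path(cachedir, nodehash))

cachedir = mktempdir()
calls = Ref(0)
baz(x, y) = (calls[] += 1; x - y)

stored = sketch_store_wrapper(baz, "abc123"; cachedir=cachedir)(1, 3)  # runs baz
loaded = sketch_load_wrapper("abc123"; cachedir=cachedir)()            # reads file
```

Note that the load wrapper takes no arguments: once a node's output is on disk, its inputs are no longer needed, which is why dependencies of a loaded node do not have to be executed.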
