def __contains__(self, item: int) -> bool:
return item >= self.src and item < (self.src + self.count)
+ def rcontains(self, item: int) -> bool:
+ return item >= self.dst and item < (self.dst + self.count)
+
+ def ritem(self, key: int) -> int:
+ if self.rcontains(key):
+ return self.src + (key - self.dst)
+ return key
+
class Map:
def __init__(self, name, ranges=[]):
return range[key]
return key
+ def ritem(self, key: int) -> int:
+ for xrange in self.ranges:
+ if xrange.rcontains(key):
+ return xrange.ritem(key)
+ return key
+
class Almanac:
def __init__(self, seeds, maps):
it = iter(seeds)
self.seedranges = [range(x[0], x[0] + x[1]) for x in zip(it, it)]
+ def valid_seed(self, seed: int) -> bool:
+ for seedrange in self.seedranges:
+ if seed in seedrange:
+ return True
+ return False
+
@property
def allseeds(self):
for seedrange in self.seedranges:
value = m[value]
return value
+ def rlookup(self, location: int) -> int:
+ value = location
+ for m in reversed(self.maps):
+ value = m.ritem(value)
+ return value
+
@staticmethod
def fromstream(stream) -> 'Almanac':
maps = []
for mapspec in data[1:]:
try:
name, parts = mapspec.split(": ")
- specs = [spec.split(" ") for spec in parts.split(", ") if spec != ""]
+ specs = [spec.split(" ")
+ for spec in parts.split(", ") if spec != ""]
values = [[int(v) for v in spec] for spec in specs]
ranges = [Range(*v) for v in values]
maps.append(Map(name, ranges))
import sys
import argparse
import concurrent.futures
-from typing import List
from almanac import Almanac
-def part2(almanac):
- for x in almanac.seedranges:
- print(x)
-
+def part2_slow(almanac):
def worker(seeds: range) -> int:
return min(almanac.location(seed) for seed in seeds)
print(r.result())
+# If the worker returns the start value, it's probably not in this block
+# If the number is within a block, its likely the answer and we can cancel
+# the remaining calculations for lower blocks as we no longer need to wait
+# for them (completed).
+# note: fastest with pypy
+def part2(almanac):
+ global completed
+ completed = False
+
+ def worker(almanac, start: int, count: int):
+ global completed
+ for location in range(start, start + count):
+ seed = almanac.rlookup(location)
+ if almanac.valid_seed(seed):
+ return location
+ if completed:
+ return None
+ return None
+
+ final = []
+ with concurrent.futures.ThreadPoolExecutor() as exec:
+ starts = [x for x in range(0, 10_000_000, 1_000_000)]
+ futures = [
+ exec.submit(worker, almanac, start, 1_000_000) for start in starts]
+ for res in concurrent.futures.as_completed(futures):
+ print(res.result())
+ if res.result():
+ final.append(res.result())
+ if res.result() not in starts and res.result() != None:
+ [f.cancel() for f in futures]
+ completed = True
+ print(f"part 2: {min(final)}")
+
+
def main(args=None):
parser = argparse.ArgumentParser(description="AoC 2023 day 5")
parser.add_argument('filename', type=str, help="input path")
locations = [almanac.location(seed) for seed in almanac.seeds]
print(f"part 1: {min(locations)}")
- #part2 = min(almanac.location(seed) for seed in almanac.allseeds)
- #print(f"part 2: {part2}")
part2(almanac)
return 0
self.assertFalse(97 in range)
self.assertFalse(100 in range)
+ def test_rcontains(self):
+ xrange = Range(50, 98, 2)
+ self.assertSequenceEqual(
+ [False, True, True, False, False],
+ [xrange.rcontains(x) for x in range(49, 54)])
+
def test_map_value(self):
- r = Range(50, 98, 2)
+ xrange = Range(50, 98, 2)
expected = [97, 50, 51, 100]
self.assertSequenceEqual(
expected,
- [r[n] for n in range(97, 101)]
- )
+ [xrange[n] for n in range(97, 101)])
+
+ def test_ritem(self):
+ xrange = Range(50, 98, 2)
+ self.assertEqual(98, xrange.ritem(50))
+ self.assertSequenceEqual(
+ [97, 50, 51, 100],
+ [xrange[n] for n in range(97, 101)])
class TestMap(unittest.TestCase):
[81, 14, 57, 13],
[soil[seed] for seed in [79, 14, 55, 13]])
+ def test_map_reverse_item(self):
+ soil = Map("test", [Range(50, 98, 2), Range(52, 50, 48)])
+ self.assertEqual(98, soil.ritem(50))
+ self.assertEqual(100, soil.ritem(100))
+ self.assertEqual(50, soil.ritem(52))
+ self.assertSequenceEqual(
+ [79, 14, 55, 13],
+ [soil.ritem(rseed) for rseed in [81, 14, 57, 13]])
+
class TestAlmanac(unittest.TestCase):
def test_load(self):
locations = [almanac.location(seed) for seed in almanac.seeds]
self.assertEqual(35, min(locations))
+ def test_location_to_seed(self):
+ with open("data/test_input") as input:
+ almanac = Almanac.fromstream(input)
+ seeds = [almanac.rlookup(loc) for loc in [82, 43, 86, 35]]
+ self.assertSequenceEqual(
+ [79, 14, 55, 13], seeds)
+
def test_seed_range(self):
with open("data/test_input") as input:
almanac = Almanac.fromstream(input)