手書き数字の認識をしています。
8かける8のマスで0から9のラベルを判別します。
使うのは10枚の訓練データのみです。
性能は正解率が60パーセントです。
from collections import defaultdict
import matplotlib.pyplot as plt
import numpy as np
import random
from sklearn.datasets import load_digits
from sklearn.metrics import accuracy_score
from sklearn.preprocessing import StandardScaler
# =========================================================
# 1. 環境とデータの準備(手書き数字 64次元データ)
# =========================================================
np.random.seed(42)
digits = load_digits()
X_raw, y_raw = digits.data, digits.target
# 生態系の変化を見やすくするため、400サンプルを抽出
total_indices = np.random.choice(len(X_raw), 400, replace=False)
X_400, y_400 = X_raw[total_indices], y_raw[total_indices]
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X_400)
# 少数サンプルを訓練用に(残りをテスト環境に)
train_idx = []
for c in range(10):
c_idxs = np.where(y_400 == c)[0]
train_idx.extend(c_idxs[:1]) # 各クラス1枚ずつ
test_idx = [i for i in range(400) if i not in train_idx]
X_train, y_train = X_scaled[train_idx], y_400[train_idx]
X_test, y_test = X_scaled[test_idx], y_400[test_idx]
NUM_CLASSES = 10
INPUT_DIM = 64
# --- ハイパーパラメータ ---
INIT_CELLS = 680
MAX_CELLS = 10000
MIN_CELLS = 80
THOUGHT_STEPS = 3
lr_base = 0.30
lr_link_base = 0.10
DIVISION_THRESHOLD = 0.85 # 構造過学習を防ぐため、少し厳格化
DEATH_THRESHOLD = 0.08 # 淘汰圧を少し強める
GRAPH_DECAY = 0.90
EDGE_PRUNE_THRESHOLD = 0.02
epochs =6 # 現象を観察するため、5サイクル回します
# =========================================================
# 2. 視覚記憶細胞(VisionMemoryCell)の定義
# =========================================================
class VisionMemoryCell:
def __init__(self, cell_id):
self.id = cell_id
self.w_in = np.random.normal(0, 1.0, INPUT_DIM) # 蜘蛛の巣初期化で再上書きされます
self.A = np.random.normal(0, 0.1, NUM_CLASSES)
self.links = {}
self.visits = 0.0
self.energy = 0.5
self.recent_activity = 0.0
def cosine_similarity(w, x):
return np.dot(w, x) / (np.linalg.norm(w) * np.linalg.norm(x) + 1e-9)
def softmax(x, temp=1.0):
x = np.array(x) / temp
x = x - np.max(x)
e = np.exp(x)
return e / (np.sum(e) + 1e-12)
# =========================================================
# 🧬 3. 代謝・新陳代謝コア(分裂コストの厳格化)
# =========================================================
def evolve_and_prune_space(memory_space, outer_graph):
# 睡眠・自然減衰
for cell in memory_space:
cell.energy *= 0.92
cell.recent_activity *= 0.50
if cell.energy < 0.01: cell.energy = 0.01
# --- 分裂フェーズ(ユーザーさんの提案によるコスト厳格化) ---
new_cells = []
current_max_id = max([c.id for c in memory_space]) if memory_space else 0
for cell in memory_space:
if len(memory_space) + len(new_cells) >= MAX_CELLS: break
# 分裂条件をエネルギーかつ、 visits累積に設定
if cell.energy > DIVISION_THRESHOLD and cell.visits > 15:
child = VisionMemoryCell(current_max_id + len(new_cells) + 1)
# 子は親の特性に少し変異(ノイズ)を乗せる
child.w_in = np.clip(cell.w_in + np.random.normal(0, 0.05, INPUT_DIM), -3.0, 3.0)
child.A = cell.A.copy()
for k, v in cell.links.items(): child.links[k] = v * 0.5
# 【ユーザーさんの設計思想】親の資源を大きく削り、子を未熟な状態からスタートさせる
cell.energy *= 0.3
cell.visits *= 0.5
child.energy = 0.5 # 子の初期エネルギーは低い
new_cells.append(child)
memory_space.extend(new_cells)
# --- 淘汰 & 外側マクログラフの再マッピング ---
survived_space = []
id_map = {}
for cell in memory_space:
if len(survived_space) > MIN_CELLS:
if cell.energy < DEATH_THRESHOLD and cell.visits < 1.5:
continue
new_id = len(survived_space)
id_map[cell.id] = new_id
cell.id = new_id
survived_space.append(cell)
for cell in survived_space:
cell.links = {id_map[old_id]: w * 0.85 for old_id, w in cell.links.items() if old_id in id_map and w > 0.05}
new_graph = defaultdict(dict)
for u, neighbors in outer_graph.items():
new_u = ("cell", id_map[u[1]]) if u[0] == "cell" and u[1] in id_map else u
if u[0] == "cell" and u[1] not in id_map: continue
for v, weight in neighbors.items():
decayed_weight = weight * GRAPH_DECAY
if decayed_weight < EDGE_PRUNE_THRESHOLD: continue
new_v = ("cell", id_map[v[1]]) if v[0] == "cell" and v[1] in id_map else v
if v[0] == "cell" and v[1] not in id_map: continue
new_graph[new_u][new_v] = decayed_weight
return survived_space, new_graph
# =========================================================
# 🔄 4. 融合生命ダイナミクス(思考の歩行・側頭抑制)
# =========================================================
def live_vision_cycle(memory_space, outer_graph, x_64d=None, target_class=None, target_recall_class=None,
sensory_drive=1.0, intrinsic_drive=0.0, top_down_drive=0.0, learning_gate=1.0):
if len(memory_space) == 0: return np.zeros(NUM_CLASSES), np.zeros(INPUT_DIM)
id_to_cell = {c.id: c for c in memory_space}
start_probabilities = np.zeros(len(memory_space))
if x_64d is not None and sensory_drive > 0.0:
sims = np.array([cosine_similarity(cell.w_in, x_64d) for cell in memory_space])
start_probabilities += softmax(sims, temp=0.1) * sensory_drive
if intrinsic_drive > 0.0:
activity_weights = np.array([cell.energy * (cell.recent_activity + 0.05) for cell in memory_space])
start_probabilities += (activity_weights / (np.sum(activity_weights) + 1e-12)) * intrinsic_drive
if target_recall_class is not None and top_down_drive > 0.0:
c_node = ("class", target_recall_class)
for next_node, weight in outer_graph.get(c_node, {}).items():
if next_node[0] == "cell" and next_node[1] in id_to_cell:
start_probabilities[next_node[1]] += weight * top_down_drive
if np.sum(start_probabilities) == 0:
start_probabilities = np.ones(len(memory_space)) / len(memory_space)
start_probabilities /= np.sum(start_probabilities)
current_cell_id = np.random.choice(list(id_to_cell.keys()), p=start_probabilities)
path = [current_cell_id]
collected_signals = [id_to_cell[current_cell_id].A.copy()]
imagined_pixels = [id_to_cell[current_cell_id].w_in.copy()]
# --- 思考がグラフを歩く(Graph Walk) ---
for step in range(THOUGHT_STEPS - 1):
current_cell = id_to_cell[current_cell_id]
current_cell.recent_activity += sensory_drive * 1.0
all_ids = list(id_to_cell.keys())
candidates = set(np.random.choice(all_ids, min(25, len(all_ids)), replace=False))
candidates.update(current_cell.links.keys())
scores = []
c_ids = []
for cid in candidates:
if cid in path or cid not in id_to_cell: continue
cell = id_to_cell[cid]
score = current_cell.links.get(cid, 0.0) * 2.5
if x_64d is not None:
score += 2.0 * cosine_similarity(cell.w_in, x_64d) * sensory_drive
score += 0.5 * cosine_similarity(current_cell.A, cell.A)
scores.append(score)
c_ids.append(cid)
if not scores: break
probs = softmax(scores, temp=0.3)
current_cell_id = np.random.choice(c_ids, p=probs)
path.append(current_cell_id)
collected_signals.append(id_to_cell[current_cell_id].A.copy())
imagined_pixels.append(id_to_cell[current_cell_id].w_in.copy())
final_perception = np.mean(np.stack(collected_signals), axis=0) if collected_signals else np.zeros(NUM_CLASSES)
final_image = np.mean(np.stack(imagined_pixels), axis=0) if imagined_pixels else np.zeros(INPUT_DIM)
sensory_factor = sensory_drive * learning_gate
intrinsic_factor = intrinsic_drive * learning_gate
inferred_class = np.argmax(final_perception)
target = target_class if target_class is not None else inferred_class
target_vec = np.zeros(NUM_CLASSES)
target_vec[target] = 1.0
# 報酬の決定(厳格化:専門化を促す)
reward = min(1.0 / (np.linalg.norm(target_vec - final_perception) + 0.15), 2.5)
# --- ヘブ則的な学習と、専門化のための側頭抑制(Lateral Inhibition) ---
if x_64d is not None and sensory_factor > 0.0:
# 1. 経路上の細胞の学習
for cid in path:
if cid in id_to_cell:
cell = id_to_cell[cid]
cell.visits += sensory_factor
cell.energy = min(1.0, cell.energy + reward * 0.15 * sensory_factor)
cell.w_in += (lr_base * 0.5 * sensory_factor) * (x_64d - cell.w_in)
cell.A = (1.0 - lr_base * sensory_factor) * cell.A + (lr_base * sensory_factor) * target_vec
# 2. 側頭抑制(同じ情報への過剰適合・重複増殖を防ぐ)
# 歩いた細胞の近隣リンクにありながら、今回選ばれなかった細胞のエネルギーを少し削る
for cid in path:
if cid in id_to_cell:
for neighbor_id in id_to_cell[cid].links.keys():
if neighbor_id not in path and neighbor_id in id_to_cell:
id_to_cell[neighbor_id].energy *= 0.95 # 側頭抑制によるエネルギー減衰
total_learning_power = (sensory_factor * reward + intrinsic_factor * 0.2)
if total_learning_power > 0.0:
for i in range(len(path) - 1):
u, v = path[i], path[i+1]
if u in id_to_cell and v in id_to_cell:
id_to_cell[u].links[v] = id_to_cell[u].links.get(v, 0.1) + (lr_link_base * total_learning_power)
c_node = ("class", int(target))
first_cell_node = ("cell", path[0])
last_cell_node = ("cell", path[-1])
for i in range(len(path) - 1):
f_node = ("cell", path[i])
t_node = ("cell", path[i+1])
outer_graph[f_node][t_node] = outer_graph[f_node].get(t_node, 0.0) + total_learning_power
outer_graph[t_node][f_node] = outer_graph[t_node].get(f_node, 0.0) + total_learning_power
# クラスから細胞、細胞からクラスへの「双方向エッジ」を正確にマッピング
outer_graph[first_cell_node][c_node] = outer_graph[first_cell_node].get(c_node, 0.0) + total_learning_power
outer_graph[c_node][first_cell_node] = outer_graph[c_node].get(first_cell_node, 0.0) + total_learning_power
outer_graph[last_cell_node][c_node] = outer_graph[last_cell_node].get(c_node, 0.0) + total_learning_power
outer_graph[c_node][last_cell_node] = outer_graph[c_node].get(last_cell_node, 0.0) + total_learning_power
return final_perception, final_image
# =========================================================
# 👥 5. アンサンブル・蜘蛛の巣トポロジー管理(ミーム交配の実装)
# =========================================================
class EnsembleVisionEcosystem:
def __init__(self, n_estimators=5, total_cells=1000):
self.n_estimators = n_estimators
self.models = []
NUM_RINGS = 5 # 蜘蛛の巣の階層数
for _ in range(n_estimators):
space = []
graph = defaultdict(dict)
# 密度の調整:外側(感覚層)ほど細胞を多く、内側(概念層)ほど少なく
cells_per_ring = []
remaining = total_cells
for r in range(NUM_RINGS):
if r == NUM_RINGS - 1:
cells_per_ring.append(remaining)
else:
count = int(total_cells * (0.45 / (1.6**r)))
cells_per_ring.append(count)
remaining -= count
cell_id_counter = 0
ring_nodes = {}
# --- 1. 蜘蛛の巣の細胞配置と受容野初期化 ---
for ring_idx, n_cells in enumerate(cells_per_ring):
ring_nodes[ring_idx] = []
for i in range(n_cells):
cell = VisionMemoryCell(cell_id_counter)
# 外周(感覚層)ほど実際の数字に近い初期受容野を持たせる
sensory_proximity = 1.0 - (ring_idx / NUM_RINGS)
if sensory_proximity > 0.2:
ref_idx = np.random.choice(len(X_train))
base_pattern = X_train[ref_idx]
cell.w_in = base_pattern * sensory_proximity + np.random.normal(0, 0.3, INPUT_DIM) * (1.0 - sensory_proximity)
cell.A[y_train[ref_idx]] = 0.5 * sensory_proximity
space.append(cell)
ring_nodes[ring_idx].append(cell_id_counter)
cell_id_counter += 1
# --- 2. 蜘蛛の巣の糸(リンク)の構築 ---
id_to_cell = {c.id: c for c in space}
for ring_idx in range(NUM_RINGS):
nodes = ring_nodes[ring_idx]
num_nodes = len(nodes)
if num_nodes == 0: continue
for i in range(num_nodes):
curr_id = nodes[i]
# (a) 同心円の横の糸(横方向リンク)
next_id = nodes[(i + 1) % num_nodes]
id_to_cell[curr_id].links[next_id] = 0.4
id_to_cell[next_id].links[curr_id] = 0.4
# (b) 放射方向の縦の糸(抽象化への階層リンク)
if ring_idx < NUM_RINGS - 1:
inner_nodes = ring_nodes[ring_idx + 1]
if len(inner_nodes) > 0:
inner_idx = int(i * len(inner_nodes) / num_nodes)
inner_id = inner_nodes[inner_idx]
id_to_cell[curr_id].links[inner_id] = 0.6
id_to_cell[inner_id].links[curr_id] = 0.6
self.models.append({'space': space, 'graph': graph})
def train_cycle(self, X, y, phase="day"):
for model in self.models:
# 脳の個性を引き出すブートストラップサンプリング
indices = np.random.choice(len(X), len(X), replace=True)
for idx in indices:
if phase == "day":
live_vision_cycle(model['space'], model['graph'], x_64d=X[idx],
target_class=y[idx], sensory_drive=1.0, intrinsic_drive=0.0)
elif phase == "evening":
live_vision_cycle(model['space'], model['graph'], x_64d=X[idx],
target_class=y[idx], sensory_drive=0.3, intrinsic_drive=0.7)
def sleep_cycle(self, steps=60):
for model in self.models:
for _ in range(steps):
live_vision_cycle(model['space'], model['graph'], x_64d=None,
target_class=None, sensory_drive=0.0, intrinsic_drive=1.0)
def cultural_crossover(self, crossover_rate=0.2):
"""異なる脳の間で、最も洗練されたイデア(ミーム)を交換する"""
elite_cells_per_model = []
for m_idx, model in enumerate(self.models):
space = model['space']
if not space: continue
# 最深部(Ring 4に相当する後半10%のコア層)からエリートを選別
core_start_idx = int(len(space) * 0.9)
core_cells = space[core_start_idx:]
if not core_cells: continue
# エネルギーが高く、キャラクター(特定のクラスへの確信)が立っている細胞をスカウト
elite_cell = max(core_cells, key=lambda c: (c.energy * np.max(c.A)))
elite_cells_per_model.append((m_idx, elite_cell))
# 集団間での文化伝播(シナプス感染)
for src_idx, src_cell in elite_cells_per_model:
target_indices = [i for i in range(self.n_estimators) if i != src_idx]
if not target_indices: continue
dst_idx = random.choice(target_indices)
dst_model = self.models[dst_idx]
dst_core_cells = dst_model['space'][int(len(dst_model['space']) * 0.9):]
if not dst_core_cells: continue
# 伝播先の脳の中で、最も迷っている(エネルギーの低い)細胞にミームをブレンドする
weak_cell = min(dst_core_cells, key=lambda c: c.energy)
# 知識の混ざり合い
weak_cell.w_in = (1.0 - crossover_rate) * weak_cell.w_in + crossover_rate * src_cell.w_in
weak_cell.A = (1.0 - crossover_rate) * weak_cell.A + crossover_rate * src_cell.A
weak_cell.energy = max(weak_cell.energy, src_cell.energy * 0.8) # 文化刺激による活性化
def evolve(self):
for model in self.models:
model['space'], model['graph'] = evolve_and_prune_space(model['space'], model['graph'])
def predict(self, X, run_evolution=True):
"""
推論を実行しつつ、その入力データにリアルタイムに適応して
細胞を更新・分裂・淘汰させる(リアルタイム・オンライン学習モード)
"""
y_pred = []
for idx in range(len(X)):
ensemble_perceptions = []
# 各ベースモデル(脳)がデータに反応し、同時に学習・更新を行う
for model in self.models:
# 【変更点】
# 1. target_class=None にすることで、自分の推論結果をターゲット(疑似ラベル)にする
# 2. learning_gate=1.0 にして、推論時もヘブ則や側頭抑制を完全に作動させる
perception, _ = live_vision_cycle(
model['space'], model['graph'], x_64d=X[idx],
target_class=None, # 自身の推論(inferred_class)で自己学習
sensory_drive=1.0,
intrinsic_drive=0.1,
learning_gate=1.0 # ゲートを開放!推論時も動的に更新
)
ensemble_perceptions.append(perception)
# アンサンブルとしての最終決定
avg_perception = np.mean(ensemble_perceptions, axis=0)
final_class = np.argmax(avg_perception)
y_pred.append(final_class)
# 【変更点】細胞を増やす・間引く(新陳代謝の実行)
# サンプルごと(または数サンプルごと)に代謝を回すことで、推論中にリアルタイムに細胞が増減します
if run_evolution and (idx + 1) % 10 == 0: # ここでは10サンプルごとに新陳代謝を実行
self.evolve()
return y_pred
def get_average_cell_count(self):
return int(np.mean([len(m['space']) for m in self.models]))
# =========================================================
# 🚀 6. 巡礼ループの実行
# =========================================================
print("=== 蜘蛛の巣ウェブ型・側頭抑制&ミーム交配システム 起動 ===")
ensemble = EnsembleVisionEcosystem(n_estimators=5, total_cells=INIT_CELLS)
for cycle in range(epochs):
# ☀️ 昼
ensemble.train_cycle(X_train, y_train, phase="day")
# 🌆 夕
ensemble.train_cycle(X_train, y_train, phase="evening")
# 🌙 夜(夢)
ensemble.sleep_cycle(steps=60)
# 🌌 夢の最深部で「ミーム文化的交配」が発生
ensemble.cultural_crossover(crossover_rate=0.2)
# 🧬 新陳代謝(分裂コスト制限 & 淘汰)
ensemble.evolve()
# 📊 テスト
y_pred = ensemble.predict(X_test)
acc = accuracy_score(y_test, y_pred)
avg_cells = ensemble.get_average_cell_count()
print(f"Cycle {cycle+1:02d}/{epochs} | 平均現存細胞数: {avg_cells:>3} | アンサンブル正解率: {acc:.4f}")
# =========================================================
# 7. ダッシュボード:【 空想状態(Daydream) 】の可視化
# =========================================================
print("\n[Generating Daydream Images from the First Evolved Ecosystem...]")
fig, axes = plt.subplots(2, 5, figsize=(12, 5))
fig.suptitle("Daydreaming Mode (Top-Down Drive via Double Edges) - Model #1\nVisualizing the 'Ideals' of digits polished by Structural Control", fontsize=14)
first_model = ensemble.models[0]
for cls in range(10):
_, daydream_image = live_vision_cycle(first_model['space'], first_model['graph'],
sensory_drive=0.0, intrinsic_drive=0.2, top_down_drive=1.0,
target_recall_class=cls, learning_gate=0.0)
ax = axes[cls // 5, cls % 5]
ax.imshow(daydream_image.reshape(8, 8), cmap="bone")
ax.set_title(f"Concept: {cls}")
ax.axis("off")
plt.tight_layout()
plt.show()
第二版(このコードは精度を保証しません)
from collections import defaultdict
import matplotlib.pyplot as plt
import numpy as np
import random
from sklearn.datasets import load_digits
from sklearn.metrics import accuracy_score
from sklearn.preprocessing import StandardScaler
# =========================================================
# 1. 環境とデータの準備(手書き数字 64次元データ)
# =========================================================
np.random.seed(42)
digits = load_digits()
X_raw, y_raw = digits.data, digits.target
# 生態系の変化を見やすくするため、400サンプルを抽出
total_indices = np.random.choice(len(X_raw), 400, replace=False)
X_400, y_400 = X_raw[total_indices], y_raw[total_indices]
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X_400)
# 少数サンプルを訓練用に(残りをテスト環境に)
train_idx = []
for c in range(10):
c_idxs = np.where(y_400 == c)[0]
train_idx.extend(c_idxs[:1]) # 各クラス1枚ずつ
test_idx = [i for i in range(400) if i not in train_idx]
X_train, y_train = X_scaled[train_idx], y_400[train_idx]
X_test, y_test = X_scaled[test_idx], y_400[test_idx]
NUM_CLASSES = 10
INPUT_DIM = 64
# --- ハイパーパラメータ ---
INIT_CELLS = 690
MAX_CELLS = 10000
MIN_CELLS = 80
THOUGHT_STEPS = 8
lr_base = 0.30
lr_link_base = 0.10
DIVISION_THRESHOLD = 0.85 # 構造過学習を防ぐため、少し厳格化
DEATH_THRESHOLD = 0.08 # 淘汰圧を少し強める
GRAPH_DECAY = 0.90
EDGE_PRUNE_THRESHOLD = 0.02
epochs = 6 # 現象を観察するため、5サイクル回します
# =========================================================
# 2. 視覚記憶細胞(VisionMemoryCell)の定義
# =========================================================
class VisionMemoryCell:
def __init__(self, cell_id):
self.id = cell_id
self.w_in = np.random.normal(0, 1.0, INPUT_DIM) # 蜘蛛の巣初期化で再上書きされます
self.A = np.random.normal(0, 0.1, NUM_CLASSES)
self.links = {}
self.visits = 0.0
self.energy = 0.5
self.recent_activity = 0.0
def cosine_similarity(w, x):
return np.dot(w, x) / (np.linalg.norm(w) * np.linalg.norm(x) + 1e-9)
def softmax(x, temp=1.0):
x = np.array(x) / temp
x = x - np.max(x)
e = np.exp(x)
return e / (np.sum(e) + 1e-12)
# =========================================================
# 🧬 3. 代謝・新陳代謝コア(分裂コストの厳格化)
# =========================================================
def evolve_and_prune_space(memory_space, outer_graph):
# 睡眠・自然減衰
for cell in memory_space:
cell.energy *= 0.92
cell.recent_activity *= 0.50
if cell.energy < 0.01: cell.energy = 0.01
# --- 分裂フェーズ(ユーザーさんの提案によるコスト厳格化) ---
new_cells = []
current_max_id = max([c.id for c in memory_space]) if memory_space else 0
for cell in memory_space:
if len(memory_space) + len(new_cells) >= MAX_CELLS: break
# 分裂条件をエネルギーかつ、 visits累積に設定
if cell.energy > DIVISION_THRESHOLD and cell.visits > 15:
child = VisionMemoryCell(current_max_id + len(new_cells) + 1)
# 子は親の特性に少し変異(ノイズ)を乗せる
child.w_in = np.clip(cell.w_in + np.random.normal(0, 0.05, INPUT_DIM), -3.0, 3.0)
child.A = cell.A.copy()
for k, v in cell.links.items(): child.links[k] = v * 0.5
# 【ユーザーさんの設計思想】親の資源を大きく削り、子を未熟な状態からスタートさせる
cell.energy *= 0.3
cell.visits *= 0.5
child.energy = 0.4 # 子の初期エネルギーは低い
new_cells.append(child)
memory_space.extend(new_cells)
# --- 淘汰 & 外側マクログラフの再マッピング ---
survived_space = []
id_map = {}
for cell in memory_space:
if len(survived_space) > MIN_CELLS:
if cell.energy < DEATH_THRESHOLD and cell.visits < 1.5:
continue
new_id = len(survived_space)
id_map[cell.id] = new_id
cell.id = new_id
survived_space.append(cell)
for cell in survived_space:
cell.links = {id_map[old_id]: w * 0.85 for old_id, w in cell.links.items() if old_id in id_map and w > 0.05}
new_graph = defaultdict(dict)
for u, neighbors in outer_graph.items():
new_u = ("cell", id_map[u[1]]) if u[0] == "cell" and u[1] in id_map else u
if u[0] == "cell" and u[1] not in id_map: continue
for v, weight in neighbors.items():
decayed_weight = weight * GRAPH_DECAY
if decayed_weight < EDGE_PRUNE_THRESHOLD: continue
new_v = ("cell", id_map[v[1]]) if v[0] == "cell" and v[1] in id_map else v
if v[0] == "cell" and v[1] not in id_map: continue
new_graph[new_u][new_v] = decayed_weight
return survived_space, new_graph
# =========================================================
# 🔄 4. 融合生命ダイナミクス(思考の歩行・側頭抑制)
# =========================================================
def live_vision_cycle(memory_space, outer_graph, x_64d=None, target_class=None, target_recall_class=None,
sensory_drive=1.0, intrinsic_drive=0.0, top_down_drive=0.0, learning_gate=1.0):
if len(memory_space) == 0: return np.zeros(NUM_CLASSES), np.zeros(INPUT_DIM)
id_to_cell = {c.id: c for c in memory_space}
start_probabilities = np.zeros(len(memory_space))
if x_64d is not None and sensory_drive > 0.0:
sims = np.array([cosine_similarity(cell.w_in, x_64d) for cell in memory_space])
start_probabilities += softmax(sims, temp=0.1) * sensory_drive
if intrinsic_drive > 0.0:
activity_weights = np.array([cell.energy * (cell.recent_activity + 0.05) for cell in memory_space])
start_probabilities += (activity_weights / (np.sum(activity_weights) + 1e-12)) * intrinsic_drive
if target_recall_class is not None and top_down_drive > 0.0:
c_node = ("class", target_recall_class)
for next_node, weight in outer_graph.get(c_node, {}).items():
if next_node[0] == "cell" and next_node[1] in id_to_cell:
start_probabilities[next_node[1]] += weight * top_down_drive
if np.sum(start_probabilities) == 0:
start_probabilities = np.ones(len(memory_space)) / len(memory_space)
start_probabilities /= np.sum(start_probabilities)
current_cell_id = np.random.choice(list(id_to_cell.keys()), p=start_probabilities)
path = [current_cell_id]
collected_signals = [id_to_cell[current_cell_id].A.copy()]
imagined_pixels = [id_to_cell[current_cell_id].w_in.copy()]
# --- 思考がグラフを歩く(Graph Walk) ---
for step in range(THOUGHT_STEPS - 1):
current_cell = id_to_cell[current_cell_id]
current_cell.recent_activity += sensory_drive * 1.0
all_ids = list(id_to_cell.keys())
candidates = set(np.random.choice(all_ids, min(25, len(all_ids)), replace=False))
candidates.update(current_cell.links.keys())
scores = []
c_ids = []
for cid in candidates:
if cid in path or cid not in id_to_cell: continue
cell = id_to_cell[cid]
score = current_cell.links.get(cid, 0.0) * 2.5
if x_64d is not None:
score += 2.0 * cosine_similarity(cell.w_in, x_64d) * sensory_drive
score += 0.5 * cosine_similarity(current_cell.A, cell.A)
scores.append(score)
c_ids.append(cid)
if not scores: break
probs = softmax(scores, temp=0.3)
current_cell_id = np.random.choice(c_ids, p=probs)
path.append(current_cell_id)
collected_signals.append(id_to_cell[current_cell_id].A.copy())
imagined_pixels.append(id_to_cell[current_cell_id].w_in.copy())
final_perception = np.mean(np.stack(collected_signals), axis=0) if collected_signals else np.zeros(NUM_CLASSES)
final_image = np.mean(np.stack(imagined_pixels), axis=0) if imagined_pixels else np.zeros(INPUT_DIM)
sensory_factor = sensory_drive * learning_gate
intrinsic_factor = intrinsic_drive * learning_gate
inferred_class = np.argmax(final_perception)
target = target_class if target_class is not None else inferred_class
target_vec = np.zeros(NUM_CLASSES)
target_vec[target] = 1.0
# 報酬の決定(厳格化:専門化を促す)
reward = min(1.0 / (np.linalg.norm(target_vec - final_perception) + 0.15), 2.5)
# --- ヘブ則的な学習と、専門化のための側頭抑制(Lateral Inhibition) ---
if x_64d is not None and sensory_factor > 0.0:
# 1. 経路上の細胞の学習
for cid in path:
if cid in id_to_cell:
cell = id_to_cell[cid]
cell.visits += sensory_factor
cell.energy = min(1.0, cell.energy + reward * 0.15 * sensory_factor)
cell.w_in += (lr_base * 0.5 * sensory_factor) * (x_64d - cell.w_in)
cell.A = (1.0 - lr_base * sensory_factor) * cell.A + (lr_base * sensory_factor) * target_vec
# 2. 側頭抑制(同じ情報への過剰適合・重複増殖を防ぐ)
# 歩いた細胞の近隣リンクにありながら、今回選ばれなかった細胞のエネルギーを少し削る
for cid in path:
if cid in id_to_cell:
for neighbor_id in id_to_cell[cid].links.keys():
if neighbor_id not in path and neighbor_id in id_to_cell:
id_to_cell[neighbor_id].energy *= 0.95 # 側頭抑制によるエネルギー減衰
total_learning_power = (sensory_factor * reward + intrinsic_factor * 0.2)
if total_learning_power > 0.0:
for i in range(len(path) - 1):
u, v = path[i], path[i+1]
if u in id_to_cell and v in id_to_cell:
id_to_cell[u].links[v] = id_to_cell[u].links.get(v, 0.1) + (lr_link_base * total_learning_power)
c_node = ("class", int(target))
first_cell_node = ("cell", path[0])
last_cell_node = ("cell", path[-1])
for i in range(len(path) - 1):
f_node = ("cell", path[i])
t_node = ("cell", path[i+1])
outer_graph[f_node][t_node] = outer_graph[f_node].get(t_node, 0.0) + total_learning_power
outer_graph[t_node][f_node] = outer_graph[t_node].get(f_node, 0.0) + total_learning_power
# クラスから細胞、細胞からクラスへの「双方向エッジ」を正確にマッピング
outer_graph[first_cell_node][c_node] = outer_graph[first_cell_node].get(c_node, 0.0) + total_learning_power
outer_graph[c_node][first_cell_node] = outer_graph[c_node].get(first_cell_node, 0.0) + total_learning_power
outer_graph[last_cell_node][c_node] = outer_graph[last_cell_node].get(c_node, 0.0) + total_learning_power
outer_graph[c_node][last_cell_node] = outer_graph[c_node].get(last_cell_node, 0.0) + total_learning_power
return final_perception, final_image
# =========================================================
# 👥 5. アンサンブル・蜘蛛の巣トポロジー管理(ミーム交配の実装)
# =========================================================
class EnsembleVisionEcosystem:
def __init__(self, n_estimators=5, total_cells=1000):
self.n_estimators = n_estimators
self.models = []
NUM_RINGS = 5 # 蜘蛛の巣の階層数
for _ in range(n_estimators):
space = []
graph = defaultdict(dict)
# 密度の調整:外側(感覚層)ほど細胞を多く、内側(概念層)ほど少なく
cells_per_ring = []
remaining = total_cells
for r in range(NUM_RINGS):
if r == NUM_RINGS - 1:
cells_per_ring.append(remaining)
else:
count = int(total_cells * (0.45 / (1.6**r)))
cells_per_ring.append(count)
remaining -= count
cell_id_counter = 0
ring_nodes = {}
# --- 1. 蜘蛛の巣の細胞配置と受容野初期化 ---
for ring_idx, n_cells in enumerate(cells_per_ring):
ring_nodes[ring_idx] = []
for i in range(n_cells):
cell = VisionMemoryCell(cell_id_counter)
# 外周(感覚層)ほど実際の数字に近い初期受容野を持たせる
sensory_proximity = 1.0 - (ring_idx / NUM_RINGS)
if sensory_proximity > 0.2:
ref_idx = np.random.choice(len(X_train))
base_pattern = X_train[ref_idx]
cell.w_in = base_pattern * sensory_proximity + np.random.normal(0, 0.3, INPUT_DIM) * (1.0 - sensory_proximity)
cell.A[y_train[ref_idx]] = 0.5 * sensory_proximity
space.append(cell)
ring_nodes[ring_idx].append(cell_id_counter)
cell_id_counter += 1
# --- 2. 蜘蛛の巣の糸(リンク)の構築 ---
id_to_cell = {c.id: c for c in space}
for ring_idx in range(NUM_RINGS):
nodes = ring_nodes[ring_idx]
num_nodes = len(nodes)
if num_nodes == 0: continue
for i in range(num_nodes):
curr_id = nodes[i]
# (a) 同心円の横の糸(横方向リンク)
next_id = nodes[(i + 1) % num_nodes]
id_to_cell[curr_id].links[next_id] = 0.4
id_to_cell[next_id].links[curr_id] = 0.4
# (b) 放射方向の縦の糸(抽象化への階層リンク)
if ring_idx < NUM_RINGS - 1:
inner_nodes = ring_nodes[ring_idx + 1]
if len(inner_nodes) > 0:
inner_idx = int(i * len(inner_nodes) / num_nodes)
inner_id = inner_nodes[inner_idx]
id_to_cell[curr_id].links[inner_id] = 0.6
id_to_cell[inner_id].links[curr_id] = 0.6
self.models.append({'space': space, 'graph': graph})
def train_cycle(self, X, y, phase="day"):
for model in self.models:
# 脳の個性を引き出すブートストラップサンプリング
indices = np.random.choice(len(X), len(X), replace=True)
for idx in indices:
if phase == "day":
live_vision_cycle(model['space'], model['graph'], x_64d=X[idx],
target_class=y[idx], sensory_drive=1.0, intrinsic_drive=0.0)
elif phase == "evening":
live_vision_cycle(model['space'], model['graph'], x_64d=X[idx],
target_class=y[idx], sensory_drive=0.3, intrinsic_drive=0.7)
def sleep_cycle(self, steps=60):
for model in self.models:
for _ in range(steps):
live_vision_cycle(model['space'], model['graph'], x_64d=None,
target_class=None, sensory_drive=0.0, intrinsic_drive=1.0)
def cultural_crossover(self, crossover_rate=0.2):
"""異なる脳の間で、最も洗練されたイデア(ミーム)を交換する"""
elite_cells_per_model = []
for m_idx, model in enumerate(self.models):
space = model['space']
if not space: continue
# 最深部(Ring 4に相当する後半10%のコア層)からエリートを選別
core_start_idx = int(len(space) * 0.9)
core_cells = space[core_start_idx:]
if not core_cells: continue
# エネルギーが高く、キャラクター(特定のクラスへの確信)が立っている細胞をスカウト
elite_cell = max(core_cells, key=lambda c: (c.energy * np.max(c.A)))
elite_cells_per_model.append((m_idx, elite_cell))
# 集団間での文化伝播(シナプス感染)
for src_idx, src_cell in elite_cells_per_model:
target_indices = [i for i in range(self.n_estimators) if i != src_idx]
if not target_indices: continue
dst_idx = random.choice(target_indices)
dst_model = self.models[dst_idx]
dst_core_cells = dst_model['space'][int(len(dst_model['space']) * 0.9):]
if not dst_core_cells: continue
# 伝播先の脳の中で、最も迷っている(エネルギーの低い)細胞にミームをブレンドする
weak_cell = min(dst_core_cells, key=lambda c: c.energy)
# 知識の混ざり合い
weak_cell.w_in = (1.0 - crossover_rate) * weak_cell.w_in + crossover_rate * src_cell.w_in
weak_cell.A = (1.0 - crossover_rate) * weak_cell.A + crossover_rate * src_cell.A
weak_cell.energy = max(weak_cell.energy, src_cell.energy * 0.8) # 文化刺激による活性化
def evolve(self):
for model in self.models:
model['space'], model['graph'] = evolve_and_prune_space(model['space'], model['graph'])
def predict(self, X, run_evolution=True):
"""
推論を実行しつつ、その入力データにリアルタイムに適応して
細胞を更新・分裂・淘汰させる(リアルタイム・オンライン学習モード)
"""
y_pred = []
for idx in range(len(X)):
ensemble_perceptions = []
# 各ベースモデル(脳)がデータに反応し、同時に学習・更新を行う
for model in self.models:
# 【変更点】
# 1. target_class=None にすることで、自分の推論結果をターゲット(疑似ラベル)にする
# 2. learning_gate=1.0 にして、推論時もヘブ則や側頭抑制を完全に作動させる
perception, _ = live_vision_cycle(
model['space'], model['graph'], x_64d=X[idx],
target_class=None, # 自身の推論(inferred_class)で自己学習
sensory_drive=1.0,
intrinsic_drive=0.1,
learning_gate=1.0 # ゲートを開放!推論時も動的に更新
)
ensemble_perceptions.append(perception)
# アンサンブルとしての最終決定
avg_perception = np.mean(ensemble_perceptions, axis=0)
final_class = np.argmax(avg_perception)
y_pred.append(final_class)
# 【変更点】細胞を増やす・間引く(新陳代謝の実行)
# サンプルごと(または数サンプルごと)に代謝を回すことで、推論中にリアルタイムに細胞が増減します
if run_evolution and (idx + 1) % 10 == 0: # ここでは10サンプルごとに新陳代謝を実行
self.evolve()
return y_pred
def get_average_cell_count(self):
return int(np.mean([len(m['space']) for m in self.models]))
# =========================================================
# 🚀 6. 交互巡礼・適応ループの実行(訓練 ⇄ 推論の交互サイクル)
# =========================================================
print("=== 蜘蛛の巣ウェブ型・交互生態系システム 起動 ===")
ensemble = EnsembleVisionEcosystem(n_estimators=5, total_cells=INIT_CELLS)
# 全体のマクロサイクル数
MACRO_CYCLES = 1
for m_cycle in range(MACRO_CYCLES):
print(f"\n==================================================")
print(f"🌀 マクロサイクル {m_cycle+1}/{MACRO_CYCLES} 開始")
print(f"==================================================")
# -----------------------------------------------------
# 【フェーズA】訓練サイクル(基底知識のアップデート)
# -----------------------------------------------------
print("\n--- [Phase A: Training] 基礎訓練・睡眠・ミーム交配 ---")
# ☀️ 昼
ensemble.train_cycle(X_train, y_train, phase="day")
# 🌆 夕
ensemble.train_cycle(X_train, y_train, phase="evening")
# 🌙 夜(夢)
ensemble.sleep_cycle(steps=600)
# 🌌 ミーム文化的交配
ensemble.cultural_crossover(crossover_rate=0.2)
# 🧬 新陳代謝
ensemble.evolve()
avg_cells = ensemble.get_average_cell_count()
print(f"訓練完了 | 平均現存細胞数: {avg_cells}")
# -----------------------------------------------------
# 【フェーズB】推論&リアルタイム適応(5回連続パス)
# -----------------------------------------------------
print("\n--- [Phase B: Inference & Adaptation] 5回連続推論実験 ---")
# このマクロサイクル内での推論ログ
pass_logs = []
for pass_idx in range(3):
# 推論を実行しつつ、リアルタイムに自己更新(learning_gate=1.0)と代謝を実行
y_pred = ensemble.predict(X_test, run_evolution=True)
acc = accuracy_score(y_test, y_pred)
avg_cells_infer = ensemble.get_average_cell_count()
pass_logs.append({
'Pass': pass_idx + 1,
'Accuracy': acc,
'Avg Cells': avg_cells_infer,
})
print(f" Pass {pass_idx+1}/5 | 正解率: {acc:.4f} | 平均細胞数: {avg_cells_infer}")
# --- 現在のサイクルのサマリー表示 ---
print(f"\n【マクロサイクル {m_cycle+1} の実験結果】")
print(f" Pass | Accuracy | Avg Cells ")
print(" -------------------------------")
for log in pass_logs:
print(f" {log['Pass']:<4} | {log['Accuracy']:<10.4f} | {log['Avg Cells']:<10}")
print("\n=== すべての交互サイクルが完了しました ===")
# 🔄 6.5. 実験:同じテストデータを5回連続で流す(生態追跡)
# =========================================================
print("\n=== 実験: 同一テストセットを5回連続で推論(動的適応の観察) ===")
# 進化ログの格納用
pass_logs = []
# テストデータをシャffleせず、完全に同じ順序で2回流す
for pass_idx in range(2):
# predict内部で学習(learning_gate=1.0)と、10サンプルごとのevolve()が走る
y_pred = ensemble.predict(X_test, run_evolution=True)
acc = accuracy_score(y_test, y_pred)
# 現在のアンサンブルの平均細胞数を取得
avg_cells = ensemble.get_average_cell_count()
# グラフの総エッジ数(リンク数)を簡易計算
total_edges = sum(len(m['graph']) for m in ensemble.models)
pass_logs.append({
'Pass': pass_idx + 1,
'Accuracy': acc,
'Avg Cells': avg_cells,
})
print(f"Pass {pass_idx+1}/5 | 正解率: {acc:.4f} | 平均細胞数: {avg_cells}")
# --- 結果のテーブル表示 ---
print("\n【実験結果サマリー】")
print(f"{'Pass':<6} | {'Accuracy':<10} | {'Avg Cells':<10}")
print("-" * 34)
for log in pass_logs:
print(f"{log['Pass']:<6} | {log['Accuracy']:<10.4f} | {log['Avg Cells']:<10}")
# =========================================================
# 7. ダッシュボード:【 空想状態(Daydream) 】の可視化
# =========================================================
print("\n[Generating Daydream Images from the First Evolved Ecosystem...]")
fig, axes = plt.subplots(2, 5, figsize=(12, 5))
fig.suptitle("Daydreaming Mode (Top-Down Drive via Double Edges) - Model #1\nVisualizing the 'Ideals' of digits polished by Structural Control", fontsize=14)
first_model = ensemble.models[0]
for cls in range(10):
_, daydream_image = live_vision_cycle(first_model['space'], first_model['graph'],
sensory_drive=0.0, intrinsic_drive=0.2, top_down_drive=1.0,
target_recall_class=cls, learning_gate=0.0)
ax = axes[cls // 5, cls % 5]
ax.imshow(daydream_image.reshape(8, 8), cmap="bone")
ax.set_title(f"Concept: {cls}")
ax.axis("off")
plt.tight_layout()
plt.show()